Skip to content

Commit 16bb1e3

Browse files
authored
feat: migrate block processor from the node (#30)
1 parent 0f4562a commit 16bb1e3

File tree

14 files changed

+389
-16
lines changed

14 files changed

+389
-16
lines changed

Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ debug = false
3434
incremental = false
3535

3636
[workspace.dependencies]
37-
signet-blobber = { version = "0.10.0", path = "crates/blobber" }
38-
signet-db = { version = "0.10.0", path = "crates/db" }
39-
signet-node-types = { version = "0.10.0", path = "crates/node-types" }
40-
signet-rpc = { version = "0.10.0", path = "crates/rpc" }
37+
signet-blobber = { version = "0.10", path = "crates/blobber" }
38+
signet-block-processor = { version = "0.10", path = "crates/block-processor" }
39+
signet-db = { version = "0.10", path = "crates/db" }
40+
signet-node-types = { version = "0.10", path = "crates/node-types" }
41+
signet-rpc = { version = "0.10", path = "crates/rpc" }
4142

4243
init4-bin-base = { version = "0.11.0", features = ["alloy"] }
4344

crates/block-processor/Cargo.toml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[package]
2+
name = "signet-block-processor"
3+
description = "High-level flows for Signet blob extraction and EVM invocation"
4+
version.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
authors.workspace = true
8+
license.workspace = true
9+
homepage.workspace = true
10+
repository.workspace = true
11+
12+
[dependencies]
13+
signet-evm.workspace = true
14+
signet-extract.workspace = true
15+
signet-journal.workspace = true
16+
17+
init4-bin-base.workspace = true
18+
19+
signet-blobber.workspace = true
20+
signet-db.workspace = true
21+
signet-node-types.workspace = true
22+
23+
reth.workspace = true
24+
reth-exex.workspace = true
25+
reth-node-api.workspace = true
26+
27+
tracing.workspace = true
28+
eyre.workspace = true
29+
alloy.workspace = true
30+
signet-constants.workspace = true
31+
trevm.workspace = true
32+
reth-chainspec.workspace = true

crates/block-processor/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Signet Block Processor
2+
3+
Block processing logic for the Signet Node. This crate takes a reth `Chain`,
4+
runs the Signet EVM, and commits the results to a database.
5+
6+
# Significant Types
7+
8+
- A few convenience type aliases:
9+
- `PrimitivesOf<Host>` - The primitives type used by the host.
10+
- `Chain<Host>` - A reth `Chain` using the host's primitives.
11+
- `ExExNotification<Host>` - A reth `ExExNotification` using the host's
12+
primitives.
13+
- `SignetBlockProcessorV1<Host, Db>` - The first version of the block processor.

crates/block-processor/src/lib.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#![doc = include_str!("../README.md")]
2+
#![warn(
3+
missing_copy_implementations,
4+
missing_debug_implementations,
5+
missing_docs,
6+
unreachable_pub,
7+
clippy::missing_const_for_fn,
8+
rustdoc::all
9+
)]
10+
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
11+
#![deny(unused_must_use, rust_2018_idioms)]
12+
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
13+
14+
mod v1;
15+
pub use v1::SignetBlockProcessor as SignetBlockProcessorV1;
16+
17+
/// Primitives used by the host.
18+
pub type PrimitivesOf<Host> =
19+
<<Host as reth_node_api::FullNodeTypes>::Types as reth_node_api::NodeTypes>::Primitives;
20+
21+
/// A [`reth::providers::Chain`] using the host primitives.
22+
pub type Chain<Host> = reth::providers::Chain<PrimitivesOf<Host>>;
23+
24+
/// A [`reth_exex::ExExNotification`] using the host primitives.
25+
pub type ExExNotification<Host> = reth_exex::ExExNotification<PrimitivesOf<Host>>;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
mod processor;
2+
pub use processor::SignetBlockProcessor;
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
use crate::Chain;
2+
use alloy::{consensus::BlockHeader, primitives::B256};
3+
use eyre::ContextCompat;
4+
use init4_bin_base::utils::calc::SlotCalculator;
5+
use reth::{
6+
primitives::EthPrimitives,
7+
providers::{BlockNumReader, BlockReader, ExecutionOutcome, HeaderProvider, ProviderFactory},
8+
revm::{database::StateProviderDatabase, db::StateBuilder},
9+
};
10+
use reth_chainspec::{ChainSpec, EthereumHardforks};
11+
use reth_node_api::{FullNodeComponents, NodeTypes};
12+
use signet_blobber::{CacheHandle, ExtractableChainShim};
13+
use signet_constants::SignetSystemConstants;
14+
use signet_db::{DataCompat, DbProviderExt, RuChain, RuRevmState, RuWriter};
15+
use signet_evm::{BlockResult, EvmNeedsCfg, SignetDriver};
16+
use signet_extract::{Extractor, Extracts};
17+
use signet_journal::HostJournal;
18+
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
19+
use std::collections::VecDeque;
20+
use std::sync::Arc;
21+
use tracing::{Instrument, debug, error, info, info_span, instrument};
22+
use trevm::revm::primitives::hardfork::SpecId;
23+
24+
/// A block processor that listens to host chain commits and processes
25+
/// Signet blocks accordingly.
26+
#[derive(Debug)]
27+
pub struct SignetBlockProcessor<Db>
28+
where
29+
Db: NodeTypesDbTrait,
30+
{
31+
/// Signet System Constants
32+
constants: SignetSystemConstants,
33+
34+
/// The chain specification, used to determine active hardforks.
35+
chain_spec: Arc<ChainSpec>,
36+
37+
/// A [`ProviderFactory`] instance to allow RU database access.
38+
ru_provider: ProviderFactory<SignetNodeTypes<Db>>,
39+
40+
/// The slot calculator.
41+
slot_calculator: SlotCalculator,
42+
43+
/// A handle to the blob cacher.
44+
blob_cacher: CacheHandle,
45+
}
46+
47+
impl<Db> SignetBlockProcessor<Db>
48+
where
49+
Db: NodeTypesDbTrait,
50+
{
51+
/// Create a new [`SignetBlockProcessor`].
52+
pub const fn new(
53+
constants: SignetSystemConstants,
54+
chain_spec: Arc<ChainSpec>,
55+
ru_provider: ProviderFactory<SignetNodeTypes<Db>>,
56+
slot_calculator: SlotCalculator,
57+
blob_cacher: CacheHandle,
58+
) -> Self {
59+
Self { constants, chain_spec, ru_provider, slot_calculator, blob_cacher }
60+
}
61+
62+
/// Get the active spec id at the given timestamp.
63+
fn spec_id(&self, timestamp: u64) -> SpecId {
64+
if self.chain_spec.is_prague_active_at_timestamp(timestamp) {
65+
SpecId::PRAGUE
66+
} else {
67+
SpecId::CANCUN
68+
}
69+
}
70+
71+
/// Make a [`StateProviderDatabase`] from the read-write provider, suitable
72+
/// for use with Trevm.
73+
fn state_provider_database(&self, height: u64) -> eyre::Result<RuRevmState> {
74+
// Get the state provider for the block number
75+
let sp = self.ru_provider.history_by_block_number(height)?;
76+
77+
// Wrap in Revm comatibility layer
78+
let spd = StateProviderDatabase::new(sp);
79+
let builder = StateBuilder::new_with_database(spd);
80+
81+
Ok(builder.with_bundle_update().build())
82+
}
83+
84+
/// Make a new Trevm instance, building on the given height.
85+
fn trevm(&self, parent_height: u64, spec_id: SpecId) -> eyre::Result<EvmNeedsCfg<RuRevmState>> {
86+
let db = self.state_provider_database(parent_height)?;
87+
88+
let mut trevm = signet_evm::signet_evm(db, self.constants.clone());
89+
90+
trevm.set_spec_id(spec_id);
91+
92+
Ok(trevm)
93+
}
94+
95+
/// Called when the host chain has committed a block or set of blocks.
96+
#[instrument(skip_all, fields(count = chain.len(), first = chain.first().number(), tip = chain.tip().number()))]
97+
pub async fn on_host_commit<Host>(&self, chain: &Chain<Host>) -> eyre::Result<Option<RuChain>>
98+
where
99+
Host: FullNodeComponents,
100+
Host::Types: NodeTypes<Primitives = EthPrimitives>,
101+
{
102+
let highest = chain.tip().number();
103+
if highest < self.constants.host_deploy_height() {
104+
return Ok(None);
105+
}
106+
107+
// this should never happen but we want to handle it anyway
108+
if chain.is_empty() {
109+
return Ok(None);
110+
}
111+
112+
let extractor = Extractor::new(self.constants.clone());
113+
let shim = ExtractableChainShim::new(chain);
114+
let outputs = extractor.extract_signet(&shim);
115+
116+
// TODO: ENG-481 Inherit prune modes from Reth configuration.
117+
// https://linear.app/initiates/issue/ENG-481/inherit-prune-modes-from-reth-node
118+
119+
// The extractor will filter out blocks at or before the deployment
120+
// height, so we don't need compute the start from the notification.
121+
let mut start = None;
122+
let mut current = 0;
123+
let mut prev_block_journal = self.ru_provider.provider_rw()?.latest_journal_hash()?;
124+
125+
let mut net_outcome = ExecutionOutcome::default();
126+
let last_ru_height = self.ru_provider.last_block_number()?;
127+
128+
// There might be a case where we can get a notification that starts
129+
// "lower" than our last processed block,
130+
// but contains new information beyond one point. In this case, we
131+
// should simply skip the block.
132+
for block_extracts in outputs.skip_while(|extract| extract.ru_height <= last_ru_height) {
133+
// If we haven't set the start yet, set it to the first block.
134+
if start.is_none() {
135+
let new_ru_height = block_extracts.ru_height;
136+
137+
// If the above condition passes, we should always be
138+
// committing without skipping a range of blocks.
139+
if new_ru_height != last_ru_height + 1 {
140+
error!(
141+
%new_ru_height,
142+
%last_ru_height,
143+
"missing range of DB blocks"
144+
);
145+
eyre::bail!("missing range of DB blocks");
146+
}
147+
start = Some(new_ru_height);
148+
}
149+
current = block_extracts.ru_height;
150+
let spec_id = self.spec_id(block_extracts.host_block.timestamp());
151+
152+
let span = info_span!(
153+
"signet::handle_zenith_outputs::block_processing",
154+
start = start.unwrap(),
155+
ru_height = block_extracts.ru_height,
156+
host_height = block_extracts.host_block.number(),
157+
has_ru_block = block_extracts.submitted.is_some(),
158+
);
159+
160+
tracing::trace!("Running EVM");
161+
let block_result = self.run_evm(&block_extracts, spec_id).instrument(span).await?;
162+
tracing::trace!("Committing EVM results");
163+
let journal =
164+
self.commit_evm_results(&block_extracts, &block_result, prev_block_journal)?;
165+
166+
prev_block_journal = journal.journal_hash();
167+
net_outcome.extend(block_result.execution_outcome.convert());
168+
}
169+
info!("committed blocks");
170+
171+
// If we didn't process any blocks, we don't need to return anything.
172+
// In practice, this should never happen, as we should always have at
173+
// least one block to process.
174+
if start.is_none() {
175+
return Ok(None);
176+
}
177+
let start = start.expect("checked by early return");
178+
179+
// Return the range of blocks we processed
180+
let provider = self.ru_provider.provider_rw()?;
181+
182+
let ru_info = provider.get_extraction_results(start..=current)?;
183+
184+
let inner =
185+
Chain::<Host>::new(provider.recovered_block_range(start..=current)?, net_outcome, None);
186+
187+
Ok(Some(RuChain { inner, ru_info }))
188+
}
189+
190+
/// ==========================
191+
/// ==========================
192+
/// ██████ ██ ██ ███ ██
193+
/// ██ ██ ██ ██ ████ ██
194+
/// ██████ ██ ██ ██ ██ ██
195+
/// ██ ██ ██ ██ ██ ██ ██
196+
/// ██ ██ ██████ ██ ████
197+
///
198+
///
199+
/// ███████ ██ ██ ███ ███
200+
/// ██ ██ ██ ████ ████
201+
/// █████ ██ ██ ██ ████ ██
202+
/// ██ ██ ██ ██ ██ ██
203+
/// ███████ ████ ██ ██
204+
/// ===========================
205+
/// ===========================
206+
async fn run_evm(
207+
&self,
208+
block_extracts: &Extracts<'_, ExtractableChainShim<'_>>,
209+
spec_id: SpecId,
210+
) -> eyre::Result<BlockResult> {
211+
let ru_height = block_extracts.ru_height;
212+
let host_height = block_extracts.host_block.number();
213+
let timestamp = block_extracts.host_block.timestamp();
214+
215+
let parent_header = self
216+
.ru_provider
217+
.sealed_header(block_extracts.ru_height.saturating_sub(1))?
218+
.wrap_err("parent ru block not present in DB")
219+
.inspect_err(|e| error!(%e))?;
220+
221+
let slot = self.slot_calculator.slot_ending_at(timestamp).expect("host chain has started");
222+
223+
let txns = match &block_extracts.submitted {
224+
Some(submitted) => {
225+
self.blob_cacher
226+
.signet_block(block_extracts.host_block.number(), slot, submitted)
227+
.await
228+
.map(|block| block.into_parts().1)
229+
.unwrap_or_default()
230+
.into_iter()
231+
.filter(|tx| !tx.is_eip4844()) // redundant, but let's be sure
232+
.map(|tx| tx.into())
233+
.collect::<VecDeque<_>>()
234+
}
235+
None => VecDeque::new(),
236+
};
237+
238+
let mut driver = SignetDriver::new(
239+
block_extracts,
240+
txns,
241+
parent_header.convert(),
242+
self.constants.clone(),
243+
);
244+
245+
let trevm = self.trevm(driver.parent().number(), spec_id)?.fill_cfg(&driver);
246+
247+
let trevm = match trevm.drive_block(&mut driver) {
248+
Ok(t) => t,
249+
Err(e) => return Err(e.into_error().into()),
250+
};
251+
252+
let (sealed_block, receipts) = driver.finish();
253+
let bundle = trevm.finish();
254+
255+
Ok(BlockResult {
256+
sealed_block,
257+
execution_outcome: signet_evm::ExecutionOutcome::new(bundle, vec![receipts], ru_height),
258+
host_height,
259+
})
260+
}
261+
262+
/// Commit the outputs of a zenith block to the database.
263+
#[instrument(skip_all)]
264+
fn commit_evm_results<'a>(
265+
&self,
266+
extracts: &Extracts<'_, ExtractableChainShim<'_>>,
267+
block_result: &'a BlockResult,
268+
prev_block_journal: B256,
269+
) -> eyre::Result<HostJournal<'a>> {
270+
let journal = block_result.make_host_journal(prev_block_journal);
271+
let time = std::time::Instant::now();
272+
let jh = journal.journal_hash();
273+
274+
debug!(
275+
target: "signet::journal::serialize",
276+
bytes = journal.serialized().len(),
277+
hash = %jh,
278+
elapsed_micros = %time.elapsed().as_micros(),
279+
"journal produced"
280+
);
281+
282+
self.ru_provider.provider_rw()?.update(|writer| {
283+
// add execution results to database
284+
writer.append_host_block(
285+
extracts.ru_header(),
286+
extracts.transacts().cloned(),
287+
extracts.enters(),
288+
extracts.enter_tokens(),
289+
block_result,
290+
jh,
291+
)?;
292+
Ok(())
293+
})?;
294+
Ok(journal)
295+
}
296+
}

crates/db/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ signet-journal.workspace = true
1616
signet-types.workspace = true
1717
signet-zenith.workspace = true
1818

19+
trevm.workspace = true
20+
1921
alloy.workspace = true
2022

2123
reth.workspace = true

0 commit comments

Comments
 (0)