Skip to content

Commit a621be0

Browse files
committed
Single writer/multiple reader MMR store
1 parent 4365521 commit a621be0

File tree

8 files changed

+479
-101
lines changed

8 files changed

+479
-101
lines changed

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ members = ["crates/*"]
66
cairo-vm = { git = "https://github.com/maciejka/cairo-vm", rev = "19d8a07ce9799a8af9db6f8a14a8accaad900214" }
77

88
[workspace.dependencies]
9+
# Accumulators
10+
accumulators = { git = "https://github.com/maciejka/rust-accumulators", rev = "1fbc79a472f1659f0aa0213de95a914086732204", features = ["blake", "memory", "mmr"]}
11+
912
# Async runtime
1013
tokio = { version = "1.36", features = ["full"] }
1114
reqwest = { version = "0.12", features = ["json", "gzip", "brotli", "zstd"] }
15+
async-trait = "0.1"
1216

1317
# Bitcoin
1418
bitcoin = { version = "0.32.6", features = ["serde"] }
@@ -19,7 +23,7 @@ jsonrpsee = { version = "0.25.1", features = ["http-client", "async-client"] }
1923
base64 = "0.21"
2024

2125
# Storage
22-
libmdbx = "0.3"
26+
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite"] }
2327

2428
# CLI
2529
clap = { version = "4.5", features = ["derive", "env"] }

crates/raito-bridge-node/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7+
# Accumulators
8+
accumulators.workspace = true
79
# Core SPV functionality
810
raito-spv-mmr = { path = "../raito-spv-mmr", features = ["sqlite"] }
911
# Bitcoin client
@@ -12,14 +14,15 @@ raito-spv-client = { path = "../raito-spv-client" }
1214
raito-spv-verify = { path = "../raito-spv-verify" }
1315
# Async runtime
1416
tokio.workspace = true
17+
async-trait.workspace = true
1518
# Web framework
1619
axum = "0.7"
1720
tower-http = { version = "0.5", features = ["trace", "cors", "compression-gzip"] }
1821

1922
# Bitcoin RPC and types (re-exported from raito-spv-mmr but needed for specific features)
2023
bitcoin.workspace = true
2124
# Storage
22-
libmdbx.workspace = true
25+
sqlx.workspace = true
2326
# CLI
2427
clap.workspace = true
2528
dotenv.workspace = true

crates/raito-bridge-node/src/app.rs

Lines changed: 12 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
//! Application server and client for managing MMR accumulator operations via async message passing.
22
3-
use std::path::PathBuf;
3+
use std::{path::PathBuf, sync::Arc};
44

5+
use accumulators::hasher::stark_blake::StarkBlakeHasher;
56
use bitcoin::{block::Header as BlockHeader, Txid};
6-
use raito_bitcoin_client::BitcoinClient;
77
use tokio::sync::{broadcast, mpsc, oneshot};
88
use tracing::{error, info};
99

@@ -14,6 +14,8 @@ use raito_spv_mmr::{
1414
};
1515
use raito_spv_verify::TransactionInclusionProof;
1616

17+
use crate::mmr_store::MMRStore;
18+
1719
/// Request sent to the application server via the API channel
1820
pub struct ApiRequest {
1921
/// The body of the API request containing the specific operation
@@ -31,8 +33,6 @@ pub enum ApiRequestBody {
3133
/// Get MMR sparse roots for a given chain height (optional)
3234
/// The chain height is the number of blocks in the MMR minus one
3335
GetSparseRoots(Option<u32>),
34-
/// Add a new block header to the MMR
35-
AddBlock(BlockHeader),
3636
/// Generate an inclusion proof for a block at the given height and chain height (optional)
3737
GenerateBlockProof((u32, Option<u32>)),
3838
/// Get a Bitcoin block header by height
@@ -43,8 +43,6 @@ pub enum ApiRequestBody {
4343

4444
/// Response body for API requests containing the result data
4545
pub enum ApiResponseBody {
46-
/// Empty response
47-
Empty,
4846
/// Response containing the current block count
4947
GetBlockCount(u32),
5048
/// Response containing the sparse roots for a given block count
@@ -99,7 +97,12 @@ impl AppServer {
9997
info!("App server started");
10098

10199
// We need to specify mmr_id to have deterministic keys in the database
102-
let mut mmr = BlockMMR::from_file(&self.config.db_path, "blocks").await?;
100+
let mmr_id = Some("blocks".to_string());
101+
let store = Arc::new(
102+
MMRStore::multiple_concurrent_readers(&self.config.db_path, mmr_id.clone()).await?,
103+
);
104+
let hasher = Arc::new(StarkBlakeHasher::default());
105+
let mmr = BlockMMR::new(store, hasher, mmr_id);
103106

104107
loop {
105108
tokio::select! {
@@ -117,30 +120,9 @@ impl AppServer {
117120
let res = mmr.generate_proof(block_height, chain_height).await.map(|proof| ApiResponseBody::GenerateBlockProof(proof));
118121
req.tx_response.send(res).map_err(|_| anyhow::anyhow!("Failed to send response to GenerateBlockProof request"))?;
119122
}
120-
ApiRequestBody::AddBlock(block_header) => {
121-
// This is a local-only method, so we treat errors differently here
122-
mmr.add_block_header(&block_header).await?;
123-
let res = Ok(ApiResponseBody::Empty);
124-
req.tx_response.send(res).map_err(|_| anyhow::anyhow!("Failed to send response to AddBlock request"))?;
125-
}
126123
ApiRequestBody::GetBlockHeader(block_height) => {
127-
let res = async {
128-
let bitcoin_client = BitcoinClient::new(
129-
self.config.bitcoin_rpc_url.clone(),
130-
self.config.bitcoin_rpc_userpwd.clone(),
131-
)?;
132-
133-
let (block_header, _block_hash) = bitcoin_client
134-
.get_block_header_by_height(block_height)
135-
.await?;
136-
137-
Ok(ApiResponseBody::GetBlockHeader(block_header))
138-
}
139-
.await;
140-
141-
req.tx_response
142-
.send(res)
143-
.map_err(|_| anyhow::anyhow!("Failed to send response to GetBlockHeader request"))?;
124+
let res = mmr.get_block_headers(block_height, 1).await.map(|block_headers| ApiResponseBody::GetBlockHeader(block_headers[0]));
125+
req.tx_response.send(res).map_err(|_| anyhow::anyhow!("Failed to send response to GetBlockHeader request"))?;
144126
}
145127
ApiRequestBody::GetTransactionProof(txid) => {
146128
let res = fetch_transaction_proof(
@@ -227,17 +209,6 @@ impl AppClient {
227209
.await
228210
}
229211

230-
pub async fn add_block(&self, block_header: BlockHeader) -> Result<(), anyhow::Error> {
231-
self.send_request(
232-
ApiRequestBody::AddBlock(block_header),
233-
|response| match response {
234-
ApiResponseBody::Empty => Some(()),
235-
_ => None,
236-
},
237-
)
238-
.await
239-
}
240-
241212
pub async fn generate_block_proof(
242213
&self,
243214
block_height: u32,

crates/raito-bridge-node/src/indexer.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
//! Bitcoin blockchain indexer that builds MMR accumulator and generates sparse roots for new blocks.
22
3+
use std::{path::PathBuf, sync::Arc};
4+
5+
use accumulators::hasher::stark_blake::StarkBlakeHasher;
6+
use raito_spv_mmr::block_mmr::BlockMMR;
37
use tokio::sync::broadcast;
48
use tracing::{error, info};
59

610
use raito_bitcoin_client::BitcoinClient;
711

8-
use crate::app::AppClient;
12+
use crate::mmr_store::MMRStore;
913

1014
/// Bitcoin block indexer that builds MMR accumulator and generates sparse roots
1115
pub struct Indexer {
1216
/// Indexer configuration
1317
config: IndexerConfig,
14-
/// App client
15-
app_client: AppClient,
1618
/// Shutdown signal receiver
1719
rx_shutdown: broadcast::Receiver<()>,
1820
}
@@ -23,19 +25,16 @@ pub struct IndexerConfig {
2325
pub rpc_url: String,
2426
/// Bitcoin RPC user:password (optional)
2527
pub rpc_userpwd: Option<String>,
28+
/// Path to the database storing the MMR accumulator
29+
pub mmr_db_path: PathBuf,
2630
/// Indexing lag in blocks
2731
pub indexing_lag: u32,
2832
}
2933

3034
impl Indexer {
31-
pub fn new(
32-
config: IndexerConfig,
33-
app_client: AppClient,
34-
rx_shutdown: broadcast::Receiver<()>,
35-
) -> Self {
35+
pub fn new(config: IndexerConfig, rx_shutdown: broadcast::Receiver<()>) -> Self {
3636
Self {
3737
config,
38-
app_client,
3938
rx_shutdown,
4039
}
4140
}
@@ -47,15 +46,24 @@ impl Indexer {
4746
BitcoinClient::new(self.config.rpc_url.clone(), self.config.rpc_userpwd.clone())?;
4847
info!("Bitcoin RPC client initialized");
4948

50-
let mut next_block_height = self.app_client.get_block_count().await?;
49+
// We need to specify mmr_id to have deterministic keys in the database
50+
let mmr_id = Some("blocks".to_string());
51+
let store = Arc::new(
52+
MMRStore::single_atomic_writer(&self.config.mmr_db_path, mmr_id.clone()).await?,
53+
);
54+
let hasher = Arc::new(StarkBlakeHasher::default());
55+
let mut mmr = BlockMMR::new(store, hasher, mmr_id);
56+
info!("MMR loaded from {}", self.config.mmr_db_path.display());
57+
58+
let mut next_block_height = mmr.get_block_count().await?;
5159
info!("Current MMR blocks count: {}", next_block_height);
5260

5361
loop {
5462
tokio::select! {
5563
res = bitcoin_client.wait_block_header(next_block_height, self.config.indexing_lag) => {
5664
match res {
5765
Ok((block_header, block_hash)) => {
58-
self.app_client.add_block(block_header).await?;
66+
mmr.add_block_header(next_block_height, &block_header).await?;
5967
info!("Block #{} {} processed", next_block_height, block_hash);
6068
next_block_height += 1;
6169
},

crates/raito-bridge-node/src/main.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::{
1616

1717
mod app;
1818
mod indexer;
19+
mod mmr_store;
1920
mod rpc;
2021
mod shutdown;
2122

@@ -67,29 +68,30 @@ async fn main() {
6768
// Instantiating components and wiring them together
6869
let shutdown = Shutdown::default();
6970

71+
let indexer_config = IndexerConfig {
72+
rpc_url: cli.bitcoin_rpc_url.clone(),
73+
rpc_userpwd: cli.bitcoin_rpc_userpwd.clone(),
74+
mmr_db_path: cli.db_path.clone(),
75+
indexing_lag: cli.mmr_block_lag,
76+
};
77+
let mut indexer = Indexer::new(indexer_config, shutdown.subscribe());
78+
7079
let app_config = AppConfig {
71-
db_path: cli.db_path,
80+
db_path: cli.db_path.clone(),
7281
api_requests_capacity: 1000,
7382
bitcoin_rpc_url: cli.bitcoin_rpc_url.clone(),
7483
bitcoin_rpc_userpwd: cli.bitcoin_rpc_userpwd.clone(),
7584
};
7685
let (mut app_server, app_client) = create_app(app_config, shutdown.subscribe());
7786

78-
let indexer_config = IndexerConfig {
79-
rpc_url: cli.bitcoin_rpc_url.clone(),
80-
rpc_userpwd: cli.bitcoin_rpc_userpwd.clone(),
81-
indexing_lag: cli.mmr_block_lag,
82-
};
83-
let mut indexer = Indexer::new(indexer_config, app_client.clone(), shutdown.subscribe());
84-
8587
let rpc_config = RpcConfig {
8688
rpc_host: cli.rpc_host,
8789
};
8890
let rpc_server = RpcServer::new(rpc_config, app_client.clone(), shutdown.subscribe());
8991

9092
// Launching threads for each component
91-
let app_handle = tokio::spawn(async move { app_server.run().await });
9293
let indexer_handle = tokio::spawn(async move { indexer.run().await });
94+
let app_handle = tokio::spawn(async move { app_server.run().await });
9395
let rpc_handle = tokio::spawn(async move { rpc_server.run().await });
9496
let shutdown_handle = tokio::spawn(async move { shutdown.run().await });
9597

0 commit comments

Comments
 (0)