Skip to content

Commit 91b01b5

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent 0f075fe commit 91b01b5

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed

src/electrum.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, serialize},
44
hashes::hex::{FromHex, ToHex},
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use crossbeam_channel::Receiver;
88
use rayon::prelude::*;
@@ -19,7 +19,7 @@ use crate::{
1919
merkle::Proof,
2020
metrics::{self, Histogram, Metrics},
2121
signals::Signal,
22-
status::ScriptHashStatus,
22+
status::{OutPointStatus, ScriptHashStatus},
2323
tracker::Tracker,
2424
types::ScriptHash,
2525
};
@@ -34,6 +34,7 @@ const UNSUBSCRIBED_QUERY_MESSAGE: &str = "your wallet uses less efficient method
3434
pub struct Client {
3535
tip: Option<BlockHash>,
3636
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
37+
outpoints: HashMap<OutPoint, OutPointStatus>,
3738
}
3839

3940
#[derive(Deserialize)]
@@ -183,7 +184,25 @@ impl Rpc {
183184
}
184185
})
185186
.collect::<Result<Vec<Value>>>()
186-
.context("failed to update status")?;
187+
.context("failed to update scripthash status")?;
188+
189+
notifications.extend(
190+
client
191+
.outpoints
192+
.par_iter_mut()
193+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
194+
match self.tracker.update_outpoint_status(status, &self.daemon) {
195+
Ok(true) => Some(Ok(notification(
196+
"blockchain.outpoint.subscribe",
197+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
198+
))),
199+
Ok(false) => None, // outpoint status is the same
200+
Err(e) => Some(Err(e)),
201+
}
202+
})
203+
.collect::<Result<Vec<Value>>>()
204+
.context("failed to update scripthash status")?,
205+
);
187206

188207
if let Some(old_tip) = client.tip {
189208
let new_tip = self.tracker.chain().tip();
@@ -339,6 +358,28 @@ impl Rpc {
339358
})
340359
}
341360

361+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
362+
let outpoint = OutPoint::new(txid, vout);
363+
Ok(match client.outpoints.entry(outpoint) {
364+
Entry::Occupied(e) => json!(e.get()),
365+
Entry::Vacant(e) => {
366+
let outpoint = OutPoint::new(txid, vout);
367+
let mut status = OutPointStatus::new(outpoint);
368+
self.tracker
369+
.update_outpoint_status(&mut status, &self.daemon)?;
370+
json!(e.insert(status))
371+
}
372+
})
373+
}
374+
375+
fn outpoint_unsubscribe(
376+
&self,
377+
client: &mut Client,
378+
(txid, vout): (Txid, u32),
379+
) -> Result<Value> {
380+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
381+
}
382+
342383
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
343384
let mut status = ScriptHashStatus::new(scripthash);
344385
self.tracker
@@ -518,6 +559,8 @@ impl Rpc {
518559
Params::Features => self.features(),
519560
Params::HeadersSubscribe => self.headers_subscribe(client),
520561
Params::MempoolFeeHistogram => self.get_fee_histogram(),
562+
Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args),
563+
Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args),
521564
Params::PeersSubscribe => Ok(json!([])),
522565
Params::Ping => Ok(Value::Null),
523566
Params::RelayFee => self.relayfee(),
@@ -540,19 +583,21 @@ enum Params {
540583
Banner,
541584
BlockHeader((usize,)),
542585
BlockHeaders((usize, usize)),
543-
TransactionBroadcast((String,)),
544586
Donation,
545587
EstimateFee((u16,)),
546588
Features,
547589
HeadersSubscribe,
548590
MempoolFeeHistogram,
591+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
592+
OutPointUnsubscribe((Txid, u32)),
549593
PeersSubscribe,
550594
Ping,
551595
RelayFee,
552596
ScriptHashGetBalance((ScriptHash,)),
553597
ScriptHashGetHistory((ScriptHash,)),
554598
ScriptHashListUnspent((ScriptHash,)),
555599
ScriptHashSubscribe((ScriptHash,)),
600+
TransactionBroadcast((String,)),
556601
TransactionGet(TxGetArgs),
557602
TransactionGetMerkle((Txid, usize)),
558603
Version((String, Version)),
@@ -565,6 +610,8 @@ impl Params {
565610
"blockchain.block.headers" => Params::BlockHeaders(convert(params)?),
566611
"blockchain.estimatefee" => Params::EstimateFee(convert(params)?),
567612
"blockchain.headers.subscribe" => Params::HeadersSubscribe,
613+
"blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?),
614+
"blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?),
568615
"blockchain.relayfee" => Params::RelayFee,
569616
"blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?),
570617
"blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?),

src/status.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -48,12 +48,26 @@ impl TxEntry {
4848
// Confirmation height of a transaction or its mempool state:
4949
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
51+
#[derive(Copy, Clone, Eq, PartialEq)]
5152
enum Height {
5253
Confirmed { height: usize },
5354
Unconfirmed { has_unconfirmed_inputs: bool },
5455
}
5556

5657
impl Height {
58+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
59+
let height = chain
60+
.get_block_height(&blockhash)
61+
.expect("missing block in chain");
62+
Self::Confirmed { height }
63+
}
64+
65+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
66+
Self::Unconfirmed {
67+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
68+
}
69+
}
70+
5771
fn as_i64(&self) -> i64 {
5872
match self {
5973
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -539,6 +553,131 @@ fn filter_block_txs<T: Send>(
539553
.into_iter()
540554
}
541555

556+
pub(crate) struct OutPointStatus {
557+
outpoint: OutPoint,
558+
funding: Option<Height>,
559+
spending: Option<(Txid, Height)>,
560+
tip: BlockHash,
561+
}
562+
563+
impl Serialize for OutPointStatus {
564+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
565+
where
566+
S: Serializer,
567+
{
568+
let mut map = serializer.serialize_map(None)?;
569+
if let Some(funding) = &self.funding {
570+
map.serialize_entry("height", &funding)?;
571+
}
572+
if let Some((txid, height)) = &self.spending {
573+
map.serialize_entry("spender_txhash", &txid)?;
574+
map.serialize_entry("spender_height", &height)?;
575+
}
576+
map.end()
577+
}
578+
}
579+
580+
impl OutPointStatus {
581+
pub(crate) fn new(outpoint: OutPoint) -> Self {
582+
Self {
583+
outpoint,
584+
funding: None,
585+
spending: None,
586+
tip: BlockHash::default(),
587+
}
588+
}
589+
590+
pub(crate) fn sync(
591+
&mut self,
592+
index: &Index,
593+
mempool: &Mempool,
594+
daemon: &Daemon,
595+
) -> Result<bool> {
596+
let funding = self.sync_funding(index, daemon, mempool)?;
597+
let spending = self.sync_spending(index, daemon, mempool)?;
598+
let same_status = (self.funding == funding) && (self.spending == spending);
599+
self.funding = funding;
600+
self.spending = spending;
601+
self.tip = index.chain().tip();
602+
Ok(!same_status)
603+
}
604+
605+
/// Return true iff current tip became unconfirmed
606+
fn is_reorg(&self, chain: &Chain) -> bool {
607+
chain.get_block_height(&self.tip).is_none()
608+
}
609+
610+
fn sync_funding(
611+
&self,
612+
index: &Index,
613+
daemon: &Daemon,
614+
mempool: &Mempool,
615+
) -> Result<Option<Height>> {
616+
let chain = index.chain();
617+
if !self.is_reorg(chain) {
618+
if let Some(Height::Confirmed { .. }) = &self.funding {
619+
return Ok(self.funding);
620+
}
621+
}
622+
let mut confirmed = None;
623+
daemon.for_blocks(
624+
index.filter_by_txid(self.outpoint.txid),
625+
|blockhash, block| {
626+
if confirmed.is_none() {
627+
for tx in block.txdata {
628+
let txid = tx.txid();
629+
let output_len = u32::try_from(tx.output.len()).unwrap();
630+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
631+
confirmed = Some(Height::from_blockhash(blockhash, chain));
632+
return;
633+
}
634+
}
635+
}
636+
},
637+
)?;
638+
Ok(confirmed.or_else(|| {
639+
mempool
640+
.get(&self.outpoint.txid)
641+
.map(|entry| Height::unconfirmed(entry))
642+
}))
643+
}
644+
645+
fn sync_spending(
646+
&self,
647+
index: &Index,
648+
daemon: &Daemon,
649+
mempool: &Mempool,
650+
) -> Result<Option<(Txid, Height)>> {
651+
let chain = index.chain();
652+
if !self.is_reorg(chain) {
653+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
654+
return Ok(self.spending);
655+
}
656+
}
657+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
658+
let mut confirmed = None;
659+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
660+
for tx in block.txdata {
661+
for txi in &tx.input {
662+
if txi.previous_output == self.outpoint {
663+
// TODO: there should be only one spending input
664+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
665+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
666+
return;
667+
}
668+
}
669+
}
670+
})?;
671+
Ok(confirmed.or_else(|| {
672+
let entries = mempool.filter_by_spending(&self.outpoint);
673+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
674+
entries
675+
.first()
676+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
677+
}))
678+
}
679+
}
680+
542681
#[cfg(test)]
543682
mod tests {
544683
use super::HistoryEntry;

src/tracker.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
mempool::{FeeHistogram, Mempool},
1212
metrics::Metrics,
1313
signals::ExitFlag,
14-
status::{Balance, ScriptHashStatus, UnspentEntry},
14+
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
1515
};
1616

1717
/// Electrum protocol subscriptions' tracker
@@ -93,6 +93,14 @@ impl Tracker {
9393
status.get_balance(self.chain())
9494
}
9595

96+
pub(crate) fn update_outpoint_status(
97+
&self,
98+
status: &mut OutPointStatus,
99+
daemon: &Daemon,
100+
) -> Result<bool> {
101+
status.sync(&self.index, &self.mempool, daemon)
102+
}
103+
96104
pub(crate) fn get_blockhash_by_txid(&self, txid: Txid) -> Option<BlockHash> {
97105
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
98106
self.index.filter_by_txid(txid).next()

0 commit comments

Comments
 (0)