Skip to content

Commit 70025e4

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent 86488f6 commit 70025e4

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed

src/electrum.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, serialize},
44
hashes::hex::{FromHex, ToHex},
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use crossbeam_channel::Receiver;
88
use rayon::prelude::*;
@@ -19,7 +19,7 @@ use crate::{
1919
merkle::Proof,
2020
metrics::{self, Histogram, Metrics},
2121
signals::Signal,
22-
status::ScriptHashStatus,
22+
status::{OutPointStatus, ScriptHashStatus},
2323
tracker::Tracker,
2424
types::ScriptHash,
2525
};
@@ -32,6 +32,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol)
3232
pub struct Client {
3333
tip: Option<BlockHash>,
3434
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
35+
outpoints: HashMap<OutPoint, OutPointStatus>,
3536
}
3637

3738
#[derive(Deserialize)]
@@ -176,7 +177,25 @@ impl Rpc {
176177
}
177178
})
178179
.collect::<Result<Vec<Value>>>()
179-
.context("failed to update status")?;
180+
.context("failed to update scripthash status")?;
181+
182+
notifications.extend(
183+
client
184+
.outpoints
185+
.par_iter_mut()
186+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
187+
match self.tracker.update_outpoint_status(status, &self.daemon) {
188+
Ok(true) => Some(Ok(notification(
189+
"blockchain.outpoint.subscribe",
190+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
191+
))),
192+
Ok(false) => None, // outpoint status is the same
193+
Err(e) => Some(Err(e)),
194+
}
195+
})
196+
.collect::<Result<Vec<Value>>>()
197+
.context("failed to update scripthash status")?,
198+
);
180199

181200
if let Some(old_tip) = client.tip {
182201
let new_tip = self.tracker.chain().tip();
@@ -332,6 +351,28 @@ impl Rpc {
332351
})
333352
}
334353

354+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
355+
let outpoint = OutPoint::new(txid, vout);
356+
Ok(match client.outpoints.entry(outpoint) {
357+
Entry::Occupied(e) => json!(e.get()),
358+
Entry::Vacant(e) => {
359+
let outpoint = OutPoint::new(txid, vout);
360+
let mut status = OutPointStatus::new(outpoint);
361+
self.tracker
362+
.update_outpoint_status(&mut status, &self.daemon)?;
363+
json!(e.insert(status))
364+
}
365+
})
366+
}
367+
368+
fn outpoint_unsubscribe(
369+
&self,
370+
client: &mut Client,
371+
(txid, vout): (Txid, u32),
372+
) -> Result<Value> {
373+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
374+
}
375+
335376
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
336377
let mut status = ScriptHashStatus::new(scripthash);
337378
self.tracker
@@ -505,6 +546,8 @@ impl Rpc {
505546
Params::Features => self.features(),
506547
Params::HeadersSubscribe => self.headers_subscribe(client),
507548
Params::MempoolFeeHistogram => self.get_fee_histogram(),
549+
Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args),
550+
Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args),
508551
Params::PeersSubscribe => Ok(json!([])),
509552
Params::Ping => Ok(Value::Null),
510553
Params::RelayFee => self.relayfee(),
@@ -527,19 +570,21 @@ enum Params {
527570
Banner,
528571
BlockHeader((usize,)),
529572
BlockHeaders((usize, usize)),
530-
TransactionBroadcast((String,)),
531573
Donation,
532574
EstimateFee((u16,)),
533575
Features,
534576
HeadersSubscribe,
535577
MempoolFeeHistogram,
578+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
579+
OutPointUnsubscribe((Txid, u32)),
536580
PeersSubscribe,
537581
Ping,
538582
RelayFee,
539583
ScriptHashGetBalance((ScriptHash,)),
540584
ScriptHashGetHistory((ScriptHash,)),
541585
ScriptHashListUnspent((ScriptHash,)),
542586
ScriptHashSubscribe((ScriptHash,)),
587+
TransactionBroadcast((String,)),
543588
TransactionGet(TxGetArgs),
544589
TransactionGetMerkle((Txid, usize)),
545590
Version((String, Version)),
@@ -552,6 +597,8 @@ impl Params {
552597
"blockchain.block.headers" => Params::BlockHeaders(convert(params)?),
553598
"blockchain.estimatefee" => Params::EstimateFee(convert(params)?),
554599
"blockchain.headers.subscribe" => Params::HeadersSubscribe,
600+
"blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?),
601+
"blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?),
555602
"blockchain.relayfee" => Params::RelayFee,
556603
"blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?),
557604
"blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?),

src/status.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -49,12 +49,26 @@ impl TxEntry {
4949
// Confirmation height of a transaction or its mempool state:
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5151
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
52+
#[derive(Copy, Clone, Eq, PartialEq)]
5253
enum Height {
5354
Confirmed { height: usize },
5455
Unconfirmed { has_unconfirmed_inputs: bool },
5556
}
5657

5758
impl Height {
59+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
60+
let height = chain
61+
.get_block_height(&blockhash)
62+
.expect("missing block in chain");
63+
Self::Confirmed { height }
64+
}
65+
66+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
67+
Self::Unconfirmed {
68+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
69+
}
70+
}
71+
5872
fn as_i64(&self) -> i64 {
5973
match self {
6074
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -511,6 +525,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option<StatusHash> {
511525
Some(StatusHash::from_engine(engine))
512526
}
513527

528+
pub(crate) struct OutPointStatus {
529+
outpoint: OutPoint,
530+
funding: Option<Height>,
531+
spending: Option<(Txid, Height)>,
532+
tip: BlockHash,
533+
}
534+
535+
impl Serialize for OutPointStatus {
536+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
537+
where
538+
S: Serializer,
539+
{
540+
let mut map = serializer.serialize_map(None)?;
541+
if let Some(funding) = &self.funding {
542+
map.serialize_entry("height", &funding)?;
543+
}
544+
if let Some((txid, height)) = &self.spending {
545+
map.serialize_entry("spender_txhash", &txid)?;
546+
map.serialize_entry("spender_height", &height)?;
547+
}
548+
map.end()
549+
}
550+
}
551+
552+
impl OutPointStatus {
553+
pub(crate) fn new(outpoint: OutPoint) -> Self {
554+
Self {
555+
outpoint,
556+
funding: None,
557+
spending: None,
558+
tip: BlockHash::default(),
559+
}
560+
}
561+
562+
pub(crate) fn sync(
563+
&mut self,
564+
index: &Index,
565+
mempool: &Mempool,
566+
daemon: &Daemon,
567+
) -> Result<bool> {
568+
let funding = self.sync_funding(index, daemon, mempool)?;
569+
let spending = self.sync_spending(index, daemon, mempool)?;
570+
let same_status = (self.funding == funding) && (self.spending == spending);
571+
self.funding = funding;
572+
self.spending = spending;
573+
self.tip = index.chain().tip();
574+
Ok(!same_status)
575+
}
576+
577+
/// Return true iff current tip became unconfirmed
578+
fn is_reorg(&self, chain: &Chain) -> bool {
579+
chain.get_block_height(&self.tip).is_none()
580+
}
581+
582+
fn sync_funding(
583+
&self,
584+
index: &Index,
585+
daemon: &Daemon,
586+
mempool: &Mempool,
587+
) -> Result<Option<Height>> {
588+
let chain = index.chain();
589+
if !self.is_reorg(chain) {
590+
if let Some(Height::Confirmed { .. }) = &self.funding {
591+
return Ok(self.funding);
592+
}
593+
}
594+
let mut confirmed = None;
595+
daemon.for_blocks(
596+
index.filter_by_txid(self.outpoint.txid),
597+
|blockhash, block| {
598+
if confirmed.is_none() {
599+
for tx in block.txdata {
600+
let txid = tx.txid();
601+
let output_len = u32::try_from(tx.output.len()).unwrap();
602+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
603+
confirmed = Some(Height::from_blockhash(blockhash, chain));
604+
return;
605+
}
606+
}
607+
}
608+
},
609+
)?;
610+
Ok(confirmed.or_else(|| {
611+
mempool
612+
.get(&self.outpoint.txid)
613+
.map(|entry| Height::unconfirmed(entry))
614+
}))
615+
}
616+
617+
fn sync_spending(
618+
&self,
619+
index: &Index,
620+
daemon: &Daemon,
621+
mempool: &Mempool,
622+
) -> Result<Option<(Txid, Height)>> {
623+
let chain = index.chain();
624+
if !self.is_reorg(chain) {
625+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
626+
return Ok(self.spending);
627+
}
628+
}
629+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
630+
let mut confirmed = None;
631+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
632+
for tx in block.txdata {
633+
for txi in &tx.input {
634+
if txi.previous_output == self.outpoint {
635+
// TODO: there should be only one spending input
636+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
637+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
638+
return;
639+
}
640+
}
641+
}
642+
})?;
643+
Ok(confirmed.or_else(|| {
644+
let entries = mempool.filter_by_spending(&self.outpoint);
645+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
646+
entries
647+
.first()
648+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
649+
}))
650+
}
651+
}
652+
514653
#[cfg(test)]
515654
mod tests {
516655
use super::HistoryEntry;

src/tracker.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
mempool::{FeeHistogram, Mempool},
1212
metrics::Metrics,
1313
signals::ExitFlag,
14-
status::{Balance, ScriptHashStatus, UnspentEntry},
14+
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
1515
};
1616

1717
/// Electrum protocol subscriptions' tracker
@@ -82,6 +82,14 @@ impl Tracker {
8282
status.get_balance(self.chain())
8383
}
8484

85+
pub(crate) fn update_outpoint_status(
86+
&self,
87+
status: &mut OutPointStatus,
88+
daemon: &Daemon,
89+
) -> Result<bool> {
90+
status.sync(&self.index, &self.mempool, daemon)
91+
}
92+
8593
pub(crate) fn get_blockhash_by_txid(&self, txid: Txid) -> Option<BlockHash> {
8694
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
8795
self.index.filter_by_txid(txid).next()

0 commit comments

Comments
 (0)