Skip to content

Commit 919c660

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent 60c1313 commit 919c660

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed

src/electrum.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, serialize},
44
hashes::hex::{FromHex, ToHex},
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use rayon::prelude::*;
88
use serde_derive::Deserialize;
@@ -18,7 +18,7 @@ use crate::{
1818
merkle::Proof,
1919
metrics::Histogram,
2020
signals::Signal,
21-
status::ScriptHashStatus,
21+
status::{OutPointStatus, ScriptHashStatus},
2222
tracker::Tracker,
2323
types::ScriptHash,
2424
};
@@ -33,6 +33,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol)
3333
pub struct Client {
3434
tip: Option<BlockHash>,
3535
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
36+
outpoints: HashMap<OutPoint, OutPointStatus>,
3637
}
3738

3839
#[derive(Deserialize)]
@@ -172,7 +173,25 @@ impl Rpc {
172173
}
173174
})
174175
.collect::<Result<Vec<Value>>>()
175-
.context("failed to update status")?;
176+
.context("failed to update scripthash status")?;
177+
178+
notifications.extend(
179+
client
180+
.outpoints
181+
.par_iter_mut()
182+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
183+
match self.tracker.update_outpoint_status(status, &self.daemon) {
184+
Ok(true) => Some(Ok(notification(
185+
"blockchain.outpoint.subscribe",
186+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
187+
))),
188+
Ok(false) => None, // outpoint status is the same
189+
Err(e) => Some(Err(e)),
190+
}
191+
})
192+
.collect::<Result<Vec<Value>>>()
193+
.context("failed to update scripthash status")?,
194+
);
176195

177196
if let Some(old_tip) = client.tip {
178197
let new_tip = self.tracker.chain().tip();
@@ -300,6 +319,28 @@ impl Rpc {
300319
Ok(json!(result))
301320
}
302321

322+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
323+
let outpoint = OutPoint::new(txid, vout);
324+
Ok(match client.outpoints.entry(outpoint) {
325+
Entry::Occupied(e) => json!(e.get()),
326+
Entry::Vacant(e) => {
327+
let outpoint = OutPoint::new(txid, vout);
328+
let mut status = OutPointStatus::new(outpoint);
329+
self.tracker
330+
.update_outpoint_status(&mut status, &self.daemon)?;
331+
json!(e.insert(status))
332+
}
333+
})
334+
}
335+
336+
fn outpoint_unsubscribe(
337+
&self,
338+
client: &mut Client,
339+
(txid, vout): (Txid, u32),
340+
) -> Result<Value> {
341+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
342+
}
343+
303344
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
304345
let mut status = ScriptHashStatus::new(scripthash);
305346
self.tracker
@@ -434,6 +475,8 @@ impl Rpc {
434475
Call::HeadersSubscribe => self.headers_subscribe(client),
435476
Call::MempoolFeeHistogram => self.get_fee_histogram(),
436477
Call::PeersSubscribe => Ok(json!([])),
478+
Call::OutPointSubscribe(args) => self.outpoint_subscribe(client, args),
479+
Call::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, args),
437480
Call::Ping => Ok(Value::Null),
438481
Call::RelayFee => self.relayfee(),
439482
Call::ScriptHashGetBalance(args) => self.scripthash_get_balance(client, args),
@@ -467,19 +510,21 @@ enum Call {
467510
Banner,
468511
BlockHeader((usize,)),
469512
BlockHeaders((usize, usize)),
470-
TransactionBroadcast((String,)),
471513
Donation,
472514
EstimateFee((u16,)),
473515
Features,
474516
HeadersSubscribe,
475517
MempoolFeeHistogram,
518+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
519+
OutPointUnsubscribe((Txid, u32)),
476520
PeersSubscribe,
477521
Ping,
478522
RelayFee,
479523
ScriptHashGetBalance((ScriptHash,)),
480524
ScriptHashGetHistory((ScriptHash,)),
481525
ScriptHashListUnspent((ScriptHash,)),
482526
ScriptHashSubscribe((ScriptHash,)),
527+
TransactionBroadcast((String,)),
483528
TransactionGet(TxGetArgs),
484529
TransactionGetMerkle((Txid, usize)),
485530
Version((String, Version)),
@@ -497,6 +542,8 @@ impl Call {
497542
"blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?),
498543
"blockchain.scripthash.listunspent" => Call::ScriptHashListUnspent(convert(params)?),
499544
"blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?),
545+
"blockchain.outpoint.subscribe" => Call::OutPointSubscribe(convert(params)?),
546+
"blockchain.outpoint.unsubscribe" => Call::OutPointUnsubscribe(convert(params)?),
500547
"blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?),
501548
"blockchain.transaction.get" => Call::TransactionGet(convert(params)?),
502549
"blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?),

src/status.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -49,12 +49,26 @@ impl TxEntry {
4949
// Confirmation height of a transaction or its mempool state:
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5151
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
52+
#[derive(Copy, Clone, Eq, PartialEq)]
5253
enum Height {
5354
Confirmed { height: usize },
5455
Unconfirmed { has_unconfirmed_inputs: bool },
5556
}
5657

5758
impl Height {
59+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
60+
let height = chain
61+
.get_block_height(&blockhash)
62+
.expect("missing block in chain");
63+
Self::Confirmed { height }
64+
}
65+
66+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
67+
Self::Unconfirmed {
68+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
69+
}
70+
}
71+
5872
fn as_i64(&self) -> i64 {
5973
match self {
6074
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -509,6 +523,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option<StatusHash> {
509523
Some(StatusHash::from_engine(engine))
510524
}
511525

526+
pub(crate) struct OutPointStatus {
527+
outpoint: OutPoint,
528+
funding: Option<Height>,
529+
spending: Option<(Txid, Height)>,
530+
tip: BlockHash,
531+
}
532+
533+
impl Serialize for OutPointStatus {
534+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
535+
where
536+
S: Serializer,
537+
{
538+
let mut map = serializer.serialize_map(None)?;
539+
if let Some(funding) = &self.funding {
540+
map.serialize_entry("height", &funding)?;
541+
}
542+
if let Some((txid, height)) = &self.spending {
543+
map.serialize_entry("spender_txhash", &txid)?;
544+
map.serialize_entry("spender_height", &height)?;
545+
}
546+
map.end()
547+
}
548+
}
549+
550+
impl OutPointStatus {
551+
pub(crate) fn new(outpoint: OutPoint) -> Self {
552+
Self {
553+
outpoint,
554+
funding: None,
555+
spending: None,
556+
tip: BlockHash::default(),
557+
}
558+
}
559+
560+
pub(crate) fn sync(
561+
&mut self,
562+
index: &Index,
563+
mempool: &Mempool,
564+
daemon: &Daemon,
565+
) -> Result<bool> {
566+
let funding = self.sync_funding(index, daemon, mempool)?;
567+
let spending = self.sync_spending(index, daemon, mempool)?;
568+
let same_status = (self.funding == funding) && (self.spending == spending);
569+
self.funding = funding;
570+
self.spending = spending;
571+
self.tip = index.chain().tip();
572+
Ok(!same_status)
573+
}
574+
575+
/// Return true iff current tip became unconfirmed
576+
fn is_reorg(&self, chain: &Chain) -> bool {
577+
chain.get_block_height(&self.tip).is_none()
578+
}
579+
580+
fn sync_funding(
581+
&self,
582+
index: &Index,
583+
daemon: &Daemon,
584+
mempool: &Mempool,
585+
) -> Result<Option<Height>> {
586+
let chain = index.chain();
587+
if !self.is_reorg(chain) {
588+
if let Some(Height::Confirmed { .. }) = &self.funding {
589+
return Ok(self.funding);
590+
}
591+
}
592+
let mut confirmed = None;
593+
daemon.for_blocks(
594+
index.filter_by_txid(self.outpoint.txid),
595+
|blockhash, block| {
596+
if confirmed.is_none() {
597+
for tx in block.txdata {
598+
let txid = tx.txid();
599+
let output_len = u32::try_from(tx.output.len()).unwrap();
600+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
601+
confirmed = Some(Height::from_blockhash(blockhash, chain));
602+
return;
603+
}
604+
}
605+
}
606+
},
607+
)?;
608+
Ok(confirmed.or_else(|| {
609+
mempool
610+
.get(&self.outpoint.txid)
611+
.map(|entry| Height::unconfirmed(entry))
612+
}))
613+
}
614+
615+
fn sync_spending(
616+
&self,
617+
index: &Index,
618+
daemon: &Daemon,
619+
mempool: &Mempool,
620+
) -> Result<Option<(Txid, Height)>> {
621+
let chain = index.chain();
622+
if !self.is_reorg(chain) {
623+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
624+
return Ok(self.spending);
625+
}
626+
}
627+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
628+
let mut confirmed = None;
629+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
630+
for tx in block.txdata {
631+
for txi in &tx.input {
632+
if txi.previous_output == self.outpoint {
633+
// TODO: there should be only one spending input
634+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
635+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
636+
return;
637+
}
638+
}
639+
}
640+
})?;
641+
Ok(confirmed.or_else(|| {
642+
let entries = mempool.filter_by_spending(&self.outpoint);
643+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
644+
entries
645+
.first()
646+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
647+
}))
648+
}
649+
}
650+
512651
#[cfg(test)]
513652
mod tests {
514653
use super::HistoryEntry;

src/tracker.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
mempool::{Histogram, Mempool},
1212
metrics::Metrics,
1313
signals::ExitFlag,
14-
status::{Balance, HistoryEntry, ScriptHashStatus, UnspentEntry},
14+
status::{Balance, HistoryEntry, OutPointStatus, ScriptHashStatus, UnspentEntry},
1515
};
1616

1717
/// Electrum protocol subscriptions' tracker
@@ -86,6 +86,14 @@ impl Tracker {
8686
status.get_balance(self.chain())
8787
}
8888

89+
pub(crate) fn update_outpoint_status(
90+
&self,
91+
status: &mut OutPointStatus,
92+
daemon: &Daemon,
93+
) -> Result<bool> {
94+
status.sync(&self.index, &self.mempool, daemon)
95+
}
96+
8997
pub(crate) fn get_blockhash_by_txid(&self, txid: Txid) -> Option<BlockHash> {
9098
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
9199
self.index.filter_by_txid(txid).next()

0 commit comments

Comments
 (0)