Skip to content

Commit 78e62ab

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent 62e4048 commit 78e62ab

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed

src/electrum.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, serialize},
44
hashes::hex::{FromHex, ToHex},
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use rayon::prelude::*;
88
use serde_derive::Deserialize;
@@ -17,7 +17,7 @@ use crate::{
1717
daemon::{self, extract_bitcoind_error, Daemon},
1818
merkle::Proof,
1919
metrics::Histogram,
20-
status::ScriptHashStatus,
20+
status::{OutPointStatus, ScriptHashStatus},
2121
tracker::Tracker,
2222
types::ScriptHash,
2323
};
@@ -32,6 +32,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol)
3232
pub struct Client {
3333
tip: Option<BlockHash>,
3434
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
35+
outpoints: HashMap<OutPoint, OutPointStatus>,
3536
}
3637

3738
#[derive(Deserialize)]
@@ -158,7 +159,25 @@ impl Rpc {
158159
}
159160
})
160161
.collect::<Result<Vec<Value>>>()
161-
.context("failed to update status")?;
162+
.context("failed to update scripthash status")?;
163+
164+
notifications.extend(
165+
client
166+
.outpoints
167+
.par_iter_mut()
168+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
169+
match self.tracker.update_outpoint_status(status, &self.daemon) {
170+
Ok(true) => Some(Ok(notification(
171+
"blockchain.outpoint.subscribe",
172+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
173+
))),
174+
Ok(false) => None, // outpoint status is the same
175+
Err(e) => Some(Err(e)),
176+
}
177+
})
178+
.collect::<Result<Vec<Value>>>()
179+
.context("failed to update scripthash status")?,
180+
);
162181

163182
if let Some(old_tip) = client.tip {
164183
let new_tip = self.tracker.chain().tip();
@@ -286,6 +305,28 @@ impl Rpc {
286305
Ok(json!(result))
287306
}
288307

308+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
309+
let outpoint = OutPoint::new(txid, vout);
310+
Ok(match client.outpoints.entry(outpoint) {
311+
Entry::Occupied(e) => json!(e.get()),
312+
Entry::Vacant(e) => {
313+
let outpoint = OutPoint::new(txid, vout);
314+
let mut status = OutPointStatus::new(outpoint);
315+
self.tracker
316+
.update_outpoint_status(&mut status, &self.daemon)?;
317+
json!(e.insert(status))
318+
}
319+
})
320+
}
321+
322+
fn outpoint_unsubscribe(
323+
&self,
324+
client: &mut Client,
325+
(txid, vout): (Txid, u32),
326+
) -> Result<Value> {
327+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
328+
}
329+
289330
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
290331
let mut status = ScriptHashStatus::new(scripthash);
291332
self.tracker
@@ -420,6 +461,8 @@ impl Rpc {
420461
Call::HeadersSubscribe => self.headers_subscribe(client),
421462
Call::MempoolFeeHistogram => self.get_fee_histogram(),
422463
Call::PeersSubscribe => Ok(json!([])),
464+
Call::OutPointSubscribe(args) => self.outpoint_subscribe(client, args),
465+
Call::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, args),
423466
Call::Ping => Ok(Value::Null),
424467
Call::RelayFee => self.relayfee(),
425468
Call::ScriptHashGetBalance(args) => self.scripthash_get_balance(client, args),
@@ -453,19 +496,21 @@ enum Call {
453496
Banner,
454497
BlockHeader((usize,)),
455498
BlockHeaders((usize, usize)),
456-
TransactionBroadcast((String,)),
457499
Donation,
458500
EstimateFee((u16,)),
459501
Features,
460502
HeadersSubscribe,
461503
MempoolFeeHistogram,
504+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
505+
OutPointUnsubscribe((Txid, u32)),
462506
PeersSubscribe,
463507
Ping,
464508
RelayFee,
465509
ScriptHashGetBalance((ScriptHash,)),
466510
ScriptHashGetHistory((ScriptHash,)),
467511
ScriptHashListUnspent((ScriptHash,)),
468512
ScriptHashSubscribe((ScriptHash,)),
513+
TransactionBroadcast((String,)),
469514
TransactionGet(TxGetArgs),
470515
TransactionGetMerkle((Txid, usize)),
471516
Version((String, Version)),
@@ -483,6 +528,8 @@ impl Call {
483528
"blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?),
484529
"blockchain.scripthash.listunspent" => Call::ScriptHashListUnspent(convert(params)?),
485530
"blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?),
531+
"blockchain.outpoint.subscribe" => Call::OutPointSubscribe(convert(params)?),
532+
"blockchain.outpoint.unsubscribe" => Call::OutPointUnsubscribe(convert(params)?),
486533
"blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?),
487534
"blockchain.transaction.get" => Call::TransactionGet(convert(params)?),
488535
"blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?),

src/status.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -49,12 +49,26 @@ impl TxEntry {
4949
// Confirmation height of a transaction or its mempool state:
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5151
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
52+
#[derive(Copy, Clone, Eq, PartialEq)]
5253
enum Height {
5354
Confirmed { height: usize },
5455
Unconfirmed { has_unconfirmed_inputs: bool },
5556
}
5657

5758
impl Height {
59+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
60+
let height = chain
61+
.get_block_height(&blockhash)
62+
.expect("missing block in chain");
63+
Self::Confirmed { height }
64+
}
65+
66+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
67+
Self::Unconfirmed {
68+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
69+
}
70+
}
71+
5872
fn as_i64(&self) -> i64 {
5973
match self {
6074
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -509,6 +523,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option<StatusHash> {
509523
Some(StatusHash::from_engine(engine))
510524
}
511525

526+
pub(crate) struct OutPointStatus {
527+
outpoint: OutPoint,
528+
funding: Option<Height>,
529+
spending: Option<(Txid, Height)>,
530+
tip: BlockHash,
531+
}
532+
533+
impl Serialize for OutPointStatus {
534+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
535+
where
536+
S: Serializer,
537+
{
538+
let mut map = serializer.serialize_map(None)?;
539+
if let Some(funding) = &self.funding {
540+
map.serialize_entry("height", &funding)?;
541+
}
542+
if let Some((txid, height)) = &self.spending {
543+
map.serialize_entry("spender_txhash", &txid)?;
544+
map.serialize_entry("spender_height", &height)?;
545+
}
546+
map.end()
547+
}
548+
}
549+
550+
impl OutPointStatus {
551+
pub(crate) fn new(outpoint: OutPoint) -> Self {
552+
Self {
553+
outpoint,
554+
funding: None,
555+
spending: None,
556+
tip: BlockHash::default(),
557+
}
558+
}
559+
560+
pub(crate) fn sync(
561+
&mut self,
562+
index: &Index,
563+
mempool: &Mempool,
564+
daemon: &Daemon,
565+
) -> Result<bool> {
566+
let funding = self.sync_funding(index, daemon, mempool)?;
567+
let spending = self.sync_spending(index, daemon, mempool)?;
568+
let same_status = (self.funding == funding) && (self.spending == spending);
569+
self.funding = funding;
570+
self.spending = spending;
571+
self.tip = index.chain().tip();
572+
Ok(!same_status)
573+
}
574+
575+
/// Return true iff current tip became unconfirmed
576+
fn is_reorg(&self, chain: &Chain) -> bool {
577+
chain.get_block_height(&self.tip).is_none()
578+
}
579+
580+
fn sync_funding(
581+
&self,
582+
index: &Index,
583+
daemon: &Daemon,
584+
mempool: &Mempool,
585+
) -> Result<Option<Height>> {
586+
let chain = index.chain();
587+
if !self.is_reorg(chain) {
588+
if let Some(Height::Confirmed { .. }) = &self.funding {
589+
return Ok(self.funding);
590+
}
591+
}
592+
let mut confirmed = None;
593+
daemon.for_blocks(
594+
index.filter_by_txid(self.outpoint.txid),
595+
|blockhash, block| {
596+
if confirmed.is_none() {
597+
for tx in block.txdata {
598+
let txid = tx.txid();
599+
let output_len = u32::try_from(tx.output.len()).unwrap();
600+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
601+
confirmed = Some(Height::from_blockhash(blockhash, chain));
602+
return;
603+
}
604+
}
605+
}
606+
},
607+
)?;
608+
Ok(confirmed.or_else(|| {
609+
mempool
610+
.get(&self.outpoint.txid)
611+
.map(|entry| Height::unconfirmed(entry))
612+
}))
613+
}
614+
615+
fn sync_spending(
616+
&self,
617+
index: &Index,
618+
daemon: &Daemon,
619+
mempool: &Mempool,
620+
) -> Result<Option<(Txid, Height)>> {
621+
let chain = index.chain();
622+
if !self.is_reorg(chain) {
623+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
624+
return Ok(self.spending);
625+
}
626+
}
627+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
628+
let mut confirmed = None;
629+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
630+
for tx in block.txdata {
631+
for txi in &tx.input {
632+
if txi.previous_output == self.outpoint {
633+
// TODO: there should be only one spending input
634+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
635+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
636+
return;
637+
}
638+
}
639+
}
640+
})?;
641+
Ok(confirmed.or_else(|| {
642+
let entries = mempool.filter_by_spending(&self.outpoint);
643+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
644+
entries
645+
.first()
646+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
647+
}))
648+
}
649+
}
650+
512651
#[cfg(test)]
513652
mod tests {
514653
use super::HistoryEntry;

src/tracker.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
index::Index,
1111
mempool::{Histogram, Mempool},
1212
metrics::Metrics,
13-
status::{Balance, HistoryEntry, ScriptHashStatus, UnspentEntry},
13+
status::{Balance, HistoryEntry, OutPointStatus, ScriptHashStatus, UnspentEntry},
1414
};
1515

1616
/// Electrum protocol subscriptions' tracker
@@ -81,6 +81,14 @@ impl Tracker {
8181
status.get_balance(self.chain())
8282
}
8383

84+
pub(crate) fn update_outpoint_status(
85+
&self,
86+
status: &mut OutPointStatus,
87+
daemon: &Daemon,
88+
) -> Result<bool> {
89+
status.sync(&self.index, &self.mempool, daemon)
90+
}
91+
8492
pub fn get_blockhash_by_txid(&self, txid: Txid) -> Option<BlockHash> {
8593
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
8694
self.index.filter_by_txid(txid).next()

0 commit comments

Comments
 (0)