Skip to content

Commit

Permalink
refactor(rpc)!: update mempool interface and test code
Browse files Browse the repository at this point in the history
  • Loading branch information
LagginTimes committed Feb 20, 2025
1 parent dc452a1 commit 66fa256
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 30 deletions.
57 changes: 49 additions & 8 deletions crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
#![warn(missing_docs)]

use bdk_core::{BlockId, CheckPoint};
use bitcoin::{block::Header, Block, BlockHash, Transaction};
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
use bitcoincore_rpc::bitcoincore_rpc_json;
use std::collections::HashSet;

pub mod bip158;

Expand Down Expand Up @@ -64,17 +65,19 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
}
}

/// Emit mempool transactions, alongside their first-seen unix timestamps.
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
///
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
/// ancestors are already emitted.
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
/// transaction's ancestors are already emitted.
///
/// To understand why, consider a receiver which filters transactions based on whether it
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
/// at height `h`.
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
let client = self.client;

// This is the emitted tip height during the last mempool emission.
Expand All @@ -91,8 +94,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
let prev_mempool_time = self.last_mempool_time;
let mut latest_time = prev_mempool_time;

let txs_to_emit = client
.get_raw_mempool_verbose()?
// Get the raw mempool result from the RPC client.
let raw_mempool = client.get_raw_mempool_verbose()?;
let raw_mempool_txids = raw_mempool.keys().copied().collect::<HashSet<_>>();

let emitted_txs = raw_mempool
.into_iter()
.filter_map({
let latest_time = &mut latest_time;
Expand Down Expand Up @@ -128,7 +134,11 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
self.last_mempool_time = latest_time;
self.last_mempool_tip = Some(self.last_cp.height());

Ok(txs_to_emit)
Ok(MempoolEvent {
emitted_txs,
raw_mempool_txids,
last_seen: latest_time as u64,
})
}

/// Emit the next block height and header (if any).
Expand All @@ -144,6 +154,37 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
}
}

/// A new emission from mempool.
#[derive(Debug)]
pub struct MempoolEvent {
/// Emitted mempool transactions with their first‐seen unix timestamps.
pub emitted_txs: Vec<(Transaction, u64)>,

/// Set of all [`Txid`]s from the raw mempool result, including transactions that may have been
/// confirmed or evicted during processing. This is used to determine which expected
/// transactions are missing.
pub raw_mempool_txids: HashSet<Txid>,

/// The latest first-seen epoch of emitted mempool transactions.
pub last_seen: u64,
}

impl MempoolEvent {
/// Given an iterator of expected [`Txid`]s, return those that are missing from the mempool.
pub fn evicted_txids(
&self,
expected_unconfirmed_txids: impl IntoIterator<Item = Txid>,
) -> HashSet<Txid> {
let expected_set = expected_unconfirmed_txids
.into_iter()
.collect::<HashSet<_>>();
expected_set
.difference(&self.raw_mempool_txids)
.copied()
.collect()
}
}

/// A newly emitted block from [`Emitter`].
#[derive(Debug)]
pub struct BlockEvent<B> {
Expand Down
33 changes: 21 additions & 12 deletions crates/bitcoind_rpc/tests/test_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
assert!(emitter.next_block()?.is_none());

let mempool_txs = emitter.mempool()?;
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.emitted_txs);
assert_eq!(
indexed_additions
.tx_graph
Expand Down Expand Up @@ -437,6 +437,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
// the first emission should include all transactions
let emitted_txids = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<Txid>>();
Expand All @@ -447,7 +448,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {

// second emission should be empty
assert!(
emitter.mempool()?.is_empty(),
emitter.mempool()?.emitted_txs.is_empty(),
"second emission should be empty"
);

Expand All @@ -457,7 +458,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
}
while emitter.next_header()?.is_some() {}
assert!(
emitter.mempool()?.is_empty(),
emitter.mempool()?.emitted_txs.is_empty(),
"third emission, after chain tip is extended, should also be empty"
);

Expand Down Expand Up @@ -506,6 +507,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
assert_eq!(
emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
Expand All @@ -515,6 +517,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
assert_eq!(
emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
Expand All @@ -535,6 +538,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
.collect::<BTreeSet<_>>();
let emitted_txids = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -593,6 +597,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
assert_eq!(
emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>(),
Expand Down Expand Up @@ -628,6 +633,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
// include mempool txs introduced at reorg height or greater
let mempool = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
Expand All @@ -643,6 +649,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {

let mempool = emitter
.mempool()?
.emitted_txs
.into_iter()
.map(|(tx, _)| tx.compute_txid())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -766,7 +773,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
)?;

let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1);
let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?);
let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.emitted_txs);
assert!(changeset
.tx_graph
.txs
Expand Down Expand Up @@ -806,14 +813,15 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
// Send the tx.
let txid_2 = core.send_raw_transaction(&tx1b)?;

// We evict the expected txs that are missing from mempool.
let exp_txids = graph
.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?
.collect::<Vec<_>>();
assert_eq!(exp_txids, vec![(txid_1, spk)]);
let mempool = emitter
.mempool()?
.into_iter()
// Retrieve the expected unconfirmed txids and spks from the graph.
let exp_spk_txids = graph.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?;
assert_eq!(exp_spk_txids, vec![(txid_1, spk)]);

// Check that mempool emission contains evicted txid.
let mempool_event = emitter.mempool()?;
let unseen_txids: Vec<Txid> = mempool_event
.emitted_txs
.iter()
.map(|(tx, _)| tx.compute_txid())
.collect();
assert!(unseen_txids.contains(&txid_2));
Expand All @@ -825,6 +833,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
let _ = graph.insert_evicted_at(txid, seen_at);
}

// tx1 should no longer be canonical.
assert!(graph
.graph()
.list_canonical_txs(&chain, chain_tip)
Expand Down
25 changes: 18 additions & 7 deletions example-crates/example_bitcoind_rpc_polling/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use bdk_bitcoind_rpc::{
bitcoincore_rpc::{Auth, Client, RpcApi},
Emitter,
};
use bdk_chain::{
bitcoin::{Block, Transaction},
local_chain, Merge,
};
use bdk_chain::{bitcoin::Block, local_chain, Merge};
use example_cli::{
anyhow,
clap::{self, Args, Subcommand},
Expand All @@ -36,7 +33,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
#[derive(Debug)]
enum Emission {
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
Mempool(Vec<(Transaction, u64)>),
Mempool(bdk_bitcoind_rpc::MempoolEvent),
Tip(u32),
}

Expand Down Expand Up @@ -204,7 +201,7 @@ fn main() -> anyhow::Result<()> {
let graph_changeset = graph
.lock()
.unwrap()
.batch_insert_relevant_unconfirmed(mempool_txs);
.batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs);
{
let db = &mut *db.lock().unwrap();
db_stage.merge(ChangeSet {
Expand Down Expand Up @@ -287,7 +284,21 @@ fn main() -> anyhow::Result<()> {
(chain_changeset, graph_changeset)
}
Emission::Mempool(mempool_txs) => {
let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs);
let mut graph_changeset = graph
.batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs.clone());
let expected_txids = graph
.expected_unconfirmed_spk_txids(
&chain.clone(),
chain.tip().block_id(),
..,
)?
.into_iter()
.map(|(txid, _)| txid);
let evicted_txids = mempool_txs.evicted_txids(expected_txids);
for txid in evicted_txids {
graph_changeset
.merge(graph.insert_evicted_at(txid, mempool_txs.last_seen));
}
(local_chain::ChangeSet::default(), graph_changeset)
}
Emission::Tip(h) => {
Expand Down
6 changes: 3 additions & 3 deletions example-crates/example_wallet_rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use bdk_bitcoind_rpc::{
Emitter,
};
use bdk_wallet::{
bitcoin::{Block, Network, Transaction},
bitcoin::{Block, Network},
file_store::Store,
KeychainKind, Wallet,
};
Expand Down Expand Up @@ -73,7 +73,7 @@ impl Args {
enum Emission {
SigTerm,
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
Mempool(Vec<(Transaction, u64)>),
Mempool(bdk_bitcoind_rpc::MempoolEvent),
}

fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -157,7 +157,7 @@ fn main() -> anyhow::Result<()> {
}
Emission::Mempool(mempool_emission) => {
let start_apply_mempool = Instant::now();
wallet.apply_unconfirmed_txs(mempool_emission);
wallet.apply_unconfirmed_txs(mempool_emission.emitted_txs);
wallet.persist(&mut db)?;
println!(
"Applied unconfirmed transactions in {}s",
Expand Down

0 comments on commit 66fa256

Please sign in to comment.