Skip to content

Commit

Permalink
feat(esplora): Handle spks with expected txids
Browse files Browse the repository at this point in the history
Co-authored-by: Wei Chen <wzc110@gmail.com>
  • Loading branch information
evanlinjin and LagginTimes committed Feb 24, 2025
1 parent a965a39 commit 5107b3a
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 37 deletions.
37 changes: 26 additions & 11 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use async_trait::async_trait;
use bdk_core::collections::{BTreeMap, BTreeSet, HashSet};
use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse};
use bdk_core::spk_client::{
FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse,
};
use bdk_core::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
bitcoin::{BlockHash, OutPoint, Txid},
BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate,
};
use esplora_client::Sleeper;
Expand Down Expand Up @@ -62,7 +64,7 @@ where
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanResponse<K>, Error> {
let mut request = request.into();
let mut request: FullScanRequest<K> = request.into();
let start_time = request.start_time();
let keychains = request.keychains();

Expand All @@ -77,7 +79,9 @@ where
let mut inserted_txs = HashSet::<Txid>::new();
let mut last_active_indices = BTreeMap::<K, u32>::new();
for keychain in keychains {
let keychain_spks = request.iter_spks(keychain.clone());
let keychain_spks = request
.iter_spks(keychain.clone())
.map(|(spk_i, spk)| (spk_i, spk.into()));
let (update, last_active_index) = fetch_txs_with_keychain_spks(
self,
start_time,
Expand Down Expand Up @@ -112,7 +116,7 @@ where
request: R,
parallel_requests: usize,
) -> Result<SyncResponse, Error> {
let mut request = request.into();
let mut request: SyncRequest<I> = request.into();
let start_time = request.start_time();

let chain_tip = request.chain_tip();
Expand All @@ -129,7 +133,7 @@ where
self,
start_time,
&mut inserted_txs,
request.iter_spks(),
request.iter_spks_with_expected_txids(),
parallel_requests,
)
.await?,
Expand Down Expand Up @@ -291,10 +295,10 @@ async fn fetch_txs_with_keychain_spks<I, S>(
parallel_requests: usize,
) -> Result<(TxUpdate<ConfirmationBlockTime>, Option<u32>), Error>
where
I: Iterator<Item = Indexed<ScriptBuf>> + Send,
I: Iterator<Item = Indexed<SpkWithExpectedTxids>> + Send,
S: Sleeper + Clone + Send + Sync,
{
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>, HashSet<Txid>);

let mut update = TxUpdate::<ConfirmationBlockTime>::default();
let mut last_index = Option::<u32>::None;
Expand All @@ -306,6 +310,8 @@ where
.take(parallel_requests)
.map(|(spk_index, spk)| {
let client = client.clone();
let expected_txids = spk.expected_txids;
let spk = spk.spk;
async move {
let mut last_seen = None;
let mut spk_txs = Vec::new();
Expand All @@ -315,9 +321,15 @@ where
last_seen = txs.last().map(|tx| tx.txid);
spk_txs.extend(txs);
if tx_count < 25 {
break Result::<_, Error>::Ok((spk_index, spk_txs));
break;
}
}
let got_txids = spk_txs.iter().map(|tx| tx.txid).collect::<HashSet<_>>();
let evicted_txids = expected_txids
.difference(&got_txids)
.copied()
.collect::<HashSet<_>>();
Result::<TxsOfSpkIndex, Error>::Ok((spk_index, spk_txs, evicted_txids))
}
})
.collect::<FuturesOrdered<_>>();
Expand All @@ -326,7 +338,7 @@ where
break;
}

for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
for (index, txs, evicted) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
last_index = Some(index);
if !txs.is_empty() {
last_active_index = Some(index);
Expand All @@ -338,6 +350,9 @@ where
insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
insert_prevouts(&mut update, tx.vin);
}
update
.evicted_ats
.extend(evicted.into_iter().map(|txid| (txid, start_time)));
}

let last_index = last_index.expect("Must be set since handles wasn't empty.");
Expand Down Expand Up @@ -370,7 +385,7 @@ async fn fetch_txs_with_spks<I, S>(
parallel_requests: usize,
) -> Result<TxUpdate<ConfirmationBlockTime>, Error>
where
I: IntoIterator<Item = ScriptBuf> + Send,
I: IntoIterator<Item = SpkWithExpectedTxids> + Send,
I::IntoIter: Send,
S: Sleeper + Clone + Send + Sync,
{
Expand Down
57 changes: 35 additions & 22 deletions crates/esplora/src/blocking_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use bdk_core::collections::{BTreeMap, BTreeSet, HashSet};
use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse};
use bdk_core::spk_client::{
FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse,
};
use bdk_core::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
bitcoin::{BlockHash, OutPoint, Txid},
BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate,
};
use esplora_client::{OutputStatus, Tx};
Expand Down Expand Up @@ -53,7 +55,7 @@ impl EsploraExt for esplora_client::BlockingClient {
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanResponse<K>, Error> {
let mut request = request.into();
let mut request: FullScanRequest<K> = request.into();
let start_time = request.start_time();

let chain_tip = request.chain_tip();
Expand All @@ -67,7 +69,9 @@ impl EsploraExt for esplora_client::BlockingClient {
let mut inserted_txs = HashSet::<Txid>::new();
let mut last_active_indices = BTreeMap::<K, u32>::new();
for keychain in request.keychains() {
let keychain_spks = request.iter_spks(keychain.clone());
let keychain_spks = request
.iter_spks(keychain.clone())
.map(|(spk_i, spk)| (spk_i, spk.into()));
let (update, last_active_index) = fetch_txs_with_keychain_spks(
self,
start_time,
Expand Down Expand Up @@ -120,7 +124,7 @@ impl EsploraExt for esplora_client::BlockingClient {
self,
start_time,
&mut inserted_txs,
request.iter_spks(),
request.iter_spks_with_expected_txids(),
parallel_requests,
)?);
tx_update.extend(fetch_txs_with_txids(
Expand Down Expand Up @@ -254,15 +258,15 @@ fn chain_update(
Ok(tip)
}

fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<SpkWithExpectedTxids>>>(
client: &esplora_client::BlockingClient,
start_time: u64,
inserted_txs: &mut HashSet<Txid>,
mut keychain_spks: I,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxUpdate<ConfirmationBlockTime>, Option<u32>), Error> {
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>, HashSet<Txid>);

let mut update = TxUpdate::<ConfirmationBlockTime>::default();
let mut last_index = Option::<u32>::None;
Expand All @@ -273,21 +277,27 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
.by_ref()
.take(parallel_requests)
.map(|(spk_index, spk)| {
std::thread::spawn({
let client = client.clone();
move || -> Result<TxsOfSpkIndex, Error> {
let mut last_seen = None;
let mut spk_txs = Vec::new();
loop {
let txs = client.scripthash_txs(&spk, last_seen)?;
let tx_count = txs.len();
last_seen = txs.last().map(|tx| tx.txid);
spk_txs.extend(txs);
if tx_count < 25 {
break Ok((spk_index, spk_txs));
}
let client = client.clone();
let expected_txids = spk.expected_txids;
let spk = spk.spk;
std::thread::spawn(move || -> Result<TxsOfSpkIndex, Error> {
let mut last_txid = None;
let mut spk_txs = Vec::new();
loop {
let txs = client.scripthash_txs(&spk, last_txid)?;
let tx_count = txs.len();
last_txid = txs.last().map(|tx| tx.txid);
spk_txs.extend(txs);
if tx_count < 25 {
break;
}
}
let got_txids = spk_txs.iter().map(|tx| tx.txid).collect::<HashSet<_>>();
let evicted_txids = expected_txids
.difference(&got_txids)
.copied()
.collect::<HashSet<_>>();
Ok((spk_index, spk_txs, evicted_txids))
})
})
.collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
Expand All @@ -297,7 +307,7 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
}

for handle in handles {
let (index, txs) = handle.join().expect("thread must not panic")?;
let (index, txs, evicted) = handle.join().expect("thread must not panic")?;
last_index = Some(index);
if !txs.is_empty() {
last_active_index = Some(index);
Expand All @@ -309,6 +319,9 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
insert_prevouts(&mut update, tx.vin);
}
update
.evicted_ats
.extend(evicted.into_iter().map(|txid| (txid, start_time)));
}

let last_index = last_index.expect("Must be set since handles wasn't empty.");
Expand All @@ -333,7 +346,7 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
/// requests to make in parallel.
///
/// Refer to [crate-level docs](crate) for more.
fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
fn fetch_txs_with_spks<I: IntoIterator<Item = SpkWithExpectedTxids>>(
client: &esplora_client::BlockingClient,
start_time: u64,
inserted_txs: &mut HashSet<Txid>,
Expand Down
128 changes: 126 additions & 2 deletions crates/esplora/tests/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,139 @@
use bdk_chain::keychain_txout::KeychainTxOutIndex;
use bdk_chain::local_chain::LocalChain;
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
use bdk_chain::{ConfirmationBlockTime, TxGraph};
use bdk_chain::{ConfirmationBlockTime, IndexedTxGraph, TxGraph};
use bdk_core::bitcoin::key::Secp256k1;
use bdk_esplora::EsploraAsyncExt;
use bdk_testenv::bitcoincore_rpc::json::CreateRawTransactionInput;
use bdk_testenv::bitcoincore_rpc::RawTx;
use esplora_client::{self, Builder};
use std::collections::{BTreeSet, HashSet};
use miniscript::Descriptor;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;

use bdk_chain::bitcoin::{Address, Amount};
use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv};

// Ensure that a wallet can detect a malicious replacement of an incoming transaction.
//
// This checks that both the Electrum chain source and the receiving structures properly track the
// replaced transaction as missing.
#[tokio::test]
pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> {
const SEND_TX_FEE: Amount = Amount::from_sat(1000);
const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000);

use bdk_chain::keychain_txout::SyncRequestBuilderExt;
let env = TestEnv::new()?;
let rpc_client = env.rpc_client();
let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap());
let client = Builder::new(base_url.as_str()).build_async()?;

let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)")
.expect("must be valid");
let mut graph = IndexedTxGraph::<ConfirmationBlockTime, _>::new(KeychainTxOutIndex::new(0));
let _ = graph.index.insert_descriptor((), receiver_desc.clone())?;
let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?);

// Derive the receiving address from the descriptor.
let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap();
let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?;

env.mine_blocks(101, None)?;

// Select a UTXO to use as an input for constructing our test transactions.
let selected_utxo = rpc_client
.list_unspent(None, None, None, Some(false), None)?
.into_iter()
// Find a block reward tx.
.find(|utxo| utxo.amount == Amount::from_int_btc(50))
.expect("Must find a block reward UTXO");

// Derive the sender's address from the selected UTXO.
let sender_spk = selected_utxo.script_pub_key.clone();
let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest)
.expect("Failed to derive address from UTXO");

// Setup the common inputs used by both `send_tx` and `undo_send_tx`.
let inputs = [CreateRawTransactionInput {
txid: selected_utxo.txid,
vout: selected_utxo.vout,
sequence: None,
}];

// Create and sign the `send_tx` that sends funds to the receiver address.
let send_tx_outputs = HashMap::from([(
receiver_addr.to_string(),
selected_utxo.amount - SEND_TX_FEE,
)]);
let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?;
let send_tx = rpc_client
.sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)?
.transaction()?;

// Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender
// address.
let undo_send_outputs = HashMap::from([(
sender_addr.to_string(),
selected_utxo.amount - UNDO_SEND_TX_FEE,
)]);
let undo_send_tx =
rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?;
let undo_send_tx = rpc_client
.sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)?
.transaction()?;

// Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`.
let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?;
env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?;
let sync_request = SyncRequest::builder_now()
.chain_tip(chain.tip())
.revealed_spks_from_indexer(&graph.index, ..)
.expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..));
let sync_response = client.sync(sync_request, 1).await?;
assert!(
sync_response
.tx_update
.txs
.iter()
.any(|tx| tx.compute_txid() == send_txid),
"sync response must include the send_tx"
);
let changeset = graph.apply_update(sync_response.tx_update.clone());
assert!(
changeset.tx_graph.txs.contains(&send_tx),
"tx graph must deem send_tx relevant and include it"
);

// Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the
// mempool.
let undo_send_txid = env
.rpc_client()
.send_raw_transaction(undo_send_tx.raw_hex())?;
env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?;
let sync_request = SyncRequest::builder_now()
.chain_tip(chain.tip())
.revealed_spks_from_indexer(&graph.index, ..)
.expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..));
let sync_response = client.sync(sync_request, 1).await?;
assert!(
sync_response
.tx_update
.evicted_ats
.iter()
.any(|(txid, _)| *txid == send_txid),
"sync response must track send_tx as missing from mempool"
);
let changeset = graph.apply_update(sync_response.tx_update.clone());
assert!(
changeset.tx_graph.last_evicted.contains_key(&send_txid),
"tx graph must track send_tx as missing"
);

Ok(())
}
#[tokio::test]
pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
let env = TestEnv::new()?;
Expand Down
Loading

0 comments on commit 5107b3a

Please sign in to comment.