Skip to content

Commit b567322

Browse files
author
Ludo Galabru
committed
fix: lazy block approach
1 parent aa5e418 commit b567322

File tree

1 file changed

+154
-31
lines changed
  • components/chainhook-event-observer/src/hord/db

1 file changed

+154
-31
lines changed

components/chainhook-event-observer/src/hord/db/mod.rs

+154-31
Original file line numberDiff line numberDiff line change
@@ -330,23 +330,36 @@ impl CompactedBlock {
330330
Ok(())
331331
}
332332

333-
fn serialize_to_lazy_format<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
333+
pub fn serialize_to_lazy_format<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
334+
// Number of transactions in the block (not including coinbase)
334335
let tx_len = self.0 .1.len() as u16;
335336
fd.write(&tx_len.to_be_bytes())?;
337+
// For each transaction:
336338
for (_, inputs, outputs) in self.0 .1.iter() {
337-
let inputs_len = inputs.len() as u8;
338-
let outputs_len = outputs.len() as u8;
339-
fd.write(&[inputs_len])?;
340-
fd.write(&[outputs_len])?;
339+
let inputs_len = inputs.len() as u16;
340+
let outputs_len = outputs.len() as u16;
341+
// Number of inputs
342+
fd.write(&inputs_len.to_be_bytes())?;
343+
// Number of outputs
344+
fd.write(&outputs_len.to_be_bytes())?;
341345
}
346+
// Coinbase transaction
342347
fd.write_all(&self.0 .0 .0)?;
348+
//
343349
fd.write(&self.0 .0 .1.to_be_bytes())?;
350+
// For each transaction
344351
for (id, inputs, outputs) in self.0 .1.iter() {
352+
// Transaction id
345353
fd.write_all(id)?;
354+
// For each input
346355
for (txid, block, vout, value) in inputs.iter() {
356+
// Txin id
347357
fd.write_all(txid)?;
358+
// Block height
348359
fd.write(&block.to_be_bytes())?;
360+
// Vout
349361
fd.write(&vout.to_be_bytes())?;
362+
// Value
350363
fd.write(&value.to_be_bytes())?;
351364
}
352365
for value in outputs.iter() {
@@ -418,7 +431,10 @@ fn get_default_hord_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf {
418431
fn rocks_db_default_options() -> rocksdb::Options {
419432
let mut opts = rocksdb::Options::default();
420433
opts.create_if_missing(true);
421-
opts.set_disable_auto_compactions(true);
434+
// opts.prepare_for_bulk_load();
435+
// opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
436+
// opts.set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
437+
// opts.increase_parallelism(parallelism)
422438
// Per rocksdb's documentation:
423439
// If cache_index_and_filter_blocks is false (which is default),
424440
// the number of index/filter blocks is controlled by option max_open_files.
@@ -501,6 +517,21 @@ pub fn insert_entry_in_blocks(
501517
.expect("unable to insert metadata");
502518
}
503519

520+
pub fn insert_entry_in_blocks_lazy_block(
521+
block_height: u32,
522+
lazy_block: &LazyBlock,
523+
blocks_db_rw: &DB,
524+
_ctx: &Context,
525+
) {
526+
let block_height_bytes = block_height.to_be_bytes();
527+
blocks_db_rw
528+
.put(&block_height_bytes, &lazy_block.bytes)
529+
.expect("unable to insert blocks");
530+
blocks_db_rw
531+
.put(b"metadata::last_insert", block_height_bytes)
532+
.expect("unable to insert metadata");
533+
}
534+
504535
pub fn find_last_block_inserted(blocks_db: &DB) -> u32 {
505536
match blocks_db.get(b"metadata::last_insert") {
506537
Ok(Some(bytes)) => u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
@@ -534,7 +565,11 @@ pub fn find_block_at_block_height(
534565
}
535566
}
536567

537-
pub fn find_lazy_block_at_block_height(block_height: u32, blocks_db: &DB) -> Option<LazyBlock> {
568+
pub fn find_lazy_block_at_block_height(
569+
block_height: u32,
570+
retry: u8,
571+
blocks_db: &DB,
572+
) -> Option<LazyBlock> {
538573
match blocks_db.get(block_height.to_be_bytes()) {
539574
Ok(Some(res)) => Some(LazyBlock::new(res)),
540575
_ => None,
@@ -1360,6 +1395,9 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
13601395
block_identifier: &BlockIdentifier,
13611396
transaction_identifier: &TransactionIdentifier,
13621397
inscription_number: u64,
1398+
traversals_cache: Arc<
1399+
DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>,
1400+
>,
13631401
ctx: &Context,
13641402
) -> Result<TraversalResult, String> {
13651403
ctx.try_log(|logger| {
@@ -1381,22 +1419,86 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
13811419
};
13821420
let mut tx_cursor = (txid, 0);
13831421
let mut hops: u32 = 0;
1384-
let mut local_block_cache = HashMap::new();
13851422
loop {
1386-
local_block_cache.clear();
1387-
13881423
hops += 1;
1389-
let lazy_block = match local_block_cache.get(&ordinal_block_number) {
1390-
Some(block) => block,
1391-
None => match find_lazy_block_at_block_height(ordinal_block_number, &blocks_db) {
1392-
Some(block) => {
1393-
local_block_cache.insert(ordinal_block_number, block);
1394-
local_block_cache.get(&ordinal_block_number).unwrap()
1424+
if hops as u64 > block_identifier.index {
1425+
return Err(format!(
1426+
"Unable to process transaction {}, manual investigation required",
1427+
transaction_identifier.hash
1428+
));
1429+
}
1430+
1431+
if let Some(cached_tx) = traversals_cache.get(&(ordinal_block_number, tx_cursor.0)) {
1432+
let tx = cached_tx.value();
1433+
let mut next_found_in_cache = false;
1434+
let mut sats_out = 0;
1435+
for (index, output_value) in tx.outputs.iter().enumerate() {
1436+
if index == tx_cursor.1 {
1437+
break;
13951438
}
1396-
None => {
1397-
return Err(format!("block #{ordinal_block_number} not in database"));
1439+
// ctx.try_log(|logger| {
1440+
// slog::info!(logger, "Adding {} from output #{}", output_value, index)
1441+
// });
1442+
sats_out += output_value;
1443+
}
1444+
sats_out += ordinal_offset;
1445+
// ctx.try_log(|logger| {
1446+
// slog::info!(
1447+
// logger,
1448+
// "Adding offset {ordinal_offset} to sats_out {sats_out}"
1449+
// )
1450+
// });
1451+
1452+
let mut sats_in = 0;
1453+
for input in tx.inputs.iter() {
1454+
sats_in += input.txin_value;
1455+
// ctx.try_log(|logger| {
1456+
// slog::info!(
1457+
// logger,
1458+
// "Adding txin_value {txin_value} to sats_in {sats_in} (txin: {})",
1459+
// hex::encode(&txin)
1460+
// )
1461+
// });
1462+
1463+
if sats_out < sats_in {
1464+
ordinal_offset = sats_out - (sats_in - input.txin_value);
1465+
ordinal_block_number = input.block_height;
1466+
1467+
// ctx.try_log(|logger| slog::info!(logger, "Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
1468+
// hex::encode(&txid_n),
1469+
// hex::encode(&txin)));
1470+
tx_cursor = (input.txin.clone(), input.vout as usize);
1471+
next_found_in_cache = true;
1472+
break;
13981473
}
1399-
},
1474+
}
1475+
1476+
if next_found_in_cache {
1477+
continue;
1478+
}
1479+
1480+
if sats_in == 0 {
1481+
ctx.try_log(|logger| {
1482+
slog::error!(
1483+
logger,
1484+
"Transaction {} is originating from a non spending transaction",
1485+
transaction_identifier.hash
1486+
)
1487+
});
1488+
return Ok(TraversalResult {
1489+
inscription_number: 0,
1490+
ordinal_number: 0,
1491+
transfers: 0,
1492+
});
1493+
}
1494+
}
1495+
1496+
let lazy_block = match find_lazy_block_at_block_height(ordinal_block_number, 3, &blocks_db)
1497+
{
1498+
Some(block) => block,
1499+
None => {
1500+
return Err(format!("block #{ordinal_block_number} not in database"));
1501+
}
14001502
};
14011503

14021504
let coinbase_txid = lazy_block.get_coinbase_txid();
@@ -1472,7 +1574,7 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
14721574
sats_out += ordinal_offset;
14731575

14741576
let mut sats_in = 0;
1475-
for input in lazy_tx.inputs.into_iter() {
1577+
for input in lazy_tx.inputs.iter() {
14761578
sats_in += input.txin_value;
14771579
// ctx.try_log(|logger| {
14781580
// slog::info!(
@@ -1493,6 +1595,23 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
14931595
break;
14941596
}
14951597
}
1598+
1599+
traversals_cache.insert((ordinal_block_number, tx_cursor.0), lazy_tx);
1600+
1601+
if sats_in == 0 {
1602+
ctx.try_log(|logger| {
1603+
slog::error!(
1604+
logger,
1605+
"Transaction {} is originating from a non spending transaction",
1606+
transaction_identifier.hash
1607+
)
1608+
});
1609+
return Ok(TraversalResult {
1610+
inscription_number: 0,
1611+
ordinal_number: 0,
1612+
transfers: 0,
1613+
});
1614+
}
14961615
}
14971616
}
14981617

@@ -1523,13 +1642,13 @@ pub struct LazyBlockTransaction {
15231642
pub struct LazyBlockTransactionInput {
15241643
pub txin: [u8; 8],
15251644
pub block_height: u32,
1526-
pub vout: u8,
1645+
pub vout: u16,
15271646
pub txin_value: u64,
15281647
}
15291648

15301649
const TXID_LEN: usize = 8;
15311650
const SATS_LEN: usize = 8;
1532-
const INPUT_SIZE: usize = 8 + 4 + 1 + 8;
1651+
const INPUT_SIZE: usize = TXID_LEN + 4 + 2 + SATS_LEN;
15331652
const OUTPUT_SIZE: usize = 8;
15341653

15351654
impl LazyBlock {
@@ -1539,7 +1658,7 @@ impl LazyBlock {
15391658
}
15401659

15411660
pub fn get_coinbase_data_pos(&self) -> usize {
1542-
(2 + self.tx_len * 2) as usize
1661+
(2 + self.tx_len * 2 * 2) as usize
15431662
}
15441663

15451664
pub fn get_u64_at_pos(&self, pos: usize) -> u64 {
@@ -1569,10 +1688,14 @@ impl LazyBlock {
15691688
self.get_coinbase_data_pos() + TXID_LEN + SATS_LEN
15701689
}
15711690

1572-
pub fn get_transaction_format(&self, index: u16) -> (u8, u8, usize) {
1573-
let inputs_len_pos = (2 + index * 2) as usize;
1574-
let inputs = self.bytes[inputs_len_pos];
1575-
let outputs = self.bytes[inputs_len_pos + 1];
1691+
pub fn get_transaction_format(&self, index: u16) -> (u16, u16, usize) {
1692+
let inputs_len_pos = (2 + index * 2 * 2) as usize;
1693+
let inputs =
1694+
u16::from_be_bytes([self.bytes[inputs_len_pos], self.bytes[inputs_len_pos + 1]]);
1695+
let outputs = u16::from_be_bytes([
1696+
self.bytes[inputs_len_pos + 2],
1697+
self.bytes[inputs_len_pos + 3],
1698+
]);
15761699
let size = TXID_LEN + (inputs as usize * INPUT_SIZE) + (outputs as usize * OUTPUT_SIZE);
15771700
(inputs, outputs, size)
15781701
}
@@ -1581,8 +1704,8 @@ impl LazyBlock {
15811704
&self,
15821705
cursor: &mut Cursor<&Vec<u8>>,
15831706
txid: [u8; 8],
1584-
inputs_len: u8,
1585-
outputs_len: u8,
1707+
inputs_len: u16,
1708+
outputs_len: u16,
15861709
) -> LazyBlockTransaction {
15871710
let mut inputs = Vec::with_capacity(inputs_len as usize);
15881711
for _ in 0..inputs_len {
@@ -1592,14 +1715,14 @@ impl LazyBlock {
15921715
cursor
15931716
.read_exact(&mut block_height)
15941717
.expect("data corrupted");
1595-
let mut vout = [0u8; 1];
1718+
let mut vout = [0u8; 2];
15961719
cursor.read_exact(&mut vout).expect("data corrupted");
15971720
let mut txin_value = [0u8; 8];
15981721
cursor.read_exact(&mut txin_value).expect("data corrupted");
15991722
inputs.push(LazyBlockTransactionInput {
16001723
txin: txin,
16011724
block_height: u32::from_be_bytes(block_height),
1602-
vout: vout[0],
1725+
vout: u16::from_be_bytes(vout),
16031726
txin_value: u64::from_be_bytes(txin_value),
16041727
});
16051728
}

0 commit comments

Comments
 (0)