
Commit 216cd52

fix: clean up rocksdb connections during rollbacks (#420)
* fix: only one rocksdb conn
* unify context
* improve logs
* clean up connections
* revert sequencing
* improve logs
* revert logs
1 parent 48b3cce commit 216cd52
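
The central change, condensed from the service/mod.rs diff below: chainhook_sidecar_mutate_blocks no longer holds a single rocksdb handle for the whole call. It opens one only when there are blocks to roll back, deletes and flushes those blocks, and drops the handle before re-indexing resumes; block inserts later in the loop open their own short-lived handle in a nested scope. A minimal sketch of the rollback path (identifiers taken from the diff; not a standalone program):

    // Open the rocksdb connection only while rollback work is pending,
    // so it is closed (dropped) as soon as the deletions are flushed.
    if block_ids_to_rollback.len() > 0 {
        let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
        for block_id in block_ids_to_rollback.iter() {
            // Drop the block from rocksdb, then from postgres.
            blocks::delete_blocks_in_block_range(
                block_id.index as u32,
                block_id.index as u32,
                &blocks_db_rw,
                &ctx,
            );
            rollback_block(block_id.index, config, pg_pools, ctx).await?;
        }
        blocks_db_rw
            .flush()
            .map_err(|e| format!("error dropping rollback blocks from rocksdb: {e}"))?;
        // blocks_db_rw goes out of scope here, releasing the rocksdb connection.
    }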

File tree

2 files changed: +68 −84 lines changed


components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs

+14 −38
@@ -8,7 +8,7 @@ use std::{
 use chainhook_postgres::{pg_begin, pg_pool_client};
 use chainhook_sdk::utils::Context;
 use chainhook_types::{BitcoinBlockData, TransactionIdentifier};
-use crossbeam_channel::{Sender, TryRecvError};
+use crossbeam_channel::TryRecvError;
 
 use dashmap::DashMap;
 use fxhash::FxHasher;
@@ -33,11 +33,7 @@ use crate::{
             sequence_cursor::SequenceCursor,
         },
     },
-    db::{
-        blocks::{self, open_blocks_db_with_retry},
-        cursor::TransactionBytesCursor,
-        ordinals_pg,
-    },
+    db::{blocks::open_blocks_db_with_retry, cursor::TransactionBytesCursor, ordinals_pg},
     service::PgConnectionPools,
     try_crit, try_debug, try_info,
     utils::monitoring::PrometheusMonitoring,
@@ -55,7 +51,6 @@ pub fn start_inscription_indexing_processor(
     config: &Config,
     pg_pools: &PgConnectionPools,
     ctx: &Context,
-    post_processor: Option<Sender<BitcoinBlockData>>,
     prometheus: &PrometheusMonitoring,
 ) -> PostProcessorController {
     let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
@@ -122,7 +117,6 @@ pub fn start_inscription_indexing_processor(
                         &mut sequence_cursor,
                         &cache_l2,
                         &mut brc20_cache,
-                        &post_processor,
                         &prometheus,
                         &config,
                         &pg_pools,
@@ -155,12 +149,11 @@
     }
 }
 
-pub async fn process_blocks(
+async fn process_blocks(
     next_blocks: &mut Vec<BitcoinBlockData>,
     sequence_cursor: &mut SequenceCursor,
     cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
     brc20_cache: &mut Option<Brc20MemoryCache>,
-    post_processor: &Option<Sender<BitcoinBlockData>>,
     prometheus: &PrometheusMonitoring,
     config: &Config,
     pg_pools: &PgConnectionPools,
@@ -172,14 +165,7 @@ pub async fn process_blocks(
     for _cursor in 0..next_blocks.len() {
         let mut block = next_blocks.remove(0);
 
-        // Invalidate and recompute cursor when crossing the jubilee height
-        let jubilee_height =
-            get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network));
-        if block.block_identifier.index == jubilee_height {
-            sequence_cursor.reset();
-        }
-
-        process_block(
+        index_block(
             &mut block,
             &next_blocks,
             sequence_cursor,
@@ -193,15 +179,12 @@ pub async fn process_blocks(
         )
         .await?;
 
-        if let Some(post_processor_tx) = post_processor {
-            let _ = post_processor_tx.send(block.clone());
-        }
         updated_blocks.push(block);
     }
     Ok(updated_blocks)
 }
 
-pub async fn process_block(
+pub async fn index_block(
     block: &mut BitcoinBlockData,
     next_blocks: &Vec<BitcoinBlockData>,
     sequence_cursor: &mut SequenceCursor,
@@ -217,6 +200,13 @@ pub async fn process_block(
     let block_height = block.block_identifier.index;
     try_info!(ctx, "Indexing block #{block_height}");
 
+    // Invalidate and recompute cursor when crossing the jubilee height
+    if block.block_identifier.index
+        == get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network))
+    {
+        sequence_cursor.reset();
+    }
+
     {
         let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?;
         let ord_tx = pg_begin(&mut ord_client).await?;
@@ -233,16 +223,11 @@ pub async fn process_block(
             config,
             ctx,
         )?;
-        let inner_ctx = if config.logs.ordinals_internals {
-            ctx.clone()
-        } else {
-            Context::empty()
-        };
         if has_inscription_reveals {
-            augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, &inner_ctx)
+            augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, ctx)
                 .await?;
         }
-        augment_block_with_transfers(block, &ord_tx, &inner_ctx).await?;
+        augment_block_with_transfers(block, &ord_tx, ctx).await?;
 
         // Write data
         ordinals_pg::insert_block(block, &ord_tx).await?;
@@ -294,15 +279,6 @@ pub async fn rollback_block(
     ctx: &Context,
 ) -> Result<(), String> {
     try_info!(ctx, "Rolling back block #{block_height}");
-    // Drop from blocks DB.
-    let blocks_db = open_blocks_db_with_retry(true, &config, ctx);
-    blocks::delete_blocks_in_block_range(
-        block_height as u32,
-        block_height as u32,
-        &blocks_db,
-        &ctx,
-    );
-    // Drop from postgres.
     {
         let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?;
         let ord_tx = pg_begin(&mut ord_client).await?;

components/ordhook-core/src/service/mod.rs

+54 −46
@@ -3,15 +3,15 @@ use crate::core::meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCach
 use crate::core::pipeline::bitcoind_download_blocks;
 use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
 use crate::core::pipeline::processors::inscription_indexing::{
-    process_block, rollback_block, start_inscription_indexing_processor,
+    index_block, rollback_block, start_inscription_indexing_processor,
 };
 use crate::core::protocol::sequence_cursor::SequenceCursor;
 use crate::core::{
     first_inscription_height, new_traversals_lazy_cache, should_sync_ordinals_db,
     should_sync_rocks_db,
 };
 use crate::db::blocks::{
-    find_missing_blocks, insert_entry_in_blocks, open_blocks_db_with_retry, run_compaction,
+    self, find_missing_blocks, open_blocks_db_with_retry, run_compaction,
 };
 use crate::db::cursor::{BlockBytesCursor, TransactionBytesCursor};
 use crate::db::ordinals_pg;
@@ -22,8 +22,8 @@ use chainhook_postgres::{pg_begin, pg_pool, pg_pool_client};
 use chainhook_sdk::observer::{
     start_event_observer, BitcoinBlockDataCached, ObserverEvent, ObserverSidecar,
 };
-use chainhook_types::BlockIdentifier;
 use chainhook_sdk::utils::{BlockHeights, Context};
+use chainhook_types::BlockIdentifier;
 use crossbeam_channel::select;
 use dashmap::DashMap;
 use deadpool_postgres::Pool;
@@ -303,7 +303,6 @@ impl Service {
             &self.config,
             &self.pg_pools,
             &self.ctx,
-            None,
             &self.prometheus,
         );
         try_info!(
@@ -332,63 +331,72 @@
 
 pub async fn chainhook_sidecar_mutate_blocks(
     blocks_to_mutate: &mut Vec<BitcoinBlockDataCached>,
-    blocks_ids_to_rollback: &Vec<BlockIdentifier>,
+    block_ids_to_rollback: &Vec<BlockIdentifier>,
     cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
     brc20_cache: &mut Option<Brc20MemoryCache>,
     prometheus: &PrometheusMonitoring,
     config: &Config,
     pg_pools: &PgConnectionPools,
     ctx: &Context,
 ) -> Result<(), String> {
-    let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
-
-    for block_id_to_rollback in blocks_ids_to_rollback.iter() {
-        rollback_block(block_id_to_rollback.index, config, pg_pools, ctx).await?;
+    if block_ids_to_rollback.len() > 0 {
+        let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
+        for block_id in block_ids_to_rollback.iter() {
+            blocks::delete_blocks_in_block_range(
+                block_id.index as u32,
+                block_id.index as u32,
+                &blocks_db_rw,
+                &ctx,
+            );
+            rollback_block(block_id.index, config, pg_pools, ctx).await?;
+        }
+        blocks_db_rw
+            .flush()
+            .map_err(|e| format!("error dropping rollback blocks from rocksdb: {e}"))?;
     }
 
-    for cache in blocks_to_mutate.iter_mut() {
-        let block_bytes = match BlockBytesCursor::from_standardized_block(&cache.block) {
+    for cached_block in blocks_to_mutate.iter_mut() {
+        if cached_block.processed_by_sidecar {
+            continue;
+        }
+        let block_bytes = match BlockBytesCursor::from_standardized_block(&cached_block.block) {
             Ok(block_bytes) => block_bytes,
             Err(e) => {
-                try_error!(
-                    ctx,
-                    "Unable to compress block #{}: #{}",
-                    cache.block.block_identifier.index,
-                    e.to_string()
-                );
-                continue;
+                return Err(format!(
+                    "Unable to compress block #{}: #{e}",
+                    cached_block.block.block_identifier.index
+                ));
             }
         };
-
-        insert_entry_in_blocks(
-            cache.block.block_identifier.index as u32,
-            &block_bytes,
-            true,
-            &blocks_db_rw,
-            &ctx,
-        );
-        blocks_db_rw
-            .flush()
-            .map_err(|e| format!("error inserting block to rocksdb: {e}"))?;
-
-        if !cache.processed_by_sidecar {
-            let mut cache_l1 = BTreeMap::new();
-            let mut sequence_cursor = SequenceCursor::new();
-            process_block(
-                &mut cache.block,
-                &vec![],
-                &mut sequence_cursor,
-                &mut cache_l1,
-                &cache_l2,
-                brc20_cache.as_mut(),
-                prometheus,
-                &config,
-                pg_pools,
+        {
+            let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
+            blocks::insert_entry_in_blocks(
+                cached_block.block.block_identifier.index as u32,
+                &block_bytes,
+                true,
+                &blocks_db_rw,
                 &ctx,
-            )
-            .await?;
-            cache.processed_by_sidecar = true;
+            );
+            blocks_db_rw
+                .flush()
+                .map_err(|e| format!("error inserting block to rocksdb: {e}"))?;
         }
+        let mut cache_l1 = BTreeMap::new();
+        let mut sequence_cursor = SequenceCursor::new();
+        index_block(
+            &mut cached_block.block,
+            &vec![],
+            &mut sequence_cursor,
+            &mut cache_l1,
+            &cache_l2,
+            brc20_cache.as_mut(),
+            prometheus,
+            &config,
+            pg_pools,
+            &ctx,
+        )
+        .await?;
+        cached_block.processed_by_sidecar = true;
     }
     Ok(())
 }
