@@ -46,7 +46,7 @@ use std::str;
46
46
use std:: str:: FromStr ;
47
47
use std:: sync:: mpsc:: { Receiver , Sender } ;
48
48
use std:: sync:: { Arc , Mutex , RwLock } ;
49
- use std:: time:: Duration ;
49
+ use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
50
50
#[ cfg( feature = "zeromq" ) ]
51
51
use zeromq:: { Socket , SocketRecv } ;
52
52
@@ -359,6 +359,35 @@ impl ChainhookStore {
359
359
}
360
360
}
361
361
362
+ #[ derive( Debug , Default , Serialize , Clone ) ]
363
+ pub struct ReorgMetrics {
364
+ timestamp : i64 ,
365
+ applied_blocks : usize ,
366
+ rolled_back_blocks : usize ,
367
+ }
368
+
369
+ #[ derive( Debug , Default , Serialize , Clone ) ]
370
+ pub struct ChainMetrics {
371
+ pub tip_height : u64 ,
372
+ pub last_reorg : Option < ReorgMetrics > ,
373
+ pub last_block_ingestion_at : u128 ,
374
+ pub registered_predicates : usize ,
375
+ pub deregistered_predicates : usize ,
376
+ }
377
+
378
+ impl ChainMetrics {
379
+ pub fn deregister_prediate ( & mut self ) {
380
+ self . registered_predicates -= 1 ;
381
+ self . deregistered_predicates += 1 ;
382
+ }
383
+ }
384
+
385
+ #[ derive( Debug , Default , Serialize , Clone ) ]
386
+ pub struct ObserverMetrics {
387
+ pub bitcoin : ChainMetrics ,
388
+ pub stacks : ChainMetrics ,
389
+ }
390
+
362
391
pub async fn start_event_observer (
363
392
mut config : EventObserverConfig ,
364
393
observer_commands_tx : Sender < ObserverCommand > ,
@@ -409,6 +438,18 @@ pub async fn start_event_observer(
409
438
410
439
let background_job_tx_mutex = Arc :: new ( Mutex :: new ( observer_commands_tx. clone ( ) ) ) ;
411
440
441
+ let observer_metrics = ObserverMetrics {
442
+ bitcoin : ChainMetrics {
443
+ registered_predicates : chainhook_store. predicates . bitcoin_chainhooks . len ( ) ,
444
+ ..Default :: default ( )
445
+ } ,
446
+ stacks : ChainMetrics {
447
+ registered_predicates : chainhook_store. predicates . stacks_chainhooks . len ( ) ,
448
+ ..Default :: default ( )
449
+ } ,
450
+ } ;
451
+ let observer_metrics_rw_lock = Arc :: new ( RwLock :: new ( observer_metrics) ) ;
452
+
412
453
let limits = Limits :: default ( ) . limit ( "json" , 20 . megabytes ( ) ) ;
413
454
let mut shutdown_config = config:: Shutdown :: default ( ) ;
414
455
shutdown_config. ctrlc = false ;
@@ -451,6 +492,7 @@ pub async fn start_event_observer(
451
492
. manage ( background_job_tx_mutex)
452
493
. manage ( bitcoin_config)
453
494
. manage ( ctx_cloned)
495
+ . manage ( observer_metrics_rw_lock. clone ( ) )
454
496
. mount ( "/" , routes)
455
497
. ignite ( )
456
498
. await ?;
@@ -470,6 +512,7 @@ pub async fn start_event_observer(
470
512
observer_commands_rx,
471
513
observer_events_tx,
472
514
ingestion_shutdown,
515
+ observer_metrics_rw_lock. clone ( ) ,
473
516
ctx,
474
517
)
475
518
. await
@@ -653,6 +696,7 @@ pub async fn start_observer_commands_handler(
653
696
observer_commands_rx : Receiver < ObserverCommand > ,
654
697
observer_events_tx : Option < crossbeam_channel:: Sender < ObserverEvent > > ,
655
698
ingestion_shutdown : Option < Shutdown > ,
699
+ observer_metrics : Arc < RwLock < ObserverMetrics > > ,
656
700
ctx : Context ,
657
701
) -> Result < ( ) , Box < dyn Error > > {
658
702
let mut chainhooks_occurrences_tracker: HashMap < String , u64 > = HashMap :: new ( ) ;
@@ -728,6 +772,21 @@ pub async fn start_observer_commands_handler(
728
772
}
729
773
} ;
730
774
} ;
775
+ match observer_metrics. write ( ) {
776
+ Ok ( mut metrics) => {
777
+ if new_block. block_identifier . index > metrics. bitcoin . tip_height {
778
+ metrics. bitcoin . tip_height = new_block. block_identifier . index ;
779
+ }
780
+ metrics. bitcoin . last_block_ingestion_at = SystemTime :: now ( )
781
+ . duration_since ( UNIX_EPOCH )
782
+ . expect ( "Could not get current time in ms" )
783
+ . as_millis ( )
784
+ . into ( ) ;
785
+ }
786
+ Err ( e) => ctx. try_log ( |logger| {
787
+ slog:: warn!( logger, "unable to acquire observer_metrics_rw_lock:{}" , e)
788
+ } ) ,
789
+ } ;
731
790
bitcoin_block_store. insert ( new_block. block_identifier . clone ( ) , new_block) ;
732
791
}
733
792
ObserverCommand :: CacheBitcoinBlock ( block) => {
@@ -974,6 +1033,29 @@ pub async fn start_observer_commands_handler(
974
1033
}
975
1034
}
976
1035
1036
+ match blocks_to_apply
1037
+ . iter ( )
1038
+ . max_by_key ( |b| b. block_identifier . index )
1039
+ {
1040
+ Some ( highest_tip_block) => match observer_metrics. write ( ) {
1041
+ Ok ( mut metrics) => {
1042
+ metrics. bitcoin . last_reorg = Some ( ReorgMetrics {
1043
+ timestamp : highest_tip_block. timestamp . into ( ) ,
1044
+ applied_blocks : blocks_to_apply. len ( ) ,
1045
+ rolled_back_blocks : blocks_to_rollback. len ( ) ,
1046
+ } ) ;
1047
+ }
1048
+ Err ( e) => ctx. try_log ( |logger| {
1049
+ slog:: warn!(
1050
+ logger,
1051
+ "unable to acquire observer_metrics_rw_lock:{}" ,
1052
+ e
1053
+ )
1054
+ } ) ,
1055
+ } ,
1056
+ None => { }
1057
+ }
1058
+
977
1059
BitcoinChainEvent :: ChainUpdatedWithReorg ( BitcoinChainUpdatedWithReorgData {
978
1060
blocks_to_apply,
979
1061
blocks_to_rollback,
@@ -1108,6 +1190,17 @@ pub async fn start_observer_commands_handler(
1108
1190
ChainhookSpecification :: Bitcoin ( chainhook) ,
1109
1191
) ) ;
1110
1192
}
1193
+
1194
+ match observer_metrics. write ( ) {
1195
+ Ok ( mut metrics) => metrics. bitcoin . deregister_prediate ( ) ,
1196
+ Err ( e) => ctx. try_log ( |logger| {
1197
+ slog:: warn!(
1198
+ logger,
1199
+ "unable to acquire observer_metrics_rw_lock:{}" ,
1200
+ e
1201
+ )
1202
+ } ) ,
1203
+ }
1111
1204
}
1112
1205
}
1113
1206
@@ -1157,6 +1250,66 @@ pub async fn start_observer_commands_handler(
1157
1250
stacks_chainhooks. len( )
1158
1251
)
1159
1252
} ) ;
1253
+ // track stacks chain metrics
1254
+ match & chain_event {
1255
+ StacksChainEvent :: ChainUpdatedWithBlocks ( update) => {
1256
+ match update
1257
+ . new_blocks
1258
+ . iter ( )
1259
+ . max_by_key ( |b| b. block . block_identifier . index )
1260
+ {
1261
+ Some ( highest_tip_update) => match observer_metrics. write ( ) {
1262
+ Ok ( mut metrics) => {
1263
+ if highest_tip_update. block . block_identifier . index
1264
+ > metrics. stacks . tip_height
1265
+ {
1266
+ metrics. stacks . tip_height =
1267
+ highest_tip_update. block . block_identifier . index ;
1268
+ }
1269
+ metrics. stacks . last_block_ingestion_at = SystemTime :: now ( )
1270
+ . duration_since ( UNIX_EPOCH )
1271
+ . expect ( "Could not get current time in ms" )
1272
+ . as_millis ( )
1273
+ . into ( ) ;
1274
+ }
1275
+ Err ( e) => ctx. try_log ( |logger| {
1276
+ slog:: warn!(
1277
+ logger,
1278
+ "unable to acquire observer_metrics_rw_lock:{}" ,
1279
+ e
1280
+ )
1281
+ } ) ,
1282
+ } ,
1283
+ None => { }
1284
+ }
1285
+ }
1286
+ StacksChainEvent :: ChainUpdatedWithReorg ( update) => {
1287
+ match update
1288
+ . blocks_to_apply
1289
+ . iter ( )
1290
+ . max_by_key ( |b| b. block . block_identifier . index )
1291
+ {
1292
+ Some ( highest_tip_update) => match observer_metrics. write ( ) {
1293
+ Ok ( mut metrics) => {
1294
+ metrics. stacks . last_reorg = Some ( ReorgMetrics {
1295
+ timestamp : highest_tip_update. block . timestamp . into ( ) ,
1296
+ applied_blocks : update. blocks_to_apply . len ( ) ,
1297
+ rolled_back_blocks : update. blocks_to_rollback . len ( ) ,
1298
+ } ) ;
1299
+ }
1300
+ Err ( e) => ctx. try_log ( |logger| {
1301
+ slog:: warn!(
1302
+ logger,
1303
+ "unable to acquire observer_metrics_rw_lock:{}" ,
1304
+ e
1305
+ )
1306
+ } ) ,
1307
+ } ,
1308
+ None => { }
1309
+ }
1310
+ }
1311
+ _ => { }
1312
+ }
1160
1313
1161
1314
// process hooks
1162
1315
let ( predicates_triggered, predicates_evaluated) =
@@ -1241,6 +1394,17 @@ pub async fn start_observer_commands_handler(
1241
1394
ChainhookSpecification :: Stacks ( chainhook) ,
1242
1395
) ) ;
1243
1396
}
1397
+
1398
+ match observer_metrics. write ( ) {
1399
+ Ok ( mut metrics) => metrics. stacks . deregister_prediate ( ) ,
1400
+ Err ( e) => ctx. try_log ( |logger| {
1401
+ slog:: warn!(
1402
+ logger,
1403
+ "unable to acquire observer_metrics_rw_lock:{}" ,
1404
+ e
1405
+ )
1406
+ } ) ,
1407
+ }
1244
1408
}
1245
1409
}
1246
1410
@@ -1286,7 +1450,7 @@ pub async fn start_observer_commands_handler(
1286
1450
. predicates
1287
1451
. register_full_specification ( networks, spec)
1288
1452
{
1289
- Ok ( uuid ) => uuid ,
1453
+ Ok ( spec ) => spec ,
1290
1454
Err ( e) => {
1291
1455
ctx. try_log ( |logger| {
1292
1456
slog:: error!(
@@ -1300,11 +1464,25 @@ pub async fn start_observer_commands_handler(
1300
1464
} ;
1301
1465
ctx. try_log ( |logger| slog:: info!( logger, "Registering chainhook {}" , spec. uuid( ) , ) ) ;
1302
1466
if let Some ( ref tx) = observer_events_tx {
1303
- let _ = tx. send ( ObserverEvent :: PredicateRegistered ( spec) ) ;
1467
+ let _ = tx. send ( ObserverEvent :: PredicateRegistered ( spec. clone ( ) ) ) ;
1304
1468
} else {
1305
1469
ctx. try_log ( |logger| slog:: info!( logger, "Enabling Predicate {}" , spec. uuid( ) ) ) ;
1306
1470
chainhook_store. predicates . enable_specification ( & mut spec) ;
1307
1471
}
1472
+
1473
+ match observer_metrics. write ( ) {
1474
+ Ok ( mut metrics) => match spec {
1475
+ ChainhookSpecification :: Bitcoin ( _) => {
1476
+ metrics. bitcoin . registered_predicates += 1
1477
+ }
1478
+ ChainhookSpecification :: Stacks ( _) => {
1479
+ metrics. stacks . registered_predicates += 1
1480
+ }
1481
+ } ,
1482
+ Err ( e) => ctx. try_log ( |logger| {
1483
+ slog:: warn!( logger, "unable to acquire observer_metrics_rw_lock:{}" , e)
1484
+ } ) ,
1485
+ } ;
1308
1486
}
1309
1487
ObserverCommand :: EnablePredicate ( mut spec) => {
1310
1488
ctx. try_log ( |logger| slog:: info!( logger, "Enabling Predicate {}" , spec. uuid( ) ) ) ;
@@ -1323,6 +1501,13 @@ pub async fn start_observer_commands_handler(
1323
1501
ChainhookSpecification :: Stacks ( hook) ,
1324
1502
) ) ;
1325
1503
}
1504
+
1505
+ match observer_metrics. write ( ) {
1506
+ Ok ( mut metrics) => metrics. stacks . deregister_prediate ( ) ,
1507
+ Err ( e) => ctx. try_log ( |logger| {
1508
+ slog:: warn!( logger, "unable to acquire observer_metrics_rw_lock:{}" , e)
1509
+ } ) ,
1510
+ }
1326
1511
}
1327
1512
ObserverCommand :: DeregisterBitcoinPredicate ( hook_uuid) => {
1328
1513
ctx. try_log ( |logger| {
@@ -1335,6 +1520,13 @@ pub async fn start_observer_commands_handler(
1335
1520
let _ = tx. send ( ObserverEvent :: PredicateDeregistered (
1336
1521
ChainhookSpecification :: Bitcoin ( hook) ,
1337
1522
) ) ;
1523
+
1524
+ match observer_metrics. write ( ) {
1525
+ Ok ( mut metrics) => metrics. bitcoin . deregister_prediate ( ) ,
1526
+ Err ( e) => ctx. try_log ( |logger| {
1527
+ slog:: warn!( logger, "unable to acquire observer_metrics_rw_lock:{}" , e)
1528
+ } ) ,
1529
+ }
1338
1530
}
1339
1531
}
1340
1532
}
0 commit comments