@@ -26,6 +26,7 @@ use crate::service::predicates::{
26
26
} ;
27
27
use crate :: service:: runloops:: start_bitcoin_scan_runloop;
28
28
29
+ use chainhook_sdk:: chainhooks:: bitcoin:: BitcoinChainhookOccurrencePayload ;
29
30
use chainhook_sdk:: chainhooks:: types:: {
30
31
BitcoinChainhookSpecification , ChainhookFullSpecification , ChainhookSpecification ,
31
32
} ;
@@ -36,7 +37,7 @@ use chainhook_sdk::observer::{
36
37
use chainhook_sdk:: types:: { BitcoinBlockData , BlockIdentifier } ;
37
38
use chainhook_sdk:: utils:: Context ;
38
39
use crossbeam_channel:: unbounded;
39
- use crossbeam_channel:: { select, Sender } ;
40
+ use crossbeam_channel:: select;
40
41
use dashmap:: DashMap ;
41
42
use fxhash:: FxHasher ;
42
43
use redis:: Commands ;
@@ -56,13 +57,21 @@ impl Service {
56
57
Self { config, ctx }
57
58
}
58
59
59
- pub async fn run ( & mut self , predicates : Vec < ChainhookFullSpecification > ) -> Result < ( ) , String > {
60
+ pub async fn run (
61
+ & mut self ,
62
+ predicates : Vec < ChainhookFullSpecification > ,
63
+ predicate_activity_relayer : Option <
64
+ crossbeam_channel:: Sender < BitcoinChainhookOccurrencePayload > ,
65
+ > ,
66
+ ) -> Result < ( ) , String > {
60
67
let mut event_observer_config = self . config . get_event_observer_config ( ) ;
61
68
let chainhook_config = create_and_consolidate_chainhook_config_with_predicates (
62
69
predicates,
70
+ predicate_activity_relayer. is_some ( ) ,
63
71
& self . config ,
64
72
& self . ctx ,
65
73
) ;
74
+
66
75
event_observer_config. chainhook_config = Some ( chainhook_config) ;
67
76
68
77
let ordhook_config = self . config . get_ordhook_config ( ) ;
@@ -106,9 +115,14 @@ impl Service {
106
115
self . start_main_runloop_with_dynamic_predicates (
107
116
& observer_command_tx,
108
117
observer_event_rx,
118
+ predicate_activity_relayer,
109
119
) ?;
110
120
} else {
111
- self . start_main_runloop ( & observer_command_tx, observer_event_rx) ?;
121
+ self . start_main_runloop (
122
+ & observer_command_tx,
123
+ observer_event_rx,
124
+ predicate_activity_relayer,
125
+ ) ?;
112
126
}
113
127
Ok ( ( ) )
114
128
}
@@ -117,6 +131,9 @@ impl Service {
117
131
& self ,
118
132
_observer_command_tx : & std:: sync:: mpsc:: Sender < ObserverCommand > ,
119
133
observer_event_rx : crossbeam_channel:: Receiver < ObserverEvent > ,
134
+ predicate_activity_relayer : Option <
135
+ crossbeam_channel:: Sender < BitcoinChainhookOccurrencePayload > ,
136
+ > ,
120
137
) -> Result < ( ) , String > {
121
138
loop {
122
139
let event = match observer_event_rx. recv ( ) {
@@ -131,6 +148,11 @@ impl Service {
131
148
}
132
149
} ;
133
150
match event {
151
+ ObserverEvent :: BitcoinPredicateTriggered ( data) => {
152
+ if let Some ( ref tx) = predicate_activity_relayer {
153
+ let _ = tx. send ( data) ;
154
+ }
155
+ }
134
156
ObserverEvent :: Terminate => {
135
157
info ! ( self . ctx. expect_logger( ) , "Terminating runloop" ) ;
136
158
break ;
@@ -145,6 +167,9 @@ impl Service {
145
167
& self ,
146
168
observer_command_tx : & std:: sync:: mpsc:: Sender < ObserverCommand > ,
147
169
observer_event_rx : crossbeam_channel:: Receiver < ObserverEvent > ,
170
+ predicate_activity_relayer : Option <
171
+ crossbeam_channel:: Sender < BitcoinChainhookOccurrencePayload > ,
172
+ > ,
148
173
) -> Result < ( ) , String > {
149
174
let PredicatesApi :: On ( ref api_config) = self . config . http_api else {
150
175
return Ok ( ( ) )
@@ -282,6 +307,11 @@ impl Service {
282
307
}
283
308
}
284
309
}
310
+ ObserverEvent :: BitcoinPredicateTriggered ( data) => {
311
+ if let Some ( ref tx) = predicate_activity_relayer {
312
+ let _ = tx. send ( data) ;
313
+ }
314
+ }
285
315
ObserverEvent :: Terminate => {
286
316
info ! ( self . ctx. expect_logger( ) , "Terminating runloop" ) ;
287
317
break ;
0 commit comments