1
1
use crate :: config:: Config ;
2
- use chainhook_types:: StacksNetwork ;
2
+ use chainhook_event_observer:: utils:: Context ;
3
+ use chainhook_types:: { BitcoinNetwork , StacksNetwork } ;
3
4
use clarinet_files:: FileLocation ;
4
5
use flate2:: read:: GzDecoder ;
5
6
use futures_util:: StreamExt ;
@@ -15,13 +16,21 @@ pub fn default_tsv_sha_file_path(network: &StacksNetwork) -> String {
15
16
format ! ( "{:?}-stacks-events.sha256" , network) . to_lowercase ( )
16
17
}
17
18
19
+ pub fn default_sqlite_file_path ( _network : & BitcoinNetwork ) -> String {
20
+ format ! ( "hord.sqlite" ) . to_lowercase ( )
21
+ }
22
+
23
+ pub fn default_sqlite_sha_file_path ( _network : & BitcoinNetwork ) -> String {
24
+ format ! ( "hord.sqlite.sha256" ) . to_lowercase ( )
25
+ }
26
+
18
27
pub async fn download_tsv_file ( config : & Config ) -> Result < ( ) , String > {
19
28
let mut destination_path = config. expected_cache_path ( ) ;
20
29
std:: fs:: create_dir_all ( & destination_path) . unwrap_or_else ( |e| {
21
30
println ! ( "{}" , e. to_string( ) ) ;
22
31
} ) ;
23
32
24
- let remote_sha_url = config. expected_remote_tsv_sha256 ( ) ;
33
+ let remote_sha_url = config. expected_remote_stacks_tsv_sha256 ( ) ;
25
34
let res = reqwest:: get ( & remote_sha_url)
26
35
. await
27
36
. or ( Err ( format ! ( "Failed to GET from '{}'" , & remote_sha_url) ) ) ?
@@ -32,11 +41,10 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
32
41
let mut local_sha_file_path = destination_path. clone ( ) ;
33
42
local_sha_file_path. push ( default_tsv_sha_file_path ( & config. network . stacks_network ) ) ;
34
43
35
- println ! ( "1" ) ;
36
44
let local_sha_file = FileLocation :: from_path ( local_sha_file_path) ;
37
45
let _ = local_sha_file. write_content ( & res. to_vec ( ) ) ;
38
46
39
- let file_url = config. expected_remote_tsv_url ( ) ;
47
+ let file_url = config. expected_remote_stacks_tsv_url ( ) ;
40
48
let res = reqwest:: get ( & file_url)
41
49
. await
42
50
. or ( Err ( format ! ( "Failed to GET from '{}'" , & file_url) ) ) ?;
@@ -76,6 +84,68 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
76
84
Ok ( ( ) )
77
85
}
78
86
87
+ pub async fn download_sqlite_file ( config : & Config ) -> Result < ( ) , String > {
88
+ let mut destination_path = config. expected_cache_path ( ) ;
89
+ std:: fs:: create_dir_all ( & destination_path) . unwrap_or_else ( |e| {
90
+ println ! ( "{}" , e. to_string( ) ) ;
91
+ } ) ;
92
+
93
+ let remote_sha_url = config. expected_remote_ordinals_sqlite_sha256 ( ) ;
94
+ let res = reqwest:: get ( & remote_sha_url)
95
+ . await
96
+ . or ( Err ( format ! ( "Failed to GET from '{}'" , & remote_sha_url) ) ) ?
97
+ . bytes ( )
98
+ . await
99
+ . or ( Err ( format ! ( "Failed to GET from '{}'" , & remote_sha_url) ) ) ?;
100
+
101
+ let mut local_sha_file_path = destination_path. clone ( ) ;
102
+ local_sha_file_path. push ( default_sqlite_sha_file_path (
103
+ & config. network . bitcoin_network ,
104
+ ) ) ;
105
+
106
+ let local_sha_file = FileLocation :: from_path ( local_sha_file_path) ;
107
+ let _ = local_sha_file. write_content ( & res. to_vec ( ) ) ;
108
+
109
+ let file_url = config. expected_remote_ordinals_sqlite_url ( ) ;
110
+ let res = reqwest:: get ( & file_url)
111
+ . await
112
+ . or ( Err ( format ! ( "Failed to GET from '{}'" , & file_url) ) ) ?;
113
+
114
+ // Download chunks
115
+ let ( tx, rx) = flume:: bounded ( 0 ) ;
116
+ destination_path. push ( default_sqlite_file_path ( & config. network . bitcoin_network ) ) ;
117
+
118
+ let decoder_thread = std:: thread:: spawn ( move || {
119
+ let input = ChannelRead :: new ( rx) ;
120
+ let mut decoder = GzDecoder :: new ( input) ;
121
+ let mut content = Vec :: new ( ) ;
122
+ let _ = decoder. read_to_end ( & mut content) ;
123
+ let mut file = fs:: File :: create ( & destination_path) . unwrap ( ) ;
124
+ if let Err ( e) = file. write_all ( & content[ ..] ) {
125
+ println ! ( "unable to write file: {}" , e. to_string( ) ) ;
126
+ std:: process:: exit ( 1 ) ;
127
+ }
128
+ } ) ;
129
+
130
+ if res. status ( ) == reqwest:: StatusCode :: OK {
131
+ let mut stream = res. bytes_stream ( ) ;
132
+ while let Some ( item) = stream. next ( ) . await {
133
+ let chunk = item. or ( Err ( format ! ( "Error while downloading file" ) ) ) ?;
134
+ tx. send_async ( chunk. to_vec ( ) )
135
+ . await
136
+ . map_err ( |e| format ! ( "unable to download stacks event: {}" , e. to_string( ) ) ) ?;
137
+ }
138
+ drop ( tx) ;
139
+ }
140
+
141
+ tokio:: task:: spawn_blocking ( || decoder_thread. join ( ) )
142
+ . await
143
+ . unwrap ( )
144
+ . unwrap ( ) ;
145
+
146
+ Ok ( ( ) )
147
+ }
148
+
79
149
// Wrap a channel into something that impls `io::Read`
80
150
struct ChannelRead {
81
151
rx : flume:: Receiver < Vec < u8 > > ,
@@ -105,3 +175,129 @@ impl Read for ChannelRead {
105
175
self . current . read ( buf)
106
176
}
107
177
}
178
+
179
+ pub async fn download_stacks_dataset_if_required ( config : & mut Config , ctx : & Context ) -> bool {
180
+ if config. is_initial_ingestion_required ( ) {
181
+ // Download default tsv.
182
+ if config. rely_on_remote_stacks_tsv ( ) && config. should_download_remote_stacks_tsv ( ) {
183
+ let url = config. expected_remote_stacks_tsv_url ( ) ;
184
+ let mut tsv_file_path = config. expected_cache_path ( ) ;
185
+ tsv_file_path. push ( default_tsv_file_path ( & config. network . stacks_network ) ) ;
186
+ let mut tsv_sha_file_path = config. expected_cache_path ( ) ;
187
+ tsv_sha_file_path. push ( default_tsv_sha_file_path ( & config. network . stacks_network ) ) ;
188
+
189
+ // Download archive if not already present in cache
190
+ // Load the local
191
+ let local_sha_file = FileLocation :: from_path ( tsv_sha_file_path) . read_content ( ) ;
192
+ let sha_url = config. expected_remote_stacks_tsv_sha256 ( ) ;
193
+
194
+ let remote_sha_file = match reqwest:: get ( & sha_url) . await {
195
+ Ok ( response) => response. bytes ( ) . await ,
196
+ Err ( e) => Err ( e) ,
197
+ } ;
198
+ match ( local_sha_file, remote_sha_file) {
199
+ ( Ok ( local) , Ok ( remote_response) ) => {
200
+ println ! ( "{:?}" , local) ;
201
+ println ! ( "{:?}" , remote_response) ;
202
+ }
203
+ ( Ok ( local) , _) => {
204
+ // println!("Local: {:?}", local)
205
+ }
206
+ ( _, _) => {
207
+ // We will download the latest file
208
+ println ! ( "error reading local / remote" ) ;
209
+ }
210
+ }
211
+
212
+ if !tsv_file_path. exists ( ) {
213
+ info ! ( ctx. expect_logger( ) , "Downloading {}" , url) ;
214
+ match download_tsv_file ( & config) . await {
215
+ Ok ( _) => { }
216
+ Err ( e) => {
217
+ error ! ( ctx. expect_logger( ) , "{}" , e) ;
218
+ std:: process:: exit ( 1 ) ;
219
+ }
220
+ }
221
+ } else {
222
+ info ! (
223
+ ctx. expect_logger( ) ,
224
+ "Building in-memory chainstate from file {}" ,
225
+ tsv_file_path. display( )
226
+ ) ;
227
+ }
228
+ config. add_local_stacks_tsv_source ( & tsv_file_path) ;
229
+ }
230
+ true
231
+ } else {
232
+ info ! (
233
+ ctx. expect_logger( ) ,
234
+ "Streaming blocks from stacks-node {}" , config. network. stacks_node_rpc_url
235
+ ) ;
236
+ false
237
+ }
238
+ }
239
+
240
+ pub async fn download_ordinals_dataset_if_required ( config : & Config , ctx : & Context ) -> bool {
241
+ if config. is_initial_ingestion_required ( ) {
242
+ // Download default tsv.
243
+ if config. rely_on_remote_ordinals_sqlite ( )
244
+ && config. should_download_remote_ordinals_sqlite ( )
245
+ {
246
+ let url = config. expected_remote_ordinals_sqlite_url ( ) ;
247
+ let mut sqlite_file_path = config. expected_cache_path ( ) ;
248
+ sqlite_file_path. push ( default_sqlite_file_path ( & config. network . bitcoin_network ) ) ;
249
+ let mut tsv_sha_file_path = config. expected_cache_path ( ) ;
250
+ tsv_sha_file_path. push ( default_sqlite_sha_file_path (
251
+ & config. network . bitcoin_network ,
252
+ ) ) ;
253
+
254
+ // Download archive if not already present in cache
255
+ // Load the local
256
+ let local_sha_file = FileLocation :: from_path ( tsv_sha_file_path) . read_content ( ) ;
257
+ let sha_url = config. expected_remote_ordinals_sqlite_sha256 ( ) ;
258
+
259
+ let remote_sha_file = match reqwest:: get ( & sha_url) . await {
260
+ Ok ( response) => response. bytes ( ) . await ,
261
+ Err ( e) => Err ( e) ,
262
+ } ;
263
+ match ( local_sha_file, remote_sha_file) {
264
+ ( Ok ( local) , Ok ( remote_response) ) => {
265
+ println ! ( "{:?}" , local) ;
266
+ println ! ( "{:?}" , remote_response) ;
267
+ }
268
+ ( Ok ( local) , _) => {
269
+ // println!("Local: {:?}", local)
270
+ }
271
+ ( _, _) => {
272
+ // We will download the latest file
273
+ println ! ( "error reading local / remote" ) ;
274
+ }
275
+ }
276
+
277
+ if !sqlite_file_path. exists ( ) {
278
+ info ! ( ctx. expect_logger( ) , "Downloading {}" , url) ;
279
+ match download_sqlite_file ( & config) . await {
280
+ Ok ( _) => { }
281
+ Err ( e) => {
282
+ error ! ( ctx. expect_logger( ) , "{}" , e) ;
283
+ std:: process:: exit ( 1 ) ;
284
+ }
285
+ }
286
+ } else {
287
+ info ! (
288
+ ctx. expect_logger( ) ,
289
+ "Basing ordinals evaluation on database {}" ,
290
+ sqlite_file_path. display( )
291
+ ) ;
292
+ }
293
+ // config.add_local_ordinals_sqlite_source(&sqlite_file_path);
294
+ }
295
+ true
296
+ } else {
297
+ info ! (
298
+ ctx. expect_logger( ) ,
299
+ "Streaming blocks from bitcoind {}" , config. network. stacks_node_rpc_url
300
+ ) ;
301
+ false
302
+ }
303
+ }
0 commit comments