@@ -11,7 +11,9 @@ use std::{
11
11
use async_trait:: async_trait;
12
12
use common_types:: { schema:: IndexInWriterSchema , table:: ShardId } ;
13
13
use common_util:: error:: BoxError ;
14
+ use lazy_static:: lazy_static;
14
15
use log:: { debug, error, info, trace} ;
16
+ use prometheus:: { exponential_buckets, register_histogram, Histogram } ;
15
17
use snafu:: ResultExt ;
16
18
use table_engine:: table:: TableId ;
17
19
use tokio:: sync:: MutexGuard ;
@@ -34,6 +36,22 @@ use crate::{
34
36
table:: data:: TableDataRef ,
35
37
} ;
36
38
39
+ // Metrics of wal replayer
40
+ lazy_static ! {
41
+ static ref PULL_LOGS_DURATION_HISTOGRAM : Histogram = register_histogram!(
42
+ "wal_replay_pull_logs_duration" ,
43
+ "Histogram for pull logs duration in wal replay in seconds" ,
44
+ exponential_buckets( 0.01 , 2.0 , 13 ) . unwrap( )
45
+ )
46
+ . unwrap( ) ;
47
+ static ref APPLY_LOGS_DURATION_HISTOGRAM : Histogram = register_histogram!(
48
+ "wal_replay_apply_logs_duration" ,
49
+ "Histogram for apply logs duration in wal replay in seconds" ,
50
+ exponential_buckets( 0.01 , 2.0 , 13 ) . unwrap( )
51
+ )
52
+ . unwrap( ) ;
53
+ }
54
+
37
55
/// Wal replayer supporting both table based and region based
38
56
// TODO: limit the memory usage in `RegionBased` mode.
39
57
pub struct WalReplayer < ' a > {
@@ -186,18 +204,21 @@ impl TableBasedReplay {
186
204
let mut log_entry_buf = VecDeque :: with_capacity ( context. wal_replay_batch_size ) ;
187
205
loop {
188
206
// fetch entries to log_entry_buf
207
+ let timer = PULL_LOGS_DURATION_HISTOGRAM . start_timer ( ) ;
189
208
let decoder = WalDecoder :: default ( ) ;
190
209
log_entry_buf = log_iter
191
210
. next_log_entries ( decoder, log_entry_buf)
192
211
. await
193
212
. box_err ( )
194
213
. context ( ReplayWalWithCause { msg : None } ) ?;
214
+ drop ( timer) ;
195
215
196
216
if log_entry_buf. is_empty ( ) {
197
217
break ;
198
218
}
199
219
200
220
// Replay all log entries of current table
221
+ let timer = APPLY_LOGS_DURATION_HISTOGRAM . start_timer ( ) ;
201
222
replay_table_log_entries (
202
223
& context. flusher ,
203
224
context. max_retry_flush_limit ,
@@ -206,6 +227,7 @@ impl TableBasedReplay {
206
227
log_entry_buf. iter ( ) ,
207
228
)
208
229
. await ?;
230
+ drop ( timer) ;
209
231
}
210
232
211
233
Ok ( ( ) )
@@ -276,19 +298,23 @@ impl RegionBasedReplay {
276
298
277
299
// Split and replay logs.
278
300
loop {
301
+ let timer = PULL_LOGS_DURATION_HISTOGRAM . start_timer ( ) ;
279
302
let decoder = WalDecoder :: default ( ) ;
280
303
log_entry_buf = log_iter
281
304
. next_log_entries ( decoder, log_entry_buf)
282
305
. await
283
306
. box_err ( )
284
307
. context ( ReplayWalWithCause { msg : None } ) ?;
308
+ drop ( timer) ;
285
309
286
310
if log_entry_buf. is_empty ( ) {
287
311
break ;
288
312
}
289
313
314
+ let timer = APPLY_LOGS_DURATION_HISTOGRAM . start_timer ( ) ;
290
315
Self :: replay_single_batch ( context, & log_entry_buf, & mut serial_exec_ctxs, faileds)
291
316
. await ?;
317
+ drop ( timer) ;
292
318
}
293
319
294
320
Ok ( ( ) )
0 commit comments