@@ -29,13 +29,14 @@ use common_types::{
     schema::{IndexInWriterSchema, Schema},
     table::ShardId,
 };
+use futures::StreamExt;
 use generic_error::BoxError;
 use lazy_static::lazy_static;
 use logger::{debug, error, info, trace, warn};
 use prometheus::{exponential_buckets, register_histogram, Histogram};
 use snafu::ResultExt;
 use table_engine::table::TableId;
-use tokio::sync::MutexGuard;
+use tokio::sync::{Mutex, MutexGuard};
 use wal::{
     log_batch::LogEntry,
     manager::{
@@ -335,6 +336,7 @@ impl RegionBasedReplay {
         let schema_provider = TableSchemaProviderAdapter {
             table_datas: table_datas_by_id.clone(),
         };
+        let serial_exec_ctxs = Arc::new(Mutex::new(serial_exec_ctxs));
         // Split and replay logs.
         loop {
             let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
@@ -352,13 +354,8 @@ impl RegionBasedReplay {
             }

             let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
-            Self::replay_single_batch(
-                context,
-                &log_entry_buf,
-                &mut serial_exec_ctxs,
-                failed_tables,
-            )
-            .await?;
+            Self::replay_single_batch(context, &log_entry_buf, &serial_exec_ctxs, failed_tables)
+                .await?;
         }

         Ok(())
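
Note the call-site change above: because `serial_exec_ctxs` now lives behind `Arc<Mutex<...>>` (wrapped in the previous hunk), `replay_single_batch` takes a shared reference instead of `&mut`, and each per-table replay future clones the `Arc` and acquires the async lock before mutating its table's context. A minimal sketch of that sharing pattern, with a hypothetical `TableCtx` standing in for `SerialExecContext` (assumes the `tokio` crate):

```rust
use std::{collections::HashMap, sync::Arc};

use tokio::sync::Mutex;

// Hypothetical stand-in for SerialExecContext: per-table mutable state.
struct TableCtx {
    replayed_entries: usize,
}

#[tokio::main]
async fn main() {
    let ctxs: HashMap<u64, TableCtx> =
        (0..4).map(|id| (id, TableCtx { replayed_entries: 0 })).collect();
    // Wrap the map so concurrent tasks can share mutable access to it.
    let ctxs = Arc::new(Mutex::new(ctxs));

    let handles: Vec<_> = (0..4u64)
        .map(|table_id| {
            // Each task owns its own handle to the shared map.
            let ctxs = ctxs.clone();
            tokio::spawn(async move {
                if let Some(ctx) = ctxs.lock().await.get_mut(&table_id) {
                    ctx.replayed_entries += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }
}
```

`tokio::sync::Mutex` is used here rather than `std::sync::Mutex` because its guard may be held across `.await` points, which the replay future above does while applying a table's log entries.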
@@ -367,36 +364,46 @@ impl RegionBasedReplay {
     async fn replay_single_batch(
         context: &ReplayContext,
         log_batch: &VecDeque<LogEntry<ReadPayload>>,
-        serial_exec_ctxs: &mut HashMap<TableId, SerialExecContext<'_>>,
+        serial_exec_ctxs: &Arc<Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
         failed_tables: &mut FailedTables,
     ) -> Result<()> {
         let mut table_batches = Vec::new();
         // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
         Self::split_log_batch_by_table(log_batch, &mut table_batches);

         // TODO: Replay logs of different tables in parallel.
+        let mut replay_tasks = Vec::with_capacity(table_batches.len());
         for table_batch in table_batches {
             // Some tables may have failed in previous replay, ignore them.
             if failed_tables.contains_key(&table_batch.table_id) {
                 continue;
             }

-            // Replay all log entries of current table.
-            // Some tables may have been moved to other shards or dropped, ignore such logs.
-            if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) {
-                let result = replay_table_log_entries(
-                    &context.flusher,
-                    context.max_retry_flush_limit,
-                    &mut ctx.serial_exec,
-                    &ctx.table_data,
-                    log_batch.range(table_batch.range),
-                )
-                .await;
+            let serial_exec_ctxs = serial_exec_ctxs.clone();
+            replay_tasks.push(async move {
+                // Some tables may have been moved to other shards or dropped, ignore such logs.
+                if let Some(ctx) = serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
+                    let result = replay_table_log_entries(
+                        &context.flusher,
+                        context.max_retry_flush_limit,
+                        &mut ctx.serial_exec,
+                        &ctx.table_data,
+                        log_batch.range(table_batch.range),
+                    )
+                    .await;
+                    (table_batch.table_id, Some(result))
+                } else {
+                    (table_batch.table_id, None)
+                }
+            });
+        }

+        // Run at most 20 tasks in parallel
+        let mut replay_tasks = futures::stream::iter(replay_tasks).buffer_unordered(20);
+        while let Some((table_id, ret)) = replay_tasks.next().await {
+            if let Some(Err(e)) = ret {
                 // If occur error, mark this table as failed and store the cause.
-                if let Err(e) = result {
-                    failed_tables.insert(table_batch.table_id, e);
-                }
+                failed_tables.insert(table_id, e);
             }
         }

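
For reference, `buffer_unordered(20)` comes from `futures::StreamExt` (hence the new import in the first hunk): it turns the `Vec` of replay futures into a stream, keeps at most 20 of them in flight at once, and yields each `(table_id, result)` in completion order rather than submission order. A self-contained sketch of the same bounded-concurrency pattern, using hypothetical table IDs and a simulated replay result (assumes the `tokio` and `futures` crates):

```rust
use std::collections::HashMap;

use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Hypothetical per-table tasks, each yielding (table_id, replay result).
    let replay_tasks = (0..100u64).map(|table_id| async move {
        // Simulate replaying one table's WAL entries.
        let result: Result<(), String> = if table_id % 7 == 0 {
            Err(format!("replay failed for table {table_id}"))
        } else {
            Ok(())
        };
        (table_id, result)
    });

    // Run at most 20 tasks in parallel, collecting failures as they finish.
    let mut failed_tables = HashMap::new();
    let mut results = futures::stream::iter(replay_tasks).buffer_unordered(20);
    while let Some((table_id, result)) = results.next().await {
        if let Err(e) = result {
            failed_tables.insert(table_id, e);
        }
    }

    assert_eq!(failed_tables.len(), 15); // tables 0, 7, 14, ..., 98
}
```

Since `buffer_unordered` multiplexes the futures on the current task rather than spawning threads, the limit of 20 caps the number of in-flight replays, not CPU parallelism.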