Skip to content

Commit 858ebed

Browse files
zealchenchenmian.cm
and
chenmian.cm
committed
Feat concurrent replay wal (apache#1505)
## Rationale Close apache#1498 ## Detailed Changes Recover table logs in parallel. ## Test Plan CI --------- Co-authored-by: chenmian.cm <chenmian.cm@antfin.com>
1 parent 86405c5 commit 858ebed

File tree

1 file changed

+18
-3
lines changed

1 file changed

+18
-3
lines changed

src/analytic_engine/src/instance/wal_replayer.rs

+18-3
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,24 @@ impl Replay for TableBasedReplay {
186186
batch_size: context.wal_replay_batch_size,
187187
..Default::default()
188188
};
189-
for table_data in table_datas {
190-
let table_id = table_data.id;
191-
if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await {
189+
190+
let mut tasks = futures::stream::iter(
191+
table_datas
192+
.iter()
193+
.map(|table_data| {
194+
let table_id = table_data.id;
195+
let read_ctx = &read_ctx;
196+
async move {
197+
let ret = Self::recover_table_logs(context, table_data, read_ctx).await;
198+
(table_id, ret)
199+
}
200+
})
201+
.collect::<Vec<_>>(),
202+
)
203+
.buffer_unordered(20);
204+
while let Some((table_id, ret)) = tasks.next().await {
205+
if let Err(e) = ret {
206+
// If occur error, mark this table as failed and store the cause.
192207
failed_tables.insert(table_id, e);
193208
}
194209
}

0 commit comments

Comments
 (0)