Skip to content

Commit 2b9de8d

Browse files
authored
fix: write cancel when flush pending write queue (#940)
## Related Issues Closes # ## Detailed Changes Currently, the writer for the first request in the pending write queue is responsible for flushing all the writes in the queue, but it is not guaranteed to always trigger the flush, because it may be cancelled while awaiting the serial exec lock. This change set replaces the `try_join_all` in the rpc services with spawned tasks that are awaited individually, so such cancellation can no longer occur. ## Test Plan Tested manually.
1 parent 9058868 commit 2b9de8d

File tree

4 files changed

+49
-26
lines changed

4 files changed

+49
-26
lines changed

analytic_engine/src/instance/write.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ impl<'a> MemTableWriter<'a> {
353353

354354
impl<'a> Writer<'a> {
355355
pub(crate) async fn write(&mut self, request: WriteRequest) -> Result<usize> {
356-
let _timer = self.table_data.metrics.start_table_write_timer();
356+
let _timer = self.table_data.metrics.start_table_write_execute_timer();
357357
self.table_data.metrics.on_write_request_begin();
358358

359359
self.validate_before_write(&request)?;

analytic_engine/src/table/metrics.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ pub struct Metrics {
144144
table_write_space_flush_wait_duration: Histogram,
145145
table_write_instance_flush_wait_duration: Histogram,
146146
table_write_flush_wait_duration: Histogram,
147+
table_write_execute_duration: Histogram,
147148
table_write_total_duration: Histogram,
148149
}
149150

@@ -175,6 +176,8 @@ impl Default for Metrics {
175176
.with_label_values(&["wait_instance_flush"]),
176177
table_write_flush_wait_duration: TABLE_WRITE_DURATION_HISTOGRAM
177178
.with_label_values(&["wait_flush"]),
179+
table_write_execute_duration: TABLE_WRITE_DURATION_HISTOGRAM
180+
.with_label_values(&["execute"]),
178181
table_write_total_duration: TABLE_WRITE_DURATION_HISTOGRAM
179182
.with_label_values(&["total"]),
180183
}
@@ -210,10 +213,15 @@ impl Metrics {
210213
}
211214

212215
#[inline]
213-
pub fn start_table_write_timer(&self) -> HistogramTimer {
216+
pub fn start_table_total_timer(&self) -> HistogramTimer {
214217
self.table_write_total_duration.start_timer()
215218
}
216219

220+
#[inline]
221+
pub fn start_table_write_execute_timer(&self) -> HistogramTimer {
222+
self.table_write_execute_duration.start_timer()
223+
}
224+
217225
#[inline]
218226
pub fn start_table_write_encode_timer(&self) -> HistogramTimer {
219227
self.table_write_encode_duration.start_timer()

analytic_engine/src/table/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,12 @@ impl Table for TableImpl {
379379
}
380380

381381
async fn write(&self, request: WriteRequest) -> Result<usize> {
382+
let _timer = self
383+
.space_table
384+
.table_data()
385+
.metrics
386+
.start_table_total_timer();
387+
382388
if self.should_queue_write_request(&request) {
383389
return self.write_with_pending_queue(request).await;
384390
}

server/src/grpc/remote_engine_service/mod.rs

+33-24
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@ use ceresdbproto::{
1717
};
1818
use common_types::record_batch::RecordBatch;
1919
use common_util::{error::BoxError, time::InstantExt};
20-
use futures::{
21-
future::try_join_all,
22-
stream::{self, BoxStream, StreamExt},
23-
};
20+
use futures::stream::{self, BoxStream, StreamExt};
2421
use log::error;
2522
use proxy::instance::InstanceRef;
2623
use query_engine::executor::Executor as QueryExecutor;
@@ -162,31 +159,43 @@ impl<Q: QueryExecutor + 'static> RemoteEngineServiceImpl<Q> {
162159
) -> std::result::Result<Response<WriteResponse>, Status> {
163160
let begin_instant = Instant::now();
164161
let request = request.into_inner();
165-
let mut write_table_futures = Vec::with_capacity(request.batch.len());
162+
let mut write_table_handles = Vec::with_capacity(request.batch.len());
166163
for one_request in request.batch {
167164
let ctx = self.handler_ctx();
168-
write_table_futures.push(async move { handle_write(ctx, one_request).await });
165+
let handle = self
166+
.runtimes
167+
.write_runtime
168+
.spawn(handle_write(ctx, one_request));
169+
write_table_handles.push(handle);
169170
}
170171

171-
let handle = self
172-
.runtimes
173-
.write_runtime
174-
.spawn(try_join_all(write_table_futures));
175-
let batch_result = handle.await.box_err().context(ErrWithCause {
176-
code: StatusCode::Internal,
177-
msg: "fail to run the join task",
178-
});
179-
180-
let mut batch_resp = WriteResponse::default();
181-
match batch_result {
182-
Ok(Ok(v)) => {
183-
batch_resp.header = Some(error::build_ok_header());
184-
batch_resp.affected_rows = v.into_iter().map(|resp| resp.affected_rows).sum();
185-
}
186-
Ok(Err(e)) | Err(e) => {
187-
batch_resp.header = Some(error::build_err_header(e));
188-
}
172+
let mut batch_resp = WriteResponse {
173+
header: Some(error::build_ok_header()),
174+
affected_rows: 0,
189175
};
176+
for write_handle in write_table_handles {
177+
let write_result = write_handle.await.box_err().context(ErrWithCause {
178+
code: StatusCode::Internal,
179+
msg: "fail to run the join task",
180+
});
181+
// The underlying write can't be cancelled, so just ignore the left write
182+
// handles (don't abort them) if any error is encountered.
183+
match write_result {
184+
Ok(res) => match res {
185+
Ok(resp) => batch_resp.affected_rows += resp.affected_rows,
186+
Err(e) => {
187+
error!("Failed to write batches, err:{e}");
188+
batch_resp.header = Some(error::build_err_header(e));
189+
break;
190+
}
191+
},
192+
Err(e) => {
193+
error!("Failed to write batches, err:{e}");
194+
batch_resp.header = Some(error::build_err_header(e));
195+
break;
196+
}
197+
};
198+
}
190199

191200
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
192201
.write_batch

0 commit comments

Comments
 (0)