
Commit ba4a923

zealchen and jiacai2050 authored and committed
refactor: insert select to stream mode (apache#1544)
## Rationale

Close apache#1542

## Detailed Changes

Perform the select-and-insert procedure in a streaming way.

## Test Plan

CI test.

Co-authored-by: jiacai2050 <dev@liujiacai.net>
1 parent 7c14eac commit ba4a923
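In outline, the change replaces collect-then-write with a producer/consumer pipeline: one task pulls record batches off the select stream and feeds a bounded channel, while a second task buffers rows and flushes them to the table in chunks. The sketch below is a minimal standalone approximation of that pattern, assuming tokio with the `rt` and `macros` features; the `Batch` type, the `write_batches` helper, and the loop bounds are illustrative stand-ins, not this crate's actual API.

```rust
use tokio::sync::mpsc;

// Stand-in for a record batch; the real code uses common_types' RecordBatch.
struct Batch {
    rows: usize,
}

// Stand-in for convert_records_to_row_group + table.write(): drains the
// buffered batches and reports how many rows were written.
async fn write_batches(batches: &mut Vec<Batch>) -> Result<usize, String> {
    let rows: usize = batches.iter().map(|b| b.rows).sum();
    batches.clear();
    Ok(rows)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    const ROW_BATCH_NUM: usize = 1000; // flush threshold, 1000 in the patch
    const PENDING_BATCH_NUM: usize = 3; // channel capacity, 3 in the patch

    let (tx, mut rx) = mpsc::channel::<Batch>(PENDING_BATCH_NUM);

    // Producer: pulls batches from the select stream (simulated here); a
    // full channel suspends the send and backpressures the select side.
    let producer = tokio::spawn(async move {
        for _ in 0..10 {
            let batch = Batch { rows: 400 }; // stand-in for stream.try_next()
            tx.send(batch)
                .await
                .map_err(|e| format!("channel closed: {e}"))?;
        }
        Ok::<(), String>(())
    });

    // Consumer: buffers rows until the threshold is reached, writes them in
    // one request, and flushes any leftovers once the channel is drained.
    let consumer = tokio::spawn(async move {
        let mut total = 0;
        let mut pending = 0;
        let mut buf = Vec::new();
        while let Some(batch) = rx.recv().await {
            pending += batch.rows;
            buf.push(batch);
            if pending >= ROW_BATCH_NUM {
                pending = 0;
                total += write_batches(&mut buf).await?;
            }
        }
        if !buf.is_empty() {
            total += write_batches(&mut buf).await?;
        }
        Ok::<usize, String>(total)
    });

    // Join both tasks; the outer Err is a panic or cancellation, while the
    // inner Results carry the tasks' own failures.
    let (select_res, write_rows) =
        tokio::try_join!(producer, consumer).map_err(|e| format!("join failed: {e}"))?;
    select_res?;
    println!("affected_rows: {}", write_rows?);
    Ok(())
}
```

Under this scheme a full channel suspends `tx.send`, so the select side can never race more than a few batches ahead of the writes.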

File tree

4 files changed, +143 −42 lines changed

integration_tests/cases/env/local/dml/insert_into_select.result

+7 −3

@@ -35,9 +35,11 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
 VALUES
 (1, 100, "s1"),
 (2, 200, "s2"),
-(3, 300, "s3");
+(3, 300, "s3"),
+(4, 400, "s4"),
+(5, 500, "s5");
 
-affected_rows: 3
+affected_rows: 5
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 
@@ -58,7 +60,7 @@ INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
 SELECT `timestamp`, `value`
 FROM `insert_into_select_table1`;
 
-affected_rows: 3
+affected_rows: 5
 
 SELECT `timestamp`, `value`, `name`
 FROM `insert_into_select_table2`;
@@ -67,6 +69,8 @@ timestamp,value,name,
 Timestamp(1),Int32(100),String(""),
 Timestamp(2),Int32(200),String(""),
 Timestamp(3),Int32(300),String(""),
+Timestamp(4),Int32(400),String(""),
+Timestamp(5),Int32(500),String(""),
 
 
 DROP TABLE `insert_into_select_table1`;

integration_tests/cases/env/local/dml/insert_into_select.sql

+3 −1

@@ -32,7 +32,9 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
 VALUES
 (1, 100, "s1"),
 (2, 200, "s2"),
-(3, 300, "s3");
+(3, 300, "s3"),
+(4, 400, "s4"),
+(5, 500, "s5");
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 

src/interpreters/Cargo.toml

+1 −0

@@ -54,6 +54,7 @@ regex = { workspace = true }
 runtime = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
+tokio = { workspace = true }
 
 [dev-dependencies]
 analytic_engine = { workspace = true, features = ["test"] }
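The added tokio dependency backs the tokio::sync::mpsc channel and the tokio::spawn/try_join! machinery used by the reworked insert path in src/interpreters/src/insert.rs below.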

src/interpreters/src/insert.rs

+132 −38

@@ -30,6 +30,7 @@ use common_types::{
     column_block::{ColumnBlock, ColumnBlockBuilder},
     column_schema::ColumnId,
     datum::Datum,
+    record_batch::RecordBatch as CommonRecordBatch,
     row::{Row, RowBuilder, RowGroup},
     schema::Schema,
 };
@@ -54,12 +55,15 @@ use query_frontend::{
 };
 use runtime::Priority;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use table_engine::table::{TableRef, WriteRequest};
+use table_engine::{
+    stream::SendableRecordBatchStream,
+    table::{TableRef, WriteRequest},
+};
+use tokio::sync::mpsc;
 
 use crate::{
     context::Context,
     interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
-    RecordBatchVec,
 };
 
 #[derive(Debug, Snafu)]
@@ -115,10 +119,23 @@ pub enum Error {
 
     #[snafu(display("Record columns not enough, len:{}, index:{}", len, index))]
     RecordColumnsNotEnough { len: usize, index: usize },
+
+    #[snafu(display("Failed to do select, err:{}", source))]
+    Select { source: table_engine::stream::Error },
+
+    #[snafu(display("Failed to send msg in channel, err:{}", msg))]
+    MsgChannel { msg: String },
+
+    #[snafu(display("Failed to join async task, err:{}", msg))]
+    AsyncTask { msg: String },
 }
 
 define_result!(Error);
 
+// TODO: make those configurable
+const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000;
+const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3;
+
 pub struct InsertInterpreter {
     ctx: Context,
     plan: InsertPlan,
@@ -152,14 +169,18 @@ impl Interpreter for InsertInterpreter {
             default_value_map,
         } = self.plan;
 
-        let mut rows = match source {
-            InsertSource::Values { row_group } => row_group,
+        match source {
+            InsertSource::Values { row_group: rows } => {
+                let num_rows =
+                    prepare_and_write_table(table.clone(), rows, &default_value_map).await?;
+
+                Ok(Output::AffectedRows(num_rows))
+            }
             InsertSource::Select {
                 query: query_plan,
                 column_index_in_insert,
             } => {
-                // TODO: support streaming insert
-                let record_batches = exec_select_logical_plan(
+                let mut record_batches_stream = exec_select_logical_plan(
                     self.ctx,
                     query_plan,
                     self.executor,
@@ -168,38 +189,120 @@ impl Interpreter for InsertInterpreter {
                 .await
                 .context(Insert)?;
 
-                if record_batches.is_empty() {
-                    return Ok(Output::AffectedRows(0));
-                }
+                let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM);
+                let producer = tokio::spawn(async move {
+                    while let Some(record_batch) = record_batches_stream
+                        .try_next()
+                        .await
+                        .context(Select)
+                        .context(Insert)?
+                    {
+                        if record_batch.is_empty() {
+                            continue;
+                        }
+                        if let Err(e) = tx.send(record_batch).await {
+                            return Err(Error::MsgChannel {
+                                msg: format!("{}", e),
+                            })
+                            .context(Insert)?;
+                        }
+                    }
+                    Ok(())
+                });
+
+                let consumer = tokio::spawn(async move {
+                    let mut rx = rx;
+                    let mut result_rows = 0;
+                    let mut pending_rows = 0;
+                    let mut record_batches = Vec::new();
+                    while let Some(record_batch) = rx.recv().await {
+                        pending_rows += record_batch.num_rows();
+                        record_batches.push(record_batch);
+                        if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM {
+                            pending_rows = 0;
+                            let num_rows = write_record_batches(
+                                &mut record_batches,
+                                column_index_in_insert.as_slice(),
+                                table.clone(),
+                                &default_value_map,
+                            )
+                            .await?;
+                            result_rows += num_rows;
+                        }
+                    }
 
-                convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
-                    .context(Insert)?
+                    if !record_batches.is_empty() {
+                        let num_rows = write_record_batches(
+                            &mut record_batches,
+                            column_index_in_insert.as_slice(),
+                            table,
+                            &default_value_map,
+                        )
+                        .await?;
+                        result_rows += num_rows;
+                    }
+                    Ok(result_rows)
+                });
+
+                match tokio::try_join!(producer, consumer) {
+                    Ok((select_res, write_rows)) => {
+                        select_res?;
+                        Ok(Output::AffectedRows(write_rows?))
+                    }
+                    Err(e) => Err(Error::AsyncTask {
+                        msg: format!("{}", e),
+                    })
+                    .context(Insert)?,
+                }
             }
-        };
+        }
+    }
+}
 
-        maybe_generate_tsid(&mut rows).context(Insert)?;
+async fn write_record_batches(
+    record_batches: &mut Vec<CommonRecordBatch>,
+    column_index_in_insert: &[InsertMode],
+    table: TableRef,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    let row_group = convert_records_to_row_group(
+        record_batches.as_slice(),
+        column_index_in_insert,
+        table.schema(),
+    )
+    .context(Insert)?;
+    record_batches.clear();
+
+    prepare_and_write_table(table, row_group, default_value_map).await
+}
 
-        // Fill default values
-        fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
+async fn prepare_and_write_table(
+    table: TableRef,
+    mut row_group: RowGroup,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    maybe_generate_tsid(&mut row_group).context(Insert)?;
 
-        let request = WriteRequest { row_group: rows };
+    // Fill default values
+    fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;
 
-        let num_rows = table
-            .write(request)
-            .await
-            .context(WriteTable)
-            .context(Insert)?;
+    let request = WriteRequest { row_group };
 
-        Ok(Output::AffectedRows(num_rows))
-    }
+    let num_rows = table
+        .write(request)
+        .await
+        .context(WriteTable)
+        .context(Insert)?;
+
+    Ok(num_rows)
 }
 
 async fn exec_select_logical_plan(
     ctx: Context,
     query_plan: QueryPlan,
     executor: ExecutorRef,
     physical_planner: PhysicalPlannerRef,
-) -> Result<RecordBatchVec> {
+) -> Result<SendableRecordBatchStream> {
     let priority = Priority::High;
 
     let query_ctx = ctx
@@ -216,34 +319,25 @@ async fn exec_select_logical_plan(
     })?;
 
     // Execute select physical plan.
-    let record_batch_stream = executor
+    let record_batch_stream: SendableRecordBatchStream = executor
         .execute(&query_ctx, physical_plan)
         .await
         .box_err()
         .context(ExecuteSelectPlan {
            msg: "failed to execute select physical plan",
        })?;
 
-    let record_batches =
-        record_batch_stream
-            .try_collect()
-            .await
-            .box_err()
-            .context(ExecuteSelectPlan {
-                msg: "failed to collect select execution results",
-            })?;
-
-    Ok(record_batches)
+    Ok(record_batch_stream)
 }
 
 fn convert_records_to_row_group(
-    record_batches: RecordBatchVec,
-    column_index_in_insert: Vec<InsertMode>,
+    record_batches: &[CommonRecordBatch],
+    column_index_in_insert: &[InsertMode],
     schema: Schema,
 ) -> Result<RowGroup> {
     let mut data_rows: Vec<Row> = Vec::new();
 
-    for record in &record_batches {
+    for record in record_batches {
         let num_cols = record.num_columns();
         let num_rows = record.num_rows();
         for row_idx in 0..num_rows {
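Two design notes on the constants above: INSERT_SELECT_PENDING_BATCH_NUM bounds the mpsc channel, so at most three record batches sit in flight before tx.send suspends the producer and, through it, the select stream, keeping memory use bounded regardless of result size; INSERT_SELECT_ROW_BATCH_NUM has the consumer accumulate roughly a thousand rows before issuing a table write, amortizing per-write overhead instead of writing batch by batch. Both are hard-coded for now, as the TODO notes.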
