refactor: insert select to stream mode #1544

Merged · 7 commits · Jul 18, 2024
Changes from 1 commit
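For context, the sketch below illustrates the streaming pattern this PR moves to: instead of collecting the whole SELECT result with try_collect and converting it in one pass, record batches are pulled from the stream with StreamExt::next and flushed to the table once a small buffer fills up. The RecordBatch alias, the flush helper, and the String error type are simplified stand-ins for the real CommonRecordBatch / RowGroup conversion and table write path, so this is an illustrative sketch rather than the PR's exact code.

use futures::{Stream, StreamExt};

// Simplified stand-in for common_types::record_batch::RecordBatch.
type RecordBatch = Vec<u64>;

// Stand-in for write_record_batches: convert the buffered batches to a row
// group, write them to the table, and clear the buffer.
async fn flush(buffered: &mut Vec<RecordBatch>) -> usize {
    let rows = buffered.iter().map(|b| b.len()).sum();
    buffered.clear();
    rows
}

async fn stream_insert<S>(mut batches: S) -> Result<usize, String>
where
    S: Stream<Item = Result<RecordBatch, String>> + Unpin,
{
    // Accumulate a bounded number of batches before each write instead of
    // holding the entire result set in memory.
    let max_batch_size = 8;
    let mut buffered = Vec::new();
    let mut result_rows = 0;

    while let Some(batch) = batches.next().await {
        let batch = batch?;
        if batch.is_empty() {
            continue;
        }
        buffered.push(batch);
        if buffered.len() >= max_batch_size {
            result_rows += flush(&mut buffered).await;
        }
    }

    // Flush whatever is left once the stream is exhausted.
    if !buffered.is_empty() {
        result_rows += flush(&mut buffered).await;
    }
    Ok(result_rows)
}

In the actual change, the flush path goes through convert_records_to_row_group and prepare_and_write_table, and errors from the stream are wrapped in the new Select error variant.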
138 changes: 100 additions & 38 deletions src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
column_block::{ColumnBlock, ColumnBlockBuilder},
column_schema::ColumnId,
datum::Datum,
record_batch::RecordBatch as CommonRecordBatch,
row::{Row, RowBuilder, RowGroup},
schema::Schema,
};
@@ -43,9 +44,10 @@ use datafusion::{
},
};
use df_operator::visitor::find_columns_by_expr;
use futures::TryStreamExt;
use futures::StreamExt;
use generic_error::{BoxError, GenericError};
use hash_ext::hash64;
use logger::error;
use macros::define_result;
use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef};
use query_frontend::{
@@ -54,12 +56,14 @@ use query_frontend::{
};
use runtime::Priority;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use table_engine::table::{TableRef, WriteRequest};
use table_engine::{
stream::SendableRecordBatchStream,
table::{TableRef, WriteRequest},
};

use crate::{
context::Context,
interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
RecordBatchVec,
};

#[derive(Debug, Snafu)]
@@ -115,6 +119,9 @@ pub enum Error {

#[snafu(display("Record columns not enough, len:{}, index:{}", len, index))]
RecordColumnsNotEnough { len: usize, index: usize },

#[snafu(display("Failed to do select, err:{}", source))]
Select { source: table_engine::stream::Error },
}

define_result!(Error);
@@ -152,14 +159,18 @@ impl Interpreter for InsertInterpreter {
default_value_map,
} = self.plan;

let mut rows = match source {
InsertSource::Values { row_group } => row_group,
match source {
InsertSource::Values { row_group: rows } => {
let num_rows =
prepare_and_write_table(table.clone(), rows, &default_value_map).await?;

Ok(Output::AffectedRows(num_rows))
}
InsertSource::Select {
query: query_plan,
column_index_in_insert,
} => {
// TODO: support streaming insert
let record_batches = exec_select_logical_plan(
let mut record_batches_stream = exec_select_logical_plan(
self.ctx,
query_plan,
self.executor,
@@ -168,38 +179,98 @@ impl Interpreter for InsertInterpreter {
.await
.context(Insert)?;

if record_batches.is_empty() {
return Ok(Output::AffectedRows(0));
// Accumulate up to 8 record batches before writing to the table; this
// parameter can be adjusted according to performance test results.
let max_batch_size = 8;
let mut result_rows = 0;
let mut record_batches = Vec::new();
while let Some(response) = record_batches_stream.next().await {
match response {
Ok(record_batch) => {
if record_batch.is_empty() {
continue;
}
record_batches.push(record_batch);
}
Err(e) => {
error!("Failed to get record batch, err:{}", e);
return Err(e).context(Select).context(Insert)?;
}
}

if record_batches.len() >= max_batch_size {
let num_rows = write_record_batches(
&mut record_batches,
column_index_in_insert.as_slice(),
table.clone(),
&default_value_map,
)
.await?;
result_rows += num_rows;
}
}

if !record_batches.is_empty() {
let num_rows = write_record_batches(
&mut record_batches,
column_index_in_insert.as_slice(),
table,
&default_value_map,
)
.await?;
result_rows += num_rows;
}

convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
.context(Insert)?
Ok(Output::AffectedRows(result_rows))
}
};
}
}
}

maybe_generate_tsid(&mut rows).context(Insert)?;
async fn write_record_batches(
record_batches: &mut Vec<CommonRecordBatch>,
column_index_in_insert: &[InsertMode],
table: TableRef,
default_value_map: &BTreeMap<usize, DfLogicalExpr>,
) -> InterpreterResult<usize> {
let row_group = convert_records_to_row_group(
record_batches.as_slice(),
column_index_in_insert,
table.schema(),
)
.context(Insert)?;
record_batches.clear();

prepare_and_write_table(table, row_group, default_value_map).await
}

// Fill default values
fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
async fn prepare_and_write_table(
table: TableRef,
mut row_group: RowGroup,
default_value_map: &BTreeMap<usize, DfLogicalExpr>,
) -> InterpreterResult<usize> {
maybe_generate_tsid(&mut row_group).context(Insert)?;

let request = WriteRequest { row_group: rows };
// Fill default values
fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;

let num_rows = table
.write(request)
.await
.context(WriteTable)
.context(Insert)?;
let request = WriteRequest { row_group };

Ok(Output::AffectedRows(num_rows))
}
let num_rows = table
.write(request)
.await
.context(WriteTable)
.context(Insert)?;

Ok(num_rows)
}

async fn exec_select_logical_plan(
ctx: Context,
query_plan: QueryPlan,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
) -> Result<RecordBatchVec> {
) -> Result<SendableRecordBatchStream> {
let priority = Priority::High;

let query_ctx = ctx
@@ -216,34 +287,25 @@ async fn exec_select_logical_plan(
})?;

// Execute select physical plan.
let record_batch_stream = executor
let record_batch_stream: SendableRecordBatchStream = executor
.execute(&query_ctx, physical_plan)
.await
.box_err()
.context(ExecuteSelectPlan {
msg: "failed to execute select physical plan",
})?;

let record_batches =
record_batch_stream
.try_collect()
.await
.box_err()
.context(ExecuteSelectPlan {
msg: "failed to collect select execution results",
})?;

Ok(record_batches)
Ok(record_batch_stream)
}

fn convert_records_to_row_group(
record_batches: RecordBatchVec,
column_index_in_insert: Vec<InsertMode>,
record_batches: &[CommonRecordBatch],
column_index_in_insert: &[InsertMode],
schema: Schema,
) -> Result<RowGroup> {
let mut data_rows: Vec<Row> = Vec::new();

for record in &record_batches {
for record in record_batches {
let num_cols = record.num_columns();
let num_rows = record.num_rows();
for row_idx in 0..num_rows {