refactor: find new columns to improve write performance #918

Merged · 3 commits · May 24, 2023
Changes from 1 commit
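
At a high level, the old implementation built a complete Schema from every WriteTableRequest (via build_schema_from_write_table_request) and then diffed it against the table's existing schema, only to throw the freshly built schema away; the refactored code walks the request once and collects just the columns that are missing. The sketch below illustrates that strategy in isolation; it is a minimal reconstruction with simplified stand-in types (Column and plain (name, is_tag) pairs), not the actual ceresdb code.

use std::collections::BTreeMap;

// Simplified stand-in for the real ColumnSchema type.
#[derive(Debug)]
struct Column {
    name: String,
    is_tag: bool,
}

// Walk the incoming (name, is_tag) pairs once, keeping only names that are
// neither in the existing schema nor already collected -- the same shape as
// the new find_new_columns/build_column pair in the diff below.
fn collect_new_columns<'a>(
    existing: &[&str],
    incoming: impl IntoIterator<Item = (&'a str, bool)>,
) -> Vec<Column> {
    let mut new_cols: BTreeMap<&str, Column> = BTreeMap::new();
    for (name, is_tag) in incoming {
        if existing.contains(&name) || new_cols.contains_key(name) {
            continue;
        }
        new_cols.insert(
            name,
            Column {
                name: name.to_owned(),
                is_tag,
            },
        );
    }
    new_cols.into_values().collect()
}

fn main() {
    let existing = ["timestamp", "host"];
    let incoming = [("host", true), ("region", true), ("cpu", false)];
    let new_cols = collect_new_columns(&existing, incoming);
    // Only "region" and "cpu" are new; "host" already exists.
    assert_eq!(new_cols.len(), 2);
    println!("{new_cols:?}");
}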
127 changes: 90 additions & 37 deletions proxy/src/write.rs
@@ -10,7 +10,7 @@ use std::{
 
 use bytes::Bytes;
 use ceresdbproto::storage::{
-    storage_service_client::StorageServiceClient, value, RouteRequest, WriteRequest,
+    storage_service_client::StorageServiceClient, value, RouteRequest, Value, WriteRequest,
     WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest,
 };
 use cluster::config::SchemaConfig;
@@ -34,7 +34,7 @@ use query_engine::executor::Executor as QueryExecutor;
 use query_frontend::{
     frontend::{Context as FrontendContext, Frontend},
     plan::{AlterTableOperation, AlterTablePlan, InsertPlan, Plan},
-    planner::build_schema_from_write_table_request,
+    planner::{build_column_schema, try_get_data_type_from_value},
     provider::CatalogMetaProvider,
 };
 use router::endpoint::Endpoint;
@@ -43,7 +43,7 @@ use table_engine::table::TableRef;
 use tonic::transport::Channel;
 
 use crate::{
-    error::{ErrNoCause, ErrWithCause, InternalNoCause, Result},
+    error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result},
     forward::{ForwardResult, ForwarderRef},
     Context, Proxy,
 };
@@ -477,14 +477,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
                 code: StatusCode::BAD_REQUEST,
             })?;
         let schema = req_ctx.database;
-        let schema_config = self
-            .schema_config_provider
-            .schema_config(&schema)
-            .box_err()
-            .with_context(|| ErrWithCause {
-                code: StatusCode::INTERNAL_SERVER_ERROR,
-                msg: format!("Fail to fetch schema config, schema_name:{schema}"),
-            })?;
 
         debug!(
             "Local write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
@@ -503,7 +495,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
         };
 
         let plan_vec = self
-            .write_request_to_insert_plan(req.table_requests, schema_config, write_context)
+            .write_request_to_insert_plan(req.table_requests, write_context)
             .await?;
 
         let mut success = 0;
@@ -522,7 +514,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
     async fn write_request_to_insert_plan(
         &self,
         table_requests: Vec<WriteTableRequest>,
-        schema_config: Option<&SchemaConfig>,
         write_context: WriteContext,
     ) -> Result<Vec<InsertPlan>> {
         let mut plan_vec = Vec::with_capacity(table_requests.len());
@@ -534,7 +525,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
             deadline,
             auto_create_table,
         } = write_context;
-        let schema_config = schema_config.cloned().unwrap_or_default();
         for write_table_req in table_requests {
             let table_name = &write_table_req.table;
             self.maybe_open_partition_table_if_not_exist(&catalog, &schema, table_name)
@@ -555,7 +545,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
             // * Currently, the decision to add columns is made at the request level, not at
             //   the row level, so the cost is relatively small.
             let table_schema = table.schema();
-            let columns = find_new_columns(&table_schema, &schema_config, &write_table_req)?;
+            let columns = find_new_columns(&table_schema, &write_table_req)?;
             if !columns.is_empty() {
                 self.execute_add_columns_plan(
                     request_id,
@@ -668,32 +658,95 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
 
 fn find_new_columns(
     schema: &Schema,
-    schema_config: &SchemaConfig,
-    write_req: &WriteTableRequest,
+    write_table_req: &WriteTableRequest,
 ) -> Result<Vec<ColumnSchema>> {
-    let new_schema = build_schema_from_write_table_request(schema_config, write_req)
+    let WriteTableRequest {
+        table,
+        field_names,
+        tag_names,
+        entries: write_entries,
+    } = write_table_req;
+
+    let mut columns: BTreeMap<_, ColumnSchema> = BTreeMap::new();
+    for write_entry in write_entries {
+        // Parse tags.
+        for tag in &write_entry.tags {
+            let name_index = tag.name_index as usize;
+            ensure!(
+                name_index < tag_names.len(),
+                InternalNoCause {
+                    msg: format!(
+                        "Tag {tag:?} is not found in tag_names:{tag_names:?}, table:{table}",
+                    ),
+                }
+            );
+
+            let tag_name = &tag_names[name_index];
+            build_column(&mut columns, schema, tag_name, &tag.value, true)?;
+        }
+
+        // Parse fields.
+        for field_group in &write_entry.field_groups {
+            for field in &field_group.fields {
+                let field_index = field.name_index as usize;
+                ensure!(
+                    field_index < field_names.len(),
+                    InternalNoCause {
+                        msg: format!(
+                            "Field {field:?} is not found in field_names:{field_names:?}, table:{table}",
+                        ),
+                    }
+                );
+
+                let field_name = &field_names[field_index];
+                build_column(&mut columns, schema, field_name, &field.value, false)?;
+            }
+        }
+    }
+
+    Ok(columns.into_values().collect())
+}
+
+fn build_column<'a>(
+    columns: &mut BTreeMap<&'a str, ColumnSchema>,
+    schema: &Schema,
+    name: &'a str,
+    value: &Option<Value>,
+    is_tag: bool,
+) -> Result<()> {
+    // Skip adding the column in the following cases:
+    // 1. The column already exists in the table schema.
+    // 2. The column has already been collected as a new one.
+    if schema.index_of(name).is_some() || columns.contains_key(name) {
+        return Ok(());
+    }
+
+    let column_value = value
+        .as_ref()
+        .with_context(|| InternalNoCause {
+            msg: format!("Column value is needed, column:{name}"),
+        })?
+        .value
+        .as_ref()
+        .with_context(|| InternalNoCause {
+            msg: format!("Column value type is not supported, column:{name}"),
+        })?;
+
+    let data_type = try_get_data_type_from_value(column_value)
         .box_err()
-        .context(ErrWithCause {
-            code: StatusCode::INTERNAL_SERVER_ERROR,
-            msg: "Build schema from write table request failed",
+        .context(Internal {
+            msg: "Failed to get data type",
         })?;
 
-    let columns = new_schema.columns();
-    let old_columns = schema.columns();
-
-    // find new columns:
-    // 1. timestamp column can't be a new column;
-    // 2. column not in old schema is a new column.
-    let new_columns = columns
-        .iter()
-        .enumerate()
-        .filter(|(idx, column)| {
-            *idx != new_schema.timestamp_index()
-                && !old_columns.iter().any(|c| c.name == column.name)
-        })
-        .map(|(_, column)| column.clone())
-        .collect();
-    Ok(new_columns)
+    let column_schema = build_column_schema(name, data_type, is_tag)
+        .box_err()
+        .context(Internal {
+            msg: "Failed to build column schema",
+        })?;
+    columns.insert(name, column_schema);
+    Ok(())
 }

 fn write_table_request_to_insert_plan(
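
A side note on the data structure: because find_new_columns accumulates new columns in a BTreeMap keyed by column name, the list handed to execute_add_columns_plan comes out sorted by name regardless of the order in which values appear in the request. A tiny self-contained demonstration of that property (the column names here are invented):

use std::collections::BTreeMap;

fn main() {
    let mut columns: BTreeMap<&str, &str> = BTreeMap::new();
    // Insertion order mimics values arriving in a write request.
    for (name, kind) in [("region", "string"), ("cpu", "double"), ("az", "string")] {
        columns.entry(name).or_insert(kind);
    }
    // BTreeMap iterates in key order, so the derived ADD COLUMN list is
    // deterministic across requests that carry the same new columns.
    let ordered: Vec<&str> = columns.keys().copied().collect();
    assert_eq!(ordered, ["az", "cpu", "region"]);
}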
5 changes: 3 additions & 2 deletions query_frontend/src/planner.rs
@@ -363,7 +363,7 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
     }
 }
 
-fn build_column_schema(
+pub fn build_column_schema(
     column_name: &str,
     data_type: DatumKind,
     is_tag: bool,
@@ -537,7 +537,7 @@ fn ensure_data_type_compatible(
     Ok(())
 }
 
-fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
+pub fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
     match value {
         PbValue::Float64Value(_) => Ok(DatumKind::Double),
         PbValue::StringValue(_) => Ok(DatumKind::String),
Expand All @@ -555,6 +555,7 @@ fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
PbValue::VarbinaryValue(_) => Ok(DatumKind::Varbinary),
}
}

/// A planner wraps the datafusion's logical planner, and delegate sql like
/// select/explain to datafusion's planner.
pub(crate) struct PlannerDelegate<'a, P: MetaProvider> {
Expand Down
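
For readers outside the codebase: before try_get_data_type_from_value can run, build_column in the proxy has to peel two layers of Option, since the protobuf Value wrapper and its oneof payload are both optional. Below is a self-contained sketch of that unwrap-then-infer pattern; the types are simplified stand-ins, not the real ceresdbproto or DatumKind definitions.

// Simplified stand-ins for the protobuf Value wrapper, its oneof payload,
// and the datum kind; the real definitions live in the ceresdb crates.
enum Payload {
    Float64(f64),
    Str(String),
}

struct PbValue {
    value: Option<Payload>,
}

#[derive(Debug, PartialEq)]
enum Kind {
    Double,
    String,
}

// Mirrors the two with_context unwraps in build_column, followed by the
// match in try_get_data_type_from_value.
fn infer_kind(name: &str, value: &Option<PbValue>) -> Result<Kind, String> {
    let payload = value
        .as_ref()
        .ok_or_else(|| format!("Column value is needed, column:{name}"))?
        .value
        .as_ref()
        .ok_or_else(|| format!("Column value type is not supported, column:{name}"))?;
    Ok(match payload {
        Payload::Float64(_) => Kind::Double,
        Payload::Str(_) => Kind::String,
    })
}

fn main() {
    let v = Some(PbValue {
        value: Some(Payload::Float64(0.99)),
    });
    assert_eq!(infer_kind("cpu", &v), Ok(Kind::Double));
    assert!(infer_kind("cpu", &None).is_err());
    let s = Some(PbValue {
        value: Some(Payload::Str("us-east".to_owned())),
    });
    assert_eq!(infer_kind("region", &s), Ok(Kind::String));
}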