Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use new protocol for remote engine service write #1146

Merged
merged 3 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0"
ceresdbproto = "1.0.10"
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand Down
2 changes: 1 addition & 1 deletion common_types/src/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl TryFrom<&Arc<Field>> for ColumnSchema {
impl From<&ColumnSchema> for Field {
fn from(col_schema: &ColumnSchema) -> Self {
let metadata = encode_arrow_field_meta_data(col_schema);
// If the column sholud use dictionary, create correspond dictionary type.
// If the column should use dictionary, create correspond dictionary type.
let mut field = if col_schema.is_dictionary {
Field::new_dict(
&col_schema.name,
Expand Down
22 changes: 22 additions & 0 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,28 @@ impl<'a> DatumView<'a> {
_ => None,
}
}

pub fn to_datum(&self) -> Datum {
match self {
DatumView::Null => Datum::Null,
DatumView::Timestamp(v) => Datum::Timestamp(*v),
DatumView::Double(v) => Datum::Double(*v),
DatumView::Float(v) => Datum::Float(*v),
DatumView::Varbinary(v) => Datum::Varbinary(Bytes::from(v.to_vec())),
DatumView::String(v) => Datum::String(StringBytes::copy_from_str(v)),
DatumView::UInt64(v) => Datum::UInt64(*v),
DatumView::UInt32(v) => Datum::UInt32(*v),
DatumView::UInt16(v) => Datum::UInt16(*v),
DatumView::UInt8(v) => Datum::UInt8(*v),
DatumView::Int64(v) => Datum::Int64(*v),
DatumView::Int32(v) => Datum::Int32(*v),
DatumView::Int16(v) => Datum::Int16(*v),
DatumView::Int8(v) => Datum::Int8(*v),
DatumView::Boolean(v) => Datum::Boolean(*v),
DatumView::Date(v) => Datum::Date(*v),
DatumView::Time(v) => Datum::Time(*v),
}
}
}

impl<'a> std::hash::Hash for DatumView<'a> {
Expand Down
20 changes: 6 additions & 14 deletions remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,9 @@ impl Client {

// Write to remote.
let table_ident = request.table.clone();
let request_pb = WriteRequest::convert_to_pb(request, self.compression)
.box_err()
.context(Convert {
msg: "Failed to convert WriteRequest to pb",
})?;
let request_pb = request.convert_into_pb().box_err().context(Convert {
msg: "Failed to convert WriteRequest to pb",
})?;
let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel);

let result = rpc_client
Expand Down Expand Up @@ -184,16 +182,10 @@ impl Client {
request,
channel,
} = context;
let compress_options = self.compression;
let batch_request_pb = request.convert_into_pb().box_err().context(Convert {
msg: "failed to convert request to pb",
})?;
let handle = self.io_runtime.spawn(async move {
let batch_request_pb =
match WriteBatchRequest::convert_write_batch_to_pb(request, compress_options)
.box_err()
{
Ok(pb) => pb,
Err(e) => return Err(e),
};

let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);
let rpc_result = rpc_client
.write_batch(Request::new(batch_request_pb))
Expand Down
54 changes: 47 additions & 7 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_trait::async_trait;
use catalog::{manager::ManagerRef, schema::SchemaRef};
use ceresdbproto::{
remote_engine::{
read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService,
read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService, row_group,
GetTableInfoRequest, GetTableInfoResponse, ReadRequest, ReadResponse, WriteBatchRequest,
WriteRequest, WriteResponse,
},
Expand All @@ -38,8 +38,11 @@ use proxy::{
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::EngineRuntimes, predicate::PredicateRef, remote::model::TableIdentifier,
stream::PartitionedStreams, table::TableRef,
engine::EngineRuntimes,
predicate::PredicateRef,
remote::model::{self, TableIdentifier},
stream::PartitionedStreams,
table::TableRef,
};
use time_ext::InstantExt;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -604,19 +607,56 @@ async fn record_write(
}

async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result<WriteResponse> {
let write_request: table_engine::remote::model::WriteRequest =
request.try_into().box_err().context(ErrWithCause {
let table_ident: TableIdentifier = request
.table
.context(ErrNoCause {
code: StatusCode::BadRequest,
msg: "fail to convert write request",
msg: "missing table ident",
})?
.into();

let rows_payload = request
.row_group
.context(ErrNoCause {
code: StatusCode::BadRequest,
msg: "missing row group payload",
})?
.rows
.context(ErrNoCause {
code: StatusCode::BadRequest,
msg: "missing rows payload",
})?;

let table = find_table_by_identifier(&ctx, &table_ident)?;
let write_request = match rows_payload {
row_group::Rows::Arrow(payload) => {
let row_group = model::WriteRequest::decode_row_group_from_arrow_payload(payload)
.box_err()
.context(ErrWithCause {
code: StatusCode::BadRequest,
msg: "failed to decode row group payload",
})?;
model::WriteRequest::new(table_ident, row_group)
}
row_group::Rows::Contiguous(payload) => {
let schema = table.schema();
let row_group =
model::WriteRequest::decode_row_group_from_contiguous_payload(payload, &schema)
.box_err()
.context(ErrWithCause {
code: StatusCode::BadRequest,
msg: "failed to decode row group payload",
})?;
model::WriteRequest::new(table_ident, row_group)
}
};

// In theory we should record write request we at the beginning of server's
// handle, but the payload is encoded, so we cannot record until decode payload
// here.
record_write(&ctx.hotspot_recorder, &write_request).await;

let num_rows = write_request.write_request.row_group.num_rows();
let table = find_table_by_identifier(&ctx, &write_request.table)?;

let res = table
.write(write_request.write_request)
Expand Down
Loading