Skip to content

Commit 8cc6371

Browse files
authored
feat: support write partition table in grpc service (apache#828)
* feat: support write partition table in grpc service * refactor by CR
1 parent 175db3c commit 8cc6371

File tree

5 files changed

+168
-153
lines changed

5 files changed

+168
-153
lines changed

partition_table_engine/src/partition.rs

+2
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,12 @@ impl Table for PartitionTableImpl {
268268
.fail()
269269
}
270270

271+
// Partition table is a virtual table, so it don't need to flush.
271272
async fn flush(&self, _request: FlushRequest) -> Result<()> {
272273
Ok(())
273274
}
274275

276+
// Partition table is a virtual table, so it don't need to compact.
275277
async fn compact(&self) -> Result<()> {
276278
Ok(())
277279
}

server/src/proxy/grpc/sql_query.rs

+2-149
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,27 @@
55
use std::{sync::Arc, time::Instant};
66

77
use arrow_ext::ipc::{CompressOptions, CompressionMethod, RecordBatchesEncoder};
8-
use catalog::schema::{CreateOptions, CreateTableRequest, DropOptions, DropTableRequest};
98
use ceresdbproto::{
109
common::ResponseHeader,
1110
storage::{
1211
arrow_payload, sql_query_response, storage_service_client::StorageServiceClient,
1312
ArrowPayload, SqlQueryRequest, SqlQueryResponse,
1413
},
1514
};
16-
use common_types::{record_batch::RecordBatch, request_id::RequestId, table::DEFAULT_SHARD_ID};
15+
use common_types::{record_batch::RecordBatch, request_id::RequestId};
1716
use common_util::{error::BoxError, time::InstantExt};
1817
use futures::{stream, stream::BoxStream, FutureExt, StreamExt};
1918
use http::StatusCode;
2019
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
2120
use log::{error, info, warn};
2221
use query_engine::executor::Executor as QueryExecutor;
2322
use router::endpoint::Endpoint;
24-
use snafu::{ensure, OptionExt, ResultExt};
23+
use snafu::{ensure, ResultExt};
2524
use sql::{
2625
frontend,
2726
frontend::{Context as SqlContext, Frontend},
2827
provider::CatalogMetaProvider,
2928
};
30-
use table_engine::{
31-
engine::TableState,
32-
partition::{format_sub_partition_table_name, PartitionInfo},
33-
remote::model::{GetTableInfoRequest, TableIdentifier},
34-
table::TableId,
35-
PARTITION_TABLE_ENGINE_TYPE,
36-
};
3729
use tokio::sync::mpsc;
3830
use tokio_stream::wrappers::ReceiverStream;
3931
use tonic::{transport::Channel, IntoRequest};
@@ -346,140 +338,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
346338

347339
Ok(output)
348340
}
349-
350-
async fn maybe_open_partition_table_if_not_exist(
351-
&self,
352-
catalog_name: &str,
353-
schema_name: &str,
354-
table_name: &str,
355-
) -> Result<()> {
356-
let partition_table_info = self
357-
.router
358-
.fetch_partition_table_info(schema_name, table_name)
359-
.await?;
360-
if partition_table_info.is_none() {
361-
return Ok(());
362-
}
363-
364-
let partition_table_info = partition_table_info.unwrap();
365-
366-
let catalog = self
367-
.instance
368-
.catalog_manager
369-
.catalog_by_name(catalog_name)
370-
.box_err()
371-
.with_context(|| ErrWithCause {
372-
code: StatusCode::INTERNAL_SERVER_ERROR,
373-
msg: format!("Failed to find catalog, catalog_name:{catalog_name}"),
374-
})?
375-
.with_context(|| ErrNoCause {
376-
code: StatusCode::BAD_REQUEST,
377-
msg: format!("Catalog not found, catalog_name:{catalog_name}"),
378-
})?;
379-
380-
// TODO: support create schema if not exist
381-
let schema = catalog
382-
.schema_by_name(schema_name)
383-
.box_err()
384-
.with_context(|| ErrWithCause {
385-
code: StatusCode::INTERNAL_SERVER_ERROR,
386-
msg: format!("Failed to find schema, schema_name:{schema_name}"),
387-
})?
388-
.context(ErrNoCause {
389-
code: StatusCode::BAD_REQUEST,
390-
msg: format!("Schema not found, schema_name:{schema_name}"),
391-
})?;
392-
let table = schema
393-
.table_by_name(&partition_table_info.name)
394-
.box_err()
395-
.with_context(|| ErrWithCause {
396-
code: StatusCode::INTERNAL_SERVER_ERROR,
397-
msg: format!(
398-
"Failed to find table, table_name:{}",
399-
partition_table_info.name
400-
),
401-
})?;
402-
403-
if let Some(table) = table {
404-
if table.id().as_u64() == partition_table_info.id {
405-
return Ok(());
406-
}
407-
408-
// Drop partition table if table id not match.
409-
let opts = DropOptions {
410-
table_engine: self.instance.partition_table_engine.clone(),
411-
};
412-
schema
413-
.drop_table(
414-
DropTableRequest {
415-
catalog_name: catalog_name.to_string(),
416-
schema_name: schema_name.to_string(),
417-
table_name: table_name.to_string(),
418-
engine: PARTITION_TABLE_ENGINE_TYPE.to_string(),
419-
},
420-
opts,
421-
)
422-
.await
423-
.box_err()
424-
.with_context(|| ErrWithCause {
425-
code: StatusCode::INTERNAL_SERVER_ERROR,
426-
msg: format!("Failed to drop partition table, table_name:{table_name}"),
427-
})?;
428-
}
429-
430-
// If table not exists, open it.
431-
// Get table_schema from first sub partition table.
432-
let first_sub_partition_table_name = get_sub_partition_name(
433-
&partition_table_info.name,
434-
&partition_table_info.partition_info,
435-
0usize,
436-
);
437-
let table = self
438-
.instance
439-
.remote_engine_ref
440-
.get_table_info(GetTableInfoRequest {
441-
table: TableIdentifier {
442-
catalog: catalog_name.to_string(),
443-
schema: schema_name.to_string(),
444-
table: first_sub_partition_table_name,
445-
},
446-
})
447-
.await
448-
.box_err()
449-
.with_context(|| ErrWithCause {
450-
code: StatusCode::INTERNAL_SERVER_ERROR,
451-
msg: "Failed to get table",
452-
})?;
453-
454-
// Partition table is a virtual table, so we need to create it manually.
455-
// Partition info is stored in ceresmeta, so we need to use create_table_request
456-
// to create it.
457-
let create_table_request = CreateTableRequest {
458-
catalog_name: catalog_name.to_string(),
459-
schema_name: schema_name.to_string(),
460-
table_name: partition_table_info.name,
461-
table_id: Some(TableId::new(partition_table_info.id)),
462-
table_schema: table.table_schema,
463-
engine: table.engine,
464-
options: Default::default(),
465-
state: TableState::Stable,
466-
shard_id: DEFAULT_SHARD_ID,
467-
partition_info: Some(partition_table_info.partition_info),
468-
};
469-
let create_opts = CreateOptions {
470-
table_engine: self.instance.partition_table_engine.clone(),
471-
create_if_not_exists: true,
472-
};
473-
schema
474-
.create_table(create_table_request.clone(), create_opts)
475-
.await
476-
.box_err()
477-
.with_context(|| ErrWithCause {
478-
code: StatusCode::INTERNAL_SERVER_ERROR,
479-
msg: format!("Failed to create table, request:{create_table_request:?}"),
480-
})?;
481-
Ok(())
482-
}
483341
}
484342

485343
// TODO(chenxiang): Output can have both `rows` and `affected_rows`
@@ -603,8 +461,3 @@ impl QueryResponseWriter {
603461
Ok(resp)
604462
}
605463
}
606-
607-
fn get_sub_partition_name(table_name: &str, partition_info: &PartitionInfo, id: usize) -> String {
608-
let partition_name = partition_info.get_definitions()[id].name.clone();
609-
format_sub_partition_table_name(table_name, &partition_name)
610-
}

server/src/proxy/grpc/write.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
118118
request_id,
119119
deadline,
120120
catalog: catalog.to_string(),
121-
schema: schema.to_string(),
121+
schema: schema.clone(),
122122
auto_create_table: self.auto_create_table,
123123
};
124124

@@ -130,6 +130,15 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
130130
)
131131
.await?;
132132

133+
for insert_plan in &plan_vec {
134+
self.maybe_open_partition_table_if_not_exist(
135+
catalog,
136+
&schema,
137+
insert_plan.table.name(),
138+
)
139+
.await?;
140+
}
141+
133142
let mut success = 0;
134143
for insert_plan in plan_vec {
135144
success += execute_insert_plan(

0 commit comments

Comments
 (0)