Skip to content

Commit 3efa221

Browse files
authored
feat: query partition table with proxy in grpc service (apache#802)
* feat: support partition table with proxy * refactor code * feat: introduce partition table engine * test: modify integration_tests/cases/env/cluster/05_ddl/partition_table.result * refactor code * refactor by CR * refactor by CR * refactor by CR * refactor by CR
1 parent be3e4b4 commit 3efa221

File tree

45 files changed

+739
-421
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+739
-421
lines changed

Cargo.lock

+19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

analytic_engine/src/engine.rs

+20-56
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! Implements the TableEngine trait
44
@@ -7,21 +7,17 @@ use std::sync::Arc;
77
use async_trait::async_trait;
88
use common_util::error::BoxError;
99
use log::info;
10-
use snafu::{OptionExt, ResultExt};
10+
use snafu::ResultExt;
1111
use table_engine::{
1212
engine::{
1313
Close, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
14-
TableEngine, Unexpected, UnexpectedNoCause,
14+
TableEngine,
1515
},
1616
table::{SchemaId, TableRef},
1717
ANALYTIC_ENGINE_TYPE,
1818
};
1919

20-
use crate::{
21-
instance::InstanceRef,
22-
space::SpaceId,
23-
table::{partition::PartitionTableImpl, TableImpl},
24-
};
20+
use crate::{instance::InstanceRef, space::SpaceId, table::TableImpl};
2521

2622
/// TableEngine implementation
2723
pub struct TableEngineImpl {
@@ -76,30 +72,14 @@ impl TableEngine for TableEngineImpl {
7672

7773
let space_table = self.instance.create_table(space_id, request).await?;
7874

79-
let table_impl: TableRef = match &space_table.table_data().partition_info {
80-
None => Arc::new(TableImpl::new(
81-
self.instance.clone(),
82-
ANALYTIC_ENGINE_TYPE.to_string(),
83-
space_id,
84-
space_table.table_data().id,
85-
space_table.table_data().clone(),
86-
space_table,
87-
)),
88-
Some(_v) => Arc::new(
89-
PartitionTableImpl::new(
90-
self.instance
91-
.remote_engine
92-
.clone()
93-
.context(UnexpectedNoCause {
94-
msg: "remote engine not found",
95-
})?,
96-
ANALYTIC_ENGINE_TYPE.to_string(),
97-
space_table,
98-
)
99-
.box_err()
100-
.context(Unexpected)?,
101-
),
102-
};
75+
let table_impl: TableRef = Arc::new(TableImpl::new(
76+
self.instance.clone(),
77+
ANALYTIC_ENGINE_TYPE.to_string(),
78+
space_id,
79+
space_table.table_data().id,
80+
space_table.table_data().clone(),
81+
space_table,
82+
));
10383

10484
Ok(table_impl)
10585
}
@@ -128,30 +108,14 @@ impl TableEngine for TableEngineImpl {
128108
None => return Ok(None),
129109
};
130110

131-
let table_impl: TableRef = match &space_table.table_data().partition_info {
132-
None => Arc::new(TableImpl::new(
133-
self.instance.clone(),
134-
ANALYTIC_ENGINE_TYPE.to_string(),
135-
space_id,
136-
space_table.table_data().id,
137-
space_table.table_data().clone(),
138-
space_table,
139-
)),
140-
Some(_v) => Arc::new(
141-
PartitionTableImpl::new(
142-
self.instance
143-
.remote_engine
144-
.clone()
145-
.context(UnexpectedNoCause {
146-
msg: "remote engine is empty",
147-
})?,
148-
ANALYTIC_ENGINE_TYPE.to_string(),
149-
space_table,
150-
)
151-
.box_err()
152-
.context(Unexpected)?,
153-
),
154-
};
111+
let table_impl = Arc::new(TableImpl::new(
112+
self.instance.clone(),
113+
ANALYTIC_ENGINE_TYPE.to_string(),
114+
space_id,
115+
space_table.table_data().id,
116+
space_table.table_data().clone(),
117+
space_table,
118+
));
155119

156120
Ok(Some(table_impl))
157121
}

analytic_engine/src/instance/create.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! Create table logic of instance
44
@@ -113,7 +113,6 @@ impl Instance {
113113
table_name: table_data.name.clone(),
114114
schema: table_data.schema(),
115115
opts: table_data.table_options().as_ref().clone(),
116-
partition_info: table_data.partition_info.clone(),
117116
});
118117
MetaUpdateRequest {
119118
shard_info: table_data.shard_info,

analytic_engine/src/instance/mod.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! A table engine instance
44
//!
@@ -27,7 +27,7 @@ use common_util::{define_result, runtime::Runtime};
2727
use log::info;
2828
use mem_collector::MemUsageCollector;
2929
use snafu::{ResultExt, Snafu};
30-
use table_engine::{engine::EngineRuntimes, remote::RemoteEngineRef};
30+
use table_engine::engine::EngineRuntimes;
3131
use wal::manager::{WalLocation, WalManagerRef};
3232

3333
use crate::{
@@ -177,7 +177,6 @@ pub struct Instance {
177177
/// Options for scanning sst
178178
pub(crate) scan_options: ScanOptions,
179179
pub(crate) iter_options: Option<IterOptions>,
180-
pub(crate) remote_engine: Option<RemoteEngineRef>,
181180
}
182181

183182
impl Instance {

analytic_engine/src/instance/open.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
use common_types::schema::IndexInWriterSchema;
1111
use log::{debug, error, info, trace, warn};
1212
use snafu::ResultExt;
13-
use table_engine::{engine::OpenTableRequest, remote::RemoteEngineRef};
13+
use table_engine::engine::OpenTableRequest;
1414
use tokio::sync::oneshot;
1515
use wal::{
1616
log_batch::LogEntry,
@@ -54,7 +54,6 @@ impl Instance {
5454
wal_manager: WalManagerRef,
5555
store_picker: ObjectStorePickerRef,
5656
sst_factory: SstFactoryRef,
57-
remote_engine_ref: Option<RemoteEngineRef>,
5857
) -> Result<Arc<Self>> {
5958
let space_store = Arc::new(SpaceStore {
6059
spaces: RwLock::new(Spaces::default()),
@@ -111,7 +110,6 @@ impl Instance {
111110
.map(|v| v.as_byte() as usize),
112111
iter_options,
113112
scan_options,
114-
remote_engine: remote_engine_ref,
115113
});
116114

117115
Ok(instance)

analytic_engine/src/manifest/details.rs

+6-44
Original file line numberDiff line numberDiff line change
@@ -689,17 +689,13 @@ where
689689
mod tests {
690690
use std::{path::PathBuf, sync::Arc, vec};
691691

692-
use bytes::Bytes;
693692
use common_types::{
694693
column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID,
695694
};
696695
use common_util::{runtime, runtime::Runtime, tests::init_log_for_test};
697696
use futures::future::BoxFuture;
698697
use object_store::LocalFileSystem;
699-
use table_engine::{
700-
partition::{HashPartitionInfo, PartitionDefinition, PartitionInfo},
701-
table::{SchemaId, TableId, TableSeqGenerator},
702-
};
698+
use table_engine::table::{SchemaId, TableId, TableSeqGenerator};
703699
use wal::rocks_impl::manager::Builder as WalBuilder;
704700

705701
use super::*;
@@ -831,23 +827,6 @@ mod tests {
831827
table_name,
832828
schema: common_types::tests::build_schema(),
833829
opts: TableOptions::default(),
834-
partition_info: None,
835-
})
836-
}
837-
838-
fn meta_update_add_table_with_partition_info(
839-
&self,
840-
table_id: TableId,
841-
partition_info: Option<PartitionInfo>,
842-
) -> MetaUpdate {
843-
let table_name = Self::table_name_from_id(table_id);
844-
MetaUpdate::AddTable(AddTableMeta {
845-
space_id: self.schema_id.as_u32(),
846-
table_id,
847-
table_name,
848-
schema: common_types::tests::build_schema(),
849-
opts: TableOptions::default(),
850-
partition_info,
851830
})
852831
}
853832

@@ -897,16 +876,14 @@ mod tests {
897876
async fn add_table_with_manifest(
898877
&self,
899878
table_id: TableId,
900-
partition_info: Option<PartitionInfo>,
901879
manifest_data_builder: &mut TableManifestDataBuilder,
902880
manifest: &ManifestImpl,
903881
) {
904882
let shard_info = TableShardInfo {
905883
shard_id: DEFAULT_SHARD_ID,
906884
};
907885

908-
let add_table =
909-
self.meta_update_add_table_with_partition_info(table_id, partition_info);
886+
let add_table = self.meta_update_add_table(table_id);
910887
let update_req = {
911888
MetaUpdateRequest {
912889
shard_info,
@@ -967,7 +944,7 @@ mod tests {
967944
manifest_data_builder: &mut TableManifestDataBuilder,
968945
) {
969946
let manifest = self.open_manifest().await;
970-
self.add_table_with_manifest(table_id, None, manifest_data_builder, &manifest)
947+
self.add_table_with_manifest(table_id, manifest_data_builder, &manifest)
971948
.await;
972949
}
973950

@@ -1128,26 +1105,11 @@ mod tests {
11281105

11291106
runtime.block_on(async move {
11301107
let table_id = ctx.alloc_table_id();
1131-
let default_version = 0;
1132-
let partition_info = Some(PartitionInfo::Hash(HashPartitionInfo {
1133-
version: default_version,
1134-
definitions: vec![PartitionDefinition {
1135-
name: "p0".to_string(),
1136-
origin_name: Some("region0".to_string()),
1137-
}],
1138-
expr: Bytes::from("test"),
1139-
linear: false,
1140-
}));
11411108
let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64());
11421109
let mut manifest_data_builder = TableManifestDataBuilder::default();
11431110
let manifest = ctx.open_manifest().await;
1144-
ctx.add_table_with_manifest(
1145-
table_id,
1146-
partition_info,
1147-
&mut manifest_data_builder,
1148-
&manifest,
1149-
)
1150-
.await;
1111+
ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest)
1112+
.await;
11511113

11521114
manifest
11531115
.maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true)
@@ -1188,7 +1150,7 @@ mod tests {
11881150
};
11891151
let mut manifest_data_builder = TableManifestDataBuilder::default();
11901152
let manifest = ctx.open_manifest().await;
1191-
ctx.add_table_with_manifest(table_id, None, &mut manifest_data_builder, &manifest)
1153+
ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest)
11921154
.await;
11931155

11941156
for i in 0..500 {

analytic_engine/src/manifest/meta_update.rs

+4-17
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! Update to meta
44
@@ -13,7 +13,7 @@ use common_types::{
1313
use common_util::define_result;
1414
use prost::Message;
1515
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
16-
use table_engine::{partition::PartitionInfo, table::TableId};
16+
use table_engine::table::TableId;
1717
use wal::log_batch::{Payload, PayloadDecoder};
1818

1919
use crate::{
@@ -42,11 +42,6 @@ pub enum Error {
4242
#[snafu(display("Failed to convert schema, err:{}", source))]
4343
ConvertSchema { source: common_types::schema::Error },
4444

45-
#[snafu(display("Failed to convert partition info, err:{}", source))]
46-
ConvertPartitionInfo {
47-
source: table_engine::partition::Error,
48-
},
49-
5045
#[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
5146
EmptyTableSchema { backtrace: Backtrace },
5247

@@ -155,19 +150,18 @@ pub struct AddTableMeta {
155150
pub schema: Schema,
156151
// Options needed to persist
157152
pub opts: TableOptions,
158-
pub partition_info: Option<PartitionInfo>,
159153
}
160154

161155
impl From<AddTableMeta> for manifest_pb::AddTableMeta {
162156
fn from(v: AddTableMeta) -> Self {
163-
let partition_info = v.partition_info.map(|v| v.into());
164157
manifest_pb::AddTableMeta {
165158
space_id: v.space_id,
166159
table_id: v.table_id.as_u64(),
167160
table_name: v.table_name,
168161
schema: Some(schema_pb::TableSchema::from(&v.schema)),
169162
options: Some(manifest_pb::TableOptions::from(v.opts)),
170-
partition_info,
163+
// Deprecated.
164+
partition_info: None,
171165
}
172166
}
173167
}
@@ -178,20 +172,13 @@ impl TryFrom<manifest_pb::AddTableMeta> for AddTableMeta {
178172
fn try_from(src: manifest_pb::AddTableMeta) -> Result<Self> {
179173
let table_schema = src.schema.context(EmptyTableSchema)?;
180174
let opts = src.options.context(EmptyTableOptions)?;
181-
let partition_info = match src.partition_info {
182-
Some(partition_info) => {
183-
Some(PartitionInfo::try_from(partition_info).context(ConvertPartitionInfo)?)
184-
}
185-
None => None,
186-
};
187175

188176
Ok(Self {
189177
space_id: src.space_id,
190178
table_id: TableId::from(src.table_id),
191179
table_name: src.table_name,
192180
schema: Schema::try_from(table_schema).context(ConvertSchema)?,
193181
opts: TableOptions::try_from(opts).context(ConvertTableOptions)?,
194-
partition_info,
195182
})
196183
}
197184
}

0 commit comments

Comments
 (0)