Skip to content

Commit b281d61

Browse files
authored
Merge 2e9c523 into 796c209
2 parents 796c209 + 2e9c523 commit b281d61

File tree

49 files changed

+763
-416
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+763
-416
lines changed

Cargo.lock

+19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ members = [
3838
"integration_tests/sdk/rust",
3939
"interpreters",
4040
"meta_client",
41+
"partition_table_engine",
4142
"query_engine",
4243
"remote_engine_client",
4344
"router",
@@ -91,6 +92,7 @@ interpreters = { path = "interpreters" }
9192
itertools = "0.10.5"
9293
meta_client = { path = "meta_client" }
9394
object_store = { path = "components/object_store" }
95+
partition_table_engine = { path = "partition_table_engine" }
9496
parquet_ext = { path = "components/parquet_ext" }
9597
parquet = { version = "36.0.0" }
9698
paste = "1.0"

analytic_engine/src/engine.rs

+20-56
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! Implements the TableEngine trait
44
@@ -7,21 +7,17 @@ use std::sync::Arc;
77
use async_trait::async_trait;
88
use common_util::error::BoxError;
99
use log::info;
10-
use snafu::{OptionExt, ResultExt};
10+
use snafu::ResultExt;
1111
use table_engine::{
1212
engine::{
1313
Close, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
14-
TableEngine, Unexpected, UnexpectedNoCause,
14+
TableEngine,
1515
},
1616
table::{SchemaId, TableRef},
1717
ANALYTIC_ENGINE_TYPE,
1818
};
1919

20-
use crate::{
21-
instance::InstanceRef,
22-
space::SpaceId,
23-
table::{partition::PartitionTableImpl, TableImpl},
24-
};
20+
use crate::{instance::InstanceRef, space::SpaceId, table::TableImpl};
2521

2622
/// TableEngine implementation
2723
pub struct TableEngineImpl {
@@ -76,30 +72,14 @@ impl TableEngine for TableEngineImpl {
7672

7773
let space_table = self.instance.create_table(space_id, request).await?;
7874

79-
let table_impl: TableRef = match &space_table.table_data().partition_info {
80-
None => Arc::new(TableImpl::new(
81-
self.instance.clone(),
82-
ANALYTIC_ENGINE_TYPE.to_string(),
83-
space_id,
84-
space_table.table_data().id,
85-
space_table.table_data().clone(),
86-
space_table,
87-
)),
88-
Some(_v) => Arc::new(
89-
PartitionTableImpl::new(
90-
self.instance
91-
.remote_engine
92-
.clone()
93-
.context(UnexpectedNoCause {
94-
msg: "remote engine not found",
95-
})?,
96-
ANALYTIC_ENGINE_TYPE.to_string(),
97-
space_table,
98-
)
99-
.box_err()
100-
.context(Unexpected)?,
101-
),
102-
};
75+
let table_impl: TableRef = Arc::new(TableImpl::new(
76+
self.instance.clone(),
77+
ANALYTIC_ENGINE_TYPE.to_string(),
78+
space_id,
79+
space_table.table_data().id,
80+
space_table.table_data().clone(),
81+
space_table,
82+
));
10383

10484
Ok(table_impl)
10585
}
@@ -128,30 +108,14 @@ impl TableEngine for TableEngineImpl {
128108
None => return Ok(None),
129109
};
130110

131-
let table_impl: TableRef = match &space_table.table_data().partition_info {
132-
None => Arc::new(TableImpl::new(
133-
self.instance.clone(),
134-
ANALYTIC_ENGINE_TYPE.to_string(),
135-
space_id,
136-
space_table.table_data().id,
137-
space_table.table_data().clone(),
138-
space_table,
139-
)),
140-
Some(_v) => Arc::new(
141-
PartitionTableImpl::new(
142-
self.instance
143-
.remote_engine
144-
.clone()
145-
.context(UnexpectedNoCause {
146-
msg: "remote engine is empty",
147-
})?,
148-
ANALYTIC_ENGINE_TYPE.to_string(),
149-
space_table,
150-
)
151-
.box_err()
152-
.context(Unexpected)?,
153-
),
154-
};
111+
let table_impl = Arc::new(TableImpl::new(
112+
self.instance.clone(),
113+
ANALYTIC_ENGINE_TYPE.to_string(),
114+
space_id,
115+
space_table.table_data().id,
116+
space_table.table_data().clone(),
117+
space_table,
118+
));
155119

156120
Ok(Some(table_impl))
157121
}

analytic_engine/src/instance/create.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! Create table logic of instance
44
@@ -113,7 +113,6 @@ impl Instance {
113113
table_name: table_data.name.clone(),
114114
schema: table_data.schema(),
115115
opts: table_data.table_options().as_ref().clone(),
116-
partition_info: table_data.partition_info.clone(),
117116
});
118117
MetaUpdateRequest {
119118
shard_info: table_data.shard_info,

analytic_engine/src/instance/mod.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
1+
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
//! A table engine instance
44
//!
@@ -27,7 +27,7 @@ use common_util::{define_result, runtime::Runtime};
2727
use log::info;
2828
use mem_collector::MemUsageCollector;
2929
use snafu::{ResultExt, Snafu};
30-
use table_engine::{engine::EngineRuntimes, remote::RemoteEngineRef};
30+
use table_engine::engine::EngineRuntimes;
3131
use wal::manager::{WalLocation, WalManagerRef};
3232

3333
use crate::{
@@ -177,7 +177,6 @@ pub struct Instance {
177177
/// Options for scanning sst
178178
pub(crate) scan_options: ScanOptions,
179179
pub(crate) iter_options: Option<IterOptions>,
180-
pub(crate) remote_engine: Option<RemoteEngineRef>,
181180
}
182181

183182
impl Instance {

analytic_engine/src/instance/open.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
use common_types::schema::IndexInWriterSchema;
1111
use log::{debug, error, info, trace, warn};
1212
use snafu::ResultExt;
13-
use table_engine::{engine::OpenTableRequest, remote::RemoteEngineRef};
13+
use table_engine::engine::OpenTableRequest;
1414
use tokio::sync::oneshot;
1515
use wal::{
1616
log_batch::LogEntry,
@@ -54,7 +54,6 @@ impl Instance {
5454
wal_manager: WalManagerRef,
5555
store_picker: ObjectStorePickerRef,
5656
sst_factory: SstFactoryRef,
57-
remote_engine_ref: Option<RemoteEngineRef>,
5857
) -> Result<Arc<Self>> {
5958
let space_store = Arc::new(SpaceStore {
6059
spaces: RwLock::new(Spaces::default()),
@@ -111,7 +110,6 @@ impl Instance {
111110
.map(|v| v.as_byte() as usize),
112111
iter_options,
113112
scan_options,
114-
remote_engine: remote_engine_ref,
115113
});
116114

117115
Ok(instance)

analytic_engine/src/manifest/details.rs

+6-44
Original file line numberDiff line numberDiff line change
@@ -689,17 +689,13 @@ where
689689
mod tests {
690690
use std::{path::PathBuf, sync::Arc, vec};
691691

692-
use bytes::Bytes;
693692
use common_types::{
694693
column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID,
695694
};
696695
use common_util::{runtime, runtime::Runtime, tests::init_log_for_test};
697696
use futures::future::BoxFuture;
698697
use object_store::LocalFileSystem;
699-
use table_engine::{
700-
partition::{HashPartitionInfo, PartitionDefinition, PartitionInfo},
701-
table::{SchemaId, TableId, TableSeqGenerator},
702-
};
698+
use table_engine::table::{SchemaId, TableId, TableSeqGenerator};
703699
use wal::rocks_impl::manager::Builder as WalBuilder;
704700

705701
use super::*;
@@ -831,23 +827,6 @@ mod tests {
831827
table_name,
832828
schema: common_types::tests::build_schema(),
833829
opts: TableOptions::default(),
834-
partition_info: None,
835-
})
836-
}
837-
838-
fn meta_update_add_table_with_partition_info(
839-
&self,
840-
table_id: TableId,
841-
partition_info: Option<PartitionInfo>,
842-
) -> MetaUpdate {
843-
let table_name = Self::table_name_from_id(table_id);
844-
MetaUpdate::AddTable(AddTableMeta {
845-
space_id: self.schema_id.as_u32(),
846-
table_id,
847-
table_name,
848-
schema: common_types::tests::build_schema(),
849-
opts: TableOptions::default(),
850-
partition_info,
851830
})
852831
}
853832

@@ -897,16 +876,14 @@ mod tests {
897876
async fn add_table_with_manifest(
898877
&self,
899878
table_id: TableId,
900-
partition_info: Option<PartitionInfo>,
901879
manifest_data_builder: &mut TableManifestDataBuilder,
902880
manifest: &ManifestImpl,
903881
) {
904882
let shard_info = TableShardInfo {
905883
shard_id: DEFAULT_SHARD_ID,
906884
};
907885

908-
let add_table =
909-
self.meta_update_add_table_with_partition_info(table_id, partition_info);
886+
let add_table = self.meta_update_add_table(table_id);
910887
let update_req = {
911888
MetaUpdateRequest {
912889
shard_info,
@@ -967,7 +944,7 @@ mod tests {
967944
manifest_data_builder: &mut TableManifestDataBuilder,
968945
) {
969946
let manifest = self.open_manifest().await;
970-
self.add_table_with_manifest(table_id, None, manifest_data_builder, &manifest)
947+
self.add_table_with_manifest(table_id, manifest_data_builder, &manifest)
971948
.await;
972949
}
973950

@@ -1128,26 +1105,11 @@ mod tests {
11281105

11291106
runtime.block_on(async move {
11301107
let table_id = ctx.alloc_table_id();
1131-
let default_version = 0;
1132-
let partition_info = Some(PartitionInfo::Hash(HashPartitionInfo {
1133-
version: default_version,
1134-
definitions: vec![PartitionDefinition {
1135-
name: "p0".to_string(),
1136-
origin_name: Some("region0".to_string()),
1137-
}],
1138-
expr: Bytes::from("test"),
1139-
linear: false,
1140-
}));
11411108
let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64());
11421109
let mut manifest_data_builder = TableManifestDataBuilder::default();
11431110
let manifest = ctx.open_manifest().await;
1144-
ctx.add_table_with_manifest(
1145-
table_id,
1146-
partition_info,
1147-
&mut manifest_data_builder,
1148-
&manifest,
1149-
)
1150-
.await;
1111+
ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest)
1112+
.await;
11511113

11521114
manifest
11531115
.maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true)
@@ -1188,7 +1150,7 @@ mod tests {
11881150
};
11891151
let mut manifest_data_builder = TableManifestDataBuilder::default();
11901152
let manifest = ctx.open_manifest().await;
1191-
ctx.add_table_with_manifest(table_id, None, &mut manifest_data_builder, &manifest)
1153+
ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest)
11921154
.await;
11931155

11941156
for i in 0..500 {

0 commit comments

Comments
 (0)