Skip to content

Commit 7103d3e

Browse files
committed
move table engine proxy to table_engine crate.
1 parent d8f652b commit 7103d3e

File tree

7 files changed

+65
-54
lines changed

7 files changed

+65
-54
lines changed

catalog_impls/src/table_based.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -912,9 +912,10 @@ mod tests {
912912
schema::{CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, SchemaRef},
913913
};
914914
use common_types::table::{DEFAULT_CLUSTER_VERSION, DEFAULT_SHARD_ID};
915-
use server::table_engine::{MemoryTableEngine, TableEngineProxy};
916915
use table_engine::{
917916
engine::{TableEngineRef, TableState},
917+
memory::MemoryTableEngine,
918+
proxy::TableEngineProxy,
918919
ANALYTIC_ENGINE_TYPE,
919920
};
920921

server/src/handlers/influxdb.rs

+2
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ fn convert_influx_value(field_value: FieldValue) -> Value {
226226
Value { value: Some(v) }
227227
}
228228

229+
// fn convert_query_result(output: Output)
230+
229231
// TODO: Request and response type don't match influxdb's API now.
230232
pub async fn query<Q: QueryExecutor + 'static>(
231233
ctx: RequestContext,

server/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,3 @@ mod metrics;
2222
mod mysql;
2323
pub mod schema_config_provider;
2424
pub mod server;
25-
pub mod table_engine;

src/setup.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ use server::{
3030
cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider,
3131
},
3232
server::Builder,
33-
table_engine::{MemoryTableEngine, TableEngineProxy},
3433
};
35-
use table_engine::engine::EngineRuntimes;
34+
use table_engine::{engine::EngineRuntimes, memory::MemoryTableEngine, proxy::TableEngineProxy};
3635
use tracing_util::{
3736
self,
3837
tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation},

table_engine/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod memory;
1010
pub mod partition;
1111
pub mod predicate;
1212
pub mod provider;
13+
pub mod proxy;
1314
pub mod remote;
1415
pub mod stream;
1516
pub mod table;

table_engine/src/memory.rs

+45-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
22

3-
//! In-memory table implementations
3+
//! In-memory table engine implementations
44
55
use std::{
66
collections::HashMap,
@@ -23,14 +23,18 @@ use futures::stream::Stream;
2323
use snafu::{OptionExt, ResultExt};
2424

2525
use crate::{
26+
engine::{
27+
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TableEngine,
28+
},
2629
stream::{
2730
self, ErrNoSource, ErrWithSource, PartitionedStreams, RecordBatchStream,
2831
SendableRecordBatchStream,
2932
},
3033
table::{
3134
AlterSchemaRequest, FlushRequest, GetRequest, ReadRequest, Result, Table, TableId,
32-
TableStats, UnsupportedMethod, WriteRequest,
35+
TableRef, TableStats, UnsupportedMethod, WriteRequest,
3336
},
37+
MEMORY_ENGINE_TYPE,
3438
};
3539

3640
type RowGroupVec = Vec<RowGroup>;
@@ -250,3 +254,42 @@ fn build_column_block<'a, I: Iterator<Item = &'a Datum>>(
250254
}
251255
Ok(builder.build())
252256
}
257+
258+
/// Memory table engine implementation
259+
// Mainly for test purpose now
260+
pub struct MemoryTableEngine;
261+
262+
#[async_trait]
263+
impl TableEngine for MemoryTableEngine {
264+
fn engine_type(&self) -> &str {
265+
MEMORY_ENGINE_TYPE
266+
}
267+
268+
async fn close(&self) -> crate::engine::Result<()> {
269+
Ok(())
270+
}
271+
272+
async fn create_table(&self, request: CreateTableRequest) -> crate::engine::Result<TableRef> {
273+
Ok(Arc::new(MemoryTable::new(
274+
request.table_name,
275+
request.table_id,
276+
request.table_schema,
277+
MEMORY_ENGINE_TYPE.to_string(),
278+
)))
279+
}
280+
281+
async fn drop_table(&self, _request: DropTableRequest) -> crate::engine::Result<bool> {
282+
Ok(true)
283+
}
284+
285+
async fn open_table(
286+
&self,
287+
_request: OpenTableRequest,
288+
) -> crate::engine::Result<Option<TableRef>> {
289+
Ok(None)
290+
}
291+
292+
async fn close_table(&self, _request: CloseTableRequest) -> crate::engine::Result<()> {
293+
Ok(())
294+
}
295+
}

server/src/table_engine.rs renamed to table_engine/src/proxy.rs

+14-48
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,19 @@
11
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
22

3-
//! Table engine implementation
4-
5-
use std::sync::Arc;
3+
//! Table engine proxy
64
75
use async_trait::async_trait;
8-
use table_engine::{
6+
7+
use crate::{
98
engine::{
10-
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
11-
TableEngine, TableEngineRef, UnknownEngineType,
9+
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TableEngine,
10+
TableEngineRef, UnknownEngineType,
1211
},
13-
memory::MemoryTable,
12+
memory::MemoryTableEngine,
1413
table::TableRef,
1514
ANALYTIC_ENGINE_TYPE, MEMORY_ENGINE_TYPE,
1615
};
1716

18-
/// Memory table engine implementation
19-
// Mainly for test purpose now
20-
pub struct MemoryTableEngine;
21-
22-
#[async_trait]
23-
impl TableEngine for MemoryTableEngine {
24-
fn engine_type(&self) -> &str {
25-
MEMORY_ENGINE_TYPE
26-
}
27-
28-
async fn close(&self) -> Result<()> {
29-
Ok(())
30-
}
31-
32-
async fn create_table(&self, request: CreateTableRequest) -> Result<TableRef> {
33-
Ok(Arc::new(MemoryTable::new(
34-
request.table_name,
35-
request.table_id,
36-
request.table_schema,
37-
MEMORY_ENGINE_TYPE.to_string(),
38-
)))
39-
}
40-
41-
async fn drop_table(&self, _request: DropTableRequest) -> Result<bool> {
42-
Ok(true)
43-
}
44-
45-
async fn open_table(&self, _request: OpenTableRequest) -> Result<Option<TableRef>> {
46-
Ok(None)
47-
}
48-
49-
async fn close_table(&self, _request: CloseTableRequest) -> Result<()> {
50-
Ok(())
51-
}
52-
}
53-
5417
/// Route [CreateTableRequest] to the correct engine by its engine type
5518
pub struct TableEngineProxy {
5619
/// Memory table engine
@@ -65,14 +28,14 @@ impl TableEngine for TableEngineProxy {
6528
"TableEngineProxy"
6629
}
6730

68-
async fn close(&self) -> Result<()> {
31+
async fn close(&self) -> crate::engine::Result<()> {
6932
self.memory.close().await?;
7033
self.analytic.close().await?;
7134

7235
Ok(())
7336
}
7437

75-
async fn create_table(&self, request: CreateTableRequest) -> Result<TableRef> {
38+
async fn create_table(&self, request: CreateTableRequest) -> crate::engine::Result<TableRef> {
7639
// TODO(yingwen): Use a map
7740
match request.engine.as_str() {
7841
MEMORY_ENGINE_TYPE => self.memory.create_table(request).await,
@@ -81,7 +44,7 @@ impl TableEngine for TableEngineProxy {
8144
}
8245
}
8346

84-
async fn drop_table(&self, request: DropTableRequest) -> Result<bool> {
47+
async fn drop_table(&self, request: DropTableRequest) -> crate::engine::Result<bool> {
8548
match request.engine.as_str() {
8649
MEMORY_ENGINE_TYPE => self.memory.drop_table(request).await,
8750
ANALYTIC_ENGINE_TYPE => self.analytic.drop_table(request).await,
@@ -90,7 +53,10 @@ impl TableEngine for TableEngineProxy {
9053
}
9154

9255
/// Open table, return error if table not exists
93-
async fn open_table(&self, request: OpenTableRequest) -> Result<Option<TableRef>> {
56+
async fn open_table(
57+
&self,
58+
request: OpenTableRequest,
59+
) -> crate::engine::Result<Option<TableRef>> {
9460
match request.engine.as_str() {
9561
MEMORY_ENGINE_TYPE => self.memory.open_table(request).await,
9662
ANALYTIC_ENGINE_TYPE => self.analytic.open_table(request).await,
@@ -99,7 +65,7 @@ impl TableEngine for TableEngineProxy {
9965
}
10066

10167
/// Close table, it is ok to close a closed table.
102-
async fn close_table(&self, request: CloseTableRequest) -> Result<()> {
68+
async fn close_table(&self, request: CloseTableRequest) -> crate::engine::Result<()> {
10369
match request.engine.as_str() {
10470
MEMORY_ENGINE_TYPE => self.memory.close_table(request).await,
10571
ANALYTIC_ENGINE_TYPE => self.analytic.close_table(request).await,

0 commit comments

Comments
 (0)