
Commit 2e9c523

refactor by CR
1 parent fbe29ee commit 2e9c523

5 files changed (+79, -73 lines)


catalog/src/schema.rs (+1)
@@ -192,6 +192,7 @@ pub struct CreateTableRequest {
     /// Table name
     pub table_name: String,
     /// Table id
+    // TODO: remove this field
     pub table_id: Option<TableId>,
     /// Table schema
     pub table_schema: common_types::schema::Schema,

catalog_impls/src/volatile.rs (+1, -1)
@@ -313,7 +313,7 @@ impl Schema for SchemaImpl {
                 .with_context(|| schema::CreateTable {
                     request: request.clone(),
                     msg: format!("table with shards is not found in the ShardTableManager, catalog_name:{}, schema_name:{}, table_name:{}",
-                        request.catalog_name,request.schema_name,request.table_name),
+                        request.catalog_name, request.schema_name, request.table_name),
                 })?;
         }
         let request = request.into_engine_create_request(None, self.schema_id);

router/src/cluster_based.rs (+66, -56)
@@ -1,12 +1,12 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! A router based on the [`cluster::Cluster`].

 use async_trait::async_trait;
 use ceresdbproto::storage::{Route, RouteRequest};
 use cluster::ClusterRef;
 use common_util::error::BoxError;
-use meta_client::types::RouteTablesRequest;
+use meta_client::types::{RouteTablesRequest, TableInfo};
 use moka::future::Cache;
 use snafu::ResultExt;

@@ -15,9 +15,16 @@ use crate::{
     RouteCacheConfig, Router,
 };

+#[derive(Clone)]
+struct RouteData {
+    table_name: String,
+    table_info: TableInfo,
+    endpoint: Option<Endpoint>,
+}
+
 pub struct ClusterBasedRouter {
     cluster: ClusterRef,
-    cache: Option<Cache<String, Route>>,
+    cache: Option<Cache<String, RouteData>>,
 }

 impl ClusterBasedRouter {
@@ -39,7 +46,7 @@ impl ClusterBasedRouter {

     /// route table from local cache, return cache routes and tables which are
     /// not in cache
-    fn route_from_cache(&self, tables: Vec<String>, routes: &mut Vec<Route>) -> Vec<String> {
+    fn route_from_cache(&self, tables: Vec<String>, routes: &mut Vec<RouteData>) -> Vec<String> {
         let mut miss = vec![];

         if let Some(cache) = &self.cache {
@@ -56,33 +63,20 @@ impl ClusterBasedRouter {

         miss
     }
-}
-
-/// Make a route according to the table name and the raw endpoint.
-fn make_route(table_name: &str, endpoint: &str) -> Result<Route> {
-    let endpoint: Endpoint = endpoint.parse().context(ParseEndpoint { endpoint })?;
-
-    Ok(Route {
-        table: table_name.to_string(),
-        endpoint: Some(endpoint.into()),
-    })
-}
-
-#[async_trait]
-impl Router for ClusterBasedRouter {
-    async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
-        let req_ctx = req.context.unwrap();

+    async fn route_with_cache(
+        &self,
+        tables: Vec<String>,
+        database: String,
+    ) -> Result<Vec<RouteData>> {
         // Firstly route table from local cache.
-        let mut routes = Vec::with_capacity(req.tables.len());
-        let miss = self.route_from_cache(req.tables, &mut routes);
-
+        let mut routes = Vec::with_capacity(tables.len());
+        let miss = self.route_from_cache(tables, &mut routes);
         if miss.is_empty() {
             return Ok(routes);
         }
-
         let route_tables_req = RouteTablesRequest {
-            schema_name: req_ctx.database,
+            schema_name: database,
             table_names: miss,
         };

@@ -99,7 +93,7 @@ impl Router for ClusterBasedRouter {
         for (table_name, route_entry) in route_resp.entries {
             for node_shard in route_entry.node_shards {
                 if node_shard.shard_info.is_leader() {
-                    let route = make_route(&table_name, &node_shard.endpoint)?;
+                    let route = make_route(route_entry.table_info.clone(), &node_shard.endpoint)?;
                     if let Some(cache) = &self.cache {
                         // There may be data race here, and it is acceptable currently.
                         cache.insert(table_name.clone(), route.clone()).await;
@@ -108,43 +102,59 @@ impl Router for ClusterBasedRouter {
                 }
             }
         }
-        return Ok(routes);
+        Ok(routes)
+    }
+}
+
+/// Make a route according to the table_info and the raw endpoint.
+fn make_route(table_info: TableInfo, endpoint: &str) -> Result<RouteData> {
+    let endpoint: Endpoint = endpoint.parse().context(ParseEndpoint { endpoint })?;
+
+    Ok(RouteData {
+        table_name: table_info.name.clone(),
+        table_info,
+        endpoint: Some(endpoint),
+    })
+}
+
+#[async_trait]
+impl Router for ClusterBasedRouter {
+    async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
+        let req_ctx = req.context.unwrap();
+        let route_data_vec = self.route_with_cache(req.tables, req_ctx.database).await?;
+        Ok(route_data_vec
+            .into_iter()
+            .map(|v| Route {
+                table: v.table_name,
+                endpoint: v.endpoint.map(Into::into),
+            })
+            .collect())
     }

     async fn fetch_partition_table_info(
         &self,
         schema: &str,
         table: &str,
     ) -> Result<Option<PartitionTableInfo>> {
-        let route_tables_req = RouteTablesRequest {
-            schema_name: schema.to_string(),
-            table_names: vec![table.to_string()],
-        };
-        let route_resp = self
-            .cluster
-            .route_tables(&route_tables_req)
-            .await
-            .box_err()
-            .with_context(|| OtherWithCause {
-                msg: format!("Failed to route tables by cluster, req:{route_tables_req:?}"),
-            })?;
+        let mut route_data_vec = self
+            .route_with_cache(vec![table.to_string()], schema.to_string())
+            .await?;
+        if route_data_vec.is_empty() {
+            return Ok(None);
+        }

-        let table_info = route_resp
-            .entries
-            .get(table)
-            .map(|entry| entry.table_info.clone());
-        if let Some(v) = table_info {
-            if v.partition_info.is_some() {
-                let partition_table_info = PartitionTableInfo {
-                    id: v.id,
-                    name: v.name,
-                    schema_id: v.schema_id,
-                    schema_name: v.schema_name,
-                    partition_info: v.partition_info.unwrap(),
-                };
-                return Ok(Some(partition_table_info));
-            }
+        let route_data = route_data_vec.remove(0);
+        let table_info = route_data.table_info;
+        if table_info.partition_info.is_some() {
+            return Ok(Some(PartitionTableInfo {
+                id: table_info.id,
+                name: table_info.name,
+                schema_id: table_info.schema_id,
+                schema_name: table_info.schema_name,
+                partition_info: table_info.partition_info.unwrap(),
+            }));
         }
+
         Ok(None)
     }
 }
@@ -288,7 +298,7 @@ mod tests {
         let mut routes = Vec::with_capacity(tables.len());
         let miss = router.route_from_cache(tables, &mut routes);
         assert_eq!(routes.len(), 1);
-        assert_eq!(routes[0].table, table1.to_string());
+        assert_eq!(routes[0].table_name, table1.to_string());
         assert_eq!(miss.len(), 0);

         // sleep 1.5s, table2 will be evicted, and table1 in cache
@@ -297,7 +307,7 @@ mod tests {
         let mut routes = Vec::with_capacity(tables.len());
         let miss = router.route_from_cache(tables, &mut routes);
         assert_eq!(routes.len(), 1);
-        assert_eq!(routes[0].table, table1.to_string());
+        assert_eq!(routes[0].table_name, table1.to_string());
         assert_eq!(miss.len(), 1);
         assert_eq!(miss[0], table2.to_string());
     }
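
The net effect of the router change above: the cache now stores a richer RouteData value (table name, table info, optional endpoint) instead of the wire-level Route, a single route_with_cache path backs both route() and fetch_partition_table_info(), and Route messages are only materialized at the API boundary. The following is a minimal, self-contained sketch of that caching pattern; Router, TableInfo, RouteData and Route here are simplified stand-ins (a plain HashMap instead of moka::future::Cache, strings instead of protobuf types), not the actual CeresDB API.

    // Sketch only: simplified stand-ins for the real cluster/meta_client types.
    use std::collections::HashMap;

    #[derive(Clone)]
    struct TableInfo {
        name: String,
        partition_info: Option<String>, // stand-in for the real partition info
    }

    // Cached value: richer than the wire-level Route, so partition lookups
    // can reuse it instead of asking the meta service again.
    #[derive(Clone)]
    struct RouteData {
        table_name: String,
        table_info: TableInfo,
        endpoint: Option<String>,
    }

    // Wire-level route returned to callers (stand-in for the protobuf Route).
    #[derive(Debug)]
    struct Route {
        table: String,
        endpoint: Option<String>,
    }

    struct Router {
        cache: HashMap<String, RouteData>,
    }

    impl Router {
        // Shared lookup path used by both public entry points.
        fn route_with_cache(&self, tables: &[String]) -> Vec<RouteData> {
            tables
                .iter()
                .filter_map(|t| self.cache.get(t).cloned())
                .collect()
        }

        // Routing API: convert cached RouteData into wire Routes at the boundary.
        fn route(&self, tables: &[String]) -> Vec<Route> {
            self.route_with_cache(tables)
                .into_iter()
                .map(|v| Route {
                    table: v.table_name,
                    endpoint: v.endpoint,
                })
                .collect()
        }

        // Partition lookup reuses the same cached table info.
        fn fetch_partition_info(&self, table: &str) -> Option<String> {
            self.route_with_cache(&[table.to_string()])
                .into_iter()
                .next()
                .and_then(|v| v.table_info.partition_info)
        }
    }

    fn main() {
        let mut cache = HashMap::new();
        cache.insert(
            "t1".to_string(),
            RouteData {
                table_name: "t1".to_string(),
                table_info: TableInfo {
                    name: "t1".to_string(),
                    partition_info: Some("hash(tsid, 4)".to_string()),
                },
                endpoint: Some("127.0.0.1:8831".to_string()),
            },
        );
        let router = Router { cache };
        println!("{:?}", router.route(&["t1".to_string()]));
        assert_eq!(
            router.fetch_partition_info("t1"),
            Some("hash(tsid, 4)".to_string())
        );
    }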

server/src/grpc/meta_event_service/mod.rs (+9, -14)
@@ -333,22 +333,17 @@ async fn handle_create_table_on_shard(
             ),
         })?;

-    let partition_info = match table_info.partition_info {
-        Some(v) => Some(
-            PartitionInfo::try_from(v.clone())
-                .box_err()
-                .with_context(|| ErrWithCause {
+    let (table_engine, partition_info) = match table_info.partition_info {
+        Some(v) => {
+            let partition_info = Some(PartitionInfo::try_from(v.clone()).box_err().with_context(
+                || ErrWithCause {
                     code: StatusCode::BadRequest,
                     msg: format!("fail to parse partition info, partition_info:{v:?}"),
-                })?,
-        ),
-        None => None,
-    };
-
-    let table_engine = if partition_info.is_some() {
-        ctx.partition_table_engine.clone()
-    } else {
-        ctx.table_engine.clone()
+                },
+            )?);
+            (ctx.partition_table_engine.clone(), partition_info)
+        }
+        None => (ctx.table_engine.clone(), None),
     };

     // Build create table request and options.
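
The rewrite above folds the engine choice into the same match that parses the partition info, so a table can no longer end up with a parsed partition info but the wrong engine (or vice versa). Below is a minimal sketch of that shape under stated assumptions: EngineKind is a hypothetical stand-in for the real ctx.partition_table_engine / ctx.table_engine handles, and a numeric shard count stands in for PartitionInfo::try_from.

    // Sketch only: pick (engine, partition_info) in one match so the two
    // decisions cannot drift apart. EngineKind and the u32 shard count are
    // hypothetical stand-ins, not the real CeresDB types.
    #[derive(Debug, PartialEq)]
    enum EngineKind {
        Partitioned,
        Plain,
    }

    fn choose_engine(raw_partition_info: Option<&str>) -> Result<(EngineKind, Option<u32>), String> {
        let (engine, partition_info) = match raw_partition_info {
            Some(v) => {
                // Parsing may fail; the error surfaces before any engine is used.
                let shards: u32 = v
                    .parse()
                    .map_err(|e| format!("fail to parse partition info, raw:{v:?}, err:{e}"))?;
                (EngineKind::Partitioned, Some(shards))
            }
            None => (EngineKind::Plain, None),
        };
        Ok((engine, partition_info))
    }

    fn main() {
        assert_eq!(
            choose_engine(Some("4")).unwrap(),
            (EngineKind::Partitioned, Some(4))
        );
        assert_eq!(choose_engine(None).unwrap(), (EngineKind::Plain, None));
        assert!(choose_engine(Some("not-a-number")).is_err());
    }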

server/src/proxy/grpc/sql_query.rs (+2, -2)
@@ -306,7 +306,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
         // Open partition table if needed.
         let table_name = frontend::parse_table_name(&stmts);
         if let Some(table_name) = table_name {
-            self.open_partition_table_if_not_exist(catalog, schema, &table_name)
+            self.maybe_open_partition_table_if_not_exist(catalog, schema, &table_name)
                 .await?;
         }

@@ -392,7 +392,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
         Ok(output)
     }

-    async fn open_partition_table_if_not_exist(
+    async fn maybe_open_partition_table_if_not_exist(
         &self,
         catalog_name: &str,
         schema_name: &str,
