Skip to content

Commit 7640eec

Browse files
feat: support auto create table config (apache#713)
* support auto create config * Update server/src/config.rs Co-authored-by: Jiacai Liu <dev@liujiacai.net> * define a new struct --------- Co-authored-by: Jiacai Liu <dev@liujiacai.net>
1 parent 08d72cd commit 7640eec

File tree

7 files changed

+80
-21
lines changed

7 files changed

+80
-21
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
target
22
.DS_Store
33
.idea/
4-
.vscode
4+
.vscode

server/src/config.rs

+5
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ pub struct ServerConfig {
106106

107107
/// Config for forwarding
108108
pub forward: forward::Config,
109+
110+
/// Whether to create table automatically when data is first written, only
111+
/// used in gRPC
112+
pub auto_create_table: bool,
109113
}
110114

111115
impl Default for ServerConfig {
@@ -120,6 +124,7 @@ impl Default for ServerConfig {
120124
grpc_server_cq_count: 20,
121125
resp_compress_min_length: ReadableSize::mb(4),
122126
forward: forward::Config::default(),
127+
auto_create_table: true,
123128
}
124129
}
125130
}

server/src/grpc/mod.rs

+8
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ pub struct Builder<Q> {
212212
opened_wals: Option<OpenedWals>,
213213
schema_config_provider: Option<SchemaConfigProviderRef>,
214214
forward_config: Option<forward::Config>,
215+
auto_create_table: bool,
215216
}
216217

217218
impl<Q> Builder<Q> {
@@ -228,6 +229,7 @@ impl<Q> Builder<Q> {
228229
opened_wals: None,
229230
schema_config_provider: None,
230231
forward_config: None,
232+
auto_create_table: true,
231233
}
232234
}
233235

@@ -287,6 +289,11 @@ impl<Q> Builder<Q> {
287289
self.timeout = timeout;
288290
self
289291
}
292+
293+
pub fn auto_create_table(mut self, auto_create_table: bool) -> Self {
294+
self.auto_create_table = auto_create_table;
295+
self
296+
}
290297
}
291298

292299
impl<Q: QueryExecutor + 'static> Builder<Q> {
@@ -339,6 +346,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
339346
forwarder,
340347
timeout: self.timeout,
341348
resp_compress_min_length: self.resp_compress_min_length,
349+
auto_create_table: self.auto_create_table,
342350
};
343351
let rpc_server = StorageServiceServer::new(storage_service);
344352

server/src/grpc/storage_service/mod.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ pub struct HandlerContext<'a, Q> {
9898
forwarder: Option<ForwarderRef>,
9999
timeout: Option<Duration>,
100100
resp_compress_min_length: usize,
101+
auto_create_table: bool,
101102
}
102103

103104
impl<'a, Q> HandlerContext<'a, Q> {
@@ -110,6 +111,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
110111
forwarder: Option<ForwarderRef>,
111112
timeout: Option<Duration>,
112113
resp_compress_min_length: usize,
114+
auto_create_table: bool,
113115
) -> Self {
114116
// catalog is not exposed to protocol layer
115117
let catalog = instance.catalog_manager.default_catalog_name().to_string();
@@ -123,6 +125,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
123125
forwarder,
124126
timeout,
125127
resp_compress_min_length,
128+
auto_create_table,
126129
}
127130
}
128131

@@ -141,6 +144,7 @@ pub struct StorageServiceImpl<Q: QueryExecutor + 'static> {
141144
pub forwarder: Option<ForwarderRef>,
142145
pub timeout: Option<Duration>,
143146
pub resp_compress_min_length: usize,
147+
pub auto_create_table: bool,
144148
}
145149

146150
macro_rules! handle_request {
@@ -158,6 +162,7 @@ macro_rules! handle_request {
158162
let forwarder = self.forwarder.clone();
159163
let timeout = self.timeout;
160164
let resp_compress_min_length = self.resp_compress_min_length;
165+
let auto_create_table = self.auto_create_table;
161166

162167
// The future spawned by tokio cannot be executed by other executor/runtime, so
163168

@@ -179,7 +184,7 @@ macro_rules! handle_request {
179184
.fail()?
180185
}
181186
let handler_ctx =
182-
HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length);
187+
HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length, auto_create_table);
183188
$mod_name::$handle_fn(&handler_ctx, req)
184189
.await
185190
.map_err(|e| {
@@ -254,6 +259,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
254259
self.forwarder.clone(),
255260
self.timeout,
256261
self.resp_compress_min_length,
262+
self.auto_create_table,
257263
);
258264

259265
let mut total_success = 0;
@@ -310,6 +316,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
310316
let forwarder = self.forwarder.clone();
311317
let timeout = self.timeout;
312318
let resp_compress_min_length = self.resp_compress_min_length;
319+
let auto_create_table = self.auto_create_table;
313320

314321
let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);
315322
self.runtimes.read_runtime.spawn(async move {
@@ -321,6 +328,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
321328
forwarder,
322329
timeout,
323330
resp_compress_min_length,
331+
auto_create_table
324332
);
325333
let query_req = request.into_inner();
326334
let output = sql_query::fetch_query_output(&handler_ctx, &query_req)

server/src/grpc/storage_service/write.rs

+49-13
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,32 @@ use crate::{
4040
instance::InstanceRef,
4141
};
4242

43+
#[derive(Debug)]
44+
pub struct WriteContext {
45+
pub request_id: RequestId,
46+
pub deadline: Option<Instant>,
47+
pub catalog: String,
48+
pub schema: String,
49+
pub auto_create_table: bool,
50+
}
51+
52+
impl WriteContext {
53+
pub fn new(
54+
request_id: RequestId,
55+
deadline: Option<Instant>,
56+
catalog: String,
57+
schema: String,
58+
) -> Self {
59+
let auto_create_table = true;
60+
Self {
61+
request_id,
62+
deadline,
63+
catalog,
64+
schema,
65+
auto_create_table,
66+
}
67+
}
68+
}
4369
pub(crate) async fn handle_write<Q: QueryExecutor + 'static>(
4470
ctx: &HandlerContext<'_, Q>,
4571
req: WriteRequest,
@@ -70,14 +96,19 @@ pub(crate) async fn handle_write<Q: QueryExecutor + 'static>(
7096
req.table_requests.len(),
7197
);
7298

73-
let plan_vec = write_request_to_insert_plan(
99+
let write_context = WriteContext {
74100
request_id,
75-
catalog,
76-
&schema,
101+
deadline,
102+
catalog: catalog.to_string(),
103+
schema: schema.to_string(),
104+
auto_create_table: ctx.auto_create_table,
105+
};
106+
107+
let plan_vec = write_request_to_insert_plan(
77108
ctx.instance.clone(),
78109
req.table_requests,
79110
schema_config,
80-
deadline,
111+
write_context,
81112
)
82113
.await?;
83114

@@ -169,35 +200,40 @@ pub async fn execute_plan<Q: QueryExecutor + 'static>(
169200
}
170201

171202
pub async fn write_request_to_insert_plan<Q: QueryExecutor + 'static>(
172-
request_id: RequestId,
173-
catalog: &str,
174-
schema: &str,
175203
instance: InstanceRef<Q>,
176204
table_requests: Vec<WriteTableRequest>,
177205
schema_config: Option<&SchemaConfig>,
178-
deadline: Option<Instant>,
206+
write_context: WriteContext,
179207
) -> Result<Vec<InsertPlan>> {
208+
let WriteContext {
209+
request_id,
210+
catalog,
211+
schema,
212+
deadline,
213+
auto_create_table,
214+
} = write_context;
215+
180216
let mut plan_vec = Vec::with_capacity(table_requests.len());
181217

182218
for write_table_req in table_requests {
183219
let table_name = &write_table_req.table;
184-
let mut table = try_get_table(catalog, schema, instance.clone(), table_name)?;
220+
let mut table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
185221

186-
if table.is_none() {
222+
if table.is_none() && auto_create_table {
187223
// TODO: remove this clone?
188224
let schema_config = schema_config.cloned().unwrap_or_default();
189225
create_table(
190226
request_id,
191-
catalog,
192-
schema,
227+
&catalog,
228+
&schema,
193229
instance.clone(),
194230
&write_table_req,
195231
&schema_config,
196232
deadline,
197233
)
198234
.await?;
199235
// try to get table again
200-
table = try_get_table(catalog, schema, instance.clone(), table_name)?;
236+
table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
201237
}
202238

203239
match table {

server/src/handlers/prom.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use warp::reject;
2727

2828
use super::query::QueryRequest;
2929
use crate::{
30-
context::RequestContext, handlers, instance::InstanceRef,
31-
schema_config_provider::SchemaConfigProviderRef,
30+
context::RequestContext, grpc::storage_service::write::WriteContext, handlers,
31+
instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef,
3232
};
3333

3434
#[derive(Debug, Snafu)]
@@ -243,14 +243,15 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
243243
.schema_config_provider
244244
.schema_config(schema)
245245
.context(SchemaError)?;
246+
247+
let write_context =
248+
WriteContext::new(request_id, deadline, catalog.clone(), schema.clone());
249+
246250
let plans = crate::grpc::storage_service::write::write_request_to_insert_plan(
247-
request_id,
248-
catalog,
249-
schema,
250251
self.instance.clone(),
251252
Self::convert_write_request(req)?,
252253
schema_config,
253-
deadline,
254+
write_context,
254255
)
255256
.await
256257
.context(GRPCWriteError)?;

server/src/server.rs

+1
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
351351
.schema_config_provider(provider)
352352
.forward_config(self.config.forward)
353353
.timeout(self.config.timeout.map(|v| v.0))
354+
.auto_create_table(self.config.auto_create_table)
354355
.build()
355356
.context(BuildGrpcService)?;
356357

0 commit comments

Comments
 (0)