Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c26ab60

Browse files
committedMar 9, 2023
define a new struct
1 parent 8e617cf commit c26ab60

File tree

7 files changed

+76
-43
lines changed

7 files changed

+76
-43
lines changed
 

‎.gitignore

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
target
22
.DS_Store
33
.idea/
4-
.vscode
5-
proto
4+
.vscode

‎server/src/config.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ pub struct ServerConfig {
107107
/// Config for forwarding
108108
pub forward: forward::Config,
109109

110-
/// Whether to create table when data is first written, only used in gRPC
111-
pub auto_create_tables: bool,
110+
/// Whether to create table automatically when data is first written, only
111+
/// used in gRPC
112+
pub auto_create_table: bool,
112113
}
113114

114115
impl Default for ServerConfig {
@@ -123,7 +124,7 @@ impl Default for ServerConfig {
123124
grpc_server_cq_count: 20,
124125
resp_compress_min_length: ReadableSize::mb(4),
125126
forward: forward::Config::default(),
126-
auto_create_tables: true,
127+
auto_create_table: true,
127128
}
128129
}
129130
}

‎server/src/grpc/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ pub struct Builder<Q> {
212212
opened_wals: Option<OpenedWals>,
213213
schema_config_provider: Option<SchemaConfigProviderRef>,
214214
forward_config: Option<forward::Config>,
215-
auto_create_tables: bool,
215+
auto_create_table: bool,
216216
}
217217

218218
impl<Q> Builder<Q> {
@@ -229,7 +229,7 @@ impl<Q> Builder<Q> {
229229
opened_wals: None,
230230
schema_config_provider: None,
231231
forward_config: None,
232-
auto_create_tables: true,
232+
auto_create_table: true,
233233
}
234234
}
235235

@@ -290,8 +290,8 @@ impl<Q> Builder<Q> {
290290
self
291291
}
292292

293-
pub fn auto_create_tables(mut self, auto_create_tables: bool) -> Self {
294-
self.auto_create_tables = auto_create_tables;
293+
pub fn auto_create_table(mut self, auto_create_table: bool) -> Self {
294+
self.auto_create_table = auto_create_table;
295295
self
296296
}
297297
}
@@ -346,7 +346,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
346346
forwarder,
347347
timeout: self.timeout,
348348
resp_compress_min_length: self.resp_compress_min_length,
349-
auto_create_tables: self.auto_create_tables,
349+
auto_create_table: self.auto_create_table,
350350
};
351351
let rpc_server = StorageServiceServer::new(storage_service);
352352

‎server/src/grpc/storage_service/mod.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ pub struct HandlerContext<'a, Q> {
9898
forwarder: Option<ForwarderRef>,
9999
timeout: Option<Duration>,
100100
resp_compress_min_length: usize,
101-
auto_create_tables: bool,
101+
auto_create_table: bool,
102102
}
103103

104104
impl<'a, Q> HandlerContext<'a, Q> {
@@ -111,7 +111,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
111111
forwarder: Option<ForwarderRef>,
112112
timeout: Option<Duration>,
113113
resp_compress_min_length: usize,
114-
auto_create_tables: bool,
114+
auto_create_table: bool,
115115
) -> Self {
116116
// catalog is not exposed to protocol layer
117117
let catalog = instance.catalog_manager.default_catalog_name().to_string();
@@ -125,7 +125,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
125125
forwarder,
126126
timeout,
127127
resp_compress_min_length,
128-
auto_create_tables,
128+
auto_create_table,
129129
}
130130
}
131131

@@ -144,7 +144,7 @@ pub struct StorageServiceImpl<Q: QueryExecutor + 'static> {
144144
pub forwarder: Option<ForwarderRef>,
145145
pub timeout: Option<Duration>,
146146
pub resp_compress_min_length: usize,
147-
pub auto_create_tables: bool,
147+
pub auto_create_table: bool,
148148
}
149149

150150
macro_rules! handle_request {
@@ -162,7 +162,7 @@ macro_rules! handle_request {
162162
let forwarder = self.forwarder.clone();
163163
let timeout = self.timeout;
164164
let resp_compress_min_length = self.resp_compress_min_length;
165-
let auto_create_tables = self.auto_create_tables;
165+
let auto_create_table = self.auto_create_table;
166166

167167
// The future spawned by tokio cannot be executed by other executor/runtime, so
168168

@@ -184,7 +184,7 @@ macro_rules! handle_request {
184184
.fail()?
185185
}
186186
let handler_ctx =
187-
HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length, auto_create_tables);
187+
HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length, auto_create_table);
188188
$mod_name::$handle_fn(&handler_ctx, req)
189189
.await
190190
.map_err(|e| {
@@ -259,7 +259,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
259259
self.forwarder.clone(),
260260
self.timeout,
261261
self.resp_compress_min_length,
262-
self.auto_create_tables,
262+
self.auto_create_table,
263263
);
264264

265265
let mut total_success = 0;
@@ -316,7 +316,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
316316
let forwarder = self.forwarder.clone();
317317
let timeout = self.timeout;
318318
let resp_compress_min_length = self.resp_compress_min_length;
319-
let auto_create_tables = self.auto_create_tables;
319+
let auto_create_table = self.auto_create_table;
320320

321321
let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);
322322
self.runtimes.read_runtime.spawn(async move {
@@ -328,7 +328,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
328328
forwarder,
329329
timeout,
330330
resp_compress_min_length,
331-
auto_create_tables
331+
auto_create_table
332332
);
333333
let query_req = request.into_inner();
334334
let output = sql_query::fetch_query_output(&handler_ctx, &query_req)

‎server/src/grpc/storage_service/write.rs

+49-16
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,32 @@ use crate::{
4040
instance::InstanceRef,
4141
};
4242

43+
#[derive(Debug)]
44+
pub struct WriteContext {
45+
pub request_id: RequestId,
46+
pub deadline: Option<Instant>,
47+
pub catalog: String,
48+
pub schema: String,
49+
pub auto_create_table: bool,
50+
}
51+
52+
impl WriteContext {
53+
pub fn new(
54+
request_id: RequestId,
55+
deadline: Option<Instant>,
56+
catalog: String,
57+
schema: String,
58+
) -> Self {
59+
let auto_create_table = true;
60+
Self {
61+
request_id,
62+
deadline,
63+
catalog,
64+
schema,
65+
auto_create_table,
66+
}
67+
}
68+
}
4369
pub(crate) async fn handle_write<Q: QueryExecutor + 'static>(
4470
ctx: &HandlerContext<'_, Q>,
4571
req: WriteRequest,
@@ -70,15 +96,19 @@ pub(crate) async fn handle_write<Q: QueryExecutor + 'static>(
7096
req.table_requests.len(),
7197
);
7298

73-
let plan_vec = write_request_to_insert_plan(
99+
let write_context = WriteContext {
74100
request_id,
75-
catalog,
76-
&schema,
101+
deadline,
102+
catalog: catalog.to_string(),
103+
schema: schema.to_string(),
104+
auto_create_table: ctx.auto_create_table,
105+
};
106+
107+
let plan_vec = write_request_to_insert_plan(
77108
ctx.instance.clone(),
78109
req.table_requests,
79110
schema_config,
80-
deadline,
81-
ctx.auto_create_tables,
111+
write_context,
82112
)
83113
.await?;
84114

@@ -169,38 +199,41 @@ pub async fn execute_plan<Q: QueryExecutor + 'static>(
169199
})
170200
}
171201

172-
#[allow(clippy::too_many_arguments)]
173202
pub async fn write_request_to_insert_plan<Q: QueryExecutor + 'static>(
174-
request_id: RequestId,
175-
catalog: &str,
176-
schema: &str,
177203
instance: InstanceRef<Q>,
178204
table_requests: Vec<WriteTableRequest>,
179205
schema_config: Option<&SchemaConfig>,
180-
deadline: Option<Instant>,
181-
auto_create_tables: bool,
206+
write_context: WriteContext,
182207
) -> Result<Vec<InsertPlan>> {
208+
let WriteContext {
209+
request_id,
210+
catalog,
211+
schema,
212+
deadline,
213+
auto_create_table,
214+
} = write_context;
215+
183216
let mut plan_vec = Vec::with_capacity(table_requests.len());
184217

185218
for write_table_req in table_requests {
186219
let table_name = &write_table_req.table;
187-
let mut table = try_get_table(catalog, schema, instance.clone(), table_name)?;
220+
let mut table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
188221

189-
if table.is_none() && auto_create_tables {
222+
if table.is_none() && auto_create_table {
190223
// TODO: remove this clone?
191224
let schema_config = schema_config.cloned().unwrap_or_default();
192225
create_table(
193226
request_id,
194-
catalog,
195-
schema,
227+
&catalog,
228+
&schema,
196229
instance.clone(),
197230
&write_table_req,
198231
&schema_config,
199232
deadline,
200233
)
201234
.await?;
202235
// try to get table again
203-
table = try_get_table(catalog, schema, instance.clone(), table_name)?;
236+
table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
204237
}
205238

206239
match table {

‎server/src/handlers/prom.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
2626
use warp::reject;
2727

2828
use crate::{
29-
context::RequestContext, handlers, instance::InstanceRef,
30-
schema_config_provider::SchemaConfigProviderRef,
29+
context::RequestContext, grpc::storage_service::write::WriteContext, handlers,
30+
instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef,
3131
};
3232

3333
#[derive(Debug, Snafu)]
@@ -242,15 +242,15 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
242242
.schema_config_provider
243243
.schema_config(schema)
244244
.context(SchemaError)?;
245+
246+
let write_context =
247+
WriteContext::new(request_id, deadline, catalog.clone(), schema.clone());
248+
245249
let plans = crate::grpc::storage_service::write::write_request_to_insert_plan(
246-
request_id,
247-
catalog,
248-
schema,
249250
self.instance.clone(),
250251
Self::convert_write_request(req)?,
251252
schema_config,
252-
deadline,
253-
true,
253+
write_context,
254254
)
255255
.await
256256
.context(GRPCWriteError)?;

‎server/src/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
351351
.schema_config_provider(provider)
352352
.forward_config(self.config.forward)
353353
.timeout(self.config.timeout.map(|v| v.0))
354-
.auto_create_tables(self.config.auto_create_tables)
354+
.auto_create_table(self.config.auto_create_table)
355355
.build()
356356
.context(BuildGrpcService)?;
357357

0 commit comments

Comments
 (0)
Please sign in to comment.