Skip to content

Commit 78027f3

Browse files
authored
Merge branch 'main' into remove_ahash
2 parents 42e12e1 + d62e414 commit 78027f3

File tree

6 files changed

+69
-11
lines changed

6 files changed

+69
-11
lines changed

proxy/src/http/prom.rs

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
6161
let ctx = ProxyContext {
6262
runtime: self.engine_runtimes.write_runtime.clone(),
6363
timeout: ctx.timeout,
64+
enable_partition_table_access: false,
6465
};
6566

6667
let result = self.handle_write_internal(ctx, table_request).await?;

proxy/src/http/sql.rs

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
3636
let context = Context {
3737
timeout: ctx.timeout,
3838
runtime: self.engine_runtimes.read_runtime.clone(),
39+
enable_partition_table_access: true,
3940
};
4041

4142
match self.handle_sql(context, &ctx.schema, &req.query).await? {

proxy/src/influxdb/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
5757
let proxy_context = Context {
5858
timeout: ctx.timeout,
5959
runtime: self.engine_runtimes.write_runtime.clone(),
60+
enable_partition_table_access: false,
6061
};
6162
let result = self
6263
.handle_write_internal(proxy_context, table_request)

proxy/src/lib.rs

+49-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ use ceresdbproto::storage::{
3838
use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID};
3939
use common_util::{error::BoxError, runtime::Runtime};
4040
use futures::FutureExt;
41-
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
41+
use interpreters::{
42+
context::Context as InterpreterContext,
43+
factory::Factory,
44+
interpreter::{InterpreterPtr, Output},
45+
};
4246
use log::{error, info};
4347
use query_engine::executor::Executor as QueryExecutor;
4448
use query_frontend::plan::Plan;
@@ -362,23 +366,64 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
362366
msg: "Request is blocked",
363367
})?;
364368

369+
let interpreter =
370+
self.build_interpreter(request_id, catalog, schema, plan, deadline, false)?;
371+
Self::interpreter_execute_plan(interpreter, deadline).await
372+
}
373+
374+
async fn execute_plan_involving_partition_table(
375+
&self,
376+
request_id: RequestId,
377+
catalog: &str,
378+
schema: &str,
379+
plan: Plan,
380+
deadline: Option<Instant>,
381+
) -> Result<Output> {
382+
self.instance
383+
.limiter
384+
.try_limit(&plan)
385+
.box_err()
386+
.context(Internal {
387+
msg: "Request is blocked",
388+
})?;
389+
390+
let interpreter =
391+
self.build_interpreter(request_id, catalog, schema, plan, deadline, true)?;
392+
Self::interpreter_execute_plan(interpreter, deadline).await
393+
}
394+
395+
fn build_interpreter(
396+
&self,
397+
request_id: RequestId,
398+
catalog: &str,
399+
schema: &str,
400+
plan: Plan,
401+
deadline: Option<Instant>,
402+
enable_partition_table_access: bool,
403+
) -> Result<InterpreterPtr> {
365404
let interpreter_ctx = InterpreterContext::builder(request_id, deadline)
366405
// Use current ctx's catalog and schema as default catalog and schema
367406
.default_catalog_and_schema(catalog.to_string(), schema.to_string())
407+
.enable_partition_table_access(enable_partition_table_access)
368408
.build();
369409
let interpreter_factory = Factory::new(
370410
self.instance.query_executor.clone(),
371411
self.instance.catalog_manager.clone(),
372412
self.instance.table_engine.clone(),
373413
self.instance.table_manipulator.clone(),
374414
);
375-
let interpreter = interpreter_factory
415+
interpreter_factory
376416
.create(interpreter_ctx, plan)
377417
.box_err()
378418
.context(Internal {
379419
msg: "Failed to create interpreter",
380-
})?;
420+
})
421+
}
381422

423+
async fn interpreter_execute_plan(
424+
interpreter: InterpreterPtr,
425+
deadline: Option<Instant>,
426+
) -> Result<Output> {
382427
if let Some(deadline) = deadline {
383428
tokio::time::timeout_at(
384429
tokio::time::Instant::from_std(deadline),
@@ -406,4 +451,5 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
406451
pub struct Context {
407452
pub timeout: Option<Duration>,
408453
pub runtime: Arc<Runtime>,
454+
pub enable_partition_table_access: bool,
409455
}

proxy/src/read.rs

+11-8
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,17 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
129129
msg: format!("Failed to create plan, query:{sql}"),
130130
})?;
131131

132-
let output = self
133-
.execute_plan(request_id, catalog, schema, plan, deadline)
134-
.await
135-
.box_err()
136-
.with_context(|| ErrWithCause {
137-
code: StatusCode::INTERNAL_SERVER_ERROR,
138-
msg: format!("Failed to execute plan, sql:{sql}"),
139-
})?;
132+
let output = if ctx.enable_partition_table_access {
133+
self.execute_plan_involving_partition_table(request_id, catalog, schema, plan, deadline)
134+
.await
135+
} else {
136+
self.execute_plan(request_id, catalog, schema, plan, deadline)
137+
.await
138+
};
139+
let output = output.box_err().with_context(|| ErrWithCause {
140+
code: StatusCode::INTERNAL_SERVER_ERROR,
141+
msg: format!("Failed to execute plan, sql:{sql}"),
142+
})?;
140143

141144
let cost = begin_instant.saturating_elapsed();
142145
info!("Handle sql query success, catalog:{catalog}, schema:{schema}, request_id:{request_id}, cost:{cost:?}, sql:{sql:?}");

server/src/grpc/storage_service/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
137137
let ctx = Context {
138138
runtime: self.runtimes.read_runtime.clone(),
139139
timeout: self.timeout,
140+
enable_partition_table_access: false,
140141
};
141142
let stream = Self::stream_sql_query_internal(ctx, proxy, req).await;
142143

@@ -159,6 +160,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
159160
let ctx = Context {
160161
runtime: self.runtimes.read_runtime.clone(),
161162
timeout: self.timeout,
163+
enable_partition_table_access: false,
162164
};
163165

164166
let join_handle = self
@@ -189,6 +191,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
189191
let ctx = Context {
190192
runtime: self.runtimes.write_runtime.clone(),
191193
timeout: self.timeout,
194+
enable_partition_table_access: false,
192195
};
193196

194197
let join_handle = self.runtimes.write_runtime.spawn(async move {
@@ -228,6 +231,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
228231
let ctx = Context {
229232
runtime: self.runtimes.read_runtime.clone(),
230233
timeout: self.timeout,
234+
enable_partition_table_access: false,
231235
};
232236
let join_handle = self
233237
.runtimes
@@ -290,6 +294,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
290294
let ctx = Context {
291295
runtime: self.runtimes.read_runtime.clone(),
292296
timeout: self.timeout,
297+
enable_partition_table_access: false,
293298
};
294299
let join_handle = self.runtimes.read_runtime.spawn(async move {
295300
if req.context.is_none() {
@@ -329,6 +334,7 @@ impl<Q: QueryExecutor + 'static> StorageServiceImpl<Q> {
329334
let ctx = Context {
330335
runtime: self.runtimes.write_runtime.clone(),
331336
timeout: self.timeout,
337+
enable_partition_table_access: false,
332338
};
333339

334340
let join_handle = self.runtimes.write_runtime.spawn(async move {

0 commit comments

Comments
 (0)