Skip to content

Commit 936cac5

Browse files
committed
address CR.
1 parent 7057334 commit 936cac5

File tree

12 files changed

+114
-132
lines changed

12 files changed

+114
-132
lines changed

server/src/handlers/prom.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
2525
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
2626
use warp::reject;
2727

28-
use super::query::{QueryRequest, QueryType};
28+
use super::query::QueryRequest;
2929
use crate::{
3030
context::RequestContext, handlers, instance::InstanceRef,
3131
schema_config_provider::SchemaConfigProviderRef,
@@ -292,10 +292,7 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
292292
TIMESTAMP_COLUMN
293293
);
294294

295-
let request = QueryRequest {
296-
query_type: QueryType::Sql,
297-
request: sql.into(),
298-
};
295+
let request = QueryRequest::Sql(sql.into());
299296
let result = handlers::query::handle_query(ctx, self.instance.clone(), request)
300297
.await
301298
.map_err(Box::new)

server/src/handlers/query.rs

+22-23
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,18 @@ impl From<Bytes> for Request {
107107
}
108108

109109
#[derive(Debug)]
110-
pub enum QueryType {
111-
Sql,
112-
Influxql,
110+
pub enum QueryRequest {
111+
Sql(Request),
112+
// TODO: influxql include more parameters, we should add it in later.
113+
Influxql(Request),
113114
}
114-
115-
#[derive(Debug)]
116-
pub struct QueryRequest {
117-
pub query_type: QueryType,
118-
pub request: Request,
115+
impl QueryRequest {
116+
fn query(&self) -> &str {
117+
match self {
118+
QueryRequest::Sql(request) => request.query.as_str(),
119+
QueryRequest::Influxql(request) => request.query.as_str(),
120+
}
121+
}
119122
}
120123

121124
pub async fn handle_query<Q: QueryExecutor + 'static>(
@@ -143,12 +146,8 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
143146
let frontend = Frontend::new(provider);
144147
let mut sql_ctx = SqlContext::new(request_id, deadline);
145148

146-
let QueryRequest {
147-
request,
148-
query_type,
149-
} = query_request;
150-
let plan = match query_type {
151-
QueryType::Sql => {
149+
let plan = match &query_request {
150+
QueryRequest::Sql(request) => {
152151
// Parse sql, frontend error of invalid sql already contains sql
153152
// TODO(yingwen): Maybe move sql from frontend error to outer error
154153
let mut stmts = frontend
@@ -165,7 +164,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
165164
stmts.len() == 1,
166165
TooMuchStmt {
167166
len: stmts.len(),
168-
query: request.query,
167+
query: &request.query,
169168
}
170169
);
171170

@@ -178,7 +177,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
178177
})?
179178
}
180179

181-
QueryType::Influxql => {
180+
QueryRequest::Influxql(request) => {
182181
let mut stmts = frontend
183182
.parse_influxql(&mut sql_ctx, &request.query)
184183
.context(ParseInfluxql)?;
@@ -191,7 +190,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
191190
stmts.len() == 1,
192191
TooMuchStmt {
193192
len: stmts.len(),
194-
query: request.query,
193+
query: &request.query,
195194
}
196195
);
197196

@@ -204,7 +203,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
204203
};
205204

206205
instance.limiter.try_limit(&plan).context(QueryBlock {
207-
query: &request.query,
206+
query: query_request.query(),
208207
})?;
209208

210209
// Execute in interpreter
@@ -223,7 +222,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
223222
interpreter_factory
224223
.create(interpreter_ctx, plan)
225224
.context(InterpreterExec {
226-
query: &request.query,
225+
query: query_request.query(),
227226
})?;
228227

229228
let output = if let Some(deadline) = deadline {
@@ -233,24 +232,24 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
233232
)
234233
.await
235234
.context(QueryTimeout {
236-
query: &request.query,
235+
query: query_request.query(),
237236
})
238237
.and_then(|v| {
239238
v.context(InterpreterExec {
240-
query: &request.query,
239+
query: query_request.query(),
241240
})
242241
})?
243242
} else {
244243
interpreter.execute().await.context(InterpreterExec {
245-
query: &request.query,
244+
query: query_request.query(),
246245
})?
247246
};
248247

249248
info!(
250249
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
251250
request_id,
252251
begin_instant.saturating_elapsed().as_millis(),
253-
request
252+
query_request
254253
);
255254

256255
Ok(output)

server/src/http.rs

+8-14
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,7 @@ use crate::{
3131
consts,
3232
context::RequestContext,
3333
error_util,
34-
handlers::{
35-
self,
36-
prom::CeresDBStorage,
37-
query::{QueryType, Request},
38-
},
34+
handlers::{self, prom::CeresDBStorage, query::Request},
3935
instance::InstanceRef,
4036
metrics,
4137
schema_config_provider::SchemaConfigProviderRef,
@@ -187,10 +183,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
187183
.and(self.with_context())
188184
.and(self.with_instance())
189185
.and_then(|req, ctx, instance| async move {
190-
let req = QueryRequest {
191-
query_type: QueryType::Sql,
192-
request: req,
193-
};
186+
let req = QueryRequest::Sql(req);
194187
let result = handlers::query::handle_query(&ctx, instance, req)
195188
.await
196189
.map(handlers::query::convert_output)
@@ -207,7 +200,9 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
207200
})
208201
}
209202

210-
// POST /sql
203+
// POST /influxql
204+
// this request type is not what influxdb API expected, the one in influxdb:
205+
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint
211206
fn influxql(
212207
&self,
213208
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
@@ -223,12 +218,11 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
223218
.and(self.with_context())
224219
.and(self.with_instance())
225220
.and_then(|req, ctx, instance| async move {
226-
let req = QueryRequest {
227-
query_type: QueryType::Influxql,
228-
request: req,
229-
};
221+
let req = QueryRequest::Influxql(req);
230222
let result = handlers::query::handle_query(&ctx, instance, req)
231223
.await
224+
// TODO: the sql's `convert_output` function may be not suitable to influxql.
225+
// We should implement influxql's related function in later.
232226
.map(handlers::query::convert_output)
233227
.map_err(|e| {
234228
// TODO(yingwen): Maybe truncate and print the sql

server/src/mysql/worker.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
context::RequestContext,
1414
handlers::{
1515
self,
16-
query::{QueryRequest, QueryType, Request},
16+
query::{QueryRequest, Request},
1717
},
1818
instance::Instance,
1919
mysql::{
@@ -113,10 +113,7 @@ where
113113
async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result<Output> {
114114
let ctx = self.create_ctx()?;
115115
let req = Request::from(sql.to_string());
116-
let req = QueryRequest {
117-
query_type: QueryType::Sql,
118-
request: req,
119-
};
116+
let req = QueryRequest::Sql(req);
120117
handlers::query::handle_query(&ctx, self.instance.clone(), req)
121118
.await
122119
.map_err(|e| {

sql/src/influxql/mod.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
55
pub mod planner;
66
pub(crate) mod select;
7-
#[cfg(test)]
8-
pub mod test_util;
97
pub(crate) mod util;
8+
109
pub mod error {
1110
use common_util::error::GenericError;
1211
use snafu::{Backtrace, Snafu};
@@ -36,7 +35,7 @@ pub mod error {
3635
RewriteNoCause { msg: String, backtrace: Backtrace },
3736

3837
#[snafu(display(
39-
"Failed to convert to sql statement, msg:{}..\nBacktrace:{}",
38+
"Failed to convert to sql statement, msg:{}.\nBacktrace:{}",
4039
msg,
4140
backtrace
4241
))]

sql/src/influxql/planner.rs

+5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
2727
Self { sql_planner }
2828
}
2929

30+
/// Build sql logical plan from [InfluxqlStatement].
31+
///
32+
/// NOTICE: when building plan from influxql select statement,
33+
/// the [InfluxqlStatement] will be converted to [SqlStatement] first,
34+
/// and build plan then.
3035
pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
3136
match stmt {
3237
InfluxqlStatement::Select(stmt) => self.select_to_plan(*stmt),

sql/src/influxql/select/converter.rs

+23-25
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl Converter {
3535
// - offset
3636
// - select body
3737
let limit = stmt.limit.map(|limit| {
38-
let limit_n: u64 = *limit;
38+
let limit_n = *limit;
3939
Expr::Value(Value::Number(limit_n.to_string(), false))
4040
});
4141

@@ -75,7 +75,7 @@ impl Converter {
7575
ensure!(
7676
stmt.from.len() == 1,
7777
Unimplemented {
78-
msg: "from multiple measurements",
78+
msg: "select from multiple measurements",
7979
}
8080
);
8181
let measurement_name = match &stmt.from[0] {
@@ -90,7 +90,7 @@ impl Converter {
9090
},
9191
MeasurementSelection::Subquery(_) => {
9292
return Unimplemented {
93-
msg: "from subquery",
93+
msg: "select from subquery",
9494
}
9595
.fail()
9696
}
@@ -252,32 +252,32 @@ fn call_to_sql_expr(scope: ExprScope, name: &str, args: &[InfluxqlExpr]) -> Resu
252252
})
253253
.collect::<Result<Vec<_>>>()?;
254254

255-
Ok(Expr::Function(Function {
255+
return Ok(Expr::Function(Function {
256256
name,
257257
args,
258258
over: None,
259259
distinct: false,
260260
special: false,
261-
}))
262-
} else {
263-
match scope {
264-
ExprScope::Projection => Unimplemented {
265-
msg: "aggregate and selector functions in projection list",
266-
}
267-
.fail(),
261+
}));
262+
}
268263

269-
ExprScope::Where => {
270-
if name.eq_ignore_ascii_case("now") {
271-
Unimplemented {
272-
msg: "now() in where clause",
273-
}
274-
.fail()
275-
} else {
276-
Convert {
277-
msg: format!("invalid function call in condition: {name}"),
278-
}
279-
.fail()
264+
match scope {
265+
ExprScope::Projection => Unimplemented {
266+
msg: "aggregate and selector functions in projection list",
267+
}
268+
.fail(),
269+
270+
ExprScope::Where => {
271+
if name.eq_ignore_ascii_case("now") {
272+
Unimplemented {
273+
msg: "now() in where clause",
280274
}
275+
.fail()
276+
} else {
277+
Convert {
278+
msg: format!("invalid function call in condition: {name}"),
279+
}
280+
.fail()
281281
}
282282
}
283283
}
@@ -355,9 +355,7 @@ mod test {
355355
use sqlparser::ast::Statement as SqlStatement;
356356

357357
use crate::{
358-
ast::Statement,
359-
influxql::{select::converter::Converter, test_util::parse_select},
360-
parser::Parser,
358+
ast::Statement, influxql::select::converter::Converter, parser::Parser, tests::parse_select,
361359
};
362360

363361
#[test]

sql/src/influxql/select/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

3-
//! Statement level select converting from influxql to sql
3+
//! Convert influxql to sql at statement level
44
55
pub(crate) mod converter;
66
pub(crate) mod rewriter;

0 commit comments

Comments
 (0)