Skip to content

Commit 1b607d9

Browse files
authored
feat: support the simplest influxql raw query (apache#710)
* add influxql statement converter(to sql statement). * complete the influxql planner. * add influxql http interface. * address CR.
1 parent 12c539c commit 1b607d9

File tree

15 files changed

+674
-136
lines changed

15 files changed

+674
-136
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/src/handlers/error.rs

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ pub enum Error {
1414
#[snafu(display("Failed to parse sql, err:{}", source))]
1515
ParseSql { source: sql::frontend::Error },
1616

17+
#[snafu(display("Failed to parse influxql, err:{}", source))]
18+
ParseInfluxql { source: sql::frontend::Error },
19+
1720
#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
1821
CreatePlan {
1922
query: String,

server/src/handlers/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
pub mod admin;
66
pub mod error;
77
pub mod prom;
8-
pub mod sql;
8+
pub mod query;
99

1010
mod prelude {
1111
pub use catalog::manager::Manager as CatalogManager;

server/src/handlers/prom.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
2525
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
2626
use warp::reject;
2727

28+
use super::query::QueryRequest;
2829
use crate::{
2930
context::RequestContext, handlers, instance::InstanceRef,
3031
schema_config_provider::SchemaConfigProviderRef,
@@ -291,7 +292,8 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
291292
TIMESTAMP_COLUMN
292293
);
293294

294-
let result = handlers::sql::handle_sql(ctx, self.instance.clone(), sql.into())
295+
let request = QueryRequest::Sql(sql.into());
296+
let result = handlers::query::handle_query(ctx, self.instance.clone(), request)
295297
.await
296298
.map_err(Box::new)
297299
.context(SqlHandle)?;

server/src/handlers/sql.rs server/src/handlers/query.rs

+82-37
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use sql::{
2424
};
2525

2626
use crate::handlers::{
27-
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
27+
error::{
28+
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
29+
},
2830
prelude::*,
2931
};
3032

@@ -104,18 +106,33 @@ impl From<Bytes> for Request {
104106
}
105107
}
106108

107-
pub async fn handle_sql<Q: QueryExecutor + 'static>(
109+
#[derive(Debug)]
110+
pub enum QueryRequest {
111+
Sql(Request),
112+
// TODO: influxql include more parameters, we should add it in later.
113+
Influxql(Request),
114+
}
115+
impl QueryRequest {
116+
fn query(&self) -> &str {
117+
match self {
118+
QueryRequest::Sql(request) => request.query.as_str(),
119+
QueryRequest::Influxql(request) => request.query.as_str(),
120+
}
121+
}
122+
}
123+
124+
pub async fn handle_query<Q: QueryExecutor + 'static>(
108125
ctx: &RequestContext,
109126
instance: InstanceRef<Q>,
110-
request: Request,
127+
query_request: QueryRequest,
111128
) -> Result<Output> {
112129
let request_id = RequestId::next_id();
113130
let begin_instant = Instant::now();
114131
let deadline = ctx.timeout.map(|t| begin_instant + t);
115132

116133
info!(
117-
"sql handler try to process request, request_id:{}, request:{:?}",
118-
request_id, request
134+
"Query handler try to process request, request_id:{}, request:{:?}",
135+
request_id, query_request
119136
);
120137

121138
// TODO(yingwen): Privilege check, cannot access data of other tenant
@@ -127,38 +144,66 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
127144
function_registry: &*instance.function_registry,
128145
};
129146
let frontend = Frontend::new(provider);
130-
131147
let mut sql_ctx = SqlContext::new(request_id, deadline);
132-
// Parse sql, frontend error of invalid sql already contains sql
133-
// TODO(yingwen): Maybe move sql from frontend error to outer error
134-
let mut stmts = frontend
135-
.parse_sql(&mut sql_ctx, &request.query)
136-
.context(ParseSql)?;
137-
138-
if stmts.is_empty() {
139-
return Ok(Output::AffectedRows(0));
140-
}
141148

142-
// TODO(yingwen): For simplicity, we only support executing one statement now
143-
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
144-
ensure!(
145-
stmts.len() == 1,
146-
TooMuchStmt {
147-
len: stmts.len(),
148-
query: request.query,
149+
let plan = match &query_request {
150+
QueryRequest::Sql(request) => {
151+
// Parse sql, frontend error of invalid sql already contains sql
152+
// TODO(yingwen): Maybe move sql from frontend error to outer error
153+
let mut stmts = frontend
154+
.parse_sql(&mut sql_ctx, &request.query)
155+
.context(ParseSql)?;
156+
157+
if stmts.is_empty() {
158+
return Ok(Output::AffectedRows(0));
159+
}
160+
161+
// TODO(yingwen): For simplicity, we only support executing one statement now
162+
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
163+
ensure!(
164+
stmts.len() == 1,
165+
TooMuchStmt {
166+
len: stmts.len(),
167+
query: &request.query,
168+
}
169+
);
170+
171+
// Create logical plan
172+
// Note: Remember to store sql in error when creating logical plan
173+
frontend
174+
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
175+
.context(CreatePlan {
176+
query: &request.query,
177+
})?
149178
}
150-
);
151179

152-
// Create logical plan
153-
// Note: Remember to store sql in error when creating logical plan
154-
let plan = frontend
155-
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
156-
.context(CreatePlan {
157-
query: &request.query,
158-
})?;
180+
QueryRequest::Influxql(request) => {
181+
let mut stmts = frontend
182+
.parse_influxql(&mut sql_ctx, &request.query)
183+
.context(ParseInfluxql)?;
184+
185+
if stmts.is_empty() {
186+
return Ok(Output::AffectedRows(0));
187+
}
188+
189+
ensure!(
190+
stmts.len() == 1,
191+
TooMuchStmt {
192+
len: stmts.len(),
193+
query: &request.query,
194+
}
195+
);
196+
197+
frontend
198+
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
199+
.context(CreatePlan {
200+
query: &request.query,
201+
})?
202+
}
203+
};
159204

160205
instance.limiter.try_limit(&plan).context(QueryBlock {
161-
query: &request.query,
206+
query: query_request.query(),
162207
})?;
163208

164209
// Execute in interpreter
@@ -177,7 +222,7 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
177222
interpreter_factory
178223
.create(interpreter_ctx, plan)
179224
.context(InterpreterExec {
180-
query: &request.query,
225+
query: query_request.query(),
181226
})?;
182227

183228
let output = if let Some(deadline) = deadline {
@@ -187,24 +232,24 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
187232
)
188233
.await
189234
.context(QueryTimeout {
190-
query: &request.query,
235+
query: query_request.query(),
191236
})
192237
.and_then(|v| {
193238
v.context(InterpreterExec {
194-
query: &request.query,
239+
query: query_request.query(),
195240
})
196241
})?
197242
} else {
198243
interpreter.execute().await.context(InterpreterExec {
199-
query: &request.query,
244+
query: query_request.query(),
200245
})?
201246
};
202247

203248
info!(
204-
"sql handler finished, request_id:{}, cost:{}ms, request:{:?}",
249+
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
205250
request_id,
206251
begin_instant.saturating_elapsed().as_millis(),
207-
request
252+
query_request
208253
);
209254

210255
Ok(output)

server/src/http.rs

+43-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use common_util::error::BoxError;
11+
use handlers::query::QueryRequest;
1112
use log::{error, info};
1213
use logger::RuntimeLevel;
1314
use profile::Profiler;
@@ -30,7 +31,7 @@ use crate::{
3031
consts,
3132
context::RequestContext,
3233
error_util,
33-
handlers::{self, prom::CeresDBStorage, sql::Request},
34+
handlers::{self, prom::CeresDBStorage, query::Request},
3435
instance::InstanceRef,
3536
metrics,
3637
schema_config_provider::SchemaConfigProviderRef,
@@ -126,6 +127,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
126127
self.home()
127128
.or(self.metrics())
128129
.or(self.sql())
130+
.or(self.influxql())
129131
.or(self.heap_profile())
130132
.or(self.admin_block())
131133
.or(self.flush_memtable())
@@ -181,9 +183,47 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
181183
.and(self.with_context())
182184
.and(self.with_instance())
183185
.and_then(|req, ctx, instance| async move {
184-
let result = handlers::sql::handle_sql(&ctx, instance, req)
186+
let req = QueryRequest::Sql(req);
187+
let result = handlers::query::handle_query(&ctx, instance, req)
185188
.await
186-
.map(handlers::sql::convert_output)
189+
.map(handlers::query::convert_output)
190+
.map_err(|e| {
191+
// TODO(yingwen): Maybe truncate and print the sql
192+
error!("Http service Failed to handle sql, err:{}", e);
193+
Box::new(e)
194+
})
195+
.context(HandleRequest);
196+
match result {
197+
Ok(res) => Ok(reply::json(&res)),
198+
Err(e) => Err(reject::custom(e)),
199+
}
200+
})
201+
}
202+
203+
// POST /influxql
204+
// this request type is not what influxdb API expected, the one in influxdb:
205+
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint
206+
fn influxql(
207+
&self,
208+
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
209+
// accept json or plain text
210+
let extract_request = warp::body::json()
211+
.or(warp::body::bytes().map(Request::from))
212+
.unify();
213+
214+
warp::path!("influxql")
215+
.and(warp::post())
216+
.and(warp::body::content_length_limit(self.config.max_body_size))
217+
.and(extract_request)
218+
.and(self.with_context())
219+
.and(self.with_instance())
220+
.and_then(|req, ctx, instance| async move {
221+
let req = QueryRequest::Influxql(req);
222+
let result = handlers::query::handle_query(&ctx, instance, req)
223+
.await
224+
// TODO: the sql's `convert_output` function may be not suitable to influxql.
225+
// We should implement influxql's related function in later.
226+
.map(handlers::query::convert_output)
187227
.map_err(|e| {
188228
// TODO(yingwen): Maybe truncate and print the sql
189229
error!("Http service Failed to handle sql, err:{}", e);

server/src/mysql/worker.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use table_engine::engine::EngineRuntimes;
1111

1212
use crate::{
1313
context::RequestContext,
14-
handlers::{self, sql::Request},
14+
handlers::{
15+
self,
16+
query::{QueryRequest, Request},
17+
},
1518
instance::Instance,
1619
mysql::{
1720
error::{CreateContext, HandleSql, Result},
@@ -109,9 +112,9 @@ where
109112
{
110113
async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result<Output> {
111114
let ctx = self.create_ctx()?;
112-
113115
let req = Request::from(sql.to_string());
114-
handlers::sql::handle_sql(&ctx, self.instance.clone(), req)
116+
let req = QueryRequest::Sql(req);
117+
handlers::query::handle_query(&ctx, self.instance.clone(), req)
115118
.await
116119
.map_err(|e| {
117120
error!("Mysql service Failed to handle sql, err: {}", e);

sql/src/influxql/mod.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
//! Influxql processing
44
55
pub mod planner;
6-
pub(crate) mod stmt_rewriter;
6+
pub(crate) mod select;
77
pub(crate) mod util;
8+
89
pub mod error {
910
use common_util::error::GenericError;
1011
use snafu::{Backtrace, Snafu};
@@ -32,7 +33,13 @@ pub mod error {
3233
backtrace
3334
))]
3435
RewriteNoCause { msg: String, backtrace: Backtrace },
35-
}
3636

37+
#[snafu(display(
38+
"Failed to convert to sql statement, msg:{}.\nBacktrace:{}",
39+
msg,
40+
backtrace
41+
))]
42+
Convert { msg: String, backtrace: Backtrace },
43+
}
3744
define_result!(Error);
3845
}

0 commit comments

Comments
 (0)