Skip to content

Commit 7057334

Browse files
committed
add influxql http interface.
1 parent b36471b commit 7057334

File tree

11 files changed

+156
-42
lines changed

11 files changed

+156
-42
lines changed

server/src/handlers/error.rs

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ pub enum Error {
1414
#[snafu(display("Failed to parse sql, err:{}", source))]
1515
ParseSql { source: sql::frontend::Error },
1616

17+
#[snafu(display("Failed to parse influxql, err:{}", source))]
18+
ParseInfluxql { source: sql::frontend::Error },
19+
1720
#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
1821
CreatePlan {
1922
query: String,

server/src/handlers/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
pub mod admin;
66
pub mod error;
77
pub mod prom;
8-
pub mod sql;
8+
pub mod query;
99

1010
mod prelude {
1111
pub use catalog::manager::Manager as CatalogManager;

server/src/handlers/prom.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
2525
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
2626
use warp::reject;
2727

28+
use super::query::{QueryRequest, QueryType};
2829
use crate::{
2930
context::RequestContext, handlers, instance::InstanceRef,
3031
schema_config_provider::SchemaConfigProviderRef,
@@ -291,7 +292,11 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
291292
TIMESTAMP_COLUMN
292293
);
293294

294-
let result = handlers::sql::handle_sql(ctx, self.instance.clone(), sql.into())
295+
let request = QueryRequest {
296+
query_type: QueryType::Sql,
297+
request: sql.into(),
298+
};
299+
let result = handlers::query::handle_query(ctx, self.instance.clone(), request)
295300
.await
296301
.map_err(Box::new)
297302
.context(SqlHandle)?;

server/src/handlers/sql.rs server/src/handlers/query.rs

+77-31
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use sql::{
2424
};
2525

2626
use crate::handlers::{
27-
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
27+
error::{
28+
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
29+
},
2830
prelude::*,
2931
};
3032

@@ -104,18 +106,30 @@ impl From<Bytes> for Request {
104106
}
105107
}
106108

107-
pub async fn handle_sql<Q: QueryExecutor + 'static>(
109+
#[derive(Debug)]
110+
pub enum QueryType {
111+
Sql,
112+
Influxql,
113+
}
114+
115+
#[derive(Debug)]
116+
pub struct QueryRequest {
117+
pub query_type: QueryType,
118+
pub request: Request,
119+
}
120+
121+
pub async fn handle_query<Q: QueryExecutor + 'static>(
108122
ctx: &RequestContext,
109123
instance: InstanceRef<Q>,
110-
request: Request,
124+
query_request: QueryRequest,
111125
) -> Result<Output> {
112126
let request_id = RequestId::next_id();
113127
let begin_instant = Instant::now();
114128
let deadline = ctx.timeout.map(|t| begin_instant + t);
115129

116130
info!(
117-
"sql handler try to process request, request_id:{}, request:{:?}",
118-
request_id, request
131+
"Query handler try to process request, request_id:{}, request:{:?}",
132+
request_id, query_request
119133
);
120134

121135
// TODO(yingwen): Privilege check, cannot access data of other tenant
@@ -127,35 +141,67 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
127141
function_registry: &*instance.function_registry,
128142
};
129143
let frontend = Frontend::new(provider);
130-
131144
let mut sql_ctx = SqlContext::new(request_id, deadline);
132-
// Parse sql, frontend error of invalid sql already contains sql
133-
// TODO(yingwen): Maybe move sql from frontend error to outer error
134-
let mut stmts = frontend
135-
.parse_sql(&mut sql_ctx, &request.query)
136-
.context(ParseSql)?;
137-
138-
if stmts.is_empty() {
139-
return Ok(Output::AffectedRows(0));
140-
}
141145

142-
// TODO(yingwen): For simplicity, we only support executing one statement now
143-
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
144-
ensure!(
145-
stmts.len() == 1,
146-
TooMuchStmt {
147-
len: stmts.len(),
148-
query: request.query,
146+
let QueryRequest {
147+
request,
148+
query_type,
149+
} = query_request;
150+
let plan = match query_type {
151+
QueryType::Sql => {
152+
// Parse sql, frontend error of invalid sql already contains sql
153+
// TODO(yingwen): Maybe move sql from frontend error to outer error
154+
let mut stmts = frontend
155+
.parse_sql(&mut sql_ctx, &request.query)
156+
.context(ParseSql)?;
157+
158+
if stmts.is_empty() {
159+
return Ok(Output::AffectedRows(0));
160+
}
161+
162+
// TODO(yingwen): For simplicity, we only support executing one statement now
163+
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
164+
ensure!(
165+
stmts.len() == 1,
166+
TooMuchStmt {
167+
len: stmts.len(),
168+
query: request.query,
169+
}
170+
);
171+
172+
// Create logical plan
173+
// Note: Remember to store sql in error when creating logical plan
174+
frontend
175+
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
176+
.context(CreatePlan {
177+
query: &request.query,
178+
})?
149179
}
150-
);
151180

152-
// Create logical plan
153-
// Note: Remember to store sql in error when creating logical plan
154-
let plan = frontend
155-
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
156-
.context(CreatePlan {
157-
query: &request.query,
158-
})?;
181+
QueryType::Influxql => {
182+
let mut stmts = frontend
183+
.parse_influxql(&mut sql_ctx, &request.query)
184+
.context(ParseInfluxql)?;
185+
186+
if stmts.is_empty() {
187+
return Ok(Output::AffectedRows(0));
188+
}
189+
190+
ensure!(
191+
stmts.len() == 1,
192+
TooMuchStmt {
193+
len: stmts.len(),
194+
query: request.query,
195+
}
196+
);
197+
198+
frontend
199+
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
200+
.context(CreatePlan {
201+
query: &request.query,
202+
})?
203+
}
204+
};
159205

160206
instance.limiter.try_limit(&plan).context(QueryBlock {
161207
query: &request.query,
@@ -201,7 +247,7 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
201247
};
202248

203249
info!(
204-
"sql handler finished, request_id:{}, cost:{}ms, request:{:?}",
250+
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
205251
request_id,
206252
begin_instant.saturating_elapsed().as_millis(),
207253
request

server/src/http.rs

+49-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use common_util::error::BoxError;
11+
use handlers::query::QueryRequest;
1112
use log::{error, info};
1213
use logger::RuntimeLevel;
1314
use profile::Profiler;
@@ -30,7 +31,11 @@ use crate::{
3031
consts,
3132
context::RequestContext,
3233
error_util,
33-
handlers::{self, prom::CeresDBStorage, sql::Request},
34+
handlers::{
35+
self,
36+
prom::CeresDBStorage,
37+
query::{QueryType, Request},
38+
},
3439
instance::InstanceRef,
3540
metrics,
3641
schema_config_provider::SchemaConfigProviderRef,
@@ -126,6 +131,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
126131
self.home()
127132
.or(self.metrics())
128133
.or(self.sql())
134+
.or(self.influxql())
129135
.or(self.heap_profile())
130136
.or(self.admin_block())
131137
.or(self.flush_memtable())
@@ -181,9 +187,49 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
181187
.and(self.with_context())
182188
.and(self.with_instance())
183189
.and_then(|req, ctx, instance| async move {
184-
let result = handlers::sql::handle_sql(&ctx, instance, req)
190+
let req = QueryRequest {
191+
query_type: QueryType::Sql,
192+
request: req,
193+
};
194+
let result = handlers::query::handle_query(&ctx, instance, req)
195+
.await
196+
.map(handlers::query::convert_output)
197+
.map_err(|e| {
198+
// TODO(yingwen): Maybe truncate and print the sql
199+
error!("Http service Failed to handle sql, err:{}", e);
200+
Box::new(e)
201+
})
202+
.context(HandleRequest);
203+
match result {
204+
Ok(res) => Ok(reply::json(&res)),
205+
Err(e) => Err(reject::custom(e)),
206+
}
207+
})
208+
}
209+
210+
// POST /sql
211+
fn influxql(
212+
&self,
213+
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
214+
// accept json or plain text
215+
let extract_request = warp::body::json()
216+
.or(warp::body::bytes().map(Request::from))
217+
.unify();
218+
219+
warp::path!("influxql")
220+
.and(warp::post())
221+
.and(warp::body::content_length_limit(self.config.max_body_size))
222+
.and(extract_request)
223+
.and(self.with_context())
224+
.and(self.with_instance())
225+
.and_then(|req, ctx, instance| async move {
226+
let req = QueryRequest {
227+
query_type: QueryType::Influxql,
228+
request: req,
229+
};
230+
let result = handlers::query::handle_query(&ctx, instance, req)
185231
.await
186-
.map(handlers::sql::convert_output)
232+
.map(handlers::query::convert_output)
187233
.map_err(|e| {
188234
// TODO(yingwen): Maybe truncate and print the sql
189235
error!("Http service Failed to handle sql, err:{}", e);

server/src/mysql/worker.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use table_engine::engine::EngineRuntimes;
1111

1212
use crate::{
1313
context::RequestContext,
14-
handlers::{self, sql::Request},
14+
handlers::{
15+
self,
16+
query::{QueryRequest, QueryType, Request},
17+
},
1518
instance::Instance,
1619
mysql::{
1720
error::{CreateContext, HandleSql, Result},
@@ -109,9 +112,12 @@ where
109112
{
110113
async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result<Output> {
111114
let ctx = self.create_ctx()?;
112-
113115
let req = Request::from(sql.to_string());
114-
handlers::sql::handle_sql(&ctx, self.instance.clone(), req)
116+
let req = QueryRequest {
117+
query_type: QueryType::Sql,
118+
request: req,
119+
};
120+
handlers::query::handle_query(&ctx, self.instance.clone(), req)
115121
.await
116122
.map_err(|e| {
117123
error!("Mysql service Failed to handle sql, err: {}", e);

sql/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ df_operator = { workspace = true }
2929
hashbrown = { version = "0.12", features = ["raw"] }
3030
influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" }
3131
itertools = { workspace = true }
32+
lazy_static = { workspace = true }
3233
log = { workspace = true }
3334
paste = { workspace = true }
3435
regex = "1"
3536
regex-syntax = "0.6.28"
3637
snafu = { workspace = true }
3738
sqlparser = { workspace = true }
3839
table_engine = { workspace = true }
39-
lazy_static = { workspace = true }
4040

4141
[dev-dependencies]
4242
common_types = { workspace = true, features = ["test"] }

sql/src/influxql/planner.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

3-
//! Influxql planner.
3+
//! Influxql planner
44
55
use common_util::error::{BoxError, GenericResult};
66
use influxdb_influxql_parser::{
@@ -10,8 +10,8 @@ use snafu::ResultExt;
1010
use sqlparser::ast::Statement as SqlStatement;
1111
use table_engine::table::TableRef;
1212

13-
use super::select::{converter::Converter, rewriter::Rewriter};
1413
use crate::{
14+
influxql::select::{converter::Converter, rewriter::Rewriter},
1515
plan::Plan,
1616
planner::{BuildInfluxqlPlan, Result},
1717
provider::MetaProvider,

sql/src/influxql/select/converter.rs

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use sqlparser::ast::{
2020
use crate::influxql::{error::*, util};
2121

2222
/// Influxql select statement converter
23+
// Derived from influxdb_iox:
24+
// https://github.com/influxdata/influxdb_iox/blob/ff11fe465d02faf6c4dd3017df8750b38d4afd2b/iox_query/src/plan/influxql/planner.rs
2325
#[allow(dead_code)]
2426
pub struct Converter;
2527

sql/src/influxql/select/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
1+
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
2+
3+
//! Statement level select converting from influxql to sql
4+
15
pub(crate) mod converter;
26
pub(crate) mod rewriter;

sql/src/influxql/test_util.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

3+
//! Test utils
4+
35
use common_util::error::GenericResult;
46
use datafusion::sql::TableReference;
57
use influxdb_influxql_parser::{parse_statements, select::SelectStatement, statement::Statement};

0 commit comments

Comments
 (0)