Skip to content

Commit ea0759d

Browse files
authored
feat: impl influxdb api with proxy (apache#875)
## Which issue does this PR close? Closes # ## Rationale for this change Implement influxdb api with proxy. ## What changes are included in this PR? * Add `influxdb` in proxy module. * Remove some useless code in http query. ## Are there any user-facing changes? No. ## How does this change test CI.
1 parent 066212b commit ea0759d

File tree

12 files changed

+305
-355
lines changed

12 files changed

+305
-355
lines changed

catalog/src/lib.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ pub enum Error {
5353
#[snafu(display("Failed to operate table, msg:{}, err:{}", msg, source))]
5454
TableOperatorWithCause { msg: String, source: GenericError },
5555

56-
#[snafu(display("Failed to operate table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
57-
TableOperatorNoCause { msg: String, backtrace: Backtrace },
56+
// Fixme: Temporarily remove the stack information, otherwise you will encounter a
57+
// segmentation fault.
58+
#[snafu(display("Failed to operate table, msg:{}.\n", msg))]
59+
TableOperatorNoCause { msg: String },
5860
}
5961

6062
define_result!(Error);

catalog/src/table_operator.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl TableOperator {
101101
} else {
102102
TableOperatorNoCause {
103103
msg: format!(
104-
"Failed to open shard, some tables open failed, no table is shard id:{shard_id}, opened count:{no_table_count}, open error count:{open_err_count}"),
104+
"Failed to open shard, some tables open failed, shard id:{shard_id}, no table is opened count:{no_table_count}, open error count:{open_err_count}"),
105105
}.fail()
106106
}
107107
}

proxy/src/handlers/error.rs

+1-12
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
//! Error of handlers
44
5-
use common_util::{define_result, error::GenericError};
5+
use common_util::define_result;
66
use snafu::{Backtrace, Snafu};
77
use warp::reject::Reject;
88

@@ -18,11 +18,6 @@ pub enum Error {
1818
source: query_frontend::frontend::Error,
1919
},
2020

21-
#[snafu(display("Failed to parse influxql, err:{}", source))]
22-
ParseInfluxql {
23-
source: query_frontend::frontend::Error,
24-
},
25-
2621
#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
2722
CreatePlan {
2823
query: String,
@@ -77,12 +72,6 @@ pub enum Error {
7772
backtrace: Backtrace,
7873
},
7974

80-
#[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))]
81-
InfluxDbHandlerWithCause { msg: String, source: GenericError },
82-
83-
#[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
84-
InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace },
85-
8675
#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
8776
RouteHandler {
8877
table: String,

proxy/src/handlers/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
55
pub mod admin;
66
pub(crate) mod error;
7-
pub mod influxdb;
87
pub mod query;
98
pub mod route;
109

proxy/src/handlers/query.rs

+1-31
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ use serde::{
2323
use snafu::{ensure, ResultExt};
2424

2525
use crate::handlers::{
26-
error::{
27-
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
28-
},
29-
influxdb::InfluxqlRequest,
26+
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
3027
prelude::*,
3128
};
3229

@@ -101,14 +98,11 @@ impl From<Bytes> for Request {
10198
#[derive(Debug)]
10299
pub enum QueryRequest {
103100
Sql(Request),
104-
// TODO: influxql include more parameters, we should add it in later.
105-
Influxql(InfluxqlRequest),
106101
}
107102
impl QueryRequest {
108103
pub fn query(&self) -> &str {
109104
match self {
110105
QueryRequest::Sql(request) => request.query.as_str(),
111-
QueryRequest::Influxql(request) => request.query.as_str(),
112106
}
113107
}
114108
}
@@ -168,30 +162,6 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
168162
query: &request.query,
169163
})?
170164
}
171-
172-
QueryRequest::Influxql(request) => {
173-
let mut stmts = frontend
174-
.parse_influxql(&mut sql_ctx, &request.query)
175-
.context(ParseInfluxql)?;
176-
177-
if stmts.is_empty() {
178-
return Ok(Output::AffectedRows(0));
179-
}
180-
181-
ensure!(
182-
stmts.len() == 1,
183-
TooMuchStmt {
184-
len: stmts.len(),
185-
query: &request.query,
186-
}
187-
);
188-
189-
frontend
190-
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
191-
.context(CreatePlan {
192-
query: &request.query,
193-
})?
194-
}
195165
};
196166

197167
instance.limiter.try_limit(&plan).context(QueryBlock {

proxy/src/http/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
22

33
pub mod prom;
4-
pub mod query;
4+
pub mod sql;

proxy/src/http/query.rs proxy/src/http/sql.rs

+59-113
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,22 @@ use crate::{
3333
error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result},
3434
execute_plan,
3535
forward::ForwardResult,
36-
handlers::influxdb::InfluxqlRequest,
3736
Proxy,
3837
};
3938

4039
impl<Q: QueryExecutor + 'static> Proxy<Q> {
41-
pub async fn handle_query(
40+
pub async fn handle_http_sql_query(
4241
&self,
4342
ctx: &RequestContext,
44-
query_request: QueryRequest,
43+
req: Request,
4544
) -> Result<Output> {
4645
let request_id = RequestId::next_id();
4746
let begin_instant = Instant::now();
4847
let deadline = ctx.timeout.map(|t| begin_instant + t);
4948

5049
info!(
5150
"Query handler try to process request, request_id:{}, request:{:?}",
52-
request_id, query_request
51+
request_id, req
5352
);
5453

5554
// TODO(yingwen): Privilege check, cannot access data of other tenant
@@ -63,110 +62,66 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
6362
let frontend = Frontend::new(provider);
6463
let mut sql_ctx = SqlContext::new(request_id, deadline);
6564

66-
let plan = match &query_request {
67-
QueryRequest::Sql(request) => {
68-
// Parse sql, frontend error of invalid sql already contains sql
69-
// TODO(yingwen): Maybe move sql from frontend error to outer error
70-
let mut stmts = frontend
71-
.parse_sql(&mut sql_ctx, &request.query)
72-
.box_err()
73-
.with_context(|| ErrWithCause {
74-
code: StatusCode::BAD_REQUEST,
75-
msg: format!("Failed to parse sql, query:{}", request.query),
76-
})?;
77-
78-
if stmts.is_empty() {
79-
return Ok(Output::AffectedRows(0));
80-
}
81-
82-
// TODO(yingwen): For simplicity, we only support executing one statement now
83-
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
84-
ensure!(
85-
stmts.len() == 1,
86-
ErrNoCause {
87-
code: StatusCode::BAD_REQUEST,
88-
msg: format!(
89-
"Only support execute one statement now, current num:{}, query:{}.",
90-
stmts.len(),
91-
request.query
92-
),
93-
}
94-
);
95-
96-
let sql_query_request = SqlQueryRequest {
97-
context: Some(GrpcRequestContext {
98-
database: ctx.schema.clone(),
99-
}),
100-
tables: vec![],
101-
sql: request.query.clone(),
102-
};
103-
104-
if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? {
105-
match resp {
106-
ForwardResult::Forwarded(resp) => {
107-
return convert_sql_response_to_output(resp?)
108-
}
109-
ForwardResult::Local => (),
110-
}
111-
};
112-
113-
// Open partition table if needed.
114-
let table_name = frontend::parse_table_name(&stmts);
115-
if let Some(table_name) = table_name {
116-
self.maybe_open_partition_table_if_not_exist(
117-
&ctx.catalog,
118-
&ctx.schema,
119-
&table_name,
120-
)
121-
.await?;
122-
}
123-
124-
// Create logical plan
125-
// Note: Remember to store sql in error when creating logical plan
126-
frontend
127-
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
128-
.box_err()
129-
.with_context(|| ErrWithCause {
130-
code: StatusCode::BAD_REQUEST,
131-
msg: format!("Failed to build plan, query:{}", request.query),
132-
})?
65+
// Parse sql, frontend error of invalid sql already contains sql
66+
// TODO(yingwen): Maybe move sql from frontend error to outer error
67+
let mut stmts = frontend
68+
.parse_sql(&mut sql_ctx, &req.query)
69+
.box_err()
70+
.with_context(|| ErrWithCause {
71+
code: StatusCode::BAD_REQUEST,
72+
msg: format!("Failed to parse sql, query:{}", req.query),
73+
})?;
74+
75+
if stmts.is_empty() {
76+
return Ok(Output::AffectedRows(0));
77+
}
78+
79+
// TODO(yingwen): For simplicity, we only support executing one statement now
80+
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
81+
ensure!(
82+
stmts.len() == 1,
83+
ErrNoCause {
84+
code: StatusCode::BAD_REQUEST,
85+
msg: format!(
86+
"Only support execute one statement now, current num:{}, query:{}.",
87+
stmts.len(),
88+
req.query
89+
),
13390
}
91+
);
92+
93+
let sql_query_request = SqlQueryRequest {
94+
context: Some(GrpcRequestContext {
95+
database: ctx.schema.clone(),
96+
}),
97+
tables: vec![],
98+
sql: req.query.clone(),
99+
};
134100

135-
QueryRequest::Influxql(request) => {
136-
let mut stmts = frontend
137-
.parse_influxql(&mut sql_ctx, &request.query)
138-
.box_err()
139-
.with_context(|| ErrWithCause {
140-
code: StatusCode::BAD_REQUEST,
141-
msg: format!("Failed to parse influxql, query:{}", request.query),
142-
})?;
143-
144-
if stmts.is_empty() {
145-
return Ok(Output::AffectedRows(0));
146-
}
147-
148-
ensure!(
149-
stmts.len() == 1,
150-
ErrNoCause {
151-
code: StatusCode::BAD_REQUEST,
152-
msg: format!(
153-
"Only support execute one statement now, current num:{}, query:{}.",
154-
stmts.len(),
155-
request.query
156-
),
157-
}
158-
);
159-
160-
frontend
161-
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
162-
.box_err()
163-
.with_context(|| ErrWithCause {
164-
code: StatusCode::BAD_REQUEST,
165-
msg: format!("Failed to build plan, query:{}", request.query),
166-
})?
101+
if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? {
102+
match resp {
103+
ForwardResult::Forwarded(resp) => return convert_sql_response_to_output(resp?),
104+
ForwardResult::Local => (),
167105
}
168106
};
169107

108+
// Open partition table if needed.
109+
let table_name = frontend::parse_table_name(&stmts);
110+
if let Some(table_name) = table_name {
111+
self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &table_name)
112+
.await?;
113+
}
114+
115+
// Create logical plan
116+
// Note: Remember to store sql in error when creating logical plan
117+
let plan = frontend
118+
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
119+
.box_err()
120+
.with_context(|| ErrWithCause {
121+
code: StatusCode::BAD_REQUEST,
122+
msg: format!("Failed to build plan, query:{}", req.query),
123+
})?;
124+
170125
self.instance
171126
.limiter
172127
.try_limit(&plan)
@@ -189,7 +144,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
189144
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
190145
request_id,
191146
begin_instant.saturating_elapsed().as_millis(),
192-
query_request
147+
req
193148
);
194149

195150
Ok(output)
@@ -259,15 +214,6 @@ impl Serialize for ResponseRows {
259214
}
260215
}
261216

262-
#[derive(Debug)]
263-
pub enum QueryRequest {
264-
Sql(Request),
265-
// TODO: influxql include more parameters, we should add it in later.
266-
// TODO: remove dead_code after implement influxql with proxy
267-
#[allow(dead_code)]
268-
Influxql(InfluxqlRequest),
269-
}
270-
271217
// Convert output to json
272218
pub fn convert_output(output: Output) -> Response {
273219
match output {

0 commit comments

Comments
 (0)