Skip to content

Commit a494746

Browse files
authored
Merge d7914e6 into f8becfc
2 parents f8becfc + d7914e6 commit a494746

File tree

2 files changed

+106
-9
lines changed

2 files changed

+106
-9
lines changed

sql/src/influxql/mod.rs

+15-4
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,22 @@ pub mod error {
1919
))]
2020
BuildSchema { msg: String, backtrace: Backtrace },
2121

22-
#[snafu(display("Failed to build influxql plan, msg:{}, err:{}", msg, source))]
23-
BuildPlan { msg: String, source: GenericError },
22+
#[snafu(display(
23+
"Failed to build influxql plan with cause, msg:{}, err:{}",
24+
msg,
25+
source
26+
))]
27+
BuildPlanWithCause { msg: String, source: GenericError },
28+
29+
#[snafu(display(
30+
"Failed to build influxql plan with no cause, msg:{}.\nBacktrace:{}",
31+
msg,
32+
backtrace
33+
))]
34+
BuildPlanNoCause { msg: String, backtrace: Backtrace },
2435

25-
#[snafu(display("Unimplemented influxql statement, statement:{}", stmt))]
26-
Unimplemented { stmt: String },
36+
#[snafu(display("Unimplemented influxql statement, msg:{}", msg))]
37+
Unimplemented { msg: String },
2738
}
2839
define_result!(Error);
2940
}

sql/src/influxql/planner.rs

+91-5
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ use std::sync::Arc;
66

77
use common_util::error::BoxError;
88
use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
9-
use influxql_parser::statement::Statement as InfluxqlStatement;
10-
use snafu::ResultExt;
9+
use influxql_parser::{
10+
common::{MeasurementName, QualifiedMeasurementName},
11+
select::{MeasurementSelection, SelectStatement},
12+
statement::Statement as InfluxqlStatement,
13+
};
14+
use snafu::{ensure, ResultExt};
1115

1216
use crate::{
1317
influxql::{error::*, provider::InfluxSchemaProviderImpl},
@@ -44,13 +48,19 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
4448
| InfluxqlStatement::Delete(_)
4549
| InfluxqlStatement::DropMeasurement(_)
4650
| InfluxqlStatement::Explain(_) => Unimplemented {
47-
stmt: stmt.to_string(),
51+
msg: stmt.to_string(),
4852
}
4953
.fail(),
5054
}
5155
}
5256

5357
pub fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
58+
if let InfluxqlStatement::Select(select_stmt) = &stmt {
59+
check_select_statement(select_stmt)?;
60+
} else {
61+
unreachable!("select statement here has been ensured by caller");
62+
}
63+
5464
let influx_schema_provider = InfluxSchemaProviderImpl {
5565
context_provider: &self.context_provider,
5666
};
@@ -62,18 +72,94 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
6272
let df_plan = influxql_logical_planner
6373
.statement_to_plan(stmt)
6474
.box_err()
65-
.context(BuildPlan {
75+
.context(BuildPlanWithCause {
6676
msg: "build df plan for influxql select statement",
6777
})?;
6878
let tables = Arc::new(
6979
self.context_provider
7080
.try_into_container()
7181
.box_err()
72-
.context(BuildPlan {
82+
.context(BuildPlanWithCause {
7383
msg: "get tables from df plan of select",
7484
})?,
7585
);
7686

7787
Ok(Plan::Query(QueryPlan { df_plan, tables }))
7888
}
7989
}
90+
91+
pub fn check_select_statement(select_stmt: &SelectStatement) -> Result<()> {
92+
// Only support from single measurements now.
93+
ensure!(
94+
!select_stmt.from.is_empty(),
95+
BuildPlanNoCause {
96+
msg: format!("invalid influxql select statement with empty from, stmt:{select_stmt}"),
97+
}
98+
);
99+
ensure!(
100+
select_stmt.from.len() == 1,
101+
Unimplemented {
102+
msg: format!("select from multiple measurements, stmt:{select_stmt}"),
103+
}
104+
);
105+
106+
let from = &select_stmt.from[0];
107+
match from {
108+
MeasurementSelection::Name(name) => {
109+
let QualifiedMeasurementName { name, .. } = name;
110+
111+
match name {
112+
MeasurementName::Regex(_) => Unimplemented {
113+
msg: format!("select from regex, stmt:{select_stmt}"),
114+
}
115+
.fail(),
116+
MeasurementName::Name(_) => Ok(()),
117+
}
118+
}
119+
MeasurementSelection::Subquery(_) => Unimplemented {
120+
msg: format!("select from subquery, stmt:{select_stmt}"),
121+
}
122+
.fail(),
123+
}
124+
}
125+
126+
#[cfg(test)]
127+
mod test {
128+
use influxql_parser::{select::SelectStatement, statement::Statement};
129+
130+
use super::check_select_statement;
131+
132+
#[test]
133+
fn test_check_select_from() {
134+
let from_measurement = parse_select("select * from a;");
135+
let from_multi_measurements = parse_select("select * from a,b;");
136+
let from_regex = parse_select(r#"select * from /d/"#);
137+
let from_subquery = parse_select("select * from (select a,b from c)");
138+
139+
let res = check_select_statement(&from_measurement);
140+
assert!(res.is_ok());
141+
142+
let res = check_select_statement(&from_multi_measurements);
143+
let err = res.err().unwrap();
144+
assert!(err
145+
.to_string()
146+
.contains("select from multiple measurements"));
147+
148+
let res = check_select_statement(&from_regex);
149+
let err = res.err().unwrap();
150+
assert!(err.to_string().contains("select from regex"));
151+
152+
let res = check_select_statement(&from_subquery);
153+
let err = res.err().unwrap();
154+
assert!(err.to_string().contains("select from subquery"));
155+
}
156+
157+
fn parse_select(influxql: &str) -> SelectStatement {
158+
let stmt = influxql_parser::parse_statements(influxql).unwrap()[0].clone();
159+
if let Statement::Select(select_stmt) = stmt {
160+
*select_stmt
161+
} else {
162+
unreachable!()
163+
}
164+
}
165+
}

0 commit comments

Comments
 (0)