Skip to content

Commit 07df586

Browse files
authored
feat: reuse logical planner in influxdb_iox (apache#730)
* reust logical planner in influxdb_iox. * fix clippy. * add tests. * fix integration test. * address CR.
1 parent c7afb6c commit 07df586

File tree

13 files changed

+592
-1136
lines changed

13 files changed

+592
-1136
lines changed

Cargo.lock

+28-10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration_tests/cases/env/local/influxql/basic.result

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ CREATE TABLE `h2o_feet` (
66
`time` timestamp NOT NULL,
77
`level_description` string TAG,
88
`location` string TAG,
9-
`water_level` double NOT NULL,
9+
`water_level` double,
1010
timestamp KEY (time)) ENGINE = Analytic WITH (
1111
enable_ttl = 'false'
1212
);
@@ -27,12 +27,12 @@ affected_rows: 6
2727
-- SQLNESS ARG protocol=influxql
2828
SELECT * FROM "h2o_feet";
2929

30-
{"rows":[{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
30+
{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887}]}
3131

3232
-- SQLNESS ARG protocol=influxql
3333
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';
3434

35-
{"rows":[{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
35+
{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
3636

3737
DROP TABLE IF EXISTS `h2o_feet`;
3838

integration_tests/cases/env/local/influxql/basic.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CREATE TABLE `h2o_feet` (
44
`time` timestamp NOT NULL,
55
`level_description` string TAG,
66
`location` string TAG,
7-
`water_level` double NOT NULL,
7+
`water_level` double,
88
timestamp KEY (time)) ENGINE = Analytic WITH (
99
enable_ttl = 'false'
1010
);

sql/src/frontend.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{sync::Arc, time::Instant};
77
use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest};
88
use cluster::config::SchemaConfig;
99
use common_types::request_id::RequestId;
10-
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
10+
use influxql_parser::statement::Statement as InfluxqlStatement;
1111
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
1212
use table_engine::table;
1313

@@ -43,7 +43,7 @@ pub enum Error {
4343
#[snafu(display("invalid influxql, influxql:{}, err:{}", influxql, parse_err))]
4444
InvalidInfluxql {
4545
influxql: String,
46-
parse_err: influxdb_influxql_parser::common::ParseError,
46+
parse_err: influxql_parser::common::ParseError,
4747
},
4848
}
4949

@@ -105,7 +105,7 @@ impl<P> Frontend<P> {
105105
_ctx: &mut Context,
106106
influxql: &str,
107107
) -> Result<Vec<InfluxqlStatement>> {
108-
match influxdb_influxql_parser::parse_statements(influxql) {
108+
match influxql_parser::parse_statements(influxql) {
109109
Ok(stmts) => Ok(stmts),
110110
Err(e) => Err(Error::InvalidInfluxql {
111111
influxql: influxql.to_string(),

sql/src/influxql/mod.rs

+7-23
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
//! Influxql processing
44
55
pub mod planner;
6-
pub(crate) mod select;
7-
pub(crate) mod util;
6+
pub(crate) mod provider;
87

98
pub mod error {
109
use common_util::error::GenericError;
@@ -14,32 +13,17 @@ pub mod error {
1413
#[snafu(visibility = "pub")]
1514
pub enum Error {
1615
#[snafu(display(
17-
"Unimplemented influxql statement, msg: {}.\nBacktrace:{}",
16+
"Failed to build influxdb schema, msg:{}.\nBacktrace:{}",
1817
msg,
1918
backtrace
2019
))]
21-
Unimplemented { msg: String, backtrace: Backtrace },
20+
BuildSchema { msg: String, backtrace: Backtrace },
2221

23-
#[snafu(display(
24-
"Failed to rewrite influxql from statement with cause, msg:{}, source:{}",
25-
msg,
26-
source
27-
))]
28-
RewriteWithCause { msg: String, source: GenericError },
29-
30-
#[snafu(display(
31-
"Failed to rewrite influxql from statement no cause, msg:{}.\nBacktrace:{}",
32-
msg,
33-
backtrace
34-
))]
35-
RewriteNoCause { msg: String, backtrace: Backtrace },
22+
#[snafu(display("Failed to build influxql plan, msg:{}, err:{}", msg, source))]
23+
BuildPlan { msg: String, source: GenericError },
3624

37-
#[snafu(display(
38-
"Failed to convert to sql statement, msg:{}.\nBacktrace:{}",
39-
msg,
40-
backtrace
41-
))]
42-
Convert { msg: String, backtrace: Backtrace },
25+
#[snafu(display("Unimplemented influxql statement, statement:{}", stmt))]
26+
Unimplemented { stmt: String },
4327
}
4428
define_result!(Error);
4529
}

sql/src/influxql/planner.rs

+50-54
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,28 @@
22

33
//! Influxql planner
44
5-
use common_util::error::{BoxError, GenericResult};
6-
use influxdb_influxql_parser::{
7-
select::SelectStatement, statement::Statement as InfluxqlStatement,
8-
};
5+
use std::sync::Arc;
6+
7+
use common_util::error::BoxError;
8+
use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
9+
use influxql_parser::statement::Statement as InfluxqlStatement;
910
use snafu::ResultExt;
10-
use sqlparser::ast::Statement as SqlStatement;
11-
use table_engine::table::TableRef;
1211

1312
use crate::{
14-
influxql::select::{converter::Converter, rewriter::Rewriter},
15-
plan::Plan,
16-
planner::{BuildInfluxqlPlan, Result},
17-
provider::MetaProvider,
13+
influxql::{error::*, provider::InfluxSchemaProviderImpl},
14+
plan::{Plan, QueryPlan},
15+
provider::{ContextProviderAdapter, MetaProvider},
1816
};
1917

20-
#[allow(dead_code)]
18+
const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement";
19+
2120
pub(crate) struct Planner<'a, P: MetaProvider> {
22-
sql_planner: crate::planner::PlannerDelegate<'a, P>,
21+
context_provider: ContextProviderAdapter<'a, P>,
2322
}
2423

2524
impl<'a, P: MetaProvider> Planner<'a, P> {
26-
pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self {
27-
Self { sql_planner }
25+
pub fn new(context_provider: ContextProviderAdapter<'a, P>) -> Self {
26+
Self { context_provider }
2827
}
2928

3029
/// Build sql logical plan from [InfluxqlStatement].
@@ -33,51 +32,48 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
3332
/// the [InfluxqlStatement] will be converted to [SqlStatement] first,
3433
/// and build plan then.
3534
pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
36-
match stmt {
37-
InfluxqlStatement::Select(stmt) => self.select_to_plan(*stmt),
38-
InfluxqlStatement::CreateDatabase(_) => todo!(),
39-
InfluxqlStatement::ShowDatabases(_) => todo!(),
40-
InfluxqlStatement::ShowRetentionPolicies(_) => todo!(),
41-
InfluxqlStatement::ShowTagKeys(_) => todo!(),
42-
InfluxqlStatement::ShowTagValues(_) => todo!(),
43-
InfluxqlStatement::ShowFieldKeys(_) => todo!(),
44-
InfluxqlStatement::ShowMeasurements(_) => todo!(),
45-
InfluxqlStatement::Delete(_) => todo!(),
46-
InfluxqlStatement::DropMeasurement(_) => todo!(),
47-
InfluxqlStatement::Explain(_) => todo!(),
35+
match &stmt {
36+
InfluxqlStatement::Select(_) => self.select_to_plan(stmt),
37+
InfluxqlStatement::CreateDatabase(_)
38+
| InfluxqlStatement::ShowDatabases(_)
39+
| InfluxqlStatement::ShowRetentionPolicies(_)
40+
| InfluxqlStatement::ShowTagKeys(_)
41+
| InfluxqlStatement::ShowTagValues(_)
42+
| InfluxqlStatement::ShowFieldKeys(_)
43+
| InfluxqlStatement::ShowMeasurements(_)
44+
| InfluxqlStatement::Delete(_)
45+
| InfluxqlStatement::DropMeasurement(_)
46+
| InfluxqlStatement::Explain(_) => Unimplemented {
47+
stmt: stmt.to_string(),
48+
}
49+
.fail(),
4850
}
4951
}
5052

51-
pub fn select_to_plan(self, stmt: SelectStatement) -> Result<Plan> {
52-
let mut stmt = stmt;
53-
let provider_impl = MeasurementProviderImpl(&self.sql_planner);
54-
let rewriter = Rewriter::new(&provider_impl);
55-
rewriter
56-
.rewrite(&mut stmt)
57-
.box_err()
58-
.context(BuildInfluxqlPlan)?;
59-
60-
let sql_stmt = SqlStatement::Query(Box::new(
61-
Converter::convert(stmt)
62-
.box_err()
63-
.context(BuildInfluxqlPlan)?,
64-
));
53+
pub fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
54+
let influx_schema_provider = InfluxSchemaProviderImpl {
55+
context_provider: &self.context_provider,
56+
};
57+
let influxql_logical_planner = InfluxQLToLogicalPlan::new(
58+
&influx_schema_provider,
59+
CERESDB_MEASUREMENT_COLUMN_NAME.to_string(),
60+
);
6561

66-
self.sql_planner
67-
.sql_statement_to_plan(sql_stmt)
62+
let df_plan = influxql_logical_planner
63+
.statement_to_plan(stmt)
6864
.box_err()
69-
.context(BuildInfluxqlPlan)
70-
}
71-
}
72-
73-
pub trait MeasurementProvider {
74-
fn measurement(&self, measurement_name: &str) -> GenericResult<Option<TableRef>>;
75-
}
76-
77-
struct MeasurementProviderImpl<'a, P: MetaProvider>(&'a crate::planner::PlannerDelegate<'a, P>);
65+
.context(BuildPlan {
66+
msg: "build df plan for influxql select statement",
67+
})?;
68+
let tables = Arc::new(
69+
self.context_provider
70+
.try_into_container()
71+
.box_err()
72+
.context(BuildPlan {
73+
msg: "get tables from df plan of select",
74+
})?,
75+
);
7876

79-
impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> {
80-
fn measurement(&self, measurement_name: &str) -> GenericResult<Option<TableRef>> {
81-
self.0.find_table(measurement_name).box_err()
77+
Ok(Plan::Query(QueryPlan { df_plan, tables }))
8278
}
8379
}

0 commit comments

Comments
 (0)