feat: reuse logical planner in influxdb_iox #730

Merged (6 commits) on Mar 16, 2023
38 changes: 28 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions integration_tests/cases/env/local/influxql/basic.result
@@ -6,7 +6,7 @@ CREATE TABLE `h2o_feet` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
`water_level` double NOT NULL,
`water_level` double,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);
@@ -27,12 +27,12 @@ affected_rows: 6
-- SQLNESS ARG protocol=influxql
SELECT * FROM "h2o_feet";

{"rows":[{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887}]}

-- SQLNESS ARG protocol=influxql
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';

{"rows":[{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}

DROP TABLE IF EXISTS `h2o_feet`;

2 changes: 1 addition & 1 deletion integration_tests/cases/env/local/influxql/basic.sql
@@ -4,7 +4,7 @@ CREATE TABLE `h2o_feet` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
`water_level` double NOT NULL,
`water_level` double,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);
3 changes: 2 additions & 1 deletion sql/Cargo.toml
@@ -27,7 +27,8 @@ datafusion-expr = { workspace = true }
datafusion-proto = { workspace = true }
df_operator = { workspace = true }
hashbrown = { version = "0.12", features = ["raw"] }
influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" }
influxql-logical-planner = { git = "https://github.com/Rachelint/influxql-logical-planner.git", rev = "83d8854004e569b32ddbc66a31f74ea5dee99c7b" }
influxql-parser = { git = "https://github.com/Rachelint/influxql-parser.git", rev = "6fd5d946843682c3559e1c172cf23a14d61495b9" }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
6 changes: 3 additions & 3 deletions sql/src/frontend.rs
@@ -7,7 +7,7 @@ use std::{sync::Arc, time::Instant};
use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest};
use cluster::config::SchemaConfig;
use common_types::request_id::RequestId;
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table;

@@ -43,7 +43,7 @@ pub enum Error {
#[snafu(display("invalid influxql, influxql:{}, err:{}", influxql, parse_err))]
InvalidInfluxql {
influxql: String,
parse_err: influxdb_influxql_parser::common::ParseError,
parse_err: influxql_parser::common::ParseError,
},
}

@@ -105,7 +105,7 @@ impl<P> Frontend<P> {
_ctx: &mut Context,
influxql: &str,
) -> Result<Vec<InfluxqlStatement>> {
match influxdb_influxql_parser::parse_statements(influxql) {
match influxql_parser::parse_statements(influxql) {
Ok(stmts) => Ok(stmts),
Err(e) => Err(Error::InvalidInfluxql {
influxql: influxql.to_string(),
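
The frontend.rs change is purely a crate rename: `influxdb_influxql_parser` becomes the forked `influxql-parser`, with the same `parse_statements` entry point and `common::ParseError` type. A minimal sketch of that parse step in isolation, assuming only the API shown in the hunk above; the plain `String` error is a simplification for this sketch, not the Frontend's real error type:

```rust
// Sketch only: exercises the renamed crate exactly as frontend.rs does,
// but maps the ParseError into a String instead of Error::InvalidInfluxql.
use influxql_parser::statement::Statement as InfluxqlStatement;

fn parse_influxql(influxql: &str) -> Result<Vec<InfluxqlStatement>, String> {
    influxql_parser::parse_statements(influxql)
        .map_err(|e| format!("invalid influxql, influxql:{influxql}, err:{e}"))
}
```
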
30 changes: 7 additions & 23 deletions sql/src/influxql/mod.rs
@@ -3,8 +3,7 @@
//! Influxql processing

pub mod planner;
pub(crate) mod select;
pub(crate) mod util;
pub(crate) mod provider;

pub mod error {
use common_util::error::GenericError;
@@ -14,32 +13,17 @@ pub mod error {
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display(
"Unimplemented influxql statement, msg: {}.\nBacktrace:{}",
"Failed to build influxdb schema, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
Unimplemented { msg: String, backtrace: Backtrace },
BuildSchema { msg: String, backtrace: Backtrace },

#[snafu(display(
"Failed to rewrite influxql from statement with cause, msg:{}, source:{}",
msg,
source
))]
RewriteWithCause { msg: String, source: GenericError },

#[snafu(display(
"Failed to rewrite influxql from statement no cause, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
RewriteNoCause { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to build influxql plan, msg:{}, err:{}", msg, source))]
BuildPlan { msg: String, source: GenericError },

#[snafu(display(
"Failed to convert to sql statement, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
Convert { msg: String, backtrace: Backtrace },
#[snafu(display("Unimplemented influxql statement, statement:{}", stmt))]
Unimplemented { stmt: String },
}
define_result!(Error);
}
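
The error module now mirrors the new planning path: `BuildSchema` and `BuildPlan` carry failures coming out of the reused iox planner, while `Unimplemented` simply records the unsupported statement. For readers unfamiliar with snafu, a hedged sketch of how these variants are meant to be used; the real call sites are in planner.rs below, and the `build_df_plan` helper here is a hypothetical stand-in for any fallible step:

```rust
// Sketch only: assumes snafu's generated context selectors (BuildPlan,
// Unimplemented) and common_util's BoxError, as used in planner.rs below.
use common_util::error::BoxError;
use snafu::ResultExt;

use crate::influxql::error::{BuildPlan, Result, Unimplemented};

fn plan_statement(stmt_text: &str, supported: bool) -> Result<()> {
    if !supported {
        // Backtrace-free variant: just records the offending statement text.
        return Unimplemented { stmt: stmt_text.to_string() }.fail();
    }

    build_df_plan(stmt_text)
        .box_err() // erase the concrete error into a GenericError source
        .context(BuildPlan { msg: "build df plan for influxql statement" })?;

    Ok(())
}

// Hypothetical fallible step standing in for the planner call.
fn build_df_plan(_stmt_text: &str) -> std::result::Result<(), std::io::Error> {
    Ok(())
}
```
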
104 changes: 50 additions & 54 deletions sql/src/influxql/planner.rs
@@ -2,29 +2,28 @@

//! Influxql planner

use common_util::error::{BoxError, GenericResult};
use influxdb_influxql_parser::{
select::SelectStatement, statement::Statement as InfluxqlStatement,
};
use std::sync::Arc;

use common_util::error::BoxError;
use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::ResultExt;
use sqlparser::ast::Statement as SqlStatement;
use table_engine::table::TableRef;

use crate::{
influxql::select::{converter::Converter, rewriter::Rewriter},
plan::Plan,
planner::{BuildInfluxqlPlan, Result},
provider::MetaProvider,
influxql::{error::*, provider::InfluxSchemaProviderImpl},
plan::{Plan, QueryPlan},
provider::{ContextProviderAdapter, MetaProvider},
};

#[allow(dead_code)]
const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement";

pub(crate) struct Planner<'a, P: MetaProvider> {
sql_planner: crate::planner::PlannerDelegate<'a, P>,
context_provider: ContextProviderAdapter<'a, P>,
}

impl<'a, P: MetaProvider> Planner<'a, P> {
pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self {
Self { sql_planner }
pub fn new(context_provider: ContextProviderAdapter<'a, P>) -> Self {
Self { context_provider }
}

/// Build sql logical plan from [InfluxqlStatement].
@@ -33,51 +32,48 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
/// the [InfluxqlStatement] will be converted to [SqlStatement] first,
/// and build plan then.
pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
match stmt {
InfluxqlStatement::Select(stmt) => self.select_to_plan(*stmt),
InfluxqlStatement::CreateDatabase(_) => todo!(),
InfluxqlStatement::ShowDatabases(_) => todo!(),
InfluxqlStatement::ShowRetentionPolicies(_) => todo!(),
InfluxqlStatement::ShowTagKeys(_) => todo!(),
InfluxqlStatement::ShowTagValues(_) => todo!(),
InfluxqlStatement::ShowFieldKeys(_) => todo!(),
InfluxqlStatement::ShowMeasurements(_) => todo!(),
InfluxqlStatement::Delete(_) => todo!(),
InfluxqlStatement::DropMeasurement(_) => todo!(),
InfluxqlStatement::Explain(_) => todo!(),
match &stmt {
InfluxqlStatement::Select(_) => self.select_to_plan(stmt),
InfluxqlStatement::CreateDatabase(_)
| InfluxqlStatement::ShowDatabases(_)
| InfluxqlStatement::ShowRetentionPolicies(_)
| InfluxqlStatement::ShowTagKeys(_)
| InfluxqlStatement::ShowTagValues(_)
| InfluxqlStatement::ShowFieldKeys(_)
| InfluxqlStatement::ShowMeasurements(_)
| InfluxqlStatement::Delete(_)
| InfluxqlStatement::DropMeasurement(_)
| InfluxqlStatement::Explain(_) => Unimplemented {
stmt: stmt.to_string(),
}
.fail(),
}
}

pub fn select_to_plan(self, stmt: SelectStatement) -> Result<Plan> {
let mut stmt = stmt;
let provider_impl = MeasurementProviderImpl(&self.sql_planner);
let rewriter = Rewriter::new(&provider_impl);
rewriter
.rewrite(&mut stmt)
.box_err()
.context(BuildInfluxqlPlan)?;

let sql_stmt = SqlStatement::Query(Box::new(
Converter::convert(stmt)
.box_err()
.context(BuildInfluxqlPlan)?,
));
pub fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
let influx_schema_provider = InfluxSchemaProviderImpl {
context_provider: &self.context_provider,
};
let influxql_logical_planner = InfluxQLToLogicalPlan::new(
&influx_schema_provider,
CERESDB_MEASUREMENT_COLUMN_NAME.to_string(),
);

self.sql_planner
.sql_statement_to_plan(sql_stmt)
let df_plan = influxql_logical_planner
.statement_to_plan(stmt)
.box_err()
.context(BuildInfluxqlPlan)
}
}

pub trait MeasurementProvider {
fn measurement(&self, measurement_name: &str) -> GenericResult<Option<TableRef>>;
}

struct MeasurementProviderImpl<'a, P: MetaProvider>(&'a crate::planner::PlannerDelegate<'a, P>);
.context(BuildPlan {
msg: "build df plan for influxql select statement",
})?;
let tables = Arc::new(
self.context_provider
.try_into_container()
.box_err()
.context(BuildPlan {
msg: "get tables from df plan of select",
})?,
);

impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> {
fn measurement(&self, measurement_name: &str) -> GenericResult<Option<TableRef>> {
self.0.find_table(measurement_name).box_err()
Ok(Plan::Query(QueryPlan { df_plan, tables }))
}
}
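
With planner.rs rewritten, the flow is: expose CeresDB's metadata through `InfluxSchemaProviderImpl`, hand the parsed statement to influxdb_iox's `InfluxQLToLogicalPlan` (tagging measurements with the `ceresdb::measurement` column), and wrap the resulting DataFusion plan plus the referenced tables into `Plan::Query`. A hedged usage sketch of the new entry point from elsewhere inside the sql crate; how the caller obtains a `ContextProviderAdapter` is elided here, and errors are reduced to strings for brevity:

```rust
// Sketch only: wires together the pieces this PR introduces. The surrounding
// glue (building the ContextProviderAdapter) is assumed, not shown in the diff.
use crate::{
    influxql::planner::Planner,
    plan::Plan,
    provider::{ContextProviderAdapter, MetaProvider},
};

fn influxql_to_plan<P: MetaProvider>(
    context_provider: ContextProviderAdapter<'_, P>,
    influxql: &str,
) -> Result<Plan, String> {
    // 1. Parse the raw text into InfluxQL statements (same call as frontend.rs).
    let mut stmts =
        influxql_parser::parse_statements(influxql).map_err(|e| e.to_string())?;
    if stmts.is_empty() {
        return Err("no influxql statement found".to_string());
    }

    // 2. Build the planner around the context provider and delegate to the
    //    reused iox logical planner; the result is Plan::Query wrapping the
    //    DataFusion logical plan and the tables it reads.
    Planner::new(context_provider)
        .statement_to_plan(stmts.remove(0))
        .map_err(|e| e.to_string())
}
```
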