diff --git a/Cargo.lock b/Cargo.lock
index 7b272f00fe..2e963e3eb6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2783,26 +2783,43 @@ dependencies = [
]
[[package]]
-name = "influxdb_influxql_parser"
+name = "influxdb_line_protocol"
+version = "0.1.0"
+source = "git+https://github.com/jiacai2050/influxdb_line_protocol#14e00a3dbc99a5edff226b92e3496314b086acf4"
+dependencies = [
+ "bytes 1.4.0",
+ "nom 7.1.1",
+ "smallvec",
+ "snafu 0.7.1",
+]
+
+[[package]]
+name = "influxql-logical-planner"
version = "0.1.0"
-source = "git+https://github.com/Rachelint/influxdb_iox.git?branch=influxql-parser#77e24e992a90dd08a6d71366de53fac721e491fb"
+source = "git+https://github.com/Rachelint/influxql-logical-planner.git?rev=83d8854004e569b32ddbc66a31f74ea5dee99c7b#83d8854004e569b32ddbc66a31f74ea5dee99c7b"
dependencies = [
+ "arrow 32.0.0",
"chrono",
"chrono-tz",
- "nom 7.1.1",
- "num-traits",
+ "datafusion",
+ "influxql-parser",
+ "itertools",
+ "lazy_static",
"once_cell",
+ "regex",
+ "regex-syntax",
]
[[package]]
-name = "influxdb_line_protocol"
+name = "influxql-parser"
version = "0.1.0"
-source = "git+https://github.com/jiacai2050/influxdb_line_protocol#14e00a3dbc99a5edff226b92e3496314b086acf4"
+source = "git+https://github.com/Rachelint/influxql-parser.git?rev=6fd5d946843682c3559e1c172cf23a14d61495b9#6fd5d946843682c3559e1c172cf23a14d61495b9"
dependencies = [
- "bytes 1.4.0",
+ "chrono",
+ "chrono-tz",
"nom 7.1.1",
- "smallvec",
- "snafu 0.7.1",
+ "num-traits",
+ "once_cell",
]
[[package]]
@@ -5665,7 +5682,8 @@ dependencies = [
"datafusion-proto",
"df_operator",
"hashbrown 0.12.3",
- "influxdb_influxql_parser",
+ "influxql-logical-planner",
+ "influxql-parser",
"itertools",
"lazy_static",
"log",
diff --git a/integration_tests/cases/env/local/influxql/basic.result b/integration_tests/cases/env/local/influxql/basic.result
index cde59f96f3..fb1abe4ae7 100644
--- a/integration_tests/cases/env/local/influxql/basic.result
+++ b/integration_tests/cases/env/local/influxql/basic.result
@@ -6,7 +6,7 @@ CREATE TABLE `h2o_feet` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
- `water_level` double NOT NULL,
+ `water_level` double,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);
@@ -27,12 +27,12 @@ affected_rows: 6
-- SQLNESS ARG protocol=influxql
SELECT * FROM "h2o_feet";
-{"rows":[{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
+{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887}]}
-- SQLNESS ARG protocol=influxql
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';
-{"rows":[{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
+{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}
DROP TABLE IF EXISTS `h2o_feet`;
diff --git a/integration_tests/cases/env/local/influxql/basic.sql b/integration_tests/cases/env/local/influxql/basic.sql
index bb197cc41c..c063f3b3fb 100644
--- a/integration_tests/cases/env/local/influxql/basic.sql
+++ b/integration_tests/cases/env/local/influxql/basic.sql
@@ -4,7 +4,7 @@ CREATE TABLE `h2o_feet` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
- `water_level` double NOT NULL,
+ `water_level` double,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);
diff --git a/sql/Cargo.toml b/sql/Cargo.toml
index 126309dfa5..16605b1063 100644
--- a/sql/Cargo.toml
+++ b/sql/Cargo.toml
@@ -27,7 +27,8 @@ datafusion-expr = { workspace = true }
datafusion-proto = { workspace = true }
df_operator = { workspace = true }
hashbrown = { version = "0.12", features = ["raw"] }
-influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" }
+influxql-logical-planner = { git = "https://github.com/Rachelint/influxql-logical-planner.git", rev = "83d8854004e569b32ddbc66a31f74ea5dee99c7b" }
+influxql-parser = { git = "https://github.com/Rachelint/influxql-parser.git", rev = "6fd5d946843682c3559e1c172cf23a14d61495b9" }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs
index 5229ea8225..1000e441dd 100644
--- a/sql/src/frontend.rs
+++ b/sql/src/frontend.rs
@@ -7,7 +7,7 @@ use std::{sync::Arc, time::Instant};
use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest};
use cluster::config::SchemaConfig;
use common_types::request_id::RequestId;
-use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
+use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table;
@@ -43,7 +43,7 @@ pub enum Error {
#[snafu(display("invalid influxql, influxql:{}, err:{}", influxql, parse_err))]
InvalidInfluxql {
influxql: String,
- parse_err: influxdb_influxql_parser::common::ParseError,
+ parse_err: influxql_parser::common::ParseError,
},
}
@@ -105,7 +105,7 @@ impl<P> Frontend<P> {
_ctx: &mut Context,
influxql: &str,
) -> Result<Vec<InfluxqlStatement>> {
- match influxdb_influxql_parser::parse_statements(influxql) {
+ match influxql_parser::parse_statements(influxql) {
Ok(stmts) => Ok(stmts),
Err(e) => Err(Error::InvalidInfluxql {
influxql: influxql.to_string(),
diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs
index 033a5dd5e5..0a4d202972 100644
--- a/sql/src/influxql/mod.rs
+++ b/sql/src/influxql/mod.rs
@@ -3,8 +3,7 @@
//! Influxql processing
pub mod planner;
-pub(crate) mod select;
-pub(crate) mod util;
+pub(crate) mod provider;
pub mod error {
use common_util::error::GenericError;
@@ -14,32 +13,17 @@ pub mod error {
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display(
- "Unimplemented influxql statement, msg: {}.\nBacktrace:{}",
+ "Failed to build influxdb schema, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
- Unimplemented { msg: String, backtrace: Backtrace },
+ BuildSchema { msg: String, backtrace: Backtrace },
- #[snafu(display(
- "Failed to rewrite influxql from statement with cause, msg:{}, source:{}",
- msg,
- source
- ))]
- RewriteWithCause { msg: String, source: GenericError },
-
- #[snafu(display(
- "Failed to rewrite influxql from statement no cause, msg:{}.\nBacktrace:{}",
- msg,
- backtrace
- ))]
- RewriteNoCause { msg: String, backtrace: Backtrace },
+ #[snafu(display("Failed to build influxql plan, msg:{}, err:{}", msg, source))]
+ BuildPlan { msg: String, source: GenericError },
- #[snafu(display(
- "Failed to convert to sql statement, msg:{}.\nBacktrace:{}",
- msg,
- backtrace
- ))]
- Convert { msg: String, backtrace: Backtrace },
+ #[snafu(display("Unimplemented influxql statement, statement:{}", stmt))]
+ Unimplemented { stmt: String },
}
define_result!(Error);
}
diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs
index 00b09736fb..bdefa32cbc 100644
--- a/sql/src/influxql/planner.rs
+++ b/sql/src/influxql/planner.rs
@@ -2,29 +2,28 @@
//! Influxql planner
-use common_util::error::{BoxError, GenericResult};
-use influxdb_influxql_parser::{
- select::SelectStatement, statement::Statement as InfluxqlStatement,
-};
+use std::sync::Arc;
+
+use common_util::error::BoxError;
+use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
+use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::ResultExt;
-use sqlparser::ast::Statement as SqlStatement;
-use table_engine::table::TableRef;
use crate::{
- influxql::select::{converter::Converter, rewriter::Rewriter},
- plan::Plan,
- planner::{BuildInfluxqlPlan, Result},
- provider::MetaProvider,
+ influxql::{error::*, provider::InfluxSchemaProviderImpl},
+ plan::{Plan, QueryPlan},
+ provider::{ContextProviderAdapter, MetaProvider},
};
-#[allow(dead_code)]
+const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement";
+
pub(crate) struct Planner<'a, P: MetaProvider> {
- sql_planner: crate::planner::PlannerDelegate<'a, P>,
+ context_provider: ContextProviderAdapter<'a, P>,
}
impl<'a, P: MetaProvider> Planner<'a, P> {
- pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self {
- Self { sql_planner }
+ pub fn new(context_provider: ContextProviderAdapter<'a, P>) -> Self {
+ Self { context_provider }
}
/// Build sql logical plan from [InfluxqlStatement].
@@ -33,51 +32,48 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
/// the [InfluxqlStatement] will be converted to [SqlStatement] first,
/// and build plan then.
pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
- match stmt {
- InfluxqlStatement::Select(stmt) => self.select_to_plan(*stmt),
- InfluxqlStatement::CreateDatabase(_) => todo!(),
- InfluxqlStatement::ShowDatabases(_) => todo!(),
- InfluxqlStatement::ShowRetentionPolicies(_) => todo!(),
- InfluxqlStatement::ShowTagKeys(_) => todo!(),
- InfluxqlStatement::ShowTagValues(_) => todo!(),
- InfluxqlStatement::ShowFieldKeys(_) => todo!(),
- InfluxqlStatement::ShowMeasurements(_) => todo!(),
- InfluxqlStatement::Delete(_) => todo!(),
- InfluxqlStatement::DropMeasurement(_) => todo!(),
- InfluxqlStatement::Explain(_) => todo!(),
+ match &stmt {
+ InfluxqlStatement::Select(_) => self.select_to_plan(stmt),
+ InfluxqlStatement::CreateDatabase(_)
+ | InfluxqlStatement::ShowDatabases(_)
+ | InfluxqlStatement::ShowRetentionPolicies(_)
+ | InfluxqlStatement::ShowTagKeys(_)
+ | InfluxqlStatement::ShowTagValues(_)
+ | InfluxqlStatement::ShowFieldKeys(_)
+ | InfluxqlStatement::ShowMeasurements(_)
+ | InfluxqlStatement::Delete(_)
+ | InfluxqlStatement::DropMeasurement(_)
+ | InfluxqlStatement::Explain(_) => Unimplemented {
+ stmt: stmt.to_string(),
+ }
+ .fail(),
}
}
- pub fn select_to_plan(self, stmt: SelectStatement) -> Result<Plan> {
- let mut stmt = stmt;
- let provider_impl = MeasurementProviderImpl(&self.sql_planner);
- let rewriter = Rewriter::new(&provider_impl);
- rewriter
- .rewrite(&mut stmt)
- .box_err()
- .context(BuildInfluxqlPlan)?;
-
- let sql_stmt = SqlStatement::Query(Box::new(
- Converter::convert(stmt)
- .box_err()
- .context(BuildInfluxqlPlan)?,
- ));
+ pub fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
+ let influx_schema_provider = InfluxSchemaProviderImpl {
+ context_provider: &self.context_provider,
+ };
+ let influxql_logical_planner = InfluxQLToLogicalPlan::new(
+ &influx_schema_provider,
+ CERESDB_MEASUREMENT_COLUMN_NAME.to_string(),
+ );
- self.sql_planner
- .sql_statement_to_plan(sql_stmt)
+ let df_plan = influxql_logical_planner
+ .statement_to_plan(stmt)
.box_err()
- .context(BuildInfluxqlPlan)
- }
-}
-
-pub trait MeasurementProvider {
- fn measurement(&self, measurement_name: &str) -> GenericResult