diff --git a/Cargo.lock b/Cargo.lock index 7b272f00fe..2e963e3eb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2783,26 +2783,43 @@ dependencies = [ ] [[package]] -name = "influxdb_influxql_parser" +name = "influxdb_line_protocol" +version = "0.1.0" +source = "git+https://github.com/jiacai2050/influxdb_line_protocol#14e00a3dbc99a5edff226b92e3496314b086acf4" +dependencies = [ + "bytes 1.4.0", + "nom 7.1.1", + "smallvec", + "snafu 0.7.1", +] + +[[package]] +name = "influxql-logical-planner" version = "0.1.0" -source = "git+https://github.com/Rachelint/influxdb_iox.git?branch=influxql-parser#77e24e992a90dd08a6d71366de53fac721e491fb" +source = "git+https://github.com/Rachelint/influxql-logical-planner.git?rev=83d8854004e569b32ddbc66a31f74ea5dee99c7b#83d8854004e569b32ddbc66a31f74ea5dee99c7b" dependencies = [ + "arrow 32.0.0", "chrono", "chrono-tz", - "nom 7.1.1", - "num-traits", + "datafusion", + "influxql-parser", + "itertools", + "lazy_static", "once_cell", + "regex", + "regex-syntax", ] [[package]] -name = "influxdb_line_protocol" +name = "influxql-parser" version = "0.1.0" -source = "git+https://github.com/jiacai2050/influxdb_line_protocol#14e00a3dbc99a5edff226b92e3496314b086acf4" +source = "git+https://github.com/Rachelint/influxql-parser.git?rev=6fd5d946843682c3559e1c172cf23a14d61495b9#6fd5d946843682c3559e1c172cf23a14d61495b9" dependencies = [ - "bytes 1.4.0", + "chrono", + "chrono-tz", "nom 7.1.1", - "smallvec", - "snafu 0.7.1", + "num-traits", + "once_cell", ] [[package]] @@ -5665,7 +5682,8 @@ dependencies = [ "datafusion-proto", "df_operator", "hashbrown 0.12.3", - "influxdb_influxql_parser", + "influxql-logical-planner", + "influxql-parser", "itertools", "lazy_static", "log", diff --git a/integration_tests/cases/env/local/influxql/basic.result b/integration_tests/cases/env/local/influxql/basic.result index cde59f96f3..fb1abe4ae7 100644 --- a/integration_tests/cases/env/local/influxql/basic.result +++ b/integration_tests/cases/env/local/influxql/basic.result @@ -6,7 +6,7 @@ CREATE TABLE `h2o_feet` ( `time` timestamp NOT NULL, `level_description` string TAG, `location` string TAG, - `water_level` double NOT NULL, + `water_level` double, timestamp KEY (time)) ENGINE = Analytic WITH ( enable_ttl = 'false' ); @@ -27,12 +27,12 @@ affected_rows: 6 -- SQLNESS ARG protocol=influxql SELECT * FROM "h2o_feet"; -{"rows":[{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]} +{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 
feet","location":"santa_monica","water_level":2.028},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887}]} -- SQLNESS ARG protocol=influxql SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica'; -{"rows":[{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]} +{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]} DROP TABLE IF EXISTS `h2o_feet`; diff --git a/integration_tests/cases/env/local/influxql/basic.sql b/integration_tests/cases/env/local/influxql/basic.sql index bb197cc41c..c063f3b3fb 100644 --- a/integration_tests/cases/env/local/influxql/basic.sql +++ b/integration_tests/cases/env/local/influxql/basic.sql @@ -4,7 +4,7 @@ CREATE TABLE `h2o_feet` ( `time` timestamp NOT NULL, `level_description` string TAG, `location` string TAG, - `water_level` double NOT NULL, + `water_level` double, timestamp KEY (time)) ENGINE = Analytic WITH ( enable_ttl = 'false' ); diff --git a/sql/Cargo.toml b/sql/Cargo.toml index 126309dfa5..16605b1063 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -27,7 +27,8 @@ datafusion-expr = { workspace = true } datafusion-proto = { workspace = true } df_operator = { workspace = true } hashbrown = { version = "0.12", features = ["raw"] } -influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" } +influxql-logical-planner = { git = "https://github.com/Rachelint/influxql-logical-planner.git", rev = "83d8854004e569b32ddbc66a31f74ea5dee99c7b" } +influxql-parser = { git = "https://github.com/Rachelint/influxql-parser.git", rev = "6fd5d946843682c3559e1c172cf23a14d61495b9" } itertools = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs index 5229ea8225..1000e441dd 100644 --- a/sql/src/frontend.rs +++ b/sql/src/frontend.rs @@ -7,7 +7,7 @@ use std::{sync::Arc, time::Instant}; use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest}; use cluster::config::SchemaConfig; use common_types::request_id::RequestId; -use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; +use influxql_parser::statement::Statement as InfluxqlStatement; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table; @@ -43,7 +43,7 @@ pub enum Error { #[snafu(display("invalid influxql, influxql:{}, err:{}", influxql, parse_err))] InvalidInfluxql { influxql: String, - parse_err: influxdb_influxql_parser::common::ParseError, + parse_err: influxql_parser::common::ParseError, }, } @@ -105,7 +105,7 @@ impl
<P: MetaProvider> Frontend<P> { _ctx: &mut Context, influxql: &str, ) -> Result<Vec<InfluxqlStatement>> { - match influxdb_influxql_parser::parse_statements(influxql) { + match influxql_parser::parse_statements(influxql) { Ok(stmts) => Ok(stmts), Err(e) => Err(Error::InvalidInfluxql { influxql: influxql.to_string(), diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs index 033a5dd5e5..0a4d202972 100644 --- a/sql/src/influxql/mod.rs +++ b/sql/src/influxql/mod.rs @@ -3,8 +3,7 @@ //! Influxql processing pub mod planner; -pub(crate) mod select; -pub(crate) mod util; +pub(crate) mod provider; pub mod error { use common_util::error::GenericError; @@ -14,32 +13,17 @@ pub mod error { #[snafu(visibility = "pub")] pub enum Error { #[snafu(display( - "Unimplemented influxql statement, msg: {}.\nBacktrace:{}", + "Failed to build influxdb schema, msg:{}.\nBacktrace:{}", msg, backtrace ))] - Unimplemented { msg: String, backtrace: Backtrace }, + BuildSchema { msg: String, backtrace: Backtrace }, - #[snafu(display( - "Failed to rewrite influxql from statement with cause, msg:{}, source:{}", - msg, - source - ))] - RewriteWithCause { msg: String, source: GenericError }, - - #[snafu(display( - "Failed to rewrite influxql from statement no cause, msg:{}.\nBacktrace:{}", - msg, - backtrace - ))] - RewriteNoCause { msg: String, backtrace: Backtrace }, + #[snafu(display("Failed to build influxql plan, msg:{}, err:{}", msg, source))] + BuildPlan { msg: String, source: GenericError }, - #[snafu(display( - "Failed to convert to sql statement, msg:{}.\nBacktrace:{}", - msg, - backtrace - ))] - Convert { msg: String, backtrace: Backtrace }, + #[snafu(display("Unimplemented influxql statement, statement:{}", stmt))] + Unimplemented { stmt: String }, } define_result!(Error); } diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs index 00b09736fb..bdefa32cbc 100644 --- a/sql/src/influxql/planner.rs +++ b/sql/src/influxql/planner.rs @@ -2,29 +2,28 @@ //! Influxql planner -use common_util::error::{BoxError, GenericResult}; -use influxdb_influxql_parser::{ - select::SelectStatement, statement::Statement as InfluxqlStatement, -}; +use std::sync::Arc; + +use common_util::error::BoxError; +use influxql_logical_planner::planner::InfluxQLToLogicalPlan; +use influxql_parser::statement::Statement as InfluxqlStatement; use snafu::ResultExt; -use sqlparser::ast::Statement as SqlStatement; -use table_engine::table::TableRef; use crate::{ - influxql::select::{converter::Converter, rewriter::Rewriter}, - plan::Plan, - planner::{BuildInfluxqlPlan, Result}, - provider::MetaProvider, + influxql::{error::*, provider::InfluxSchemaProviderImpl}, + plan::{Plan, QueryPlan}, + provider::{ContextProviderAdapter, MetaProvider}, }; -#[allow(dead_code)] +const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement"; + pub(crate) struct Planner<'a, P: MetaProvider> { - sql_planner: crate::planner::PlannerDelegate<'a, P>, + context_provider: ContextProviderAdapter<'a, P>, } impl<'a, P: MetaProvider> Planner<'a, P> { - pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self { - Self { sql_planner } + pub fn new(context_provider: ContextProviderAdapter<'a, P>) -> Self { + Self { context_provider } } /// Build sql logical plan from [InfluxqlStatement]. @@ -33,51 +32,48 @@ impl<'a, P: MetaProvider> Planner<'a, P> { /// the [InfluxqlStatement] will be converted to [SqlStatement] first, /// and build plan then.
pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> { - match stmt { - InfluxqlStatement::Select(stmt) => self.select_to_plan(*stmt), - InfluxqlStatement::CreateDatabase(_) => todo!(), - InfluxqlStatement::ShowDatabases(_) => todo!(), - InfluxqlStatement::ShowRetentionPolicies(_) => todo!(), - InfluxqlStatement::ShowTagKeys(_) => todo!(), - InfluxqlStatement::ShowTagValues(_) => todo!(), - InfluxqlStatement::ShowFieldKeys(_) => todo!(), - InfluxqlStatement::ShowMeasurements(_) => todo!(), - InfluxqlStatement::Delete(_) => todo!(), - InfluxqlStatement::DropMeasurement(_) => todo!(), - InfluxqlStatement::Explain(_) => todo!(), + match &stmt { + InfluxqlStatement::Select(_) => self.select_to_plan(stmt), + InfluxqlStatement::CreateDatabase(_) + | InfluxqlStatement::ShowDatabases(_) + | InfluxqlStatement::ShowRetentionPolicies(_) + | InfluxqlStatement::ShowTagKeys(_) + | InfluxqlStatement::ShowTagValues(_) + | InfluxqlStatement::ShowFieldKeys(_) + | InfluxqlStatement::ShowMeasurements(_) + | InfluxqlStatement::Delete(_) + | InfluxqlStatement::DropMeasurement(_) + | InfluxqlStatement::Explain(_) => Unimplemented { + stmt: stmt.to_string(), + } + .fail(), } } - pub fn select_to_plan(self, stmt: SelectStatement) -> Result<Plan> { - let mut stmt = stmt; - let provider_impl = MeasurementProviderImpl(&self.sql_planner); - let rewriter = Rewriter::new(&provider_impl); - rewriter - .rewrite(&mut stmt) - .box_err() - .context(BuildInfluxqlPlan)?; - - let sql_stmt = SqlStatement::Query(Box::new( - Converter::convert(stmt) - .box_err() - .context(BuildInfluxqlPlan)?, - )); + pub fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> { + let influx_schema_provider = InfluxSchemaProviderImpl { + context_provider: &self.context_provider, + }; + let influxql_logical_planner = InfluxQLToLogicalPlan::new( + &influx_schema_provider, + CERESDB_MEASUREMENT_COLUMN_NAME.to_string(), + ); - self.sql_planner - .sql_statement_to_plan(sql_stmt) + let df_plan = influxql_logical_planner + .statement_to_plan(stmt) .box_err() - .context(BuildInfluxqlPlan) - } -} - -pub trait MeasurementProvider { - fn measurement(&self, measurement_name: &str) -> GenericResult<Option<TableRef>>; -} - -struct MeasurementProviderImpl<'a, P: MetaProvider>(&'a crate::planner::PlannerDelegate<'a, P>); + .context(BuildPlan { + msg: "build df plan for influxql select statement", + })?; + let tables = Arc::new( + self.context_provider + .try_into_container() + .box_err() + .context(BuildPlan { + msg: "get tables from df plan of select", + })?, + ); -impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> { - fn measurement(&self, measurement_name: &str) -> GenericResult<Option<TableRef>> { - self.0.find_table(measurement_name).box_err() + Ok(Plan::Query(QueryPlan { df_plan, tables })) } } diff --git a/sql/src/influxql/provider.rs b/sql/src/influxql/provider.rs new file mode 100644 index 0000000000..7a90f16dfa --- /dev/null +++ b/sql/src/influxql/provider.rs @@ -0,0 +1,490 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//!
Influxql schema provider + +use std::sync::Arc; + +use arrow::datatypes::Field as ArrowField; +use common_types::{column_schema::ColumnSchema, datum::DatumKind, schema::Schema}; +use common_util::error::BoxError; +use datafusion::sql::planner::ContextProvider; +use influxql_logical_planner::{ + provider::{InfluxColumnType, InfluxFieldType, Schema as InfluxSchema, SchemaProvider}, + DataFusionError, Result as DatafusionResult, +}; + +use crate::{ + influxql::error::*, + provider::{ContextProviderAdapter, MetaProvider}, +}; + +/// Influx schema used for building the logical plan +pub struct InfluxSchemaImpl { + columns: Vec<InfluxColumnSchema>, + time_column_idx: usize, +} + +impl InfluxSchemaImpl { + /// Build an influx schema from the ceresdb inner schema. + /// + /// NOTICE: a compatible ceresdb inner schema must satisfy the following: + /// + Only one timestamp column, named "time" + /// + Tag columns can only be of string type, and must be nullable + /// + Field columns can only be of int64/uint64/float64/string/boolean type, + /// and must be nullable + fn new(schema: &Schema) -> Result<Self> { + let cols = schema.columns(); + let timestamp_key_idx = schema.timestamp_index(); + let tsid_idx_opt = schema.index_of_tsid(); + let arrow_fields = &schema.to_arrow_schema_ref().fields; + + let influx_columns = arrow_fields + .iter() + .zip(cols.iter().enumerate()) + .filter_map(|(arrow_col, (col_idx, col))| { + if matches!(tsid_idx_opt, Some(tsid_idx) if col_idx == tsid_idx) { + None + } else { + let influx_type_res = + map_column_to_influx_column(col, col_idx == timestamp_key_idx); + Some(influx_type_res.map(|influx_type| InfluxColumnSchema { + influx_type, + arrow_field: arrow_col.clone(), + })) + } + }) + .collect::<Result<Vec<_>>>()?; + + // Schema is ensured to have timestamp key. + let time_column_idx = influx_columns + .iter() + .enumerate() + .find(|(_, column)| matches!(column.influx_type, InfluxColumnType::Timestamp)) + .map(|(idx, _)| idx) + .unwrap(); + + Ok(Self { + columns: influx_columns, + time_column_idx, + }) + } +} + +impl InfluxSchema for InfluxSchemaImpl { + fn columns(&self) -> Vec<(InfluxColumnType, &ArrowField)> { + self.columns + .iter() + .map(|column| (column.influx_type, &column.arrow_field)) + .collect() + } + + fn tags(&self) -> Vec<&ArrowField> { + self.columns + .iter() + .filter_map(|column| { + if matches!(column.influx_type, InfluxColumnType::Tag) { + Some(&column.arrow_field) + } else { + None + } + }) + .collect() + } + + fn fields(&self) -> Vec<&ArrowField> { + self.columns + .iter() + .filter_map(|column| { + if matches!(column.influx_type, InfluxColumnType::Field(..)) { + Some(&column.arrow_field) + } else { + None + } + }) + .collect() + } + + fn time(&self) -> &ArrowField { + // The time column must exist; this is checked when building. + let time_column = &self.columns[self.time_column_idx]; + + &time_column.arrow_field + } + + fn column(&self, idx: usize) -> (InfluxColumnType, &ArrowField) { + let column = &self.columns[idx]; + + (column.influx_type, &column.arrow_field) + } + + fn find_index_of(&self, name: &str) -> Option<usize> { + self.columns + .iter() + .enumerate() + .find(|(_, column)| column.arrow_field.name() == name) + .map(|(index, _)| index) + } +} + +fn map_column_to_influx_column( + column: &ColumnSchema, + is_timestamp_key: bool, +) -> Result<InfluxColumnType> { + if is_timestamp_key { + return map_column_to_influx_time_column(column); + } + + if column.is_tag { + return map_column_to_influx_tag_column(column); + } + + map_column_to_influx_field_column(column) +} + +// TODO: don't restrict the time column name.
+fn map_column_to_influx_time_column(column: &ColumnSchema) -> Result<InfluxColumnType> { + if column.name == "time" && !column.is_nullable { + Ok(InfluxColumnType::Timestamp) + } else { + BuildSchema { + msg: format!("invalid time column, column:{column:?}"), + } + .fail() + } +} + +// TODO: support more tag types. +fn map_column_to_influx_tag_column(column: &ColumnSchema) -> Result<InfluxColumnType> { + if matches!(column.data_type, DatumKind::String) && column.is_nullable { + Ok(InfluxColumnType::Tag) + } else { + BuildSchema { + msg: format!("invalid tag column, column:{column:?}"), + } + .fail() + } +} + +// TODO: support more field types. +fn map_column_to_influx_field_column(column: &ColumnSchema) -> Result<InfluxColumnType> { + if column.is_nullable { + match column.data_type { + DatumKind::Int64 => Ok(InfluxColumnType::Field(InfluxFieldType::Integer)), + DatumKind::UInt64 => Ok(InfluxColumnType::Field(InfluxFieldType::UInteger)), + DatumKind::Double => Ok(InfluxColumnType::Field(InfluxFieldType::Float)), + DatumKind::String => Ok(InfluxColumnType::Field(InfluxFieldType::String)), + DatumKind::Boolean => Ok(InfluxColumnType::Field(InfluxFieldType::Boolean)), + DatumKind::Null + | DatumKind::Timestamp + | DatumKind::Float + | DatumKind::Varbinary + | DatumKind::UInt32 + | DatumKind::UInt16 + | DatumKind::UInt8 + | DatumKind::Int32 + | DatumKind::Int16 + | DatumKind::Int8 + | DatumKind::Date + | DatumKind::Time => BuildSchema { + msg: format!("invalid field column, column:{column:?}"), + } + .fail(), + } + } else { + BuildSchema { + msg: format!("invalid field column, column:{column:?}"), + } + .fail() + } +} + +struct InfluxColumnSchema { + influx_type: InfluxColumnType, + arrow_field: ArrowField, +} + +/// Influx schema provider used for building logical plan +pub(crate) struct InfluxSchemaProviderImpl<'a, P: MetaProvider> { + pub(crate) context_provider: &'a ContextProviderAdapter<'a, P>, +} + +impl<'a, P: MetaProvider> SchemaProvider for InfluxSchemaProviderImpl<'a, P> { + fn get_table_provider( + &self, + name: &str, + ) -> DatafusionResult<Arc<dyn TableSource>> { + self.context_provider + .get_table_provider(name.into()) + .box_err() + .map_err(|e| DataFusionError::External(e)) + } + + fn table_names(&self) -> DatafusionResult<Vec<String>> { + Err(DataFusionError::NotImplemented( + "get all table names".to_string(), + )) + } + + fn table_schema( + &self, + name: &str, + ) -> DatafusionResult<Option<Arc<dyn InfluxSchema>>> { + let table_opt = self + .context_provider + .table(name.into()) + .box_err() + .map_err(|e| DataFusionError::External(e))?; + + Ok(match table_opt { + Some(table) => { + let influx_schema = InfluxSchemaImpl::new(&table.schema()) + .box_err() + .map_err(|e| DataFusionError::External(e))?; + Some(Arc::new(influx_schema)) + } + None => None, + }) + } + + fn table_exists(&self, name: &str) -> DatafusionResult<bool> { + Ok(self.table_schema(name)?.is_some()) + } +} + +#[cfg(test)] +mod test { + use arrow::datatypes::{DataType, TimeUnit}; + use common_types::{ + column_schema, + datum::DatumKind, + schema::{self, Schema, TSID_COLUMN}, + }; + use influxql_logical_planner::provider::{ + InfluxColumnType, InfluxFieldType, Schema as InfluxSchema, + }; + + use super::InfluxSchemaImpl; + + #[test] + fn test_build_influx_schema() { + let cases = vec![ + Case::Compatible, + Case::TimeNameInvalid, + Case::TagNotNull, + Case::FieldNotNull, + Case::TagTypeInvalid, + Case::FieldTypeInvalid, + ]; + + for case in cases { + let schema = build_test_schema(case); + let influx_schema = InfluxSchemaImpl::new(&schema); + match case { + Case::Compatible => { + let influx_schema = influx_schema.unwrap();
+ let columns = influx_schema.columns(); + for column in columns { + match column { + (InfluxColumnType::Timestamp, field) => { + assert_eq!(field.name(), "time"); + assert_eq!( + field.data_type(), + &DataType::Timestamp(TimeUnit::Millisecond, None) + ); + assert!(!field.is_nullable()); + } + (InfluxColumnType::Tag, field) => { + assert_eq!(field.name(), "tag"); + assert_eq!(field.data_type(), &DataType::Utf8); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::Integer), field) => { + assert_eq!(field.name(), "int_field"); + assert_eq!(field.data_type(), &DataType::Int64); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::UInteger), field) => { + assert_eq!(field.name(), "uint_field"); + assert_eq!(field.data_type(), &DataType::UInt64); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::Float), field) => { + assert_eq!(field.name(), "float_field"); + assert_eq!(field.data_type(), &DataType::Float64); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::String), field) => { + assert_eq!(field.name(), "str_field"); + assert_eq!(field.data_type(), &DataType::Utf8); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::Boolean), field) => { + assert_eq!(field.name(), "bool_field"); + assert_eq!(field.data_type(), &DataType::Boolean); + assert!(field.is_nullable()); + } + } + } + } + Case::TimeNameInvalid => { + assert!(influx_schema.is_err()); + assert!(influx_schema + .err() + .unwrap() + .to_string() + .contains("invalid time column")); + } + Case::TagNotNull => { + assert!(influx_schema.is_err()); + assert!(influx_schema + .err() + .unwrap() + .to_string() + .contains("invalid tag column")); + } + Case::FieldNotNull => { + assert!(influx_schema.is_err()); + assert!(influx_schema + .err() + .unwrap() + .to_string() + .contains("invalid field column")); + } + Case::TagTypeInvalid => { + assert!(influx_schema.is_err()); + assert!(influx_schema + .err() + .unwrap() + .to_string() + .contains("invalid tag column")); + } + Case::FieldTypeInvalid => { + assert!(influx_schema.is_err()); + assert!(influx_schema + .err() + .unwrap() + .to_string() + .contains("invalid field column")); + } + } + } + } + + #[derive(Clone, Copy)] + enum Case { + Compatible, + TimeNameInvalid, + TagNotNull, + FieldNotNull, + TagTypeInvalid, + FieldTypeInvalid, + } + + fn build_test_schema(case: Case) -> Schema { + let time_column_name = if matches!(case, Case::TimeNameInvalid) { + "not_time" + } else { + "time" + }; + + let base_schema_builder = schema::Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new(time_column_name.to_string(), DatumKind::Timestamp) + .is_nullable(false) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_key_column( + column_schema::Builder::new(TSID_COLUMN.to_owned(), DatumKind::UInt64) + .is_nullable(false) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("tag".to_string(), DatumKind::String) + .is_nullable(true) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("str_field".to_string(), DatumKind::String) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("int_field".to_string(), DatumKind::Int64) + 
.is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("uint_field".to_string(), DatumKind::UInt64) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("float_field".to_string(), DatumKind::Double) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("bool_field".to_string(), DatumKind::Boolean) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(); + + let schema_builder = match case { + Case::TagNotNull => base_schema_builder + .add_normal_column( + column_schema::Builder::new("tag_not_null".to_string(), DatumKind::String) + .is_nullable(false) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + Case::FieldNotNull => base_schema_builder + .add_normal_column( + column_schema::Builder::new("field_not_null".to_string(), DatumKind::Int64) + .is_nullable(false) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + Case::TagTypeInvalid => base_schema_builder + .add_normal_column( + column_schema::Builder::new("tag_invalid".to_string(), DatumKind::Varbinary) + .is_nullable(true) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + Case::FieldTypeInvalid => base_schema_builder + .add_normal_column( + column_schema::Builder::new("field_invalid".to_string(), DatumKind::Varbinary) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + _ => base_schema_builder, + }; + + schema_builder.build().expect("should succeed build schema") + } +} diff --git a/sql/src/influxql/select/converter.rs b/sql/src/influxql/select/converter.rs deleted file mode 100644 index c8ee6a7920..0000000000 --- a/sql/src/influxql/select/converter.rs +++ /dev/null @@ -1,386 +0,0 @@ -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. - -//! Influxql select statement converter(to sql statement) - -use influxdb_influxql_parser::{ - common::{MeasurementName, QualifiedMeasurementName}, - expression::{ - BinaryOperator as InfluxqlBinaryOperator, ConditionalExpression, ConditionalOperator, - Expr as InfluxqlExpr, - }, - literal::Literal, - select::{Dimension, MeasurementSelection, SelectStatement}, -}; -use snafu::ensure; -use sqlparser::ast::{ - BinaryOperator, Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Offset, - OffsetRows, Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, Value, -}; - -use crate::influxql::{error::*, util}; - -/// Influxql select statement converter -// Derived from influxdb_iox: -// https://github.com/influxdata/influxdb_iox/blob/ff11fe465d02faf6c4dd3017df8750b38d4afd2b/iox_query/src/plan/influxql/planner.rs -#[allow(dead_code)] -pub struct Converter; - -impl Converter { - #[allow(dead_code)] - pub fn convert(stmt: SelectStatement) -> Result<Query> { - // Fields in `Query` needed to be converted.
- // - limit - // - order by - // - limit - // - offset - // - select body - let limit = stmt.limit.map(|limit| { - let limit_n = *limit; - Expr::Value(Value::Number(limit_n.to_string(), false)) - }); - - let offset = stmt.offset.map(|offset| { - let offset_n = *offset; - let offset_val = Expr::Value(Value::Number(offset_n.to_string(), false)); - - Offset { - value: offset_val, - rows: OffsetRows::None, - } - }); - - // For select body: - // - projection - // - from - // - selection - // - group_by - let projection_exprs = stmt - .fields - .iter() - .map(|field| expr_to_sql_expr(ExprScope::Projection, &field.expr)) - .collect::<Result<Vec<_>>>()?; - let projection = stmt - .fields - .iter() - .zip(projection_exprs.into_iter()) - .map(|(field, expr)| match &field.alias { - Some(alias) => SelectItem::ExprWithAlias { - expr, - alias: Ident::new(alias.to_string()), - }, - None => SelectItem::UnnamedExpr(expr), - }) - .collect(); - - ensure!( - stmt.from.len() == 1, - Unimplemented { - msg: "select from multiple measurements", - } - ); - let measurement_name = match &stmt.from[0] { - MeasurementSelection::Name(QualifiedMeasurementName { name, .. }) => match name { - MeasurementName::Name(name) => name.to_string(), - MeasurementName::Regex(re) => { - return Convert { - msg: format!("convert from to sql statement encounter regex, regex:{re}"), - } - .fail() - } - }, - MeasurementSelection::Subquery(_) => { - return Unimplemented { - msg: "select from subquery", - } - .fail() - } - }; - let table_factor = TableFactor::Table { - name: ObjectName(vec![Ident::with_quote('`', measurement_name)]), - alias: None, - args: None, - with_hints: Vec::default(), - }; - let from = vec![TableWithJoins { - relation: table_factor, - joins: Vec::default(), - }]; - - let selection = match stmt.condition { - Some(condition) => Some(conditional_to_sql_expr(&condition)?), - None => None, - }; - - let group_by = match stmt.group_by { - Some(keys) => keys - .iter() - .map(|key| match key { - Dimension::Time { .. } => Unimplemented { - msg: "group by time interval", - } - .fail(), - Dimension::Tag(tag) => Ok(Expr::Identifier(Ident::new(tag.to_string()))), - Dimension::Regex(re) => Convert { - msg: format!( - "convert group by to sql statement encounter regex, regex:{re}" - ), - } - .fail(), - Dimension::Wildcard => Convert { - msg: "convert group by to sql statement encounter wildcard", - } - .fail(), - }) - .collect::<Result<Vec<_>>>()?, - None => Vec::default(), - }; - - let body = Select { - distinct: false, - top: None, - projection, - into: None, - from, - lateral_views: Vec::default(), - selection, - group_by, - cluster_by: Vec::default(), - sort_by: Vec::default(), - having: None, - qualify: None, - distribute_by: Vec::default(), - }; - - Ok(Query { - with: None, - body: Box::new(SetExpr::Select(Box::new(body))), - order_by: Vec::default(), - limit, - offset, - fetch: None, - locks: Vec::default(), - }) - } -} - -#[derive(Debug, Eq, PartialEq, Clone, Copy)] -enum ExprScope { - Projection, - Where, -} - -/// Map an InfluxQL [`InfluxqlExpr`] to a sql [`Expr`].
-fn expr_to_sql_expr(scope: ExprScope, iql: &InfluxqlExpr) -> Result<Expr> { - match iql { - // rewriter is expected to expand wildcard expressions - InfluxqlExpr::Wildcard(_) => Convert { - msg: "unexpected wildcard in projection", - } - .fail(), - - InfluxqlExpr::VarRef { - name, - data_type: opt_dst_type, - } => { - if let Some(dst_type) = opt_dst_type { - return Unimplemented { - msg: format!("cast to dst type, column:{name}, dst type:{dst_type}"), - } - .fail(); - }; - - Ok(Expr::Identifier(Ident::new(name.to_string()))) - } - - InfluxqlExpr::BindParameter(_) => Unimplemented { - msg: "bind parameter", - } - .fail(), - - InfluxqlExpr::Literal(val) => Ok(match val { - Literal::Integer(v) => Expr::Value(Value::Number(v.to_string(), false)), - Literal::Unsigned(v) => Expr::Value(Value::Number(v.to_string(), false)), - Literal::Float(v) => Expr::Value(Value::Number(v.to_string(), false)), - Literal::String(v) => Expr::Value(Value::SingleQuotedString(v.clone())), - Literal::Timestamp(v) => Expr::Value(Value::SingleQuotedString(v.to_rfc3339())), - Literal::Duration(_) => { - return Unimplemented { - msg: "duration literal", - } - .fail() - } - Literal::Regex(re) => match scope { - // a regular expression in a projection list is unexpected, - // as it should have been expanded by the rewriter. - ExprScope::Projection => { - return Convert { - msg: format!( - "convert projection to sql statement encounter regex, regex:{re}" - ), - } - .fail() - } - ExprScope::Where => { - return Unimplemented { - msg: "regex in where clause", - } - .fail() - } - }, - Literal::Boolean(v) => Expr::Value(Value::Boolean(*v)), - }), - - InfluxqlExpr::Distinct(_) => Unimplemented { msg: "DISTINCT" }.fail(), - - InfluxqlExpr::Call { name, args } => call_to_sql_expr(scope, name, args), - - InfluxqlExpr::Binary { lhs, op, rhs } => binary_expr_to_sql_expr(scope, lhs, op, rhs), - - InfluxqlExpr::Nested(e) => expr_to_sql_expr(scope, e), - } -} - -fn call_to_sql_expr(scope: ExprScope, name: &str, args: &[InfluxqlExpr]) -> Result<Expr> { - if util::is_scalar_math_function(name) { - let name = ObjectName(vec![Ident::new(name.to_string())]); - // TODO: Support `FunctionArg::Named`.
- let args = args - .iter() - .map(|arg| { - let sql_expr_res = expr_to_sql_expr(scope, arg); - sql_expr_res.map(|sql_expr| FunctionArg::Unnamed(FunctionArgExpr::Expr(sql_expr))) - }) - .collect::<Result<Vec<_>>>()?; - - return Ok(Expr::Function(Function { - name, - args, - over: None, - distinct: false, - special: false, - })); - } - - match scope { - ExprScope::Projection => Unimplemented { - msg: "aggregate and selector functions in projection list", - } - .fail(), - - ExprScope::Where => { - if name.eq_ignore_ascii_case("now") { - Unimplemented { - msg: "now() in where clause", - } - .fail() - } else { - Convert { - msg: format!("invalid function call in condition: {name}"), - } - .fail() - } - } - } -} - -fn binary_expr_to_sql_expr( - scope: ExprScope, - lhs: &InfluxqlExpr, - op: &InfluxqlBinaryOperator, - rhs: &InfluxqlExpr, -) -> Result<Expr> { - let left = Box::new(expr_to_sql_expr(scope, lhs)?); - let right = Box::new(expr_to_sql_expr(scope, rhs)?); - - let op = match op { - InfluxqlBinaryOperator::Add => BinaryOperator::Plus, - InfluxqlBinaryOperator::Sub => BinaryOperator::Minus, - InfluxqlBinaryOperator::Mul => BinaryOperator::Multiply, - InfluxqlBinaryOperator::Div => BinaryOperator::Divide, - InfluxqlBinaryOperator::Mod => BinaryOperator::Modulo, - InfluxqlBinaryOperator::BitwiseAnd => BinaryOperator::BitwiseAnd, - InfluxqlBinaryOperator::BitwiseOr => BinaryOperator::BitwiseOr, - InfluxqlBinaryOperator::BitwiseXor => BinaryOperator::BitwiseXor, - }; - - Ok(Expr::BinaryOp { left, op, right }) -} - -/// Map an InfluxQL [`ConditionalExpression`] to a sql [`Expr`]. -fn conditional_to_sql_expr(iql: &ConditionalExpression) -> Result<Expr> { - match iql { - ConditionalExpression::Expr(expr) => expr_to_sql_expr(ExprScope::Where, expr), - ConditionalExpression::Binary { lhs, op, rhs } => { - let op = conditional_op_to_operator(*op)?; - let (lhs, rhs) = (conditional_to_sql_expr(lhs)?, conditional_to_sql_expr(rhs)?); - - Ok(Expr::BinaryOp { - left: Box::new(lhs), - op, - right: Box::new(rhs), - }) - } - ConditionalExpression::Grouped(e) => conditional_to_sql_expr(e), - } -} - -fn conditional_op_to_operator(op: ConditionalOperator) -> Result<BinaryOperator> { - match op { - ConditionalOperator::Eq => Ok(BinaryOperator::Eq), - ConditionalOperator::NotEq => Ok(BinaryOperator::NotEq), - ConditionalOperator::EqRegex => Unimplemented { - msg: "eq regex in where clause", - } - .fail(), - ConditionalOperator::NotEqRegex => Unimplemented { - msg: "not eq regex in where clause", - } - .fail(), - ConditionalOperator::Lt => Ok(BinaryOperator::Lt), - ConditionalOperator::LtEq => Ok(BinaryOperator::LtEq), - ConditionalOperator::Gt => Ok(BinaryOperator::Gt), - ConditionalOperator::GtEq => Ok(BinaryOperator::GtEq), - ConditionalOperator::And => Ok(BinaryOperator::And), - ConditionalOperator::Or => Ok(BinaryOperator::Or), - // NOTE: This is not supported by InfluxQL SELECT expressions, so it is unexpected - ConditionalOperator::In => Convert { - msg: "unexpected binary operator: IN", - } - .fail(), - } -} - -#[cfg(test)] -mod test { - use sqlparser::ast::Statement as SqlStatement; - - use crate::{ - ast::Statement, influxql::select::converter::Converter, parser::Parser, tests::parse_select, - }; - - #[test] - fn test_basic_convert() { - // Common parts between influxql and sql, include: - // - limit - // - offset - // - projection - // - from(single table) - // - selection - // - group_by - let stmt = parse_select( - "SELECT a, sin(b), c - FROM influxql_test WHERE a < 4 and b > 4.5 GROUP BY c LIMIT 1 OFFSET 0", - ); - let converted_sql_stmt =
Statement::Standard(Box::new(SqlStatement::Query(Box::new( - Converter::convert(stmt).unwrap(), - )))); - - let sql_stmts = Parser::parse_sql( - "SELECT a, sin(b), c - FROM influxql_test WHERE a < 4 and b > 4.5 GROUP BY c LIMIT 1 OFFSET 0", - ) - .unwrap(); - let expected_sql_stmt = sql_stmts.first().unwrap(); - assert_eq!(expected_sql_stmt, &converted_sql_stmt); - } -} diff --git a/sql/src/influxql/select/mod.rs b/sql/src/influxql/select/mod.rs deleted file mode 100644 index 0cf2aefd0e..0000000000 --- a/sql/src/influxql/select/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. - -//! Convert influxql to sql at statement level - -pub(crate) mod converter; -pub(crate) mod rewriter; diff --git a/sql/src/influxql/select/rewriter.rs b/sql/src/influxql/select/rewriter.rs deleted file mode 100644 index e44d73b6b9..0000000000 --- a/sql/src/influxql/select/rewriter.rs +++ /dev/null @@ -1,432 +0,0 @@ -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. - -//! Influxql select statement rewriter - -use std::{collections::BTreeSet, ops::ControlFlow}; - -use common_util::error::BoxError; -use influxdb_influxql_parser::{ - common::{MeasurementName, QualifiedMeasurementName, ZeroOrMore}, - expression::{walk, Expr, WildcardType}, - identifier::Identifier, - literal::Literal, - select::{ - Dimension, Field, FieldList, FromMeasurementClause, MeasurementSelection, SelectStatement, - }, -}; -use itertools::{Either, Itertools}; -use snafu::{ensure, OptionExt, ResultExt}; - -use crate::influxql::{error::*, planner::MeasurementProvider, util}; - -/// Rewriter for the influxql statement -/// -/// It will rewrite statement before converting it to sql statement. -// Derived from influxdb_iox: -// https://github.com/influxdata/influxdb_iox/blob/ff11fe465d02faf6c4dd3017df8750b38d4afd2b/iox_query/src/plan/influxql/rewriter.rs -pub(crate) struct Rewriter<'a> { - measurement_provider: &'a dyn MeasurementProvider, -} - -impl<'a> Rewriter<'a> { - pub fn new(measurement_provider: &'a dyn MeasurementProvider) -> Self { - Self { - measurement_provider, - } - } - - pub fn rewrite(&self, stmt: &mut SelectStatement) -> Result<()> { - self.rewrite_from(stmt)?; - self.rewrite_field_list(stmt) - } - - fn rewrite_from(&self, stmt: &mut SelectStatement) -> Result<()> { - let mut new_from = Vec::new(); - for ms in stmt.from.iter() { - match ms { - MeasurementSelection::Name(qmn) => { - match qmn { - QualifiedMeasurementName { - name: MeasurementName::Name(name), - .. - } => { - let _ = self - .measurement_provider - .measurement(name) - .context(RewriteWithCause { - msg: format!( - "rewrite from failed to find measurement, measurement:{name}", - ), - })? - .context(RewriteNoCause { - msg: format!( - "rewrite from found measurement not found, measurement:{name}" - ), - })?; - new_from.push(ms.clone()); - } - QualifiedMeasurementName { - name: MeasurementName::Regex(_), - .. - } => { - // TODO: need to support get all tables first. - return Unimplemented { - msg: "rewrite from regex", - } - .fail(); - } - } - } - MeasurementSelection::Subquery(_) => { - return Unimplemented { - msg: "rewrite from subquery", - } - .fail(); - } - } - } - - // TODO: support from multiple tables. - ensure!( - new_from.len() == 1, - Unimplemented { - msg: "rewrite from multiple measurements" - } - ); - - stmt.from = FromMeasurementClause::new(new_from); - - Ok(()) - } - - /// Rewrite the projection list and GROUP BY of the specified `SELECT`. 
- // TODO: should support from multiple measurements. - // TODO: support rewrite fields in subquery. - fn rewrite_field_list(&self, stmt: &mut SelectStatement) -> Result<()> { - ensure!( - stmt.from.len() == 1, - Unimplemented { - msg: "rewrite field list from multiple measurements" - } - ); - - match &stmt.from[0] { - MeasurementSelection::Name(qualified_name) => { - let QualifiedMeasurementName { name, .. } = qualified_name; - - match name { - MeasurementName::Name(name) => { - // Get schema, and split columns to tags and fields. - let (tags, fields) = self - .tags_and_fields_in_measurement(name.as_str()) - .box_err() - .context(RewriteWithCause { - msg: "rewrite field list fail to find measurement", - })?; - let mut group_by_tags = BTreeSet::new(); - maybe_rewrite_group_by(&tags, &mut group_by_tags, stmt)?; - maybe_rewrite_projection(&tags, &fields, &group_by_tags, stmt)?; - - Ok(()) - } - - MeasurementName::Regex(_) => RewriteNoCause { - msg: "rewrite field list should not encounter regex in from clause", - } - .fail(), - } - } - - MeasurementSelection::Subquery(_) => Unimplemented { - msg: "rewrite field list from subquery", - } - .fail(), - } - } - - fn tags_and_fields_in_measurement( - &self, - measurement_name: &str, - ) -> Result<(Vec<String>, Vec<String>)> { - let measurement = self - .measurement_provider - .measurement(measurement_name) - .context(RewriteWithCause { - msg: format!("failed to find measurement, measurement:{measurement_name}"), - })? - .context(RewriteNoCause { - msg: format!("measurement not found, measurement:{measurement_name}"), - })?; - - // Get schema and split to tags and fields. - let schema = measurement.schema(); - let tsid_idx_opt = schema.index_of_tsid(); - let timestamp_key_idx = schema.timestamp_index(); - let tags_and_fields: (Vec<String>, Vec<String>) = schema - .columns() - .iter() - .enumerate() - .filter_map(|(col_idx, col)| { - let is_tsid_col = match tsid_idx_opt { - Some(idx) => col_idx == idx, - None => false, - }; - let is_timestamp_key_col = col_idx == timestamp_key_idx; - - if !is_tsid_col && !is_timestamp_key_col { - Some(col) - } else { - None - } - }) - .partition_map(|col| { - if col.is_tag { - Either::Left(col.name.clone()) - } else { - Either::Right(col.name.clone()) - } - }); - Ok(tags_and_fields) - } -} - -fn maybe_rewrite_group_by( - tags: &[String], - group_by_tags: &mut BTreeSet<String>, - stmt: &mut SelectStatement, -) -> Result<()> { - if let Some(group_by) = &stmt.group_by { - for dimension in group_by.iter() { - match dimension { - Dimension::Time { ..
} => { - return Unimplemented { - msg: "group by time interval", - } - .fail(); - } - - Dimension::Tag(tag) => { - if !tags.contains(&tag.to_string()) { - return RewriteNoCause { - msg: format!("rewrite group by encounter tag not exist, tag:{tag}, exist tags:{tags:?}"), - } - .fail(); - } - let _ = group_by_tags.insert(tag.to_string()); - } - - Dimension::Regex(re) => { - let re = util::parse_regex(re).box_err().context(RewriteWithCause { - msg: format!("rewrite group by encounter invalid regex, regex:{re}"), - })?; - let match_tags = tags.iter().filter_map(|tag| { - if re.is_match(tag.as_str()) { - Some(tag.clone()) - } else { - None - } - }); - group_by_tags.extend(match_tags); - } - - Dimension::Wildcard => group_by_tags.extend(tags.iter().cloned()), - } - } - - stmt.group_by = Some(ZeroOrMore::new( - group_by_tags - .iter() - .map(|tag| Dimension::Tag(Identifier::new(tag.clone()))) - .collect::<Vec<_>>(), - )); - } - - Ok(()) -} - -fn maybe_rewrite_projection( - tags: &[String], - fields: &[String], - groub_by_tags: &BTreeSet<String>, - stmt: &mut SelectStatement, -) -> Result<()> { - let mut new_fields = Vec::new(); - - enum AddFieldType { - Tag, - Field, - Both, - } - - let add_fields = |filter: &dyn Fn(&String) -> bool, - add_field_type: AddFieldType, - new_fields: &mut Vec<Field>| { - if matches!(&add_field_type, AddFieldType::Tag | AddFieldType::Both) { - let tag_fields = tags.iter().filter_map(|tag| { - if !groub_by_tags.contains(tag.as_str()) && filter(tag) { - Some(Field { - expr: Expr::VarRef { - name: tag.clone().into(), - data_type: None, - }, - alias: None, - }) - } else { - None - } - }); - new_fields.extend(tag_fields); - } - - if matches!(&add_field_type, AddFieldType::Field | AddFieldType::Both) { - let normal_fields = fields.iter().filter_map(|field| { - if filter(field) { - Some(Field { - expr: Expr::VarRef { - name: field.clone().into(), - data_type: None, - }, - alias: None, - }) - } else { - None - } - }); - new_fields.extend(normal_fields); - } - }; - - for f in stmt.fields.iter() { - match &f.expr { - Expr::Wildcard(wct) => { - let filter = |_: &String| -> bool { true }; - - match wct { - Some(WildcardType::Tag) => { - add_fields(&filter, AddFieldType::Tag, &mut new_fields); - } - Some(WildcardType::Field) => { - add_fields(&filter, AddFieldType::Field, &mut new_fields); - } - None => { - add_fields(&filter, AddFieldType::Both, &mut new_fields); - } - } - } - - Expr::Literal(Literal::Regex(re)) => { - let re = util::parse_regex(re).box_err().context(RewriteWithCause { - msg: format!("rewrite projection encounter invalid regex, regex:{re}"), - })?; - - let filter = |v: &String| -> bool { re.is_match(v.as_str()) }; - - add_fields(&filter, AddFieldType::Both, &mut new_fields); - } - - Expr::Call { args, .. } => { - let mut args = args; - - // Search for the call with a wildcard by continuously descending until - // we no longer have a call. - while let Some(Expr::Call { - args: inner_args, .. - }) = args.first() - { - args = inner_args; - } - - match args.first() { - Some(Expr::Wildcard(Some(WildcardType::Tag))) => { - return RewriteNoCause { - msg: "rewrite projection found tags placed in a call", - } - .fail(); - } - Some(Expr::Wildcard(_)) | Some(Expr::Literal(Literal::Regex(_))) => { - return Unimplemented { - msg: "wildcard or regex in call", - } - .fail(); - } - _ => { - new_fields.push(f.clone()); - continue; - } - } - } - - Expr::Binary { ..
} => { - let has_wildcard = walk::walk_expr(&f.expr, &mut |e| { - match e { - Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => { - return ControlFlow::Break(()) - } - _ => {} - } - ControlFlow::Continue(()) - }) - .is_break(); - - if has_wildcard { - return RewriteNoCause { - msg: "rewrite projection encounter wildcard or regex in binary expression", - } - .fail(); - } - - new_fields.push(f.clone()); - } - - _ => new_fields.push(f.clone()), - } - } - - stmt.fields = FieldList::new(new_fields); - - Ok(()) -} - -#[cfg(test)] -mod test { - use crate::tests::{parse_select, rewrite_statement, MockMetaProvider}; - - #[test] - fn test_wildcard_and_regex_in_projection() { - let namespace = MockMetaProvider::default(); - - let mut stmt = parse_select("SELECT * FROM influxql_test"); - rewrite_statement(&namespace, &mut stmt); - assert_eq!( - "SELECT col1, col2, col3 FROM influxql_test", - stmt.to_string() - ); - - let mut stmt = parse_select("SELECT *::tag FROM influxql_test"); - rewrite_statement(&namespace, &mut stmt); - assert_eq!("SELECT col1, col2 FROM influxql_test", stmt.to_string()); - - let mut stmt = parse_select("SELECT *::field FROM influxql_test"); - rewrite_statement(&namespace, &mut stmt); - assert_eq!("SELECT col3 FROM influxql_test", stmt.to_string()); - } - - #[test] - fn test_wildcard_and_regex_in_group_by() { - let namespace = MockMetaProvider::default(); - - let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY *"); - rewrite_statement(&namespace, &mut stmt); - assert_eq!( - "SELECT col3 FROM influxql_test GROUP BY col1, col2", - stmt.to_string() - ); - - let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY col1"); - rewrite_statement(&namespace, &mut stmt); - assert_eq!( - "SELECT col2, col3 FROM influxql_test GROUP BY col1", - stmt.to_string() - ); - } -} diff --git a/sql/src/influxql/util.rs b/sql/src/influxql/util.rs deleted file mode 100644 index 7cc7b855f7..0000000000 --- a/sql/src/influxql/util.rs +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. - -//! 
Some utils used process influxql - -use std::collections::HashSet; - -use influxdb_influxql_parser::string::Regex; -use lazy_static::lazy_static; - -// Copy from influxdb_iox: -// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L147 -pub fn clean_non_meta_escapes(pattern: &str) -> String { - if pattern.is_empty() { - return pattern.to_string(); - } - - #[derive(Debug, Copy, Clone)] - enum SlashState { - No, - Single, - Double, - } - - let mut next_state = SlashState::No; - - let next_chars = pattern - .chars() - .map(Some) - .skip(1) - .chain(std::iter::once(None)); - - // emit char based on previous - let new_pattern: String = pattern - .chars() - .zip(next_chars) - .filter_map(|(c, next_char)| { - let cur_state = next_state; - next_state = match (c, cur_state) { - ('\\', SlashState::No) => SlashState::Single, - ('\\', SlashState::Single) => SlashState::Double, - ('\\', SlashState::Double) => SlashState::Single, - _ => SlashState::No, - }; - - // Decide to emit `c` or not - match (cur_state, c, next_char) { - (SlashState::No, '\\', Some(next_char)) - | (SlashState::Double, '\\', Some(next_char)) - if !is_valid_character_after_escape(next_char) => - { - None - } - _ => Some(c), - } - }) - .collect(); - - new_pattern -} - -// Copy from influxdb_iox: -// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L123 -fn is_valid_character_after_escape(c: char) -> bool { - // same list as https://docs.rs/regex-syntax/0.6.25/src/regex_syntax/ast/parse.rs.html#1445-1538 - match c { - '0'..='7' => true, - '8'..='9' => true, - 'x' | 'u' | 'U' => true, - 'p' | 'P' => true, - 'd' | 's' | 'w' | 'D' | 'S' | 'W' => true, - _ => regex_syntax::is_meta_character(c), - } -} - -// Copy from influxdb_iox: -// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/iox_query/src/plan/influxql/util.rs#L48 -pub fn parse_regex(re: &Regex) -> std::result::Result<regex::Regex, regex::Error> { - let pattern = clean_non_meta_escapes(re.as_str()); - regex::Regex::new(&pattern) -} - -// Copy from influxql_iox. -lazy_static! { - static ref SCALAR_MATH_FUNCTIONS: HashSet<&'static str> = HashSet::from([ - "abs", "sin", "cos", "tan", "asin", "acos", "atan", "atan2", "exp", "log", "ln", "log2", - "log10", "sqrt", "pow", "floor", "ceil", "round", - ]); -} - -/// Returns `true` if `name` is a mathematical scalar function -/// supported by InfluxQL. -pub(crate) fn is_scalar_math_function(name: &str) -> bool { - SCALAR_MATH_FUNCTIONS.contains(name) -} - -mod test { - // Copy from influxdb_iox: - // https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L357 - #[test] - fn test_clean_non_meta_escapes() { - let cases = vec![ - ("", ""), - (r#"\"#, r#"\"#), - (r#"\\"#, r#"\\"#), - // : is not a special meta character - (r#"\:"#, r#":"#), - // .
is a special meta character - (r#"\."#, r#"\."#), - (r#"foo\"#, r#"foo\"#), - (r#"foo\\"#, r#"foo\\"#), - (r#"foo\:"#, r#"foo:"#), - (r#"foo\xff"#, r#"foo\xff"#), - (r#"fo\\o"#, r#"fo\\o"#), - (r#"fo\:o"#, r#"fo:o"#), - (r#"fo\:o\x123"#, r#"fo:o\x123"#), - (r#"fo\:o\x123\:"#, r#"fo:o\x123:"#), - (r#"foo\\\:bar"#, r#"foo\\:bar"#), - (r#"foo\\\:bar\\\:"#, r#"foo\\:bar\\:"#), - ("foo", "foo"), - ]; - - for (pattern, expected) in cases { - let cleaned_pattern = crate::influxql::util::clean_non_meta_escapes(pattern); - assert_eq!( - cleaned_pattern, expected, - "Expected '{pattern}' to be cleaned to '{expected}', got '{cleaned_pattern}'" - ); - } - } -} diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 822f71516f..896b43b25d 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -36,7 +36,7 @@ use datafusion::{ ResolvedTableReference, }, }; -use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; +use influxql_parser::statement::Statement as InfluxqlStatement; use log::{debug, trace}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use sqlparser::ast::{ @@ -254,7 +254,9 @@ pub enum Error { InvalidWriteEntry { msg: String }, #[snafu(display("Failed to build influxql plan, err:{}", source))] - BuildInfluxqlPlan { source: GenericError }, + BuildInfluxqlPlan { + source: crate::influxql::error::Error, + }, } define_result!(Error); @@ -329,10 +331,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> { pub fn influxql_stmt_to_plan(&self, statement: InfluxqlStatement) -> Result<Plan> { let adapter = ContextProviderAdapter::new(self.provider, self.read_parallelism); - let planner = PlannerDelegate::new(adapter); - let influxql_planner = crate::influxql::planner::Planner::new(planner); - influxql_planner.statement_to_plan(statement) + let influxql_planner = crate::influxql::planner::Planner::new(adapter); + influxql_planner + .statement_to_plan(statement) + .context(BuildInfluxqlPlan) } pub fn write_req_to_plan( diff --git a/sql/src/tests.rs b/sql/src/tests.rs index cc593e5802..ecaf19d67c 100644 --- a/sql/src/tests.rs +++ b/sql/src/tests.rs @@ -3,26 +3,16 @@ use std::sync::Arc; use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; -use common_types::{ - column_schema, - datum::DatumKind, - schema::{Builder, Schema, TSID_COLUMN}, - tests::{build_default_value_schema, build_schema}, -}; -use common_util::error::GenericResult; +use common_types::tests::{build_default_value_schema, build_schema}; use datafusion::catalog::TableReference; use df_operator::{scalar::ScalarUdf, udaf::AggregateUdf}; -use influxdb_influxql_parser::{parse_statements, select::SelectStatement, statement::Statement}; use table_engine::{ memory::MemoryTable, table::{Table, TableId, TableRef}, ANALYTIC_ENGINE_TYPE, }; -use crate::{ - influxql::{planner::MeasurementProvider, select::rewriter::Rewriter}, - provider::MetaProvider, -}; +use crate::provider::MetaProvider; pub struct MockMetaProvider { tables: Vec<Arc<MemoryTable>>, } @@ -56,12 +46,6 @@ impl Default for MockMetaProvider { build_schema(), ANALYTIC_ENGINE_TYPE.to_string(), )), - Arc::new(MemoryTable::new( - "influxql_test".to_string(), - TableId::from(144), - build_influxql_test_schema(), - ANALYTIC_ENGINE_TYPE.to_string(), - )), ], } } @@ -95,68 +79,3 @@ impl MetaProvider for MockMetaProvider { todo!() } } - -impl MeasurementProvider for MockMetaProvider { - fn measurement( - &self, - measurement_name: &str, - ) -> GenericResult<Option<TableRef>> { - let table_ref = TableReference::Bare { - table: std::borrow::Cow::Borrowed(measurement_name), - };
Ok(self.table(table_ref).unwrap()) - } -} - -pub fn rewrite_statement(provider: &dyn MeasurementProvider, stmt: &mut SelectStatement) { - let rewriter = Rewriter::new(provider); - rewriter.rewrite(stmt).unwrap(); -} - -/// Returns the InfluxQL [`SelectStatement`] for the specified SQL, `s`. -pub fn parse_select(s: &str) -> SelectStatement { - let statements = parse_statements(s).unwrap(); - match statements.first() { - Some(Statement::Select(sel)) => *sel.clone(), - _ => panic!("expected SELECT statement"), - } -} - -fn build_influxql_test_schema() -> Schema { - Builder::new() - .auto_increment_column_id(true) - .add_key_column( - column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_key_column( - column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("col1".to_string(), DatumKind::String) - .is_tag(true) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("col2".to_string(), DatumKind::String) - .is_tag(true) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("col3".to_string(), DatumKind::Int64) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .build() - .expect("should succeed to build schema") -}
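
Reviewer note (not part of the diff): a minimal sketch of the new end-to-end planning path, for orientation. The entry points are all introduced or renamed by this change (influxql_parser::parse_statements, crate::influxql::planner::Planner::new over a ContextProviderAdapter, Planner::statement_to_plan); the helper name plan_influxql and its error handling are illustrative only, not code from the PR.

    // Hypothetical helper showing how an InfluxQL string becomes a query plan
    // after this refactor; assumes a ContextProviderAdapter built from a
    // MetaProvider, as in Planner::influxql_stmt_to_plan in sql/src/planner.rs.
    fn plan_influxql<P: MetaProvider>(
        adapter: ContextProviderAdapter<'_, P>,
        influxql: &str,
    ) -> crate::influxql::error::Result<Plan> {
        // 1. Parse the raw text; the influxql-parser crate replaces the old
        //    influxdb_influxql_parser dependency.
        let mut stmts =
            influxql_parser::parse_statements(influxql).expect("parse failure handling elided");
        // 2. Plan directly with influxql-logical-planner, which reads table
        //    schemas through InfluxSchemaProviderImpl; the former
        //    rewrite-to-sqlparser-AST path (select/converter.rs and
        //    select/rewriter.rs) is deleted.
        crate::influxql::planner::Planner::new(adapter).statement_to_plan(stmts.remove(0))
    }

The updated expectations in basic.result show the visible effect of this path: each result row now carries the synthetic "ceresdb::measurement" column and the time column, and rows come back time-ordered rather than in insertion order.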