diff --git a/e2e_test/ddl/table.slt b/e2e_test/ddl/table.slt index 525982f2c579d..4c076b979fc25 100644 --- a/e2e_test/ddl/table.slt +++ b/e2e_test/ddl/table.slt @@ -1,261 +1 @@ -# Create a table. -statement ok -create table ddl_t (v1 int); - -statement ok -explain select v1 from ddl_t; - -# Create another table with duplicated name. -statement error -create table ddl_t (v2 int); - -# Create a table using a empty string. -statement error -create table ""(v2 int); - -statement ok -create table if not exists ddl_t (v2 int); - -# Drop the table. -statement ok -drop table ddl_t; - -# Drop it again. -statement error -drop table ddl_t; - -# Create another table with the same name. -statement ok -create table ddl_t (v2 int); - -statement ok -explain select v2 from ddl_t; - -# Create a mview on top of it. -statement ok -create materialized view ddl_mv as select v2 from ddl_t; - -statement ok -explain select v2 from ddl_t; - -statement ok -explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); - -statement ok -explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' ); - -# Create a mview with duplicated name. -statement error -create materialized view ddl_mv as select v2 from ddl_t; - -# Drop the table before dropping the mview. -statement error -drop table ddl_t; - -# We're not allowed to drop the mview using `DROP TABLE`. -statement error -drop table ddl_mv; - -# Drop the mview. -statement ok -drop materialized view ddl_mv; - -# Drop it again. -statement error -drop materialized view ddl_mv; - -# We're not allowed to drop the table using `DROP MATERIALIZED VIEW`. -statement error -drop materialized view ddl_t; - -# Now, we can drop the base table. -statement ok -drop table ddl_t; - -# Create table concludes struct column. -statement ok -create table st (v1 int, v2 struct>); - -statement ok -drop table st - -# We test the case sensitivity of table name and column name. -statement ok -create table t1 (v1 int); - -statement ok -drop table T1; - -statement ok -create table T1 (v1 int); - -statement ok -drop table t1; - -statement ok -create table "T1" (v1 int); - -# Since we have not really bound the columns in the insert statement -# this test case cannot be enabled. -# statement error -# insert into "T1" ("V1") values (1); - -statement error -drop table t1; - -statement error -drop table T1; - -statement ok -drop table "T1"; - -statement ok -create table "T2" ("V1" int); - -# Since we have not really bound the columns in the insert statement -# this test case cannot be enabled. 
-# statement error
-# insert into "T2" (V1) values (1);
-
-statement ok
-insert into "T2" ("V1") values (1);
-
-statement ok
-drop table "T2"
-
-statement error
-create table C1 (c1 varchar(5));
-
-statement error
-create table t (v1 int not null);
-
-statement error
-create table t (v1 varchar collate "en_US");
-
-# Test create-table-as
-statement ok
-create table t as select 1;
-
-statement ok
-drop table t;
-
-statement error
-create table t as select 1,2;
-
-statement ok
-create table t as select 1 as a, 2 as b;
-
-statement ok
-drop table t;
-
-statement ok
-create table t(v1) as select 1;
-
-statement ok
-drop table t;
-
-statement ok
-create table t (v1 int,v2 int);
-
-statement ok
-insert into t values (1,1);
-
-statement ok
-insert into t values (1,1);
-
-statement ok
-insert into t values (1,1);
-
-statement ok
-flush
-
-statement ok
-create table t1 as select * from t;
-
-statement ok
-flush;
-
-query I
-select * from t1;
-----
-1 1
-1 1
-1 1
-
-statement ok
-drop table t1;
-
-statement ok
-drop table t;
-
-statement ok
-create table t AS SELECT * FROM generate_series(0, 5,1) tbl(i);
-
-statement ok
-flush;
-
-query I
-select * from t order by i;
-----
-0
-1
-2
-3
-4
-5
-
-statement ok
-drop table t;
-
-statement ok
-create table t (v1 int);
-
-statement ok
-insert into t values (1);
-
-statement ok
-insert into t values (2);
-
-statement ok
-create table n1 as select sum(v1) from t;
-
-statement ok
-flush;
-
-query I
-select * from n1;
-----
-3
-
-statement error
-create table n1 (v2 int);
-
-statement error
-create table n1 as select * from t;
-
-statement ok
-create table if not exists n1 (v2 int);
-
-statement ok
-drop table n1;
-
-statement ok
-drop table t;
-
-statement ok
-create table t (v1 int,v2 int);
-
-statement ok
-create table t1(a,b) as select v1,v2 from t;
-
-statement ok
-create table t2(a) as select v1,v2 from t;
-
-statement ok
-drop table t;
-
-statement ok
-drop table t1;
-
-statement ok
-drop table t2;
+include ./table/*.slt.part
diff --git a/e2e_test/ddl/table/generated_columns.slt.part b/e2e_test/ddl/table/generated_columns.slt.part
new file mode 100644
index 0000000000000..f6c0a18067838
--- /dev/null
+++ b/e2e_test/ddl/table/generated_columns.slt.part
@@ -0,0 +1,41 @@
+# Create a table with generated columns.
+statement ok
+create table t1 (v1 int as v2-1, v2 int, v3 int as v2+1);
+
+statement ok
+insert into t1 (v2) values (1), (2);
+
+statement ok
+flush;
+
+query III
+select * from t1;
+----
+0 1 2
+1 2 3
+
+statement ok
+drop table t1;
+
+# Create a table with a generated column.
+statement ok
+create table t2 (v1 int, v2 int as v1+1);
+
+statement ok
+insert into t2 values (1), (2);
+
+statement ok
+flush;
+
+query II
+select * from t2;
+----
+1 2
+2 3
+
+statement ok
+drop table t2;
+
+# A generated column must not reference another generated column.
+statement error
+create table t2 (v1 int as v2+1, v2 int, v3 int as v1-1);
diff --git a/e2e_test/ddl/table/table.slt.part b/e2e_test/ddl/table/table.slt.part
new file mode 100644
index 0000000000000..525982f2c579d
--- /dev/null
+++ b/e2e_test/ddl/table/table.slt.part
@@ -0,0 +1,261 @@
+# Create a table.
+statement ok
+create table ddl_t (v1 int);
+
+statement ok
+explain select v1 from ddl_t;
+
+# Create another table with a duplicated name.
+statement error
+create table ddl_t (v2 int);
+
+# Create a table using an empty string.
+statement error
+create table ""(v2 int);
+
+statement ok
+create table if not exists ddl_t (v2 int);
+
+# Drop the table.
+statement ok
+drop table ddl_t;
+
+# Drop it again.
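+# (The second drop must fail because the table no longer exists.)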
+statement error
+drop table ddl_t;
+
+# Create another table with the same name.
+statement ok
+create table ddl_t (v2 int);
+
+statement ok
+explain select v2 from ddl_t;
+
+# Create a mview on top of it.
+statement ok
+create materialized view ddl_mv as select v2 from ddl_t;
+
+statement ok
+explain select v2 from ddl_t;
+
+statement ok
+explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );
+
+statement ok
+explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );
+
+# Create a mview with a duplicated name.
+statement error
+create materialized view ddl_mv as select v2 from ddl_t;
+
+# Drop the table before dropping the mview.
+statement error
+drop table ddl_t;
+
+# We're not allowed to drop the mview using `DROP TABLE`.
+statement error
+drop table ddl_mv;
+
+# Drop the mview.
+statement ok
+drop materialized view ddl_mv;
+
+# Drop it again.
+statement error
+drop materialized view ddl_mv;
+
+# We're not allowed to drop the table using `DROP MATERIALIZED VIEW`.
+statement error
+drop materialized view ddl_t;
+
+# Now, we can drop the base table.
+statement ok
+drop table ddl_t;
+
+# Create a table that includes a struct column.
+statement ok
+create table st (v1 int, v2 struct<v1 int, v2 struct<v1 float, v2 int>>);
+
+statement ok
+drop table st
+
+# We test the case sensitivity of table name and column name.
+statement ok
+create table t1 (v1 int);
+
+statement ok
+drop table T1;
+
+statement ok
+create table T1 (v1 int);
+
+statement ok
+drop table t1;
+
+statement ok
+create table "T1" (v1 int);
+
+# Since we have not really bound the columns in the insert statement
+# this test case cannot be enabled.
+# statement error
+# insert into "T1" ("V1") values (1);
+
+statement error
+drop table t1;
+
+statement error
+drop table T1;
+
+statement ok
+drop table "T1";
+
+statement ok
+create table "T2" ("V1" int);
+
+# Since we have not really bound the columns in the insert statement
+# this test case cannot be enabled.
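+# (Unquoted `V1` folds to lowercase `v1`, which would not match the quoted column "V1".)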
+# statement error +# insert into "T2" (V1) values (1); + +statement ok +insert into "T2" ("V1") values (1); + +statement ok +drop table "T2" + +statement error +create table C1 (c1 varchar(5)); + +statement error +create table t (v1 int not null); + +statement error +create table t (v1 varchar collate "en_US"); + +# Test create-table-as +statement ok +create table t as select 1; + +statement ok +drop table t; + +statement error +create table t as select 1,2; + +statement ok +create table t as select 1 as a, 2 as b; + +statement ok +drop table t; + +statement ok +create table t(v1) as select 1; + +statement ok +drop table t; + +statement ok +create table t (v1 int,v2 int); + +statement ok +insert into t values (1,1); + +statement ok +insert into t values (1,1); + +statement ok +insert into t values (1,1); + +statement ok +flush + +statement ok +create table t1 as select * from t; + +statement ok +flush; + +query I +select * from t1; +---- +1 1 +1 1 +1 1 + +statement ok +drop table t1; + +statement ok +drop table t; + +statement ok +create table t AS SELECT * FROM generate_series(0, 5,1) tbl(i); + +statement ok +flush; + +query I +select * from t order by i; +---- +0 +1 +2 +3 +4 +5 + +statement ok +drop table t; + +statement ok +create table t (v1 int); + +statement ok +insert into t values (1); + +statement ok +insert into t values (2); + +statement ok +create table n1 as select sum(v1) from t; + +statement ok +flush; + +query I +select * from n1; +---- +3 + +statement error +create table n1 (v2 int); + +statement error +create table n1 as select * from t; + +statement ok +create table if not exists n1 (v2 int); + +statement ok +drop table n1; + +statement ok +drop table t; + +statement ok +create table t (v1 int,v2 int); + +statement ok +create table t1(a,b) as select v1,v2 from t; + +statement ok +create table t2(a) as select v1,v2 from t; + +statement ok +drop table t; + +statement ok +drop table t1; + +statement ok +drop table t2; diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 43d1fcbe9a45d..98d4db458a487 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -4,6 +4,7 @@ package plan_common; import "common.proto"; import "data.proto"; +import "expr.proto"; option java_package = "com.risingwave.proto"; option optimize_for = SPEED; @@ -26,6 +27,8 @@ message ColumnDesc { // For example, when the type is created from a protobuf schema file, // this field will store the message name. string type_name = 5; + // Optional description for the generated column. 
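+  // If present, this column is a generated column: its value is computed by
+  // `expr` from the other columns of the same row, so it cannot be written
+  // directly by DML.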
+ GeneratedColumnDesc generated_column = 6; } message ColumnCatalog { @@ -33,6 +36,10 @@ message ColumnCatalog { bool is_hidden = 2; } +message GeneratedColumnDesc { + expr.ExprNode expr = 1; +} + message StorageTableDesc { uint32 table_id = 1; repeated ColumnDesc columns = 2; diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index e73b6ae5ce7e8..29e0d7db7cb13 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -15,7 +15,7 @@ use std::borrow::Cow; use itertools::Itertools; -use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc}; +use risingwave_pb::plan_common::{GeneratedColumnDesc, PbColumnCatalog, PbColumnDesc}; use super::row_id_column_desc; use crate::catalog::{Field, ROW_ID_COLUMN_ID}; @@ -93,6 +93,7 @@ pub struct ColumnDesc { pub name: String, pub field_descs: Vec, pub type_name: String, + pub generated_column: Option, } impl ColumnDesc { @@ -103,6 +104,7 @@ impl ColumnDesc { name: String::new(), field_descs: vec![], type_name: String::new(), + generated_column: None, } } @@ -119,6 +121,7 @@ impl ColumnDesc { .map(|f| f.to_protobuf()) .collect_vec(), type_name: self.type_name.clone(), + generated_column: self.generated_column.clone(), } } @@ -161,6 +164,7 @@ impl ColumnDesc { name: name.to_string(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, } } @@ -180,6 +184,7 @@ impl ColumnDesc { name: name.to_string(), field_descs: fields, type_name: type_name.to_string(), + generated_column: None, } } @@ -194,12 +199,17 @@ impl ColumnDesc { .map(Self::from_field_without_column_id) .collect_vec(), type_name: field.type_name.clone(), + generated_column: None, } } pub fn from_field_without_column_id(field: &Field) -> Self { Self::from_field_with_column_id(field, 0) } + + pub fn is_generated_column(&self) -> bool { + self.generated_column.is_some() + } } impl From for ColumnDesc { @@ -215,6 +225,7 @@ impl From for ColumnDesc { name: prost.name, type_name: prost.type_name, field_descs, + generated_column: prost.generated_column, } } } @@ -233,6 +244,7 @@ impl From<&ColumnDesc> for PbColumnDesc { name: c.name.clone(), field_descs: c.field_descs.iter().map(ColumnDesc::to_protobuf).collect(), type_name: c.type_name.clone(), + generated_column: c.generated_column.clone(), } } } @@ -249,6 +261,11 @@ impl ColumnCatalog { self.is_hidden } + /// If the column is a generated column + pub fn is_generated(&self) -> bool { + self.column_desc.generated_column.is_some() + } + /// Get a reference to the column desc's data type. 
pub fn data_type(&self) -> &DataType { &self.column_desc.data_type diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 3a3266f08358a..968e6e04a8bd1 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -88,6 +88,7 @@ pub fn row_id_column_desc() -> ColumnDesc { name: row_id_column_name(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, } } diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs index 54251062b7908..e0df76709ad4c 100644 --- a/src/common/src/catalog/test_utils.rs +++ b/src/common/src/catalog/test_utils.rs @@ -56,6 +56,7 @@ impl ColumnDescTestExt for ColumnDesc { name: name.to_string(), type_name: type_name.to_string(), field_descs: fields, + generated_column: None, } } } diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index c9025a7420ca2..350ee7f38a975 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -158,6 +158,7 @@ async fn test_table_materialize() -> StreamResult<()> { name: field.name, field_descs: vec![], type_name: "".to_string(), + generated_column: None, }) .collect_vec(); let (barrier_tx, barrier_rx) = unbounded_channel(); diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 024be54c51fed..09a18113224e3 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -49,6 +49,7 @@ pub(crate) fn avro_field_to_column_desc( name: name.to_owned(), field_descs: vec_column, type_name: schema_name.to_string(), + generated_column: None, }) } _ => { diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 4f3526e75d42b..f545d74f027ba 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -382,46 +382,22 @@ mod tests { assert_eq!(columns.len(), 4); assert_eq!( - CatColumnDesc { - data_type: DataType::Int32, - column_id: 1.into(), - name: "id".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Int32, "id", 1), columns[0] ); assert_eq!( - CatColumnDesc { - data_type: DataType::Varchar, - column_id: 2.into(), - name: "first_name".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Varchar, "first_name", 2), columns[1] ); assert_eq!( - CatColumnDesc { - data_type: DataType::Varchar, - column_id: 3.into(), - name: "last_name".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Varchar, "last_name", 3), columns[2] ); assert_eq!( - CatColumnDesc { - data_type: DataType::Varchar, - column_id: 4.into(), - name: "email".to_owned(), - field_descs: Vec::new(), - type_name: "".to_owned() - }, + CatColumnDesc::new_atomic(DataType::Varchar, "email", 4), columns[3] ); } diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 62d79ed3f2a62..d39d9f8809775 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -154,6 +154,7 @@ impl ProtobufParserConfig { column_type: Some(field_type.to_protobuf()), field_descs, type_name: m.full_name().to_string(), + generated_column: None, }) } else { *index += 1; diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index 
948f046a98367..ab9cdc2ef9b15 100644
--- a/src/connector/src/source/manager.rs
+++ b/src/connector/src/source/manager.rs
@@ -82,6 +82,7 @@ impl From<&SourceColumnDesc> for ColumnDesc {
             name: s.name.clone(),
             field_descs: s.fields.clone(),
             type_name: "".to_string(),
+            generated_column: None,
         }
     }
 }
diff --git a/src/frontend/planner_test/tests/testdata/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/generated_columns.yaml
new file mode 100644
index 0000000000000..48785276e175e
--- /dev/null
+++ b/src/frontend/planner_test/tests/testdata/generated_columns.yaml
@@ -0,0 +1,11 @@
+# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
+- name: table with generated columns
+  sql: |
+    explain create table t1 (v1 int as v2-1, v2 int, v3 int as v2+1);
+  explain_output: |
+    StreamMaterialize { columns: [v1, v2, v3, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "overwrite" }
+    └─StreamExchange { dist: HashShard(_row_id) }
+      └─StreamRowIdGen { row_id_index: 3 }
+        └─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] }
+          └─StreamDml { columns: [v2, _row_id] }
+            └─StreamSource
diff --git a/src/frontend/src/binder/delete.rs b/src/frontend/src/binder/delete.rs
index 2de1a7e0e182d..62a268e94bab5 100644
--- a/src/frontend/src/binder/delete.rs
+++ b/src/frontend/src/binder/delete.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use risingwave_common::catalog::{Schema, TableVersionId};
-use risingwave_common::error::Result;
+use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_sqlparser::ast::{Expr, ObjectName, SelectItem};
 
 use super::statement::RewriteExprsRecursive;
@@ -77,6 +77,13 @@ impl Binder {
         let owner = table_catalog.owner;
         let table_version_id = table_catalog.version_id().expect("table must be versioned");
 
+        // TODO(yuhao): delete from table with generated columns
+        if table_catalog.has_generated_column() {
+            return Err(RwError::from(ErrorCode::BindError(
+                "Deleting from a table with generated columns has not been implemented.".to_string(),
+            )));
+        }
+
         let table = self.bind_table(schema_name, &table_name, None)?;
         let (returning_list, fields) = self.bind_returning_list(returning_items)?;
         let returning = !returning_list.is_empty();
diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs
index 0c0af4830f052..28aa3e3bc6229 100644
--- a/src/frontend/src/binder/expr/function.rs
+++ b/src/frontend/src/binder/expr/function.rs
@@ -303,7 +303,7 @@ impl Binder {
     fn now() -> Handle {
         Box::new(move |binder, mut inputs| {
             binder.ensure_now_function_allowed()?;
-            if !binder.in_create_mv {
+            if !binder.in_streaming {
                 inputs.push(ExprImpl::from(Literal::new(
                     Some(ScalarImpl::Int64((binder.bind_timestamp_ms * 1000) as i64)),
                     DataType::Timestamptz,
@@ -641,7 +641,7 @@ impl Binder {
     }
 
     fn ensure_now_function_allowed(&self) -> Result<()> {
-        if self.in_create_mv
+        if self.in_streaming
             && !matches!(
                 self.context.clause,
                 Some(Clause::Where) | Some(Clause::Having)
diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs
index 9a05d80c6da06..867eedcfd4848 100644
--- a/src/frontend/src/binder/expr/mod.rs
+++ b/src/frontend/src/binder/expr/mod.rs
@@ -452,6 +452,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result<ColumnDesc> {
                     name: f.name.real_value(),
                     field_descs: vec![],
                     type_name: "".to_string(),
+                    generated_column: None,
                 })
             })
             .collect::<Result<Vec<_>>>()?
@@ -464,6 +465,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { name: column_def.name.real_value(), field_descs, type_name: "".to_string(), + generated_column: None, }) } diff --git a/src/frontend/src/binder/insert.rs b/src/frontend/src/binder/insert.rs index 46b6c969cb058..b832cd2c97cf1 100644 --- a/src/frontend/src/binder/insert.rs +++ b/src/frontend/src/binder/insert.rs @@ -100,19 +100,41 @@ impl Binder { let table_id = table_catalog.id; let owner = table_catalog.owner; let table_version_id = table_catalog.version_id().expect("table must be versioned"); - let columns_to_insert = table_catalog - .columns - .clone() - .into_iter() - .filter(|c| !c.is_hidden()) - .collect_vec(); - let row_id_index = table_catalog.row_id_index; + let columns_to_insert = table_catalog.columns_to_insert().cloned().collect_vec(); let expected_types: Vec = columns_to_insert .iter() .map(|c| c.data_type().clone()) .collect(); + let generated_column_names: HashSet<_> = table_catalog.generated_column_names().collect(); + for query_col in &columns { + let query_col_name = query_col.real_value(); + if generated_column_names.contains(query_col_name.as_str()) { + return Err(RwError::from(ErrorCode::BindError(format!( + "cannot insert a non-DEFAULT value into column \"{0}\". Column \"{0}\" is a generated column.", + &query_col_name + )))); + } + } + + // TODO(yuhao): refine this if row_id is always the last column. + // + // `row_id_index` in bin insert operation should rule out generated column + let row_id_index = { + if let Some(row_id_index) = table_catalog.row_id_index { + let mut cnt = 0; + for col in table_catalog.columns().iter().take(row_id_index + 1) { + if col.is_generated() { + cnt += 1; + } + } + Some(row_id_index - cnt) + } else { + None + } + }; + // When the column types of `source` query do not match `expected_types`, casting is // needed. // diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index c758440bcc5b7..275ece08b67db 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -89,8 +89,8 @@ pub struct Binder { next_share_id: ShareId, search_path: SearchPath, - /// Whether the Binder is binding an MV. - in_create_mv: bool, + /// Whether the Binder is binding an MV/SINK. + in_streaming: bool, /// `ShareId`s identifying shared views. shared_views: HashMap, @@ -181,7 +181,7 @@ impl ParameterTypes { } impl Binder { - fn new_inner(session: &SessionImpl, in_create_mv: bool, param_types: Vec) -> Binder { + fn new_inner(session: &SessionImpl, in_streaming: bool, param_types: Vec) -> Binder { let now_ms = session .env() .hummock_snapshot_manager() @@ -200,7 +200,7 @@ impl Binder { next_values_id: 0, next_share_id: 0, search_path: session.config().get_search_path(), - in_create_mv, + in_streaming, shared_views: HashMap::new(), param_types: ParameterTypes::new(param_types), } diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 8bdd8c5fbebe2..b2ef50bf96585 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -131,6 +131,7 @@ macro_rules! 
def_sys_catalog {
                     name: col.1.to_string(),
                     field_descs: vec![],
                     type_name: "".to_string(),
+                    generated_column: None,
                 },
                 is_hidden: false,
             })
diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs
index 36a12611ce95e..9fc4073c48d67 100644
--- a/src/frontend/src/catalog/table_catalog.rs
+++ b/src/frontend/src/catalog/table_catalog.rs
@@ -370,6 +370,24 @@ impl TableCatalog {
             handle_pk_conflict_behavior: self.conflict_behavior_type,
         }
     }
+
+    /// Get columns excluding hidden columns and generated columns.
+    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
+        self.columns
+            .iter()
+            .filter(|c| !c.is_hidden() && !c.is_generated())
+    }
+
+    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
+        self.columns
+            .iter()
+            .filter(|c| c.is_generated())
+            .map(|c| c.name())
+    }
+
+    pub fn has_generated_column(&self) -> bool {
+        self.columns.iter().any(|c| c.is_generated())
+    }
 }
 
 impl From<PbTable> for TableCatalog {
@@ -533,22 +551,11 @@ mod tests {
                         column_id: ColumnId::new(1),
                         name: "country".to_string(),
                         field_descs: vec![
-                            ColumnDesc {
-                                data_type: DataType::Varchar,
-                                column_id: ColumnId::new(2),
-                                name: "address".to_string(),
-                                field_descs: vec![],
-                                type_name: String::new(),
-                            },
-                            ColumnDesc {
-                                data_type: DataType::Varchar,
-                                column_id: ColumnId::new(3),
-                                name: "zipcode".to_string(),
-                                field_descs: vec![],
-                                type_name: String::new(),
-                            }
+                            ColumnDesc::new_atomic(DataType::Varchar, "address", 2),
+                            ColumnDesc::new_atomic(DataType::Varchar, "zipcode", 3),
                         ],
-                        type_name: ".test.Country".to_string()
+                        type_name: ".test.Country".to_string(),
+                        generated_column: None,
                     },
                     is_hidden: false
                 }
diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index bd61c83bbe83e..6e3168d983608 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -15,7 +15,7 @@
 use anyhow::Context;
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::error::{ErrorCode, Result};
+use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_pb::catalog::Table;
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
@@ -70,6 +70,13 @@ pub async fn handle_alter_table_column(
         table.clone()
     };
 
+    // TODO(yuhao): alter table with generated columns.
+    if original_catalog.has_generated_column() {
+        return Err(RwError::from(ErrorCode::BindError(
+            "Altering a table with generated columns has not been implemented.".to_string(),
+        )));
+    }
+
     // Retrieve the original table definition and parse it to AST.
     let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
         .context("unable to parse original table definition")?
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index f8928d290b3a9..afc014ad3865a 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -79,7 +79,7 @@ pub fn gen_sink_plan(
     let definition = context.normalized_sql().to_owned();
 
     let bound = {
-        let mut binder = Binder::new(session);
+        let mut binder = Binder::new_for_stream(session);
         binder.bind_query(*query)?
}; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 3c31d3a52a84d..3c0f316f86b84 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -41,12 +41,14 @@ use risingwave_sqlparser::ast::{ SourceWatermark, }; -use super::create_table::bind_sql_table_constraints; +use super::create_table::bind_sql_table_column_constraints; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::ColumnId; use crate::expr::Expr; -use crate::handler::create_table::{bind_sql_columns, ColumnIdGenerator}; +use crate::handler::create_table::{ + bind_sql_column_constraints, bind_sql_columns, ColumnIdGenerator, +}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::KAFKA_TIMESTAMP_COLUMN_NAME; use crate::session::SessionImpl; @@ -456,6 +458,7 @@ fn check_and_add_timestamp_column( name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, }; column_descs.push(kafka_timestamp_column); } @@ -594,16 +597,13 @@ pub async fn handle_create_source( let mut col_id_gen = ColumnIdGenerator::new_initial(); - let (mut column_descs, pk_column_id_from_columns) = - bind_sql_columns(stmt.columns, &mut col_id_gen)?; + let mut column_descs = bind_sql_columns(stmt.columns.clone(), &mut col_id_gen)?; check_and_add_timestamp_column(&with_properties, &mut column_descs, &mut col_id_gen); - let (mut columns, mut pk_column_ids, mut row_id_index) = bind_sql_table_constraints( - column_descs.clone(), - pk_column_id_from_columns, - stmt.constraints, - )?; + let (mut columns, mut pk_column_ids, mut row_id_index) = + bind_sql_table_column_constraints(column_descs, stmt.columns.clone(), stmt.constraints)?; + if row_id_index.is_none() { return Err(ErrorCode::InvalidInputSyntax( "Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead" @@ -629,6 +629,15 @@ pub async fn handle_create_source( // TODO(yuhao): allow multiple watermark on source. 
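+    // (A source currently carries at most one watermark expression; the
+    // assertion below guards that invariant.)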
    assert!(watermark_descs.len() <= 1);
 
+    bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?;
+
+    if columns.iter().any(|c| c.is_generated()) {
+        // TODO(yuhao): allow generated columns on source
+        return Err(RwError::from(ErrorCode::BindError(
+            "Generated columns on a source have not been implemented.".to_string(),
+        )));
+    }
+
     let row_id_index = row_id_index.map(|index| index as _);
     let pk_column_ids = pk_column_ids.into_iter().map(Into::into).collect();
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 465ceb44726a6..d6d8b22a817a6 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -24,6 +24,7 @@ use risingwave_common::catalog::{
 };
 use risingwave_common::error::{ErrorCode, Result};
 use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, WatermarkDesc};
+use risingwave_pb::plan_common::GeneratedColumnDesc;
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
 use risingwave_sqlparser::ast::{
     ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, SourceWatermark,
@@ -35,11 +36,13 @@ use super::RwPgResponse;
 use crate::binder::{bind_data_type, bind_struct_field};
 use crate::catalog::table_catalog::TableVersion;
 use crate::catalog::{check_valid_column_name, ColumnId};
+use crate::expr::{Expr, ExprImpl};
 use crate::handler::create_source::{bind_source_watermark, UPSTREAM_SOURCE_KEY};
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::LogicalSource;
 use crate::optimizer::property::{Order, RequiredDist};
 use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
+use crate::session::SessionImpl;
 use crate::stream_fragmenter::build_graph;
 use crate::{Binder, TableCatalog, WithOptions};
 
@@ -117,9 +120,7 @@ impl ColumnIdGenerator {
 pub fn bind_sql_columns(
     columns: Vec<ColumnDef>,
     col_id_gen: &mut ColumnIdGenerator,
-) -> Result<(Vec<ColumnDesc>, Option<ColumnId>)> {
-    // In `ColumnDef`, pk can contain only one column. So we use `Option` rather than `Vec`.
-    let mut pk_column_id = None;
+) -> Result<Vec<ColumnDesc>> {
     let mut column_descs = Vec::with_capacity(columns.len());
 
     for column in columns {
@@ -131,8 +132,9 @@ pub fn bind_sql_columns(
             name,
             data_type,
             collation,
-            options,
+            ..
        } = column;
+
         let data_type = data_type.ok_or(ErrorCode::InvalidInputSyntax(
             "data type is not specified".into(),
         ))?;
         if let Some(collation) = collation {
             return Err(ErrorCode::NotImplemented(
                 format!("collation \"{}\"", collation),
                 None.into(),
             )
             .into());
         }
-        for option_def in options {
-            match option_def.option {
-                ColumnOption::Unique { is_primary: true } => {
-                    if pk_column_id.is_some() {
-                        return Err(ErrorCode::BindError(
-                            "multiple primary keys are not allowed".into(),
-                        )
-                        .into());
-                    }
-                    pk_column_id = Some(column_id);
-                }
-                _ => {
-                    return Err(ErrorCode::NotImplemented(
-                        format!("column constraints \"{}\"", option_def),
-                        None.into(),
-                    )
-                    .into())
-                }
-            }
-        }
+
         check_valid_column_name(&name.real_value())?;
+
         let field_descs = if let AstDataType::Struct(fields) = &data_type {
             fields
                 .iter()
@@ -178,22 +162,137 @@
             name: name.real_value(),
             field_descs,
             type_name: "".to_string(),
+            generated_column: None,
         });
     }
 
-    Ok((column_descs, pk_column_id))
+    Ok(column_descs)
+}
+
+fn check_generated_column_constraints(
+    column_name: &String,
+    expr: &ExprImpl,
+    column_catalogs: &[ColumnCatalog],
+    generated_column_names: &[String],
+) -> Result<()> {
+    let input_refs = expr.collect_input_refs(column_catalogs.len());
+    for idx in input_refs.ones() {
+        let referred_generated_column = &column_catalogs[idx].column_desc.name;
+        if generated_column_names
+            .iter()
+            .any(|c| c == referred_generated_column)
+        {
+            return Err(ErrorCode::BindError(
+                format!("A generated column cannot reference another generated column, but generated column \"{}\" references generated column \"{}\"", column_name, referred_generated_column),
+            )
+            .into());
+        }
+    }
+    Ok(())
 }
 
-/// Binds table constraints given the binding results from column definitions.
+/// Binds constraints that can only be specified in column definitions.
+pub fn bind_sql_column_constraints(
+    session: &SessionImpl,
+    table_name: String,
+    column_catalogs: &mut [ColumnCatalog],
+    columns: Vec<ColumnDef>,
+) -> Result<()> {
+    let generated_column_names = {
+        let mut names = vec![];
+        for column in &columns {
+            for option_def in &column.options {
+                if let ColumnOption::GeneratedColumns(_) = option_def.option {
+                    names.push(column.name.real_value());
+                    break;
+                }
+            }
+        }
+        names
+    };
+
+    let mut binder = Binder::new_for_stream(session);
+    binder.bind_columns_to_context(table_name.clone(), column_catalogs.to_vec())?;
+    for column in columns {
+        for option_def in column.options {
+            match option_def.option {
+                ColumnOption::GeneratedColumns(expr) => {
+                    let idx = binder
+                        .get_column_binding_index(table_name.clone(), &column.name.real_value())?;
+                    let expr_impl = binder.bind_expr(expr)?;
+
+                    check_generated_column_constraints(
+                        &column.name.real_value(),
+                        &expr_impl,
+                        column_catalogs,
+                        &generated_column_names,
+                    )?;
+
+                    column_catalogs[idx].column_desc.generated_column = Some(GeneratedColumnDesc {
+                        expr: Some(expr_impl.to_expr_proto()),
+                    });
+                }
+                ColumnOption::Unique { is_primary: true } => {
+                    // Bind primary key in `bind_sql_table_column_constraints`
+                }
+                _ => {
+                    return Err(ErrorCode::NotImplemented(
+                        format!("column constraints \"{}\"", option_def),
+                        None.into(),
+                    )
+                    .into())
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
+/// Binds constraints that can be specified in both column definitions and the table definition.
+///
 /// It returns the columns together with `pk_column_ids`, and an optional row id column index if
 /// added.
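+/// For example, `create table t (v1 int primary key)` and
+/// `create table t (v1 int, primary key (v1))` bind the same primary key,
+/// while declaring a primary key in more than one place is rejected with
+/// "multiple primary keys are not allowed".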
-pub fn bind_sql_table_constraints(
-    column_descs: Vec<ColumnDesc>,
-    pk_column_id_from_columns: Option<ColumnId>,
-    constraints: Vec<TableConstraint>,
+pub fn bind_sql_table_column_constraints(
+    columns_descs: Vec<ColumnDesc>,
+    columns_defs: Vec<ColumnDef>,
+    table_constraints: Vec<TableConstraint>,
 ) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
     let mut pk_column_names = vec![];
 
-    for constraint in constraints {
+    // Mapping from column name to column id.
+    let name_to_id = columns_descs
+        .iter()
+        .map(|c| (c.name.as_str(), c.column_id))
+        .collect::<HashMap<_, _>>();
+
+    // Bind column constraints
+    for column in columns_defs {
+        for option_def in column.options {
+            match option_def.option {
+                ColumnOption::Unique { is_primary: true } => {
+                    if !pk_column_names.is_empty() {
+                        return Err(ErrorCode::BindError(
+                            "multiple primary keys are not allowed".into(),
+                        )
+                        .into());
+                    }
+                    pk_column_names.push(column.name.real_value());
+                }
+                ColumnOption::GeneratedColumns(_) => {
+                    // Bind generated columns in `bind_sql_column_constraints`
+                }
+                _ => {
+                    return Err(ErrorCode::NotImplemented(
+                        format!("column constraints \"{}\"", option_def),
+                        None.into(),
+                    )
+                    .into())
+                }
+            }
+        }
+    }
+
+    // Bind table constraints.
+    for constraint in table_constraints {
         match constraint {
             TableConstraint::Unique {
                 name: _,
                 columns,
                 is_primary: true,
             } => {
                 if !pk_column_names.is_empty() {
                     return Err(ErrorCode::BindError(
                         "multiple primary keys are not allowed".into(),
                     )
                     .into());
                 }
-                pk_column_names = columns;
+                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
             }
             _ => {
                 return Err(ErrorCode::NotImplemented(
@@ -206,7 +305,7 @@
             }
         }
     }
-    let mut pk_column_ids = match (pk_column_id_from_columns, pk_column_names.is_empty()) {
-        (Some(_), false) => {
-            return Err(ErrorCode::BindError("multiple primary keys are not allowed".into()).into())
-        }
-        (None, true) => {
-            // We don't have a pk column now, so we need to add row_id column as the pk column
-            // later.
-            vec![]
-        }
-        (Some(cid), true) => vec![cid],
-        (None, false) => {
-            let name_to_id = column_descs
-                .iter()
-                .map(|c| (c.name.as_str(), c.column_id))
-                .collect::<HashMap<_, _>>();
-            pk_column_names
-                .iter()
-                .map(|ident| {
-                    let name = ident.real_value();
-                    name_to_id.get(name.as_str()).copied().ok_or_else(|| {
-                        ErrorCode::BindError(format!(
-                            "column \"{name}\" named in key does not exist"
-                        ))
-                    })
-                })
-                .try_collect()?
-        }
-    };
-    let mut columns_catalog = column_descs
+    let mut pk_column_ids: Vec<_> = pk_column_names
+        .iter()
+        .map(|name| {
+            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
+                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
+            })
+        })
+        .try_collect()?;
+
+    let mut columns_catalog = columns_descs
         .into_iter()
         .map(|c| {
             // All columns except `_row_id` or those starting with `_rw` should be visible.
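+// A std-only sketch (hypothetical column ids) of the `PRIMARY KEY (...)`
+// name lookup above: every declared key name must resolve through
+// `name_to_id`, and an unknown name surfaces as a bind error instead of
+// being silently dropped.
+//
+//     use std::collections::HashMap;
+//
+//     fn resolve_pk_ids(
+//         name_to_id: &HashMap<&str, i32>,
+//         pk_names: &[&str],
+//     ) -> Result<Vec<i32>, String> {
+//         pk_names
+//             .iter()
+//             .map(|name| {
+//                 name_to_id.get(*name).copied().ok_or_else(|| {
+//                     format!("column \"{name}\" named in key does not exist")
+//                 })
+//             })
+//             .collect()
+//     }
+//
+//     assert_eq!(resolve_pk_ids(&HashMap::from([("v1", 0)]), &["v1"]), Ok(vec![0]));
+//     assert!(resolve_pk_ids(&HashMap::from([("v1", 0)]), &["v2"]).is_err());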
@@ -282,21 +362,22 @@ pub fn bind_sql_table_constraints( pub(crate) async fn gen_create_table_plan_with_source( context: OptimizerContext, table_name: ObjectName, - columns: Vec, + column_defs: Vec, constraints: Vec, source_schema: SourceSchema, source_watermarks: Vec, mut col_id_gen: ColumnIdGenerator, append_only: bool, ) -> Result<(PlanRef, Option, PbTable)> { - let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; + let session = context.session_ctx(); + let column_descs = bind_sql_columns(column_defs.clone(), &mut col_id_gen)?; let mut properties = context.with_options().inner().clone().into_iter().collect(); let (mut columns, mut pk_column_ids, mut row_id_index) = - bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?; + bind_sql_table_column_constraints(column_descs, column_defs.clone(), constraints)?; let watermark_descs = bind_source_watermark( - context.session_ctx(), + session, table_name.real_value(), source_watermarks, &columns, @@ -316,6 +397,8 @@ pub(crate) async fn gen_create_table_plan_with_source( ) .await?; + bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?; + gen_table_plan_inner( context.into(), table_name, @@ -343,14 +426,14 @@ pub(crate) fn gen_create_table_plan( append_only: bool, ) -> Result<(PlanRef, Option, PbTable)> { let definition = context.normalized_sql().to_owned(); - let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; + let column_descs = bind_sql_columns(columns.clone(), &mut col_id_gen)?; let properties = context.with_options().inner().clone().into_iter().collect(); gen_create_table_plan_without_bind( context, table_name, column_descs, - pk_column_id_from_columns, + columns, constraints, properties, definition, @@ -365,7 +448,7 @@ pub(crate) fn gen_create_table_plan_without_bind( context: OptimizerContext, table_name: ObjectName, column_descs: Vec, - pk_column_id_from_columns: Option, + column_defs: Vec, constraints: Vec, properties: HashMap, definition: String, @@ -373,8 +456,8 @@ pub(crate) fn gen_create_table_plan_without_bind( append_only: bool, version: Option, ) -> Result<(PlanRef, Option, PbTable)> { - let (columns, pk_column_ids, row_id_index) = - bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?; + let (mut columns, pk_column_ids, row_id_index) = + bind_sql_table_column_constraints(column_descs, column_defs.clone(), constraints)?; let watermark_descs = bind_source_watermark( context.session_ctx(), @@ -383,6 +466,13 @@ pub(crate) fn gen_create_table_plan_without_bind( &columns, )?; + bind_sql_column_constraints( + context.session_ctx(), + table_name.real_value(), + &mut columns, + column_defs, + )?; + gen_table_plan_inner( context.into(), table_name, @@ -450,21 +540,13 @@ fn gen_table_plan_inner( ) .into(); - let mut required_cols = FixedBitSet::with_capacity(source_node.schema().len()); - required_cols.toggle_range(..); - let mut out_names = source_node.schema().names(); - - if let Some(row_id_index) = row_id_index { - required_cols.toggle(row_id_index); - out_names.remove(row_id_index); - } - + let required_cols = FixedBitSet::with_capacity(source_node.schema().len()); let mut plan_root = PlanRoot::new( source_node, RequiredDist::Any, Order::any(), required_cols, - out_names, + vec![], ); if append_only && row_id_index.is_none() { @@ -716,13 +798,10 @@ mod tests { .. 
} = ast.remove(0) else { panic!("test case should be create table") }; let actual: Result<_> = (|| { - let (column_descs, pk_column_id_from_columns) = - bind_sql_columns(columns, &mut ColumnIdGenerator::new_initial())?; - let (_, pk_column_ids, _) = bind_sql_table_constraints( - column_descs, - pk_column_id_from_columns, - constraints, - )?; + let column_descs = + bind_sql_columns(columns.clone(), &mut ColumnIdGenerator::new_initial())?; + let (_, pk_column_ids, _) = + bind_sql_table_column_constraints(column_descs, columns, constraints)?; Ok(pk_column_ids) })(); match (expected, actual) { diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 4db3dbd5cfa60..488dffe8729c7 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -97,7 +97,7 @@ pub async fn handle_create_as( context, table_name.clone(), column_descs, - None, + vec![], vec![], properties, "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e9999c4bfb8e8..e44bc4de6a345 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -41,8 +41,8 @@ use risingwave_pb::catalog::WatermarkDesc; use self::heuristic_optimizer::ApplyOrder; use self::plan_node::{ - BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject, - StreamRowIdGen, StreamSink, StreamWatermarkFilter, + BatchProject, Convention, LogicalProject, LogicalSource, StreamDml, StreamMaterialize, + StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, }; use self::plan_visitor::has_batch_exchange; #[cfg(debug_assertions)] @@ -401,10 +401,22 @@ impl PlanRoot { stream_plan = StreamDml::new( stream_plan, append_only, - columns.iter().map(|c| c.column_desc.clone()).collect(), + columns + .iter() + .filter_map(|c| (!c.is_generated()).then(|| c.column_desc.clone())) + .collect(), ) .into(); + // Add generated columns. + let exprs = LogicalSource::gen_optional_generated_column_project_exprs( + columns.iter().map(|c| c.column_desc.clone()).collect(), + )?; + if let Some(exprs) = exprs { + let logical_project = LogicalProject::new(stream_plan, exprs); + stream_plan = StreamProject::new(logical_project).into(); + } + // Add WatermarkFilter node. 
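+        // (The watermark filter stays downstream of the generated-column
+        // projection above, presumably so watermark expressions can reference
+        // generated columns as well.)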
        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs
index 20b3bac69d49a..ba23311a78623 100644
--- a/src/frontend/src/optimizer/plan_node/generic/agg.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs
@@ -572,7 +572,7 @@ impl PlanAggCall {
         });
 
         // modify filter
-        let mut rewriter = IndexRewriter { mapping };
+        let mut rewriter = IndexRewriter::new(mapping);
         self.filter.conjunctions.iter_mut().for_each(|x| {
             *x = rewriter.rewrite_expr(x.clone());
         });
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index b1beb588e8337..8a650007f3458 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -49,15 +49,20 @@ pub struct Source {
 impl GenericPlanNode for Source {
     fn schema(&self) -> Schema {
-        let fields = self.column_descs.iter().map(Into::into).collect();
+        let fields = self.non_generated_columns().map(Into::into).collect();
         Schema { fields }
     }
 
     fn logical_pk(&self) -> Option<Vec<usize>> {
         let mut id_to_idx = HashMap::new();
-        self.column_descs.iter().enumerate().for_each(|(idx, c)| {
-            id_to_idx.insert(c.column_id, idx);
-        });
+        self.non_generated_columns()
+            .enumerate()
+            .for_each(|(idx, c)| {
+                id_to_idx.insert(c.column_id, idx);
+            });
         self.pk_col_ids
             .iter()
             .map(|c| id_to_idx.get(c).copied())
 
     fn functional_dependency(&self) -> FunctionalDependencySet {
         let pk_indices = self.logical_pk();
+        let non_generated_columns_count = self.non_generated_columns().count();
         match pk_indices {
             Some(pk_indices) => {
-                FunctionalDependencySet::with_key(self.column_descs.len(), &pk_indices)
+                FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices)
             }
-            None => FunctionalDependencySet::new(self.column_descs.len()),
+            None => FunctionalDependencySet::new(non_generated_columns_count),
         }
     }
 }
@@ -108,4 +114,11 @@ impl Source {
 
         builder.build(vec![], 1)
     }
+
+    /// Returns an iterator over the non-generated columns.
+    fn non_generated_columns(&self) -> impl Iterator<Item = &ColumnDesc> {
+        self.column_descs
+            .iter()
+            .filter(|c| !c.is_generated_column())
+    }
 }
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 5300db3ca2610..f582bc9eb2bfc 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -18,9 +18,11 @@
 use std::ops::Bound;
 use std::ops::Bound::{Excluded, Included, Unbounded};
 use std::rc::Rc;
 
+use risingwave_common::bail;
 use risingwave_common::catalog::{ColumnDesc, Schema};
 use risingwave_common::error::Result;
 use risingwave_connector::source::DataType;
+use risingwave_pb::plan_common::GeneratedColumnDesc;
 
 use super::stream_watermark_filter::StreamWatermarkFilter;
 use super::{ ... };
 use crate::catalog::source_catalog::SourceCatalog;
 use crate::catalog::ColumnId;
-use crate::expr::{Expr, ExprImpl, ExprType};
+use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, InputRef};
 use crate::optimizer::optimizer_context::OptimizerContextRef;
 use crate::optimizer::plan_node::{
     ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
 };
-use crate::utils::{ColIndexMapping, Condition};
+use crate::utils::{ColIndexMapping, Condition, IndexRewriter};
 use crate::TableCatalog;
 
 /// For kafka source, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] to it, so that we
@@ -83,6 +85,79 @@ impl LogicalSource {
         }
     }
 
+    pub fn gen_optional_generated_column_project_exprs(
+        column_descs: Vec<ColumnDesc>,
+    ) -> Result<Option<Vec<ExprImpl>>> {
+        if !column_descs.iter().any(|c| c.generated_column.is_some()) {
+            return Ok(None);
+        }
+
+        let col_mapping = {
+            let mut mapping = vec![None; column_descs.len()];
+            let mut cur = 0;
+            for (idx, column_desc) in column_descs.iter().enumerate() {
+                if column_desc.generated_column.is_none() {
+                    mapping[idx] = Some(cur);
+                    cur += 1;
+                } else {
+                    mapping[idx] = None;
+                }
+            }
+            ColIndexMapping::new(mapping)
+        };
+
+        let mut rewriter = IndexRewriter::new(col_mapping);
+        let mut exprs = Vec::with_capacity(column_descs.len());
+        let mut cur = 0;
+        for column_desc in column_descs {
+            let ret_data_type = column_desc.data_type.clone();
+            if let Some(generated_column) = column_desc.generated_column {
+                let GeneratedColumnDesc { expr } = generated_column;
+                // TODO(yuhao): avoid this `from_expr_proto`.
+                let proj_expr = rewriter.rewrite_expr(ExprImpl::from_expr_proto(&expr.unwrap())?);
+                if proj_expr.return_type() != column_desc.data_type {
+                    bail!("Expression return type should match the type specified for the column");
+                }
+                exprs.push(proj_expr);
+            } else {
+                let input_ref = InputRef {
+                    data_type: ret_data_type,
+                    index: cur,
+                };
+                cur += 1;
+                exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
+            }
+        }
+
+        Ok(Some(exprs))
+    }
+
+    pub fn create(
+        source_catalog: Option<Rc<SourceCatalog>>,
+        column_descs: Vec<ColumnDesc>,
+        pk_col_ids: Vec<ColumnId>,
+        row_id_index: Option<usize>,
+        gen_row_id: bool,
+        for_table: bool,
+        ctx: OptimizerContextRef,
+    ) -> Result<PlanRef> {
+        let source = Self::new(
+            source_catalog,
+            column_descs.clone(),
+            pk_col_ids,
+            row_id_index,
+            gen_row_id,
+            for_table,
+            ctx,
+        );
+        let exprs = Self::gen_optional_generated_column_project_exprs(column_descs)?;
+        if let Some(exprs) = exprs {
+            Ok(LogicalProject::new(source.into(), exprs).into())
+        } else {
+            Ok(source.into())
+        }
+    }
+
     pub(super) fn column_names(&self) -> Vec<String> {
         self.schema()
             .fields()
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index 2a6b56a80159d..132fda401ddd8 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -86,7 +86,7 @@ impl Planner {
         let pk_col_ids = source.catalog.pk_col_ids.clone();
         let row_id_index = source.catalog.row_id_index;
         let gen_row_id = source.catalog.append_only;
-        Ok(LogicalSource::new(
+        LogicalSource::create(
             Some(Rc::new(source.catalog)),
             column_descs,
             pk_col_ids,
@@ -95,7 +95,6 @@ impl Planner {
             false,
             self.ctx(),
         )
-        .into())
     }
 
     pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs
index 4a1f4aedb5203..62e1476fcfeb8 100644
--- a/src/frontend/src/scheduler/distributed/query.rs
+++ b/src/frontend/src/scheduler/distributed/query.rs
@@ -519,27 +519,9 @@
             stream_key: vec![],
             pk: vec![],
             columns: vec![
-                ColumnDesc {
-                    data_type: DataType::Int32,
-                    column_id: 0.into(),
-                    name: "a".to_string(),
-                    type_name: String::new(),
-
field_descs: vec![], - }, - ColumnDesc { - data_type: DataType::Int64, - column_id: 2.into(), - name: "_row_id".to_string(), - type_name: String::new(), - field_descs: vec![], - }, + ColumnDesc::new_atomic(DataType::Int32, "a", 0), + ColumnDesc::new_atomic(DataType::Float64, "b", 1), + ColumnDesc::new_atomic(DataType::Int64, "c", 2), ], distribution_key: vec![], append_only: false, diff --git a/src/frontend/src/utils/rewrite_index.rs b/src/frontend/src/utils/rewrite_index.rs index 2107cb55d5219..74b4bf3986f73 100644 --- a/src/frontend/src/utils/rewrite_index.rs +++ b/src/frontend/src/utils/rewrite_index.rs @@ -16,7 +16,13 @@ use super::ColIndexMapping; use crate::expr::{ExprImpl, ExprRewriter, InputRef}; pub struct IndexRewriter { - pub mapping: ColIndexMapping, + mapping: ColIndexMapping, +} + +impl IndexRewriter { + pub fn new(mapping: ColIndexMapping) -> Self { + Self { mapping } + } } impl ExprRewriter for IndexRewriter { diff --git a/src/prost/build.rs b/src/prost/build.rs index 8e832846eee41..05f1d2c4e70f6 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -65,6 +65,7 @@ fn main() -> Result<(), Box> { .type_attribute("expr.FunctionCall", "#[derive(Eq, Hash)]") .type_attribute("expr.UserDefinedFunction", "#[derive(Eq, Hash)]") .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.GeneratedColumnDesc", "#[derive(Eq, Hash)]") .out_dir(out_dir.as_path()) .compile(&protos, &[proto_dir.to_string()]) .expect("Failed to compile grpc!"); diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index ff8e8d1676a28..2bcc5bfec1ab8 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -212,6 +212,7 @@ pub mod test_utils { name: f.name.clone(), field_descs: vec![], type_name: "".to_string(), + generated_column: None, } .to_protobuf(), ), diff --git a/src/source/src/table.rs b/src/source/src/table.rs index 1e1b9e16f5d70..778b33846d525 100644 --- a/src/source/src/table.rs +++ b/src/source/src/table.rs @@ -100,7 +100,9 @@ impl TableDmlHandle { #[cfg(debug_assertions)] risingwave_common::util::schema_check::schema_check( - self.column_descs.iter().map(|c| &c.data_type), + self.column_descs + .iter() + .filter_map(|c| (!c.is_generated_column()).then_some(&c.data_type)), chunk.columns(), ) .expect("table source write chunk schema check failed"); diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 21f24bb67e4cb..4a6dfb6ba9bbe 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -371,6 +371,8 @@ pub enum ColumnOption { /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT` /// - ... 
    DialectSpecific(Vec<Token>),
+    /// `AS ( expr )`
+    GeneratedColumns(Expr),
 }
 
 impl fmt::Display for ColumnOption {
@@ -403,6 +405,7 @@ impl fmt::Display for ColumnOption {
             }
             Check(expr) => write!(f, "CHECK ({})", expr),
             DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
+            GeneratedColumns(expr) => write!(f, "AS {}", expr),
         }
     }
 }
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index bfb4a1dc456fa..9da52fc43f497 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -2258,6 +2258,8 @@ impl Parser {
             let expr = self.parse_expr()?;
             self.expect_token(&Token::RParen)?;
             Ok(Some(ColumnOption::Check(expr)))
+        } else if self.parse_keyword(Keyword::AS) {
+            Ok(Some(ColumnOption::GeneratedColumns(self.parse_expr()?)))
         } else {
             Ok(None)
         }
diff --git a/src/storage/hummock_sdk/src/filter_key_extractor.rs b/src/storage/hummock_sdk/src/filter_key_extractor.rs
index dd9537f40b8ea..e3e81e681beaa 100644
--- a/src/storage/hummock_sdk/src/filter_key_extractor.rs
+++ b/src/storage/hummock_sdk/src/filter_key_extractor.rs
@@ -338,7 +338,7 @@ mod tests {
     use bytes::{BufMut, BytesMut};
     use itertools::Itertools;
-    use risingwave_common::catalog::{ColumnDesc, ColumnId};
+    use risingwave_common::catalog::ColumnDesc;
     use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY;
     use risingwave_common::hash::VirtualNode;
     use risingwave_common::row::OwnedRow;
@@ -387,53 +387,25 @@ mod tests {
             columns: vec![
                 PbColumnCatalog {
                     column_desc: Some(
-                        (&ColumnDesc {
-                            data_type: DataType::Int64,
-                            column_id: ColumnId::new(0),
-                            name: "_row_id".to_string(),
-                            field_descs: vec![],
-                            type_name: "".to_string(),
-                        })
-                            .into(),
+                        (&ColumnDesc::new_atomic(DataType::Int64, "_row_id", 0)).into(),
                     ),
                     is_hidden: true,
                 },
                 PbColumnCatalog {
                     column_desc: Some(
-                        (&ColumnDesc {
-                            data_type: DataType::Int64,
-                            column_id: ColumnId::new(0),
-                            name: "col_1".to_string(),
-                            field_descs: vec![],
-                            type_name: "Int64".to_string(),
-                        })
-                            .into(),
+                        (&ColumnDesc::new_atomic(DataType::Int64, "col_1", 0)).into(),
                    ),
                     is_hidden: false,
                 },
                 PbColumnCatalog {
                     column_desc: Some(
-                        (&ColumnDesc {
-                            data_type: DataType::Float64,
-                            column_id: ColumnId::new(0),
-                            name: "col_2".to_string(),
-                            field_descs: vec![],
-                            type_name: "Float64".to_string(),
-                        })
-                            .into(),
+                        (&ColumnDesc::new_atomic(DataType::Float64, "col_2", 0)).into(),
                     ),
                     is_hidden: false,
                 },
                 PbColumnCatalog {
                     column_desc: Some(
-                        (&ColumnDesc {
-                            data_type: DataType::Varchar,
-                            column_id: ColumnId::new(0),
-                            name: "col_3".to_string(),
-                            field_descs: vec![],
-                            type_name: "Varchar".to_string(),
-                        })
-                            .into(),
+                        (&ColumnDesc::new_atomic(DataType::Varchar, "col_3", 0)).into(),
                     ),
                     is_hidden: false,
                 },
diff --git a/src/stream/src/executor/lookup/tests.rs b/src/stream/src/executor/lookup/tests.rs
index 7cbd975fcd50f..5fda9504d17f7 100644
--- a/src/stream/src/executor/lookup/tests.rs
+++ b/src/stream/src/executor/lookup/tests.rs
@@ -20,7 +20,7 @@
 use futures::StreamExt;
 use itertools::Itertools;
 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
 use risingwave_common::array::StreamChunk;
-use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId};
+use risingwave_common::catalog::{ColumnDesc, ConflictBehavior, Field, Schema, TableId};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_storage::memory::MemoryStateStore;
@@ -35,20 +35,8 @@ use crate::executor::{
 
 fn arrangement_col_descs() -> Vec<ColumnDesc> {
     vec![
-        ColumnDesc {
-            data_type: DataType::Int64,
-            column_id: ColumnId::new(0),
-            name: "rowid_column".to_string(),
-            field_descs: vec![],
-            type_name: "".to_string(),
-        },
-        ColumnDesc {
-            data_type: DataType::Int64,
-            column_id: ColumnId::new(1),
-            name: "join_column".to_string(),
-            field_descs: vec![],
-            type_name: "".to_string(),
-        },
+        ColumnDesc::new_atomic(DataType::Int64, "rowid_column", 0),
+        ColumnDesc::new_atomic(DataType::Int64, "join_column", 1),
     ]
 }
 
@@ -152,20 +140,8 @@ async fn create_arrangement(
 /// | b |   |   | 3 -> 4 |
 fn create_source() -> Box<MockSource> {
     let columns = vec![
-        ColumnDesc {
-            data_type: DataType::Int64,
-            column_id: ColumnId::new(1),
-            name: "join_column".to_string(),
-            field_descs: vec![],
-            type_name: "".to_string(),
-        },
-        ColumnDesc {
-            data_type: DataType::Int64,
-            column_id: ColumnId::new(2),
-            name: "rowid_column".to_string(),
-            field_descs: vec![],
-            type_name: "".to_string(),
-        },
+        ColumnDesc::new_atomic(DataType::Int64, "join_column", 1),
+        ColumnDesc::new_atomic(DataType::Int64, "rowid_column", 2),
     ];
 
     // Prepare source chunks.
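+// A sketch of the equivalence behind the `ColumnDesc::new_atomic` refactor
+// used throughout these tests, assuming the constructor fills the remaining
+// fields with empty defaults (as the replaced struct literals suggest):
+//
+//     let verbose = ColumnDesc {
+//         data_type: DataType::Int64,
+//         column_id: ColumnId::new(1),
+//         name: "join_column".to_string(),
+//         field_descs: vec![],
+//         type_name: "".to_string(),
+//         generated_column: None,
+//     };
+//     assert_eq!(verbose, ColumnDesc::new_atomic(DataType::Int64, "join_column", 1));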