From 53261c5aec42d7cf89945aeed54037572916d9d3 Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Sat, 18 Mar 2023 11:47:22 +0800 Subject: [PATCH] feat: Bushy tree join ordering (#8316) Signed-off-by: Kevin Axel --- src/common/src/session_config/mod.rs | 21 +- .../tests/testdata/bushy_join.yaml | 1408 +++++++++++++++++ .../src/optimizer/logical_optimization.rs | 26 +- .../optimizer/plan_node/logical_multi_join.rs | 239 +++ ...s => left_deep_tree_join_ordering_rule.rs} | 8 +- src/frontend/src/optimizer/rule/mod.rs | 8 +- .../stream/bushy_tree_join_ordering_rule.rs | 39 + src/frontend/src/optimizer/rule/stream/mod.rs | 1 + 8 files changed, 1736 insertions(+), 14 deletions(-) create mode 100644 src/frontend/planner_test/tests/testdata/bushy_join.yaml rename src/frontend/src/optimizer/rule/{reorder_multijoin_rule.rs => left_deep_tree_join_ordering_rule.rs} (95%) create mode 100644 src/frontend/src/optimizer/rule/stream/bushy_tree_join_ordering_rule.rs diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index d715de8985091..b8bdc55a58edb 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -34,7 +34,7 @@ use crate::util::epoch::Epoch; // This is a hack, &'static str is not allowed as a const generics argument. // TODO: refine this using the adt_const_params feature. -const CONFIG_KEYS: [&str; 21] = [ +const CONFIG_KEYS: [&str; 22] = [ "RW_IMPLICIT_FLUSH", "CREATE_COMPACTION_GROUP_FOR_MV", "QUERY_MODE", @@ -56,6 +56,7 @@ const CONFIG_KEYS: [&str; 21] = [ "RW_ENABLE_SHARE_PLAN", "INTERVALSTYLE", "BATCH_PARALLELISM", + "RW_STREAMING_ENABLE_BUSHY_JOIN", ]; // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] = @@ -81,6 +82,7 @@ const FORCE_TWO_PHASE_AGG: usize = 17; const RW_ENABLE_SHARE_PLAN: usize = 18; const INTERVAL_STYLE: usize = 19; const BATCH_PARALLELISM: usize = 20; +const STREAMING_ENABLE_BUSHY_JOIN: usize = 21; trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { fn entry_name() -> &'static str; @@ -277,6 +279,7 @@ type QueryEpoch = ConfigU64; type Timezone = ConfigString; type StreamingParallelism = ConfigU64; type StreamingEnableDeltaJoin = ConfigBool; +type StreamingEnableBushyJoin = ConfigBool; type EnableTwoPhaseAgg = ConfigBool; type ForceTwoPhaseAgg = ConfigBool; type EnableSharePlan = ConfigBool; @@ -342,6 +345,9 @@ pub struct ConfigMap { /// Enable delta join in streaming query. Defaults to false. streaming_enable_delta_join: StreamingEnableDeltaJoin, + /// Enable bushy join in the streaming query. Defaults to false. + streaming_enable_bushy_join: StreamingEnableBushyJoin, + /// Enable two phase agg optimization. Defaults to true. /// Setting this to true will always set `FORCE_TWO_PHASE_AGG` to false. enable_two_phase_agg: EnableTwoPhaseAgg, @@ -402,6 +408,8 @@ impl ConfigMap { self.streaming_parallelism = val.as_slice().try_into()?; } else if key.eq_ignore_ascii_case(StreamingEnableDeltaJoin::entry_name()) { self.streaming_enable_delta_join = val.as_slice().try_into()?; + } else if key.eq_ignore_ascii_case(StreamingEnableBushyJoin::entry_name()) { + self.streaming_enable_bushy_join = val.as_slice().try_into()?; } else if key.eq_ignore_ascii_case(EnableTwoPhaseAgg::entry_name()) { self.enable_two_phase_agg = val.as_slice().try_into()?; if !*self.enable_two_phase_agg { @@ -458,6 +466,8 @@ impl ConfigMap { Ok(self.streaming_parallelism.to_string()) } else if key.eq_ignore_ascii_case(StreamingEnableDeltaJoin::entry_name()) { Ok(self.streaming_enable_delta_join.to_string()) + } else if key.eq_ignore_ascii_case(StreamingEnableBushyJoin::entry_name()) { + Ok(self.streaming_enable_bushy_join.to_string()) } else if key.eq_ignore_ascii_case(EnableTwoPhaseAgg::entry_name()) { Ok(self.enable_two_phase_agg.to_string()) } else if key.eq_ignore_ascii_case(ForceTwoPhaseAgg::entry_name()) { @@ -550,6 +560,11 @@ impl ConfigMap { setting : self.streaming_enable_delta_join.to_string(), description: String::from("Enable delta join in streaming query.") }, + VariableInfo{ + name : StreamingEnableBushyJoin::entry_name().to_lowercase(), + setting : self.streaming_enable_bushy_join.to_string(), + description: String::from("Enable bushy join in streaming query.") + }, VariableInfo{ name : EnableTwoPhaseAgg::entry_name().to_lowercase(), setting : self.enable_two_phase_agg.to_string(), @@ -648,6 +663,10 @@ impl ConfigMap { *self.streaming_enable_delta_join } + pub fn get_streaming_enable_bushy_join(&self) -> bool { + *self.streaming_enable_bushy_join + } + pub fn get_enable_two_phase_agg(&self) -> bool { *self.enable_two_phase_agg } diff --git a/src/frontend/planner_test/tests/testdata/bushy_join.yaml b/src/frontend/planner_test/tests/testdata/bushy_join.yaml new file mode 100644 index 0000000000000..2678b03ecd3f1 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/bushy_join.yaml @@ -0,0 +1,1408 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- id: create_tables + sql: | + CREATE TABLE supplier ( + s_suppkey INTEGER, + s_name VARCHAR, + s_address VARCHAR, + s_nationkey INTEGER, + s_phone VARCHAR, + s_acctbal NUMERIC, + s_comment VARCHAR, + PRIMARY KEY (s_suppkey) + ); + + CREATE TABLE part ( + p_partkey INTEGER, + p_name VARCHAR, + p_mfgr VARCHAR, + p_brand VARCHAR, + p_type VARCHAR, + p_size INTEGER, + p_container VARCHAR, + p_retailprice NUMERIC, + p_comment VARCHAR, + PRIMARY KEY (p_partkey) + ); + + CREATE TABLE partsupp ( + ps_partkey INTEGER, + ps_suppkey INTEGER, + ps_availqty INTEGER, + ps_supplycost NUMERIC, + ps_comment VARCHAR, + PRIMARY KEY (ps_partkey, ps_suppkey) + ); + + CREATE TABLE customer ( + c_custkey INTEGER, + c_name VARCHAR, + c_address VARCHAR, + c_nationkey INTEGER, + c_phone VARCHAR, + c_acctbal NUMERIC, + c_mktsegment VARCHAR, + c_comment VARCHAR, + PRIMARY KEY (c_custkey) + ); + + CREATE TABLE orders ( + o_orderkey BIGINT, + o_custkey INTEGER, + o_orderstatus VARCHAR, + o_totalprice NUMERIC, + o_orderdate DATE, + o_orderpriority VARCHAR, + o_clerk VARCHAR, + o_shippriority INTEGER, + o_comment VARCHAR, + PRIMARY KEY (o_orderkey) + ); + + CREATE TABLE lineitem ( + l_orderkey BIGINT, + l_partkey INTEGER, + l_suppkey INTEGER, + l_linenumber INTEGER, + l_quantity NUMERIC, + l_extendedprice NUMERIC, + l_discount NUMERIC, + l_tax NUMERIC, + l_returnflag VARCHAR, + l_linestatus VARCHAR, + l_shipdate DATE, + l_commitdate DATE, + l_receiptdate DATE, + l_shipinstruct VARCHAR, + l_shipmode VARCHAR, + l_comment VARCHAR, + PRIMARY KEY (l_orderkey, l_linenumber) + ); + + CREATE TABLE nation ( + n_nationkey INTEGER, + n_name VARCHAR, + n_regionkey INTEGER, + n_comment VARCHAR, + PRIMARY KEY (n_nationkey) + ); + + CREATE TABLE region ( + r_regionkey INTEGER, + r_name VARCHAR, + r_comment VARCHAR, + PRIMARY KEY (r_regionkey) + ); +- id: tpch_q1 + before: + - create_tables + sql: | + select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + round(avg(l_quantity), 4) as avg_qty, + round(avg(l_extendedprice), 4) as avg_price, + round(avg(l_discount), 4) as avg_disc, + count(*) as count_order + from + lineitem + where + l_shipdate <= date '1998-12-01' - interval '71' day + group by + l_returnflag, + l_linestatus + order by + l_returnflag, + l_linestatus; + stream_plan: | + StreamMaterialize { columns: [l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order], pk_columns: [l_returnflag, l_linestatus], pk_conflict: "no check" } + └─StreamProject { exprs: [lineitem.l_returnflag, lineitem.l_linestatus, sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum($expr1), sum($expr2), RoundDigit((sum(lineitem.l_quantity) / count(lineitem.l_quantity)), 4:Int32) as $expr3, RoundDigit((sum(lineitem.l_extendedprice) / count(lineitem.l_extendedprice)), 4:Int32) as $expr4, RoundDigit((sum(lineitem.l_discount) / count(lineitem.l_discount)), 4:Int32) as $expr5, count] } + └─StreamHashAgg { group_key: [lineitem.l_returnflag, lineitem.l_linestatus], aggs: [sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum($expr1), sum($expr2), count(lineitem.l_quantity), count(lineitem.l_extendedprice), sum(lineitem.l_discount), count(lineitem.l_discount), count] } + └─StreamExchange { dist: HashShard(lineitem.l_returnflag, lineitem.l_linestatus) } + └─StreamProject { exprs: [lineitem.l_returnflag, lineitem.l_linestatus, lineitem.l_quantity, lineitem.l_extendedprice, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, ((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) * (1:Int32 + lineitem.l_tax)) as $expr2, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_shipdate <= '1998-09-21 00:00:00':Timestamp) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q2 + before: + - create_tables + sql: | + select + s_acctbal, + s_name, + n_name, + p_partkey, + p_mfgr, + s_address, + s_phone, + s_comment + from + part, + supplier, + partsupp, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and p_size = 4 + and p_type like '%TIN' + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + and ps_supplycost = ( + select + min(ps_supplycost) + from + partsupp, + supplier, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + ) + order by + s_acctbal desc, + n_name, + s_name, + p_partkey + limit 100; + stream_plan: | + StreamMaterialize { columns: [s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, region.r_regionkey(hidden), nation.n_nationkey(hidden), supplier.s_suppkey(hidden), part.p_partkey(hidden), partsupp.ps_partkey(hidden), partsupp.ps_suppkey(hidden), min(partsupp.ps_supplycost)(hidden)], pk_columns: [region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, min(partsupp.ps_supplycost)], order_descs: [s_acctbal, n_name, s_name, p_partkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, min(partsupp.ps_supplycost)], pk_conflict: "no check" } + └─StreamProject { exprs: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, min(partsupp.ps_supplycost)] } + └─StreamTopN { order: "[supplier.s_acctbal DESC, nation.n_name ASC, supplier.s_name ASC, part.p_partkey ASC]", limit: 100, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: "[supplier.s_acctbal DESC, nation.n_name ASC, supplier.s_name ASC, part.p_partkey ASC]", limit: 100, offset: 0, group_key: [15] } + └─StreamProject { exprs: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, min(partsupp.ps_supplycost), Vnode(supplier.s_suppkey) as $expr1] } + └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = partsupp.ps_suppkey, output: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, min(partsupp.ps_supplycost)] } + ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, region.r_regionkey, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: [nation.n_nationkey, nation.n_name, region.r_regionkey] } + | | ├─StreamExchange { dist: HashShard(region.r_regionkey) } + | | | └─StreamProject { exprs: [region.r_regionkey] } + | | | └─StreamFilter { predicate: (region.r_name = 'AFRICA':Varchar) } + | | | └─StreamTableScan { table: region, columns: [region.r_regionkey, region.r_name], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } + | | └─StreamExchange { dist: HashShard(nation.n_regionkey) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + └─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + └─StreamHashJoin { type: Inner, predicate: part.p_partkey IS NOT DISTINCT FROM part.p_partkey AND min(partsupp.ps_supplycost) = partsupp.ps_supplycost, output: [part.p_partkey, part.p_mfgr, partsupp.ps_suppkey, part.p_partkey, min(partsupp.ps_supplycost), partsupp.ps_partkey] } + ├─StreamProject { exprs: [part.p_partkey, min(partsupp.ps_supplycost)] } + | └─StreamHashAgg { group_key: [part.p_partkey], aggs: [min(partsupp.ps_supplycost), count] } + | └─StreamHashJoin { type: LeftOuter, predicate: part.p_partkey IS NOT DISTINCT FROM partsupp.ps_partkey, output: [part.p_partkey, partsupp.ps_supplycost, partsupp.ps_partkey, partsupp.ps_suppkey, supplier.s_suppkey, region.r_regionkey, nation.n_nationkey, supplier.s_nationkey] } + | ├─StreamExchange { dist: HashShard(part.p_partkey) } + | | └─StreamProject { exprs: [part.p_partkey] } + | | └─StreamHashAgg { group_key: [part.p_partkey], aggs: [count] } + | | └─StreamProject { exprs: [part.p_partkey] } + | | └─StreamFilter { predicate: (part.p_size = 4:Int32) AND Like(part.p_type, '%TIN':Varchar) } + | | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_type, part.p_size], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + | └─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + | └─StreamHashJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [partsupp.ps_partkey, partsupp.ps_supplycost, partsupp.ps_suppkey, supplier.s_suppkey, supplier.s_nationkey, region.r_regionkey, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | | └─StreamHashJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_supplycost, supplier.s_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } + | | ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + | | | └─StreamFilter { predicate: IsNotNull(partsupp.ps_partkey) } + | | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | | └─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + | └─StreamExchange { dist: HashShard(nation.n_nationkey) } + | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: [nation.n_nationkey, region.r_regionkey] } + | ├─StreamExchange { dist: HashShard(region.r_regionkey) } + | | └─StreamProject { exprs: [region.r_regionkey] } + | | └─StreamFilter { predicate: (region.r_name = 'AFRICA':Varchar) } + | | └─StreamTableScan { table: region, columns: [region.r_regionkey, region.r_name], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } + | └─StreamExchange { dist: HashShard(nation.n_regionkey) } + | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + └─StreamHashJoin { type: Inner, predicate: part.p_partkey = partsupp.ps_partkey, output: [part.p_partkey, part.p_mfgr, partsupp.ps_suppkey, partsupp.ps_supplycost, partsupp.ps_partkey] } + ├─StreamExchange { dist: HashShard(part.p_partkey) } + | └─StreamProject { exprs: [part.p_partkey, part.p_mfgr] } + | └─StreamFilter { predicate: (part.p_size = 4:Int32) AND Like(part.p_type, '%TIN':Varchar) } + | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_mfgr, part.p_type, part.p_size], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + └─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q3 + before: + - create_tables + sql: | + select + l_orderkey, + sum(l_extendedprice * (1 - l_discount)) as revenue, + o_orderdate, + o_shippriority + from + customer, + orders, + lineitem + where + c_mktsegment = 'FURNITURE' + and c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate < date '1995-03-29' + and l_shipdate > date '1995-03-29' + group by + l_orderkey, + o_orderdate, + o_shippriority + order by + revenue desc, + o_orderdate + limit 10; + stream_plan: | + StreamMaterialize { columns: [l_orderkey, revenue, o_orderdate, o_shippriority], pk_columns: [l_orderkey, o_orderdate, o_shippriority], order_descs: [revenue, o_orderdate, l_orderkey, o_shippriority], pk_conflict: "no check" } + └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority] } + └─StreamTopN { order: "[sum($expr1) DESC, orders.o_orderdate ASC]", limit: 10, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: "[sum($expr1) DESC, orders.o_orderdate ASC]", limit: 10, offset: 0, group_key: [4] } + └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority, Vnode(lineitem.l_orderkey) as $expr2] } + └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority] } + └─StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } + └─StreamProject { exprs: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } + | └─StreamFilter { predicate: (lineitem.l_shipdate > '1995-03-29':Date) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(orders.o_orderkey) } + └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [orders.o_orderkey, orders.o_orderdate, orders.o_shippriority, customer.c_custkey] } + ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | └─StreamProject { exprs: [customer.c_custkey] } + | └─StreamFilter { predicate: (customer.c_mktsegment = 'FURNITURE':Varchar) } + | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_mktsegment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + └─StreamExchange { dist: HashShard(orders.o_custkey) } + └─StreamFilter { predicate: (orders.o_orderdate < '1995-03-29':Date) } + └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate, orders.o_shippriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q4 + before: + - create_tables + sql: | + select + o_orderpriority, + count(*) as order_count + from + orders + where + o_orderdate >= date '1997-07-01' + and o_orderdate < date '1997-07-01' + interval '3' month + and exists ( + select + * + from + lineitem + where + l_orderkey = o_orderkey + and l_commitdate < l_receiptdate + ) + group by + o_orderpriority + order by + o_orderpriority; + stream_plan: | + StreamMaterialize { columns: [o_orderpriority, order_count], pk_columns: [o_orderpriority], pk_conflict: "no check" } + └─StreamHashAgg { group_key: [orders.o_orderpriority], aggs: [count] } + └─StreamExchange { dist: HashShard(orders.o_orderpriority) } + └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderpriority, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamProject { exprs: [orders.o_orderkey, orders.o_orderpriority] } + | └─StreamFilter { predicate: (orders.o_orderdate >= '1997-07-01':Date) AND (orders.o_orderdate < '1997-10-01 00:00:00':Timestamp) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderpriority, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_commitdate < lineitem.l_receiptdate) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q5 + before: + - create_tables + sql: | + select + n_name, + sum(l_extendedprice * (1 - l_discount)) as revenue + from + customer, + orders, + lineitem, + supplier, + nation, + region + where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and l_suppkey = s_suppkey + and c_nationkey = s_nationkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'MIDDLE EAST' + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '1' year + group by + n_name + order by + revenue desc; + stream_plan: | + StreamMaterialize { columns: [n_name, revenue], pk_columns: [n_name], order_descs: [revenue, n_name], pk_conflict: "no check" } + └─StreamProject { exprs: [nation.n_name, sum($expr1)] } + └─StreamHashAgg { group_key: [nation.n_name], aggs: [sum($expr1), count] } + └─StreamExchange { dist: HashShard(nation.n_name) } + └─StreamProject { exprs: [nation.n_name, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, lineitem.l_suppkey, customer.c_nationkey] } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_suppkey = supplier.s_suppkey AND customer.c_nationkey = supplier.s_nationkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, lineitem.l_orderkey, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey, lineitem.l_suppkey, customer.c_nationkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_suppkey, customer.c_nationkey) } + | └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, customer.c_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } + | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | └─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_nationkey, orders.o_orderkey, customer.c_custkey] } + | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_nationkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } + | └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1995-01-01 00:00:00':Timestamp) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: HashShard(supplier.s_suppkey, supplier.s_nationkey) } + └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, supplier.s_nationkey, region.r_regionkey, nation.n_nationkey] } + ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: [nation.n_nationkey, nation.n_name, region.r_regionkey] } + | ├─StreamExchange { dist: HashShard(region.r_regionkey) } + | | └─StreamProject { exprs: [region.r_regionkey] } + | | └─StreamFilter { predicate: (region.r_name = 'MIDDLE EAST':Varchar) } + | | └─StreamTableScan { table: region, columns: [region.r_regionkey, region.r_name], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } + | └─StreamExchange { dist: HashShard(nation.n_regionkey) } + | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q6 + before: + - create_tables + sql: | + select + sum(l_extendedprice * l_discount) as revenue + from + lineitem + where + l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + and l_discount between 0.08 - 0.01 and 0.08 + 0.01 + and l_quantity < 24; + stream_plan: | + StreamMaterialize { columns: [revenue], pk_columns: [], pk_conflict: "no check" } + └─StreamProject { exprs: [sum(sum($expr1))] } + └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr1)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr1)] } + └─StreamProject { exprs: [(lineitem.l_extendedprice * lineitem.l_discount) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_shipdate >= '1994-01-01':Date) AND (lineitem.l_shipdate < '1995-01-01 00:00:00':Timestamp) AND (lineitem.l_discount >= 0.07:Decimal) AND (lineitem.l_discount <= 0.09:Decimal) AND (lineitem.l_quantity < 24:Int32) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_quantity, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q7 + before: + - create_tables + sql: | + select + supp_nation, + cust_nation, + l_year, + sum(volume) as revenue + from + ( + select + n1.n_name as supp_nation, + n2.n_name as cust_nation, + extract(year from l_shipdate) as l_year, + l_extendedprice * (1 - l_discount) as volume + from + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2 + where + s_suppkey = l_suppkey + and o_orderkey = l_orderkey + and c_custkey = o_custkey + and s_nationkey = n1.n_nationkey + and c_nationkey = n2.n_nationkey + and ( + (n1.n_name = 'ROMANIA' and n2.n_name = 'IRAN') + or (n1.n_name = 'IRAN' and n2.n_name = 'ROMANIA') + ) + and l_shipdate between date '1983-01-01' and date '2000-12-31' + ) as shipping + group by + supp_nation, + cust_nation, + l_year + order by + supp_nation, + cust_nation, + l_year; + stream_plan: | + StreamMaterialize { columns: [supp_nation, cust_nation, l_year, revenue], pk_columns: [supp_nation, cust_nation, l_year], pk_conflict: "no check" } + └─StreamProject { exprs: [nation.n_name, nation.n_name, $expr1, sum($expr2)] } + └─StreamHashAgg { group_key: [nation.n_name, nation.n_name, $expr1], aggs: [sum($expr2), count] } + └─StreamExchange { dist: HashShard(nation.n_name, nation.n_name, $expr1) } + └─StreamProject { exprs: [nation.n_name, nation.n_name, Extract('YEAR':Varchar, lineitem.l_shipdate) as $expr1, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr2, nation.n_nationkey, supplier.s_suppkey, lineitem.l_orderkey, lineitem.l_linenumber, nation.n_nationkey, customer.c_custkey, orders.o_orderkey] } + └─StreamFilter { predicate: (((nation.n_name = 'ROMANIA':Varchar) AND (nation.n_name = 'IRAN':Varchar)) OR ((nation.n_name = 'IRAN':Varchar) AND (nation.n_name = 'ROMANIA':Varchar))) } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: all } + ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [nation.n_name, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_nationkey, supplier.s_suppkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, nation.n_nationkey] } + | | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + | └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } + | └─StreamFilter { predicate: (lineitem.l_shipdate >= '1983-01-01':Date) AND (lineitem.l_shipdate <= '2000-12-31':Date) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(orders.o_orderkey) } + └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [nation.n_name, orders.o_orderkey, nation.n_nationkey, customer.c_custkey] } + ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = customer.c_nationkey, output: [nation.n_name, customer.c_custkey, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(customer.c_nationkey) } + | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_nationkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + └─StreamExchange { dist: HashShard(orders.o_custkey) } + └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q8 + before: + - create_tables + sql: | + select + o_year, + round(sum(case + when nation = 'IRAN' then volume + else 0 + end) / sum(volume), 6) as mkt_share + from + ( + select + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) as volume, + n2.n_name as nation + from + part, + supplier, + lineitem, + orders, + customer, + nation n1, + nation n2, + region + where + p_partkey = l_partkey + and s_suppkey = l_suppkey + and l_orderkey = o_orderkey + and o_custkey = c_custkey + and c_nationkey = n1.n_nationkey + and n1.n_regionkey = r_regionkey + and r_name = 'ASIA' + and s_nationkey = n2.n_nationkey + and o_orderdate between date '1995-01-01' and date '1996-12-31' + and p_type = 'PROMO ANODIZED STEEL' + ) as all_nations + group by + o_year + order by + o_year; + stream_plan: | + StreamMaterialize { columns: [o_year, mkt_share], pk_columns: [o_year], pk_conflict: "no check" } + └─StreamProject { exprs: [$expr1, RoundDigit((sum($expr2) / sum($expr3)), 6:Int32) as $expr4] } + └─StreamHashAgg { group_key: [$expr1], aggs: [sum($expr2), sum($expr3), count] } + └─StreamExchange { dist: HashShard($expr1) } + └─StreamProject { exprs: [Extract('YEAR':Varchar, orders.o_orderdate) as $expr1, Case((nation.n_name = 'IRAN':Varchar), (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)), 0:Decimal) as $expr2, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr3, region.r_regionkey, nation.n_nationkey, customer.c_custkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, nation.n_name, region.r_regionkey, nation.n_nationkey, customer.c_custkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = customer.c_nationkey, output: [customer.c_custkey, region.r_regionkey, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: [nation.n_nationkey, region.r_regionkey] } + | | ├─StreamExchange { dist: HashShard(region.r_regionkey) } + | | | └─StreamProject { exprs: [region.r_regionkey] } + | | | └─StreamFilter { predicate: (region.r_name = 'ASIA':Varchar) } + | | | └─StreamTableScan { table: region, columns: [region.r_regionkey, region.r_name], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } + | | └─StreamExchange { dist: HashShard(nation.n_regionkey) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(customer.c_nationkey) } + | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_nationkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + └─StreamExchange { dist: HashShard(orders.o_custkey) } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [nation.n_name, lineitem.l_extendedprice, lineitem.l_discount, orders.o_custkey, orders.o_orderdate, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [nation.n_name, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, nation.n_nationkey] } + | | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + | └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } + | └─StreamHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey, output: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, part.p_partkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(part.p_partkey) } + | | └─StreamProject { exprs: [part.p_partkey] } + | | └─StreamFilter { predicate: (part.p_type = 'PROMO ANODIZED STEEL':Varchar) } + | | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_type], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + | └─StreamExchange { dist: HashShard(lineitem.l_partkey) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(orders.o_orderkey) } + └─StreamFilter { predicate: (orders.o_orderdate >= '1995-01-01':Date) AND (orders.o_orderdate <= '1996-12-31':Date) } + └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q9 + before: + - create_tables + sql: | + select + nation, + o_year, + round(sum(amount), 2) as sum_profit + from + ( + select + n_name as nation, + extract(year from o_orderdate) as o_year, + l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + from + part, + supplier, + lineitem, + partsupp, + orders, + nation + where + s_suppkey = l_suppkey + and ps_suppkey = l_suppkey + and ps_partkey = l_partkey + and p_partkey = l_partkey + and o_orderkey = l_orderkey + and s_nationkey = n_nationkey + and p_name like '%yellow%' + ) as profit + group by + nation, + o_year + order by + nation, + o_year desc; + stream_plan: | + StreamMaterialize { columns: [nation, o_year, sum_profit], pk_columns: [nation, o_year], pk_conflict: "no check" } + └─StreamProject { exprs: [nation.n_name, $expr1, RoundDigit(sum($expr2), 2:Int32) as $expr3] } + └─StreamHashAgg { group_key: [nation.n_name, $expr1], aggs: [sum($expr2), count] } + └─StreamExchange { dist: HashShard(nation.n_name, $expr1) } + └─StreamProject { exprs: [nation.n_name, Extract('YEAR':Varchar, orders.o_orderdate) as $expr1, ((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) - (partsupp.ps_supplycost * lineitem.l_quantity)) as $expr2, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } + └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderdate, partsupp.ps_supplycost, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamHashJoin { type: Inner, predicate: partsupp.ps_suppkey = lineitem.l_suppkey AND partsupp.ps_partkey = lineitem.l_partkey, output: [partsupp.ps_supplycost, lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + └─StreamExchange { dist: HashShard(lineitem.l_partkey, lineitem.l_suppkey) } + └─StreamHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey, output: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, part.p_partkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(part.p_partkey) } + | └─StreamProject { exprs: [part.p_partkey] } + | └─StreamFilter { predicate: Like(part.p_name, '%yellow%':Varchar) } + | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_name], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + └─StreamExchange { dist: HashShard(lineitem.l_partkey) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q10 + before: + - create_tables + sql: | + select + c_custkey, + c_name, + sum(l_extendedprice * (1.00 - l_discount)) as revenue, + c_acctbal, + n_name, + c_address, + c_phone, + c_comment + from + customer, + orders, + lineitem, + nation + where + c_custkey = o_custkey + and l_orderkey = o_orderkey + and o_orderdate >= date '1994-01-01' + and o_orderdate < date '1994-01-01' + interval '3' month + and l_returnflag = 'R' + and c_nationkey = n_nationkey + group by + c_custkey, + c_name, + c_acctbal, + c_phone, + n_name, + c_address, + c_comment + order by + revenue desc + limit 20; + stream_plan: | + StreamMaterialize { columns: [c_custkey, c_name, revenue, c_acctbal, n_name, c_address, c_phone, c_comment], pk_columns: [c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment], order_descs: [revenue, c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment], pk_conflict: "no check" } + └─StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment] } + └─StreamTopN { order: "[sum($expr1) DESC]", limit: 20, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: "[sum($expr1) DESC]", limit: 20, offset: 0, group_key: [8] } + └─StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment, Vnode(customer.c_custkey) as $expr2] } + └─StreamProject { exprs: [customer.c_custkey, customer.c_name, sum($expr1), customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment] } + └─StreamHashAgg { group_key: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment], aggs: [sum($expr1), count] } + └─StreamProject { exprs: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment, (lineitem.l_extendedprice * (1.00:Decimal - lineitem.l_discount)) as $expr1, nation.n_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = customer.c_nationkey, output: [nation.n_name, customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(customer.c_nationkey) } + | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + └─StreamExchange { dist: HashShard(orders.o_custkey) } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_extendedprice, lineitem.l_discount, orders.o_custkey, lineitem.l_orderkey, lineitem.l_linenumber, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } + | └─StreamFilter { predicate: (lineitem.l_returnflag = 'R':Varchar) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_returnflag], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(orders.o_orderkey) } + └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } + └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1994-04-01 00:00:00':Timestamp) } + └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q11 + before: + - create_tables + sql: | + select + ps_partkey, + sum(ps_supplycost * ps_availqty) as value + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ARGENTINA' + group by + ps_partkey + having + sum(ps_supplycost * ps_availqty) > ( + select + sum(ps_supplycost * ps_availqty) * 0.0001000000 + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'ARGENTINA' + ) + order by + value desc; + stream_plan: | + StreamMaterialize { columns: [ps_partkey, value], pk_columns: [ps_partkey], order_descs: [value, ps_partkey], pk_conflict: "no check" } + └─StreamDynamicFilter { predicate: (sum($expr1) > $expr3), output: [partsupp.ps_partkey, sum($expr1)] } + ├─StreamProject { exprs: [partsupp.ps_partkey, sum($expr1)] } + | └─StreamHashAgg { group_key: [partsupp.ps_partkey], aggs: [sum($expr1), count] } + | └─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + | └─StreamProject { exprs: [partsupp.ps_partkey, (partsupp.ps_supplycost * partsupp.ps_availqty) as $expr1, nation.n_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } + | └─StreamShare { id = 13 } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, nation.n_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamProject { exprs: [nation.n_nationkey] } + | | └─StreamFilter { predicate: (nation.n_name = 'ARGENTINA':Varchar) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | └─StreamHashJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } + | ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | └─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + └─StreamExchange { dist: Broadcast } + └─StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } + └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr2)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr2)] } + └─StreamProject { exprs: [(partsupp.ps_supplycost * partsupp.ps_availqty) as $expr2, nation.n_nationkey, partsupp.ps_partkey, partsupp.ps_suppkey, supplier.s_suppkey] } + └─StreamShare { id = 13 } + └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, nation.n_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } + ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | └─StreamProject { exprs: [nation.n_nationkey] } + | └─StreamFilter { predicate: (nation.n_name = 'ARGENTINA':Varchar) } + | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + └─StreamHashJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } + ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + └─StreamExchange { dist: HashShard(supplier.s_suppkey) } + └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q12 + before: + - create_tables + sql: | + select + l_shipmode, + sum(case + when o_orderpriority = '1-URGENT' + or o_orderpriority = '2-HIGH' + then 1 + else 0 + end) as high_line_count, + sum(case + when o_orderpriority <> '1-URGENT' + and o_orderpriority <> '2-HIGH' + then 1 + else 0 + end) as low_line_count + from + orders, + lineitem + where + o_orderkey = l_orderkey + and l_shipmode in ('FOB', 'SHIP') + and l_commitdate < l_receiptdate + and l_shipdate < l_commitdate + and l_receiptdate >= date '1994-01-01' + and l_receiptdate < date '1994-01-01' + interval '1' year + group by + l_shipmode + order by + l_shipmode; + stream_plan: | + StreamMaterialize { columns: [l_shipmode, high_line_count, low_line_count], pk_columns: [l_shipmode], pk_conflict: "no check" } + └─StreamProject { exprs: [lineitem.l_shipmode, sum($expr1), sum($expr2)] } + └─StreamHashAgg { group_key: [lineitem.l_shipmode], aggs: [sum($expr1), sum($expr2), count] } + └─StreamExchange { dist: HashShard(lineitem.l_shipmode) } + └─StreamProject { exprs: [lineitem.l_shipmode, Case(((orders.o_orderpriority = '1-URGENT':Varchar) OR (orders.o_orderpriority = '2-HIGH':Varchar)), 1:Int32, 0:Int32) as $expr1, Case(((orders.o_orderpriority <> '1-URGENT':Varchar) AND (orders.o_orderpriority <> '2-HIGH':Varchar)), 1:Int32, 0:Int32) as $expr2, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderpriority, lineitem.l_shipmode, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderpriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_shipmode, lineitem.l_linenumber] } + └─StreamFilter { predicate: In(lineitem.l_shipmode, 'FOB':Varchar, 'SHIP':Varchar) AND (lineitem.l_commitdate < lineitem.l_receiptdate) AND (lineitem.l_shipdate < lineitem.l_commitdate) AND (lineitem.l_receiptdate >= '1994-01-01':Date) AND (lineitem.l_receiptdate < '1995-01-01 00:00:00':Timestamp) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_shipmode, lineitem.l_linenumber, lineitem.l_shipdate, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q13 + before: + - create_tables + sql: | + select + c_count, + count(*) as custdist + from + ( + select + c_custkey, + count(o_orderkey) as c_count + from + customer left outer join orders on + c_custkey = o_custkey + and o_comment not like '%:1%:2%' + group by + c_custkey + ) as c_orders (c_custkey, c_count) + group by + c_count + order by + custdist desc, + c_count desc; + stream_plan: | + StreamMaterialize { columns: [c_count, custdist], pk_columns: [c_count], order_descs: [custdist, c_count], pk_conflict: "no check" } + └─StreamHashAgg { group_key: [count(orders.o_orderkey)], aggs: [count] } + └─StreamExchange { dist: HashShard(count(orders.o_orderkey)) } + └─StreamProject { exprs: [customer.c_custkey, count(orders.o_orderkey)] } + └─StreamHashAgg { group_key: [customer.c_custkey], aggs: [count(orders.o_orderkey), count] } + └─StreamHashJoin { type: LeftOuter, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, orders.o_orderkey] } + ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | └─StreamTableScan { table: customer, columns: [customer.c_custkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + └─StreamExchange { dist: HashShard(orders.o_custkey) } + └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } + └─StreamFilter { predicate: Not(Like(orders.o_comment, '%:1%:2%':Varchar)) } + └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_comment], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q14 + before: + - create_tables + sql: | + select + 100.00 * sum(case + when p_type like 'PROMO%' + then l_extendedprice * (1 - l_discount) + else 0 + end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue + from + lineitem, + part + where + l_partkey = p_partkey + and l_shipdate >= date '1995-09-01' + and l_shipdate < date '1995-09-01' + interval '1' month; + stream_plan: | + StreamMaterialize { columns: [promo_revenue], pk_columns: [], pk_conflict: "no check" } + └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3] } + └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr1), sum($expr2)] } + └─StreamProject { exprs: [Case(Like(part.p_type, 'PROMO%':Varchar), (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)), 0:Decimal) as $expr1, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr2, lineitem.l_orderkey, lineitem.l_linenumber, part.p_partkey, lineitem.l_partkey] } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: [lineitem.l_extendedprice, lineitem.l_discount, part.p_type, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_partkey, part.p_partkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_partkey) } + | └─StreamProject { exprs: [lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber] } + | └─StreamFilter { predicate: (lineitem.l_shipdate >= '1995-09-01':Date) AND (lineitem.l_shipdate < '1995-10-01 00:00:00':Timestamp) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(part.p_partkey) } + └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_type], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q15 + before: + - create_tables + sql: | + with revenue0 (supplier_no, total_revenue) as ( + select + l_suppkey, + sum(l_extendedprice * (1 - l_discount)) + from + lineitem + where + l_shipdate >= date '1993-01-01' + and l_shipdate < date '1993-01-01' + interval '3' month + group by + l_suppkey + ) + select + s_suppkey, + s_name, + s_address, + s_phone, + total_revenue + from + supplier, + revenue0 + where + s_suppkey = supplier_no + and total_revenue = ( + select + max(total_revenue) + from + revenue0 + ) + order by + s_suppkey; + stream_plan: | + StreamMaterialize { columns: [s_suppkey, s_name, s_address, s_phone, total_revenue, max(max(sum($expr1)))(hidden), lineitem.l_suppkey(hidden)], pk_columns: [s_suppkey, lineitem.l_suppkey, max(max(sum($expr1)))], pk_conflict: "no check" } + └─StreamHashJoin { type: Inner, predicate: max(max(sum($expr1))) = sum($expr1), output: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, sum($expr1), max(max(sum($expr1))), lineitem.l_suppkey] } + ├─StreamExchange { dist: HashShard(max(max(sum($expr1)))) } + | └─StreamProject { exprs: [max(max(sum($expr1)))] } + | └─StreamGlobalSimpleAgg { aggs: [max(max(sum($expr1))), count] } + | └─StreamExchange { dist: Single } + | └─StreamHashAgg { group_key: [$expr2], aggs: [max(sum($expr1)), count] } + | └─StreamProject { exprs: [lineitem.l_suppkey, sum($expr1), Vnode(lineitem.l_suppkey) as $expr2] } + | └─StreamShare { id = 10 } + | └─StreamProject { exprs: [lineitem.l_suppkey, sum($expr1)] } + | └─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [sum($expr1), count] } + | └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } + | └─StreamProject { exprs: [lineitem.l_suppkey, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber] } + | └─StreamFilter { predicate: (lineitem.l_shipdate >= '1993-01-01':Date) AND (lineitem.l_shipdate < '1993-04-01 00:00:00':Timestamp) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(sum($expr1)) } + └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, sum($expr1), lineitem.l_suppkey] } + ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + └─StreamShare { id = 10 } + └─StreamProject { exprs: [lineitem.l_suppkey, sum($expr1)] } + └─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [sum($expr1), count] } + └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } + └─StreamProject { exprs: [lineitem.l_suppkey, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_shipdate >= '1993-01-01':Date) AND (lineitem.l_shipdate < '1993-04-01 00:00:00':Timestamp) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q16 + before: + - create_tables + sql: | + select + p_brand, + p_type, + p_size, + count(distinct ps_suppkey) as supplier_cnt + from + partsupp, + part + where + p_partkey = ps_partkey + and p_brand <> 'Brand#45' + and p_type not like 'SMALL PLATED%' + and p_size in (19, 17, 16, 23, 10, 4, 38, 11) + and ps_suppkey not in ( + select + s_suppkey + from + supplier + where + s_comment like '%Customer%Complaints%' + ) + group by + p_brand, + p_type, + p_size + order by + supplier_cnt desc, + p_brand, + p_type, + p_size; + stream_plan: | + StreamMaterialize { columns: [p_brand, p_type, p_size, supplier_cnt], pk_columns: [p_brand, p_type, p_size], order_descs: [supplier_cnt, p_brand, p_type, p_size], pk_conflict: "no check" } + └─StreamProject { exprs: [part.p_brand, part.p_type, part.p_size, count(distinct partsupp.ps_suppkey)] } + └─StreamHashAgg { group_key: [part.p_brand, part.p_type, part.p_size], aggs: [count(distinct partsupp.ps_suppkey), count] } + └─StreamExchange { dist: HashShard(part.p_brand, part.p_type, part.p_size) } + └─StreamHashJoin { type: LeftAnti, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey, partsupp.ps_partkey, part.p_partkey] } + ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + | └─StreamHashJoin { type: Inner, predicate: partsupp.ps_partkey = part.p_partkey, output: [partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size, partsupp.ps_partkey, part.p_partkey] } + | ├─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | └─StreamExchange { dist: HashShard(part.p_partkey) } + | └─StreamFilter { predicate: (part.p_brand <> 'Brand#45':Varchar) AND Not(Like(part.p_type, 'SMALL PLATED%':Varchar)) AND In(part.p_size, 19:Int32, 17:Int32, 16:Int32, 23:Int32, 10:Int32, 4:Int32, 38:Int32, 11:Int32) } + | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_type, part.p_size], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + └─StreamExchange { dist: HashShard(supplier.s_suppkey) } + └─StreamProject { exprs: [supplier.s_suppkey] } + └─StreamFilter { predicate: Like(supplier.s_comment, '%Customer%Complaints%':Varchar) } + └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_comment], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q17 + before: + - create_tables + sql: | + select + ROUND(sum(l_extendedprice) / 7.0, 16) as avg_yearly + from + lineitem, + part + where + p_partkey = l_partkey + and p_brand = 'Brand#13' + and p_container = 'JUMBO PKG' + and l_quantity < ( + select + 0.2 * avg(l_quantity) + from + lineitem + where + l_partkey = p_partkey + ); + stream_plan: | + StreamMaterialize { columns: [avg_yearly], pk_columns: [], pk_conflict: "no check" } + └─StreamProject { exprs: [RoundDigit((sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal), 16:Int32) as $expr2] } + └─StreamGlobalSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessLocalSimpleAgg { aggs: [sum(lineitem.l_extendedprice)] } + └─StreamProject { exprs: [lineitem.l_extendedprice, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber, part.p_partkey, lineitem.l_partkey] } + └─StreamFilter { predicate: (lineitem.l_quantity < $expr1) } + └─StreamHashJoin { type: Inner, predicate: part.p_partkey IS NOT DISTINCT FROM part.p_partkey, output: all } + ├─StreamProject { exprs: [part.p_partkey, (0.2:Decimal * (sum(lineitem.l_quantity) / count(lineitem.l_quantity))) as $expr1] } + | └─StreamHashAgg { group_key: [part.p_partkey], aggs: [sum(lineitem.l_quantity), count(lineitem.l_quantity), count] } + | └─StreamHashJoin { type: LeftOuter, predicate: part.p_partkey IS NOT DISTINCT FROM lineitem.l_partkey, output: [part.p_partkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(part.p_partkey) } + | | └─StreamProject { exprs: [part.p_partkey] } + | | └─StreamHashAgg { group_key: [part.p_partkey], aggs: [count] } + | | └─StreamProject { exprs: [part.p_partkey] } + | | └─StreamFilter { predicate: (part.p_brand = 'Brand#13':Varchar) AND (part.p_container = 'JUMBO PKG':Varchar) } + | | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_container], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + | └─StreamExchange { dist: HashShard(lineitem.l_partkey) } + | └─StreamFilter { predicate: IsNotNull(lineitem.l_partkey) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(part.p_partkey) } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: [lineitem.l_quantity, lineitem.l_extendedprice, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_partkey] } + ├─StreamExchange { dist: HashShard(lineitem.l_partkey) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_orderkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(part.p_partkey) } + └─StreamProject { exprs: [part.p_partkey] } + └─StreamFilter { predicate: (part.p_brand = 'Brand#13':Varchar) AND (part.p_container = 'JUMBO PKG':Varchar) } + └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_container], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q18 + before: + - create_tables + sql: | + select + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice, + sum(l_quantity) quantity + from + customer, + orders, + lineitem + where + o_orderkey in ( + select + l_orderkey + from + lineitem + group by + l_orderkey + having + sum(l_quantity) > 1 + ) + and c_custkey = o_custkey + and o_orderkey = l_orderkey + group by + c_name, + c_custkey, + o_orderkey, + o_orderdate, + o_totalprice + order by + o_totalprice desc, + o_orderdate + LIMIT 100; + stream_plan: | + StreamMaterialize { columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, quantity], pk_columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice], order_descs: [o_totalprice, o_orderdate, c_name, c_custkey, o_orderkey], pk_conflict: "no check" } + └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] } + └─StreamTopN { order: "[orders.o_totalprice DESC, orders.o_orderdate ASC]", limit: 100, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: "[orders.o_totalprice DESC, orders.o_orderdate ASC]", limit: 100, offset: 0, group_key: [6] } + └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity), Vnode(orders.o_orderkey) as $expr1] } + └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] } + └─StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } + └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | └─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } + | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_totalprice, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey] } + └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } + └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } + └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q19 + before: + - create_tables + sql: | + select + sum(l_extendedprice* (1 - l_discount)) as revenue + from + lineitem, + part + where + ( + p_partkey = l_partkey + and p_brand = 'Brand#52' + and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') + and l_quantity >= 1 and l_quantity <= 11 + and p_size between 1 and 5 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#24' + and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') + and l_quantity >= 30 and l_quantity <= 40 + and p_size between 1 and 10 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ) + or + ( + p_partkey = l_partkey + and p_brand = 'Brand#32' + and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') + and l_quantity >= 10 and l_quantity <= 20 + and p_size between 1 and 15 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' + ); + stream_plan: | + StreamMaterialize { columns: [revenue], pk_columns: [], pk_conflict: "no check" } + └─StreamProject { exprs: [sum(sum($expr1))] } + └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr1)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr1)] } + └─StreamProject { exprs: [(lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber, part.p_partkey, lineitem.l_partkey] } + └─StreamFilter { predicate: (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Int32)) AND (lineitem.l_quantity <= 11:Int32)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Int32)) AND (lineitem.l_quantity <= 40:Int32)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Int32)) AND (lineitem.l_quantity <= 20:Int32)) AND (part.p_size <= 15:Int32))) } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: all } + ├─StreamExchange { dist: HashShard(lineitem.l_partkey) } + | └─StreamProject { exprs: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber] } + | └─StreamFilter { predicate: In(lineitem.l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (lineitem.l_shipinstruct = 'DELIVER IN PERSON':Varchar) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipinstruct, lineitem.l_shipmode], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(part.p_partkey) } + └─StreamFilter { predicate: (part.p_size >= 1:Int32) } + └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_size, part.p_container], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q20 + before: + - create_tables + sql: | + select + s_name, + s_address + from + supplier, + nation + where + s_suppkey in ( + select + ps_suppkey + from + partsupp + where + ps_partkey in ( + select + p_partkey + from + part + where + p_name like 'forest%' + ) + and ps_availqty > ( + select + 0.5 * sum(l_quantity) + from + lineitem + where + l_partkey = ps_partkey + and l_suppkey = ps_suppkey + and l_shipdate >= date '1994-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year + ) + ) + and s_nationkey = n_nationkey + and n_name = 'KENYA' + order by + s_name; + stream_plan: | + StreamMaterialize { columns: [s_name, s_address, supplier.s_suppkey(hidden), nation.n_nationkey(hidden), supplier.s_nationkey(hidden)], pk_columns: [supplier.s_suppkey, nation.n_nationkey, supplier.s_nationkey], order_descs: [s_name, supplier.s_suppkey, nation.n_nationkey, supplier.s_nationkey], pk_conflict: "no check" } + └─StreamHashJoin { type: LeftSemi, predicate: supplier.s_suppkey = partsupp.ps_suppkey, output: [supplier.s_name, supplier.s_address, supplier.s_suppkey, nation.n_nationkey, supplier.s_nationkey] } + ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | └─StreamHashJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: all } + | ├─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + | └─StreamExchange { dist: HashShard(nation.n_nationkey) } + | └─StreamProject { exprs: [nation.n_nationkey] } + | └─StreamFilter { predicate: (nation.n_name = 'KENYA':Varchar) } + | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + └─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + └─StreamProject { exprs: [partsupp.ps_suppkey, partsupp.ps_partkey, partsupp.ps_partkey, partsupp.ps_suppkey] } + └─StreamFilter { predicate: ($expr1 > $expr2) } + └─StreamHashJoin { type: Inner, predicate: partsupp.ps_partkey IS NOT DISTINCT FROM partsupp.ps_partkey AND partsupp.ps_suppkey IS NOT DISTINCT FROM partsupp.ps_suppkey, output: all } + ├─StreamExchange { dist: HashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | └─StreamProject { exprs: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty::Decimal as $expr1] } + | └─StreamHashJoin { type: LeftSemi, predicate: partsupp.ps_partkey = part.p_partkey, output: all } + | ├─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | └─StreamExchange { dist: HashShard(part.p_partkey) } + | └─StreamProject { exprs: [part.p_partkey] } + | └─StreamFilter { predicate: Like(part.p_name, 'forest%':Varchar) } + | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_name], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + └─StreamProject { exprs: [partsupp.ps_partkey, partsupp.ps_suppkey, (0.5:Decimal * sum(lineitem.l_quantity)) as $expr2] } + └─StreamHashAgg { group_key: [partsupp.ps_partkey, partsupp.ps_suppkey], aggs: [sum(lineitem.l_quantity), count] } + └─StreamHashJoin { type: LeftOuter, predicate: partsupp.ps_partkey IS NOT DISTINCT FROM lineitem.l_partkey AND partsupp.ps_suppkey IS NOT DISTINCT FROM lineitem.l_suppkey, output: [partsupp.ps_partkey, partsupp.ps_suppkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | └─StreamProject { exprs: [partsupp.ps_partkey, partsupp.ps_suppkey] } + | └─StreamHashAgg { group_key: [partsupp.ps_partkey, partsupp.ps_suppkey], aggs: [count] } + | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + └─StreamExchange { dist: HashShard(lineitem.l_partkey, lineitem.l_suppkey) } + └─StreamProject { exprs: [lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamFilter { predicate: IsNotNull(lineitem.l_partkey) AND IsNotNull(lineitem.l_suppkey) AND (lineitem.l_shipdate >= '1994-01-01':Date) AND (lineitem.l_shipdate < '1995-01-01 00:00:00':Timestamp) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q21 + before: + - create_tables + sql: | + select + s_name, + count(*) as numwait + from + supplier, + lineitem l1, + orders, + nation + where + s_suppkey = l1.l_suppkey + and o_orderkey = l1.l_orderkey + and o_orderstatus = 'F' + and l1.l_receiptdate > l1.l_commitdate + and exists ( + select + * + from + lineitem l2 + where + l2.l_orderkey = l1.l_orderkey + and l2.l_suppkey <> l1.l_suppkey + ) + and not exists ( + select + * + from + lineitem l3 + where + l3.l_orderkey = l1.l_orderkey + and l3.l_suppkey <> l1.l_suppkey + and l3.l_receiptdate > l3.l_commitdate + ) + and s_nationkey = n_nationkey + and n_name = 'GERMANY' + group by + s_name + order by + numwait desc, + s_name + LIMIT 100; + stream_plan: | + StreamMaterialize { columns: [s_name, numwait], pk_columns: [s_name], order_descs: [numwait, s_name], pk_conflict: "no check" } + └─StreamProject { exprs: [supplier.s_name, count] } + └─StreamTopN { order: "[count DESC, supplier.s_name ASC]", limit: 100, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: "[count DESC, supplier.s_name ASC]", limit: 100, offset: 0, group_key: [2] } + └─StreamProject { exprs: [supplier.s_name, count, Vnode(supplier.s_name) as $expr1] } + └─StreamHashAgg { group_key: [supplier.s_name], aggs: [count] } + └─StreamExchange { dist: HashShard(supplier.s_name) } + └─StreamHashJoin { type: LeftAnti, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: [supplier.s_name, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamHashJoin { type: LeftSemi, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: all } + | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [supplier.s_name, lineitem.l_orderkey, lineitem.l_suppkey, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, lineitem.l_linenumber] } + | | ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | | | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [supplier.s_suppkey, supplier.s_name, nation.n_nationkey] } + | | | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | | | └─StreamProject { exprs: [nation.n_nationkey] } + | | | | └─StreamFilter { predicate: (nation.n_name = 'GERMANY':Varchar) } + | | | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | | | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + | | └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } + | | └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [lineitem.l_orderkey, lineitem.l_suppkey, orders.o_orderkey, lineitem.l_linenumber] } + | | ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | | | └─StreamProject { exprs: [orders.o_orderkey] } + | | | └─StreamFilter { predicate: (orders.o_orderstatus = 'F':Varchar) } + | | | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderstatus], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + | | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber] } + | | └─StreamFilter { predicate: (lineitem.l_receiptdate > lineitem.l_commitdate) } + | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_receiptdate > lineitem.l_commitdate) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' +- id: tpch_q22 + before: + - create_tables + sql: | + select + cntrycode, + count(*) as numcust, + sum(c_acctbal) as totacctbal + from + ( + select + substring(c_phone from 1 for 2) as cntrycode, + c_acctbal + from + customer + where + substring(c_phone from 1 for 2) in + ('30', '24', '31', '38', '25', '34', '37') + and c_acctbal > ( + select + avg(c_acctbal) + from + customer + where + c_acctbal > 0.00::numeric + and substring(c_phone from 1 for 2) in + ('30', '24', '31', '38', '25', '34', '37') + ) + and not exists ( + select + * + from + orders + where + o_custkey = c_custkey + ) + ) as custsale + group by + cntrycode + order by + cntrycode; + stream_plan: | + StreamMaterialize { columns: [cntrycode, numcust, totacctbal], pk_columns: [cntrycode], pk_conflict: "no check" } + └─StreamHashAgg { group_key: [$expr2], aggs: [count, sum(customer.c_acctbal)] } + └─StreamExchange { dist: HashShard($expr2) } + └─StreamProject { exprs: [Substr(customer.c_phone, 1:Int32, 2:Int32) as $expr2, customer.c_acctbal, customer.c_custkey] } + └─StreamDynamicFilter { predicate: (customer.c_acctbal > $expr1), output: [customer.c_phone, customer.c_acctbal, customer.c_custkey] } + ├─StreamHashJoin { type: LeftAnti, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_phone, customer.c_acctbal, customer.c_custkey] } + | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | └─StreamFilter { predicate: In(Substr(customer.c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } + | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_phone, customer.c_acctbal], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | └─StreamTableScan { table: orders, columns: [orders.o_custkey, orders.o_orderkey], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: Broadcast } + └─StreamProject { exprs: [(sum(sum(customer.c_acctbal)) / sum0(count(customer.c_acctbal))) as $expr1] } + └─StreamGlobalSimpleAgg { aggs: [sum(sum(customer.c_acctbal)), sum0(count(customer.c_acctbal)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessLocalSimpleAgg { aggs: [sum(customer.c_acctbal), count(customer.c_acctbal)] } + └─StreamProject { exprs: [customer.c_acctbal, customer.c_custkey] } + └─StreamFilter { predicate: (customer.c_acctbal > 0.00:Decimal) AND In(Substr(customer.c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } + └─StreamTableScan { table: customer, columns: [customer.c_acctbal, customer.c_custkey, customer.c_phone], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + with_config_map: + RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index 27506513c332c..33efa6dfb4497 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -159,9 +159,15 @@ lazy_static! { ApplyOrder::TopDown, ); - static ref JOIN_REORDER: OptimizationStage = OptimizationStage::new( + static ref LEFT_DEEP_JOIN_REORDER: OptimizationStage = OptimizationStage::new( "Join Reorder".to_string(), - vec![ReorderMultiJoinRule::create()], + vec![LeftDeepTreeJoinOrderingRule::create()], + ApplyOrder::TopDown, + ); + + static ref BUSHY_TREE_JOIN_REORDER: OptimizationStage = OptimizationStage::new( + "Bushy tree join ordering Rule".to_string(), + vec![BushyTreeJoinOrderingRule::create()], ApplyOrder::TopDown, ); @@ -365,9 +371,17 @@ impl LogicalOptimizer { // their relevant joins. plan = plan.optimize_by_rules(&TO_MULTI_JOIN); - // Reorder multijoin into left-deep join tree. - plan = plan.optimize_by_rules(&JOIN_REORDER); - + // Reorder multijoin into join tree. + if plan + .ctx() + .session_ctx() + .config() + .get_streaming_enable_bushy_join() + { + plan = plan.optimize_by_rules(&BUSHY_TREE_JOIN_REORDER); + } else { + plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_REORDER); + } // Predicate Push-down: apply filter pushdown rules again since we pullup all join // conditions into a filter above the multijoin. plan = Self::predicate_pushdown(plan, explain_trace, &ctx); @@ -438,7 +452,7 @@ impl LogicalOptimizer { plan = plan.optimize_by_rules(&TO_MULTI_JOIN); // Reorder multijoin into left-deep join tree. - plan = plan.optimize_by_rules(&JOIN_REORDER); + plan = plan.optimize_by_rules(&LEFT_DEEP_JOIN_REORDER); // Predicate Push-down: apply filter pushdown rules again since we pullup all join // conditions into a filter above the multijoin. diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index e9ff91a48db5e..faab125aea73d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::fmt; use itertools::Itertools; @@ -483,6 +485,243 @@ impl LogicalMultiJoin { Ok(join_ordering) } + pub fn as_bushy_tree_join(&self) -> Result { + // Join tree internal representation + #[derive(Clone, Default, Debug)] + struct JoinTreeNode { + idx: Option, + left: Option>, + right: Option>, + height: usize, + } + + // join graph internal representation + #[derive(Clone, Debug)] + struct GraphNode { + id: usize, + join_tree: JoinTreeNode, + // use BTreeSet for deterministic + relations: BTreeSet, + } + + let mut nodes: BTreeMap<_, _> = (0..self.inputs.len()) + .map(|idx| GraphNode { + id: idx, + relations: BTreeSet::new(), + join_tree: JoinTreeNode { + idx: Some(idx), + left: None, + right: None, + height: 0, + }, + }) + .enumerate() + .collect(); + let (eq_join_conditions, _) = self + .on + .clone() + .split_by_input_col_nums(&self.input_col_nums(), true); + + for ((src, dst), _) in eq_join_conditions { + nodes.get_mut(&src).unwrap().relations.insert(dst); + nodes.get_mut(&dst).unwrap().relations.insert(src); + } + + // isolated nodes can be joined at any where. + let iso_nodes = nodes + .iter() + .filter_map(|n| { + if n.1.relations.is_empty() { + Some(*n.0) + } else { + None + } + }) + .collect_vec(); + + for n in iso_nodes { + for adj in 0..nodes.len() { + if adj != n { + nodes.get_mut(&n).unwrap().relations.insert(adj); + nodes.get_mut(&adj).unwrap().relations.insert(n); + } + } + } + + let mut optimized_bushy_tree = None; + let mut que = VecDeque::from([nodes]); + let mut isolated = BTreeSet::new(); + + while let Some(mut nodes) = que.pop_front() { + if nodes.len() == 1 { + let node = nodes.into_values().next().unwrap(); + optimized_bushy_tree = Some(optimized_bushy_tree.map_or( + node.clone(), + |old_tree: GraphNode| { + if node.join_tree.height < old_tree.join_tree.height { + node + } else { + old_tree + } + }, + )); + continue; + } + + let (idx, _) = nodes + .iter() + .min_by( + |(_, x), (_, y)| match x.relations.len().cmp(&y.relations.len()) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + Ordering::Equal => x.join_tree.height.cmp(&y.join_tree.height), + }, + ) + .unwrap(); + let n = nodes.remove(&idx.clone()).unwrap(); + + if n.relations.is_empty() { + isolated.insert(n.id); + que.push_back(nodes); + continue; + } + + for merge_node in &n.relations { + let mut nodes = nodes.clone(); + for adjacent_node in &n.relations { + if *adjacent_node != *merge_node { + nodes + .get_mut(adjacent_node) + .unwrap() + .relations + .remove(&n.id); + nodes + .get_mut(adjacent_node) + .unwrap() + .relations + .insert(*merge_node); + nodes + .get_mut(merge_node) + .unwrap() + .relations + .insert(*adjacent_node); + } + } + let mut merge_graph_node = nodes.get_mut(merge_node).unwrap(); + merge_graph_node.relations.remove(&n.id); + let l_tree = n.join_tree.clone(); + let r_tree = std::mem::take(&mut merge_graph_node.join_tree); + let new_height = usize::max(l_tree.height, r_tree.height) + 1; + + if let Some(min_height) = optimized_bushy_tree.as_ref().map(|t| t.join_tree.height) && min_height < new_height { + continue; + } + + merge_graph_node.join_tree = JoinTreeNode { + idx: None, + left: Some(Box::new(l_tree)), + right: Some(Box::new(r_tree)), + height: new_height, + }; + que.push_back(nodes); + } + } + + fn create_logical_join( + s: &LogicalMultiJoin, + mut join_tree: JoinTreeNode, + join_ordering: &mut Vec, + ) -> Result { + Ok(match (join_tree.left.take(), join_tree.right.take()) { + (Some(l), Some(r)) => LogicalJoin::new( + create_logical_join(s, *l, join_ordering)?, + create_logical_join(s, *r, join_ordering)?, + JoinType::Inner, + Condition::true_cond(), + ) + .into(), + (None, None) => { + if let Some(idx) = join_tree.idx { + join_ordering.push(idx); + s.inputs[idx].clone() + } else { + return Err(RwError::from(ErrorCode::InternalError( + "id of the leaf node not found in the join tree".into(), + ))); + } + } + (_, _) => { + return Err(RwError::from(ErrorCode::InternalError( + "only leaf node can have None subtree".into(), + ))) + } + }) + } + + let isolated = isolated.into_iter().collect_vec(); + let mut join_ordering = vec![]; + let mut output = if let Some(optimized_bushy_tree) = optimized_bushy_tree { + let mut output = + create_logical_join(self, optimized_bushy_tree.join_tree, &mut join_ordering)?; + + output = isolated.into_iter().fold(output, |chain, n| { + join_ordering.push(n); + LogicalJoin::new( + chain, + self.inputs[n].clone(), + JoinType::Inner, + Condition::true_cond(), + ) + .into() + }); + output + } else if !isolated.is_empty() { + let base = isolated[0]; + join_ordering.push(isolated[0]); + isolated[1..] + .iter() + .fold(self.inputs[base].clone(), |chain, n| { + join_ordering.push(*n); + LogicalJoin::new( + chain, + self.inputs[*n].clone(), + JoinType::Inner, + Condition::true_cond(), + ) + .into() + }) + } else { + return Err(RwError::from(ErrorCode::InternalError( + "no plan remain".into(), + ))); + }; + let total_col_num = self.inner2output.source_size(); + let reorder_mapping = { + let mut reorder_mapping = vec![None; total_col_num]; + + join_ordering + .iter() + .cloned() + .flat_map(|input_idx| { + (0..self.inputs[input_idx].schema().len()) + .map(move |col_idx| self.inner_i2o_mappings[input_idx].map(col_idx)) + }) + .enumerate() + .for_each(|(tar, src)| reorder_mapping[src] = Some(tar)); + reorder_mapping + }; + output = + LogicalProject::with_out_col_idx(output, reorder_mapping.iter().map(|i| i.unwrap())) + .into(); + + // We will later push down all of the filters back to the individual joins via the + // `FilterJoinRule`. + output = LogicalFilter::create(output, self.on.clone()); + output = + LogicalProject::with_out_col_idx(output, self.output_indices.iter().cloned()).into(); + Ok(output) + } + pub(crate) fn input_col_nums(&self) -> Vec { self.inputs.iter().map(|i| i.schema().len()).collect() } diff --git a/src/frontend/src/optimizer/rule/reorder_multijoin_rule.rs b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs similarity index 95% rename from src/frontend/src/optimizer/rule/reorder_multijoin_rule.rs rename to src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs index cdda30f46a7e8..a4e5d3db1d5c4 100644 --- a/src/frontend/src/optimizer/rule/reorder_multijoin_rule.rs +++ b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs @@ -17,9 +17,9 @@ use super::Rule; use crate::optimizer::rule::BoxedRule; /// Reorders a multi join into a left deep join via the heuristic ordering -pub struct ReorderMultiJoinRule {} +pub struct LeftDeepTreeJoinOrderingRule {} -impl Rule for ReorderMultiJoinRule { +impl Rule for LeftDeepTreeJoinOrderingRule { fn apply(&self, plan: PlanRef) -> Option { let join = plan.as_logical_multi_join()?; // check if join is inner and can be merged into multijoin @@ -29,9 +29,9 @@ impl Rule for ReorderMultiJoinRule { } } -impl ReorderMultiJoinRule { +impl LeftDeepTreeJoinOrderingRule { pub fn create() -> BoxedRule { - Box::new(ReorderMultiJoinRule {}) + Box::new(LeftDeepTreeJoinOrderingRule {}) } } diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 78c0fc15f93bc..9380e47c6f3a0 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -39,8 +39,8 @@ mod pull_up_correlated_predicate_rule; pub use pull_up_correlated_predicate_rule::*; mod index_delta_join_rule; pub use index_delta_join_rule::*; -mod reorder_multijoin_rule; -pub use reorder_multijoin_rule::*; +mod left_deep_tree_join_ordering_rule; +pub use left_deep_tree_join_ordering_rule::*; mod apply_agg_transpose_rule; pub use apply_agg_transpose_rule::*; mod apply_filter_transpose_rule; @@ -84,6 +84,7 @@ pub use apply_share_eliminate_rule::*; mod top_n_on_index_rule; pub use top_n_on_index_rule::*; mod stream; +pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; mod trivial_project_to_values_rule; pub use trivial_project_to_values_rule::*; @@ -115,7 +116,7 @@ macro_rules! for_all_rules { , { ProjectJoinMergeRule } , { ProjectMergeRule } , { PullUpCorrelatedPredicateRule } - , { ReorderMultiJoinRule } + , { LeftDeepTreeJoinOrderingRule } , { TranslateApplyRule } , { PushCalculationOfJoinRule } , { IndexSelectionRule } @@ -133,6 +134,7 @@ macro_rules! for_all_rules { , { RewriteLikeExprRule } , { AvoidExchangeShareRule } , { MinMaxOnIndexRule } + , { BushyTreeJoinOrderingRule } } }; } diff --git a/src/frontend/src/optimizer/rule/stream/bushy_tree_join_ordering_rule.rs b/src/frontend/src/optimizer/rule/stream/bushy_tree_join_ordering_rule.rs new file mode 100644 index 0000000000000..cdb9516d44cf7 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/bushy_tree_join_ordering_rule.rs @@ -0,0 +1,39 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::super::super::plan_node::*; +use super::super::Rule; +use crate::optimizer::rule::BoxedRule; + +/// Reorders a multi join into a left deep join via the heuristic ordering +pub struct BushyTreeJoinOrderingRule {} + +impl Rule for BushyTreeJoinOrderingRule { + fn apply(&self, plan: PlanRef) -> Option { + let join = plan.as_logical_multi_join()?; + match join.as_bushy_tree_join() { + Ok(plan) => Some(plan), + Err(e) => { + eprintln!("{}", e); + None + } + } + } +} + +impl BushyTreeJoinOrderingRule { + pub fn create() -> BoxedRule { + Box::new(BushyTreeJoinOrderingRule {}) + } +} diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index 39cf0c8d1185f..f1440e913ed9a 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule;