feat: Stageify plan on shuffle boundaries #3781
Conversation
CodSpeed Performance Report: merging #3781 will not alter performance.
flowchart TD
subgraph stage_final["Stage final"]
stage_finalSort0["Sort: Sort by = (col(L_SHIPMODE), ascending, nulls last)
Num partitions = 200"]
stage_finalProject1["Project: col(L_SHIPMODE), col((Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH')))
else Literal(Int32(0))).local_sum().local_sum()) as high_line_count,
col((Literal(Int32(1)) if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-
URGENT')),Literal(Utf8('2-HIGH'))).not() else
Literal(Int32(0))).local_sum().local_sum()) as low_line_count
Clustering spec = { Num partitions = 200, By = col(L_SHIPMODE) }"]
stage_finalAggregate2["Aggregation: sum(col((Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-
HIGH'))) else Literal(Int32(0))).local_sum()) as (Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH')))
else Literal(Int32(0))).local_sum().local_sum()), sum(col((Literal(Int32(1))
if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-
HIGH'))).not() else Literal(Int32(0))).local_sum()) as (Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH'))).not()
else Literal(Int32(0))).local_sum().local_sum())
Group by = col(L_SHIPMODE)"]
stage_finalInMemoryScan3["InMemoryScan:
Schema = EMPTY,
Size bytes = 97536,
Clustering spec = { Num partitions = 200, By = col(L_SHIPMODE) }
Source ID = 5
"]
stage_finalInMemoryScan3 --> stage_finalAggregate2
stage_finalAggregate2 --> stage_finalProject1
stage_finalProject1 --> stage_finalSort0
end
subgraph stage_5["Stage 5"]
stage_5ShuffleExchange0["ShuffleExchange:
Strategy: MapReduceWithPreShuffleMerge
Target Spec: Hash(HashClusteringConfig { num_partitions: 200, by:
[Column('L_SHIPMODE')] })
Number of Partitions: 256 → 200
Pre-Shuffle Merge Threshold: 1073741824"]
stage_5InMemoryScan1["InMemoryScan:
Schema = EMPTY,
Size bytes = 16384,
Clustering spec = { Num partitions = 256 }
Source ID = 4
"]
stage_5InMemoryScan1 --> stage_5ShuffleExchange0
end
subgraph stage_4["Stage 4"]
stage_4Aggregate0["Aggregation: sum(if [col(O_ORDERPRIORITY) IN (lit('1-URGENT'), lit('2-
HIGH'))] then [lit(1)] else [lit(0)] as high_line_count as (Literal(Int32(1))
if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH')))
else Literal(Int32(0))).local_sum()), sum(if [not(col(O_ORDERPRIORITY)
IN (lit('1-URGENT'), lit('2-HIGH')))] then [lit(1)] else [lit(0)] as
low_line_count as (Literal(Int32(1)) if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-
URGENT')),Literal(Utf8('2-HIGH'))).not() else Literal(Int32(0))).local_sum())
Group by = col(L_SHIPMODE)"]
stage_4Project1["Project: col(O_ORDERPRIORITY), col(L_SHIPMODE)
Clustering spec = { Num partitions = 256 }"]
stage_4HashJoin2["HashJoin: Type = Inner
Left on = col(O_ORDERKEY)
Right on = col(L_ORDERKEY)"]
stage_4InMemoryScan3["InMemoryScan:
Schema = O_ORDERKEY#Int64, O_ORDERPRIORITY#Utf8,
Size bytes = 36600176588,
Clustering spec = { Num partitions = 256, By = col(O_ORDERKEY) }
Source ID = 2
"]
stage_4InMemoryScan3 --> stage_4HashJoin2
stage_4InMemoryScan4["InMemoryScan:
Schema = L_ORDERKEY#Int64, L_SHIPMODE#Utf8,
Size bytes = 623364576,
Clustering spec = { Num partitions = 256, By = col(L_ORDERKEY) }
Source ID = 0
"]
stage_4InMemoryScan4 --> stage_4HashJoin2
stage_4HashJoin2 --> stage_4Project1
stage_4Project1 --> stage_4Aggregate0
end
subgraph stage_2["Stage 2"]
stage_2ShuffleExchange0["ShuffleExchange:
Strategy: MapReduceWithPreShuffleMerge
Target Spec: Hash(HashClusteringConfig { num_partitions: 256, by:
[Column('O_ORDERKEY')] })
Number of Partitions: 256 → 256
Pre-Shuffle Merge Threshold: 1073741824"]
stage_2InMemoryScan1["InMemoryScan:
Schema = EMPTY,
Size bytes = 36599939020,
Clustering spec = { Num partitions = 256 }
Source ID = 3
"]
stage_2InMemoryScan1 --> stage_2ShuffleExchange0
end
subgraph stage_3["Stage 3"]
stage_3TabularScan0["TabularScan:
Num Scan Tasks = 256
Estimated Scan Bytes = 69057170690
Clustering spec = { Num partitions = 256 }
Pushdowns: {projection: [O_ORDERKEY, O_ORDERPRIORITY], filter:
not(is_null(col(O_ORDERKEY)))}
Schema: {O_ORDERKEY#Int64, O_CUSTKEY#Int64, O_ORDERSTATUS#Utf8,
O_TOTALPRICE#Float64, O_ORDERDATE#Date, O_ORDERPRIORITY#Utf8, O_CLERK#Utf8,
O_SHIPPRIORITY#Int64, O_COMMENT#Utf8}
Scan Tasks: [
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/0075897d-cf42-4e73-a0c6-7578be24b312-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/01d013a2-28f6-41ac-8d60-527b4cecddc7-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/020da10e-f840-4843-9a96-ee6193fadca1-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/0273fb27-17aa-4063-8a34-e486362f50b8-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/03191195-4734-4619-9384-e06e081b5d22-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/03a48bfa-bef8-4ed8-8644-ae9e71e80bff-0.parquet}}
...
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/f900a9d4-c66c-4935-a7ae-4ab01714b1df-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fba64149-dc35-4207-bbbb-2119f98c275e-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fcbb12f6-20b9-48d1-8d5d-b8687179f302-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fcc49573-b4d7-4d7f-aa18-5d6799817d2d-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fefb14ea-76f7-4409-9fd4-fdc7d13bdab0-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/ff7a1ec6-2e04-4619-8023-5ba8303094ee-0.parquet}}
]
"]
end
stage_3 --> stage_2
stage_2 --> stage_4
subgraph stage_0["Stage 0"]
stage_0ShuffleExchange0["ShuffleExchange:
Strategy: NaiveFullyMaterializingMapReduce
Target Spec: Hash(HashClusteringConfig { num_partitions: 256, by:
[Column('L_ORDERKEY')] })
Number of Partitions: 52 → 256"]
stage_0InMemoryScan1["InMemoryScan:
Schema = EMPTY,
Size bytes = 623262176,
Clustering spec = { Num partitions = 52 }
Source ID = 1
"]
stage_0InMemoryScan1 --> stage_0ShuffleExchange0
end
subgraph stage_1["Stage 1"]
stage_1TabularScan0["TabularScan:
Num Scan Tasks = 52
Estimated Scan Bytes = 243609958780
Clustering spec = { Num partitions = 52 }
Pushdowns: {projection: [L_ORDERKEY, L_SHIPMODE], filter: [[[[[[col(L_SHIPMODE)
== lit('MAIL')] | [col(L_SHIPMODE) == lit('SHIP')]] & [col(L_COMMITDATE)
< col(L_RECEIPTDATE)]] & [col(L_SHIPDATE) < col(L_COMMITDATE)]] &
[col(L_RECEIPTDATE) >= lit(1994-01-01)]] & [col(L_RECEIPTDATE) < lit(1995-01-
01)]] & not(is_null(col(L_ORDERKEY)))}
Schema: {L_ORDERKEY#Int64, L_PARTKEY#Int64, L_SUPPKEY#Int64, L_LINENUMBER#Int64,
L_QUANTITY#Int64, L_EXTENDEDPRICE#Float64, L_DISCOUNT#Float64, L_TAX#Float64,
L_RETURNFLAG#Utf8, L_LINESTATUS#Utf8, L_SHIPDATE#Date, L_COMMITDATE#Date,
L_RECEIPTDATE#Date, L_SHIPINSTRUCT#Utf8, L_SHIPMODE#Utf8, L_COMMENT#Utf8}
Scan Tasks: [
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/00b288ce-9637-403e-8e96-e7afd15e82d6-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/00b963c8-a4bb-43e1-9c69-
9a032bcc8703-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/00dde51d-
0e1a-43aa-8ef6-15052c82e4d7-0.parquet}, ..., File {s3://eventual-dev-
benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/03005f84-e30c-4732-ae33-9726b2860367-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0349213c-5e39-4301-9b97-16e9f99f694f-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/03b80d3c-2c63-4f78-b582-6c935abf2da8-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/03e44dbb-1584-4c7a-b8c9-8f19d9dd1a0c-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/04a7edbb-07cb-4503-a331-
a4045be6b92f-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/04db892c-
ee93-4d09-807f-4996b6142785-0.parquet}, ..., File {s3://eventual-dev-
benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/066716a9-3240-42f9-83ba-e7996852b504-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0705bb03-7adf-4739-8a1b-51706f60334b-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/07bc30ad-2c28-44fc-a67f-eeb53d1ac6bd-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/07fe785b-bb18-4042-907c-4816fed7b769-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/081f14e0-430d-4fa9-b40b-
275e5ea71b33-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/0876939e-
7877-4289-873b-141b63f30d4f-0.parquet}, ..., File {s3://eventual-dev-
benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0b91c061-ceea-46a0-949a-d210a91dc7e7-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0d0e0714-3dc5-47d0-97a7-05994a3c063b-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0d753761-c906-43b9-8cf7-cf7b9cba14ff-0.parquet}}
...
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/f5a61726-5eec-46e8-a85c-010c0d94cb93-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/f5e83af2-6e87-4dac-b652-c9f6b0aa9ecf-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/f634dbf3-d506-4a4f-8551-40f473ebda42-
0.parquet}, ..., File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/f7d8f121-b0a2-456b-9fd6-
6c019136081b-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/f813ce16-
6a46-4fcc-9527-a63e834d6ce1-0.parquet}, File {s3://eventual-dev-benchmarking-
fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/
f83e72be-7559-49b1-8354-39e7ffdf2913-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/f8cecdf4-5971-4975-86ad-343aa6cac5e6-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/f9f85c70-2fc1-4ce6-bfb9-4ef252fed5d2-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/fa1f594b-ba16-44e6-85ac-aeeeda9474a2-
0.parquet}, ..., File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/fcb3ffe0-cc29-4f56-878c-
743979673d9e-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/fd2f7a52-
8cbc-46ea-9783-cf7191e7153f-0.parquet}, File {s3://eventual-dev-benchmarking-
fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/
fd8545ca-a3c3-4494-8636-0d7b81cda8ca-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/fdd5d7b7-f4b0-4f8b-ab23-cc49e3234358-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/ff08ac7b-a0c3-4f15-b2d4-172a706eac95-
0.parquet}}
]
"]
end
stage_1 --> stage_0
stage_0 --> stage_4
stage_4 --> stage_5
stage_5 --> stage_final
Looks good! But let's make sure to enable some tests in CI for the next PR.
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PlaceholderScan {
Instead of naming this PlaceholderScan, we can call this something like PreviousStageScan and place the stage_id of the stage we expect to go inside of here. Then during the next iteration we can enforce that the stage_id is what we expect.
Renamed. Opted to perform the stage id assertion in the planner instead of the plan, since we could be replacing either a logical or physical plan.
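The rename and planner-side assertion discussed above could be sketched roughly as follows. This is an illustrative sketch only — the struct fields, function names, and error handling are hypothetical, not Daft's actual API:

```rust
/// Hypothetical sketch: the placeholder records the id of the stage whose
/// materialized output is expected to replace it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PreviousStageScan {
    /// Id of the stage whose output should be spliced in here.
    pub stage_id: usize,
}

/// Planner-side check on the next iteration: the stage being spliced in
/// must be the one the placeholder was created for.
pub fn replace_placeholder(
    placeholder: &PreviousStageScan,
    emitted_stage_id: usize,
) -> Result<(), String> {
    if placeholder.stage_id != emitted_stage_id {
        return Err(format!(
            "expected output of stage {}, got stage {}",
            placeholder.stage_id, emitted_stage_id
        ));
    }
    Ok(())
}
```

Performing the assertion in the planner (rather than in the plan node itself) keeps the check in one place, regardless of whether a logical or physical plan is being rewritten.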
impl EmittedStage {
    fn new(query_stage_output: &QueryStageOutput, input_stages: Vec<Self>) -> DaftResult<Self> {
        let mut strip_cache_entry = StripCacheEntryFromInMemoryScan {};
We want to strip the cache entry after we execute the stage. Otherwise you delete the reference to the CacheEntry, and it may be garbage collected before it gets a chance to be read.
Agreed that this can be dangerous, but we also don't want the planner to hold a reference to the cache entry during execution of the stage. Added assertions that the cached stage doesn't hold cache entry references, while the emitted stage does.
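The lifetime invariant being discussed can be made concrete with a small sketch. All names here are hypothetical stand-ins (a `String` plays the role of a real CacheEntry handle); the point is only the asymmetry — the planner's cached copy must not pin the partitions, while the emitted stage must keep them alive until read:

```rust
/// Stand-in for an in-memory scan node; `cache_entry` models a reference
/// that keeps materialized partitions alive.
#[derive(Clone, Debug)]
pub struct ScanState {
    pub cache_entry: Option<String>,
}

/// Produce the copy the planner caches: identical, but with the
/// cache entry reference removed so nothing is pinned during execution.
pub fn strip_cache_entry(scan: &ScanState) -> ScanState {
    let mut stripped = scan.clone();
    stripped.cache_entry = None;
    stripped
}

pub fn holds_cache_entry(scan: &ScanState) -> bool {
    scan.cache_entry.is_some()
}
```

The assertions in the PR would then check `holds_cache_entry` is false for the planner's cached stage and true for the emitted one.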
…lyze (#3852)

Follow on from #3781. This PR modifies stagification logic on joins to emit the children of joins before any shuffle exchanges, if necessary. This is so we can dynamically adjust partitioning schemes for shuffles based on exact runtime stats of the input data. Additionally, this PR adds stats (time taken, num rows, size bytes) per stage to the explain analyze.

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
Refactor the current adaptive planner to stageify the plan on shuffle boundaries. Essentially, the logical boundaries are joins (the only places where data size affects planning decisions) and the physical boundaries are shuffles.
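The boundary rule can be sketched as a small predicate over plan nodes. This is a minimal illustrative model, not Daft's actual plan representation — the enum variants and function names are assumptions:

```rust
/// Toy plan tree: joins and shuffles are the stage boundaries described
/// above; projects are pass-through; scans are leaves.
#[derive(Clone, Debug)]
pub enum Plan {
    Scan,
    Project(Box<Plan>),
    Shuffle(Box<Plan>),
    Join(Box<Plan>, Box<Plan>),
}

/// A node at which the planner must cut and emit a stage: joins (logical
/// boundary, since child sizes inform the join strategy) and shuffles
/// (physical boundary, since the map side is materialized first).
pub fn is_stage_boundary(plan: &Plan) -> bool {
    matches!(plan, Plan::Shuffle(_) | Plan::Join(_, _))
}

/// Whether any boundary exists in the subtree, i.e. whether the plan
/// must be split into more than one stage.
pub fn contains_boundary(plan: &Plan) -> bool {
    if is_stage_boundary(plan) {
        return true;
    }
    match plan {
        Plan::Project(child) => contains_boundary(child),
        _ => false,
    }
}
```

In the plan rendered above, this is why each ShuffleExchange and the HashJoin sit at stage edges while projects and aggregations stay inside their stage.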
Results of TPCH SF1000, 4 nodes:

| Query | Before (s) | After (s) |
|---|---|---|
| Q1 | 31.05 | 31.26 |
| Q2 | 24.95 | 35.46 |
| Q3 | 50.91 | 53.77 |
| Q4 | 24.11 | 19.98 |
| Q5 | 177.07 | 206.01 |
| Q6 | 11.17 | 10.50 |
| Q7 | 75.97 | 91.34 |
| Q8 | 150.76 | 142.85 |
| Q9 | 263.51 | 271.16 |
| Q10 | 59.37 | 53.28 |
| Total | 868.87 | 918.28 |
| Spilled (MiB) | 2200948 | 2194637 |