
feat: Stageify plan on shuffle boundaries #3781

Merged: 16 commits, Feb 26, 2025
Conversation

@colin-ho (Contributor) commented Feb 7, 2025

This PR refactors the current adaptive planner to stageify the plan on shuffle boundaries. It works as follows:

  1. Translate the logical plan until the root is reached, or until we hit a point where materialized data can better inform the planning decision, i.e. a join.
  2. Split the translated physical plan on shuffle exchanges, first emitting the plan until the exchange, then the exchange, then repeat until the physical plan is complete.
  3. Optimize the logical plan and repeat from step 1.

Essentially, the logical boundaries are joins (the only places where data size affects the planning decision), and the physical boundaries are shuffles.
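The splitting in step 2 can be sketched as follows. This is a hypothetical illustration, not Daft's actual code: the names `Node`, `split_into_stages`, and the `PreviousStageScan` placeholder shape are made up for the example.

```python
# Sketch of step 2: split a physical plan into stages at shuffle boundaries.
from dataclasses import dataclass, field

@dataclass
class Node:
    name: str
    is_shuffle: bool = False
    children: list = field(default_factory=list)

def split_into_stages(root):
    """Emit stages bottom-up: each subtree ending in a shuffle exchange
    becomes its own stage, replaced upstream by a placeholder scan."""
    stages = []

    def walk(node):
        new_children = [walk(child) for child in node.children]
        if node.is_shuffle:
            # Emit everything up to and including the shuffle as a stage,
            # and substitute a scan over that stage's materialized output.
            stages.append(Node(node.name, True, new_children))
            return Node(f"PreviousStageScan({len(stages) - 1})")
        return Node(node.name, False, new_children)

    final = walk(root)
    stages.append(final)  # the remainder of the plan is the last stage
    return stages

# A tiny plan: Scan -> Shuffle -> Aggregate becomes two stages.
plan = Node("Aggregate", children=[Node("Shuffle", True, [Node("Scan")])])
stages = split_into_stages(plan)
```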

Results on TPC-H SF1000, 4 nodes:

After:

Q1 took 31.26 seconds
Q2 took 35.46 seconds
Q3 took 53.77 seconds
Q4 took 19.98 seconds
Q5 took 206.01 seconds
Q6 took 10.50 seconds
Q7 took 91.34 seconds
Q8 took 142.85 seconds
Q9 took 271.16 seconds
Q10 took 53.28 seconds
Total: 918.28 seconds
Spilled 2194637 MiB

Before:

Q1 took 31.05 seconds
Q2 took 24.95 seconds
Q3 took 50.91 seconds
Q4 took 24.11 seconds
Q5 took 177.07 seconds
Q6 took 11.17 seconds
Q7 took 75.97 seconds
Q8 took 150.76 seconds
Q9 took 263.51 seconds
Q10 took 59.37 seconds
Total: 868.87 seconds
Spilled 2200948 MiB

github-actions bot added the feat label Feb 7, 2025
codspeed-hq bot commented Feb 7, 2025

CodSpeed Performance Report

Merging #3781 will not alter performance

Comparing colin/stageify (9a36fdd) with main (0080a7e)

Summary

✅ 24 untouched benchmarks

codecov bot commented Feb 7, 2025

Codecov Report

Attention: Patch coverage is 69.43320% with 151 lines in your changes missing coverage. Please review.

Project coverage is 78.39%. Comparing base (0080a7e) to head (9a36fdd).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
...daft-physical-plan/src/physical_planner/display.rs 0.00% 61 Missing ⚠️
...daft-physical-plan/src/physical_planner/planner.rs 88.10% 37 Missing ⚠️
.../daft-physical-plan/src/ops/previous_stage_scan.rs 21.42% 22 Missing ⚠️
src/daft-physical-plan/src/ops/in_memory.rs 0.00% 10 Missing ⚠️
src/daft-physical-plan/src/plan.rs 27.27% 8 Missing ⚠️
src/daft-scheduler/src/adaptive.rs 55.55% 4 Missing ⚠️
daft/runners/ray_runner.py 70.00% 3 Missing ⚠️
...ft-physical-plan/src/physical_planner/translate.rs 90.00% 3 Missing ⚠️
daft/plan_scheduler/physical_plan_scheduler.py 66.66% 1 Missing ⚠️
src/daft-physical-plan/src/display.rs 0.00% 1 Missing ⚠️
... and 1 more
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #3781      +/-   ##
==========================================
+ Coverage   77.77%   78.39%   +0.62%     
==========================================
  Files         760      762       +2     
  Lines       96306    96229      -77     
==========================================
+ Hits        74899    75442     +543     
+ Misses      21407    20787     -620     
Files with missing lines Coverage Δ
src/daft-logical-plan/src/builder/mod.rs 87.83% <100.00%> (+0.01%) ⬆️
src/daft-logical-plan/src/display.rs 98.00% <ø> (-0.05%) ⬇️
src/daft-logical-plan/src/ops/source.rs 77.27% <100.00%> (-0.75%) ⬇️
...rc/daft-logical-plan/src/optimization/optimizer.rs 95.21% <100.00%> (+0.16%) ⬆️
...lan/src/optimization/rules/eliminate_cross_join.rs 88.09% <ø> (ø)
...lan/src/optimization/rules/simplify_expressions.rs 92.30% <ø> (-0.12%) ⬇️
src/daft-logical-plan/src/source_info/mod.rs 90.90% <100.00%> (+22.15%) ⬆️
src/daft-physical-plan/src/ops/mod.rs 0.00% <ø> (ø)
...n/src/optimization/rules/reorder_partition_keys.rs 77.94% <ø> (ø)
src/daft-physical-plan/src/physical_planner/mod.rs 95.00% <ø> (ø)
... and 13 more

... and 10 files with indirect coverage changes

@colin-ho (Contributor, Author) commented Feb 19, 2025

flowchart TD
subgraph stage_final["Stage final"]
stage_finalSort0["Sort: Sort by = (col(L_SHIPMODE), ascending, nulls last)
Num partitions = 200"]
stage_finalProject1["Project: col(L_SHIPMODE), col((Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH')))
else Literal(Int32(0))).local_sum().local_sum()) as high_line_count,
col((Literal(Int32(1)) if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-
URGENT')),Literal(Utf8('2-HIGH'))).not() else
Literal(Int32(0))).local_sum().local_sum()) as low_line_count
Clustering spec = { Num partitions = 200, By = col(L_SHIPMODE) }"]
stage_finalAggregate2["Aggregation: sum(col((Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-
HIGH'))) else Literal(Int32(0))).local_sum()) as (Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH')))
else Literal(Int32(0))).local_sum().local_sum()), sum(col((Literal(Int32(1))
if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-
HIGH'))).not() else Literal(Int32(0))).local_sum()) as (Literal(Int32(1)) if
O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH'))).not()
else Literal(Int32(0))).local_sum().local_sum())
Group by = col(L_SHIPMODE)"]
stage_finalInMemoryScan3["InMemoryScan:
Schema = EMPTY,
Size bytes = 97536,
Clustering spec = { Num partitions = 200, By = col(L_SHIPMODE) }
Source ID = 5
"]
stage_finalInMemoryScan3 --> stage_finalAggregate2
stage_finalAggregate2 --> stage_finalProject1
stage_finalProject1 --> stage_finalSort0
end
subgraph stage_5["Stage 5"]
stage_5ShuffleExchange0["ShuffleExchange:
Strategy: MapReduceWithPreShuffleMerge
Target Spec: Hash(HashClusteringConfig { num_partitions: 200, by:
[Column('L_SHIPMODE')] })
Number of Partitions: 256 → 200
Pre-Shuffle Merge Threshold: 1073741824"]
stage_5InMemoryScan1["InMemoryScan:
Schema = EMPTY,
Size bytes = 16384,
Clustering spec = { Num partitions = 256 }
Source ID = 4
"]
stage_5InMemoryScan1 --> stage_5ShuffleExchange0
end
subgraph stage_4["Stage 4"]
stage_4Aggregate0["Aggregation: sum(if [col(O_ORDERPRIORITY) IN (lit('1-URGENT'), lit('2-
HIGH'))] then [lit(1)] else [lit(0)] as high_line_count as (Literal(Int32(1))
if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-URGENT')),Literal(Utf8('2-HIGH')))
else Literal(Int32(0))).local_sum()), sum(if [not(col(O_ORDERPRIORITY)
IN (lit('1-URGENT'), lit('2-HIGH')))] then [lit(1)] else [lit(0)] as
low_line_count as (Literal(Int32(1)) if O_ORDERPRIORITY.is_in(,Literal(Utf8('1-
URGENT')),Literal(Utf8('2-HIGH'))).not() else Literal(Int32(0))).local_sum())
Group by = col(L_SHIPMODE)"]
stage_4Project1["Project: col(O_ORDERPRIORITY), col(L_SHIPMODE)
Clustering spec = { Num partitions = 256 }"]
stage_4HashJoin2["HashJoin: Type = Inner
Left on = col(O_ORDERKEY)
Right on = col(L_ORDERKEY)"]
stage_4InMemoryScan3["InMemoryScan:
Schema = O_ORDERKEY#Int64, O_ORDERPRIORITY#Utf8,
Size bytes = 36600176588,
Clustering spec = { Num partitions = 256, By = col(O_ORDERKEY) }
Source ID = 2
"]
stage_4InMemoryScan3 --> stage_4HashJoin2
stage_4InMemoryScan4["InMemoryScan:
Schema = L_ORDERKEY#Int64, L_SHIPMODE#Utf8,
Size bytes = 623364576,
Clustering spec = { Num partitions = 256, By = col(L_ORDERKEY) }
Source ID = 0
"]
stage_4InMemoryScan4 --> stage_4HashJoin2
stage_4HashJoin2 --> stage_4Project1
stage_4Project1 --> stage_4Aggregate0
end
subgraph stage_2["Stage 2"]
stage_2ShuffleExchange0["ShuffleExchange:
Strategy: MapReduceWithPreShuffleMerge
Target Spec: Hash(HashClusteringConfig { num_partitions: 256, by:
[Column('O_ORDERKEY')] })
Number of Partitions: 256 → 256
Pre-Shuffle Merge Threshold: 1073741824"]
stage_2InMemoryScan1["InMemoryScan:
Schema = EMPTY,
Size bytes = 36599939020,
Clustering spec = { Num partitions = 256 }
Source ID = 3
"]
stage_2InMemoryScan1 --> stage_2ShuffleExchange0
end
subgraph stage_3["Stage 3"]
stage_3TabularScan0["TabularScan:
Num Scan Tasks = 256
Estimated Scan Bytes = 69057170690
Clustering spec = { Num partitions = 256 }
Pushdowns: {projection: [O_ORDERKEY, O_ORDERPRIORITY], filter:
not(is_null(col(O_ORDERKEY)))}
Schema: {O_ORDERKEY#Int64, O_CUSTKEY#Int64, O_ORDERSTATUS#Utf8,
O_TOTALPRICE#Float64, O_ORDERDATE#Date, O_ORDERPRIORITY#Utf8, O_CLERK#Utf8,
O_SHIPPRIORITY#Int64, O_COMMENT#Utf8}
Scan Tasks: [
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/0075897d-cf42-4e73-a0c6-7578be24b312-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/01d013a2-28f6-41ac-8d60-527b4cecddc7-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/020da10e-f840-4843-9a96-ee6193fadca1-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/0273fb27-17aa-4063-8a34-e486362f50b8-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/03191195-4734-4619-9384-e06e081b5d22-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/03a48bfa-bef8-4ed8-8644-ae9e71e80bff-0.parquet}}
...
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/f900a9d4-c66c-4935-a7ae-4ab01714b1df-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fba64149-dc35-4207-bbbb-2119f98c275e-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fcbb12f6-20b9-48d1-8d5d-b8687179f302-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fcc49573-b4d7-4d7f-aa18-5d6799817d2d-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/fefb14ea-76f7-4409-9fd4-fdc7d13bdab0-0.parquet},
File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-
dbgen/1000_0/512/parquet/orders/ff7a1ec6-2e04-4619-8023-5ba8303094ee-0.parquet}}
]
"]
end
stage_3 --> stage_2
stage_2 --> stage_4
subgraph stage_0["Stage 0"]
stage_0ShuffleExchange0["ShuffleExchange:
Strategy: NaiveFullyMaterializingMapReduce
Target Spec: Hash(HashClusteringConfig { num_partitions: 256, by:
[Column('L_ORDERKEY')] })
Number of Partitions: 52 → 256"]
stage_0InMemoryScan1["InMemoryScan:
Schema = EMPTY,
Size bytes = 623262176,
Clustering spec = { Num partitions = 52 }
Source ID = 1
"]
stage_0InMemoryScan1 --> stage_0ShuffleExchange0
end
subgraph stage_1["Stage 1"]
stage_1TabularScan0["TabularScan:
Num Scan Tasks = 52
Estimated Scan Bytes = 243609958780
Clustering spec = { Num partitions = 52 }
Pushdowns: {projection: [L_ORDERKEY, L_SHIPMODE], filter: [[[[[[col(L_SHIPMODE)
== lit('MAIL')] | [col(L_SHIPMODE) == lit('SHIP')]] & [col(L_COMMITDATE)
< col(L_RECEIPTDATE)]] & [col(L_SHIPDATE) < col(L_COMMITDATE)]] &
[col(L_RECEIPTDATE) >= lit(1994-01-01)]] & [col(L_RECEIPTDATE) < lit(1995-01-
01)]] & not(is_null(col(L_ORDERKEY)))}
Schema: {L_ORDERKEY#Int64, L_PARTKEY#Int64, L_SUPPKEY#Int64, L_LINENUMBER#Int64,
L_QUANTITY#Int64, L_EXTENDEDPRICE#Float64, L_DISCOUNT#Float64, L_TAX#Float64,
L_RETURNFLAG#Utf8, L_LINESTATUS#Utf8, L_SHIPDATE#Date, L_COMMITDATE#Date,
L_RECEIPTDATE#Date, L_SHIPINSTRUCT#Utf8, L_SHIPMODE#Utf8, L_COMMENT#Utf8}
Scan Tasks: [
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/00b288ce-9637-403e-8e96-e7afd15e82d6-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/00b963c8-a4bb-43e1-9c69-
9a032bcc8703-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/00dde51d-
0e1a-43aa-8ef6-15052c82e4d7-0.parquet}, ..., File {s3://eventual-dev-
benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/03005f84-e30c-4732-ae33-9726b2860367-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0349213c-5e39-4301-9b97-16e9f99f694f-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/03b80d3c-2c63-4f78-b582-6c935abf2da8-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/03e44dbb-1584-4c7a-b8c9-8f19d9dd1a0c-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/04a7edbb-07cb-4503-a331-
a4045be6b92f-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/04db892c-
ee93-4d09-807f-4996b6142785-0.parquet}, ..., File {s3://eventual-dev-
benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/066716a9-3240-42f9-83ba-e7996852b504-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0705bb03-7adf-4739-8a1b-51706f60334b-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/07bc30ad-2c28-44fc-a67f-eeb53d1ac6bd-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/07fe785b-bb18-4042-907c-4816fed7b769-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/081f14e0-430d-4fa9-b40b-
275e5ea71b33-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/0876939e-
7877-4289-873b-141b63f30d4f-0.parquet}, ..., File {s3://eventual-dev-
benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0b91c061-ceea-46a0-949a-d210a91dc7e7-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0d0e0714-3dc5-47d0-97a7-05994a3c063b-0.parquet}, File {s3://eventual-
dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/
lineitem/0d753761-c906-43b9-8cf7-cf7b9cba14ff-0.parquet}}
...
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/f5a61726-5eec-46e8-a85c-010c0d94cb93-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/f5e83af2-6e87-4dac-b652-c9f6b0aa9ecf-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/f634dbf3-d506-4a4f-8551-40f473ebda42-
0.parquet}, ..., File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/f7d8f121-b0a2-456b-9fd6-
6c019136081b-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/f813ce16-
6a46-4fcc-9527-a63e834d6ce1-0.parquet}, File {s3://eventual-dev-benchmarking-
fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/
f83e72be-7559-49b1-8354-39e7ffdf2913-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/f8cecdf4-5971-4975-86ad-343aa6cac5e6-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/f9f85c70-2fc1-4ce6-bfb9-4ef252fed5d2-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/fa1f594b-ba16-44e6-85ac-aeeeda9474a2-
0.parquet}, ..., File {s3://eventual-dev-benchmarking-fixtures/uncompressed-
smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/fcb3ffe0-cc29-4f56-878c-
743979673d9e-0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/
uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/fd2f7a52-
8cbc-46ea-9783-cf7191e7153f-0.parquet}, File {s3://eventual-dev-benchmarking-
fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/lineitem/
fd8545ca-a3c3-4494-8636-0d7b81cda8ca-0.parquet}}
{File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/
tpch-dbgen/1000_0/512/parquet/lineitem/fdd5d7b7-f4b0-4f8b-ab23-cc49e3234358-
0.parquet}, File {s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-
rg/tpch-dbgen/1000_0/512/parquet/lineitem/ff08ac7b-a0c3-4f15-b2d4-172a706eac95-
0.parquet}}
]
"]
end
stage_1 --> stage_0
stage_0 --> stage_4
stage_4 --> stage_5
stage_5 --> stage_final


@samster25 (Member) left a comment

Looks good! But let's make sure to enable some tests in CI for the next PR.

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PlaceholderScan {
Member:

Instead of naming this PlaceholderScan, we can call this something like PreviousStageScan and place the stage_id of the stage we expect to go inside of here. Then during the next iteration we can enforce the stage_id is what we expect

colin-ho (Contributor, Author):

Renamed. Opted to perform the stage id assertion in the planner instead of the plan, since we could be replacing either a logical or physical plan.
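The stage-id assertion described here might look something like the following Python sketch; the names (`PreviousStageScan`, `substitute_stage_output`) are hypothetical, not Daft's actual API.

```python
# Hypothetical sketch of a planner-side stage-id check: when materialized
# results replace a PreviousStageScan placeholder, the planner asserts that
# the stage which just finished is the one the placeholder expects.
class PreviousStageScan:
    def __init__(self, stage_id: int):
        self.stage_id = stage_id

def substitute_stage_output(placeholder: PreviousStageScan,
                            completed_stage_id: int, output):
    # Enforce the invariant in the planner rather than in the plan node,
    # since either a logical or a physical plan may be substituted.
    if placeholder.stage_id != completed_stage_id:
        raise AssertionError(
            f"expected output of stage {placeholder.stage_id}, "
            f"got stage {completed_stage_id}"
        )
    return output

result = substitute_stage_output(PreviousStageScan(3), 3, "partitions")
```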


impl EmittedStage {
fn new(query_stage_output: &QueryStageOutput, input_stages: Vec<Self>) -> DaftResult<Self> {
let mut strip_cache_entry = StripCacheEntryFromInMemoryScan {};
Member:

We want to strip the cache entry after we execute the stage. Otherwise you delete the reference to the CacheEntry and it may be garbage collected before it gets a chance to be read.

colin-ho (Contributor, Author):

Agreed that this can be dangerous. But we also don't want to hold a reference to the cache entry in the planner during execution of the stage. Added assertions that the cached stage doesn't hold references, while the emitted stage does hold the cache entry.
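The ordering agreed on above can be illustrated with a small sketch (all names hypothetical): the emitted stage keeps a strong reference to the cache entry through execution, and only the planner's cached copy is stripped.

```python
# Hypothetical illustration: strip the cache entry from the planner's
# cached copy only, so the emitted stage keeps the entry (and the data it
# pins) alive until the stage has finished executing.
import copy

class InMemoryScan:
    def __init__(self, cache_entry):
        self.cache_entry = cache_entry  # strong ref keeps results alive

def cacheable_copy(stage):
    # Planner-side copy: holds no reference during execution, so the
    # planner never pins stage outputs while the executor runs.
    stripped = copy.copy(stage)
    stripped.cache_entry = None
    return stripped

entry = object()                  # stands in for materialized stage output
emitted = InMemoryScan(entry)     # handed to the executor
cached = cacheable_copy(emitted)  # kept by the planner
```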

@colin-ho colin-ho merged commit f782869 into main Feb 26, 2025
49 of 50 checks passed
@colin-ho colin-ho deleted the colin/stageify branch February 26, 2025 02:56
colin-ho added a commit that referenced this pull request Feb 26, 2025
…lyze (#3852)

Follow on from: #3781

This PR modifies stagification logic on joins to emit the children of
joins before any shuffle exchanges, if necessary. This is so we can
dynamically adjust partitioning schemes for shuffles based on exact
runtime stats of the input data.

Additionally, this PR adds stats (time taken, num rows, size bytes) per
stage to the explain analyze.

---------

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
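The follow-up's idea of choosing shuffle partitioning from exact runtime stats can be sketched as a simple heuristic; the target size and function name below are illustrative, not Daft's actual policy.

```python
# Hypothetical heuristic: once a join's children are materialized, size the
# downstream shuffle so each output partition lands near a target byte size.
def choose_num_partitions(input_size_bytes: int,
                          target_partition_bytes: int = 512 * 1024 ** 2) -> int:
    # Ceiling division via negation; always at least one partition.
    return max(1, -(-input_size_bytes // target_partition_bytes))

# e.g. a 10 GiB materialized input with a 512 MiB target
parts = choose_num_partitions(10 * 1024 ** 3)
```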