Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shuffles): Determination logic for pre shuffle merge #3674

Merged
merged 4 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def set_execution_config(
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables the native executor, Defaults to False
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
shuffle_algorithm: The shuffle algorithm to use. Defaults to "map_reduce". Other options are "pre_shuffle_merge".
shuffle_algorithm: The shuffle algorithm to use. Defaults to "auto", which will let Daft determine the algorithm. Options are "map_reduce" and "pre_shuffle_merge".
pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
enable_ray_tracing: Enable tracing for Ray. Accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to False.
scantask_splitting_level: How aggressively to split scan tasks. Setting this to `2` will use a more aggressive ScanTask splitting algorithm which might be more expensive to run but results in more even splits of partitions. Defaults to 1.
Expand Down
2 changes: 1 addition & 1 deletion src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Default for DaftExecutionConfig {
enable_aqe: false,
enable_native_executor: false,
default_morsel_size: 128 * 1024,
shuffle_algorithm: "map_reduce".to_string(),
shuffle_algorithm: "auto".to_string(),
pre_shuffle_merge_threshold: 1024 * 1024 * 1024, // 1GB
enable_ray_tracing: false,
scantask_splitting_level: 1,
Expand Down
7 changes: 5 additions & 2 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@
config.default_morsel_size = default_morsel_size;
}
if let Some(shuffle_algorithm) = shuffle_algorithm {
if !matches!(shuffle_algorithm, "map_reduce" | "pre_shuffle_merge") {
if !matches!(
shuffle_algorithm,
"map_reduce" | "pre_shuffle_merge" | "auto"
) {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"shuffle_algorithm must be 'map_reduce' or 'pre_shuffle_merge'",
"shuffle_algorithm must be 'auto', 'map_reduce' or 'pre_shuffle_merge'",

Check warning on line 205 in src/common/daft-config/src/python.rs

View check run for this annotation

Codecov / codecov/patch

src/common/daft-config/src/python.rs#L205

Added line #L205 was not covered by tests
));
}
config.shuffle_algorithm = shuffle_algorithm.to_string();
Expand Down
109 changes: 75 additions & 34 deletions src/daft-physical-plan/src/ops/shuffle_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,21 @@
target_num_partitions,
));
}
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge { target_spec, .. } => {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec,
pre_shuffle_merge_threshold,
} => {

Check warning on line 85 in src/daft-physical-plan/src/ops/shuffle_exchange.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/shuffle_exchange.rs#L83-L85

Added lines #L83 - L85 were not covered by tests
res.push("Strategy: MapReduceWithPreShuffleMerge".to_string());
res.push(format!("Target Spec: {:?}", target_spec));
res.push(format!(
"Number of Partitions: {} → {}",
self.input.clustering_spec().num_partitions(),
target_spec.num_partitions(),
));
res.push(format!(
"Pre-Shuffle Merge Threshold: {}",
pre_shuffle_merge_threshold
));

Check warning on line 96 in src/daft-physical-plan/src/ops/shuffle_exchange.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/shuffle_exchange.rs#L93-L96

Added lines #L93 - L96 were not covered by tests
}
}
res
Expand All @@ -104,10 +111,74 @@
}

impl ShuffleExchangeFactory {
const PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE: usize = 200;

pub fn new(input: PhysicalPlanRef) -> Self {
Self { input }
}

fn should_use_pre_shuffle_merge(
&self,
input_num_partitions: usize,
target_num_partitions: usize,
) -> bool {
let total_num_partitions = input_num_partitions * target_num_partitions;
let geometric_mean = (total_num_partitions as f64).sqrt() as usize;
geometric_mean > Self::PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE
}

fn get_shuffle_strategy(
&self,
clustering_spec: Arc<ClusteringSpec>,
cfg: Option<&DaftExecutionConfig>,
) -> ShuffleExchangeStrategy {
match cfg {
Some(cfg) if cfg.shuffle_algorithm == "pre_shuffle_merge" => {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: cfg.pre_shuffle_merge_threshold,
}
}
Some(cfg) if cfg.shuffle_algorithm == "map_reduce" => {
ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce {
target_spec: clustering_spec,
}

Check warning on line 145 in src/daft-physical-plan/src/ops/shuffle_exchange.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/shuffle_exchange.rs#L143-L145

Added lines #L143 - L145 were not covered by tests
}
Some(cfg) if cfg.shuffle_algorithm == "auto" => {
if self.should_use_pre_shuffle_merge(
self.input.clustering_spec().num_partitions(),
clustering_spec.num_partitions(),
) {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: cfg.pre_shuffle_merge_threshold,
}
} else {
ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce {
target_spec: clustering_spec,
}
}
}
None => {
if self.should_use_pre_shuffle_merge(
self.input.clustering_spec().num_partitions(),
clustering_spec.num_partitions(),
) {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: DaftExecutionConfig::default()
.pre_shuffle_merge_threshold,
}

Check warning on line 171 in src/daft-physical-plan/src/ops/shuffle_exchange.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/shuffle_exchange.rs#L167-L171

Added lines #L167 - L171 were not covered by tests
} else {
ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce {
target_spec: clustering_spec,
}
}
}
_ => unreachable!(),

Check warning on line 178 in src/daft-physical-plan/src/ops/shuffle_exchange.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/shuffle_exchange.rs#L178

Added line #L178 was not covered by tests
}
}

pub fn get_hash_partitioning(
&self,
by: Vec<ExprRef>,
Expand All @@ -119,17 +190,7 @@
by,
)));

let strategy = match cfg {
Some(cfg) if cfg.shuffle_algorithm == "pre_shuffle_merge" => {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: cfg.pre_shuffle_merge_threshold,
}
}
_ => ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce {
target_spec: clustering_spec,
},
};
let strategy = self.get_shuffle_strategy(clustering_spec, cfg);

ShuffleExchange {
input: self.input.clone(),
Expand All @@ -150,17 +211,7 @@
descending,
)));

let strategy = match cfg {
Some(cfg) if cfg.shuffle_algorithm == "pre_shuffle_merge" => {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: cfg.pre_shuffle_merge_threshold,
}
}
_ => ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce {
target_spec: clustering_spec,
},
};
let strategy = self.get_shuffle_strategy(clustering_spec, cfg);

Check warning on line 214 in src/daft-physical-plan/src/ops/shuffle_exchange.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/shuffle_exchange.rs#L214

Added line #L214 was not covered by tests

ShuffleExchange {
input: self.input.clone(),
Expand All @@ -177,17 +228,7 @@
num_partitions,
)));

let strategy = match cfg {
Some(cfg) if cfg.shuffle_algorithm == "pre_shuffle_merge" => {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: cfg.pre_shuffle_merge_threshold,
}
}
_ => ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce {
target_spec: clustering_spec,
},
};
let strategy = self.get_shuffle_strategy(clustering_spec, cfg);

ShuffleExchange {
input: self.input.clone(),
Expand Down
Loading