Skip to content

Commit

Permalink
Add support for Project and Coalesce.
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkzinzow committed Aug 11, 2023
1 parent 5785434 commit 6754d67
Show file tree
Hide file tree
Showing 31 changed files with 271 additions and 76 deletions.
11 changes: 11 additions & 0 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ def tabular_scan(
)


def project(
input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr]
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])
return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=execution_step.Project(expr_projection),
resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest.
)


def sort(
input: physical_plan.InProgressPhysicalPlan[PartitionT],
sort_by: list[PyExpr],
Expand Down
23 changes: 21 additions & 2 deletions daft/logical/rust_logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,23 @@ def project(
projection: ExpressionsProjection,
custom_resource_request: ResourceRequest = ResourceRequest(),
) -> RustLogicalPlanBuilder:
raise NotImplementedError("not implemented")
if custom_resource_request != ResourceRequest():
raise NotImplementedError("ResourceRequests not supported for new query planner")
schema = projection.resolve_schema(self.schema())
exprs = [expr._expr for expr in projection]
builder = self._builder.project(exprs, schema._schema)
return RustLogicalPlanBuilder(builder)

def filter(self, predicate: Expression) -> RustLogicalPlanBuilder:
# TODO(Clark): Move this logic to Rust side after we've ported ExpressionsProjection.
predicate_expr_proj = ExpressionsProjection([predicate])
predicate_schema = predicate_expr_proj.resolve_schema(self.schema())
for resolved_field, predicate_expr in zip(predicate_schema, predicate_expr_proj):
resolved_type = resolved_field.dtype
if resolved_type != DataType.bool():
raise ValueError(
f"Expected expression {predicate_expr} to resolve to type Boolean, but received: {resolved_type}"
)
builder = self._builder.filter(predicate._expr)
return RustLogicalPlanBuilder(builder)

Expand Down Expand Up @@ -137,7 +151,12 @@ def repartition(
return RustLogicalPlanBuilder(builder)

def coalesce(self, num_partitions: int) -> RustLogicalPlanBuilder:
raise NotImplementedError("not implemented")
if num_partitions > self.num_partitions():
raise ValueError(
f"Coalesce can only reduce the number of partitions: {num_partitions} vs {self.num_partitions}"
)
builder = self._builder.coalesce(num_partitions)
return RustLogicalPlanBuilder(builder)

def agg(
self,
Expand Down
26 changes: 26 additions & 0 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ impl LogicalPlanBuilder {
Ok(logical_plan_builder)
}

pub fn project(
&self,
projection: Vec<PyExpr>,
projected_schema: &PySchema,
) -> PyResult<LogicalPlanBuilder> {
let projection_exprs = projection
.iter()
.map(|e| e.clone().into())
.collect::<Vec<Expr>>();
let logical_plan: LogicalPlan = ops::Project::new(
projection_exprs,
projected_schema.clone().into(),
self.plan.clone(),
)
.into();
let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
Ok(logical_plan_builder)
}

pub fn filter(&self, predicate: &PyExpr) -> PyResult<LogicalPlanBuilder> {
let logical_plan: LogicalPlan =
ops::Filter::new(predicate.expr.clone(), self.plan.clone()).into();
Expand Down Expand Up @@ -125,6 +144,13 @@ impl LogicalPlanBuilder {
Ok(logical_plan_builder)
}

pub fn coalesce(&self, num_partitions: usize) -> PyResult<LogicalPlanBuilder> {
let logical_plan: LogicalPlan =
ops::Coalesce::new(num_partitions, self.plan.clone()).into();
let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
Ok(logical_plan_builder)
}

pub fn distinct(&self) -> PyResult<LogicalPlanBuilder> {
let logical_plan: LogicalPlan = ops::Distinct::new(self.plan.clone()).into();
let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
Expand Down
16 changes: 16 additions & 0 deletions src/daft-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use crate::{ops::*, PartitionScheme, PartitionSpec};
#[derive(Clone, Debug)]
pub enum LogicalPlan {
Source(Source),
Project(Project),
Filter(Filter),
Limit(Limit),
Sort(Sort),
Repartition(Repartition),
Coalesce(Coalesce),
Distinct(Distinct),
Aggregate(Aggregate),
Concat(Concat),
Expand All @@ -21,10 +23,14 @@ impl LogicalPlan {
pub fn schema(&self) -> SchemaRef {
match self {
Self::Source(Source { schema, .. }) => schema.clone(),
Self::Project(Project {
projected_schema, ..
}) => projected_schema.clone(),
Self::Filter(Filter { input, .. }) => input.schema(),
Self::Limit(Limit { input, .. }) => input.schema(),
Self::Sort(Sort { input, .. }) => input.schema(),
Self::Repartition(Repartition { input, .. }) => input.schema(),
Self::Coalesce(Coalesce { input, .. }) => input.schema(),
Self::Distinct(Distinct { input, .. }) => input.schema(),
Self::Aggregate(aggregate) => aggregate.schema(),
Self::Concat(Concat { input, .. }) => input.schema(),
Expand All @@ -35,6 +41,7 @@ impl LogicalPlan {
pub fn partition_spec(&self) -> Arc<PartitionSpec> {
match self {
Self::Source(Source { partition_spec, .. }) => partition_spec.clone(),
Self::Project(Project { input, .. }) => input.partition_spec(),
Self::Filter(Filter { input, .. }) => input.partition_spec(),
Self::Limit(Limit { input, .. }) => input.partition_spec(),
Self::Sort(Sort { input, sort_by, .. }) => PartitionSpec::new_internal(
Expand All @@ -54,6 +61,9 @@ impl LogicalPlan {
Some(partition_by.clone()),
)
.into(),
Self::Coalesce(Coalesce { num_to, .. }) => {
PartitionSpec::new_internal(PartitionScheme::Unknown, *num_to, None).into()
}
Self::Distinct(Distinct { input, .. }) => input.partition_spec(),
Self::Aggregate(Aggregate { input, .. }) => input.partition_spec(), // TODO
Self::Concat(Concat { input, other }) => PartitionSpec::new_internal(
Expand All @@ -69,10 +79,12 @@ impl LogicalPlan {
pub fn children(&self) -> Vec<&Self> {
match self {
Self::Source(..) => vec![],
Self::Project(Project { input, .. }) => vec![input],
Self::Filter(Filter { input, .. }) => vec![input],
Self::Limit(Limit { input, .. }) => vec![input],
Self::Sort(Sort { input, .. }) => vec![input],
Self::Repartition(Repartition { input, .. }) => vec![input],
Self::Coalesce(Coalesce { input, .. }) => vec![input],
Self::Distinct(Distinct { input, .. }) => vec![input],
Self::Aggregate(Aggregate { input, .. }) => vec![input],
Self::Concat(Concat { input, other }) => vec![input, other],
Expand All @@ -83,10 +95,12 @@ impl LogicalPlan {
pub fn multiline_display(&self) -> Vec<String> {
match self {
Self::Source(source) => source.multiline_display(),
Self::Project(Project { projection, .. }) => vec![format!("Project: {projection:?}")],
Self::Filter(Filter { predicate, .. }) => vec![format!("Filter: {predicate}")],
Self::Limit(Limit { limit, .. }) => vec![format!("Limit: {limit}")],
Self::Sort(sort) => sort.multiline_display(),
Self::Repartition(repartition) => repartition.multiline_display(),
Self::Coalesce(Coalesce { num_to, .. }) => vec![format!("Coalesce: {num_to}")],
Self::Distinct(_) => vec!["Distinct".to_string()],
Self::Aggregate(aggregate) => aggregate.multiline_display(),
Self::Concat(_) => vec!["Concat".to_string()],
Expand All @@ -112,10 +126,12 @@ macro_rules! impl_from_data_struct_for_logical_plan {
}

impl_from_data_struct_for_logical_plan!(Source);
impl_from_data_struct_for_logical_plan!(Project);
impl_from_data_struct_for_logical_plan!(Filter);
impl_from_data_struct_for_logical_plan!(Limit);
impl_from_data_struct_for_logical_plan!(Sort);
impl_from_data_struct_for_logical_plan!(Repartition);
impl_from_data_struct_for_logical_plan!(Coalesce);
impl_from_data_struct_for_logical_plan!(Distinct);
impl_from_data_struct_for_logical_plan!(Aggregate);
impl_from_data_struct_for_logical_plan!(Concat);
Expand Down
17 changes: 17 additions & 0 deletions src/daft-plan/src/ops/coalesce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::sync::Arc;

use crate::LogicalPlan;

#[derive(Clone, Debug)]
pub struct Coalesce {
// Number of partitions to coalesce to.
pub num_to: usize,
// Upstream node.
pub input: Arc<LogicalPlan>,
}

impl Coalesce {
pub(crate) fn new(num_to: usize, input: Arc<LogicalPlan>) -> Self {
Self { num_to, input }
}
}
4 changes: 4 additions & 0 deletions src/daft-plan/src/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
mod agg;
mod coalesce;
mod concat;
mod distinct;
mod filter;
mod limit;
mod project;
mod repartition;
mod sink;
mod sort;
mod source;

pub use agg::Aggregate;
pub use coalesce::Coalesce;
pub use concat::Concat;
pub use distinct::Distinct;
pub use filter::Filter;
pub use limit::Limit;
pub use project::Project;
pub use repartition::Repartition;
pub use sink::Sink;
pub use sort::Sort;
Expand Down
28 changes: 28 additions & 0 deletions src/daft-plan/src/ops/project.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::sync::Arc;

use daft_core::schema::SchemaRef;
use daft_dsl::Expr;

use crate::LogicalPlan;

#[derive(Clone, Debug)]
pub struct Project {
pub projection: Vec<Expr>,
pub projected_schema: SchemaRef,
// Upstream node.
pub input: Arc<LogicalPlan>,
}

impl Project {
pub(crate) fn new(
projection: Vec<Expr>,
projected_schema: SchemaRef,
input: Arc<LogicalPlan>,
) -> Self {
Self {
projection,
projected_schema,
input,
}
}
}
15 changes: 15 additions & 0 deletions src/daft-plan/src/physical_ops/flatten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::sync::Arc;

use crate::physical_plan::PhysicalPlan;

#[derive(Clone, Debug)]
pub struct Flatten {
// Upstream node.
pub input: Arc<PhysicalPlan>,
}

impl Flatten {
pub(crate) fn new(input: Arc<PhysicalPlan>) -> Self {
Self { input }
}
}
4 changes: 4 additions & 0 deletions src/daft-plan/src/physical_ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ mod concat;
mod csv;
mod fanout;
mod filter;
mod flatten;
#[cfg(feature = "python")]
mod in_memory;
mod json;
mod limit;
mod parquet;
mod project;
mod reduce;
mod sort;
mod split;
Expand All @@ -19,11 +21,13 @@ pub use concat::Concat;
pub use csv::{TabularScanCsv, TabularWriteCsv};
pub use fanout::{FanoutByHash, FanoutByRange, FanoutRandom};
pub use filter::Filter;
pub use flatten::Flatten;
#[cfg(feature = "python")]
pub use in_memory::InMemoryScan;
pub use json::{TabularScanJson, TabularWriteJson};
pub use limit::Limit;
pub use parquet::{TabularScanParquet, TabularWriteParquet};
pub use project::Project;
pub use reduce::ReduceMerge;
pub use sort::Sort;
pub use split::Split;
18 changes: 18 additions & 0 deletions src/daft-plan/src/physical_ops/project.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::sync::Arc;

use daft_dsl::Expr;

use crate::physical_plan::PhysicalPlan;

#[derive(Clone, Debug)]
pub struct Project {
pub projection: Vec<Expr>,
// Upstream node.
pub input: Arc<PhysicalPlan>,
}

impl Project {
pub(crate) fn new(projection: Vec<Expr>, input: Arc<PhysicalPlan>) -> Self {
Self { projection, input }
}
}
22 changes: 22 additions & 0 deletions src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ pub enum PhysicalPlan {
TabularScanParquet(TabularScanParquet),
TabularScanCsv(TabularScanCsv),
TabularScanJson(TabularScanJson),
Project(Project),
Filter(Filter),
Limit(Limit),
Sort(Sort),
Split(Split),
Flatten(Flatten),
FanoutRandom(FanoutRandom),
FanoutByHash(FanoutByHash),
#[allow(dead_code)]
Expand Down Expand Up @@ -167,6 +169,18 @@ impl PhysicalPlan {
limit,
..
}) => tabular_scan(py, schema, file_info, file_format_config, limit),
PhysicalPlan::Project(Project { input, projection }) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let projection_pyexprs: Vec<PyExpr> = projection
.iter()
.map(|expr| PyExpr::from(expr.clone()))
.collect();
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
.getattr(pyo3::intern!(py, "project"))?
.call1((upstream_iter, projection_pyexprs))?;
Ok(py_iter.into())
}
PhysicalPlan::Filter(Filter { input, predicate }) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let expressions_mod =
Expand Down Expand Up @@ -242,6 +256,14 @@ impl PhysicalPlan {
.call1((upstream_iter, *input_num_partitions, *output_num_partitions))?;
Ok(py_iter.into())
}
PhysicalPlan::Flatten(Flatten { input }) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "flatten_plan"))?
.call1((upstream_iter,))?;
Ok(py_iter.into())
}
PhysicalPlan::FanoutRandom(FanoutRandom {
input,
num_partitions,
Expand Down
Loading

0 comments on commit 6754d67

Please sign in to comment.