Skip to content

Commit

Permalink
Add support for ResourceRequest.
Browse files (browse the repository at this point in the history)
  • Loading branch information
clarkzinzow committed Aug 14, 2023
1 parent 472810d commit 6d9b611
Show file tree
Hide file tree
Showing 24 changed files with 200 additions and 138 deletions.
3 changes: 2 additions & 1 deletion daft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class daft:
from_pylist,
from_ray_dataset,
)
from daft.daft import ImageFormat
from daft.daft import ImageFormat, ResourceRequest
from daft.dataframe import DataFrame
from daft.datatype import DataType, ImageMode, TimeUnit
from daft.expressions import col, lit
Expand Down Expand Up @@ -106,4 +106,5 @@ class daft:
"TimeUnit",
"register_viz_hook",
"udf",
"ResourceRequest",
]
9 changes: 7 additions & 2 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
from daft.convert import InputListType
from daft.daft import FileFormat, JoinType, PartitionScheme, PartitionSpec
from daft.daft import (
FileFormat,
JoinType,
PartitionScheme,
PartitionSpec,
ResourceRequest,
)
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from daft.expressions import Expression, ExpressionsProjection, col, lit
from daft.logical.builder import LogicalPlanBuilder
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry, PartitionSet
from daft.runners.pyrunner import LocalPartitionSet
from daft.table import Table
Expand Down
2 changes: 1 addition & 1 deletion daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
JoinType,
JsonSourceConfig,
ParquetSourceConfig,
ResourceRequest,
)
from daft.expressions import Expression, ExpressionsProjection, col
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import (
PartialPartitionMetadata,
PartitionMetadata,
Expand Down
3 changes: 1 addition & 2 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from loguru import logger

from daft.daft import FileFormat, FileFormatConfig, JoinType
from daft.daft import FileFormat, FileFormatConfig, JoinType, ResourceRequest
from daft.execution import execution_step
from daft.execution.execution_step import (
Instruction,
Expand All @@ -35,7 +35,6 @@
)
from daft.expressions import ExpressionsProjection
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartialPartitionMetadata

PartitionT = TypeVar("PartitionT")
Expand Down
21 changes: 14 additions & 7 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
from typing import Iterator, TypeVar, cast

from daft.context import get_context
from daft.daft import FileFormat, FileFormatConfig, JoinType, PyExpr, PySchema, PyTable
from daft.daft import (
FileFormat,
FileFormatConfig,
JoinType,
PyExpr,
PySchema,
PyTable,
ResourceRequest,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.table import Table

PartitionT = TypeVar("PartitionT")
Expand All @@ -27,7 +34,7 @@ def local_aggregate(
return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=aggregation_step,
resource_request=ResourceRequest(), # TODO use real resource request
resource_request=ResourceRequest(),
)


Expand All @@ -52,13 +59,13 @@ def tabular_scan(


def project(
input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr]
input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr], resource_request: ResourceRequest
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])
return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=execution_step.Project(expr_projection),
resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest.
resource_request=resource_request,
)


Expand All @@ -83,7 +90,7 @@ def explode(
return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=execution_step.MapPartition(explode_op),
resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest.
resource_request=ResourceRequest(),
)


Expand Down Expand Up @@ -115,7 +122,7 @@ def split_by_hash(
return physical_plan.pipeline_instruction(
input,
fanout_instruction,
ResourceRequest(), # TODO(Clark): Propagate resource request.
ResourceRequest(),
)


Expand Down
8 changes: 1 addition & 7 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
JoinType,
PartitionScheme,
PartitionSpec,
ResourceRequest,
)
from daft.expressions.expressions import Expression, ExpressionsProjection
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry

if TYPE_CHECKING:
Expand Down Expand Up @@ -54,12 +54,6 @@ def num_partitions(self) -> int:
"""
return self.partition_spec().num_partitions

@abstractmethod
def resource_request(self) -> ResourceRequest:
"""
Returns a custom ResourceRequest if one has been attached to this logical plan.
"""

@abstractmethod
def pretty_print(self) -> str:
"""
Expand Down
5 changes: 1 addition & 4 deletions daft/logical/logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
JoinType,
PartitionScheme,
PartitionSpec,
ResourceRequest,
)
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
Expand All @@ -26,7 +27,6 @@
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.map_partition_ops import ExplodeOp, MapPartitionOp
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry
from daft.table import Table

Expand Down Expand Up @@ -58,9 +58,6 @@ def schema(self) -> Schema:
def partition_spec(self) -> PartitionSpec:
return self._plan.partition_spec()

def resource_request(self) -> ResourceRequest:
return self._plan.resource_request()

def pretty_print(self) -> str:
return self._plan.pretty_print()

Expand Down
5 changes: 2 additions & 3 deletions daft/logical/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

from loguru import logger

from daft import resource_request
from daft.daft import PartitionScheme
from daft.daft import PartitionScheme, ResourceRequest
from daft.expressions import ExpressionsProjection, col
from daft.internal.rule import Rule
from daft.logical.logical_plan import (
Expand Down Expand Up @@ -404,7 +403,7 @@ def _drop_double_projection(self, parent: Projection, child: Projection) -> Logi
return Projection(
grandchild,
ExpressionsProjection(new_exprs),
custom_resource_request=resource_request.ResourceRequest.max_resources(
custom_resource_request=ResourceRequest.max_resources(
[parent.resource_request(), child.resource_request()]
),
)
Expand Down
11 changes: 2 additions & 9 deletions daft/logical/rust_logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
from daft.context import get_context
from daft.daft import FileFormat, FileFormatConfig, JoinType
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
from daft.daft import PartitionScheme, PartitionSpec
from daft.daft import PartitionScheme, PartitionSpec, ResourceRequest
from daft.errors import ExpressionTypeError
from daft.expressions.expressions import Expression, ExpressionsProjection
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry

if TYPE_CHECKING:
Expand All @@ -40,10 +39,6 @@ def partition_spec(self) -> PartitionSpec:
# TODO(Clark): Push PartitionSpec into planner.
return self._builder.partition_spec()

def resource_request(self) -> ResourceRequest:
# TODO(Clark): Expose resource request via builder, or push it into the planner.
return ResourceRequest()

def pretty_print(self) -> str:
return repr(self)

Expand Down Expand Up @@ -95,11 +90,9 @@ def project(
projection: ExpressionsProjection,
custom_resource_request: ResourceRequest = ResourceRequest(),
) -> RustLogicalPlanBuilder:
if custom_resource_request != ResourceRequest():
raise NotImplementedError("ResourceRequests not supported for new query planner")
schema = projection.resolve_schema(self.schema())
exprs = [expr._expr for expr in projection]
builder = self._builder.project(exprs, schema._schema)
builder = self._builder.project(exprs, schema._schema, custom_resource_request)
return RustLogicalPlanBuilder(builder)

def filter(self, predicate: Expression) -> RustLogicalPlanBuilder:
Expand Down
52 changes: 0 additions & 52 deletions daft/resource_request.py

This file was deleted.

3 changes: 1 addition & 2 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
import pyarrow as pa
from loguru import logger

from daft.daft import FileFormatConfig
from daft.daft import FileFormatConfig, ResourceRequest
from daft.execution import physical_plan
from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask
from daft.filesystem import get_filesystem_from_path, glob_path_with_stats
from daft.internal.gpu import cuda_device_count
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners import runner_io
from daft.runners.partitioning import (
PartID,
Expand Down
3 changes: 1 addition & 2 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
raise

from daft.daft import FileFormatConfig
from daft.daft import FileFormatConfig, ResourceRequest
from daft.datatype import DataType
from daft.execution.execution_step import (
FanoutInstruction,
Expand All @@ -35,7 +35,6 @@
SingleOutputPartitionTask,
)
from daft.filesystem import get_filesystem_from_path, glob_path_with_stats
from daft.resource_request import ResourceRequest
from daft.runners import runner_io
from daft.runners.partitioning import (
PartID,
Expand Down
2 changes: 1 addition & 1 deletion docs/source/learn/user_guides/udf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Custom resources can be requested when you call :meth:`df.with_column() <daft.Da

.. code:: python
from daft.resource_request import ResourceRequest
from daft import ResourceRequest
# Runs the UDF `func` with the specified resource requests
df = df.with_column(
Expand Down
4 changes: 3 additions & 1 deletion src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use crate::logical_plan::LogicalPlan;
use crate::{logical_plan::LogicalPlan, ResourceRequest};

#[cfg(feature = "python")]
use {
Expand Down Expand Up @@ -82,6 +82,7 @@ impl LogicalPlanBuilder {
&self,
projection: Vec<PyExpr>,
projected_schema: &PySchema,
resource_request: ResourceRequest,
) -> PyResult<LogicalPlanBuilder> {
let projection_exprs = projection
.iter()
Expand All @@ -90,6 +91,7 @@ impl LogicalPlanBuilder {
let logical_plan: LogicalPlan = ops::Project::new(
projection_exprs,
projected_schema.clone().into(),
resource_request,
self.plan.clone(),
)
.into();
Expand Down
3 changes: 3 additions & 0 deletions src/daft-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod partitioning;
mod physical_ops;
mod physical_plan;
mod planner;
mod resource_request;
mod sink_info;
mod source_info;

Expand All @@ -15,6 +16,7 @@ pub use join::JoinType;
pub use logical_plan::LogicalPlan;
pub use partitioning::{PartitionScheme, PartitionSpec};
pub use physical_plan::PhysicalPlanScheduler;
pub use resource_request::ResourceRequest;
pub use source_info::{
CsvSourceConfig, FileFormat, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig,
};
Expand All @@ -34,6 +36,7 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_class::<PartitionScheme>()?;
parent.add_class::<JoinType>()?;
parent.add_class::<PhysicalPlanScheduler>()?;
parent.add_class::<ResourceRequest>()?;

Ok(())
}
Loading

0 comments on commit 6d9b611

Please sign in to comment.