
[FEAT] Iceberg partitioned writes #2842

Merged: 11 commits, Sep 20, 2024
3 changes: 2 additions & 1 deletion daft/daft/__init__.pyi
@@ -15,6 +15,7 @@ from daft.udf import PartialStatefulUDF, PartialStatelessUDF

if TYPE_CHECKING:
    import pyarrow as pa
+   from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
    from pyiceberg.schema import Schema as IcebergSchema
    from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -1689,7 +1690,7 @@ class LogicalPlanBuilder:
self,
table_name: str,
table_location: str,
- spec_id: int,
+ partition_spec: IcebergPartitionSpec,
iceberg_schema: IcebergSchema,
iceberg_properties: IcebergTableProperties,
catalog_columns: list[str],
37 changes: 25 additions & 12 deletions daft/dataframe/dataframe.py
@@ -647,13 +647,13 @@
DataFrame: The operations that occurred with this write.
"""

- if len(table.spec().fields) > 0:
-     raise ValueError("Cannot write to partitioned Iceberg tables")

import pyarrow as pa
import pyiceberg
from packaging.version import parse

+ if len(table.spec().fields) > 0 and parse(pyiceberg.__version__) < parse("0.7.0"):
+     raise ValueError("pyiceberg>=0.7.0 is required to write to a partitioned table")

if parse(pyiceberg.__version__) < parse("0.6.0"):
    raise ValueError(f"Write Iceberg is only supported on pyiceberg>=0.6.0, found {pyiceberg.__version__}")

@@ -683,19 +683,28 @@
else:
    deleted_files = []

+ schema = table.schema()
+ partitioning: Dict[str, list] = {schema.find_field(field.source_id).name: [] for field in table.spec().fields}

for data_file in data_files:
    operations.append("ADD")
    path.append(data_file.file_path)
    rows.append(data_file.record_count)
    size.append(data_file.file_size_in_bytes)

+   for field in partitioning.keys():
+       partitioning[field].append(getattr(data_file.partition, field, None))

for pf in deleted_files:
    data_file = pf.file
    operations.append("DELETE")
    path.append(data_file.file_path)
    rows.append(data_file.record_count)
    size.append(data_file.file_size_in_bytes)

+   for field in partitioning.keys():
+       partitioning[field].append(getattr(data_file.partition, field, None))

if parse(pyiceberg.__version__) >= parse("0.7.0"):
    from pyiceberg.table import ALWAYS_TRUE, PropertyUtil, TableProperties

@@ -735,19 +744,23 @@

merge.commit()

+ with_operations = {
+     "operation": pa.array(operations, type=pa.string()),
+     "rows": pa.array(rows, type=pa.int64()),
+     "file_size": pa.array(size, type=pa.int64()),
+     "file_name": pa.array([fp for fp in path], type=pa.string()),
+ }
+
+ if partitioning:
+     with_operations["partitioning"] = pa.StructArray.from_arrays(
+         partitioning.values(), names=partitioning.keys()
+     )

from daft import from_pydict

- with_operations = from_pydict(
-     {
-         "operation": pa.array(operations, type=pa.string()),
-         "rows": pa.array(rows, type=pa.int64()),
-         "file_size": pa.array(size, type=pa.int64()),
-         "file_name": pa.array([os.path.basename(fp) for fp in path], type=pa.string()),
-     }
- )
# NOTE: We lose the plan history here: the logical plan of write_iceberg
# returns data files, but we want to return the summary constructed above.
- return with_operations
+ return from_pydict(with_operations)

@DataframePublicAPI
def write_deltalake(
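For orientation, a minimal end-to-end sketch of the new behavior (hypothetical catalog and table names; per the check above, writing to a partitioned table requires pyiceberg>=0.7.0):

import datetime

import daft
from pyiceberg.catalog import load_catalog

# Hypothetical catalog configuration and table identifier.
catalog = load_catalog("default")
table = catalog.load_table("db.events")  # may now have a non-empty partition spec

df = daft.from_pydict(
    {
        "id": [1, 2],
        "ts": [datetime.datetime(2024, 9, 1), datetime.datetime(2024, 9, 2)],
    }
)
result = df.write_iceberg(table)
# result has columns operation, rows, file_size, file_name, plus a
# "partitioning" struct column (one field per partition column) when the
# table is partitioned.
result.show()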
5 changes: 3 additions & 2 deletions daft/execution/execution_step.py
@@ -20,6 +20,7 @@
from daft.table import MicroPartition, table_io

if TYPE_CHECKING:
+   from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
    from pyiceberg.schema import Schema as IcebergSchema
    from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -387,7 +388,7 @@
base_path: str
iceberg_schema: IcebergSchema
iceberg_properties: IcebergTableProperties
- spec_id: int
+ partition_spec: IcebergPartitionSpec
io_config: IOConfig | None

def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
@@ -415,7 +416,7 @@
base_path=self.base_path,
schema=self.iceberg_schema,
properties=self.iceberg_properties,
- spec_id=self.spec_id,
+ partition_spec=self.partition_spec,
io_config=self.io_config,
)

5 changes: 3 additions & 2 deletions daft/execution/physical_plan.py
@@ -53,6 +53,7 @@
T = TypeVar("T")

if TYPE_CHECKING:
+   from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
    from pyiceberg.schema import Schema as IcebergSchema
    from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -117,7 +118,7 @@
base_path: str,
iceberg_schema: IcebergSchema,
iceberg_properties: IcebergTableProperties,
- spec_id: int,
+ partition_spec: IcebergPartitionSpec,
io_config: IOConfig | None,
) -> InProgressPhysicalPlan[PartitionT]:
"""Write the results of `child_plan` into pyiceberg data files described by `write_info`."""
@@ -128,7 +129,7 @@
base_path=base_path,
iceberg_schema=iceberg_schema,
iceberg_properties=iceberg_properties,
- spec_id=spec_id,
+ partition_spec=partition_spec,
io_config=io_config,
),
)
5 changes: 3 additions & 2 deletions daft/execution/rust_physical_plan_shim.py
@@ -20,6 +20,7 @@
from daft.table import MicroPartition

if TYPE_CHECKING:
+   from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
    from pyiceberg.schema import Schema as IcebergSchema
    from pyiceberg.table import TableProperties as IcebergTableProperties

@@ -343,15 +344,15 @@
base_path: str,
iceberg_schema: IcebergSchema,
iceberg_properties: IcebergTableProperties,
- spec_id: int,
+ partition_spec: IcebergPartitionSpec,
io_config: IOConfig | None,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
return physical_plan.iceberg_write(
input,
base_path=base_path,
iceberg_schema=iceberg_schema,
iceberg_properties=iceberg_properties,
- spec_id=spec_id,
+ partition_spec=partition_spec,
io_config=io_config,
)

2 changes: 1 addition & 1 deletion daft/expressions/expressions.py
@@ -3306,7 +3306,7 @@ def days(self) -> Expression:
"""Partitioning Transform that returns the number of days since epoch (1970-01-01)

Returns:
-     Expression: Date Type Expression
+     Expression: Int32 Expression in days
"""
return Expression._from_pyexpr(self._expr.partitioning_days())

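A short sketch of the corrected return type (public Daft API; day ordinals are relative to the 1970-01-01 epoch):

import datetime

import daft
from daft import col

df = daft.from_pydict({"ts": [datetime.datetime(1970, 1, 2, 12, 0)]})
# Yields Int32 day ordinals (here: 1), not a Date column.
df.select(col("ts").partitioning.days()).show()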
15 changes: 11 additions & 4 deletions daft/iceberg/iceberg_scan.py
@@ -41,10 +41,8 @@
source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name)))
)
transform = pfield.transform
- iceberg_result_type = transform.result_type(source_field.field_type)
- arrow_result_type = schema_to_pyarrow(iceberg_result_type)
- daft_result_type = DataType.from_arrow_type(arrow_result_type)
- result_field = Field.create(name, daft_result_type)
+ source_type = DataType.from_arrow_type(schema_to_pyarrow(source_field.field_type))

from pyiceberg.transforms import (
    BucketTransform,
    DayTransform,
@@ -58,22 +56,31 @@
tfm = None
if isinstance(transform, IdentityTransform):
    tfm = PartitionTransform.identity()
+   result_type = source_type
elif isinstance(transform, YearTransform):
    tfm = PartitionTransform.year()
+   result_type = DataType.int32()
elif isinstance(transform, MonthTransform):
    tfm = PartitionTransform.month()
+   result_type = DataType.int32()
elif isinstance(transform, DayTransform):
    tfm = PartitionTransform.day()
+   result_type = DataType.int32()
elif isinstance(transform, HourTransform):
    tfm = PartitionTransform.hour()
+   result_type = DataType.int32()
elif isinstance(transform, BucketTransform):
    n = transform.num_buckets
    tfm = PartitionTransform.iceberg_bucket(n)
+   result_type = DataType.int32()
elif isinstance(transform, TruncateTransform):
    w = transform.width
    tfm = PartitionTransform.iceberg_truncate(w)
+   result_type = source_type
else:
    warnings.warn(f"{transform} not implemented, Please make an issue!")
+   result_type = source_type

+ result_field = Field.create(name, result_type)
return make_partition_field(result_field, daft_field, transform=tfm)
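The net effect of this change: temporal and bucket transforms now map to explicit Int32 partition fields instead of whatever `transform.result_type` reported, while identity and truncate keep the source column's type. A condensed statement of that rule (hypothetical helper, not part of the patch):

from daft.datatype import DataType

def partition_result_type(transform_name: str, source_type: DataType) -> DataType:
    # year/month/day/hour yield int32 ordinals relative to the epoch;
    # bucket yields an int32 bucket index; identity/truncate keep the source type.
    if transform_name in {"year", "month", "day", "hour", "bucket"}:
        return DataType.int32()
    return source_type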


Expand Down
122 changes: 122 additions & 0 deletions daft/iceberg/iceberg_write.py
@@ -0,0 +1,122 @@
import datetime
import uuid
import warnings
from typing import TYPE_CHECKING, Any

from daft import Expression, Series, col
from daft.table import MicroPartition

if TYPE_CHECKING:
    import pyarrow as pa
    from pyiceberg.partitioning import PartitionField as IcebergPartitionField
    from pyiceberg.schema import Schema as IcebergSchema



def add_missing_columns(table: MicroPartition, schema: "pa.Schema") -> MicroPartition:
    """Add null values for columns in the schema that are missing from the table."""

    import pyarrow as pa

    existing_columns = set(table.column_names())

    columns = {}
    for name in schema.names:
        if name in existing_columns:
            columns[name] = table.get_column(name)
        else:
            columns[name] = Series.from_arrow(pa.nulls(len(table), type=schema.field(name).type), name=name)

    return MicroPartition.from_pydict(columns)
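
A usage sketch (MicroPartition is Daft's internal table type; assumes daft and pyarrow are installed):

import pyarrow as pa
from daft.table import MicroPartition

mp = MicroPartition.from_pydict({"a": [1, 2]})
target = pa.schema([("a", pa.int64()), ("b", pa.string())])
# "b" is absent from the data, so it comes back as a null column of type string.
padded = add_missing_columns(mp, target)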


def coerce_pyarrow_table_to_schema(pa_table: "pa.Table", schema: "pa.Schema") -> "pa.Table":
    """Coerce a PyArrow table to the supplied schema.

    1. For each field in `pa_table`, cast it to the corresponding field in `schema` if one with a
       matching name exists
    2. Reorder the fields of the casted table to match the supplied schema, dropping any fields in
       `pa_table` that do not exist in the supplied schema
    3. If any fields in the supplied schema are not present, add a null array of the correct type

    This ensures that we populate field_id for Iceberg as well as fill in null values where needed.
    This might break for nested fields with large_strings; we should test that behavior.

    Args:
        pa_table (pa.Table): Table to coerce
        schema (pa.Schema): Iceberg schema to coerce to

    Returns:
        pa.Table: Table with schema == `schema`
    """
    import pyarrow as pa

    input_schema_names = set(schema.names)

    # Perform casting of types to provided schema's types
    cast_to_schema = [
        (schema.field(inferred_field.name) if inferred_field.name in input_schema_names else inferred_field)
        for inferred_field in pa_table.schema
    ]
    casted_table = pa_table.cast(pa.schema(cast_to_schema))

    # Reorder and pad columns with a null column where necessary
    pa_table_column_names = set(casted_table.column_names)
    columns = []
    for name in schema.names:
        if name in pa_table_column_names:
            columns.append(casted_table[name])
        else:
            columns.append(pa.nulls(len(casted_table), type=schema.field(name).type))

    return pa.table(columns, schema=schema)
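
A usage sketch in plain PyArrow:

import pyarrow as pa

data = pa.table({"y": [1, 2], "x": ["a", "b"]})
target = pa.schema([("x", pa.large_string()), ("y", pa.int32()), ("z", pa.float64())])

# x is cast to large_string, y to int32, columns are reordered to (x, y, z),
# and the missing z is padded with nulls.
out = coerce_pyarrow_table_to_schema(data, target)
assert out.schema == target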


def partition_field_to_expr(field: "IcebergPartitionField", schema: "IcebergSchema") -> Expression:
    from pyiceberg.transforms import (
        BucketTransform,
        DayTransform,
        HourTransform,
        IdentityTransform,
        MonthTransform,
        TruncateTransform,
        YearTransform,
    )

    partition_col = col(schema.find_field(field.source_id).name)

    if isinstance(field.transform, IdentityTransform):
        return partition_col
    elif isinstance(field.transform, YearTransform):
        return partition_col.partitioning.years()
    elif isinstance(field.transform, MonthTransform):
        return partition_col.partitioning.months()
    elif isinstance(field.transform, DayTransform):
        return partition_col.partitioning.days()
    elif isinstance(field.transform, HourTransform):
        return partition_col.partitioning.hours()
    elif isinstance(field.transform, BucketTransform):
        return partition_col.partitioning.iceberg_bucket(field.transform.num_buckets)
    elif isinstance(field.transform, TruncateTransform):
        return partition_col.partitioning.iceberg_truncate(field.transform.width)
    else:
        warnings.warn(f"{field.transform} not implemented, Please make an issue!")
        return partition_col
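
A sketch of the mapping on a day-partitioned timestamp column (pyiceberg metadata objects constructed directly; the field IDs are arbitrary):

from pyiceberg.partitioning import PartitionField
from pyiceberg.schema import Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import NestedField, TimestampType

schema = Schema(NestedField(field_id=1, name="ts", field_type=TimestampType(), required=False))
field = PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="ts_day")

# Equivalent to col("ts").partitioning.days()
expr = partition_field_to_expr(field, schema)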


def to_partition_representation(value: Any):
    if value is None:
        return None

    if isinstance(value, datetime.datetime):
        # Convert to microseconds since epoch
        return (value - datetime.datetime(1970, 1, 1)) // datetime.timedelta(microseconds=1)
    elif isinstance(value, datetime.date):
        # Convert to days since epoch
        return (value - datetime.date(1970, 1, 1)) // datetime.timedelta(days=1)
    elif isinstance(value, datetime.time):
        # Convert to microseconds since midnight
        return (value.hour * 60 * 60 + value.minute * 60 + value.second) * 1_000_000 + value.microsecond
    elif isinstance(value, uuid.UUID):
        return str(value)
    else:
        return value
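
A few worked values for the conversion above (assuming the function is importable; the arithmetic follows the comments in the function):

import datetime
import uuid

assert to_partition_representation(datetime.date(1970, 1, 2)) == 1  # days since epoch
assert to_partition_representation(datetime.datetime(1970, 1, 1, 0, 0, 1)) == 1_000_000  # microseconds since epoch
assert to_partition_representation(datetime.time(0, 0, 1)) == 1_000_000  # microseconds since midnight
assert to_partition_representation(uuid.UUID(int=0)) == "00000000-0000-0000-0000-000000000000"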
4 changes: 2 additions & 2 deletions daft/logical/builder.py
@@ -288,12 +288,12 @@ def write_iceberg(self, table: IcebergTable) -> LogicalPlanBuilder:

name = ".".join(table.name())
location = f"{table.location()}/data"
- spec_id = table.spec().spec_id
+ partition_spec = table.spec()
schema = table.schema()
props = table.properties
columns = [col.name for col in schema.columns]
io_config = _convert_iceberg_file_io_properties_to_io_config(table.io.properties)
- builder = self._builder.iceberg_write(name, location, spec_id, schema, props, columns, io_config)
+ builder = self._builder.iceberg_write(name, location, partition_spec, schema, props, columns, io_config)
return LogicalPlanBuilder(builder)

def write_deltalake(