Skip to content

Commit

Permalink
fix iceberg day transform
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinzwang committed Sep 16, 2024
1 parent eaff9cc commit 1963141
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
2 changes: 1 addition & 1 deletion daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3306,7 +3306,7 @@ def days(self) -> Expression:
"""Partitioning Transform that returns the number of days since epoch (1970-01-01)
Returns:
Expression: Date Type Expression
Expression: Int32 Expression in days
"""
return Expression._from_pyexpr(self._expr.partitioning_days())

Expand Down
15 changes: 11 additions & 4 deletions daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ def _iceberg_partition_field_to_daft_partition_field(
source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name)))
)
transform = pfield.transform
iceberg_result_type = transform.result_type(source_field.field_type)
arrow_result_type = schema_to_pyarrow(iceberg_result_type)
daft_result_type = DataType.from_arrow_type(arrow_result_type)
result_field = Field.create(name, daft_result_type)
source_type = DataType.from_arrow_type(schema_to_pyarrow(source_field.field_type))

from pyiceberg.transforms import (
BucketTransform,
DayTransform,
Expand All @@ -58,22 +56,31 @@ def _iceberg_partition_field_to_daft_partition_field(
tfm = None
if isinstance(transform, IdentityTransform):
tfm = PartitionTransform.identity()
result_type = source_type
elif isinstance(transform, YearTransform):
tfm = PartitionTransform.year()
result_type = DataType.int32()
elif isinstance(transform, MonthTransform):
tfm = PartitionTransform.month()
result_type = DataType.int32()
elif isinstance(transform, DayTransform):
tfm = PartitionTransform.day()
result_type = DataType.int32()
elif isinstance(transform, HourTransform):
tfm = PartitionTransform.hour()
result_type = DataType.int32()
elif isinstance(transform, BucketTransform):
n = transform.num_buckets
tfm = PartitionTransform.iceberg_bucket(n)
result_type = DataType.int32()
elif isinstance(transform, TruncateTransform):
w = transform.width
tfm = PartitionTransform.iceberg_truncate(w)
result_type = source_type
else:
warnings.warn(f"{transform} not implemented, Please make an issue!")
result_type = source_type

Check warning on line 82 in daft/iceberg/iceberg_scan.py

View check run for this annotation

Codecov / codecov/patch

daft/iceberg/iceberg_scan.py#L82

Added line #L82 was not covered by tests
result_field = Field.create(name, result_type)
return make_partition_field(result_field, daft_field, transform=tfm)


Expand Down
2 changes: 1 addition & 1 deletion src/daft-core/src/series/ops/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Series {
((&years_since_1970 * &months_in_year)? + months_of_this_year)? - month_of_epoch
}
_ => Err(DaftError::ComputeError(format!(
"Can only run partitioning_years() operation on temporal types, got {}",
"Can only run partitioning_months() operation on temporal types, got {}",

Check warning on line 48 in src/daft-core/src/series/ops/partitioning.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-core/src/series/ops/partitioning.rs#L48

Added line #L48 was not covered by tests
self.data_type()
))),
}?;
Expand Down

0 comments on commit 1963141

Please sign in to comment.