diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 3dc46487ec..9b263021a5 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -3306,7 +3306,7 @@ def days(self) -> Expression: """Partitioning Transform that returns the number of days since epoch (1970-01-01) Returns: - Expression: Date Type Expression + Expression: Int32 Expression in days """ return Expression._from_pyexpr(self._expr.partitioning_days()) diff --git a/daft/iceberg/iceberg_scan.py b/daft/iceberg/iceberg_scan.py index 58241ca217..ee9b221699 100644 --- a/daft/iceberg/iceberg_scan.py +++ b/daft/iceberg/iceberg_scan.py @@ -41,10 +41,8 @@ def _iceberg_partition_field_to_daft_partition_field( source_name, DataType.from_arrow_type(schema_to_pyarrow(iceberg_schema.find_type(source_name))) ) transform = pfield.transform - iceberg_result_type = transform.result_type(source_field.field_type) - arrow_result_type = schema_to_pyarrow(iceberg_result_type) - daft_result_type = DataType.from_arrow_type(arrow_result_type) - result_field = Field.create(name, daft_result_type) + source_type = DataType.from_arrow_type(schema_to_pyarrow(source_field.field_type)) + from pyiceberg.transforms import ( BucketTransform, DayTransform, @@ -58,22 +56,31 @@ def _iceberg_partition_field_to_daft_partition_field( tfm = None if isinstance(transform, IdentityTransform): tfm = PartitionTransform.identity() + result_type = source_type elif isinstance(transform, YearTransform): tfm = PartitionTransform.year() + result_type = DataType.int32() elif isinstance(transform, MonthTransform): tfm = PartitionTransform.month() + result_type = DataType.int32() elif isinstance(transform, DayTransform): tfm = PartitionTransform.day() + result_type = DataType.int32() elif isinstance(transform, HourTransform): tfm = PartitionTransform.hour() + result_type = DataType.int32() elif isinstance(transform, BucketTransform): n = transform.num_buckets tfm = PartitionTransform.iceberg_bucket(n) + result_type = DataType.int32() elif isinstance(transform, TruncateTransform): w = transform.width tfm = PartitionTransform.iceberg_truncate(w) + result_type = source_type else: warnings.warn(f"{transform} not implemented, Please make an issue!") + result_type = source_type + result_field = Field.create(name, result_type) return make_partition_field(result_field, daft_field, transform=tfm) diff --git a/src/daft-core/src/series/ops/partitioning.rs b/src/daft-core/src/series/ops/partitioning.rs index 18f6fed67a..1acda1720e 100644 --- a/src/daft-core/src/series/ops/partitioning.rs +++ b/src/daft-core/src/series/ops/partitioning.rs @@ -45,7 +45,7 @@ impl Series { ((&years_since_1970 * &months_in_year)? + months_of_this_year)? - month_of_epoch } _ => Err(DaftError::ComputeError(format!( - "Can only run partitioning_years() operation on temporal types, got {}", + "Can only run partitioning_months() operation on temporal types, got {}", self.data_type() ))), }?;