[FEAT] Add Sparse Tensor logical type #2722

Merged
2 changes: 2 additions & 0 deletions daft/daft.pyi
@@ -998,6 +998,8 @@ class PyDataType:
@staticmethod
def tensor(dtype: PyDataType, shape: tuple[int, ...] | None = None) -> PyDataType: ...
@staticmethod
def coo_sparse_tensor(dtype: PyDataType, shape: tuple[int, ...] | None = None) -> PyDataType: ...
@staticmethod
def python() -> PyDataType: ...
def to_arrow(self, cast_tensor_type_for_ray: builtins.bool | None = None) -> pyarrow.DataType: ...
def is_numeric(self) -> builtins.bool: ...
24 changes: 24 additions & 0 deletions daft/datatype.py
@@ -346,6 +346,30 @@ def tensor(
raise ValueError("Tensor shape must be a non-empty tuple of ints, but got: ", shape)
return cls._from_pydatatype(PyDataType.tensor(dtype._dtype, shape))

@classmethod
def coo_sparse_tensor(
cls,
dtype: DataType,
shape: tuple[int, ...] | None = None,
) -> DataType:
"""Create a COO sparse tensor DataType: COO sparse tensor arrays contain COO representation of n-dimensional arrays of data of the provided ``dtype`` as elements, each of the provided
``shape``.

If a ``shape`` is given, each ndarray in the column will have this shape.

If ``shape`` is not given, the ndarrays in the column can have different shapes. This is much more flexible,
but will result in a less compact representation and may make some operations less efficient.

Args:
dtype: The type of the data contained within the tensor elements.
shape: The shape of each COO sparse tensor in the column. This is ``None`` by default, which allows the shapes of
each tensor element to vary.
"""
if shape is not None:
if not isinstance(shape, tuple) or not shape or any(not isinstance(n, int) for n in shape):
raise ValueError("Tensor shape must be a non-empty tuple of ints, but got: ", shape)
return cls._from_pydatatype(PyDataType.coo_sparse_tensor(dtype._dtype, shape))

@classmethod
def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
"""Maps a PyArrow DataType to a Daft DataType"""
4 changes: 3 additions & 1 deletion src/daft-core/src/array/growable/logical_growable.rs
@@ -5,7 +5,7 @@ use common_error::DaftResult;
use crate::{
datatypes::{
logical::LogicalArray, DaftDataType, DaftLogicalType, DateType, Decimal128Type,
DurationType, EmbeddingType, Field, FixedShapeImageType, FixedShapeTensorType, ImageType,
DurationType, EmbeddingType, Field, FixedShapeImageType, FixedShapeTensorType, COOSparseTensorType, FixedShapeCOOSparseTensorType, ImageType,
MapType, TensorType, TimeType, TimestampType,
},
DataType, IntoSeries, Series,
@@ -81,6 +81,8 @@ impl_logical_growable!(LogicalTimeGrowable, TimeType);
impl_logical_growable!(LogicalEmbeddingGrowable, EmbeddingType);
impl_logical_growable!(LogicalFixedShapeImageGrowable, FixedShapeImageType);
impl_logical_growable!(LogicalFixedShapeTensorGrowable, FixedShapeTensorType);
impl_logical_growable!(LogicalCOOSparseTensorGrowable, COOSparseTensorType);
impl_logical_growable!(LogicalFixedShapeCOOSparseTensorGrowable, FixedShapeCOOSparseTensorType);
impl_logical_growable!(LogicalImageGrowable, ImageType);
impl_logical_growable!(LogicalDecimal128Growable, Decimal128Type);
impl_logical_growable!(LogicalTensorGrowable, TensorType);
11 changes: 9 additions & 2 deletions src/daft-core/src/array/growable/mod.rs
@@ -4,8 +4,7 @@ use crate::{
array::{FixedSizeListArray, ListArray, StructArray},
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray,
COOSparseTensorArray, DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeCOOSparseTensorArray, FixedShapeImageArray, FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray
},
BinaryArray, BooleanArray, ExtensionArray, FixedSizeBinaryArray, Float32Array,
Float64Array, Int128Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray,
@@ -209,6 +208,14 @@ impl_growable_array!(
FixedShapeTensorArray,
logical_growable::LogicalFixedShapeTensorGrowable<'a>
);
impl_growable_array!(
COOSparseTensorArray,
logical_growable::LogicalCOOSparseTensorGrowable<'a>
);
impl_growable_array!(
FixedShapeCOOSparseTensorArray,
logical_growable::LogicalFixedShapeCOOSparseTensorGrowable<'a>
);
impl_growable_array!(ImageArray, logical_growable::LogicalImageGrowable<'a>);
impl_growable_array!(TensorArray, logical_growable::LogicalTensorGrowable<'a>);
impl_growable_array!(
168 changes: 164 additions & 4 deletions src/daft-core/src/array/ops/cast.rs
@@ -4,14 +4,15 @@ use super::as_arrow::AsArrow;
use crate::{
array::{
growable::make_growable,
ops::{from_arrow::FromArrow, full::FullNull, image::ImageArraySidecarData},
ops::{from_arrow::FromArrow, full::FullNull, image::ImageArraySidecarData, DaftCompare},
DataArray, FixedSizeListArray, ListArray, StructArray,
},
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, LogicalArray, LogicalArrayImpl, MapArray,
TensorArray, TimeArray, TimestampArray,
COOSparseTensorArray, DateArray, Decimal128Array, DurationArray, EmbeddingArray,
FixedShapeCOOSparseTensorArray, FixedShapeImageArray, FixedShapeTensorArray,
ImageArray, LogicalArray, LogicalArrayImpl, MapArray, TensorArray, TimeArray,
TimestampArray,
},
DaftArrayType, DaftArrowBackedType, DaftLogicalType, DataType, Field, ImageMode,
Int32Array, Int64Array, TimeUnit, UInt64Array, Utf8Array,
@@ -32,7 +33,9 @@ use arrow2::{
},
offset::Offsets,
};
use image::math;
use indexmap::IndexMap;
use num_traits::zero;

#[cfg(feature = "python")]
use {
@@ -1408,6 +1411,77 @@ impl TensorArray {
);
Ok(tensor_array.into_series())
}
DataType::COOSparseTensor(inner_dtype) => {
let shape_iterator = self.shape_array().into_iter();
let data_iterator = self.data_array().into_iter();
let enumerated_data = shape_iterator
.zip(data_iterator)
.map(|(shape, data)| (shape.unwrap(), data.unwrap()));
let zero_series = Int64Array::from((
Member: you should be able to do Int64Array::from(("item", [0].as_slice()))

"item",
Box::new(arrow2::array::Int64Array::from_iter([Some(0)].iter())),
))
.into_series();
let mut non_zero_values = Vec::new();
let mut non_zero_indices = Vec::new();
let mut offsets = Vec::<usize>::new();
for (shape_series, data_series) in enumerated_data {
let shape_array = shape_series.u64().unwrap();
assert!(
data_series.len()
== shape_array.into_iter().flatten().product::<u64>() as usize
);
let non_zero_mask = data_series.not_equal(&zero_series)?;
let data = data_series.filter(&non_zero_mask)?;
let indices = UInt64Array::arange("item", 0, data_series.len() as i64, 1)?
.into_series()
.filter(&non_zero_mask)?;
offsets.push(data.len());
non_zero_values.push(data);
non_zero_indices.push(indices);
}

let new_dtype = DataType::COOSparseTensor(Box::new(inner_dtype.as_ref().clone()));
let offsets: Offsets<i64> =
Offsets::try_from_iter(non_zero_values.iter().map(|s| s.len()))?;
let non_zero_values_series =
Series::concat(&non_zero_values.iter().collect::<Vec<&Series>>())?;
let non_zero_indices_series =
Series::concat(&non_zero_indices.iter().collect::<Vec<&Series>>())?;
let offsets_cloned = offsets.clone();
let data_list_arr = ListArray::new(
Field::new(
"values",
DataType::List(Box::new(non_zero_values_series.data_type().clone())),
),
non_zero_values_series,
offsets.into(),
None,
);
let indices_list_arr = ListArray::new(
Field::new(
"indices",
DataType::List(Box::new(non_zero_indices_series.data_type().clone())),
),
non_zero_indices_series,
offsets_cloned.into(),
None,
);
let sparse_struct_array = StructArray::new(
Field::new(self.name(), new_dtype.to_physical()),
vec![
data_list_arr.into_series(),
indices_list_arr.into_series(),
self.shape_array().clone().into_series(),
],
None,
);
Ok(COOSparseTensorArray::new(
Field::new(sparse_struct_array.name(), new_dtype.clone()),
sparse_struct_array,
)
.into_series())
}
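
To summarize the arm above: for every row, the non-zero entries of the flattened tensor are kept together with their flat positions, and the original shape list is carried over unchanged. A small NumPy sketch of that per-row transformation (not Daft code; it assumes the same flat, row-major index convention that the arange over data_series.len() implies):

import numpy as np

def dense_to_coo(dense: np.ndarray):
    """Keep only the non-zero entries of the flattened tensor plus their flat indices."""
    flat = dense.ravel()
    mask = flat != 0
    values = flat[mask]                               # non-zero data
    indices = np.flatnonzero(mask).astype(np.uint64)  # flat positions of the non-zeros
    return values, indices, dense.shape

values, indices, shape = dense_to_coo(np.array([[0.0, 3.0], [0.0, 5.0]]))
# values  -> [3., 5.]
# indices -> [1, 3]
# shape   -> (2, 2)
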
DataType::Image(mode) => {
let sa = self.shape_array();
if !(0..self.len()).map(|i| sa.get(i)).all(|s| {
@@ -1527,6 +1601,86 @@ impl TensorArray {
}
}

impl COOSparseTensorArray {
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
match dtype {
DataType::Tensor(inner_dtype) => {
let non_zero_values_array = self.values_array();
let non_zero_indices_array = self.indices_array();
let shape_array = self.shape_array();
let mut sizes_vec: Vec<usize> = vec![0; shape_array.len()];
for (i, shape) in shape_array.into_iter().enumerate() {
match shape {
Some(shape) => {
let shape = shape.u64().unwrap().as_arrow();
let num_elements =
shape.values().clone().into_iter().product::<u64>() as usize;
sizes_vec[i] = num_elements;
}
_ => {}
}
}
let offsets: Offsets<i64> = Offsets::try_from_iter(sizes_vec.iter().cloned())?;
let mut values = vec![0 as i64; sizes_vec.iter().sum::<usize>() as usize];
for (i, indices) in non_zero_indices_array.into_iter().enumerate() {
for j in 0..indices.unwrap().len() {
let index = non_zero_indices_array
.get(i)
.unwrap()
.u64()
.unwrap()
.as_arrow()
.value(j) as usize;
let list_start_offset = offsets.start_end(i).0;
values[list_start_offset + index] = non_zero_values_array
.get(i)
.unwrap()
.i64()
.unwrap()
.as_arrow()
.value(j);
}
Contributor Author: Here I assume the type of the values; I'm not sure how to both edit the physical memory and keep it dynamically typed.

Contributor: Not sure it is the most idiomatic way to do it, but you can create a macro that accepts the inner_dtype and the primitive type it maps to, initializes the vector with that primitive type, and downcasts the series to the matching array type, for example:

macro_rules! implement_cast_for_dense_with_inner_dtype {
    ($dtype:ty, $array_type:ty, $n_values:ident, $non_zero_indices_array:ident, $non_zero_values_array:ident, $offsets:ident) => {{
        let mut values = vec![0 as $dtype; $n_values];
        for (i, indices) in $non_zero_indices_array.into_iter().enumerate() {
            for j in 0..indices.unwrap().len() {
                let index = $non_zero_indices_array
                    .get(i)
                    .unwrap()
                    .u64()
                    .unwrap()
                    .as_arrow()
                    .value(j) as usize;
                let list_start_offset = $offsets.start_end(i).0;
                values[list_start_offset + index] = $non_zero_values_array
                    .get(i)
                    .unwrap()
                    .downcast::<$array_type>()
                    .unwrap()
                    .as_arrow()
                    .value(j);
            }
        }
        Box::new(arrow2::array::PrimitiveArray::from_vec(values))
    }};
}

impl COOSparseTensorArray {
    pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
        match dtype {
            DataType::Tensor(inner_dtype) => {
                let non_zero_values_array = self.values_array();
                let non_zero_indices_array = self.indices_array();
                let shape_array = self.shape_array();
                let mut sizes_vec: Vec<usize> = vec![0; shape_array.len()];
                for (i, shape) in shape_array.into_iter().enumerate() {
                    match shape {
                        Some(shape) => {
                            let shape = shape.u64().unwrap().as_arrow();
                            let num_elements =
                                shape.values().clone().into_iter().product::<u64>() as usize;
                            sizes_vec[i] = num_elements;
                        }
                        _ => {}
                    }
                }
                let offsets: Offsets<i64> = Offsets::try_from_iter(sizes_vec.iter().cloned())?;
                let n_values = sizes_vec.iter().sum::<usize>() as usize;
                let item: Box<dyn arrow2::array::Array> = match inner_dtype.as_ref() {
                    DataType::Float32 => implement_cast_for_dense_with_inner_dtype!(f32, Float32Array, n_values, non_zero_indices_array, non_zero_values_array, offsets),
                    DataType::Int64 => implement_cast_for_dense_with_inner_dtype!(i64, Int64Array, n_values, non_zero_indices_array, non_zero_values_array, offsets),
                    _ => panic!("Hi")
                };
                let list_arr = ListArray::new(
                    Field::new(
                        "data",
                        DataType::List(Box::new(inner_dtype.as_ref().clone())),
                    ),
                    Series::try_from((
                        "item",
                        item,
                    ))?,
                    offsets.into(),
                    None,
                ).into_series();
                let physical_type = dtype.to_physical();
                let struct_array = StructArray::new(
                    Field::new(self.name(), physical_type),
                    vec![list_arr, shape_array.clone().into_series()],
                    None
                );
                Ok(
                    TensorArray::new(Field::new(self.name(), dtype.clone()), struct_array)
                        .into_series(),
                )
            }
            (_) => self.physical.cast(dtype),
        }
    }
}

}
let list_arr = ListArray::new(
Field::new(
"data",
DataType::List(Box::new(inner_dtype.as_ref().clone())),
),
Series::try_from((
"item",
Box::new(arrow2::array::PrimitiveArray::from_vec(values))
as Box<dyn arrow2::array::Array>,
))?,
offsets.into(),
None,
).into_series();
let physical_type = dtype.to_physical();
let struct_array = StructArray::new(
Field::new(self.name(), physical_type),
vec![list_arr, shape_array.clone().into_series()],
None
);
Ok(
TensorArray::new(Field::new(self.name(), dtype.clone()), struct_array)
.into_series(),
)
}
(_) => self.physical.cast(dtype),
}
}
}
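
The Tensor arm above is the inverse scatter: a zero-filled buffer of prod(shape) elements is allocated per row and the stored values are written back at their stored flat indices. A NumPy sketch of the same idea (illustrative only, same flat index convention assumed):

import numpy as np

def coo_to_dense(values: np.ndarray, indices: np.ndarray, shape) -> np.ndarray:
    """Scatter the non-zero values into a zero-initialized buffer, then restore the shape."""
    dense = np.zeros(int(np.prod(shape)), dtype=values.dtype)
    dense[indices] = values
    return dense.reshape(shape)

coo_to_dense(np.array([3.0, 5.0]), np.array([1, 3]), (2, 2))
# -> [[0., 3.], [0., 5.]]
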

impl FixedShapeCOOSparseTensorArray {
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
match (dtype, self.data_type()) {
(DataType::FixedShapeTensor(_, _), DataType::FixedShapeCOOSparseTensor(_, _)) => {
todo!("We need to implement this cast")
}
(_, _) => self.physical.cast(dtype),
}
}
}

impl FixedShapeTensorArray {
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
match (dtype, self.data_type()) {
@@ -1603,6 +1757,12 @@ impl FixedShapeTensorArray {
.into_series(),
)
}
(
DataType::FixedShapeCOOSparseTensor(_, _),
DataType::FixedShapeTensor(inner_type, tensor_shape),
) => {
todo!("We need to implement this cast")
}
// NOTE(Clark): Casting to FixedShapeImage is supported by the physical array cast.
(_, _) => self.physical.cast(dtype),
}
36 changes: 36 additions & 0 deletions src/daft-core/src/array/ops/coo_sparse_tensor.rs
@@ -0,0 +1,36 @@
use crate::array::ListArray;
use crate::datatypes::logical::{COOSparseTensorArray, FixedShapeCOOSparseTensorArray};

impl COOSparseTensorArray {
pub fn values_array(&self) -> &ListArray {
const VALUES_IDX: usize = 0;
let array = self.physical.children.get(VALUES_IDX).unwrap();
array.list().unwrap()
}

pub fn indices_array(&self) -> &ListArray {
const INDICES_IDX: usize = 1;
let array = self.physical.children.get(INDICES_IDX).unwrap();
array.list().unwrap()
}

pub fn shape_array(&self) -> &ListArray {
const SHAPE_IDX: usize = 2;
let array = self.physical.children.get(SHAPE_IDX).unwrap();
array.list().unwrap()
}
}

impl FixedShapeCOOSparseTensorArray {
pub fn values_array(&self) -> &ListArray {
const VALUES_IDX: usize = 0;
let array = self.physical.children.get(VALUES_IDX).unwrap();
array.list().unwrap()
}

pub fn indices_array(&self) -> &ListArray {
const INDICES_IDX: usize = 1;
let array = self.physical.children.get(INDICES_IDX).unwrap();
array.list().unwrap()
}
}
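
These accessors reflect the physical layout used elsewhere in this PR: each COOSparseTensor row is a struct of three list children (values, indices, shape), while the fixed-shape variant drops the shape list. A rough PyArrow sketch of one such row, with field names and integer widths assumed from the cast code rather than guaranteed by this file:

import pyarrow as pa

row = {
    "values": [3.0, 5.0],  # non-zero entries
    "indices": [1, 3],     # flat positions of those entries
    "shape": [2, 2],       # logical dense shape
}
physical_type = pa.struct([
    pa.field("values", pa.list_(pa.float32())),
    pa.field("indices", pa.list_(pa.uint64())),
    pa.field("shape", pa.list_(pa.uint64())),
])
arr = pa.array([row], type=physical_type)
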
1 change: 1 addition & 0 deletions src/daft-core/src/array/ops/mod.rs
@@ -51,6 +51,7 @@ mod struct_;
mod sum;
mod take;
pub(crate) mod tensor;
pub(crate) mod coo_sparse_tensor;
mod time;
pub mod trigonometry;
mod truncate;
41 changes: 39 additions & 2 deletions src/daft-core/src/array/ops/repr.rs
@@ -4,8 +4,7 @@ use crate::{
array::{DataArray, FixedSizeListArray, ListArray, StructArray},
datatypes::{
logical::{
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray,
FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray,
COOSparseTensorArray, DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeCOOSparseTensorArray, FixedShapeImageArray, FixedShapeTensorArray, ImageArray, MapArray, TensorArray, TimeArray, TimestampArray
},
BinaryArray, BooleanArray, DaftNumericType, ExtensionArray, FixedSizeBinaryArray,
ImageFormat, NullArray, UInt64Array, Utf8Array,
@@ -290,6 +289,26 @@ impl FixedShapeTensorArray {
}
}

impl COOSparseTensorArray {
pub fn str_value(&self, idx: usize) -> DaftResult<String> {
if self.physical.is_valid(idx) {
Ok("<COOSparseTensor>".to_string())
} else {
Ok("None".to_string())
}
}
}

impl FixedShapeCOOSparseTensorArray {
pub fn str_value(&self, idx: usize) -> DaftResult<String> {
if self.physical.is_valid(idx) {
Ok("<FixedShapeCOOSparseTensor>".to_string())
} else {
Ok("None".to_string())
}
}
}

impl TensorArray {
pub fn str_value(&self, idx: usize) -> DaftResult<String> {
let shape_element = self.physical.children[1]
@@ -479,3 +498,21 @@ impl TensorArray {
.replace('\n', "<br />")
}
}

impl COOSparseTensorArray {
pub fn html_value(&self, idx: usize) -> String {
let str_value = self.str_value(idx).unwrap();
html_escape::encode_text(&str_value)
.into_owned()
.replace('\n', "<br />")
}
}

impl FixedShapeCOOSparseTensorArray {
pub fn html_value(&self, idx: usize) -> String {
let str_value = self.str_value(idx).unwrap();
html_escape::encode_text(&str_value)
.into_owned()
.replace('\n', "<br />")
}
}