-
Notifications
You must be signed in to change notification settings - Fork 186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT] Add Sparse Tensor logical type #2722
Changes from 1 commit
1b606e2
4c58ffb
8d356c6
abe8251
6fab44c
fff1c3f
105793a
75414ee
37a1a5e
4853057
b8220d8
59c9fd5
ae722ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,14 +4,15 @@ use super::as_arrow::AsArrow; | |
use crate::{ | ||
array::{ | ||
growable::make_growable, | ||
ops::{from_arrow::FromArrow, full::FullNull, image::ImageArraySidecarData}, | ||
ops::{from_arrow::FromArrow, full::FullNull, image::ImageArraySidecarData, DaftCompare}, | ||
DataArray, FixedSizeListArray, ListArray, StructArray, | ||
}, | ||
datatypes::{ | ||
logical::{ | ||
DateArray, Decimal128Array, DurationArray, EmbeddingArray, FixedShapeImageArray, | ||
FixedShapeTensorArray, ImageArray, LogicalArray, LogicalArrayImpl, MapArray, | ||
TensorArray, TimeArray, TimestampArray, | ||
COOSparseTensorArray, DateArray, Decimal128Array, DurationArray, EmbeddingArray, | ||
FixedShapeCOOSparseTensorArray, FixedShapeImageArray, FixedShapeTensorArray, | ||
ImageArray, LogicalArray, LogicalArrayImpl, MapArray, TensorArray, TimeArray, | ||
TimestampArray, | ||
}, | ||
DaftArrayType, DaftArrowBackedType, DaftLogicalType, DataType, Field, ImageMode, | ||
Int32Array, Int64Array, TimeUnit, UInt64Array, Utf8Array, | ||
|
@@ -32,7 +33,9 @@ use arrow2::{ | |
}, | ||
offset::Offsets, | ||
}; | ||
use image::math; | ||
use indexmap::IndexMap; | ||
use num_traits::zero; | ||
|
||
#[cfg(feature = "python")] | ||
use { | ||
|
@@ -1408,6 +1411,77 @@ impl TensorArray { | |
); | ||
Ok(tensor_array.into_series()) | ||
} | ||
DataType::COOSparseTensor(inner_dtype) => { | ||
let shape_iterator = self.shape_array().into_iter(); | ||
let data_iterator = self.data_array().into_iter(); | ||
let enumerated_data = shape_iterator | ||
.zip(data_iterator) | ||
.map(|(shape, data)| (shape.unwrap(), data.unwrap())); | ||
let zero_series = Int64Array::from(( | ||
"item", | ||
Box::new(arrow2::array::Int64Array::from_iter([Some(0)].iter())), | ||
)) | ||
.into_series(); | ||
let mut non_zero_values = Vec::new(); | ||
let mut non_zero_indices = Vec::new(); | ||
let mut offsets = Vec::<usize>::new(); | ||
for (shape_series, data_series) in enumerated_data { | ||
let shape_array = shape_series.u64().unwrap(); | ||
assert!( | ||
data_series.len() | ||
== shape_array.into_iter().flatten().product::<u64>() as usize | ||
); | ||
let non_zero_mask = data_series.not_equal(&zero_series)?; | ||
let data = data_series.filter(&non_zero_mask)?; | ||
let indices = UInt64Array::arange("item", 0, data_series.len() as i64, 1)? | ||
.into_series() | ||
.filter(&non_zero_mask)?; | ||
offsets.push(data.len()); | ||
non_zero_values.push(data); | ||
non_zero_indices.push(indices); | ||
} | ||
|
||
let new_dtype = DataType::COOSparseTensor(Box::new(inner_dtype.as_ref().clone())); | ||
let offsets: Offsets<i64> = | ||
Offsets::try_from_iter(non_zero_values.iter().map(|s| s.len()))?; | ||
let non_zero_values_series = | ||
Series::concat(&non_zero_values.iter().collect::<Vec<&Series>>())?; | ||
let non_zero_indices_series = | ||
Series::concat(&non_zero_indices.iter().collect::<Vec<&Series>>())?; | ||
let offsets_cloned = offsets.clone(); | ||
let data_list_arr = ListArray::new( | ||
Field::new( | ||
"values", | ||
DataType::List(Box::new(non_zero_values_series.data_type().clone())), | ||
), | ||
non_zero_values_series, | ||
offsets.into(), | ||
None, | ||
); | ||
let indices_list_arr = ListArray::new( | ||
Field::new( | ||
"indices", | ||
DataType::List(Box::new(non_zero_indices_series.data_type().clone())), | ||
), | ||
non_zero_indices_series, | ||
offsets_cloned.into(), | ||
None, | ||
); | ||
let sparse_struct_array = StructArray::new( | ||
Field::new(self.name(), new_dtype.to_physical()), | ||
vec![ | ||
data_list_arr.into_series(), | ||
indices_list_arr.into_series(), | ||
self.shape_array().clone().into_series(), | ||
], | ||
None, | ||
); | ||
Ok(COOSparseTensorArray::new( | ||
Field::new(sparse_struct_array.name(), new_dtype.clone()), | ||
sparse_struct_array, | ||
) | ||
.into_series()) | ||
} | ||
DataType::Image(mode) => { | ||
let sa = self.shape_array(); | ||
if !(0..self.len()).map(|i| sa.get(i)).all(|s| { | ||
|
@@ -1527,6 +1601,86 @@ impl TensorArray { | |
} | ||
} | ||
|
||
impl COOSparseTensorArray { | ||
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> { | ||
match dtype { | ||
DataType::Tensor(inner_dtype) => { | ||
let non_zero_values_array = self.values_array(); | ||
let non_zero_indices_array = self.indices_array(); | ||
let shape_array = self.shape_array(); | ||
let mut sizes_vec: Vec<usize> = vec![0; shape_array.len()]; | ||
for (i, shape) in shape_array.into_iter().enumerate() { | ||
match shape { | ||
Some(shape) => { | ||
let shape = shape.u64().unwrap().as_arrow(); | ||
let num_elements = | ||
shape.values().clone().into_iter().product::<u64>() as usize; | ||
sizes_vec[i] = num_elements; | ||
} | ||
_ => {} | ||
} | ||
} | ||
let offsets: Offsets<i64> = Offsets::try_from_iter(sizes_vec.iter().cloned())?; | ||
let mut values = vec![0 as i64; sizes_vec.iter().sum::<usize>() as usize]; | ||
for (i, indices) in non_zero_indices_array.into_iter().enumerate() { | ||
for j in 0..indices.unwrap().len() { | ||
let index = non_zero_indices_array | ||
.get(i) | ||
.unwrap() | ||
.u64() | ||
.unwrap() | ||
.as_arrow() | ||
.value(j) as usize; | ||
let list_start_offset = offsets.start_end(i).0; | ||
values[list_start_offset + index] = non_zero_values_array | ||
.get(i) | ||
.unwrap() | ||
.i64() | ||
.unwrap() | ||
.as_arrow() | ||
.value(j); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here I assume the type of the values not sure how to both be able to edit the physical memory and keep it dynamically typed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure it is the idiomatic way to do it but, you can create a macro that accepts the inner_dtype and the primitive type it is mapped to, initialize the vector with the primitive and downcast the series with the array type, for example: macro_rules! implement_cast_for_dense_with_inner_dtype {
($dtype:ty, $array_type:ty, $n_values:ident, $non_zero_indices_array:ident, $non_zero_values_array:ident, $offsets:ident) => {{
let mut values = vec![0 as $dtype; $n_values];
for (i, indices) in $non_zero_indices_array.into_iter().enumerate() {
for j in 0..indices.unwrap().len() {
let index = $non_zero_indices_array
.get(i)
.unwrap()
.u64()
.unwrap()
.as_arrow()
.value(j) as usize;
let list_start_offset = $offsets.start_end(i).0;
values[list_start_offset + index] = $non_zero_values_array
.get(i)
.unwrap()
.downcast::<$array_type>()
.unwrap()
.as_arrow()
.value(j);
}
}
Box::new(arrow2::array::PrimitiveArray::from_vec(values))
}};
}
impl COOSparseTensorArray {
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> {
match dtype {
DataType::Tensor(inner_dtype) => {
let non_zero_values_array = self.values_array();
let non_zero_indices_array = self.indices_array();
let shape_array = self.shape_array();
let mut sizes_vec: Vec<usize> = vec![0; shape_array.len()];
for (i, shape) in shape_array.into_iter().enumerate() {
match shape {
Some(shape) => {
let shape = shape.u64().unwrap().as_arrow();
let num_elements =
shape.values().clone().into_iter().product::<u64>() as usize;
sizes_vec[i] = num_elements;
}
_ => {}
}
}
let offsets: Offsets<i64> = Offsets::try_from_iter(sizes_vec.iter().cloned())?;
let n_values = sizes_vec.iter().sum::<usize>() as usize;
let item: Box<dyn arrow2::array::Array> = match inner_dtype.as_ref() {
DataType::Float32 => implement_cast_for_dense_with_inner_dtype!(f32, Float32Array, n_values, non_zero_indices_array, non_zero_values_array, offsets),
DataType::Int64 => implement_cast_for_dense_with_inner_dtype!(i64, Int64Array, n_values, non_zero_indices_array, non_zero_values_array, offsets),
_ => panic!("Hi")
};
let list_arr = ListArray::new(
Field::new(
"data",
DataType::List(Box::new(inner_dtype.as_ref().clone())),
),
Series::try_from((
"item",
item,
))?,
offsets.into(),
None,
).into_series();
let physical_type = dtype.to_physical();
let struct_array = StructArray::new(
Field::new(self.name(), physical_type),
vec![list_arr, shape_array.clone().into_series()],
None
);
Ok(
TensorArray::new(Field::new(self.name(), dtype.clone()), struct_array)
.into_series(),
)
}
(_) => self.physical.cast(dtype),
}
}
} |
||
} | ||
let list_arr = ListArray::new( | ||
Field::new( | ||
"data", | ||
DataType::List(Box::new(inner_dtype.as_ref().clone())), | ||
), | ||
Series::try_from(( | ||
"item", | ||
Box::new(arrow2::array::PrimitiveArray::from_vec(values)) | ||
as Box<dyn arrow2::array::Array>, | ||
))?, | ||
offsets.into(), | ||
None, | ||
).into_series(); | ||
let physical_type = dtype.to_physical(); | ||
let struct_array = StructArray::new( | ||
Field::new(self.name(), physical_type), | ||
vec![list_arr, shape_array.clone().into_series()], | ||
None | ||
); | ||
Ok( | ||
TensorArray::new(Field::new(self.name(), dtype.clone()), struct_array) | ||
.into_series(), | ||
) | ||
} | ||
(_) => self.physical.cast(dtype), | ||
} | ||
} | ||
} | ||
|
||
impl FixedShapeCOOSparseTensorArray { | ||
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> { | ||
match (dtype, self.data_type()) { | ||
(DataType::FixedShapeTensor(_, _), DataType::FixedShapeCOOSparseTensor(_, _)) => { | ||
todo!("We need to implement this cast") | ||
} | ||
(_, _) => self.physical.cast(dtype), | ||
} | ||
} | ||
} | ||
|
||
impl FixedShapeTensorArray { | ||
pub fn cast(&self, dtype: &DataType) -> DaftResult<Series> { | ||
match (dtype, self.data_type()) { | ||
|
@@ -1603,6 +1757,12 @@ impl FixedShapeTensorArray { | |
.into_series(), | ||
) | ||
} | ||
( | ||
DataType::FixedShapeCOOSparseTensor(_, _), | ||
DataType::FixedShapeTensor(inner_type, tensor_shape), | ||
) => { | ||
todo!("We need to implement this cast") | ||
} | ||
// NOTE(Clark): Casting to FixedShapeImage is supported by the physical array cast. | ||
(_, _) => self.physical.cast(dtype), | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
use crate::array::ListArray; | ||
use crate::datatypes::logical::{COOSparseTensorArray, FixedShapeCOOSparseTensorArray}; | ||
|
||
impl COOSparseTensorArray { | ||
pub fn values_array(&self) -> &ListArray { | ||
const VALUES_IDX: usize = 0; | ||
let array = self.physical.children.get(VALUES_IDX).unwrap(); | ||
array.list().unwrap() | ||
} | ||
|
||
pub fn indices_array(&self) -> &ListArray { | ||
const INDICES_IDX: usize = 1; | ||
let array = self.physical.children.get(INDICES_IDX).unwrap(); | ||
array.list().unwrap() | ||
} | ||
|
||
pub fn shape_array(&self) -> &ListArray { | ||
const SHAPE_IDX: usize = 2; | ||
let array = self.physical.children.get(SHAPE_IDX).unwrap(); | ||
array.list().unwrap() | ||
} | ||
} | ||
|
||
impl FixedShapeCOOSparseTensorArray { | ||
pub fn values_array(&self) -> &ListArray { | ||
const VALUES_IDX: usize = 0; | ||
let array = self.physical.children.get(VALUES_IDX).unwrap(); | ||
array.list().unwrap() | ||
} | ||
|
||
pub fn indices_array(&self) -> &ListArray { | ||
const INDICES_IDX: usize = 1; | ||
let array = self.physical.children.get(INDICES_IDX).unwrap(); | ||
array.list().unwrap() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should be able to simplify the zero-series construction to:
`Int64Array::from(("item", [0].as_slice()))`