Skip to content

Commit e88aaa2

Browse files
authored
Introduce Chunk component-level helpers and UnitChunk (#6990)
This introduces all the usual crazy helpers for when you want to retrieve some very particular piece of data out of a chunk, in one (hopefully) neat, consistent package. In particular, this adds `UnitChunk`, a wrapper type for `Chunk` which is guaranteed to only ever hold one row of data, which is going to be very useful when introducing the new `Chunk`-based latest-at API later on.
1 parent e9c0be7 commit e88aaa2

File tree

11 files changed

+432
-102
lines changed

11 files changed

+432
-102
lines changed

crates/store/re_chunk/src/helpers.rs

+360
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
use std::sync::Arc;
2+
3+
use arrow2::array::Array as ArrowArray;
4+
5+
use re_log_types::{TimeInt, Timeline};
6+
use re_types_core::{Component, ComponentName, SizeBytes};
7+
8+
use crate::{Chunk, ChunkResult, RowId};
9+
10+
// --- Helpers ---
11+
12+
impl Chunk {
13+
// --- Batch ---
14+
15+
/// Returns the raw data for the specified component.
16+
///
17+
/// Returns an error if the row index is out of bounds.
18+
#[inline]
19+
pub fn component_batch_raw(
20+
&self,
21+
component_name: &ComponentName,
22+
row_index: usize,
23+
) -> Option<ChunkResult<Box<dyn ArrowArray>>> {
24+
self.components.get(component_name).map(|list_array| {
25+
if list_array.len() > row_index {
26+
Ok(list_array.value(row_index))
27+
} else {
28+
Err(crate::ChunkError::IndexOutOfBounds {
29+
kind: "row".to_owned(),
30+
len: list_array.len(),
31+
index: row_index,
32+
})
33+
}
34+
})
35+
}
36+
37+
/// Returns the deserialized data for the specified component.
38+
///
39+
/// Returns an error if the data cannot be deserialized, or if the row index is out of bounds.
40+
#[inline]
41+
pub fn component_batch<C: Component>(&self, row_index: usize) -> Option<ChunkResult<Vec<C>>> {
42+
let res = self.component_batch_raw(&C::name(), row_index)?;
43+
44+
let array = match res {
45+
Ok(array) => array,
46+
Err(err) => return Some(Err(err)),
47+
};
48+
49+
let data = C::from_arrow(&*array);
50+
Some(data.map_err(Into::into))
51+
}
52+
53+
// --- Instance ---
54+
55+
/// Returns the raw data for the specified component at the given instance index.
56+
///
57+
/// Returns an error if either the row index or instance index are out of bounds.
58+
#[inline]
59+
pub fn component_instance_raw(
60+
&self,
61+
component_name: &ComponentName,
62+
row_index: usize,
63+
instance_index: usize,
64+
) -> Option<ChunkResult<Box<dyn ArrowArray>>> {
65+
let res = self.component_batch_raw(component_name, row_index)?;
66+
67+
let array = match res {
68+
Ok(array) => array,
69+
Err(err) => return Some(Err(err)),
70+
};
71+
72+
if array.len() > instance_index {
73+
Some(Ok(array.sliced(instance_index, 1)))
74+
} else {
75+
Some(Err(crate::ChunkError::IndexOutOfBounds {
76+
kind: "instance".to_owned(),
77+
len: array.len(),
78+
index: instance_index,
79+
}))
80+
}
81+
}
82+
83+
/// Returns the component data of the specified instance.
84+
///
85+
/// Returns an error if the data cannot be deserialized, or if either the row index or instance index
86+
/// are out of bounds.
87+
#[inline]
88+
pub fn component_instance<C: Component>(
89+
&self,
90+
row_index: usize,
91+
instance_index: usize,
92+
) -> Option<ChunkResult<C>> {
93+
let res = self.component_instance_raw(&C::name(), row_index, instance_index)?;
94+
95+
let array = match res {
96+
Ok(array) => array,
97+
Err(err) => return Some(Err(err)),
98+
};
99+
100+
match C::from_arrow(&*array) {
101+
Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced!
102+
Err(err) => Some(Err(err.into())),
103+
}
104+
}
105+
106+
// --- Mono ---
107+
108+
/// Returns the raw data for the specified component, assuming a mono-batch.
109+
///
110+
/// Returns an error if either the row index is out of bounds, or the underlying batch is not
111+
/// of unit length.
112+
#[inline]
113+
pub fn component_mono_raw(
114+
&self,
115+
component_name: &ComponentName,
116+
row_index: usize,
117+
) -> Option<ChunkResult<Box<dyn ArrowArray>>> {
118+
let res = self.component_batch_raw(component_name, row_index)?;
119+
120+
let array = match res {
121+
Ok(array) => array,
122+
Err(err) => return Some(Err(err)),
123+
};
124+
125+
if array.len() == 1 {
126+
Some(Ok(array.sliced(0, 1)))
127+
} else {
128+
Some(Err(crate::ChunkError::IndexOutOfBounds {
129+
kind: "mono".to_owned(),
130+
len: array.len(),
131+
index: 0,
132+
}))
133+
}
134+
}
135+
136+
/// Returns the deserialized data for the specified component, assuming a mono-batch.
137+
///
138+
/// Returns an error if the data cannot be deserialized, or if either the row index is out of bounds,
139+
/// or the underlying batch is not of unit length.
140+
#[inline]
141+
pub fn component_mono<C: Component>(&self, row_index: usize) -> Option<ChunkResult<C>> {
142+
let res = self.component_mono_raw(&C::name(), row_index)?;
143+
144+
let array = match res {
145+
Ok(array) => array,
146+
Err(err) => return Some(Err(err)),
147+
};
148+
149+
match C::from_arrow(&*array) {
150+
Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced!
151+
Err(err) => Some(Err(err.into())),
152+
}
153+
}
154+
}
155+
156+
// --- Unit ---
157+
158+
/// A simple type alias for an `Arc<Chunk>`.
159+
pub type ChunkShared = Arc<Chunk>;
160+
161+
/// A [`ChunkShared`] that is guaranteed to always contain a single row's worth of data.
162+
#[derive(Debug, Clone)]
163+
pub struct UnitChunkShared(ChunkShared);
164+
165+
impl std::ops::Deref for UnitChunkShared {
166+
type Target = Chunk;
167+
168+
#[inline]
169+
fn deref(&self) -> &Self::Target {
170+
&self.0
171+
}
172+
}
173+
174+
impl SizeBytes for UnitChunkShared {
175+
#[inline]
176+
fn heap_size_bytes(&self) -> u64 {
177+
Chunk::heap_size_bytes(&self.0)
178+
}
179+
}
180+
181+
impl Chunk {
182+
/// Turns the chunk into a [`UnitChunkShared`], if possible.
183+
#[inline]
184+
pub fn to_unit(self: &ChunkShared) -> Option<UnitChunkShared> {
185+
(self.num_rows() == 1).then(|| UnitChunkShared(Arc::clone(self)))
186+
}
187+
188+
/// Turns the chunk into a [`UnitChunkShared`], if possible.
189+
#[inline]
190+
pub fn into_unit(self) -> Option<UnitChunkShared> {
191+
(self.num_rows() == 1).then(|| UnitChunkShared(Arc::new(self)))
192+
}
193+
}
194+
195+
impl UnitChunkShared {
196+
// Turns the unit chunk back into a standard [`Chunk`].
197+
#[inline]
198+
pub fn into_chunk(self) -> ChunkShared {
199+
self.0
200+
}
201+
}
202+
203+
impl UnitChunkShared {
204+
/// Returns the index (`(TimeInt, RowId)` pair) of the single row within, on the given timeline.
205+
///
206+
/// Returns the single static index if the chunk is static.
207+
#[inline]
208+
pub fn index(&self, timeline: &Timeline) -> Option<(TimeInt, RowId)> {
209+
debug_assert!(self.num_rows() == 1);
210+
if self.is_static() {
211+
self.row_ids()
212+
.next()
213+
.map(|row_id| (TimeInt::STATIC, row_id))
214+
} else {
215+
self.timelines.get(timeline).and_then(|time_chunk| {
216+
time_chunk
217+
.times()
218+
.next()
219+
.and_then(|time| self.row_ids().next().map(|row_id| (time, row_id)))
220+
})
221+
}
222+
}
223+
224+
/// Returns the [`RowId`] of the single row within, on the given timeline.
225+
///
226+
/// Returns the single static `RowId` if the chunk is static.
227+
#[inline]
228+
pub fn row_id(&self) -> Option<RowId> {
229+
debug_assert!(self.num_rows() == 1);
230+
self.row_ids().next()
231+
}
232+
233+
/// Returns the number of instances of the single row within.
234+
///
235+
/// The maximum value amongst all components is what's returned.
236+
#[inline]
237+
pub fn num_instances(&self) -> u64 {
238+
self.components
239+
.values()
240+
.map(|list_array| {
241+
list_array.validity().map_or_else(
242+
|| list_array.len(),
243+
|validity| validity.len() - validity.unset_bits(),
244+
)
245+
})
246+
.max()
247+
.unwrap_or(0) as u64
248+
}
249+
}
250+
251+
// --- Unit helpers ---
252+
253+
impl UnitChunkShared {
254+
// --- Batch ---
255+
256+
/// Returns the raw data for the specified component.
257+
#[inline]
258+
pub fn component_batch_raw(
259+
&self,
260+
component_name: &ComponentName,
261+
) -> Option<Box<dyn ArrowArray>> {
262+
debug_assert!(self.num_rows() == 1);
263+
self.components
264+
.get(component_name)
265+
.map(|list_array| list_array.value(0))
266+
}
267+
268+
/// Returns the deserialized data for the specified component.
269+
///
270+
/// Returns an error if the data cannot be deserialized.
271+
#[inline]
272+
pub fn component_batch<C: Component>(&self) -> Option<ChunkResult<Vec<C>>> {
273+
let data = C::from_arrow(&*self.component_batch_raw(&C::name())?);
274+
Some(data.map_err(Into::into))
275+
}
276+
277+
// --- Instance ---
278+
279+
/// Returns the raw data for the specified component at the given instance index.
280+
///
281+
/// Returns an error if the instance index is out of bounds.
282+
#[inline]
283+
pub fn component_instance_raw(
284+
&self,
285+
component_name: &ComponentName,
286+
instance_index: usize,
287+
) -> Option<ChunkResult<Box<dyn ArrowArray>>> {
288+
let array = self.component_batch_raw(component_name)?;
289+
if array.len() > instance_index {
290+
Some(Ok(array.sliced(instance_index, 1)))
291+
} else {
292+
Some(Err(crate::ChunkError::IndexOutOfBounds {
293+
kind: "instance".to_owned(),
294+
len: array.len(),
295+
index: instance_index,
296+
}))
297+
}
298+
}
299+
300+
/// Returns the deserialized data for the specified component at the given instance index.
301+
///
302+
/// Returns an error if the data cannot be deserialized, or if the instance index is out of bounds.
303+
#[inline]
304+
pub fn component_instance<C: Component>(
305+
&self,
306+
instance_index: usize,
307+
) -> Option<ChunkResult<C>> {
308+
let res = self.component_instance_raw(&C::name(), instance_index)?;
309+
310+
let array = match res {
311+
Ok(array) => array,
312+
Err(err) => return Some(Err(err)),
313+
};
314+
315+
match C::from_arrow(&*array) {
316+
Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced!
317+
Err(err) => Some(Err(err.into())),
318+
}
319+
}
320+
321+
// --- Mono ---
322+
323+
/// Returns the raw data for the specified component, assuming a mono-batch.
324+
///
325+
/// Returns an error if the underlying batch is not of unit length.
326+
#[inline]
327+
pub fn component_mono_raw(
328+
&self,
329+
component_name: &ComponentName,
330+
) -> Option<ChunkResult<Box<dyn ArrowArray>>> {
331+
let array = self.component_batch_raw(component_name)?;
332+
if array.len() == 1 {
333+
Some(Ok(array.sliced(0, 1)))
334+
} else {
335+
Some(Err(crate::ChunkError::IndexOutOfBounds {
336+
kind: "mono".to_owned(),
337+
len: array.len(),
338+
index: 0,
339+
}))
340+
}
341+
}
342+
343+
/// Returns the deserialized data for the specified component, assuming a mono-batch.
344+
///
345+
/// Returns an error if the data cannot be deserialized, or if the underlying batch is not of unit length.
346+
#[inline]
347+
pub fn component_mono<C: Component>(&self) -> Option<ChunkResult<C>> {
348+
let res = self.component_mono_raw(&C::name())?;
349+
350+
let array = match res {
351+
Ok(array) => array,
352+
Err(err) => return Some(Err(err)),
353+
};
354+
355+
match C::from_arrow(&*array) {
356+
Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced!
357+
Err(err) => Some(Err(err.into())),
358+
}
359+
}
360+
}

0 commit comments

Comments
 (0)