Skip to content

Commit 49640b2

Browse files
committed
Datastore revamp 4: sunset MsgId
1 parent 698e51b commit 49640b2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+460
-443
lines changed

crates/re_arrow_store/benches/data_store.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
88
use re_log_types::{
99
component_types::{InstanceKey, Rect2D},
1010
datagen::{build_frame_nr, build_some_instances, build_some_rects},
11-
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, MsgId, TimeType,
12-
Timeline,
11+
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, RowId, TableId,
12+
TimeType, Timeline,
1313
};
1414

1515
criterion_group!(benches, insert, latest_at, latest_at_missing, range);
@@ -262,10 +262,10 @@ fn range(c: &mut Criterion) {
262262

263263
fn build_table(n: usize, packed: bool) -> DataTable {
264264
let mut table = DataTable::from_rows(
265-
MsgId::ZERO,
265+
TableId::ZERO,
266266
(0..NUM_ROWS).map(move |frame_idx| {
267267
DataRow::from_cells2(
268-
MsgId::random(),
268+
RowId::random(),
269269
"rects",
270270
[build_frame_nr(frame_idx.into())],
271271
n as _,
@@ -277,7 +277,7 @@ fn build_table(n: usize, packed: bool) -> DataTable {
277277
// Do a serialization roundtrip to pack everything in contiguous memory.
278278
if packed {
279279
let (schema, columns) = table.serialize().unwrap();
280-
table = DataTable::deserialize(MsgId::ZERO, &schema, &columns).unwrap();
280+
table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
281281
}
282282

283283
table
@@ -304,7 +304,7 @@ fn latest_data_at<const N: usize>(
304304

305305
store
306306
.latest_at(&timeline_query, &ent_path, primary, secondaries)
307-
.unwrap_or_else(|| [(); N].map(|_| None))
307+
.map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
308308
}
309309

310310
fn range_data<const N: usize>(

crates/re_arrow_store/src/polars_util.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use itertools::Itertools;
22
use polars_core::{prelude::*, series::Series};
33
use polars_ops::prelude::*;
4-
use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};
4+
use re_log_types::{ComponentName, DataCell, EntityPath, RowId, TimeInt};
55

66
use crate::{ArrayExt, DataStore, LatestAtQuery, RangeQuery};
77

@@ -37,9 +37,9 @@ pub fn latest_component(
3737
let cluster_key = store.cluster_key();
3838

3939
let components = &[cluster_key, primary];
40-
let cells = store
40+
let (_, cells) = store
4141
.latest_at(query, ent_path, primary, components)
42-
.unwrap_or([(); 2].map(|_| None));
42+
.unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
4343

4444
dataframe_from_cells(&cells)
4545
}

crates/re_arrow_store/src/store_read.rs

+21-22
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use itertools::Itertools;
44
use nohash_hasher::IntSet;
55
use re_log::trace;
66
use re_log_types::{
7-
ComponentName, DataCell, EntityPath, MsgId, RowId, TimeInt, TimePoint, TimeRange, Timeline,
7+
ComponentName, DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline,
88
};
99
use smallvec::SmallVec;
1010

@@ -151,7 +151,7 @@ impl DataStore {
151151
///
152152
/// ```rust
153153
/// # use polars_core::{prelude::*, series::Series};
154-
/// # use re_log_types::{ComponentName, EntityPath, TimeInt};
154+
/// # use re_log_types::{ComponentName, EntityPath, RowId, TimeInt};
155155
/// # use re_arrow_store::{DataStore, LatestAtQuery, RangeQuery};
156156
/// #
157157
/// pub fn latest_component(
@@ -163,9 +163,9 @@ impl DataStore {
163163
/// let cluster_key = store.cluster_key();
164164
///
165165
/// let components = &[cluster_key, primary];
166-
/// let cells = store
167-
/// .latest_at(query, ent_path, primary, components)
168-
/// .unwrap_or([(); 2].map(|_| None));
166+
/// let (_, cells) = store
167+
/// .latest_at(&query, ent_path, primary, components)
168+
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
169169
///
170170
/// let series: Result<Vec<_>, _> = cells
171171
/// .iter()
@@ -193,7 +193,7 @@ impl DataStore {
193193
ent_path: &EntityPath,
194194
primary: ComponentName,
195195
components: &[ComponentName; N],
196-
) -> Option<[Option<DataCell>; N]> {
196+
) -> Option<(RowId, [Option<DataCell>; N])> {
197197
crate::profile_function!();
198198

199199
// TODO(cmc): kind & query_id need to somehow propagate through the span system.
@@ -232,7 +232,7 @@ impl DataStore {
232232
// return the results immediately.
233233
if cells
234234
.as_ref()
235-
.map_or(false, |cells| cells.iter().all(Option::is_some))
235+
.map_or(false, |(_, cells)| cells.iter().all(Option::is_some))
236236
{
237237
return cells;
238238
}
@@ -260,13 +260,13 @@ impl DataStore {
260260
(None, Some(cells_timeless)) => return Some(cells_timeless),
261261
// we have both temporal & timeless cells: let's merge the two when it makes sense
262262
// and return the end result.
263-
(Some(mut cells), Some(cells_timeless)) => {
263+
(Some((row_id, mut cells)), Some((_, cells_timeless))) => {
264264
for (i, row_idx) in cells_timeless.into_iter().enumerate() {
265265
if cells[i].is_none() {
266266
cells[i] = row_idx;
267267
}
268268
}
269-
return Some(cells);
269+
return Some((row_id, cells));
270270
}
271271
// no cells at all.
272272
(None, None) => {}
@@ -320,7 +320,7 @@ impl DataStore {
320320
/// ```rust
321321
/// # use arrow2::array::Array;
322322
/// # use polars_core::{prelude::*, series::Series};
323-
/// # use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};
323+
/// # use re_log_types::{ComponentName, DataCell, EntityPath, RowId, TimeInt};
324324
/// # use re_arrow_store::{DataStore, LatestAtQuery, RangeQuery};
325325
/// #
326326
/// # pub fn dataframe_from_cells<const N: usize>(
@@ -354,9 +354,9 @@ impl DataStore {
354354
/// let latest_time = query.range.min.as_i64().saturating_sub(1).into();
355355
/// let df_latest = {
356356
/// let query = LatestAtQuery::new(query.timeline, latest_time);
357-
/// let cells = store
357+
/// let (_, cells) = store
358358
/// .latest_at(&query, ent_path, primary, &components)
359-
/// .unwrap_or([(); 2].map(|_| None));
359+
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
360360
/// dataframe_from_cells(cells)
361361
/// };
362362
///
@@ -425,10 +425,10 @@ impl DataStore {
425425
}
426426
}
427427

428-
pub fn get_msg_metadata(&self, msg_id: &MsgId) -> Option<&TimePoint> {
428+
pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&TimePoint> {
429429
crate::profile_function!();
430430

431-
self.metadata_registry.get(msg_id)
431+
self.metadata_registry.get(row_id)
432432
}
433433

434434
/// Sort all unsorted indices in the store.
@@ -452,7 +452,7 @@ impl IndexedTable {
452452
time: TimeInt,
453453
primary: ComponentName,
454454
components: &[ComponentName; N],
455-
) -> Option<[Option<DataCell>; N]> {
455+
) -> Option<(RowId, [Option<DataCell>; N])> {
456456
crate::profile_function!();
457457

458458
// Early-exit if this entire table is unaware of this component.
@@ -660,16 +660,17 @@ impl IndexedBucket {
660660
time: TimeInt,
661661
primary: ComponentName,
662662
components: &[ComponentName; N],
663-
) -> Option<[Option<DataCell>; N]> {
663+
) -> Option<(RowId, [Option<DataCell>; N])> {
664664
crate::profile_function!();
665+
665666
self.sort_indices_if_needed();
666667

667668
let IndexedBucketInner {
668669
is_sorted,
669670
time_range: _,
670671
col_time,
671672
col_insert_id: _,
672-
col_row_id: _,
673+
col_row_id,
673674
col_num_instances: _,
674675
columns,
675676
size_bytes: _,
@@ -679,8 +680,6 @@ impl IndexedBucket {
679680
// Early-exit if this bucket is unaware of this component.
680681
let column = columns.get(&primary)?;
681682

682-
crate::profile_function!();
683-
684683
trace!(
685684
kind = "latest_at",
686685
%primary,
@@ -759,7 +758,7 @@ impl IndexedBucket {
759758
}
760759
}
761760

762-
Some(cells)
761+
Some((col_row_id[secondary_row_nr as usize], cells))
763762
}
764763

765764
/// Iterates the bucket in order to return the cells of the specified `components`,
@@ -983,7 +982,7 @@ impl PersistentIndexedTable {
983982
&self,
984983
primary: ComponentName,
985984
components: &[ComponentName; N],
986-
) -> Option<[Option<DataCell>; N]> {
985+
) -> Option<(RowId, [Option<DataCell>; N])> {
987986
if self.is_empty() {
988987
return None;
989988
}
@@ -1057,7 +1056,7 @@ impl PersistentIndexedTable {
10571056
}
10581057
}
10591058

1060-
Some(cells)
1059+
Some((self.col_row_id[secondary_row_nr as usize], cells))
10611060
}
10621061

10631062
/// Iterates the table in order to return the cells of the specified `components`,

crates/re_arrow_store/src/test_util.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::DataStoreConfig;
77
macro_rules! test_row {
88
($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {
99
::re_log_types::DataRow::from_cells1(
10-
::re_log_types::MsgId::random(),
10+
::re_log_types::RowId::random(),
1111
$entity.clone(),
1212
$frames,
1313
$n,
@@ -16,7 +16,7 @@ macro_rules! test_row {
1616
};
1717
($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => {
1818
::re_log_types::DataRow::from_cells2(
19-
::re_log_types::MsgId::random(),
19+
::re_log_types::RowId::random(),
2020
$entity.clone(),
2121
$frames,
2222
$n,

crates/re_arrow_store/tests/correctness.rs

+13-13
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {
146146
// bunch of non-existing components
147147
{
148148
let components = &["they".into(), "dont".into(), "exist".into()];
149-
let cells = store
149+
let (_, cells) = store
150150
.latest_at(
151151
&LatestAtQuery::new(timeline_frame_nr, frame40),
152152
&ent_path,
@@ -159,7 +159,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {
159159

160160
// empty component list
161161
{
162-
let cells = store
162+
let (_, cells) = store
163163
.latest_at(
164164
&LatestAtQuery::new(timeline_frame_nr, frame40),
165165
&ent_path,
@@ -309,53 +309,53 @@ fn gc_correct() {
309309

310310
// TODO(#1619): bring back garbage collection
311311

312-
// let msg_id_chunks = store.gc(
312+
// let row_id_chunks = store.gc(
313313
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
314314
// Timeline::new("frame_nr", TimeType::Sequence),
315315
// MsgId::name(),
316316
// );
317317

318-
// let msg_ids = msg_id_chunks
318+
// let row_ids = row_id_chunks
319319
// .iter()
320320
// .flat_map(|chunk| arrow_array_deserialize_iterator::<Option<MsgId>>(&**chunk).unwrap())
321321
// .map(Option::unwrap) // MsgId is always present
322322
// .collect::<ahash::HashSet<_>>();
323-
// assert!(!msg_ids.is_empty());
323+
// assert!(!row_ids.is_empty());
324324

325325
// if let err @ Err(_) = store.sanity_check() {
326326
// store.sort_indices_if_needed();
327327
// eprintln!("{store}");
328328
// err.unwrap();
329329
// }
330330
// check_still_readable(&store);
331-
// for msg_id in &msg_ids {
332-
// assert!(store.get_msg_metadata(msg_id).is_some());
331+
// for row_id in &row_ids {
332+
// assert!(store.get_msg_metadata(row_id).is_some());
333333
// }
334334

335-
// store.clear_msg_metadata(&msg_ids);
335+
// store.clear_msg_metadata(&row_ids);
336336

337337
// if let err @ Err(_) = store.sanity_check() {
338338
// store.sort_indices_if_needed();
339339
// eprintln!("{store}");
340340
// err.unwrap();
341341
// }
342342
// check_still_readable(&store);
343-
// for msg_id in &msg_ids {
344-
// assert!(store.get_msg_metadata(msg_id).is_none());
343+
// for row_id in &row_ids {
344+
// assert!(store.get_msg_metadata(row_id).is_none());
345345
// }
346346

347-
// let msg_id_chunks = store.gc(
347+
// let row_id_chunks = store.gc(
348348
// GarbageCollectionTarget::DropAtLeastPercentage(1.0),
349349
// Timeline::new("frame_nr", TimeType::Sequence),
350350
// MsgId::name(),
351351
// );
352352

353-
// let msg_ids = msg_id_chunks
353+
// let row_ids = row_id_chunks
354354
// .iter()
355355
// .flat_map(|chunk| arrow_array_deserialize_iterator::<Option<MsgId>>(&**chunk).unwrap())
356356
// .map(Option::unwrap) // MsgId is always present
357357
// .collect::<ahash::HashSet<_>>();
358-
// assert!(msg_ids.is_empty());
358+
// assert!(row_ids.is_empty());
359359

360360
// if let err @ Err(_) = store.sanity_check() {
361361
// store.sort_indices_if_needed();

0 commit comments

Comments
 (0)