Skip to content

Commit 8b0aa2e

Browse files
authored
Datastore revamp 1: new indexing model & core datastructures (#1727)
1 parent dd6f03e commit 8b0aa2e

37 files changed

+2058
-3450
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ polars-core = "0.27.1"
8080
polars-lazy = "0.27.1"
8181
polars-ops = "0.27.1"
8282
puffin = "0.14"
83+
smallvec = { version = "1.0", features = ["const_generics", "union"] }
8384
thiserror = "1.0"
8485
time = { version = "0.3", features = ["wasm-bindgen"] }
86+
tinyvec = { version = "1.6", features = ["alloc", "rustc_1_55"] }
8587
tokio = "1.24"
8688
wgpu = { version = "0.15.1", default-features = false }
8789
wgpu-core = { version = "0.15.1", default-features = false }

crates/re_arrow_store/Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,17 @@ re_log.workspace = true
3838

3939
# External dependencies:
4040
ahash.workspace = true
41-
anyhow.workspace = true
4241
arrow2 = { workspace = true, features = [
4342
"compute_concatenate",
4443
"compute_aggregate",
4544
] }
45+
arrow2_convert.workspace = true
4646
document-features = "0.2"
4747
indent = "0.1"
4848
itertools = { workspace = true }
4949
nohash-hasher = "0.2"
5050
parking_lot.workspace = true
51+
smallvec.workspace = true
5152
static_assertions = "1.1"
5253
thiserror.workspace = true
5354

@@ -73,6 +74,7 @@ polars-ops = { workspace = true, optional = true, features = [
7374

7475

7576
[dev-dependencies]
77+
anyhow.workspace = true
7678
criterion = "0.4"
7779
mimalloc.workspace = true
7880
polars-core = { workspace = true, features = [
@@ -85,9 +87,7 @@ polars-core = { workspace = true, features = [
8587
"sort_multiple",
8688
] }
8789
rand = "0.8"
88-
smallvec = { version = "1.0", features = ["const_generics", "union"] }
89-
tinyvec = { version = "1.6", features = ["alloc", "rustc_1_55"] }
90-
90+
tinyvec.workspace = true
9191

9292
[lib]
9393
bench = false

crates/re_arrow_store/benches/data_store.rs

+22-31
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
#[global_allocator]
22
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
33

4-
use arrow2::array::{Array, UnionArray};
4+
use arrow2::array::UnionArray;
55
use criterion::{criterion_group, criterion_main, Criterion};
66

77
use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, TimeInt, TimeRange};
88
use re_log_types::{
99
component_types::{InstanceKey, Rect2D},
1010
datagen::{build_frame_nr, build_some_instances, build_some_rects},
11-
Component as _, ComponentName, DataRow, DataTable, EntityPath, MsgId, TimeType, Timeline,
11+
Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, MsgId, TimeType,
12+
Timeline,
1213
};
1314

1415
criterion_group!(benches, insert, latest_at, latest_at_missing, range);
@@ -73,10 +74,7 @@ fn insert(c: &mut Criterion) {
7374
b.iter(|| {
7475
insert_table(
7576
DataStoreConfig {
76-
index_bucket_nb_rows: num_rows_per_bucket,
77-
component_bucket_nb_rows: num_rows_per_bucket,
78-
index_bucket_size_bytes: u64::MAX,
79-
component_bucket_size_bytes: u64::MAX,
77+
indexed_bucket_num_rows: num_rows_per_bucket,
8078
..Default::default()
8179
},
8280
InstanceKey::name(),
@@ -101,10 +99,11 @@ fn latest_at(c: &mut Criterion) {
10199
group.bench_function("default", |b| {
102100
let store = insert_table(Default::default(), InstanceKey::name(), &table);
103101
b.iter(|| {
104-
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
105-
let rects = results[0]
102+
let cells = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
103+
let rects = cells[0]
106104
.as_ref()
107105
.unwrap()
106+
.as_arrow_ref()
108107
.as_any()
109108
.downcast_ref::<UnionArray>()
110109
.unwrap();
@@ -116,21 +115,19 @@ fn latest_at(c: &mut Criterion) {
116115
for &num_rows_per_bucket in num_rows_per_bucket() {
117116
let store = insert_table(
118117
DataStoreConfig {
119-
index_bucket_nb_rows: num_rows_per_bucket,
120-
component_bucket_nb_rows: num_rows_per_bucket,
121-
index_bucket_size_bytes: u64::MAX,
122-
component_bucket_size_bytes: u64::MAX,
118+
indexed_bucket_num_rows: num_rows_per_bucket,
123119
..Default::default()
124120
},
125121
InstanceKey::name(),
126122
&table,
127123
);
128124
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
129125
b.iter(|| {
130-
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
131-
let rects = results[0]
126+
let cells = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
127+
let rects = cells[0]
132128
.as_ref()
133129
.unwrap()
130+
.as_arrow_ref()
134131
.as_any()
135132
.downcast_ref::<UnionArray>()
136133
.unwrap();
@@ -180,10 +177,7 @@ fn latest_at_missing(c: &mut Criterion) {
180177
for &num_rows_per_bucket in num_rows_per_bucket() {
181178
let store = insert_table(
182179
DataStoreConfig {
183-
index_bucket_nb_rows: num_rows_per_bucket,
184-
component_bucket_nb_rows: num_rows_per_bucket,
185-
index_bucket_size_bytes: u64::MAX,
186-
component_bucket_size_bytes: u64::MAX,
180+
indexed_bucket_num_rows: num_rows_per_bucket,
187181
..Default::default()
188182
},
189183
InstanceKey::name(),
@@ -236,25 +230,23 @@ fn range(c: &mut Criterion) {
236230
for &num_rows_per_bucket in num_rows_per_bucket() {
237231
let store = insert_table(
238232
DataStoreConfig {
239-
index_bucket_nb_rows: num_rows_per_bucket,
240-
component_bucket_nb_rows: num_rows_per_bucket,
241-
index_bucket_size_bytes: u64::MAX,
242-
component_bucket_size_bytes: u64::MAX,
233+
indexed_bucket_num_rows: num_rows_per_bucket,
243234
..Default::default()
244235
},
245236
InstanceKey::name(),
246237
&table,
247238
);
248239
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
249240
b.iter(|| {
250-
let msgs = range_data(&store, [Rect2D::name()]);
251-
for (cur_time, (time, results)) in msgs.enumerate() {
241+
let rows = range_data(&store, [Rect2D::name()]);
242+
for (cur_time, (time, cells)) in rows.enumerate() {
252243
let time = time.unwrap();
253244
assert_eq!(cur_time as i64, time.as_i64());
254245

255-
let rects = results[0]
246+
let rects = cells[0]
256247
.as_ref()
257248
.unwrap()
249+
.as_arrow_ref()
258250
.as_any()
259251
.downcast_ref::<UnionArray>()
260252
.unwrap();
@@ -305,26 +297,25 @@ fn latest_data_at<const N: usize>(
305297
store: &DataStore,
306298
primary: ComponentName,
307299
secondaries: &[ComponentName; N],
308-
) -> [Option<Box<dyn Array>>; N] {
300+
) -> [Option<DataCell>; N] {
309301
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
310302
let timeline_query = LatestAtQuery::new(timeline_frame_nr, (NUM_ROWS / 2).into());
311303
let ent_path = EntityPath::from("rects");
312304

313-
let row_indices = store
305+
store
314306
.latest_at(&timeline_query, &ent_path, primary, secondaries)
315-
.unwrap_or_else(|| [(); N].map(|_| None));
316-
store.get(secondaries, &row_indices)
307+
.unwrap_or_else(|| [(); N].map(|_| None))
317308
}
318309

319310
fn range_data<const N: usize>(
320311
store: &DataStore,
321312
components: [ComponentName; N],
322-
) -> impl Iterator<Item = (Option<TimeInt>, [Option<Box<dyn Array>>; N])> + '_ {
313+
) -> impl Iterator<Item = (Option<TimeInt>, [Option<DataCell>; N])> + '_ {
323314
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
324315
let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(0.into(), NUM_ROWS.into()));
325316
let ent_path = EntityPath::from("rects");
326317

327318
store
328319
.range(&query, &ent_path, components)
329-
.map(move |(time, _, row_indices)| (time, store.get(&components, &row_indices)))
320+
.map(move |(time, _, cells)| (time, cells))
330321
}

crates/re_arrow_store/src/lib.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,14 @@ pub mod polars_util;
3333
pub mod test_util;
3434

3535
pub use self::arrow_util::ArrayExt;
36-
pub use self::store::{
37-
DataStore, DataStoreConfig, IndexBucket, IndexRowNr, IndexTable, RowIndex, RowIndexKind,
38-
};
36+
pub use self::store::{DataStore, DataStoreConfig};
3937
pub use self::store_gc::GarbageCollectionTarget;
4038
pub use self::store_read::{LatestAtQuery, RangeQuery};
4139
pub use self::store_stats::DataStoreStats;
4240
pub use self::store_write::{WriteError, WriteResult};
4341

4442
pub(crate) use self::store::{
45-
ComponentBucket, ComponentTable, IndexBucketIndices, PersistentComponentTable,
46-
PersistentIndexTable, SecondaryIndex, TimeIndex,
43+
IndexedBucket, IndexedBucketInner, IndexedTable, PersistentIndexedTable,
4744
};
4845

4946
// Re-exports

crates/re_arrow_store/src/polars_util.rs

+18-18
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use arrow2::array::Array;
21
use itertools::Itertools;
32
use polars_core::{prelude::*, series::Series};
43
use polars_ops::prelude::*;
5-
use re_log_types::{ComponentName, EntityPath, TimeInt};
4+
use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};
65

76
use crate::{ArrayExt, DataStore, LatestAtQuery, RangeQuery};
87

@@ -38,12 +37,11 @@ pub fn latest_component(
3837
let cluster_key = store.cluster_key();
3938

4039
let components = &[cluster_key, primary];
41-
let row_indices = store
40+
let cells = store
4241
.latest_at(query, ent_path, primary, components)
43-
.unwrap_or([None; 2]);
44-
let results = store.get(components, &row_indices);
42+
.unwrap_or([(); 2].map(|_| None));
4543

46-
dataframe_from_results(components, results)
44+
dataframe_from_cells(&cells)
4745
}
4846

4947
/// Queries any number of components and their cluster keys from their respective point-of-views,
@@ -161,12 +159,11 @@ pub fn range_components<'a, const N: usize>(
161159
.chain(
162160
store
163161
.range(query, ent_path, components)
164-
.map(move |(time, _, row_indices)| {
165-
let results = store.get(&components, &row_indices);
162+
.map(move |(time, _, cells)| {
166163
(
167164
time,
168-
row_indices[primary_col].is_some(), // is_primary
169-
dataframe_from_results(&components, results),
165+
cells[primary_col].is_some(), // is_primary
166+
dataframe_from_cells(&cells),
170167
)
171168
}),
172169
)
@@ -200,16 +197,19 @@ pub fn range_components<'a, const N: usize>(
200197

201198
// --- Joins ---
202199

203-
pub fn dataframe_from_results<const N: usize>(
204-
components: &[ComponentName; N],
205-
results: [Option<Box<dyn Array>>; N],
200+
// TODO(#1619): none of this mess should be here
201+
202+
pub fn dataframe_from_cells<const N: usize>(
203+
cells: &[Option<DataCell>; N],
206204
) -> SharedResult<DataFrame> {
207-
let series: Result<Vec<_>, _> = components
205+
let series: Result<Vec<_>, _> = cells
208206
.iter()
209-
.zip(results)
210-
.filter_map(|(component, col)| col.map(|col| (component, col)))
211-
.map(|(&component, col)| {
212-
Series::try_from((component.as_str(), col.as_ref().clean_for_polars()))
207+
.flatten()
208+
.map(|cell| {
209+
Series::try_from((
210+
cell.component_name().as_str(),
211+
cell.as_arrow_ref().clean_for_polars(),
212+
))
213213
})
214214
.collect();
215215

0 commit comments

Comments
 (0)