Skip to content

Commit 5255dfb

Browse files
committed
propagate changes in all libs and binaries
1 parent 1db26ff commit 5255dfb

File tree

47 files changed

+253
-243
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+253
-243
lines changed

crates/store/re_dataframe/src/engine.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use re_chunk::{EntityPath, TransportChunk};
2-
use re_chunk_store::{ChunkStore, ColumnDescriptor, QueryExpression};
2+
use re_chunk_store::{ChunkStoreHandle, ColumnDescriptor, QueryExpression};
33
use re_log_types::EntityPathFilter;
44
use re_query::QueryCache;
55

@@ -30,7 +30,7 @@ pub type RecordBatch = TransportChunk;
3030
// got to go. But for that we need to generally introduce `ChunkStoreHandle` and `QueryCacheHandle`
3131
// first, and this is not as straightforward as it seems.
3232
pub struct QueryEngine<'a> {
33-
pub store: &'a ChunkStore,
33+
pub store: ChunkStoreHandle,
3434
pub cache: &'a QueryCache,
3535
}
3636

@@ -45,7 +45,7 @@ impl QueryEngine<'_> {
4545
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
4646
#[inline]
4747
pub fn schema(&self) -> Vec<ColumnDescriptor> {
48-
self.store.schema()
48+
self.store.read().schema()
4949
}
5050

5151
/// Returns the filtered schema for the given [`QueryExpression`].
@@ -55,7 +55,7 @@ impl QueryEngine<'_> {
5555
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
5656
#[inline]
5757
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
58-
self.store.schema_for_query(query)
58+
self.store.read().schema_for_query(query)
5959
}
6060

6161
/// Starts a new query by instantiating a [`QueryHandle`].
@@ -71,6 +71,7 @@ impl QueryEngine<'_> {
7171
filter: &'a EntityPathFilter,
7272
) -> impl Iterator<Item = EntityPath> + 'a {
7373
self.store
74+
.read()
7475
.all_entities()
7576
.into_iter()
7677
.filter(|entity_path| filter.matches(entity_path))

crates/store/re_dataframe/src/query.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,13 @@ impl QueryHandle<'_> {
162162
fn init_(&self) -> QueryHandleState {
163163
re_tracing::profile_scope!("init");
164164

165+
let store = self.engine.store.read();
166+
165167
// The timeline doesn't matter if we're running in static-only mode.
166168
let filtered_index = self.query.filtered_index.unwrap_or_default();
167169

168170
// 1. Compute the schema for the query.
169-
let view_contents = self.engine.store.schema_for_query(&self.query);
171+
let view_contents = store.schema_for_query(&self.query);
170172

171173
// 2. Compute the schema of the selected contents.
172174
//
@@ -327,7 +329,6 @@ impl QueryHandle<'_> {
327329
re_chunk::LatestAtQuery::new(Timeline::default(), TimeInt::STATIC);
328330

329331
let results = self.engine.cache.latest_at(
330-
self.engine.store,
331332
&query,
332333
&descr.entity_path,
333334
[descr.component_name],
@@ -586,10 +587,7 @@ impl QueryHandle<'_> {
586587
//
587588
// TODO(cmc): Going through the cache is very useful in a Viewer context, but
588589
// not so much in an SDK context. Make it configurable.
589-
let results =
590-
self.engine
591-
.cache
592-
.range(self.engine.store, query, entity_path, component_names);
590+
let results = self.engine.cache.range(query, entity_path, component_names);
593591

594592
debug_assert!(
595593
results.components.len() <= 1,
@@ -997,7 +995,6 @@ impl QueryHandle<'_> {
997995
re_chunk::LatestAtQuery::new(state.filtered_index, *cur_index_value);
998996

999997
let results = self.engine.cache.latest_at(
1000-
self.engine.store,
1001998
&query,
1002999
&descr.entity_path,
10031000
[descr.component_name],

crates/store/re_entity_db/src/entity_db.rs

+52-42
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use parking_lot::Mutex;
55

66
use re_chunk::{Chunk, ChunkResult, RowId, TimeInt};
77
use re_chunk_store::{
8-
ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreEvent, ChunkStoreSubscriber,
9-
GarbageCollectionOptions, GarbageCollectionTarget,
8+
ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreEvent, ChunkStoreHandle,
9+
ChunkStoreSubscriber, GarbageCollectionOptions, GarbageCollectionTarget,
1010
};
1111
use re_log_types::{
1212
ApplicationId, EntityPath, EntityPathHash, LogMsg, ResolvedTimeRange, ResolvedTimeRangeF,
@@ -62,7 +62,7 @@ pub struct EntityDb {
6262
tree: crate::EntityTree,
6363

6464
/// Stores all components for all entities for all timelines.
65-
data_store: ChunkStore,
65+
data_store: ChunkStoreHandle,
6666

6767
/// Query caches for the data in [`Self::data_store`].
6868
query_caches: re_query::QueryCache,
@@ -76,8 +76,8 @@ impl EntityDb {
7676
}
7777

7878
pub fn with_store_config(store_id: StoreId, store_config: ChunkStoreConfig) -> Self {
79-
let data_store = ChunkStore::new(store_id.clone(), store_config);
80-
let query_caches = re_query::QueryCache::new(&data_store);
79+
let data_store = ChunkStoreHandle::new(ChunkStore::new(store_id.clone(), store_config));
80+
let query_caches = re_query::QueryCache::new(data_store.clone());
8181

8282
Self {
8383
data_source: None,
@@ -99,11 +99,6 @@ impl EntityDb {
9999
&self.tree
100100
}
101101

102-
#[inline]
103-
pub fn data_store(&self) -> &ChunkStore {
104-
&self.data_store
105-
}
106-
107102
pub fn store_info_msg(&self) -> Option<&SetStoreInfo> {
108103
self.set_store_info.as_ref()
109104
}
@@ -123,7 +118,7 @@ impl EntityDb {
123118

124119
pub fn query_engine(&self) -> re_dataframe::QueryEngine<'_> {
125120
re_dataframe::QueryEngine {
126-
store: self.store(),
121+
store: self.store().clone(),
127122
cache: self.query_caches(),
128123
}
129124
}
@@ -141,7 +136,7 @@ impl EntityDb {
141136
component_names: impl IntoIterator<Item = re_types_core::ComponentName>,
142137
) -> re_query::LatestAtResults {
143138
self.query_caches()
144-
.latest_at(self.store(), query, entity_path, component_names)
139+
.latest_at(query, entity_path, component_names)
145140
}
146141

147142
/// Get the latest index and value for a given dense [`re_types_core::Component`].
@@ -160,7 +155,7 @@ impl EntityDb {
160155
) -> Option<((TimeInt, RowId), C)> {
161156
let results = self
162157
.query_caches()
163-
.latest_at(self.store(), query, entity_path, [C::name()]);
158+
.latest_at(query, entity_path, [C::name()]);
164159
results
165160
.component_mono()
166161
.map(|value| (results.index(), value))
@@ -182,7 +177,7 @@ impl EntityDb {
182177
) -> Option<((TimeInt, RowId), C)> {
183178
let results = self
184179
.query_caches()
185-
.latest_at(self.store(), query, entity_path, [C::name()]);
180+
.latest_at(query, entity_path, [C::name()]);
186181
results
187182
.component_mono_quiet()
188183
.map(|value| (results.index(), value))
@@ -208,7 +203,7 @@ impl EntityDb {
208203
}
209204

210205
#[inline]
211-
pub fn store(&self) -> &ChunkStore {
206+
pub fn store(&self) -> &ChunkStoreHandle {
212207
&self.data_store
213208
}
214209

@@ -218,8 +213,8 @@ impl EntityDb {
218213
}
219214

220215
#[inline]
221-
pub fn store_id(&self) -> &StoreId {
222-
self.data_store.id()
216+
pub fn store_id(&self) -> StoreId {
217+
self.data_store.read().id()
223218
}
224219

225220
/// If this entity db is the result of a clone, which store was it cloned from?
@@ -264,14 +259,14 @@ impl EntityDb {
264259

265260
#[inline]
266261
pub fn num_rows(&self) -> u64 {
267-
self.data_store.stats().total().num_rows
262+
self.data_store.read().stats().total().num_rows
268263
}
269264

270265
/// Return the current `ChunkStoreGeneration`. This can be used to determine whether the
271266
/// database has been modified since the last time it was queried.
272267
#[inline]
273268
pub fn generation(&self) -> re_chunk_store::ChunkStoreGeneration {
274-
self.data_store.generation()
269+
self.data_store.read().generation()
275270
}
276271

277272
#[inline]
@@ -324,7 +319,7 @@ impl EntityDb {
324319
pub fn add(&mut self, msg: &LogMsg) -> Result<Vec<ChunkStoreEvent>, Error> {
325320
re_tracing::profile_function!();
326321

327-
debug_assert_eq!(msg.store_id(), self.store_id());
322+
debug_assert_eq!(*msg.store_id(), self.store_id());
328323

329324
let store_events = match &msg {
330325
LogMsg::SetStoreInfo(msg) => {
@@ -350,7 +345,10 @@ impl EntityDb {
350345
}
351346

352347
pub fn add_chunk(&mut self, chunk: &Arc<Chunk>) -> Result<Vec<ChunkStoreEvent>, Error> {
353-
let store_events = self.data_store.insert_chunk(chunk)?;
348+
// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
349+
let mut store = self.data_store.write_arc();
350+
351+
let store_events = store.insert_chunk(chunk)?;
354352

355353
self.register_entity_path(chunk.entity_path());
356354

@@ -367,8 +365,7 @@ impl EntityDb {
367365

368366
// It is possible for writes to trigger deletions: specifically in the case of
369367
// overwritten static data leading to dangling chunks.
370-
self.tree
371-
.on_store_deletions(&self.data_store, &store_events);
368+
self.tree.on_store_deletions(&store, &store_events);
372369

373370
// We inform the stats last, since it measures e2e latency.
374371
self.stats.on_events(&store_events);
@@ -433,15 +430,18 @@ impl EntityDb {
433430
fn gc(&mut self, gc_options: &GarbageCollectionOptions) -> Vec<ChunkStoreEvent> {
434431
re_tracing::profile_function!();
435432

436-
let (store_events, stats_diff) = self.data_store.gc(gc_options);
433+
// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
434+
let mut store = self.data_store.write_arc();
435+
436+
let (store_events, stats_diff) = store.gc(gc_options);
437437

438438
re_log::trace!(
439439
num_row_ids_dropped = store_events.len(),
440440
size_bytes_dropped = re_format::format_bytes(stats_diff.total().total_size_bytes as _),
441441
"purged datastore"
442442
);
443443

444-
self.on_store_deletions(&store_events);
444+
self.on_store_deletions(&store, &store_events);
445445

446446
store_events
447447
}
@@ -454,8 +454,11 @@ impl EntityDb {
454454
timeline: &Timeline,
455455
drop_range: ResolvedTimeRange,
456456
) -> Vec<ChunkStoreEvent> {
457-
let store_events = self.data_store.drop_time_range(timeline, drop_range);
458-
self.on_store_deletions(&store_events);
457+
// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
458+
let mut store = self.data_store.write_arc();
459+
460+
let store_events = store.drop_time_range(timeline, drop_range);
461+
self.on_store_deletions(&store, &store_events);
459462
store_events
460463
}
461464

@@ -467,9 +470,12 @@ impl EntityDb {
467470
pub fn drop_entity_path(&mut self, entity_path: &EntityPath) {
468471
re_tracing::profile_function!();
469472

470-
let store_events = self.data_store.drop_entity_path(entity_path);
473+
// NOTE: using `write_arc` so the borrow checker can make sense of the partial borrow of `self`.
474+
let mut store = self.data_store.write_arc();
475+
476+
let store_events = store.drop_entity_path(entity_path);
471477

472-
self.on_store_deletions(&store_events);
478+
self.on_store_deletions(&store, &store_events);
473479
}
474480

475481
/// Unconditionally drops all the data for a given [`EntityPath`] and all its children.
@@ -489,13 +495,13 @@ impl EntityDb {
489495
}
490496
}
491497

492-
fn on_store_deletions(&mut self, store_events: &[ChunkStoreEvent]) {
498+
fn on_store_deletions(&mut self, store: &ChunkStore, store_events: &[ChunkStoreEvent]) {
493499
re_tracing::profile_function!();
494500

495501
self.times_per_timeline.on_events(store_events);
496502
self.query_caches.on_events(store_events);
497503
self.time_histogram_per_timeline.on_events(store_events);
498-
self.tree.on_store_deletions(&self.data_store, store_events);
504+
self.tree.on_store_deletions(store, store_events);
499505
}
500506

501507
/// Key used for sorting recordings in the UI.
@@ -514,6 +520,8 @@ impl EntityDb {
514520
) -> impl Iterator<Item = ChunkResult<LogMsg>> + '_ {
515521
re_tracing::profile_function!();
516522

523+
let store = self.data_store.read();
524+
517525
let set_store_info_msg = self
518526
.store_info_msg()
519527
.map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone())));
@@ -526,8 +534,7 @@ impl EntityDb {
526534
)
527535
});
528536

529-
let mut chunks: Vec<&Arc<Chunk>> = self
530-
.store()
537+
let mut chunks: Vec<Arc<Chunk>> = store
531538
.iter_chunks()
532539
.filter(move |chunk| {
533540
let Some((timeline, time_range)) = time_filter else {
@@ -543,6 +550,7 @@ impl EntityDb {
543550
|| time_range.contains(time_column.time_range().max())
544551
})
545552
})
553+
.cloned() // refcount
546554
.collect();
547555

548556
// Try to roughly preserve the order of the chunks
@@ -606,7 +614,8 @@ impl EntityDb {
606614
});
607615
}
608616

609-
for chunk in self.store().iter_chunks() {
617+
let store = self.data_store.read();
618+
for chunk in store.iter_chunks() {
610619
new_db.add_chunk(&Arc::clone(chunk))?;
611620
}
612621

@@ -627,8 +636,9 @@ impl EntityDb {
627636
};
628637

629638
let mut stats = ChunkStoreChunkStats::default();
639+
let store = self.data_store.read();
630640
subtree.visit_children_recursively(|path| {
631-
stats += self.store().entity_stats_static(path);
641+
stats += store.entity_stats_static(path);
632642
});
633643

634644
stats
@@ -649,8 +659,9 @@ impl EntityDb {
649659
};
650660

651661
let mut stats = ChunkStoreChunkStats::default();
662+
let store = self.data_store.read();
652663
subtree.visit_children_recursively(|path| {
653-
stats += self.store().entity_stats_on_timeline(path, timeline);
664+
stats += store.entity_stats_on_timeline(path, timeline);
654665
});
655666

656667
stats
@@ -670,10 +681,9 @@ impl EntityDb {
670681
return false;
671682
};
672683

684+
let store = self.data_store.read();
673685
subtree
674-
.find_first_child_recursive(|path| {
675-
self.store().entity_has_data_on_timeline(timeline, path)
676-
})
686+
.find_first_child_recursive(|path| store.entity_has_data_on_timeline(timeline, path))
677687
.is_some()
678688
}
679689

@@ -691,10 +701,10 @@ impl EntityDb {
691701
return false;
692702
};
693703

704+
let store = self.data_store.read();
694705
subtree
695706
.find_first_child_recursive(|path| {
696-
self.store()
697-
.entity_has_temporal_data_on_timeline(timeline, path)
707+
store.entity_has_temporal_data_on_timeline(timeline, path)
698708
})
699709
.is_some()
700710
}
@@ -704,7 +714,7 @@ impl re_types_core::SizeBytes for EntityDb {
704714
#[inline]
705715
fn heap_size_bytes(&self) -> u64 {
706716
// TODO(emilk): size of entire EntityDb, including secondary indices etc
707-
self.data_store().stats().total().total_size_bytes
717+
self.data_store.read().stats().total().total_size_bytes
708718
}
709719
}
710720

0 commit comments

Comments
 (0)