Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage handles 1: Introduce ChunkStoreHandle #7917

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bacon.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ command = [
need_stdout = false
watch = ["tests", "benches", "examples"]

[jobs.libs]
command = ["cargo", "clippy", "--lib", "--all-features", "--color=always"]
need_stdout = false
watch = ["tests", "benches", "examples"]

[jobs.wasm]
command = [
"cargo",
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_chunk_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ indent.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
parking_lot = { workspace = true, features = ["arc_lock"] }
thiserror.workspace = true
web-time.workspace = true

Expand Down
4 changes: 3 additions & 1 deletion crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ pub use self::dataframe::{
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::stats::{ChunkStoreChunkStats, ChunkStoreStats};
pub use self::store::{ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ColumnMetadata};
pub use self::store::{
ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreHandle, ColumnMetadata,
};
pub use self::subscribers::{ChunkStoreSubscriber, ChunkStoreSubscriberHandle};

pub(crate) use self::store::ColumnMetadataState;
Expand Down
50 changes: 48 additions & 2 deletions crates/store/re_chunk_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,52 @@ pub struct ChunkStoreGeneration {
gc_id: u64,
}

/// A ref-counted handle to a [`ChunkStore`], providing interior mutability through an `RwLock`.
///
/// Cheap to clone: cloning only bumps the reference count, it never copies the store itself.
///
/// It is possible to grab the lock behind this handle while _maintaining a static lifetime_, see:
/// * [`ChunkStoreHandle::read_arc`]
/// * [`ChunkStoreHandle::write_arc`]
#[derive(Clone)]
pub struct ChunkStoreHandle(Arc<parking_lot::RwLock<ChunkStore>>);

impl ChunkStoreHandle {
#[inline]
pub fn new(store: ChunkStore) -> Self {
Self(Arc::new(parking_lot::RwLock::new(store)))
}

#[inline]
pub fn into_inner(self) -> Arc<parking_lot::RwLock<ChunkStore>> {
self.0
}
}

impl ChunkStoreHandle {
    /// Acquires a shared (read) lock on the underlying store, blocking if a writer holds it.
    #[inline]
    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, ChunkStore> {
        parking_lot::RwLock::read(&self.0)
    }

    /// Acquires an exclusive (write) lock on the underlying store, blocking until available.
    #[inline]
    pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, ChunkStore> {
        parking_lot::RwLock::write(&self.0)
    }

    /// Acquires a shared (read) lock whose guard is `'static`: it keeps the
    /// `Arc` alive rather than borrowing from `self` (requires the
    /// `parking_lot` `arc_lock` feature).
    #[inline]
    pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore> {
        self.0.read_arc()
    }

    /// Acquires an exclusive (write) lock whose guard is `'static`: it keeps
    /// the `Arc` alive rather than borrowing from `self` (requires the
    /// `parking_lot` `arc_lock` feature).
    #[inline]
    pub fn write_arc(
        &self,
    ) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore> {
        self.0.write_arc()
    }
}

/// A complete chunk store: covers all timelines, all entities, everything.
///
/// The chunk store _always_ works at the chunk level, whether it is for write & read queries or
Expand Down Expand Up @@ -497,8 +543,8 @@ impl ChunkStore {
}

#[inline]
pub fn id(&self) -> &StoreId {
&self.id
pub fn id(&self) -> StoreId {
self.id.clone()
}

#[inline]
Expand Down
9 changes: 5 additions & 4 deletions crates/store/re_dataframe/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use re_chunk::{EntityPath, TransportChunk};
use re_chunk_store::{ChunkStore, ColumnDescriptor, QueryExpression};
use re_chunk_store::{ChunkStoreHandle, ColumnDescriptor, QueryExpression};
use re_log_types::EntityPathFilter;
use re_query::QueryCache;

Expand Down Expand Up @@ -30,7 +30,7 @@ pub type RecordBatch = TransportChunk;
// got to go. But for that we need to generally introduce `ChunkStoreHandle` and `QueryCacheHandle`
// first, and this is not as straightforward as it seems.
pub struct QueryEngine<'a> {
pub store: &'a ChunkStore,
pub store: ChunkStoreHandle,
pub cache: &'a QueryCache,
}

Expand All @@ -45,7 +45,7 @@ impl QueryEngine<'_> {
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
#[inline]
pub fn schema(&self) -> Vec<ColumnDescriptor> {
self.store.schema()
self.store.read().schema()
}

/// Returns the filtered schema for the given [`QueryExpression`].
Expand All @@ -55,7 +55,7 @@ impl QueryEngine<'_> {
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
#[inline]
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
self.store.schema_for_query(query)
self.store.read().schema_for_query(query)
}

/// Starts a new query by instantiating a [`QueryHandle`].
Expand All @@ -71,6 +71,7 @@ impl QueryEngine<'_> {
filter: &'a EntityPathFilter,
) -> impl Iterator<Item = EntityPath> + 'a {
self.store
.read()
.all_entities()
.into_iter()
.filter(|entity_path| filter.matches(entity_path))
Expand Down
11 changes: 4 additions & 7 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,13 @@ impl QueryHandle<'_> {
fn init_(&self) -> QueryHandleState {
re_tracing::profile_scope!("init");

let store = self.engine.store.read();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note about coarseness here


// The timeline doesn't matter if we're running in static-only mode.
let filtered_index = self.query.filtered_index.unwrap_or_default();

// 1. Compute the schema for the query.
let view_contents = self.engine.store.schema_for_query(&self.query);
let view_contents = store.schema_for_query(&self.query);

// 2. Compute the schema of the selected contents.
//
Expand Down Expand Up @@ -327,7 +329,6 @@ impl QueryHandle<'_> {
re_chunk::LatestAtQuery::new(Timeline::default(), TimeInt::STATIC);

let results = self.engine.cache.latest_at(
self.engine.store,
&query,
&descr.entity_path,
[descr.component_name],
Expand Down Expand Up @@ -586,10 +587,7 @@ impl QueryHandle<'_> {
//
// TODO(cmc): Going through the cache is very useful in a Viewer context, but
// not so much in an SDK context. Make it configurable.
let results =
self.engine
.cache
.range(self.engine.store, query, entity_path, component_names);
let results = self.engine.cache.range(query, entity_path, component_names);

debug_assert!(
results.components.len() <= 1,
Expand Down Expand Up @@ -997,7 +995,6 @@ impl QueryHandle<'_> {
re_chunk::LatestAtQuery::new(state.filtered_index, *cur_index_value);

let results = self.engine.cache.latest_at(
self.engine.store,
&query,
&descr.entity_path,
[descr.component_name],
Expand Down
Loading
Loading