diff --git a/crates/re_data_store/src/store_format.rs b/crates/re_data_store/src/store_format.rs index 33962271bcb3..29c9ad1b589d 100644 --- a/crates/re_data_store/src/store_format.rs +++ b/crates/re_data_store/src/store_format.rs @@ -1,4 +1,5 @@ use re_format::{format_bytes, format_number}; +use re_log_types::TimeInt; use re_types_core::SizeBytes as _; use crate::{DataStore, IndexedBucket, IndexedTable, PersistentIndexedTable}; @@ -129,7 +130,7 @@ impl std::fmt::Display for IndexedBucket { let time_range = { let time_range = &self.inner.read().time_range; - if time_range.min.as_i64() != i64::MAX && time_range.max.as_i64() != i64::MIN { + if time_range.min != TimeInt::MAX && time_range.max != TimeInt::MIN { format!( " - {}: {}", self.timeline.name(), diff --git a/crates/re_data_store/src/store_read.rs b/crates/re_data_store/src/store_read.rs index d48e7daf6d3a..3a598f41c937 100644 --- a/crates/re_data_store/src/store_read.rs +++ b/crates/re_data_store/src/store_read.rs @@ -65,7 +65,7 @@ impl std::fmt::Debug for RangeQuery { self.timeline.typ().format_utc(self.range.min), self.timeline.typ().format_utc(self.range.max), self.timeline.name(), - if self.range.min == TimeInt::MIN { + if self.range.min <= TimeInt::MIN { "including" } else { "excluding" @@ -494,7 +494,7 @@ impl DataStore { .flatten() .map(|(time, row_id, cells)| (Some(time), row_id, cells)); - if query.range.min == TimeInt::MIN { + if query.range.min <= TimeInt::MIN { let timeless = self .timeless_tables .get(&ent_path_hash) diff --git a/crates/re_data_store/src/store_sanity.rs b/crates/re_data_store/src/store_sanity.rs index 770bfe387050..028a2f215104 100644 --- a/crates/re_data_store/src/store_sanity.rs +++ b/crates/re_data_store/src/store_sanity.rs @@ -1,4 +1,6 @@ -use re_log_types::{DataCellColumn, NumInstances, RowId, TimeRange, VecDequeSortingExt as _}; +use re_log_types::{ + DataCellColumn, NumInstances, RowId, TimeInt, TimeRange, VecDequeSortingExt as _, +}; use re_types_core::{ComponentName, Loggable, SizeBytes as _}; use crate::{ @@ -179,8 +181,16 @@ impl IndexedBucket { let mut times = col_time.clone(); times.sort(); - let expected_min = times.front().copied().unwrap_or(i64::MAX).into(); - let expected_max = times.back().copied().unwrap_or(i64::MIN).into(); + let expected_min = times + .front() + .copied() + .unwrap_or(TimeInt::MAX.as_i64()) + .into(); + let expected_max = times + .back() + .copied() + .unwrap_or(TimeInt::MIN.as_i64()) + .into(); let expected_time_range = TimeRange::new(expected_min, expected_max); if expected_time_range != *time_range { diff --git a/crates/re_data_store/tests/correctness.rs b/crates/re_data_store/tests/correctness.rs index fcbed9f79bf5..70392f61eee2 100644 --- a/crates/re_data_store/tests/correctness.rs +++ b/crates/re_data_store/tests/correctness.rs @@ -451,7 +451,7 @@ fn range_join_across_single_row_impl(store: &mut DataStore) { let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let query = re_data_store::RangeQuery::new( timeline_frame_nr, - re_data_store::TimeRange::new(i64::MIN.into(), i64::MAX.into()), + re_data_store::TimeRange::new(TimeInt::MIN, TimeInt::MAX), ); let components = [InstanceKey::name(), Position2D::name(), Color::name()]; let dfs = re_data_store::polars_util::range_components( diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs index fffd27c7e21c..46501eea8039 100644 --- a/crates/re_log_types/src/data_row.rs +++ b/crates/re_log_types/src/data_row.rs @@ -155,6 +155,7 @@ impl std::fmt::Display for RowId { impl RowId { pub const ZERO: Self = Self(re_tuid::Tuid::ZERO); + pub const MAX: Self = Self(re_tuid::Tuid::MAX); /// Create a new unique [`RowId`] based on the current time. #[allow(clippy::new_without_default)] diff --git a/crates/re_log_types/src/time_point/time_int.rs b/crates/re_log_types/src/time_point/time_int.rs index a9e5c21129fc..fa633336323a 100644 --- a/crates/re_log_types/src/time_point/time_int.rs +++ b/crates/re_log_types/src/time_point/time_int.rs @@ -29,6 +29,7 @@ impl TimeInt { // a bit of leeway. pub const BEGINNING: Self = Self(i64::MIN / 2); + // TODO(#4832): `TimeInt::BEGINNING` vs. `TimeInt::MIN` vs. `Option`… pub const MIN: Self = Self(i64::MIN); pub const MAX: Self = Self(i64::MAX); diff --git a/crates/re_query_cache/src/cache.rs b/crates/re_query_cache/src/cache.rs index 14be0c39a40b..dda9c599afc9 100644 --- a/crates/re_query_cache/src/cache.rs +++ b/crates/re_query_cache/src/cache.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, VecDeque}, + ops::Range, sync::Arc, }; @@ -327,6 +328,8 @@ impl CachesPerArchetype { re_tracing::profile_function!(); + // TODO(cmc): range invalidation + for latest_at_cache in self.latest_at_per_archetype.read().values() { let mut latest_at_cache = latest_at_cache.write(); @@ -419,12 +422,6 @@ pub struct CacheBucket { } impl CacheBucket { - /// Iterate over the timestamps of the point-of-view components. - #[inline] - pub fn iter_data_times(&self) -> impl Iterator { - self.data_times.iter() - } - #[inline] pub fn time_range(&self) -> Option { let first_time = self.data_times.front().map(|(t, _)| *t)?; @@ -444,6 +441,25 @@ impl CacheBucket { self.data_times.binary_search(&(data_time, row_id)).is_ok() } + /// How many timestamps' worth of data is stored in this bucket? + #[inline] + pub fn num_entries(&self) -> usize { + self.data_times.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.num_entries() == 0 + } + + // --- + + /// Iterate over the timestamps of the point-of-view components. + #[inline] + pub fn iter_data_times(&self) -> impl Iterator { + self.data_times.iter() + } + /// Iterate over the [`InstanceKey`] batches of the point-of-view components. #[inline] pub fn iter_pov_instance_keys(&self) -> impl Iterator { @@ -474,15 +490,73 @@ impl CacheBucket { Some(data.iter()) } - /// How many timestamps' worth of data is stored in this bucket? + // --- + + /// Returns the index range that corresponds to the specified `time_range`. + /// + /// Use the returned range with one of the range iteration methods: + /// - [`Self::range_data_times`] + /// - [`Self::range_pov_instance_keys`] + /// - [`Self::range_component`] + /// - [`Self::range_component_opt`] + /// + /// Make sure that the bucket hasn't been modified in-between! + /// + /// This is `O(2*log(n))`, so make sure to clone the returned range rather than calling this + /// multiple times. #[inline] - pub fn num_entries(&self) -> usize { - self.data_times.len() + pub fn entry_range(&self, time_range: TimeRange) -> Range { + let start_index = self + .data_times + .partition_point(|(data_time, _)| data_time < &time_range.min); + let end_index = self + .data_times + .partition_point(|(data_time, _)| data_time <= &time_range.max); + start_index..end_index } + /// Range over the timestamps of the point-of-view components. #[inline] - pub fn is_empty(&self) -> bool { - self.num_entries() == 0 + pub fn range_data_times( + &self, + entry_range: Range, + ) -> impl Iterator { + self.data_times.range(entry_range) + } + + /// Range over the [`InstanceKey`] batches of the point-of-view components. + #[inline] + pub fn range_pov_instance_keys( + &self, + entry_range: Range, + ) -> impl Iterator { + self.pov_instance_keys.range(entry_range) + } + + /// Range over the batches of the specified non-optional component. + #[inline] + pub fn range_component( + &self, + entry_range: Range, + ) -> Option> { + let data = self + .components + .get(&C::name()) + .and_then(|data| data.as_any().downcast_ref::>())?; + Some(data.range(entry_range)) + } + + /// Range over the batches of the specified optional component. + #[inline] + pub fn range_component_opt( + &self, + entry_range: Range, + ) -> Option]>> { + let data = self + .components + .get(&C::name()) + .and_then(|data| data.as_any().downcast_ref::>>())?; + Some(data.range(entry_range)) } } diff --git a/crates/re_query_cache/src/cache_stats.rs b/crates/re_query_cache/src/cache_stats.rs index ecf774aa6fb9..76d2e6a2d5f7 100644 --- a/crates/re_query_cache/src/cache_stats.rs +++ b/crates/re_query_cache/src/cache_stats.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use re_log_types::{EntityPath, TimeRange, Timeline}; use re_types_core::ComponentName; -use crate::{Caches, LatestAtCache, RangeCache}; +use crate::{cache::CacheBucket, Caches, LatestAtCache, RangeCache}; // --- @@ -71,6 +71,18 @@ impl Caches { pub fn stats(detailed_stats: bool) -> CachesStats { re_tracing::profile_function!(); + fn upsert_bucket_stats( + per_component: &mut BTreeMap, + bucket: &CacheBucket, + ) { + for (component_name, data) in &bucket.components { + let stats: &mut CachedComponentStats = + per_component.entry(*component_name).or_default(); + stats.total_rows += data.dyn_num_entries() as u64; + stats.total_instances += data.dyn_num_values() as u64; + } + } + Self::with(|caches| { let latest_at = caches .0 @@ -98,22 +110,12 @@ impl Caches { if let Some(per_component) = per_component.as_mut() { re_tracing::profile_scope!("detailed"); - for bucket in per_data_time.values() { - for (component_name, data) in &bucket.read().components { - let stats: &mut CachedComponentStats = - per_component.entry(*component_name).or_default(); - stats.total_rows += data.dyn_num_entries() as u64; - stats.total_instances += data.dyn_num_values() as u64; - } + if let Some(bucket) = &timeless { + upsert_bucket_stats(per_component, bucket); } - if let Some(bucket) = &timeless { - for (component_name, data) in &bucket.components { - let stats: &mut CachedComponentStats = - per_component.entry(*component_name).or_default(); - stats.total_rows += data.dyn_num_entries() as u64; - stats.total_instances += data.dyn_num_values() as u64; - } + for bucket in per_data_time.values() { + upsert_bucket_stats(per_component, &bucket.read()); } } } @@ -140,27 +142,24 @@ impl Caches { .values() .map(|range_cache| { let RangeCache { - bucket, + per_data_time, + timeless, total_size_bytes, } = &*range_cache.read(); - let total_rows = bucket.data_times.len() as u64; + let total_rows = per_data_time.data_times.len() as u64; let mut per_component = detailed_stats.then(BTreeMap::default); if let Some(per_component) = per_component.as_mut() { re_tracing::profile_scope!("detailed"); - for (component_name, data) in &bucket.components { - let stats: &mut CachedComponentStats = - per_component.entry(*component_name).or_default(); - stats.total_rows += data.dyn_num_entries() as u64; - stats.total_instances += data.dyn_num_values() as u64; - } + upsert_bucket_stats(per_component, timeless); + upsert_bucket_stats(per_component, per_data_time); } ( key.timeline, - bucket.time_range().unwrap_or(TimeRange::EMPTY), + per_data_time.time_range().unwrap_or(TimeRange::EMPTY), CachedEntityStats { total_size_bytes: *total_size_bytes, total_rows, diff --git a/crates/re_query_cache/src/latest_at.rs b/crates/re_query_cache/src/latest_at.rs index 491653881106..493c8286c018 100644 --- a/crates/re_query_cache/src/latest_at.rs +++ b/crates/re_query_cache/src/latest_at.rs @@ -90,7 +90,6 @@ macro_rules! impl_query_archetype_latest_at { f(data); } - Ok(()) }; diff --git a/crates/re_query_cache/src/range.rs b/crates/re_query_cache/src/range.rs index fce4e09d144a..29a772c494c2 100644 --- a/crates/re_query_cache/src/range.rs +++ b/crates/re_query_cache/src/range.rs @@ -2,7 +2,7 @@ use paste::paste; use seq_macro::seq; use re_data_store::{DataStore, RangeQuery, TimeInt}; -use re_log_types::{EntityPath, RowId}; +use re_log_types::{EntityPath, RowId, TimeRange}; use re_types_core::{components::InstanceKey, Archetype, Component}; use crate::{CacheBucket, Caches, MaybeCachedComponentData}; @@ -12,8 +12,13 @@ use crate::{CacheBucket, Caches, MaybeCachedComponentData}; /// Caches the results of `Range` queries. #[derive(Default)] pub struct RangeCache { + /// All timeful data, organized by _data_ time. + // // TODO(cmc): bucketize - pub bucket: CacheBucket, + pub per_data_time: CacheBucket, + + /// All timeless data. + pub timeless: CacheBucket, /// Total size of the data stored in this cache in bytes. pub total_size_bytes: u64, @@ -35,11 +40,11 @@ impl RangeCache { pub fn compute_front_query(&self, query: &RangeQuery) -> Option { let mut reduced_query = query.clone(); - if self.bucket.is_empty() { + if self.per_data_time.is_empty() { return Some(reduced_query); } - if let Some(bucket_time_range) = self.bucket.time_range() { + if let Some(bucket_time_range) = self.per_data_time.time_range() { reduced_query.range.max = TimeInt::min( reduced_query.range.max, bucket_time_range.min.as_i64().saturating_sub(1).into(), @@ -61,7 +66,7 @@ impl RangeCache { pub fn compute_back_query(&self, query: &RangeQuery) -> Option { let mut reduced_query = query.clone(); - if let Some(bucket_time_range) = self.bucket.time_range() { + if let Some(bucket_time_range) = self.per_data_time.time_range() { reduced_query.range.min = TimeInt::max( reduced_query.range.min, bucket_time_range.max.as_i64().saturating_add(1).into(), @@ -104,19 +109,22 @@ macro_rules! impl_query_archetype_range { ), ), { - let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> { + let mut range_results = + |timeless: bool, bucket: &crate::CacheBucket, time_range: TimeRange| -> crate::Result<()> { re_tracing::profile_scope!("iter"); + let entry_range = bucket.entry_range(time_range); + let it = itertools::izip!( - bucket.iter_data_times(), - bucket.iter_pov_instance_keys(), - $(bucket.iter_component::<$pov>() + bucket.range_data_times(entry_range.clone()), + bucket.range_pov_instance_keys(entry_range.clone()), + $(bucket.range_component::<$pov>(entry_range.clone()) .ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+ - $(bucket.iter_component_opt::<$comp>() + $(bucket.range_component_opt::<$comp>(entry_range.clone()) .ok_or_else(|| re_query::ComponentNotFoundError(<$comp>::name()))?,)* ).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| { ( - (Some(*time), *row_id), // TODO(cmc): timeless + ((!timeless).then_some(*time), *row_id), MaybeCachedComponentData::Cached(instance_keys), $(MaybeCachedComponentData::Cached($pov),)+ $(MaybeCachedComponentData::Cached($comp),)* @@ -147,7 +155,7 @@ macro_rules! impl_query_archetype_range { let mut added_size_bytes = 0u64; for arch_view in arch_views { - let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN); // TODO(cmc): timeless + let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN); if bucket.contains_data_row(data_time, arch_view.primary_row_id()) { continue; @@ -171,16 +179,43 @@ macro_rules! impl_query_archetype_range { let mut range_callback = |query: &RangeQuery, range_cache: &mut crate::RangeCache| { re_tracing::profile_scope!("range", format!("{query:?}")); - for reduced_query in range_cache.compute_queries(query) { + // NOTE: Same logic as what the store does. + if query.range.min <= TimeInt::MIN { + let mut reduced_query = query.clone(); + // This is the reduced query corresponding to the timeless part of the data. + // It is inclusive and so it will yield `MIN..=MIN` = `[MIN]`. + reduced_query.range.max = TimeInt::MIN; // inclusive + // NOTE: `+ 2` because we always grab the indicator component as well as the // instance keys. let arch_views = ::re_query::range_archetype::(store, &reduced_query, entity_path); range_cache.total_size_bytes += - upsert_results::(arch_views, &mut range_cache.bucket)?; + upsert_results::(arch_views, &mut range_cache.timeless)?; + + if !range_cache.timeless.is_empty() { + range_results(true, &range_cache.timeless, reduced_query.range)?; + } } - iter_results(&range_cache.bucket) + + let mut query = query.clone(); + query.range.min = TimeInt::max((TimeInt::MIN.as_i64() + 1).into(), query.range.min); + + for reduced_query in range_cache.compute_queries(&query) { + // NOTE: `+ 2` because we always grab the indicator component as well as the + // instance keys. + let arch_views = + ::re_query::range_archetype::(store, &reduced_query, entity_path); + range_cache.total_size_bytes += + upsert_results::(arch_views, &mut range_cache.per_data_time)?; + } + + if !range_cache.per_data_time.is_empty() { + range_results(false, &range_cache.per_data_time, query.range)?; + } + + Ok(()) }; diff --git a/crates/re_query_cache/tests/range.rs b/crates/re_query_cache/tests/range.rs index a0312ba1127a..af51565d2943 100644 --- a/crates/re_query_cache/tests/range.rs +++ b/crates/re_query_cache/tests/range.rs @@ -97,8 +97,6 @@ fn simple_range() { } #[test] -// TODO(cmc): timeless support -#[should_panic(expected = "assertion failed: `(left == right)`")] fn timeless_range() { let mut store = DataStore::new( re_log_types::StoreId::random(re_log_types::StoreKind::Recording), diff --git a/crates/re_space_view_time_series/src/visualizer_system.rs b/crates/re_space_view_time_series/src/visualizer_system.rs index 6a7f4126d775..21e0616c55c3 100644 --- a/crates/re_space_view_time_series/src/visualizer_system.rs +++ b/crates/re_space_view_time_series/src/visualizer_system.rs @@ -1,4 +1,5 @@ use re_data_store::TimeRange; +use re_log_types::TimeInt; use re_query_cache::QueryError; use re_types::{ archetypes::TimeSeriesScalar, @@ -152,7 +153,7 @@ impl TimeSeriesSystem { visible_history.to(query.latest_at), ) } else { - (i64::MIN.into(), i64::MAX.into()) + (TimeInt::MIN, TimeInt::MAX) }; let query =