Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 11: cache stats and integration with memory panel #4773

Merged
merged 10 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/re_log_types/src/time_point/time_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ pub struct TimeInt(pub(crate) i64);

impl nohash_hasher::IsEnabled for TimeInt {}

impl re_types_core::SizeBytes for TimeInt {
    /// `TimeInt` is a newtype wrapper around a single `i64` (see the struct
    /// declaration above), stored entirely inline — it never owns heap memory,
    /// hence a constant zero heap size.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        0
    }
}

impl TimeInt {
/// The beginning of time.
///
Expand Down
45 changes: 32 additions & 13 deletions crates/re_memory/src/memory_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct MemoryHistory {
/// Bytes used by the datastore according to its own accounting.
pub counted_store: History<i64>,

/// Bytes used by the primary caches according to their own accounting.
pub counted_primary_caches: History<i64>,

/// Bytes used by the blueprint store according to its own accounting.
pub counted_blueprint: History<i64>,
}
Expand All @@ -38,6 +41,7 @@ impl Default for MemoryHistory {
counted: History::new(0..max_elems, max_seconds),
counted_gpu: History::new(0..max_elems, max_seconds),
counted_store: History::new(0..max_elems, max_seconds),
counted_primary_caches: History::new(0..max_elems, max_seconds),
counted_blueprint: History::new(0..max_elems, max_seconds),
}
}
Expand All @@ -50,39 +54,54 @@ impl MemoryHistory {
counted,
counted_gpu,
counted_store,
counted_primary_caches,
counted_blueprint,
} = self;
resident.is_empty()
&& counted.is_empty()
&& counted_gpu.is_empty()
&& counted_store.is_empty()
&& counted_primary_caches.is_empty()
&& counted_blueprint.is_empty()
}

/// Add data to history
pub fn capture(
&mut self,
counted_gpu: Option<i64>,
counted_store: Option<i64>,
counted_blueprint: Option<i64>,
updated_counted_gpu: Option<i64>,
updated_counted_store: Option<i64>,
updated_counted_primary_caches: Option<i64>,
updated_counted_blueprint: Option<i64>,
) {
let mem_use = crate::MemoryUse::capture();
let now = crate::util::sec_since_start();

if let Some(resident) = mem_use.resident {
self.resident.add(now, resident);
let Self {
resident,
counted,
counted_gpu,
counted_store,
counted_primary_caches,
counted_blueprint,
} = self;

if let Some(updated_resident) = mem_use.resident {
resident.add(now, updated_resident);
}
if let Some(updated_counted) = mem_use.counted {
counted.add(now, updated_counted);
}
if let Some(counted) = mem_use.counted {
self.counted.add(now, counted);
if let Some(updated_counted_gpu) = updated_counted_gpu {
counted_gpu.add(now, updated_counted_gpu);
}
if let Some(counted_gpu) = counted_gpu {
self.counted_gpu.add(now, counted_gpu);
if let Some(updated_counted_store) = updated_counted_store {
counted_store.add(now, updated_counted_store);
}
if let Some(counted_store) = counted_store {
self.counted_store.add(now, counted_store);
if let Some(updated_counted_primary_caches) = updated_counted_primary_caches {
counted_primary_caches.add(now, updated_counted_primary_caches);
}
if let Some(counted_blueprint) = counted_blueprint {
self.counted_blueprint.add(now, counted_blueprint);
if let Some(updated_counted_blueprint) = updated_counted_blueprint {
counted_blueprint.add(now, updated_counted_blueprint);
}
}
}
130 changes: 98 additions & 32 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use re_data_store::{
};
use re_log_types::{EntityPath, RowId, StoreId, TimeInt, Timeline};
use re_query::ArchetypeView;
use re_types_core::{components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName};
use re_types_core::{
components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName, SizeBytes as _,
};

use crate::{ErasedFlatVecDeque, FlatVecDeque};

Expand Down Expand Up @@ -49,10 +51,8 @@ static CACHES: Lazy<StoreSubscriberHandle> =
Lazy::new(|| re_data_store::DataStore::register_subscriber(Box::<Caches>::default()));

/// Maintains the top-level cache mappings.
//
// TODO(#4730): SizeBytes support + size stats + mem panel
#[derive(Default)]
pub struct Caches(RwLock<HashMap<CacheKey, CachesPerArchetype>>);
pub struct Caches(pub(crate) RwLock<HashMap<CacheKey, CachesPerArchetype>>);

#[derive(Default)]
pub struct CachesPerArchetype {
Expand All @@ -65,7 +65,7 @@ pub struct CachesPerArchetype {
//
// TODO(cmc): At some point we should probably just store the PoV and optional components rather
// than an `ArchetypeName`: the query system doesn't care about archetypes.
latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,
pub(crate) latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,

/// Everything greater than or equal to this timestamp has been asynchronously invalidated.
///
Expand Down Expand Up @@ -116,8 +116,8 @@ impl Caches {
re_data_store::DataStore::with_subscriber_once(*CACHES, move |caches: &Caches| {
let mut caches = caches.0.write();

let caches_per_archetype = caches.entry(key).or_default();
caches_per_archetype.handle_pending_invalidation();
let caches_per_archetype = caches.entry(key.clone()).or_default();
caches_per_archetype.handle_pending_invalidation(&key);

let mut latest_at_per_archetype =
caches_per_archetype.latest_at_per_archetype.write();
Expand All @@ -133,6 +133,12 @@ impl Caches {
let mut cache = cache.write();
f(&mut cache)
}

#[inline]
pub(crate) fn with<F: FnMut(&Caches) -> R, R>(f: F) -> R {
// NOTE: downcasting cannot fail, this is our own private handle.
re_data_store::DataStore::with_subscriber(*CACHES, f).unwrap()
}
}

/// Uniquely identifies cached query results in the [`Caches`].
Expand Down Expand Up @@ -264,7 +270,7 @@ impl CachesPerArchetype {
///
/// Invalidation is deferred to query time because it is far more efficient that way: the frame
/// time effectively behaves as a natural micro-batching mechanism.
fn handle_pending_invalidation(&mut self) {
fn handle_pending_invalidation(&mut self, key: &CacheKey) {
let pending_timeless_invalidation = self.pending_timeless_invalidation;
let pending_timeful_invalidation = self.pending_timeful_invalidation.is_some();

Expand All @@ -281,15 +287,39 @@ impl CachesPerArchetype {
latest_at_cache.timeless = None;
}

let mut removed_bytes = 0u64;
if let Some(min_time) = self.pending_timeful_invalidation {
latest_at_cache
.per_query_time
.retain(|&query_time, _| query_time < min_time);

latest_at_cache
.per_data_time
.retain(|&data_time, _| data_time < min_time);
latest_at_cache.per_data_time.retain(|&data_time, bucket| {
if data_time < min_time {
return true;
}

// Only if that bucket is about to be dropped.
if Arc::strong_count(bucket) == 1 {
removed_bytes += bucket.read().total_size_bytes;
}

false
});
}

latest_at_cache.total_size_bytes = latest_at_cache
.total_size_bytes
.checked_sub(removed_bytes)
.unwrap_or_else(|| {
re_log::debug!(
store_id = %key.store_id,
entity_path = %key.entity_path,
current = latest_at_cache.total_size_bytes,
removed = removed_bytes,
"book keeping underflowed"
);
u64::MIN
});
}

self.pending_timeful_invalidation = None;
Expand Down Expand Up @@ -328,9 +358,14 @@ pub struct CacheBucket {
// TODO(#4733): Don't denormalize auto-generated instance keys.
// TODO(#4734): Don't denormalize splatted values.
pub(crate) components: BTreeMap<ComponentName, Box<dyn ErasedFlatVecDeque + Send + Sync>>,

/// The total size in bytes stored in this bucket.
///
/// Only used so we can decrement the global cache size when the last reference to a bucket
/// gets dropped.
pub(crate) total_size_bytes: u64,
//
// TODO(cmc): secondary cache
// TODO(#4730): size stats: this requires codegen'ing SizeBytes for all components!
}

impl CacheBucket {
Expand Down Expand Up @@ -387,12 +422,14 @@ macro_rules! impl_insert {
#[doc = "Inserts the contents of the given [`ArchetypeView`], which are made of the specified"]
#[doc = "`" $N "` point-of-view components and `" $M "` optional components, to the cache."]
#[doc = ""]
#[doc = "Returns the size in bytes of the data that was cached."]
#[doc = ""]
#[doc = "`query_time` must be the time of query, _not_ of the resulting data."]
pub fn [<insert_pov$N _comp$M>]<A, $($pov,)+ $($comp),*>(
&mut self,
query_time: TimeInt,
arch_view: &ArchetypeView<A>,
) -> ::re_query::Result<()>
) -> ::re_query::Result<u64>
where
A: Archetype,
$($pov: Component + Send + Sync + 'static,)+
Expand All @@ -401,21 +438,31 @@ macro_rules! impl_insert {
// NOTE: not `profile_function!` because we want them merged together.
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

let Self {
data_times,
pov_instance_keys,
components: _,
} = self;

let pov_row_id = arch_view.primary_row_id();
let index = data_times.partition_point(|t| t < &(query_time, pov_row_id));
let index = self.data_times.partition_point(|t| t < &(query_time, pov_row_id));

let mut added_size_bytes = 0u64;

self.data_times.insert(index, (query_time, pov_row_id));
added_size_bytes += (query_time, pov_row_id).total_size_bytes();

{
// The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
// instead, that way we can efficiently computes its size while we're at it.
let added: FlatVecDeque<InstanceKey> = arch_view
.iter_instance_keys()
.collect::<VecDeque<InstanceKey>>()
.into();
added_size_bytes += added.total_size_bytes();
self.pov_instance_keys.insert_deque(index, added);
}

$(added_size_bytes += self.insert_component::<A, $pov>(index, arch_view)?;)+
$(added_size_bytes += self.insert_component_opt::<A, $comp>(index, arch_view)?;)*

data_times.insert(index, (query_time, pov_row_id));
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
self.total_size_bytes += added_size_bytes;

Ok(())
Ok(added_size_bytes)
} }
};

Expand All @@ -436,7 +483,7 @@ impl CacheBucket {
&mut self,
query_time: TimeInt,
arch_view: &ArchetypeView<A>,
) -> ::re_query::Result<()>
) -> ::re_query::Result<u64>
where
A: Archetype,
R1: Component + Send + Sync + 'static,
Expand All @@ -453,42 +500,58 @@ impl CacheBucket {
&mut self,
at: usize,
arch_view: &ArchetypeView<A>,
) -> re_query::Result<()> {
) -> re_query::Result<u64> {
re_tracing::profile_function!(C::name());

let data = self
.components
.entry(C::name())
.or_insert_with(|| Box::new(FlatVecDeque::<C>::new()));

// The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
// instead, that way we can efficiently computes its size while we're at it.
let added: FlatVecDeque<C> = arch_view
.iter_required_component::<C>()?
.collect::<VecDeque<C>>()
.into();
let added_size_bytes = added.total_size_bytes();

// NOTE: downcast cannot fail, we create it just above.
let data = data.as_any_mut().downcast_mut::<FlatVecDeque<C>>().unwrap();
data.insert(at, arch_view.iter_required_component::<C>()?);
data.insert_deque(at, added);

Ok(())
Ok(added_size_bytes)
}

#[inline]
fn insert_component_opt<A: Archetype, C: Component + Send + Sync + 'static>(
&mut self,
at: usize,
arch_view: &ArchetypeView<A>,
) -> re_query::Result<()> {
) -> re_query::Result<u64> {
re_tracing::profile_function!(C::name());

let data = self
.components
.entry(C::name())
.or_insert_with(|| Box::new(FlatVecDeque::<Option<C>>::new()));

// The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
// instead, that way we can efficiently computes its size while we're at it.
let added: FlatVecDeque<Option<C>> = arch_view
.iter_optional_component::<C>()?
.collect::<VecDeque<Option<C>>>()
.into();
let added_size_bytes = added.total_size_bytes();

// NOTE: downcast cannot fail, we create it just above.
let data = data
.as_any_mut()
.downcast_mut::<FlatVecDeque<Option<C>>>()
.unwrap();
data.insert(at, arch_view.iter_optional_component::<C>()?);
data.insert_deque(at, added);

Ok(())
Ok(added_size_bytes)
}
}

Expand Down Expand Up @@ -526,4 +589,7 @@ pub struct LatestAtCache {
// NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
// timeful case.
pub timeless: Option<CacheBucket>,

/// Total size of the data stored in this cache in bytes.
pub total_size_bytes: u64,
}
Loading