persist: introduce a very small in-mem blob cache
A one-time (skunkworks) experiment showed that an environment running our
demo "auction" source + mv got 90%+ cache hits with a 1 MiB cache. This
doesn't scale up to prod data sizes and doesn't help with multi-process
replicas, but the memory usage seems unobjectionable enough to keep it for
the cases it does help.

Pubsub likely accounts for a decent chunk of why this is true. With the low
pubsub latencies, we might write some blob to s3, then within milliseconds
notify everyone in-process interested in that blob, waking them up to fetch
it. This means even a very small cache is useful, because entries stay in it
just long enough to be fetched by everyone that immediately needs them.
1 MiB is enough to fit things like state rollups, remap shard writes, and
likely many MVs (probably less so for sources, but at the moment those still
happen in another cluster).
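
As a concrete illustration of that write-then-notify window, here's a minimal
sketch (the key and value are made up; `blob` is the cache-wrapped Blob handle
this commit introduces, and `Atomicity::RequireAtomic` is assumed to be the
usual persist write mode):

    // Writer persists a blob; the cache wrapper's set() also inserts it in-mem.
    blob.set("example-rollup-key", bytes.clone(), Atomicity::RequireAtomic)
        .await?;
    // Milliseconds later pubsub wakes in-process readers; their gets are
    // served from the tiny cache instead of another round-trip to s3.
    let fetched = blob.get("example-rollup-key").await?; // cache hit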

Touches #19225
danhhz committed May 31, 2023
1 parent 9ff51cb commit b55ac18
Showing 7 changed files with 180 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions src/persist-client/Cargo.toml
@@ -38,6 +38,7 @@ differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-
futures = "0.3.25"
futures-util = "0.3"
h2 = "0.3.13"
moka = { version = "0.9.6", default-features = false, features = ["sync"] }
mz-build-info = { path = "../build-info" }
mz-ore = { path = "../ore", features = ["bytes_", "test", "tracing_"] }
mz-persist = { path = "../persist" }
4 changes: 4 additions & 0 deletions src/persist-client/src/cache.rs
@@ -33,6 +33,7 @@ use tracing::{debug, instrument};

use crate::async_runtime::CpuHeavyRuntime;
use crate::error::{CodecConcreteType, CodecMismatch};
use crate::internal::cache::BlobMemCache;
use crate::internal::machine::retry_external;
use crate::internal::metrics::{LockMetrics, Metrics, MetricsBlob, MetricsConsensus, ShardMetrics};
use crate::internal::state::TypedState;
@@ -199,6 +200,9 @@ impl PersistClientCache
                    Self::PROMETHEUS_SCRAPE_INTERVAL,
                )
                .await;
                // This is intentionally "outside" (wrapping) MetricsBlob so
                // that we don't include cached responses in blob metrics.
                let blob = BlobMemCache::new(&self.cfg, Arc::clone(&self.metrics), blob);
                Arc::clone(&x.insert((RttLatencyTask(task), blob)).1)
            }
        };
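For context, a hedged sketch of the layering this hunk sets up (the inner
constructor names and argument order are assumptions, not from the diff; only
the last line is verbatim):

    // Innermost: the real blob store, e.g. s3. `open_blob` is illustrative.
    let blob: Arc<dyn Blob + Send + Sync> = open_blob(&location).await?;
    // Middle: MetricsBlob, which counts operations that actually reach the store.
    let blob = Arc::new(MetricsBlob::new(blob, Arc::clone(&metrics)));
    // Outermost: BlobMemCache, so cache hits never inflate the blob metrics.
    let blob = BlobMemCache::new(&self.cfg, Arc::clone(&self.metrics), blob);
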
9 changes: 9 additions & 0 deletions src/persist-client/src/cfg.rs
@@ -119,6 +119,8 @@ pub struct PersistConfig {
    pub pubsub_server_connection_channel_size: usize,
    /// Size of channel used by the state cache to broadcast shard state references.
    pub pubsub_state_cache_shard_ref_channel_size: usize,
    /// Capacity of in-mem blob cache in bytes.
    pub blob_cache_mem_limit_bytes: usize,
}

impl PersistConfig {
@@ -172,6 +174,13 @@
            pubsub_client_receiver_channel_size: 25,
            pubsub_server_connection_channel_size: 25,
            pubsub_state_cache_shard_ref_channel_size: 25,
            // This initial value was tuned via a one-time experiment that
            // showed an environment running our demo "auction" source + mv got
            // 90%+ cache hits with a 1 MiB cache. This doesn't scale up to prod
            // data sizes and doesn't help with multi-process replicas, but the
            // memory usage seems unobjectionable enough to have it for the
            // cases that it does help.
            blob_cache_mem_limit_bytes: 1024 * 1024,
            // TODO: This doesn't work with the process orchestrator. Instead,
            // separate --log-prefix into --service-name and --enable-log-prefix
            // options, where the first is always provided and the second is
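Because `blob_cache_mem_limit_bytes` is a plain pub field, resizing (or
effectively disabling) the cache is just an assignment before the config is
used; a sketch, assuming the usual `PersistConfig::new` constructor:

    let mut cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone());
    cfg.blob_cache_mem_limit_bytes = 64 * 1024 * 1024; // 64 MiB vs the 1 MiB default
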
126 changes: 126 additions & 0 deletions src/persist-client/src/internal/cache.rs
@@ -0,0 +1,126 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-process caches of [Blob].

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use moka::sync::Cache;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_persist::location::{Atomicity, Blob, BlobMetadata, ExternalError};
use tracing::error;

use crate::cfg::PersistConfig;
use crate::internal::metrics::Metrics;

// In-memory cache for [Blob].
#[derive(Debug)]
pub struct BlobMemCache {
    metrics: Arc<Metrics>,
    cache: Cache<String, SegmentedBytes>,
    blob: Arc<dyn Blob + Send + Sync>,
}

impl BlobMemCache {
    pub fn new(
        cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        blob: Arc<dyn Blob + Send + Sync>,
    ) -> Arc<dyn Blob + Send + Sync> {
        let cache = Cache::<String, SegmentedBytes>::builder()
            .max_capacity(u64::cast_from(cfg.blob_cache_mem_limit_bytes))
            .weigher(|k, v| {
                u32::try_from(v.len()).unwrap_or_else(|_| {
                    // We chunk off blobs at 128MiB, so the length should easily
                    // fit in a u32.
                    error!(
                        "unexpectedly large blob in persist cache {} bytes: {}",
                        v.len(),
                        k
                    );
                    u32::MAX
                })
            })
            .build();
        let blob = BlobMemCache {
            metrics,
            cache,
            blob,
        };
        Arc::new(blob)
    }

    fn update_size_metrics(&self) {
        self.metrics
            .blob_cache_mem
            .size_blobs
            .set(self.cache.entry_count());
        self.metrics
            .blob_cache_mem
            .size_bytes
            .set(self.cache.weighted_size());
    }
}

#[async_trait]
impl Blob for BlobMemCache {
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
        // First check if the blob is in the cache. If it is, return it. If not,
        // fetch it and put it in the cache.
        //
        // Blobs are write-once modify-never, so we don't have to worry about
        // any races or cache invalidations here. If the value is in the cache,
        // it's also what's in s3 (if not, then there's a horrible bug somewhere
        // else).
        if let Some(cached_value) = self.cache.get(key) {
            self.metrics.blob_cache_mem.hits_blobs.inc();
            self.metrics
                .blob_cache_mem
                .hits_bytes
                .inc_by(u64::cast_from(cached_value.len()));
            return Ok(Some(cached_value));
        }

        // This could maybe use moka's async cache to unify any concurrent
        // fetches for the same key? That's not particularly expected in
        // persist's workload, so punt for now.
        let res = self.blob.get(key).await?;
        if let Some(blob) = res.as_ref() {
            self.cache.insert(key.to_owned(), blob.clone());
            self.update_size_metrics();
        }
        Ok(res)
    }

    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError> {
        self.blob.list_keys_and_metadata(key_prefix, f).await
    }

    async fn set(&self, key: &str, value: Bytes, atomic: Atomicity) -> Result<(), ExternalError> {
        let () = self.blob.set(key, value.clone(), atomic).await?;
        self.cache
            .insert(key.to_owned(), SegmentedBytes::from(value));
        self.update_size_metrics();
        Ok(())
    }

    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
        let res = self.blob.delete(key).await;
        self.cache.invalidate(key);
        self.update_size_metrics();
        res
    }
}
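
The byte-based accounting above leans entirely on moka's weigher support;
here's a self-contained sketch of the same mechanism against moka 0.9 with the
`sync` feature, outside of persist:

    use moka::sync::Cache;

    fn main() {
        // max_capacity is interpreted in the weigher's units (bytes here),
        // matching how BlobMemCache uses blob_cache_mem_limit_bytes.
        let cache: Cache<String, Vec<u8>> = Cache::builder()
            .max_capacity(1024 * 1024) // 1 MiB
            .weigher(|_key, value: &Vec<u8>| u32::try_from(value.len()).unwrap_or(u32::MAX))
            .build();

        cache.insert("rollup".to_string(), vec![0u8; 512 * 1024]);
        cache.insert("batch-part".to_string(), vec![0u8; 768 * 1024]);
        // The two inserts exceed 1 MiB, so moka evicts (asynchronously) to get
        // back under capacity; weighted_size() reports the tracked byte total.
        println!("entries={} bytes={}", cache.entry_count(), cache.weighted_size());
    }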
38 changes: 38 additions & 0 deletions src/persist-client/src/internal/metrics.rs
@@ -79,6 +79,8 @@ pub struct Metrics {
    pub pubsub_client: PubSubClientMetrics,
    /// Metrics for mfp/filter pushdown.
    pub pushdown: PushdownMetrics,
    /// Metrics for blob caching.
    pub blob_cache_mem: BlobMemCache,

    /// Metrics for the persist sink.
    pub sink: SinkMetrics,
@@ -123,6 +125,7 @@
            watch: WatchMetrics::new(registry),
            pubsub_client: PubSubClientMetrics::new(registry),
            pushdown: PushdownMetrics::new(registry),
            blob_cache_mem: BlobMemCache::new(registry),
            sink: SinkMetrics::new(registry),
            s3_blob: S3BlobMetrics::new(registry),
            postgres_consensus: PostgresConsensusMetrics::new(registry),
@@ -1893,6 +1896,41 @@
    }
}

#[derive(Debug)]
pub struct BlobMemCache {
    pub(crate) size_blobs: UIntGauge,
    pub(crate) size_bytes: UIntGauge,
    pub(crate) hits_blobs: IntCounter,
    pub(crate) hits_bytes: IntCounter,
}

impl BlobMemCache {
    fn new(registry: &MetricsRegistry) -> Self {
        BlobMemCache {
            size_blobs: registry.register(metric!(
                name: "mz_persist_blob_cache_size_blobs",
                help: "count of blobs in the cache",
                const_labels: {"cache" => "mem"},
            )),
            size_bytes: registry.register(metric!(
                name: "mz_persist_blob_cache_size_bytes",
                help: "total size of blobs in the cache",
                const_labels: {"cache" => "mem"},
            )),
            hits_blobs: registry.register(metric!(
                name: "mz_persist_blob_cache_hits_blobs",
                help: "number of blobs served via cache instead of s3",
                const_labels: {"cache" => "mem"},
            )),
            hits_bytes: registry.register(metric!(
                name: "mz_persist_blob_cache_hits_bytes",
                help: "total size of blobs served via cache instead of s3",
                const_labels: {"cache" => "mem"},
            )),
        }
    }
}

#[derive(Debug)]
pub struct ExternalOpMetrics {
    started: IntCounter,
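One consequence of wrapping MetricsBlob (see the cache.rs hunk above): the
pre-existing external blob-get counters only see misses, so these new counters
yield a hit rate directly. A hypothetical helper, just to spell out the
arithmetic:

    /// Cache hits come from mz_persist_blob_cache_hits_blobs; misses are
    /// whatever reached the wrapped MetricsBlob underneath.
    fn cache_hit_rate(cache_hits: u64, external_gets: u64) -> f64 {
        let total = cache_hits + external_gets;
        if total == 0 { 0.0 } else { cache_hits as f64 / total as f64 }
    }
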
1 change: 1 addition & 0 deletions src/persist-client/src/lib.rs
@@ -143,6 +143,7 @@ pub mod write;
/// An implementation of the public crate interface.
mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
