Skip to content

Commit

Permalink
feat: ensure reserved memory for computing tasks on compute node star…
Browse files Browse the repository at this point in the history
…ting (risingwavelabs#7670)

The total memory of a CN consists of:

1. computing memory (both stream & batch)
2. storage memory (block cache, meta cache, etc.)
3. memory for system usage

That is to say, we have **_CN total memory_ = _computing memory_ + _storage memory_ + _system memory_**, and both _CN total memory_ and _storage memory_ are configured by the user currently. This PR is to ensure that _computing memory_ and  _system memory_ are correctly reserved,, i.e. **_computing memory_ + _system memory_ = _CN total memory_ - _storage memory_ > a given amount of memory**. We set this "given amount of memory" as 1G for now (512M for computing and 512M for system). The check is performed on CN starting.

Approved-By: fuyufjh
Approved-By: hzxa21
  • Loading branch information
xx01cyx authored Feb 3, 2023
1 parent 20bdb72 commit dffc2f1
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 13 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ template:
# Whether to enable in-memory pure KV state backend
enable-in-memory-kv-state-backend: false

# Total available memory to LRU Manager in bytes
# Total available memory for the compute node in bytes
total-memory-bytes: 8589934592

# Parallelism of tasks per compute node
Expand Down
30 changes: 28 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ pub struct StorageConfig {
/// Maximum shared buffer size, writes attempting to exceed the capacity will stall until there
/// is enough space.
#[serde(default = "default::storage::shared_buffer_capacity_mb")]
pub shared_buffer_capacity_mb: u32,
pub shared_buffer_capacity_mb: usize,

/// State store url.
#[serde(default = "default::storage::state_store")]
Expand Down Expand Up @@ -348,6 +348,32 @@ impl Default for StorageConfig {
}
}

impl StorageConfig {
/// Checks whether an embedded compactor starts with a compute node.
#[inline(always)]
pub fn embedded_compactor_enabled(&self) -> bool {
// We treat `hummock+memory-shared` as a shared storage, so we won't start the compactor
// along with the compute node.
self.state_store == "hummock+memory"
|| self.state_store.starts_with("hummock+disk")
|| self.disable_remote_compactor
}

/// The maximal memory that storage components may use based on the configurations. Note that
/// this is the total storage memory for one compute node instead of the whole cluster.
pub fn total_storage_memory_limit_mb(&self) -> usize {
let total_memory = self.block_cache_capacity_mb
+ self.meta_cache_capacity_mb
+ self.shared_buffer_capacity_mb
+ self.file_cache.total_buffer_capacity_mb;
if self.embedded_compactor_enabled() {
total_memory + self.compactor_memory_limit_mb
} else {
total_memory
}
}
}

/// The subsection `[storage.file_cache]` in `risingwave.toml`.
///
/// It's put at [`StorageConfig::file_cache`].
Expand Down Expand Up @@ -533,7 +559,7 @@ mod default {
4
}

pub fn shared_buffer_capacity_mb() -> u32 {
pub fn shared_buffer_capacity_mb() -> usize {
1024
}

Expand Down
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ num-traits = "0.2"
parking_lot = "0.12"
paste = "1"
pprof = { version = "0.11", features = ["flamegraph"] }
pretty-bytes = "0.2.2"
prometheus = { version = "0.13" }
prost = "0.11"
risingwave_batch = { path = "../batch" }
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
pub config_path: String,

/// Total available memory in bytes, used by LRU Manager
/// Total available memory for the compute node in bytes. Used by both computing and storage.
#[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
pub total_memory_bytes: usize,

Expand Down
6 changes: 6 additions & 0 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::task::LocalStreamManager;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation overhead,
/// network buffer, etc.) in megabytes.
pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512;

/// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit
/// the memory usage.
#[cfg_attr(not(target_os = "linux"), expect(dead_code))]
Expand Down
43 changes: 35 additions & 8 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use std::sync::Arc;
use std::time::Duration;

use async_stack_trace::StackTraceManager;
use pretty_bytes::converter::convert;
use risingwave_batch::executor::BatchTaskMetrics;
use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::task::{BatchEnvironment, BatchManager};
use risingwave_common::config::{
load_config, AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE,
load_config, AsyncStackTraceOption, StorageConfig, MAX_CONNECTION_WINDOW_SIZE,
STREAM_WINDOW_SIZE,
};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::util::addr::HostAddr;
Expand Down Expand Up @@ -52,7 +54,9 @@ use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::memory_management::memory_manager::GlobalMemoryManager;
use crate::memory_management::memory_manager::{
GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB,
};
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics;
use crate::rpc::service::exchange_service::ExchangeServiceImpl;
Expand All @@ -71,11 +75,13 @@ pub async fn compute_node_serve(
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, Some(opts.override_config));
validate_compute_node_memory_config(opts.total_memory_bytes, &config.storage);
info!(
"Starting compute node with config {:?} with debug assertions {}",
config,
if cfg!(debug_assertions) { "on" } else { "off" }
);

// Initialize all the configs
let storage_config = Arc::new(config.storage.clone());
let stream_config = Arc::new(config.streaming.clone());
Expand Down Expand Up @@ -144,12 +150,8 @@ pub async fn compute_node_serve(
let mut extra_info_sources: Vec<ExtraInfoSourceRef> = vec![];
if let Some(storage) = state_store.as_hummock_trait() {
extra_info_sources.push(storage.sstable_id_manager().clone());
// Note: we treat `hummock+memory-shared` as a shared storage, so we won't start the
// compactor along with compute node.
if config.storage.state_store == "hummock+memory"
|| config.storage.state_store.starts_with("hummock+disk")
|| storage_config.disable_remote_compactor
{

if storage_config.embedded_compactor_enabled() {
tracing::info!("start embedded compactor");
let read_memory_limiter = Arc::new(MemoryLimiter::new(
storage_config.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
Expand Down Expand Up @@ -325,3 +327,28 @@ pub async fn compute_node_serve(

(join_handle_vec, shutdown_send)
}

/// Check whether the compute node has enough memory to perform computing tasks. Apart from storage,
/// it must reserve at least `MIN_COMPUTE_MEMORY_MB` for computing and `SYSTEM_RESERVED_MEMORY_MB`
/// for other system usage. Otherwise, it is not allowed to start.
fn validate_compute_node_memory_config(
cn_total_memory_bytes: usize,
storage_config: &StorageConfig,
) {
let storage_memory_mb = storage_config.total_storage_memory_limit_mb();
if storage_memory_mb << 20 > cn_total_memory_bytes {
panic!(
"The storage memory exceeds the total compute node memory:\nTotal compute node memory: {}\nStorage memory: {}\nAt least 1 GB memory should be reserved apart from the storage memory. Please increase the total compute node memory or decrease the storage memory in configurations and restart the compute node.",
convert(cn_total_memory_bytes as _),
convert((storage_memory_mb << 20) as _)
);
} else if (storage_memory_mb + MIN_COMPUTE_MEMORY_MB + SYSTEM_RESERVED_MEMORY_MB) << 20
>= cn_total_memory_bytes
{
panic!(
"No enough memory for computing and other system usage:\nTotal compute node memory: {}\nStorage memory: {}\nAt least 1 GB memory should be reserved apart from the storage memory. Please increase the total compute node memory or decrease the storage memory in configurations and restart the compute node.",
convert(cn_total_memory_bytes as _),
convert((storage_memory_mb << 20) as _)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct BufferTracker {

impl BufferTracker {
pub fn from_storage_config(config: &StorageConfig) -> Self {
let capacity = config.shared_buffer_capacity_mb as usize * (1 << 20);
let capacity = config.shared_buffer_capacity_mb * (1 << 20);
let flush_threshold = capacity * 4 / 5;
Self::new(capacity, flush_threshold)
}
Expand Down

0 comments on commit dffc2f1

Please sign in to comment.