Skip to content

Commit 046cdd3

Browse files
authored
feat: impl DoNothing wal (#1311)
## Rationale In some deployments when latest data is allowed to lost, wal is not a required module. Also, wal may cost too much resources if write throughput is high, by disable it, we can save those resources. ## Detailed Changes - Add a `DoNothing` wal implementation - Introduce `WalConfig`, and add a new `disable_data` flag in it. - This `Config` wrap old `StorageConfig`, and use `#[serde(flatten)]` to keep backwards compatible. ## Test Plan Manually
1 parent 7de0519 commit 046cdd3

File tree

10 files changed

+242
-117
lines changed

10 files changed

+242
-117
lines changed

analytic_engine/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use object_store::config::StorageOptions;
4141
use serde::{Deserialize, Serialize};
4242
use size_ext::ReadableSize;
4343
use time_ext::ReadableDuration;
44-
use wal::config::StorageConfig;
44+
use wal::config::Config as WalConfig;
4545

4646
pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions};
4747

@@ -112,7 +112,7 @@ pub struct Config {
112112
/// + RocksDB
113113
/// + OBKV
114114
/// + Kafka
115-
pub wal: StorageConfig,
115+
pub wal: WalConfig,
116116

117117
/// Recover mode
118118
///
@@ -188,7 +188,7 @@ impl Default for Config {
188188
max_bytes_per_write_batch: None,
189189
mem_usage_sampling_interval: ReadableDuration::secs(0),
190190
wal_encode: WalEncodeConfig::default(),
191-
wal: StorageConfig::RocksDB(Box::default()),
191+
wal: WalConfig::default(),
192192
remote_engine_client: remote_engine_client::config::Config::default(),
193193
recover_mode: RecoverMode::TableBased,
194194
metrics: MetricsOptions::default(),

analytic_engine/src/tests/util.rs

+26-16
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use table_engine::{
4040
use tempfile::TempDir;
4141
use time_ext::ReadableDuration;
4242
use wal::{
43-
config::StorageConfig,
43+
config::{Config as WalConfig, StorageConfig},
4444
manager::{OpenedWals, WalRuntimes, WalsOpener},
4545
rocksdb_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
4646
table_kv_impl::wal::MemWalsOpener,
@@ -506,10 +506,13 @@ impl Builder {
506506
data_dir: dir.path().to_str().unwrap().to_string(),
507507
}),
508508
},
509-
wal: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
510-
data_dir: dir.path().to_str().unwrap().to_string(),
511-
..Default::default()
512-
})),
509+
wal: WalConfig {
510+
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
511+
data_dir: dir.path().to_str().unwrap().to_string(),
512+
..Default::default()
513+
})),
514+
disable_data: false,
515+
},
513516
..Default::default()
514517
};
515518

@@ -581,11 +584,13 @@ impl Default for RocksDBEngineBuildContext {
581584
data_dir: dir.path().to_str().unwrap().to_string(),
582585
}),
583586
},
584-
585-
wal: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
586-
data_dir: dir.path().to_str().unwrap().to_string(),
587-
..Default::default()
588-
})),
587+
wal: WalConfig {
588+
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
589+
data_dir: dir.path().to_str().unwrap().to_string(),
590+
..Default::default()
591+
})),
592+
disable_data: false,
593+
},
589594
..Default::default()
590595
};
591596

@@ -614,11 +619,13 @@ impl Clone for RocksDBEngineBuildContext {
614619
};
615620

616621
config.storage = storage;
617-
config.wal = StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
618-
data_dir: dir.path().to_str().unwrap().to_string(),
619-
..Default::default()
620-
}));
621-
622+
config.wal = WalConfig {
623+
storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
624+
data_dir: dir.path().to_str().unwrap().to_string(),
625+
..Default::default()
626+
})),
627+
disable_data: false,
628+
};
622629
Self {
623630
config,
624631
open_method: self.open_method,
@@ -674,7 +681,10 @@ impl Default for MemoryEngineBuildContext {
674681
data_dir: dir.path().to_str().unwrap().to_string(),
675682
}),
676683
},
677-
wal: StorageConfig::Obkv(Box::default()),
684+
wal: WalConfig {
685+
storage: StorageConfig::Obkv(Box::default()),
686+
disable_data: false,
687+
},
678688
..Default::default()
679689
};
680690

src/ceresdb/src/setup.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
100100
}
101101
}
102102

103+
fn validate_config(config: &Config) {
104+
let is_data_wal_disabled = config.analytic.wal.disable_data;
105+
if is_data_wal_disabled {
106+
let is_cluster = config.cluster_deployment.is_some();
107+
if !is_cluster {
108+
panic!("Invalid config, we can only disable data wal in cluster deployments")
109+
}
110+
}
111+
}
112+
103113
/// Run a server, returns when the server is shutdown by user
104114
pub fn run_server(config: Config, log_runtime: RuntimeLevel) {
105115
let runtimes = Arc::new(build_engine_runtimes(&config.runtime));
@@ -108,8 +118,10 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) {
108118

109119
info!("Server starts up, config:{:#?}", config);
110120

121+
validate_config(&config);
122+
111123
runtimes.default_runtime.block_on(async {
112-
match config.analytic.wal {
124+
match config.analytic.wal.storage {
113125
StorageConfig::RocksDB(_) => {
114126
#[cfg(feature = "wal-rocksdb")]
115127
{

src/wal/src/config.rs

+21
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,27 @@ pub type KafkaStorageConfig = crate::message_queue_impl::config::KafkaStorageCon
3232
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
3333
pub struct KafkaStorageConfig;
3434

35+
#[derive(Debug, Clone, Deserialize, Serialize)]
36+
pub struct Config {
37+
// The flatten attribute inlines keys from a field into the parent struct.
38+
// That's to say `storage` has no real usage, it's just a placeholder.
39+
#[serde(flatten)]
40+
pub storage: StorageConfig,
41+
/// If true, data wal will return Ok directly, without any IO operations.
42+
// Note: this is only used for test, we shouldn't enable this in production.
43+
#[serde(default)]
44+
pub disable_data: bool,
45+
}
46+
47+
impl Default for Config {
48+
fn default() -> Self {
49+
Self {
50+
storage: StorageConfig::RocksDB(Box::default()),
51+
disable_data: false,
52+
}
53+
}
54+
}
55+
3556
/// Options for wal storage backend
3657
#[derive(Debug, Clone, Deserialize, Serialize)]
3758
#[serde(tag = "type")]

src/wal/src/dummy.rs

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2023 The CeresDB Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use async_trait::async_trait;
16+
use common_types::SequenceNumber;
17+
18+
use crate::{
19+
log_batch::LogWriteBatch,
20+
manager::{
21+
BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, Result, ScanContext,
22+
ScanRequest, WalLocation, WalManager, WriteContext,
23+
},
24+
};
25+
26+
#[derive(Debug)]
27+
pub struct DoNothing;
28+
29+
#[async_trait]
30+
impl WalManager for DoNothing {
31+
async fn sequence_num(&self, _location: WalLocation) -> Result<SequenceNumber> {
32+
Ok(0)
33+
}
34+
35+
async fn mark_delete_entries_up_to(
36+
&self,
37+
_location: WalLocation,
38+
_sequence_num: SequenceNumber,
39+
) -> Result<()> {
40+
Ok(())
41+
}
42+
43+
async fn close_region(&self, _region: RegionId) -> Result<()> {
44+
Ok(())
45+
}
46+
47+
async fn close_gracefully(&self) -> Result<()> {
48+
Ok(())
49+
}
50+
51+
async fn read_batch(
52+
&self,
53+
_ctx: &ReadContext,
54+
_req: &ReadRequest,
55+
) -> Result<BatchLogIteratorAdapter> {
56+
Ok(BatchLogIteratorAdapter::empty())
57+
}
58+
59+
async fn write(&self, _ctx: &WriteContext, _batch: &LogWriteBatch) -> Result<SequenceNumber> {
60+
Ok(0)
61+
}
62+
63+
async fn scan(
64+
&self,
65+
_ctx: &ScanContext,
66+
_req: &ScanRequest,
67+
) -> Result<BatchLogIteratorAdapter> {
68+
Ok(BatchLogIteratorAdapter::empty())
69+
}
70+
71+
async fn get_statistics(&self) -> Option<String> {
72+
None
73+
}
74+
}

src/wal/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#![feature(trait_alias)]
1818

1919
pub mod config;
20+
mod dummy;
2021
pub mod kv_encoder;
2122
pub mod log_batch;
2223
pub mod manager;

src/wal/src/manager.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use runtime::Runtime;
2828
use snafu::ResultExt;
2929

3030
use crate::{
31-
config::StorageConfig,
31+
config::Config,
3232
log_batch::{LogEntry, LogWriteBatch, PayloadDecodeContext, PayloadDecoder},
3333
metrics::WAL_WRITE_BYTES_HISTOGRAM,
3434
};
@@ -400,6 +400,13 @@ impl BatchLogIteratorAdapter {
400400
}
401401
}
402402

403+
pub fn empty() -> Self {
404+
Self {
405+
iter: None,
406+
batch_size: 1,
407+
}
408+
}
409+
403410
async fn simulated_async_next<D, F>(
404411
&mut self,
405412
decoder: D,
@@ -549,7 +556,7 @@ pub(crate) const MANIFEST_DIR_NAME: &str = "manifest";
549556

550557
#[async_trait]
551558
pub trait WalsOpener: Send + Sync + Default {
552-
async fn open_wals(&self, config: &StorageConfig, runtimes: WalRuntimes) -> Result<OpenedWals>;
559+
async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) -> Result<OpenedWals>;
553560
}
554561

555562
#[cfg(test)]

src/wal/src/message_queue_impl/wal.rs

+15-10
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use runtime::Runtime;
2424
use snafu::ResultExt;
2525

2626
use crate::{
27-
config::StorageConfig,
27+
config::{Config, StorageConfig},
2828
log_batch::{LogEntry, LogWriteBatch},
2929
manager::{
3030
self, error::*, AsyncLogIterator, BatchLogIteratorAdapter, OpenedWals, ReadContext,
@@ -138,8 +138,8 @@ pub struct KafkaWalsOpener;
138138

139139
#[async_trait]
140140
impl WalsOpener for KafkaWalsOpener {
141-
async fn open_wals(&self, config: &StorageConfig, runtimes: WalRuntimes) -> Result<OpenedWals> {
142-
let kafka_wal_config = match config {
141+
async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) -> Result<OpenedWals> {
142+
let kafka_wal_config = match &config.storage {
143143
StorageConfig::Kafka(config) => config.clone(),
144144
_ => {
145145
return InvalidWalConfig {
@@ -156,12 +156,17 @@ impl WalsOpener for KafkaWalsOpener {
156156
let kafka = KafkaImpl::new(kafka_wal_config.kafka.clone())
157157
.await
158158
.context(OpenKafka)?;
159-
let data_wal = MessageQueueImpl::new(
160-
WAL_DIR_NAME.to_string(),
161-
kafka.clone(),
162-
default_runtime.clone(),
163-
kafka_wal_config.data_namespace,
164-
);
159+
let data_wal = if config.disable_data {
160+
Arc::new(crate::dummy::DoNothing) as Arc<_>
161+
} else {
162+
let data_wal = MessageQueueImpl::new(
163+
WAL_DIR_NAME.to_string(),
164+
kafka.clone(),
165+
default_runtime.clone(),
166+
kafka_wal_config.data_namespace,
167+
);
168+
Arc::new(data_wal) as Arc<_>
169+
};
165170

166171
let manifest_wal = MessageQueueImpl::new(
167172
MANIFEST_DIR_NAME.to_string(),
@@ -171,7 +176,7 @@ impl WalsOpener for KafkaWalsOpener {
171176
);
172177

173178
Ok(OpenedWals {
174-
data_wal: Arc::new(data_wal),
179+
data_wal,
175180
manifest_wal: Arc::new(manifest_wal),
176181
})
177182
}

0 commit comments

Comments
 (0)