Skip to content

Commit e7287eb

Browse files
authored
refactor: move wal structs and traits to wal crate (#1263)
## Rationale Ref - #1231 (comment). ## Detailed Changes See the commit messages. ## Test Plan Code refactor. All existing tests should pass. --------- Signed-off-by: tison <wander4096@gmail.com>
1 parent 58ed851 commit e7287eb

File tree

27 files changed

+538
-529
lines changed

27 files changed

+538
-529
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

analytic_engine/src/lib.rs

+3-164
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,10 @@ pub mod table_meta_set_impl;
3737
pub mod tests;
3838

3939
use manifest::details::Options as ManifestOptions;
40-
use message_queue::kafka::config::Config as KafkaConfig;
4140
use object_store::config::StorageOptions;
4241
use serde::{Deserialize, Serialize};
4342
use size_ext::ReadableSize;
44-
use table_kv::config::ObkvConfig;
45-
use time_ext::ReadableDuration;
46-
use wal::{
47-
message_queue_impl::config::Config as MessageQueueWalConfig,
48-
rocks_impl::config::Config as RocksDBWalConfig, table_kv_impl::model::NamespaceConfig,
49-
};
43+
use wal::config::StorageConfig;
5044

5145
pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions};
5246

@@ -112,7 +106,7 @@ pub struct Config {
112106
/// + RocksDB
113107
/// + OBKV
114108
/// + Kafka
115-
pub wal: WalStorageConfig,
109+
pub wal: StorageConfig,
116110

117111
/// Recover mode
118112
///
@@ -163,165 +157,10 @@ impl Default for Config {
163157
write_sst_max_buffer_size: ReadableSize::mb(10),
164158
max_retry_flush_limit: 0,
165159
max_bytes_per_write_batch: None,
166-
wal: WalStorageConfig::RocksDB(Box::default()),
160+
wal: StorageConfig::RocksDB(Box::default()),
167161
remote_engine_client: remote_engine_client::config::Config::default(),
168162
recover_mode: RecoverMode::TableBased,
169163
metrics: MetricsOptions::default(),
170164
}
171165
}
172166
}
173-
174-
/// Config of wal based on obkv
175-
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
176-
#[serde(default)]
177-
pub struct ObkvWalConfig {
178-
/// Obkv client config
179-
pub obkv: ObkvConfig,
180-
/// Namespace config for data.
181-
pub data_namespace: WalNamespaceConfig,
182-
/// Namespace config for meta data
183-
pub meta_namespace: ManifestNamespaceConfig,
184-
}
185-
186-
/// Config of obkv wal based manifest
187-
#[derive(Debug, Clone, Serialize, Deserialize)]
188-
#[serde(default)]
189-
pub struct ManifestNamespaceConfig {
190-
/// Decide how many wal data shards will be created
191-
///
192-
/// NOTICE: it can just be set once, the later setting makes no effect.
193-
pub shard_num: usize,
194-
195-
/// Decide how many wal meta shards will be created
196-
///
197-
/// NOTICE: it can just be set once, the later setting makes no effect.
198-
pub meta_shard_num: usize,
199-
200-
pub init_scan_timeout: ReadableDuration,
201-
pub init_scan_batch_size: i32,
202-
pub clean_scan_timeout: ReadableDuration,
203-
pub clean_scan_batch_size: usize,
204-
pub bucket_create_parallelism: usize,
205-
}
206-
207-
impl Default for ManifestNamespaceConfig {
208-
fn default() -> Self {
209-
let namespace_config = NamespaceConfig::default();
210-
211-
Self {
212-
shard_num: namespace_config.wal_shard_num,
213-
meta_shard_num: namespace_config.table_unit_meta_shard_num,
214-
init_scan_timeout: namespace_config.init_scan_timeout,
215-
init_scan_batch_size: namespace_config.init_scan_batch_size,
216-
clean_scan_timeout: namespace_config.clean_scan_timeout,
217-
clean_scan_batch_size: namespace_config.clean_scan_batch_size,
218-
bucket_create_parallelism: namespace_config.bucket_create_parallelism,
219-
}
220-
}
221-
}
222-
223-
impl From<ManifestNamespaceConfig> for NamespaceConfig {
224-
fn from(manifest_config: ManifestNamespaceConfig) -> Self {
225-
NamespaceConfig {
226-
wal_shard_num: manifest_config.shard_num,
227-
table_unit_meta_shard_num: manifest_config.meta_shard_num,
228-
ttl: None,
229-
init_scan_timeout: manifest_config.init_scan_timeout,
230-
init_scan_batch_size: manifest_config.init_scan_batch_size,
231-
clean_scan_timeout: manifest_config.clean_scan_timeout,
232-
clean_scan_batch_size: manifest_config.clean_scan_batch_size,
233-
bucket_create_parallelism: manifest_config.bucket_create_parallelism,
234-
}
235-
}
236-
}
237-
238-
/// Config of obkv wal based wal module
239-
#[derive(Debug, Clone, Serialize, Deserialize)]
240-
#[serde(default)]
241-
pub struct WalNamespaceConfig {
242-
/// Decide how many wal data shards will be created
243-
///
244-
/// NOTICE: it can just be set once, the later setting makes no effect.
245-
pub shard_num: usize,
246-
247-
/// Decide how many wal meta shards will be created
248-
///
249-
/// NOTICE: it can just be set once, the later setting makes no effect.
250-
pub meta_shard_num: usize,
251-
252-
pub ttl: ReadableDuration,
253-
pub init_scan_timeout: ReadableDuration,
254-
pub init_scan_batch_size: i32,
255-
pub bucket_create_parallelism: usize,
256-
}
257-
258-
impl Default for WalNamespaceConfig {
259-
fn default() -> Self {
260-
let namespace_config = NamespaceConfig::default();
261-
262-
Self {
263-
shard_num: namespace_config.wal_shard_num,
264-
meta_shard_num: namespace_config.table_unit_meta_shard_num,
265-
ttl: namespace_config.ttl.unwrap(),
266-
init_scan_timeout: namespace_config.init_scan_timeout,
267-
init_scan_batch_size: namespace_config.init_scan_batch_size,
268-
bucket_create_parallelism: namespace_config.bucket_create_parallelism,
269-
}
270-
}
271-
}
272-
273-
impl From<WalNamespaceConfig> for NamespaceConfig {
274-
fn from(wal_config: WalNamespaceConfig) -> Self {
275-
Self {
276-
wal_shard_num: wal_config.shard_num,
277-
table_unit_meta_shard_num: wal_config.meta_shard_num,
278-
ttl: Some(wal_config.ttl),
279-
init_scan_timeout: wal_config.init_scan_timeout,
280-
init_scan_batch_size: wal_config.init_scan_batch_size,
281-
bucket_create_parallelism: wal_config.bucket_create_parallelism,
282-
..Default::default()
283-
}
284-
}
285-
}
286-
287-
/// Config of wal based on obkv
288-
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
289-
#[serde(default)]
290-
pub struct KafkaWalConfig {
291-
/// Kafka client config
292-
pub kafka: KafkaConfig,
293-
294-
/// Namespace config for data.
295-
pub data_namespace: MessageQueueWalConfig,
296-
/// Namespace config for meta data
297-
pub meta_namespace: MessageQueueWalConfig,
298-
}
299-
300-
/// Config for wal based on RocksDB
301-
#[derive(Debug, Clone, Deserialize, Serialize)]
302-
#[serde(default)]
303-
pub struct RocksDBConfig {
304-
/// Data directory used by RocksDB.
305-
pub data_dir: String,
306-
307-
pub data_namespace: RocksDBWalConfig,
308-
pub meta_namespace: RocksDBWalConfig,
309-
}
310-
311-
impl Default for RocksDBConfig {
312-
fn default() -> Self {
313-
Self {
314-
data_dir: "/tmp/ceresdb".to_string(),
315-
data_namespace: Default::default(),
316-
meta_namespace: Default::default(),
317-
}
318-
}
319-
}
320-
/// Options for wal storage backend
321-
#[derive(Debug, Clone, Deserialize, Serialize)]
322-
#[serde(tag = "type")]
323-
pub enum WalStorageConfig {
324-
RocksDB(Box<RocksDBConfig>),
325-
Obkv(Box<ObkvWalConfig>),
326-
Kafka(Box<KafkaWalConfig>),
327-
}

0 commit comments

Comments
 (0)