Skip to content

Commit 049533a

Browse files
authored
Merge branch 'main' into reuse-influxql-logical-planner
2 parents eb189a0 + 56bb7b0 commit 049533a

File tree

28 files changed

+251
-141
lines changed

28 files changed

+251
-141
lines changed

Cargo.lock

+63-10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ system_catalog = { path = "system_catalog" }
114114
table_engine = { path = "table_engine" }
115115
table_kv = { path = "components/table_kv" }
116116
tempfile = "3.1.0"
117+
toml = "0.7"
117118
tracing_util = { path = "components/tracing_util" }
118119
trace_metric = { path = "components/trace_metric" }
119120
trace_metric_derive = { path = "components/trace_metric_derive" }
@@ -143,6 +144,7 @@ server = { workspace = true }
143144
signal-hook = "0.3"
144145
sort = "0.8.5"
145146
table_engine = { workspace = true }
147+
toml = { workspace = true }
146148
tracing_util = { workspace = true }
147149

148150
[build-dependencies]

analytic_engine/src/compaction/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use std::{collections::HashMap, str::FromStr, sync::Arc};
66

77
use common_util::config::{ReadableSize, TimeUnit};
8-
use serde::Deserialize;
8+
use serde::{Deserialize, Serialize};
99
use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu};
1010
use tokio::sync::oneshot;
1111

@@ -57,15 +57,15 @@ pub enum Error {
5757
InvalidOption { error: String, backtrace: Backtrace },
5858
}
5959

60-
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Default)]
60+
#[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)]
6161
pub enum CompactionStrategy {
6262
#[default]
6363
Default,
6464
TimeWindow(TimeWindowCompactionOptions),
6565
SizeTiered(SizeTieredCompactionOptions),
6666
}
6767

68-
#[derive(Debug, Clone, Copy, Deserialize, PartialEq)]
68+
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
6969
pub struct SizeTieredCompactionOptions {
7070
pub bucket_low: f32,
7171
pub bucket_high: f32,
@@ -75,7 +75,7 @@ pub struct SizeTieredCompactionOptions {
7575
pub max_input_sstable_size: ReadableSize,
7676
}
7777

78-
#[derive(Debug, Clone, Copy, Deserialize, PartialEq)]
78+
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
7979
pub struct TimeWindowCompactionOptions {
8080
pub size_tiered: SizeTieredCompactionOptions,
8181
// TODO(boyan) In fact right now we only supports TimeUnit::Milliseconds resolution.

analytic_engine/src/compaction/picker.rs

+13-4
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,12 @@ impl Bucket {
185185

186186
fn with_files(files: Vec<FileHandle>) -> Self {
187187
let total: usize = files.iter().map(|f| f.size() as usize).sum();
188-
Self {
189-
avg_size: total / files.len(),
190-
files,
191-
}
188+
let avg_size = if files.is_empty() {
189+
0
190+
} else {
191+
total / files.len()
192+
};
193+
Self { avg_size, files }
192194
}
193195

194196
fn insert_file(&mut self, file: &FileHandle) {
@@ -823,4 +825,11 @@ mod tests {
823825
.collect::<Vec<_>>()
824826
);
825827
}
828+
829+
#[test]
830+
fn empty_bucket() {
831+
let bucket = Bucket::with_files(vec![]);
832+
assert_eq!(bucket.avg_size, 0);
833+
assert!(bucket.files.is_empty());
834+
}
826835
}

analytic_engine/src/compaction/scheduler.rs

+7-33
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use common_util::{
2121
time::DurationExt,
2222
};
2323
use log::{debug, error, info, warn};
24-
use serde::Deserialize;
24+
use serde::{Deserialize, Serialize};
2525
use snafu::{ResultExt, Snafu};
2626
use table_engine::table::TableId;
2727
use tokio::{
2828
sync::{
29-
mpsc::{self, error::SendError, Receiver, Sender},
29+
mpsc::{self, Receiver, Sender},
3030
Mutex,
3131
},
3232
time,
@@ -38,9 +38,7 @@ use crate::{
3838
PickerManager, TableCompactionRequest, WaitError, WaiterNotifier,
3939
},
4040
instance::{
41-
flush_compaction::{self, TableFlushOptions},
42-
write_worker::CompactionNotifier,
43-
Instance, SpaceStore,
41+
flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
4442
},
4543
table::data::TableDataRef,
4644
TableOptions,
@@ -54,7 +52,7 @@ pub enum Error {
5452

5553
define_result!(Error);
5654

57-
#[derive(Debug, Clone, Deserialize)]
55+
#[derive(Debug, Clone, Deserialize, Serialize)]
5856
#[serde(default)]
5957
pub struct SchedulerConfig {
6058
pub schedule_channel_len: usize,
@@ -574,12 +572,11 @@ impl ScheduleWorker {
574572
None => {
575573
// Memory usage exceeds the threshold, let's put pack the
576574
// request.
577-
debug!(
578-
"Compaction task is ignored, because of high memory usage:{}, task:{:?}",
575+
warn!(
576+
"Compaction task is ignored, because of high memory usage:{}, task:{:?}, table:{}",
579577
self.memory_limit.usage.load(Ordering::Relaxed),
580-
compaction_task,
578+
compaction_task, table_data.name
581579
);
582-
self.put_back_compaction_request(compact_req).await;
583580
return;
584581
}
585582
};
@@ -596,29 +593,6 @@ impl ScheduleWorker {
596593
);
597594
}
598595

599-
async fn put_back_compaction_request(&self, req: TableCompactionRequest) {
600-
if let Err(SendError(ScheduleTask::Request(TableCompactionRequest {
601-
compaction_notifier,
602-
waiter,
603-
..
604-
}))) = self.sender.send(ScheduleTask::Request(req)).await
605-
{
606-
let e = Arc::new(
607-
flush_compaction::Other {
608-
msg: "Failed to put back the compaction request for memory usage exceeds",
609-
}
610-
.build(),
611-
);
612-
if let Some(notifier) = compaction_notifier {
613-
notifier.notify_err(e.clone());
614-
}
615-
616-
let waiter_notifier = WaiterNotifier::new(waiter);
617-
let wait_err = WaitError::Compaction { source: e };
618-
waiter_notifier.notify_wait_result(Err(wait_err));
619-
}
620-
}
621-
622596
async fn schedule(&mut self) {
623597
self.compact_tables().await;
624598
self.flush_tables().await;

analytic_engine/src/lib.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use wal::{
3636
pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions};
3737

3838
/// Config of analytic engine
39-
#[derive(Debug, Clone, Deserialize)]
39+
#[derive(Debug, Clone, Deserialize, Serialize)]
4040
#[serde(default)]
4141
pub struct Config {
4242
/// Storage options of the engine
@@ -115,7 +115,7 @@ impl Default for Config {
115115
}
116116

117117
/// Config of wal based on obkv
118-
#[derive(Debug, Default, Clone, Deserialize)]
118+
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
119119
#[serde(default)]
120120
pub struct ObkvWalConfig {
121121
/// Obkv client config
@@ -222,7 +222,7 @@ impl From<WalNamespaceConfig> for NamespaceConfig {
222222
}
223223

224224
/// Config of wal based on obkv
225-
#[derive(Debug, Default, Clone, Deserialize)]
225+
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
226226
#[serde(default)]
227227
pub struct KafkaWalConfig {
228228
/// Kafka client config
@@ -235,7 +235,7 @@ pub struct KafkaWalConfig {
235235
}
236236

237237
/// Config for wal based on RocksDB
238-
#[derive(Debug, Clone, Deserialize)]
238+
#[derive(Debug, Clone, Deserialize, Serialize)]
239239
#[serde(default)]
240240
pub struct RocksDBConfig {
241241
/// Data directory used by RocksDB.
@@ -250,7 +250,7 @@ impl Default for RocksDBConfig {
250250
}
251251
}
252252
/// Options for wal storage backend
253-
#[derive(Debug, Clone, Deserialize)]
253+
#[derive(Debug, Clone, Deserialize, Serialize)]
254254
#[serde(tag = "type")]
255255
pub enum WalStorageConfig {
256256
RocksDB(Box<RocksDBConfig>),

analytic_engine/src/manifest/details.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use log::{debug, info, warn};
2222
use object_store::{ObjectStoreRef, Path};
2323
use parquet::data_type::AsBytes;
2424
use prost::Message;
25-
use serde::Deserialize;
25+
use serde::{Deserialize, Serialize};
2626
use snafu::{Backtrace, ResultExt, Snafu};
2727
use table_engine::table::TableId;
2828
use tokio::sync::Mutex;
@@ -157,7 +157,7 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl {
157157
}
158158

159159
/// Options for manifest
160-
#[derive(Debug, Clone, Deserialize)]
160+
#[derive(Debug, Clone, Deserialize, Serialize)]
161161
pub struct Options {
162162
/// Steps to do snapshot
163163
pub snapshot_every_n_updates: usize,

0 commit comments

Comments
 (0)