From 25cd4191b7df66e206dd3579c7f305a3e4469943 Mon Sep 17 00:00:00 2001 From: jiacai2050 <dev@liujiacai.net> Date: Wed, 3 Jan 2024 14:04:27 +0800 Subject: [PATCH 1/4] feat: block rules support query --- proxy/src/limiter.rs | 15 +++++++++++++++ query_frontend/src/plan.rs | 12 ++++++++++++ 2 files changed, 27 insertions(+) diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs index 9405dd9ec4..eface21f79 100644 --- a/proxy/src/limiter.rs +++ b/proxy/src/limiter.rs @@ -33,8 +33,11 @@ pub enum Error { define_result!(Error); #[derive(Clone, Copy, Deserialize, Debug, PartialEq, Eq, Hash, Serialize, PartialOrd, Ord)] +#[serde(tag = "type", content = "content")] pub enum BlockRule { QueryWithoutPredicate, + /// Max time range a query can scan. + QueryRange(i64), AnyQuery, AnyInsert, } @@ -52,6 +55,18 @@ impl BlockRule { match self { BlockRule::QueryWithoutPredicate => self.is_query_without_predicate(plan), BlockRule::AnyQuery => matches!(plan, Plan::Query(_)), + BlockRule::QueryRange(threshold) => { + if let Plan::Query(plan) = plan { + if let Some(range) = plan.query_range() { + if range > *threshold { + return true; + } + } + false + } else { + false + } + } BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)), } } diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs index 94c8bca6b3..85baf59b4c 100644 --- a/query_frontend/src/plan.rs +++ b/query_frontend/src/plan.rs @@ -201,6 +201,18 @@ impl QueryPlan { Some(priority) } + + /// When query contains invalid time range such as `[200, 100]`, it will + /// return None. + pub fn query_range(&self) -> Option<i64> { + self.extract_time_range().map(|time_range| { + time_range + .exclusive_end() + .as_i64() + .checked_sub(time_range.inclusive_start().as_i64()) + .unwrap_or(i64::MAX) + }) + } } impl Debug for QueryPlan { From c4d8447eccdf69ef59702ba593ee2d3e7ec39f4e Mon Sep 17 00:00:00 2001 From: jiacai2050 <dev@liujiacai.net> Date: Wed, 3 Jan 2024 14:16:30 +0800 Subject: [PATCH 2/4] refactor if blocks --- proxy/src/limiter.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs index eface21f79..d2422b72d0 100644 --- a/proxy/src/limiter.rs +++ b/proxy/src/limiter.rs @@ -62,10 +62,9 @@ impl BlockRule { return true; } } - false - } else { - false } + + false } BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)), } From 6879edea2d05e4f6f9eeab62d8c36018d239458c Mon Sep 17 00:00:00 2001 From: jiacai2050 <dev@liujiacai.net> Date: Wed, 3 Jan 2024 20:46:18 +0800 Subject: [PATCH 3/4] add readable duration deserialize --- proxy/src/limiter.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs index d2422b72d0..68d0ab08e9 100644 --- a/proxy/src/limiter.rs +++ b/proxy/src/limiter.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashSet, sync::RwLock}; +use std::{collections::HashSet, str::FromStr, sync::RwLock}; use datafusion::logical_expr::logical_plan::LogicalPlan; use macros::define_result; use query_frontend::plan::Plan; use serde::{Deserialize, Serialize}; use snafu::Snafu; +use time_ext::ReadableDuration; #[derive(Snafu, Debug)] #[snafu(visibility(pub))] @@ -37,11 +38,22 @@ define_result!(Error); pub enum BlockRule { QueryWithoutPredicate, /// Max time range a query can scan. + #[serde(deserialize_with = "deserialize_readable_duration")] QueryRange(i64), AnyQuery, AnyInsert, } +fn deserialize_readable_duration<'de, D>(deserializer: D) -> std::result::Result<i64, D::Error> +where + D: serde::Deserializer<'de>, +{ + let s: &str = Deserialize::deserialize(deserializer)?; + ReadableDuration::from_str(s) + .map(|d| d.0.as_millis() as i64) + .map_err(serde::de::Error::custom) +} + #[derive(Default, Clone, Deserialize, Debug, Serialize)] #[serde(default)] pub struct LimiterConfig { From 399e062c369314b3b9228b6f04848201c7af7160 Mon Sep 17 00:00:00 2001 From: jiacai2050 <dev@liujiacai.net> Date: Thu, 4 Jan 2024 09:58:40 +0800 Subject: [PATCH 4/4] add block metrics --- proxy/src/limiter.rs | 16 ++++++++++++++-- proxy/src/metrics.rs | 6 ++++++ proxy/src/read.rs | 2 +- query_frontend/src/plan.rs | 15 +++++++++++++++ 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs index 68d0ab08e9..ea6b697de9 100644 --- a/proxy/src/limiter.rs +++ b/proxy/src/limiter.rs @@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize}; use snafu::Snafu; use time_ext::ReadableDuration; +use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL; + #[derive(Snafu, Debug)] #[snafu(visibility(pub))] pub enum Error { @@ -185,8 +187,18 @@ impl Limiter { /// /// Error will throws if the plan is forbidden to execute. pub fn try_limit(&self, plan: &Plan) -> Result<()> { - self.try_limit_by_block_list(plan)?; - self.try_limit_by_rules(plan) + let result = { + self.try_limit_by_block_list(plan)?; + self.try_limit_by_rules(plan) + }; + + if result.is_err() { + BLOCKED_REQUEST_COUNTER_VEC_GLOBAL + .with_label_values(&[plan.plan_type()]) + .inc(); + } + + result } pub fn add_write_block_list(&self, block_list: Vec<String>) { diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index c55c47c27b..3478b6bdd7 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -61,6 +61,12 @@ lazy_static! { pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec = register_int_counter_vec!("http_handler_counter", "Http handler counter", &["type"]) .unwrap(); + pub static ref BLOCKED_REQUEST_COUNTER_VEC_GLOBAL: IntCounterVec = register_int_counter_vec!( + "blocked_request_counter", + "Blocked request counter", + &["type"] + ) + .unwrap(); } lazy_static! { diff --git a/proxy/src/read.rs b/proxy/src/read.rs index effb88c086..e821750cde 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -240,7 +240,7 @@ impl Proxy { .try_limit(&plan) .box_err() .context(Internal { - msg: "Request is blocked", + msg: format!("Request is blocked, table_name:{table_name:?}"), })?; } diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs index 85baf59b4c..597c6810be 100644 --- a/query_frontend/src/plan.rs +++ b/query_frontend/src/plan.rs @@ -77,6 +77,21 @@ pub enum Plan { Exists(ExistsTablePlan), } +impl Plan { + pub fn plan_type(&self) -> &str { + match self { + Self::Query(_) => "query", + Self::Insert(_) => "insert", + Self::Create(_) + | Self::Drop(_) + | Self::Describe(_) + | Self::AlterTable(_) + | Self::Show(_) + | Self::Exists(_) => "other", + } + } +} + pub struct PriorityContext { pub time_range_threshold: u64, }