From 25cd4191b7df66e206dd3579c7f305a3e4469943 Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Wed, 3 Jan 2024 14:04:27 +0800
Subject: [PATCH 1/4] feat: block rules support query

---
 proxy/src/limiter.rs       | 15 +++++++++++++++
 query_frontend/src/plan.rs | 12 ++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs
index 9405dd9ec4..eface21f79 100644
--- a/proxy/src/limiter.rs
+++ b/proxy/src/limiter.rs
@@ -33,8 +33,11 @@ pub enum Error {
 define_result!(Error);
 
 #[derive(Clone, Copy, Deserialize, Debug, PartialEq, Eq, Hash, Serialize, PartialOrd, Ord)]
+#[serde(tag = "type", content = "content")]
 pub enum BlockRule {
     QueryWithoutPredicate,
+    /// Max time range a query can scan.
+    QueryRange(i64),
     AnyQuery,
     AnyInsert,
 }
@@ -52,6 +55,18 @@ impl BlockRule {
         match self {
             BlockRule::QueryWithoutPredicate => self.is_query_without_predicate(plan),
             BlockRule::AnyQuery => matches!(plan, Plan::Query(_)),
+            BlockRule::QueryRange(threshold) => {
+                if let Plan::Query(plan) = plan {
+                    if let Some(range) = plan.query_range() {
+                        if range > *threshold {
+                            return true;
+                        }
+                    }
+                    false
+                } else {
+                    false
+                }
+            }
             BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)),
         }
     }
diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs
index 94c8bca6b3..85baf59b4c 100644
--- a/query_frontend/src/plan.rs
+++ b/query_frontend/src/plan.rs
@@ -201,6 +201,18 @@ impl QueryPlan {
 
         Some(priority)
     }
+
+    /// When query contains invalid time range such as `[200, 100]`, it will
+    /// return None.
+    pub fn query_range(&self) -> Option<i64> {
+        self.extract_time_range().map(|time_range| {
+            time_range
+                .exclusive_end()
+                .as_i64()
+                .checked_sub(time_range.inclusive_start().as_i64())
+                .unwrap_or(i64::MAX)
+        })
+    }
 }
 
 impl Debug for QueryPlan {

From c4d8447eccdf69ef59702ba593ee2d3e7ec39f4e Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Wed, 3 Jan 2024 14:16:30 +0800
Subject: [PATCH 2/4] refactor if blocks

---
 proxy/src/limiter.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs
index eface21f79..d2422b72d0 100644
--- a/proxy/src/limiter.rs
+++ b/proxy/src/limiter.rs
@@ -62,10 +62,9 @@ impl BlockRule {
                             return true;
                         }
                     }
-                    false
-                } else {
-                    false
                 }
+
+                false
             }
             BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)),
         }

From 6879edea2d05e4f6f9eeab62d8c36018d239458c Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Wed, 3 Jan 2024 20:46:18 +0800
Subject: [PATCH 3/4] add readable duration deserialize

---
 proxy/src/limiter.rs | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs
index d2422b72d0..68d0ab08e9 100644
--- a/proxy/src/limiter.rs
+++ b/proxy/src/limiter.rs
@@ -12,13 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{collections::HashSet, sync::RwLock};
+use std::{collections::HashSet, str::FromStr, sync::RwLock};
 
 use datafusion::logical_expr::logical_plan::LogicalPlan;
 use macros::define_result;
 use query_frontend::plan::Plan;
 use serde::{Deserialize, Serialize};
 use snafu::Snafu;
+use time_ext::ReadableDuration;
 
 #[derive(Snafu, Debug)]
 #[snafu(visibility(pub))]
@@ -37,11 +38,22 @@ define_result!(Error);
 pub enum BlockRule {
     QueryWithoutPredicate,
     /// Max time range a query can scan.
+    #[serde(deserialize_with = "deserialize_readable_duration")]
     QueryRange(i64),
     AnyQuery,
     AnyInsert,
 }
 
+fn deserialize_readable_duration<'de, D>(deserializer: D) -> std::result::Result<i64, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let s: &str = Deserialize::deserialize(deserializer)?;
+    ReadableDuration::from_str(s)
+        .map(|d| d.0.as_millis() as i64)
+        .map_err(serde::de::Error::custom)
+}
+
 #[derive(Default, Clone, Deserialize, Debug, Serialize)]
 #[serde(default)]
 pub struct LimiterConfig {

From 399e062c369314b3b9228b6f04848201c7af7160 Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Thu, 4 Jan 2024 09:58:40 +0800
Subject: [PATCH 4/4] add block metrics

---
 proxy/src/limiter.rs       | 16 ++++++++++++++--
 proxy/src/metrics.rs       |  6 ++++++
 proxy/src/read.rs          |  2 +-
 query_frontend/src/plan.rs | 15 +++++++++++++++
 4 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs
index 68d0ab08e9..ea6b697de9 100644
--- a/proxy/src/limiter.rs
+++ b/proxy/src/limiter.rs
@@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
 use snafu::Snafu;
 use time_ext::ReadableDuration;
 
+use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;
+
 #[derive(Snafu, Debug)]
 #[snafu(visibility(pub))]
 pub enum Error {
@@ -185,8 +187,18 @@ impl Limiter {
     ///
     /// Error will throws if the plan is forbidden to execute.
     pub fn try_limit(&self, plan: &Plan) -> Result<()> {
-        self.try_limit_by_block_list(plan)?;
-        self.try_limit_by_rules(plan)
+        let result = {
+            self.try_limit_by_block_list(plan)?;
+            self.try_limit_by_rules(plan)
+        };
+
+        if result.is_err() {
+            BLOCKED_REQUEST_COUNTER_VEC_GLOBAL
+                .with_label_values(&[plan.plan_type()])
+                .inc();
+        }
+
+        result
     }
 
     pub fn add_write_block_list(&self, block_list: Vec<String>) {
diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs
index c55c47c27b..3478b6bdd7 100644
--- a/proxy/src/metrics.rs
+++ b/proxy/src/metrics.rs
@@ -61,6 +61,12 @@ lazy_static! {
     pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec =
         register_int_counter_vec!("http_handler_counter", "Http handler counter", &["type"])
             .unwrap();
+    pub static ref BLOCKED_REQUEST_COUNTER_VEC_GLOBAL: IntCounterVec = register_int_counter_vec!(
+        "blocked_request_counter",
+        "Blocked request counter",
+        &["type"]
+    )
+    .unwrap();
 }
 
 lazy_static! {
diff --git a/proxy/src/read.rs b/proxy/src/read.rs
index effb88c086..e821750cde 100644
--- a/proxy/src/read.rs
+++ b/proxy/src/read.rs
@@ -240,7 +240,7 @@ impl Proxy {
                 .try_limit(&plan)
                 .box_err()
                 .context(Internal {
-                    msg: "Request is blocked",
+                    msg: format!("Request is blocked, table_name:{table_name:?}"),
                 })?;
         }
 
diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs
index 85baf59b4c..597c6810be 100644
--- a/query_frontend/src/plan.rs
+++ b/query_frontend/src/plan.rs
@@ -77,6 +77,21 @@ pub enum Plan {
     Exists(ExistsTablePlan),
 }
 
+impl Plan {
+    pub fn plan_type(&self) -> &str {
+        match self {
+            Self::Query(_) => "query",
+            Self::Insert(_) => "insert",
+            Self::Create(_)
+            | Self::Drop(_)
+            | Self::Describe(_)
+            | Self::AlterTable(_)
+            | Self::Show(_)
+            | Self::Exists(_) => "other",
+        }
+    }
+}
+
 pub struct PriorityContext {
     pub time_range_threshold: u64,
 }