diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs index 9405dd9ec4..ea6b697de9 100644 --- a/proxy/src/limiter.rs +++ b/proxy/src/limiter.rs @@ -12,13 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashSet, sync::RwLock}; +use std::{collections::HashSet, str::FromStr, sync::RwLock}; use datafusion::logical_expr::logical_plan::LogicalPlan; use macros::define_result; use query_frontend::plan::Plan; use serde::{Deserialize, Serialize}; use snafu::Snafu; +use time_ext::ReadableDuration; + +use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL; #[derive(Snafu, Debug)] #[snafu(visibility(pub))] @@ -33,12 +36,26 @@ pub enum Error { define_result!(Error); #[derive(Clone, Copy, Deserialize, Debug, PartialEq, Eq, Hash, Serialize, PartialOrd, Ord)] +#[serde(tag = "type", content = "content")] pub enum BlockRule { QueryWithoutPredicate, + /// Max time range a query can scan. + #[serde(deserialize_with = "deserialize_readable_duration")] + QueryRange(i64), AnyQuery, AnyInsert, } +fn deserialize_readable_duration<'de, D>(deserializer: D) -> std::result::Result +where + D: serde::Deserializer<'de>, +{ + let s: &str = Deserialize::deserialize(deserializer)?; + ReadableDuration::from_str(s) + .map(|d| d.0.as_millis() as i64) + .map_err(serde::de::Error::custom) +} + #[derive(Default, Clone, Deserialize, Debug, Serialize)] #[serde(default)] pub struct LimiterConfig { @@ -52,6 +69,17 @@ impl BlockRule { match self { BlockRule::QueryWithoutPredicate => self.is_query_without_predicate(plan), BlockRule::AnyQuery => matches!(plan, Plan::Query(_)), + BlockRule::QueryRange(threshold) => { + if let Plan::Query(plan) = plan { + if let Some(range) = plan.query_range() { + if range > *threshold { + return true; + } + } + } + + false + } BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)), } } @@ -159,8 +187,18 @@ impl Limiter { /// /// Error will throws if the plan is forbidden to execute. pub fn try_limit(&self, plan: &Plan) -> Result<()> { - self.try_limit_by_block_list(plan)?; - self.try_limit_by_rules(plan) + let result = { + self.try_limit_by_block_list(plan)?; + self.try_limit_by_rules(plan) + }; + + if result.is_err() { + BLOCKED_REQUEST_COUNTER_VEC_GLOBAL + .with_label_values(&[plan.plan_type()]) + .inc(); + } + + result } pub fn add_write_block_list(&self, block_list: Vec) { diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index c55c47c27b..3478b6bdd7 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -61,6 +61,12 @@ lazy_static! { pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec = register_int_counter_vec!("http_handler_counter", "Http handler counter", &["type"]) .unwrap(); + pub static ref BLOCKED_REQUEST_COUNTER_VEC_GLOBAL: IntCounterVec = register_int_counter_vec!( + "blocked_request_counter", + "Blocked request counter", + &["type"] + ) + .unwrap(); } lazy_static! { diff --git a/proxy/src/read.rs b/proxy/src/read.rs index effb88c086..e821750cde 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -240,7 +240,7 @@ impl Proxy { .try_limit(&plan) .box_err() .context(Internal { - msg: "Request is blocked", + msg: format!("Request is blocked, table_name:{table_name:?}"), })?; } diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs index 94c8bca6b3..597c6810be 100644 --- a/query_frontend/src/plan.rs +++ b/query_frontend/src/plan.rs @@ -77,6 +77,21 @@ pub enum Plan { Exists(ExistsTablePlan), } +impl Plan { + pub fn plan_type(&self) -> &str { + match self { + Self::Query(_) => "query", + Self::Insert(_) => "insert", + Self::Create(_) + | Self::Drop(_) + | Self::Describe(_) + | Self::AlterTable(_) + | Self::Show(_) + | Self::Exists(_) => "other", + } + } +} + pub struct PriorityContext { pub time_range_threshold: u64, } @@ -201,6 +216,18 @@ impl QueryPlan { Some(priority) } + + /// When query contains invalid time range such as `[200, 100]`, it will + /// return None. + pub fn query_range(&self) -> Option { + self.extract_time_range().map(|time_range| { + time_range + .exclusive_end() + .as_i64() + .checked_sub(time_range.inclusive_start().as_i64()) + .unwrap_or(i64::MAX) + }) + } } impl Debug for QueryPlan {