Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: block rules support query #1420

Merged
merged 4 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions proxy/src/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashSet, sync::RwLock};
use std::{collections::HashSet, str::FromStr, sync::RwLock};

use datafusion::logical_expr::logical_plan::LogicalPlan;
use macros::define_result;
use query_frontend::plan::Plan;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use time_ext::ReadableDuration;

use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;

#[derive(Snafu, Debug)]
#[snafu(visibility(pub))]
Expand All @@ -33,12 +36,26 @@ pub enum Error {
define_result!(Error);

#[derive(Clone, Copy, Deserialize, Debug, PartialEq, Eq, Hash, Serialize, PartialOrd, Ord)]
#[serde(tag = "type", content = "content")]
pub enum BlockRule {
QueryWithoutPredicate,
/// Max time range a query can scan.
#[serde(deserialize_with = "deserialize_readable_duration")]
QueryRange(i64),
AnyQuery,
AnyInsert,
}

fn deserialize_readable_duration<'de, D>(deserializer: D) -> std::result::Result<i64, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
ReadableDuration::from_str(s)
.map(|d| d.0.as_millis() as i64)
.map_err(serde::de::Error::custom)
}

#[derive(Default, Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct LimiterConfig {
Expand All @@ -52,6 +69,17 @@ impl BlockRule {
match self {
BlockRule::QueryWithoutPredicate => self.is_query_without_predicate(plan),
BlockRule::AnyQuery => matches!(plan, Plan::Query(_)),
BlockRule::QueryRange(threshold) => {
if let Plan::Query(plan) = plan {
if let Some(range) = plan.query_range() {
if range > *threshold {
return true;
}
}
}

false
}
BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)),
}
}
Expand Down Expand Up @@ -159,8 +187,18 @@ impl Limiter {
///
/// Error will throws if the plan is forbidden to execute.
pub fn try_limit(&self, plan: &Plan) -> Result<()> {
self.try_limit_by_block_list(plan)?;
self.try_limit_by_rules(plan)
let result = {
self.try_limit_by_block_list(plan)?;
self.try_limit_by_rules(plan)
};

if result.is_err() {
BLOCKED_REQUEST_COUNTER_VEC_GLOBAL
.with_label_values(&[plan.plan_type()])
.inc();
}

result
}

pub fn add_write_block_list(&self, block_list: Vec<String>) {
Expand Down
6 changes: 6 additions & 0 deletions proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ lazy_static! {
pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec =
register_int_counter_vec!("http_handler_counter", "Http handler counter", &["type"])
.unwrap();
pub static ref BLOCKED_REQUEST_COUNTER_VEC_GLOBAL: IntCounterVec = register_int_counter_vec!(
"blocked_request_counter",
"Blocked request counter",
&["type"]
)
.unwrap();
}

lazy_static! {
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl Proxy {
.try_limit(&plan)
.box_err()
.context(Internal {
msg: "Request is blocked",
msg: format!("Request is blocked, table_name:{table_name:?}"),
})?;
}

Expand Down
27 changes: 27 additions & 0 deletions query_frontend/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ pub enum Plan {
Exists(ExistsTablePlan),
}

impl Plan {
pub fn plan_type(&self) -> &str {
match self {
Self::Query(_) => "query",
Self::Insert(_) => "insert",
Self::Create(_)
| Self::Drop(_)
| Self::Describe(_)
| Self::AlterTable(_)
| Self::Show(_)
| Self::Exists(_) => "other",
}
}
}

pub struct PriorityContext {
pub time_range_threshold: u64,
}
Expand Down Expand Up @@ -201,6 +216,18 @@ impl QueryPlan {

Some(priority)
}

/// When query contains invalid time range such as `[200, 100]`, it will
/// return None.
pub fn query_range(&self) -> Option<i64> {
self.extract_time_range().map(|time_range| {
time_range
.exclusive_end()
.as_i64()
.checked_sub(time_range.inclusive_start().as_i64())
.unwrap_or(i64::MAX)
})
}
}

impl Debug for QueryPlan {
Expand Down