Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: table blacklist not work for write #1507

Merged
merged 5 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration_tests/config/horaedb-cluster-0.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ timeout = "5s"
server_addrs = ['127.0.0.1:2379']

[limiter]
write_block_list = ['mytable1']
read_block_list = ['mytable1']
write_block_list = ['block_test_table']
read_block_list = ['block_test_table']
4 changes: 2 additions & 2 deletions integration_tests/config/horaedb-cluster-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ timeout = "5s"
server_addrs = ['127.0.0.1:2379']

[limiter]
write_block_list = ['mytable1']
read_block_list = ['mytable1']
write_block_list = ['block_test_table']
read_block_list = ['block_test_table']
74 changes: 68 additions & 6 deletions integration_tests/sdk/rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use horaedb_client::{
};

const ENDPOINT: &str = "127.0.0.1:8831";
const BLOCKED_TABLE: &str = "block_test_table";

struct TestDatas {
col_names: Vec<String>,
Expand Down Expand Up @@ -83,11 +84,12 @@ async fn main() {
let now = current_timestamp_ms();

let test_datas = generate_test_datas(now);

test_auto_create_table(&client, &rpc_ctx, now, &test_datas).await;
test_add_column(&client, &rpc_ctx, now, &test_datas).await;
test_block_table(&client, &rpc_ctx, now).await;

drop_table_if_exists(&client, &rpc_ctx, now).await;
drop_test_table_if_exists(&client, &rpc_ctx, now).await;
drop_table_if_exists(&client, &rpc_ctx, BLOCKED_TABLE).await;
print!("Test done")
}

Expand All @@ -99,7 +101,7 @@ async fn test_auto_create_table(
) {
println!("Test auto create table");

drop_table_if_exists(client, rpc_ctx, timestamp).await;
drop_test_table_if_exists(client, rpc_ctx, timestamp).await;

write(client, rpc_ctx, timestamp, test_datas, false).await;
sql_query(client, rpc_ctx, timestamp, test_datas, false).await;
Expand All @@ -117,11 +119,71 @@ async fn test_add_column(
sql_query(client, rpc_ctx, timestamp, test_datas, true).await;
}

async fn drop_table_if_exists(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, timestamp: i64) {
async fn test_block_table(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, timestamp: i64) {
println!("Test auto create table");

drop_table_if_exists(client, rpc_ctx, BLOCKED_TABLE).await;
create_table(client, rpc_ctx, BLOCKED_TABLE).await;

// try to write, should return table blocked error
let mut write_req = WriteRequest::default();
let mut points = Vec::new();
let builder = PointBuilder::new(BLOCKED_TABLE.to_string())
.timestamp(timestamp)
.tag("name", Value::String("name1".to_string()))
.field("value", Value::Double(0.42));
let point = builder.build().unwrap();
points.push(point);
write_req.add_points(points);
if let Err(e) = client.write(rpc_ctx, &write_req).await {
let e = e.to_string();
assert!(e.contains("Table operation is blocked"));
} else {
panic!("it should return blocked error");
}

// try to query, should be blocked, too
let query_req = SqlQueryRequest {
tables: vec![BLOCKED_TABLE.to_string()],
sql: format!("SELECT * from {}", BLOCKED_TABLE),
};
if let Err(e) = client.sql_query(rpc_ctx, &query_req).await {
let e = e.to_string();
assert!(e.contains("Table operation is blocked"));
} else {
panic!("it should return blocked error");
}
}

async fn drop_test_table_if_exists(
client: &Arc<dyn DbClient>,
rpc_ctx: &RpcContext,
timestamp: i64,
) {
let test_table = format!("test_table_{timestamp}");
drop_table_if_exists(client, rpc_ctx, &test_table).await;
}

async fn drop_table_if_exists(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, table: &str) {
let query_req = SqlQueryRequest {
tables: vec![test_table.clone()],
sql: format!("DROP TABLE IF EXISTS {test_table}"),
tables: vec![table.to_string()],
sql: format!("DROP TABLE IF EXISTS {table}"),
};
let _ = client.sql_query(rpc_ctx, &query_req).await.unwrap();
}

async fn create_table(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, table: &str) {
let query_req = SqlQueryRequest {
tables: vec![table.to_string()],
sql: format!(
"
CREATE TABLE {} (
name string TAG,
value double NOT NULL,
t timestamp NOT NULL,
timestamp KEY (t))",
table
),
};
let _ = client.sql_query(rpc_ctx, &query_req).await.unwrap();
}
Expand Down
16 changes: 11 additions & 5 deletions src/proxy/src/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;
#[derive(Snafu, Debug)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Queried table is blocked, table:{}", table,))]
BlockedTable { table: String },
#[snafu(display("Table operation is blocked, table:{}, op:{}", table, op))]
BlockedTable { table: String, op: String },

#[snafu(display("Query is blocked by rule:{:?}", rule))]
BlockedByRule { rule: BlockRule },
#[snafu(display("Table operation is blocked by rule:{:?}, op:{}", rule, op))]
BlockedByRule { rule: BlockRule, op: String },
}

define_result!(Error);
Expand Down Expand Up @@ -158,6 +158,7 @@ impl Limiter {
{
BlockedTable {
table: blocked_table,
op: plan.plan_type(),
}
.fail()?;
}
Expand All @@ -174,6 +175,7 @@ impl Limiter {
{
BlockedTable {
table: insert.table.name(),
op: plan.plan_type(),
}
.fail()?;
}
Expand All @@ -187,7 +189,11 @@ impl Limiter {
fn try_limit_by_rules(&self, plan: &Plan) -> Result<()> {
self.rules.read().unwrap().iter().try_for_each(|rule| {
if rule.should_limit(plan) {
BlockedByRule { rule: *rule }.fail()?;
BlockedByRule {
rule: *rule,
op: plan.plan_type(),
}
.fail()?;
}

Ok(())
Expand Down
50 changes: 35 additions & 15 deletions src/proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ use crate::{

type WriteResponseFutures<'a> = Vec<BoxFuture<'a, runtime::Result<Result<WriteResponse>>>>;

struct PlanWithTable {
plan: Plan,
table: TableRef,
}

#[derive(Debug)]
pub struct WriteContext {
pub request_id: RequestId,
Expand Down Expand Up @@ -501,19 +506,34 @@ impl Proxy {
auto_create_table: self.auto_create_table,
};

let plan_vec = self
let plans = self
.write_request_to_insert_plan(req.table_requests, write_context)
.await?;

let mut success = 0;
for insert_plan in plan_vec {
let table = insert_plan.table.clone();

// TODO: concurrently run the insert plan here
for plan_with_table in plans {
let PlanWithTable { plan, table } = plan_with_table;

// check limit first
// TODO: if one table is blocked, maybe should not lead to failure of whole
// batch?
self.instance
.limiter
.try_limit(&plan)
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "table is blocked",
})?;

match self
.execute_insert_plan(
request_id.clone(),
catalog_name,
&schema_name,
insert_plan,
plan,
deadline,
)
.await
Expand Down Expand Up @@ -544,8 +564,8 @@ impl Proxy {
&self,
table_requests: Vec<WriteTableRequest>,
write_context: WriteContext,
) -> Result<Vec<InsertPlan>> {
let mut plan_vec = Vec::with_capacity(table_requests.len());
) -> Result<Vec<PlanWithTable>> {
let mut plans = Vec::with_capacity(table_requests.len());

let WriteContext {
request_id,
Expand Down Expand Up @@ -602,26 +622,26 @@ impl Proxy {
}
Ok(v) => v,
};
plan_vec.push(plan);
let plan = Plan::Insert(plan);
let plan_with_table = PlanWithTable {
plan,
table: table_clone,
};

plans.push(plan_with_table);
}

Ok(plan_vec)
Ok(plans)
}

async fn execute_insert_plan(
&self,
request_id: RequestId,
catalog_name: &str,
schema_name: &str,
insert_plan: InsertPlan,
plan: Plan,
deadline: Option<Instant>,
) -> Result<usize> {
debug!(
"Execute insert plan begin, table:{}, row_num:{}",
insert_plan.table.name(),
insert_plan.rows.num_rows()
);
let plan = Plan::Insert(insert_plan);
let output = self
.execute_plan(request_id, catalog_name, schema_name, plan, deadline)
.await;
Expand Down
Loading