Skip to content

Commit 0842a25

Browse files
committed
refactor dedup request
1 parent 8e8304f commit 0842a25

File tree

6 files changed

+16
-140
lines changed

6 files changed

+16
-140
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proxy/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ lazy_static = { workspace = true }
4747
log = { workspace = true }
4848
macros = { workspace = true }
4949
meta_client = { workspace = true }
50+
notifier = { workspace = true }
5051
paste = { workspace = true }
5152
prom-remote-api = { workspace = true, features = ["warp"] }
5253
prometheus = { workspace = true }

proxy/src/dedup_requests.rs

-123
This file was deleted.

proxy/src/lib.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#![feature(trait_alias)]
1919

2020
pub mod context;
21-
pub mod dedup_requests;
2221
pub mod error;
2322
mod error_util;
2423
pub mod forward;
@@ -82,16 +81,14 @@ use table_engine::{
8281
PARTITION_TABLE_ENGINE_TYPE,
8382
};
8483
use time_ext::{current_time_millis, parse_duration};
85-
use tokio::sync::mpsc::Sender;
8684
use tonic::{transport::Channel, IntoRequest};
8785

8886
use crate::{
89-
dedup_requests::RequestNotifiers,
9087
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
9188
forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
9289
hotspot::HotspotRecorder,
9390
instance::InstanceRef,
94-
read::SqlResponse,
91+
read::ReadRequestNotifiers,
9592
schema_config_provider::SchemaConfigProviderRef,
9693
};
9794

@@ -125,7 +122,7 @@ pub struct Proxy {
125122
engine_runtimes: Arc<EngineRuntimes>,
126123
cluster_with_meta: bool,
127124
sub_table_access_perm: SubTableAccessPerm,
128-
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
125+
request_notifiers: Option<ReadRequestNotifiers>,
129126
}
130127

131128
impl Proxy {
@@ -142,7 +139,7 @@ impl Proxy {
142139
engine_runtimes: Arc<EngineRuntimes>,
143140
cluster_with_meta: bool,
144141
sub_table_access_perm: SubTableAccessPerm,
145-
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
142+
request_notifiers: Option<ReadRequestNotifiers>,
146143
) -> Self {
147144
let forwarder = Arc::new(Forwarder::new(
148145
forward_config,

proxy/src/read.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use generic_error::BoxError;
2424
use http::StatusCode;
2525
use interpreters::interpreter::Output;
2626
use log::{error, info, warn};
27+
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
2728
use query_frontend::{
2829
frontend,
2930
frontend::{Context as SqlContext, Frontend},
@@ -36,13 +37,15 @@ use tokio::sync::mpsc::{self, Sender};
3637
use tonic::{transport::Channel, IntoRequest};
3738

3839
use crate::{
39-
dedup_requests::{ExecutionGuard, RequestNotifiers, RequestResult},
4040
error::{ErrNoCause, ErrWithCause, Error, Internal, InternalNoCause, Result},
4141
forward::{ForwardRequest, ForwardResult},
4242
metrics::GRPC_HANDLER_COUNTER_VEC,
4343
Context, Proxy,
4444
};
4545

46+
const DEDUP_READ_CHANNEL_LEN: usize = 1;
47+
pub type ReadRequestNotifiers = Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>;
48+
4649
pub enum SqlResponse {
4750
Forwarded(SqlQueryResponse),
4851
Local(Output),
@@ -77,10 +80,10 @@ impl Proxy {
7780
ctx: &Context,
7881
schema: &str,
7982
sql: &str,
80-
request_notifiers: Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>,
83+
request_notifiers: ReadRequestNotifiers,
8184
enable_partition_table_access: bool,
8285
) -> Result<SqlResponse> {
83-
let (tx, mut rx) = mpsc::channel(1);
86+
let (tx, mut rx) = mpsc::channel(DEDUP_READ_CHANNEL_LEN);
8487
let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) {
8588
RequestResult::First => ExecutionGuard::new(|| {
8689
request_notifiers.take_notifiers(&sql.to_string());

server/src/server.rs

+5-8
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use interpreters::table_manipulator::TableManipulatorRef;
2525
use log::{info, warn};
2626
use logger::RuntimeLevel;
2727
use macros::define_result;
28+
use notifier::notifier::RequestNotifiers;
2829
use partition_table_engine::PartitionTableEngine;
2930
use proxy::{
30-
dedup_requests::RequestNotifiers,
3131
hotspot::HotspotRecorder,
3232
instance::{DynamicConfig, Instance, InstanceRef},
3333
limiter::Limiter,
@@ -441,12 +441,6 @@ impl Builder {
441441
timeout: self.server_config.timeout.map(|v| v.0),
442442
};
443443

444-
let request_notifiers = if self.server_config.query_dedup.enable {
445-
Some(Arc::new(RequestNotifiers::default()))
446-
} else {
447-
None
448-
};
449-
450444
let proxy = Arc::new(Proxy::new(
451445
router.clone(),
452446
instance.clone(),
@@ -459,7 +453,10 @@ impl Builder {
459453
engine_runtimes.clone(),
460454
self.cluster.is_some(),
461455
self.server_config.sub_table_access_perm,
462-
request_notifiers,
456+
self.server_config
457+
.query_dedup
458+
.enable
459+
.then(|| Arc::new(RequestNotifiers::default())),
463460
));
464461

465462
let http_service = http::Builder::new(http_config)

0 commit comments

Comments
 (0)