Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use notifier::RequestNotifiers instead of dedup_requests::RequestNotifiers #1249

Merged
merged 2 commits into from
Oct 20, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor dedup request
baojinri committed Oct 18, 2023
commit f1f8f9f95db5c8e2e7a69fe4744f682151d459b0
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ lazy_static = { workspace = true }
logger = { workspace = true }
macros = { workspace = true }
meta_client = { workspace = true }
notifier = { workspace = true }
paste = { workspace = true }
prom-remote-api = { workspace = true, features = ["warp"] }
prometheus = { workspace = true }
123 changes: 0 additions & 123 deletions proxy/src/dedup_requests.rs

This file was deleted.

9 changes: 3 additions & 6 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@
#![feature(trait_alias)]

pub mod context;
pub mod dedup_requests;
pub mod error;
mod error_util;
pub mod forward;
@@ -82,16 +81,14 @@ use table_engine::{
PARTITION_TABLE_ENGINE_TYPE,
};
use time_ext::{current_time_millis, parse_duration};
use tokio::sync::mpsc::Sender;
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::RequestNotifiers,
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
hotspot::HotspotRecorder,
instance::InstanceRef,
read::SqlResponse,
read::ReadRequestNotifiers,
schema_config_provider::SchemaConfigProviderRef,
};

@@ -125,7 +122,7 @@ pub struct Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
request_notifiers: Option<ReadRequestNotifiers>,
}

impl Proxy {
@@ -142,7 +139,7 @@ impl Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
request_notifiers: Option<ReadRequestNotifiers>,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
9 changes: 6 additions & 3 deletions proxy/src/read.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use logger::{error, info, warn, SlowTimer};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
use query_frontend::{
frontend,
frontend::{Context as SqlContext, Frontend},
@@ -35,13 +36,15 @@ use tokio::sync::mpsc::{self, Sender};
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::{ExecutionGuard, RequestNotifiers, RequestResult},
error::{ErrNoCause, ErrWithCause, Error, Internal, InternalNoCause, Result},
forward::{ForwardRequest, ForwardResult},
metrics::GRPC_HANDLER_COUNTER_VEC,
Context, Proxy,
};

const DEDUP_READ_CHANNEL_LEN: usize = 1;
pub type ReadRequestNotifiers = Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>;

pub enum SqlResponse {
Forwarded(SqlQueryResponse),
Local(Output),
@@ -77,10 +80,10 @@ impl Proxy {
ctx: &Context,
schema: &str,
sql: &str,
request_notifiers: Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>,
request_notifiers: ReadRequestNotifiers,
enable_partition_table_access: bool,
) -> Result<SqlResponse> {
let (tx, mut rx) = mpsc::channel(1);
let (tx, mut rx) = mpsc::channel(DEDUP_READ_CHANNEL_LEN);
let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) {
RequestResult::First => ExecutionGuard::new(|| {
request_notifiers.take_notifiers(&sql.to_string());
13 changes: 5 additions & 8 deletions server/src/server.rs
Original file line number Diff line number Diff line change
@@ -23,9 +23,9 @@ use df_operator::registry::FunctionRegistryRef;
use interpreters::table_manipulator::TableManipulatorRef;
use logger::{info, warn, RuntimeLevel};
use macros::define_result;
use notifier::notifier::RequestNotifiers;
use partition_table_engine::PartitionTableEngine;
use proxy::{
dedup_requests::RequestNotifiers,
hotspot::HotspotRecorder,
instance::{DynamicConfig, Instance, InstanceRef},
limiter::Limiter,
@@ -440,12 +440,6 @@ impl Builder {
timeout: self.server_config.timeout.map(|v| v.0),
};

let request_notifiers = if self.server_config.query_dedup.enable {
Some(Arc::new(RequestNotifiers::default()))
} else {
None
};

let proxy = Arc::new(Proxy::new(
router.clone(),
instance.clone(),
@@ -458,7 +452,10 @@ impl Builder {
engine_runtimes.clone(),
self.cluster.is_some(),
self.server_config.sub_table_access_perm,
request_notifiers,
self.server_config
.query_dedup
.enable
.then(|| Arc::new(RequestNotifiers::default())),
));

let http_service = http::Builder::new(http_config)