From d7ac3f1a51b09c86814138b475357ff9fe08192e Mon Sep 17 00:00:00 2001 From: baojinri Date: Mon, 24 Jul 2023 15:41:10 +0800 Subject: [PATCH 01/10] dedup request --- common_types/src/column.rs | 18 ++-- common_types/src/projected_schema.rs | 8 ++ common_types/src/record_batch.rs | 4 +- server/src/grpc/mod.rs | 9 +- .../remote_engine_service/dedup_request.rs | 71 ++++++++++++++ server/src/grpc/remote_engine_service/mod.rs | 97 ++++++++++++++++--- table_engine/src/predicate.rs | 2 +- table_engine/src/table.rs | 2 +- 8 files changed, 183 insertions(+), 28 deletions(-) create mode 100644 server/src/grpc/remote_engine_service/dedup_request.rs diff --git a/common_types/src/column.rs b/common_types/src/column.rs index 7e9a038e2b..246794a5f4 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -87,7 +87,7 @@ pub enum Error { pub type Result = std::result::Result; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NullColumn(NullArray); impl NullColumn { @@ -109,7 +109,7 @@ impl NullColumn { macro_rules! define_numeric_column { ($($Kind: ident), *) => { $(paste! { - #[derive(Debug)] + #[derive(Debug, Clone)] pub struct [<$Kind Column>]([<$Kind Array>]); #[inline] @@ -131,24 +131,24 @@ define_numeric_column!( Float, Double, UInt64, UInt32, UInt16, UInt8, Int64, Int32, Int16, Int8, Boolean ); -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TimestampColumn(TimestampMillisecondArray); -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct VarbinaryColumn(BinaryArray); -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StringColumn(StringArray); /// dictionary encode type is difference from other types, need implement /// without macro -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StringDictionaryColumn(DictionaryArray); -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DateColumn(DateArray); -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TimeColumn(TimeArray); #[inline] @@ -749,7 +749,7 @@ impl_column_block!( macro_rules! define_column_block { ($($Kind: ident), *) => { paste! { - #[derive(Debug)] + #[derive(Debug, Clone)] pub enum ColumnBlock { Null(NullColumn), StringDictionary(StringDictionaryColumn), diff --git a/common_types/src/projected_schema.rs b/common_types/src/projected_schema.rs index e50643f481..d7a04470a6 100644 --- a/common_types/src/projected_schema.rs +++ b/common_types/src/projected_schema.rs @@ -134,6 +134,10 @@ impl ProjectedSchema { self.0.is_all_projection() } + pub fn projection(&self) -> Option> { + self.0.projection() + } + /// Returns the [RowProjector] to project the rows with source schema to /// rows with [RecordSchemaWithKey]. /// @@ -260,6 +264,10 @@ impl ProjectedSchemaInner { self.projection.is_none() } + fn projection(&self) -> Option> { + self.projection.clone() + } + // TODO(yingwen): We can fill missing not null column with default value instead // of returning error. fn try_project_with_key(&self, source_schema: &Schema) -> Result { diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index fbfacd902b..b46aa06d19 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -102,7 +102,7 @@ pub enum Error { pub type Result = std::result::Result; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RecordBatchData { arrow_record_batch: ArrowRecordBatch, column_blocks: Vec, @@ -192,7 +192,7 @@ impl TryFrom for RecordBatchData { // TODO(yingwen): The schema in RecordBatch should be much simple because it may // lack some information. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RecordBatch { schema: RecordSchema, data: RecordBatchData, diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 9a556132eb..0fc4a9df74 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -31,11 +31,15 @@ use query_engine::executor::Executor as QueryExecutor; use runtime::{JoinHandle, Runtime}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; -use tokio::sync::oneshot::{self, Sender}; +use tokio::sync::{ + oneshot::{self, Sender}, + RwLock, +}; use tonic::transport::Server; use crate::grpc::{ - meta_event_service::MetaServiceImpl, remote_engine_service::RemoteEngineServiceImpl, + meta_event_service::MetaServiceImpl, + remote_engine_service::{dedup_request::DedupMap, RemoteEngineServiceImpl}, storage_service::StorageServiceImpl, }; @@ -269,6 +273,7 @@ impl Builder { let service = RemoteEngineServiceImpl { instance, runtimes: runtimes.clone(), + dedup_map: Arc::new(RwLock::new(DedupMap::default())), }; RemoteEngineServiceServer::new(service) }; diff --git a/server/src/grpc/remote_engine_service/dedup_request.rs b/server/src/grpc/remote_engine_service/dedup_request.rs new file mode 100644 index 0000000000..b828bd240d --- /dev/null +++ b/server/src/grpc/remote_engine_service/dedup_request.rs @@ -0,0 +1,71 @@ +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::collections::HashMap; + +use common_types::record_batch::RecordBatch; +use table_engine::{predicate::PredicateRef, table::ReadOrder}; +use tokio::sync::mpsc::Sender; + +use crate::grpc::remote_engine_service::Result; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct RequestKey { + table: String, + predicate: PredicateRef, + projection: Option>, + order: ReadOrder, +} + +impl RequestKey { + pub fn new( + table: String, + predicate: PredicateRef, + projection: Option>, + order: ReadOrder, + ) -> Self { + Self { + table, + predicate, + projection, + order, + } + } +} + +#[derive(Clone, Debug, Default)] +pub struct Notifiers { + txs: Vec>>, +} + +impl Notifiers { + pub fn new(txs: Vec>>) -> Self { + Self { txs } + } + + pub fn get_txs(&self) -> &Vec>> { + &self.txs + } + + pub fn add_tx(&mut self, tx: Sender>) { + self.txs.push(tx); + } +} + +#[derive(Clone, Debug, Default)] +pub struct DedupMap { + inner: HashMap, +} + +impl DedupMap { + pub fn get_notifiers(&self, key: RequestKey) -> Option<&Notifiers> { + self.inner.get(&key) + } + + pub fn add_notifiers(&mut self, key: RequestKey, value: Notifiers) { + self.inner.insert(key, value); + } + + pub fn delete_notifiers(&mut self, key: &RequestKey) { + self.inner.remove(key); + } +} diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index e87b057b6c..d2e0f3bbf1 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -27,7 +27,7 @@ use table_engine::{ table::TableRef, }; use time_ext::InstantExt; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -35,9 +35,13 @@ use crate::grpc::{ metrics::{ REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC, REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, }, - remote_engine_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode}, + remote_engine_service::{ + dedup_request::{DedupMap, Notifiers, RequestKey}, + error::{ErrNoCause, ErrWithCause, Result, StatusCode}, + }, }; +pub mod dedup_request; mod error; const STREAM_QUERY_CHANNEL_LEN: usize = 20; @@ -47,6 +51,7 @@ const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024; pub struct RemoteEngineServiceImpl { pub instance: InstanceRef, pub runtimes: Arc, + pub dedup_map: Arc>, } impl RemoteEngineServiceImpl { @@ -57,15 +62,59 @@ impl RemoteEngineServiceImpl { let instant = Instant::now(); let ctx = self.handler_ctx(); let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); - let handle = self.runtimes.read_runtime.spawn(async move { - let read_request = request.into_inner(); - handle_stream_read(ctx, read_request).await - }); + + let request = request.into_inner(); + let table_engine::remote::model::ReadRequest { + table, + read_request, + } = request.clone().try_into().box_err().context(ErrWithCause { + code: StatusCode::BadRequest, + msg: "fail to convert read request", + })?; + + let request_key = RequestKey::new( + table.table, + read_request.predicate.clone(), + read_request.projected_schema.projection(), + read_request.order, + ); + + { + let mut dedup_map = self.dedup_map.write().await; + match dedup_map.get_notifiers(request_key.clone()) { + Some(notifiers) => { + let txs = notifiers.get_txs(); + let mut notifiers = Notifiers::new(txs.clone()); + notifiers.add_tx(tx); + dedup_map.add_notifiers(request_key.clone(), notifiers); + + REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC + .stream_read + .observe(instant.saturating_elapsed().as_secs_f64()); + return Ok(ReceiverStream::new(rx)); + } + None => { + let mut notifiers = Notifiers::default(); + notifiers.add_tx(tx); + dedup_map.add_notifiers(request_key.clone(), notifiers); + } + } + } + + let handle = self + .runtimes + .read_runtime + .spawn(async move { handle_stream_read(ctx, request).await }); let streams = handle.await.box_err().context(ErrWithCause { code: StatusCode::Internal, msg: "fail to join task", })??; + let mut dedup_map = self.dedup_map.write().await; + let txs = dedup_map + .get_notifiers(request_key.clone()) + .unwrap() + .get_txs(); for stream in streams.streams { let mut stream = stream.map(|result| { result.box_err().context(ErrWithCause { @@ -73,16 +122,37 @@ impl RemoteEngineServiceImpl { msg: "record batch failed", }) }); - let tx = tx.clone(); + + let txs = txs.clone(); self.runtimes.read_runtime.spawn(async move { let mut num_rows = 0; while let Some(batch) = stream.next().await { - if let Ok(record_batch) = &batch { - num_rows += record_batch.num_rows(); - } - if let Err(e) = tx.send(batch).await { - error!("Failed to send handler result, err:{}.", e); - break; + for tx in &txs { + match &batch { + Ok(record_batch) => { + num_rows += record_batch.num_rows(); + if let Err(e) = tx.send(Ok(record_batch.clone())).await { + error!("Failed to send handler result, err:{}.", e); + break; + } + } + // TODO(shuangxiao): error clone + Err(_) => { + if let Err(e) = tx + .send( + ErrNoCause { + code: StatusCode::Internal, + msg: "failed to handler request".to_string(), + } + .fail(), + ) + .await + { + error!("Failed to send handler result, err:{}.", e); + break; + } + } + } } } REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC @@ -90,6 +160,7 @@ impl RemoteEngineServiceImpl { .inc_by(num_rows as u64); }); } + dedup_map.delete_notifiers(&request_key); REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read diff --git a/table_engine/src/predicate.rs b/table_engine/src/predicate.rs index 1300c93782..8912530556 100644 --- a/table_engine/src/predicate.rs +++ b/table_engine/src/predicate.rs @@ -49,7 +49,7 @@ pub enum Error { define_result!(Error); /// Predicate helps determine whether specific row group should be read. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Predicate { /// Predicates in the query for filter out the columns that meet all the /// exprs. diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 482903c25c..d376152f74 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -356,7 +356,7 @@ pub struct GetRequest { pub primary_key: Vec, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum ReadOrder { /// No order requirements from the read request. None = 0, From 0c53df97c5a6aaea376a0dacfe38acb20eac4514 Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 25 Jul 2023 15:13:04 +0800 Subject: [PATCH 02/10] modify code --- server/src/grpc/mod.rs | 7 +- .../remote_engine_service/dedup_request.rs | 37 +++-- server/src/grpc/remote_engine_service/mod.rs | 129 +++++++++++------- 3 files changed, 103 insertions(+), 70 deletions(-) diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 0fc4a9df74..3a32dcf854 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -5,7 +5,7 @@ use std::{ net::{AddrParseError, SocketAddr}, stringify, - sync::Arc, + sync::{Arc, RwLock}, time::Duration, }; @@ -31,10 +31,7 @@ use query_engine::executor::Executor as QueryExecutor; use runtime::{JoinHandle, Runtime}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; -use tokio::sync::{ - oneshot::{self, Sender}, - RwLock, -}; +use tokio::sync::oneshot::{self, Sender}; use tonic::transport::Server; use crate::grpc::{ diff --git a/server/src/grpc/remote_engine_service/dedup_request.rs b/server/src/grpc/remote_engine_service/dedup_request.rs index b828bd240d..a927d2108d 100644 --- a/server/src/grpc/remote_engine_service/dedup_request.rs +++ b/server/src/grpc/remote_engine_service/dedup_request.rs @@ -1,6 +1,6 @@ // Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. -use std::collections::HashMap; +use std::{collections::HashMap, sync::RwLock}; use common_types::record_batch::RecordBatch; use table_engine::{predicate::PredicateRef, table::ReadOrder}; @@ -32,40 +32,49 @@ impl RequestKey { } } -#[derive(Clone, Debug, Default)] +type Notifier = Sender>; + +#[derive(Debug, Default)] pub struct Notifiers { - txs: Vec>>, + notifiers: RwLock>, } impl Notifiers { - pub fn new(txs: Vec>>) -> Self { - Self { txs } + pub fn new(notifier: Notifier) -> Self { + let notifiers = vec![notifier]; + Self { + notifiers: RwLock::new(notifiers), + } } - pub fn get_txs(&self) -> &Vec>> { - &self.txs + pub fn add_notifier(&self, notifier: Notifier) { + self.notifiers.write().unwrap().push(notifier); } - pub fn add_tx(&mut self, tx: Sender>) { - self.txs.push(tx); + pub fn into_notifiers(self) -> Vec { + self.notifiers.into_inner().unwrap() } } -#[derive(Clone, Debug, Default)] +#[derive(Debug, Default)] pub struct DedupMap { inner: HashMap, } impl DedupMap { - pub fn get_notifiers(&self, key: RequestKey) -> Option<&Notifiers> { - self.inner.get(&key) + pub fn get_notifiers(&self, key: &RequestKey) -> Option<&Notifiers> { + self.inner.get(key) } pub fn add_notifiers(&mut self, key: RequestKey, value: Notifiers) { self.inner.insert(key, value); } - pub fn delete_notifiers(&mut self, key: &RequestKey) { - self.inner.remove(key); + pub fn delete_notifiers(&mut self, key: RequestKey) -> Option { + self.inner.remove(&key) + } + + pub fn contains_key(&self, key: &RequestKey) -> bool { + self.inner.contains_key(key) } } diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index d2e0f3bbf1..3fbca710b2 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -2,7 +2,10 @@ // Remote engine rpc service implementation. -use std::{sync::Arc, time::Instant}; +use std::{ + sync::{Arc, RwLock}, + time::Instant, +}; use arrow_ext::ipc::{self, CompressOptions, CompressOutput, CompressionMethod}; use async_trait::async_trait; @@ -16,7 +19,7 @@ use ceresdbproto::{ storage::{arrow_payload, ArrowPayload}, }; use common_types::record_batch::RecordBatch; -use futures::stream::{self, BoxStream, StreamExt}; +use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt}; use generic_error::BoxError; use log::{error, info}; use proxy::instance::InstanceRef; @@ -27,7 +30,7 @@ use table_engine::{ table::TableRef, }; use time_ext::InstantExt; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -80,22 +83,29 @@ impl RemoteEngineServiceImpl { ); { - let mut dedup_map = self.dedup_map.write().await; - match dedup_map.get_notifiers(request_key.clone()) { - Some(notifiers) => { - let txs = notifiers.get_txs(); - let mut notifiers = Notifiers::new(txs.clone()); - notifiers.add_tx(tx); - dedup_map.add_notifiers(request_key.clone(), notifiers); + let dedup_map = self.dedup_map.read().unwrap(); + if dedup_map.contains_key(&request_key) { + let notifiers = dedup_map.get_notifiers(&request_key).unwrap(); + notifiers.add_notifier(tx); + + REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC + .stream_read + .observe(instant.saturating_elapsed().as_secs_f64()); + return Ok(ReceiverStream::new(rx)); + } else { + drop(dedup_map); + + let mut dedup_map = self.dedup_map.write().unwrap(); + if dedup_map.contains_key(&request_key) { + let notifiers = dedup_map.get_notifiers(&request_key).unwrap(); + notifiers.add_notifier(tx); REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read .observe(instant.saturating_elapsed().as_secs_f64()); return Ok(ReceiverStream::new(rx)); - } - None => { - let mut notifiers = Notifiers::default(); - notifiers.add_tx(tx); + } else { + let notifiers = Notifiers::new(tx); dedup_map.add_notifiers(request_key.clone(), notifiers); } } @@ -110,11 +120,7 @@ impl RemoteEngineServiceImpl { msg: "fail to join task", })??; - let mut dedup_map = self.dedup_map.write().await; - let txs = dedup_map - .get_notifiers(request_key.clone()) - .unwrap() - .get_txs(); + let mut stream_read = FuturesUnordered::new(); for stream in streams.streams { let mut stream = stream.map(|result| { result.box_err().context(ErrWithCause { @@ -123,44 +129,65 @@ impl RemoteEngineServiceImpl { }) }); - let txs = txs.clone(); - self.runtimes.read_runtime.spawn(async move { - let mut num_rows = 0; + let handle = self.runtimes.read_runtime.spawn(async move { + let mut batches = Vec::new(); while let Some(batch) = stream.next().await { - for tx in &txs { - match &batch { - Ok(record_batch) => { - num_rows += record_batch.num_rows(); - if let Err(e) = tx.send(Ok(record_batch.clone())).await { - error!("Failed to send handler result, err:{}.", e); - break; - } - } - // TODO(shuangxiao): error clone - Err(_) => { - if let Err(e) = tx - .send( - ErrNoCause { - code: StatusCode::Internal, - msg: "failed to handler request".to_string(), - } - .fail(), - ) - .await - { - error!("Failed to send handler result, err:{}.", e); - break; + batches.push(batch) + } + + batches + }); + stream_read.push(handle); + } + + let mut batches = Vec::new(); + while let Some(result) = stream_read.next().await { + let batch = result.box_err().context(ErrWithCause { + code: StatusCode::Internal, + msg: "failed to join task", + })?; + batches.extend(batch); + } + + let notifiers = { + let mut dedup_map = self.dedup_map.write().unwrap(); + let notifiers = dedup_map.delete_notifiers(request_key).unwrap(); + notifiers.into_notifiers() + }; + + let mut num_rows = 0; + for batch in batches { + match batch { + Ok(batch) => { + num_rows += batch.num_rows() * notifiers.len(); + for tx in ¬ifiers { + if let Err(e) = tx.send(Ok(batch.clone())).await { + error!("Failed to send handler result, err:{}.", e); + } + } + } + Err(_) => { + for tx in ¬ifiers { + if let Err(e) = tx + .send( + ErrNoCause { + code: StatusCode::Internal, + msg: "failed to handler request".to_string(), } - } + .fail(), + ) + .await + { + error!("Failed to send handler result, err:{}.", e); } } } - REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC - .query_succeeded_row - .inc_by(num_rows as u64); - }); + } } - dedup_map.delete_notifiers(&request_key); + + REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC + .query_succeeded_row + .inc_by(num_rows as u64); REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read From 430d4edf089f4a647dd1de2c5d6f016488058721 Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 25 Jul 2023 20:53:58 +0800 Subject: [PATCH 03/10] modify code for review --- server/src/config.rs | 4 + server/src/grpc/mod.rs | 15 ++- .../remote_engine_service/dedup_request.rs | 80 -------------- .../remote_engine_service/dedup_requests.rs | 100 ++++++++++++++++++ server/src/grpc/remote_engine_service/mod.rs | 67 ++++-------- server/src/server.rs | 1 + 6 files changed, 139 insertions(+), 128 deletions(-) delete mode 100644 server/src/grpc/remote_engine_service/dedup_request.rs create mode 100644 server/src/grpc/remote_engine_service/dedup_requests.rs diff --git a/server/src/config.rs b/server/src/config.rs index 318d9b0f1d..e35d4beec6 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -121,6 +121,9 @@ pub struct ServerConfig { /// Config of remote engine client pub remote_client: remote_engine_client::Config, + + /// Whether to deduplicate requests + pub dedup_requests: bool, } impl Default for ServerConfig { @@ -140,6 +143,7 @@ impl Default for ServerConfig { route_cache: router::RouteCacheConfig::default(), hotspot: hotspot::Config::default(), remote_client: remote_engine_client::Config::default(), + dedup_requests: true, } } } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 3a32dcf854..83c8ee4e7d 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -5,7 +5,7 @@ use std::{ net::{AddrParseError, SocketAddr}, stringify, - sync::{Arc, RwLock}, + sync::Arc, time::Duration, }; @@ -36,7 +36,7 @@ use tonic::transport::Server; use crate::grpc::{ meta_event_service::MetaServiceImpl, - remote_engine_service::{dedup_request::DedupMap, RemoteEngineServiceImpl}, + remote_engine_service::{dedup_requests::RequestNotifiers, RemoteEngineServiceImpl}, storage_service::StorageServiceImpl, }; @@ -197,6 +197,7 @@ pub struct Builder { cluster: Option, opened_wals: Option, proxy: Option>>, + request_notifiers: Option>, } impl Builder { @@ -209,6 +210,7 @@ impl Builder { cluster: None, opened_wals: None, proxy: None, + request_notifiers: None, } } @@ -247,6 +249,13 @@ impl Builder { self.proxy = Some(proxy); self } + + pub fn dedup_requests(mut self, dedup_requests: bool) -> Self { + if dedup_requests { + self.request_notifiers = Some(Arc::new(RequestNotifiers::default())); + } + self + } } impl Builder { @@ -270,7 +279,7 @@ impl Builder { let service = RemoteEngineServiceImpl { instance, runtimes: runtimes.clone(), - dedup_map: Arc::new(RwLock::new(DedupMap::default())), + request_notifiers: self.request_notifiers, }; RemoteEngineServiceServer::new(service) }; diff --git a/server/src/grpc/remote_engine_service/dedup_request.rs b/server/src/grpc/remote_engine_service/dedup_request.rs deleted file mode 100644 index a927d2108d..0000000000 --- a/server/src/grpc/remote_engine_service/dedup_request.rs +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. - -use std::{collections::HashMap, sync::RwLock}; - -use common_types::record_batch::RecordBatch; -use table_engine::{predicate::PredicateRef, table::ReadOrder}; -use tokio::sync::mpsc::Sender; - -use crate::grpc::remote_engine_service::Result; - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct RequestKey { - table: String, - predicate: PredicateRef, - projection: Option>, - order: ReadOrder, -} - -impl RequestKey { - pub fn new( - table: String, - predicate: PredicateRef, - projection: Option>, - order: ReadOrder, - ) -> Self { - Self { - table, - predicate, - projection, - order, - } - } -} - -type Notifier = Sender>; - -#[derive(Debug, Default)] -pub struct Notifiers { - notifiers: RwLock>, -} - -impl Notifiers { - pub fn new(notifier: Notifier) -> Self { - let notifiers = vec![notifier]; - Self { - notifiers: RwLock::new(notifiers), - } - } - - pub fn add_notifier(&self, notifier: Notifier) { - self.notifiers.write().unwrap().push(notifier); - } - - pub fn into_notifiers(self) -> Vec { - self.notifiers.into_inner().unwrap() - } -} - -#[derive(Debug, Default)] -pub struct DedupMap { - inner: HashMap, -} - -impl DedupMap { - pub fn get_notifiers(&self, key: &RequestKey) -> Option<&Notifiers> { - self.inner.get(key) - } - - pub fn add_notifiers(&mut self, key: RequestKey, value: Notifiers) { - self.inner.insert(key, value); - } - - pub fn delete_notifiers(&mut self, key: RequestKey) -> Option { - self.inner.remove(&key) - } - - pub fn contains_key(&self, key: &RequestKey) -> bool { - self.inner.contains_key(key) - } -} diff --git a/server/src/grpc/remote_engine_service/dedup_requests.rs b/server/src/grpc/remote_engine_service/dedup_requests.rs new file mode 100644 index 0000000000..bccd16fc85 --- /dev/null +++ b/server/src/grpc/remote_engine_service/dedup_requests.rs @@ -0,0 +1,100 @@ +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::{collections::HashMap, sync::RwLock}; + +use common_types::record_batch::RecordBatch; +use table_engine::{predicate::PredicateRef, table::ReadOrder}; +use tokio::sync::mpsc::Sender; + +use crate::grpc::remote_engine_service::Result; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct RequestKey { + table: String, + predicate: PredicateRef, + projection: Option>, + order: ReadOrder, +} + +impl RequestKey { + pub fn new( + table: String, + predicate: PredicateRef, + projection: Option>, + order: ReadOrder, + ) -> Self { + Self { + table, + predicate, + projection, + order, + } + } +} + +type Notifier = Sender>; + +#[derive(Debug, Default)] +struct Notifiers { + notifiers: RwLock>, +} + +impl Notifiers { + pub fn new(notifier: Notifier) -> Self { + let notifiers = vec![notifier]; + Self { + notifiers: RwLock::new(notifiers), + } + } + + pub fn add_notifier(&self, notifier: Notifier) { + self.notifiers.write().unwrap().push(notifier); + } +} + +#[derive(Debug, Default)] +pub struct RequestNotifiers { + inner: RwLock>, +} + +impl RequestNotifiers { + /// Insert a notifier for the given key. + pub fn insert_notifier(&self, key: RequestKey, notifier: Notifier) -> RequestResult { + // First try to read the notifiers, if the key exists, add the notifier to the + // notifiers. + let notifiers = self.inner.read().unwrap(); + if notifiers.contains_key(&key) { + notifiers.get(&key).unwrap().add_notifier(notifier); + return RequestResult::Wait; + } + drop(notifiers); + + // If the key does not exist, try to write the notifiers. + let mut notifiers = self.inner.write().unwrap(); + // double check, if the key exists, add the notifier to the notifiers. + if notifiers.contains_key(&key) { + notifiers.get(&key).unwrap().add_notifier(notifier); + return RequestResult::Wait; + } + + //the key is not existed, insert the key and the notifier. + notifiers.insert(key, Notifiers::new(notifier)); + RequestResult::First + } + + /// Take the notifiers for the given key, and remove the key from the map. + pub fn take_notifiers(&self, key: RequestKey) -> Option> { + self.inner + .write() + .unwrap() + .remove(&key) + .map(|notifiers| notifiers.notifiers.into_inner().unwrap()) + } +} + +pub enum RequestResult { + // The first request for this key, need to handle this request. + First, + // There are other requests for this key, just wait for the result. + Wait, +} diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 3fbca710b2..cd652c3dd8 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -2,10 +2,7 @@ // Remote engine rpc service implementation. -use std::{ - sync::{Arc, RwLock}, - time::Instant, -}; +use std::{sync::Arc, time::Instant}; use arrow_ext::ipc::{self, CompressOptions, CompressOutput, CompressionMethod}; use async_trait::async_trait; @@ -39,12 +36,12 @@ use crate::grpc::{ REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC, REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, }, remote_engine_service::{ - dedup_request::{DedupMap, Notifiers, RequestKey}, + dedup_requests::{RequestKey, RequestNotifiers, RequestResult}, error::{ErrNoCause, ErrWithCause, Result, StatusCode}, }, }; -pub mod dedup_request; +pub mod dedup_requests; mod error; const STREAM_QUERY_CHANNEL_LEN: usize = 20; @@ -54,7 +51,7 @@ const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024; pub struct RemoteEngineServiceImpl { pub instance: InstanceRef, pub runtimes: Arc, - pub dedup_map: Arc>, + pub request_notifiers: Option>, } impl RemoteEngineServiceImpl { @@ -82,31 +79,16 @@ impl RemoteEngineServiceImpl { read_request.order, ); - { - let dedup_map = self.dedup_map.read().unwrap(); - if dedup_map.contains_key(&request_key) { - let notifiers = dedup_map.get_notifiers(&request_key).unwrap(); - notifiers.add_notifier(tx); - - REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC - .stream_read - .observe(instant.saturating_elapsed().as_secs_f64()); - return Ok(ReceiverStream::new(rx)); - } else { - drop(dedup_map); - - let mut dedup_map = self.dedup_map.write().unwrap(); - if dedup_map.contains_key(&request_key) { - let notifiers = dedup_map.get_notifiers(&request_key).unwrap(); - notifiers.add_notifier(tx); - + if let Some(request_notifiers) = self.request_notifiers.clone() { + match request_notifiers.insert_notifier(request_key.clone(), tx.clone()) { + // The first request, need to handle it, and then notify the other requests. + RequestResult::First => {} + // The request is waiting for the result of first request. + RequestResult::Wait => { REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read .observe(instant.saturating_elapsed().as_secs_f64()); return Ok(ReceiverStream::new(rx)); - } else { - let notifiers = Notifiers::new(tx); - dedup_map.add_notifiers(request_key.clone(), notifiers); } } } @@ -149,10 +131,9 @@ impl RemoteEngineServiceImpl { batches.extend(batch); } - let notifiers = { - let mut dedup_map = self.dedup_map.write().unwrap(); - let notifiers = dedup_map.delete_notifiers(request_key).unwrap(); - notifiers.into_notifiers() + let notifiers = match self.request_notifiers.clone() { + Some(request_notifiers) => request_notifiers.take_notifiers(request_key).unwrap(), + None => vec![tx], }; let mut num_rows = 0; @@ -160,24 +141,20 @@ impl RemoteEngineServiceImpl { match batch { Ok(batch) => { num_rows += batch.num_rows() * notifiers.len(); - for tx in ¬ifiers { - if let Err(e) = tx.send(Ok(batch.clone())).await { + for notifier in ¬ifiers { + if let Err(e) = notifier.send(Ok(batch.clone())).await { error!("Failed to send handler result, err:{}.", e); } } } Err(_) => { - for tx in ¬ifiers { - if let Err(e) = tx - .send( - ErrNoCause { - code: StatusCode::Internal, - msg: "failed to handler request".to_string(), - } - .fail(), - ) - .await - { + for notifier in ¬ifiers { + let err = ErrNoCause { + code: StatusCode::Internal, + msg: "failed to handler request".to_string(), + } + .fail(); + if let Err(e) = notifier.send(err).await { error!("Failed to send handler result, err:{}.", e); } } diff --git a/server/src/server.rs b/server/src/server.rs index 04005bee64..5b9a70734c 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -400,6 +400,7 @@ impl Builder { .opened_wals(opened_wals) .timeout(self.server_config.timeout.map(|v| v.0)) .proxy(proxy) + .dedup_requests(self.server_config.dedup_requests) .build() .context(BuildGrpcService)?; From 332d5905623ed9444efefda8c5401fd4a7f5eeeb Mon Sep 17 00:00:00 2001 From: baojinri Date: Wed, 26 Jul 2023 15:15:39 +0800 Subject: [PATCH 04/10] abstract deduped stream read internal --- server/src/grpc/remote_engine_service/mod.rs | 90 ++++++++++++++++---- 1 file changed, 74 insertions(+), 16 deletions(-) diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index cd652c3dd8..6c6f8a5a74 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -62,6 +62,53 @@ impl RemoteEngineServiceImpl { let instant = Instant::now(); let ctx = self.handler_ctx(); let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); + let handle = self.runtimes.read_runtime.spawn(async move { + let read_request = request.into_inner(); + handle_stream_read(ctx, read_request).await + }); + let streams = handle.await.box_err().context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to join task", + })??; + + for stream in streams.streams { + let mut stream = stream.map(|result| { + result.box_err().context(ErrWithCause { + code: StatusCode::Internal, + msg: "record batch failed", + }) + }); + let tx = tx.clone(); + self.runtimes.read_runtime.spawn(async move { + let mut num_rows = 0; + while let Some(batch) = stream.next().await { + if let Ok(record_batch) = &batch { + num_rows += record_batch.num_rows(); + } + if let Err(e) = tx.send(batch).await { + error!("Failed to send handler result, err:{}.", e); + break; + } + } + REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC + .query_succeeded_row + .inc_by(num_rows as u64); + }); + } + + REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC + .stream_read + .observe(instant.saturating_elapsed().as_secs_f64()); + Ok(ReceiverStream::new(rx)) + } + + async fn deduped_stream_read_internal( + &self, + request: Request, + ) -> Result>> { + let instant = Instant::now(); + let ctx = self.handler_ctx(); + let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); let request = request.into_inner(); let table_engine::remote::model::ReadRequest { @@ -79,17 +126,20 @@ impl RemoteEngineServiceImpl { read_request.order, ); - if let Some(request_notifiers) = self.request_notifiers.clone() { - match request_notifiers.insert_notifier(request_key.clone(), tx.clone()) { - // The first request, need to handle it, and then notify the other requests. - RequestResult::First => {} - // The request is waiting for the result of first request. - RequestResult::Wait => { - REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC - .stream_read - .observe(instant.saturating_elapsed().as_secs_f64()); - return Ok(ReceiverStream::new(rx)); - } + match self + .request_notifiers + .as_ref() + .unwrap() + .insert_notifier(request_key.clone(), tx) + { + // The first request, need to handle it, and then notify the other requests. + RequestResult::First => {} + // The request is waiting for the result of first request. + RequestResult::Wait => { + REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC + .stream_read + .observe(instant.saturating_elapsed().as_secs_f64()); + return Ok(ReceiverStream::new(rx)); } } @@ -131,10 +181,12 @@ impl RemoteEngineServiceImpl { batches.extend(batch); } - let notifiers = match self.request_notifiers.clone() { - Some(request_notifiers) => request_notifiers.take_notifiers(request_key).unwrap(), - None => vec![tx], - }; + let notifiers = self + .request_notifiers + .as_ref() + .unwrap() + .take_notifiers(request_key) + .unwrap(); let mut num_rows = 0; for batch in batches { @@ -311,7 +363,13 @@ impl RemoteEngineService for RemoteEngineServiceImpl request: Request, ) -> std::result::Result, Status> { REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC.stream_query.inc(); - match self.stream_read_internal(request).await { + let result = if self.request_notifiers.is_some() { + self.deduped_stream_read_internal(request).await + } else { + self.stream_read_internal(request).await + }; + + match result { Ok(stream) => { let new_stream: Self::ReadStream = Box::pin(stream.map(|res| match res { Ok(record_batch) => { From a9d47dd7ea8c49f1edbf01f6bc604c79416c06d6 Mon Sep 17 00:00:00 2001 From: baojinri Date: Thu, 27 Jul 2023 00:16:53 +0800 Subject: [PATCH 05/10] add guard --- .../remote_engine_service/dedup_requests.rs | 4 +- server/src/grpc/remote_engine_service/mod.rs | 42 ++++++++++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/server/src/grpc/remote_engine_service/dedup_requests.rs b/server/src/grpc/remote_engine_service/dedup_requests.rs index bccd16fc85..c39ce6ebb0 100644 --- a/server/src/grpc/remote_engine_service/dedup_requests.rs +++ b/server/src/grpc/remote_engine_service/dedup_requests.rs @@ -83,11 +83,11 @@ impl RequestNotifiers { } /// Take the notifiers for the given key, and remove the key from the map. - pub fn take_notifiers(&self, key: RequestKey) -> Option> { + pub fn take_notifiers(&self, key: &RequestKey) -> Option> { self.inner .write() .unwrap() - .remove(&key) + .remove(key) .map(|notifiers| notifiers.notifiers.into_inner().unwrap()) } } diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 6c6f8a5a74..7cdca80eb9 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -54,6 +54,32 @@ pub struct RemoteEngineServiceImpl { pub request_notifiers: Option>, } +struct RequestNotifierGuard(Option); + +impl Default for RequestNotifierGuard { + fn default() -> Self { + Self(None) + } +} + +impl RequestNotifierGuard { + fn set_none(&mut self) { + self.0 = None; + } + + fn set_some(&mut self, f: F) { + self.0 = Some(f); + } +} + +impl Drop for RequestNotifierGuard { + fn drop(&mut self) { + if let Some(f) = (self.0).take() { + f() + } + } +} + impl RemoteEngineServiceImpl { async fn stream_read_internal( &self, @@ -126,6 +152,7 @@ impl RemoteEngineServiceImpl { read_request.order, ); + let mut _guard = RequestNotifierGuard::default(); match self .request_notifiers .as_ref() @@ -133,7 +160,15 @@ impl RemoteEngineServiceImpl { .insert_notifier(request_key.clone(), tx) { // The first request, need to handle it, and then notify the other requests. - RequestResult::First => {} + RequestResult::First => { + // The _guard is used to remove key when future is cancelled. + _guard.set_some(|| { + self.request_notifiers + .as_ref() + .unwrap() + .take_notifiers(&request_key); + }); + } // The request is waiting for the result of first request. RequestResult::Wait => { REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC @@ -181,11 +216,13 @@ impl RemoteEngineServiceImpl { batches.extend(batch); } + // We should set None to _guard, otherwise the key will be removed twice. + _guard.set_none(); let notifiers = self .request_notifiers .as_ref() .unwrap() - .take_notifiers(request_key) + .take_notifiers(&request_key) .unwrap(); let mut num_rows = 0; @@ -210,6 +247,7 @@ impl RemoteEngineServiceImpl { error!("Failed to send handler result, err:{}.", e); } } + break; } } } From 2c0acaf9e75710c94088890c416dced6e9c45708 Mon Sep 17 00:00:00 2001 From: baojinri Date: Thu, 27 Jul 2023 10:04:34 +0800 Subject: [PATCH 06/10] add generic --- server/src/grpc/mod.rs | 8 +- .../remote_engine_service/dedup_requests.rs | 73 ++++++++----------- server/src/grpc/remote_engine_service/mod.rs | 49 ++++++++++--- 3 files changed, 74 insertions(+), 56 deletions(-) diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 83c8ee4e7d..a864330702 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -16,7 +16,7 @@ use ceresdbproto::{ storage::storage_service_server::StorageServiceServer, }; use cluster::ClusterRef; -use common_types::column_schema; +use common_types::{column_schema, record_batch::RecordBatch}; use futures::FutureExt; use generic_error::GenericError; use log::{info, warn}; @@ -36,7 +36,9 @@ use tonic::transport::Server; use crate::grpc::{ meta_event_service::MetaServiceImpl, - remote_engine_service::{dedup_requests::RequestNotifiers, RemoteEngineServiceImpl}, + remote_engine_service::{ + dedup_requests::RequestNotifiers, error, RemoteEngineServiceImpl, RequestKey, + }, storage_service::StorageServiceImpl, }; @@ -197,7 +199,7 @@ pub struct Builder { cluster: Option, opened_wals: Option, proxy: Option>>, - request_notifiers: Option>, + request_notifiers: Option>>, } impl Builder { diff --git a/server/src/grpc/remote_engine_service/dedup_requests.rs b/server/src/grpc/remote_engine_service/dedup_requests.rs index c39ce6ebb0..d7e41870dc 100644 --- a/server/src/grpc/remote_engine_service/dedup_requests.rs +++ b/server/src/grpc/remote_engine_service/dedup_requests.rs @@ -1,65 +1,54 @@ // Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. -use std::{collections::HashMap, sync::RwLock}; +use std::{collections::HashMap, hash::Hash, sync::RwLock}; -use common_types::record_batch::RecordBatch; -use table_engine::{predicate::PredicateRef, table::ReadOrder}; use tokio::sync::mpsc::Sender; -use crate::grpc::remote_engine_service::Result; +type Notifier = Sender>; -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct RequestKey { - table: String, - predicate: PredicateRef, - projection: Option>, - order: ReadOrder, +#[derive(Debug)] +struct Notifiers { + notifiers: RwLock>>, } -impl RequestKey { - pub fn new( - table: String, - predicate: PredicateRef, - projection: Option>, - order: ReadOrder, - ) -> Self { - Self { - table, - predicate, - projection, - order, - } - } -} - -type Notifier = Sender>; - -#[derive(Debug, Default)] -struct Notifiers { - notifiers: RwLock>, -} - -impl Notifiers { - pub fn new(notifier: Notifier) -> Self { +impl Notifiers { + pub fn new(notifier: Notifier) -> Self { let notifiers = vec![notifier]; Self { notifiers: RwLock::new(notifiers), } } - pub fn add_notifier(&self, notifier: Notifier) { + pub fn add_notifier(&self, notifier: Notifier) { self.notifiers.write().unwrap().push(notifier); } } -#[derive(Debug, Default)] -pub struct RequestNotifiers { - inner: RwLock>, +#[derive(Debug)] +pub struct RequestNotifiers +where + K: PartialEq + Eq + Hash, +{ + inner: RwLock>>, +} + +impl Default for RequestNotifiers +where + K: PartialEq + Eq + Hash, +{ + fn default() -> Self { + Self { + inner: RwLock::new(HashMap::new()), + } + } } -impl RequestNotifiers { +impl RequestNotifiers +where + K: PartialEq + Eq + Hash, +{ /// Insert a notifier for the given key. - pub fn insert_notifier(&self, key: RequestKey, notifier: Notifier) -> RequestResult { + pub fn insert_notifier(&self, key: K, notifier: Notifier) -> RequestResult { // First try to read the notifiers, if the key exists, add the notifier to the // notifiers. let notifiers = self.inner.read().unwrap(); @@ -83,7 +72,7 @@ impl RequestNotifiers { } /// Take the notifiers for the given key, and remove the key from the map. - pub fn take_notifiers(&self, key: &RequestKey) -> Option> { + pub fn take_notifiers(&self, key: &K) -> Option>> { self.inner .write() .unwrap() diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 7cdca80eb9..7e4803aa34 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -2,7 +2,7 @@ // Remote engine rpc service implementation. -use std::{sync::Arc, time::Instant}; +use std::{hash::Hash, sync::Arc, time::Instant}; use arrow_ext::ipc::{self, CompressOptions, CompressOutput, CompressionMethod}; use async_trait::async_trait; @@ -23,8 +23,11 @@ use proxy::instance::InstanceRef; use query_engine::executor::Executor as QueryExecutor; use snafu::{OptionExt, ResultExt}; use table_engine::{ - engine::EngineRuntimes, remote::model::TableIdentifier, stream::PartitionedStreams, - table::TableRef, + engine::EngineRuntimes, + predicate::PredicateRef, + remote::model::TableIdentifier, + stream::PartitionedStreams, + table::{ReadOrder, TableRef}, }; use time_ext::InstantExt; use tokio::sync::mpsc; @@ -36,22 +39,39 @@ use crate::grpc::{ REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC, REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, }, remote_engine_service::{ - dedup_requests::{RequestKey, RequestNotifiers, RequestResult}, - error::{ErrNoCause, ErrWithCause, Result, StatusCode}, + dedup_requests::{RequestNotifiers, RequestResult}, + error::{ErrNoCause, ErrWithCause, Error, Result, StatusCode}, }, }; pub mod dedup_requests; -mod error; +pub mod error; const STREAM_QUERY_CHANNEL_LEN: usize = 20; const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024; -#[derive(Clone)] -pub struct RemoteEngineServiceImpl { - pub instance: InstanceRef, - pub runtimes: Arc, - pub request_notifiers: Option>, +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct RequestKey { + table: String, + predicate: PredicateRef, + projection: Option>, + order: ReadOrder, +} + +impl RequestKey { + pub fn new( + table: String, + predicate: PredicateRef, + projection: Option>, + order: ReadOrder, + ) -> Self { + Self { + table, + predicate, + projection, + order, + } + } } struct RequestNotifierGuard(Option); @@ -80,6 +100,13 @@ impl Drop for RequestNotifierGuard { } } +#[derive(Clone)] +pub struct RemoteEngineServiceImpl { + pub instance: InstanceRef, + pub runtimes: Arc, + pub request_notifiers: Option>>, +} + impl RemoteEngineServiceImpl { async fn stream_read_internal( &self, From 0e7aa0c9e5f4383fef1168519bf4785d30ec89a8 Mon Sep 17 00:00:00 2001 From: baojinri Date: Thu, 27 Jul 2023 11:11:42 +0800 Subject: [PATCH 07/10] modify code for review --- server/src/config.rs | 4 ++-- .../dedup_requests.rs | 24 +++++++++---------- server/src/grpc/mod.rs | 17 ++++++------- server/src/grpc/remote_engine_service/mod.rs | 18 +++++++------- server/src/lib.rs | 1 + server/src/server.rs | 2 +- 6 files changed, 34 insertions(+), 32 deletions(-) rename server/src/{grpc/remote_engine_service => }/dedup_requests.rs (79%) diff --git a/server/src/config.rs b/server/src/config.rs index e35d4beec6..ea16617e38 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -123,7 +123,7 @@ pub struct ServerConfig { pub remote_client: remote_engine_client::Config, /// Whether to deduplicate requests - pub dedup_requests: bool, + pub enable_query_dedup: bool, } impl Default for ServerConfig { @@ -143,7 +143,7 @@ impl Default for ServerConfig { route_cache: router::RouteCacheConfig::default(), hotspot: hotspot::Config::default(), remote_client: remote_engine_client::Config::default(), - dedup_requests: true, + enable_query_dedup: true, } } } diff --git a/server/src/grpc/remote_engine_service/dedup_requests.rs b/server/src/dedup_requests.rs similarity index 79% rename from server/src/grpc/remote_engine_service/dedup_requests.rs rename to server/src/dedup_requests.rs index d7e41870dc..f9d8baf390 100644 --- a/server/src/grpc/remote_engine_service/dedup_requests.rs +++ b/server/src/dedup_requests.rs @@ -4,35 +4,35 @@ use std::{collections::HashMap, hash::Hash, sync::RwLock}; use tokio::sync::mpsc::Sender; -type Notifier = Sender>; +type Notifier = Sender; #[derive(Debug)] -struct Notifiers { - notifiers: RwLock>>, +struct Notifiers { + notifiers: RwLock>>, } -impl Notifiers { - pub fn new(notifier: Notifier) -> Self { +impl Notifiers { + pub fn new(notifier: Notifier) -> Self { let notifiers = vec![notifier]; Self { notifiers: RwLock::new(notifiers), } } - pub fn add_notifier(&self, notifier: Notifier) { + pub fn add_notifier(&self, notifier: Notifier) { self.notifiers.write().unwrap().push(notifier); } } #[derive(Debug)] -pub struct RequestNotifiers +pub struct RequestNotifiers where K: PartialEq + Eq + Hash, { - inner: RwLock>>, + inner: RwLock>>, } -impl Default for RequestNotifiers +impl Default for RequestNotifiers where K: PartialEq + Eq + Hash, { @@ -43,12 +43,12 @@ where } } -impl RequestNotifiers +impl RequestNotifiers where K: PartialEq + Eq + Hash, { /// Insert a notifier for the given key. - pub fn insert_notifier(&self, key: K, notifier: Notifier) -> RequestResult { + pub fn insert_notifier(&self, key: K, notifier: Notifier) -> RequestResult { // First try to read the notifiers, if the key exists, add the notifier to the // notifiers. let notifiers = self.inner.read().unwrap(); @@ -72,7 +72,7 @@ where } /// Take the notifiers for the given key, and remove the key from the map. - pub fn take_notifiers(&self, key: &K) -> Option>> { + pub fn take_notifiers(&self, key: &K) -> Option>> { self.inner .write() .unwrap() diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index a864330702..6f903e416f 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -34,12 +34,13 @@ use table_engine::engine::EngineRuntimes; use tokio::sync::oneshot::{self, Sender}; use tonic::transport::Server; -use crate::grpc::{ - meta_event_service::MetaServiceImpl, - remote_engine_service::{ - dedup_requests::RequestNotifiers, error, RemoteEngineServiceImpl, RequestKey, +use crate::{ + dedup_requests::RequestNotifiers, + grpc::{ + meta_event_service::MetaServiceImpl, + remote_engine_service::{error, RemoteEngineServiceImpl, RequestKey}, + storage_service::StorageServiceImpl, }, - storage_service::StorageServiceImpl, }; mod meta_event_service; @@ -199,7 +200,7 @@ pub struct Builder { cluster: Option, opened_wals: Option, proxy: Option>>, - request_notifiers: Option>>, + request_notifiers: Option>>>, } impl Builder { @@ -252,8 +253,8 @@ impl Builder { self } - pub fn dedup_requests(mut self, dedup_requests: bool) -> Self { - if dedup_requests { + pub fn request_notifiers(mut self, enable_query_dedup: bool) -> Self { + if enable_query_dedup { self.request_notifiers = Some(Arc::new(RequestNotifiers::default())); } self diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 7e4803aa34..a9959c729f 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -34,17 +34,17 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; -use crate::grpc::{ - metrics::{ - REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC, REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, - }, - remote_engine_service::{ - dedup_requests::{RequestNotifiers, RequestResult}, - error::{ErrNoCause, ErrWithCause, Error, Result, StatusCode}, +use crate::{ + dedup_requests::{RequestNotifiers, RequestResult}, + grpc::{ + metrics::{ + REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC, + REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, + }, + remote_engine_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode}, }, }; -pub mod dedup_requests; pub mod error; const STREAM_QUERY_CHANNEL_LEN: usize = 20; @@ -104,7 +104,7 @@ impl Drop for RequestNotifierGuard { pub struct RemoteEngineServiceImpl { pub instance: InstanceRef, pub runtimes: Arc, - pub request_notifiers: Option>>, + pub request_notifiers: Option>>>, } impl RemoteEngineServiceImpl { diff --git a/server/src/lib.rs b/server/src/lib.rs index d2d58bf156..2379efc510 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,6 +7,7 @@ pub mod config; mod consts; +mod dedup_requests; mod error_util; mod grpc; mod http; diff --git a/server/src/server.rs b/server/src/server.rs index 5b9a70734c..eea66d0d53 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -400,7 +400,7 @@ impl Builder { .opened_wals(opened_wals) .timeout(self.server_config.timeout.map(|v| v.0)) .proxy(proxy) - .dedup_requests(self.server_config.dedup_requests) + .request_notifiers(self.server_config.enable_query_dedup) .build() .context(BuildGrpcService)?; From 17eee71b75e765e6286b9522fdbbc58f94e6c1eb Mon Sep 17 00:00:00 2001 From: baojinri Date: Thu, 27 Jul 2023 15:53:15 +0800 Subject: [PATCH 08/10] modify code --- server/src/grpc/remote_engine_service/mod.rs | 43 ++++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index a9959c729f..0f4b9180f0 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -74,28 +74,28 @@ impl RequestKey { } } -struct RequestNotifierGuard(Option); - -impl Default for RequestNotifierGuard { - fn default() -> Self { - Self(None) - } +struct ExecutionGuard { + f: F, + cancelled: bool, } -impl RequestNotifierGuard { - fn set_none(&mut self) { - self.0 = None; +impl ExecutionGuard { + fn new(f: F) -> Self { + Self { + f, + cancelled: false, + } } - fn set_some(&mut self, f: F) { - self.0 = Some(f); + fn cancel(&mut self) { + self.cancelled = true; } } -impl Drop for RequestNotifierGuard { +impl Drop for ExecutionGuard { fn drop(&mut self) { - if let Some(f) = (self.0).take() { - f() + if !self.cancelled { + (self.f)() } } } @@ -179,8 +179,7 @@ impl RemoteEngineServiceImpl { read_request.order, ); - let mut _guard = RequestNotifierGuard::default(); - match self + let mut guard = match self .request_notifiers .as_ref() .unwrap() @@ -188,13 +187,13 @@ impl RemoteEngineServiceImpl { { // The first request, need to handle it, and then notify the other requests. RequestResult::First => { - // The _guard is used to remove key when future is cancelled. - _guard.set_some(|| { + // This is used to remove key when future is cancelled. + ExecutionGuard::new(|| { self.request_notifiers .as_ref() .unwrap() .take_notifiers(&request_key); - }); + }) } // The request is waiting for the result of first request. RequestResult::Wait => { @@ -203,7 +202,7 @@ impl RemoteEngineServiceImpl { .observe(instant.saturating_elapsed().as_secs_f64()); return Ok(ReceiverStream::new(rx)); } - } + }; let handle = self .runtimes @@ -243,8 +242,8 @@ impl RemoteEngineServiceImpl { batches.extend(batch); } - // We should set None to _guard, otherwise the key will be removed twice. - _guard.set_none(); + // We should set cancel to guard, otherwise the key will be removed twice. + guard.cancel(); let notifiers = self .request_notifiers .as_ref() From 91a6c0327ebade0aa688ce6e7fa67ce4bdbf94b9 Mon Sep 17 00:00:00 2001 From: baojinri Date: Thu, 27 Jul 2023 17:46:00 +0800 Subject: [PATCH 09/10] modify code --- server/src/grpc/metrics.rs | 1 + server/src/grpc/mod.rs | 4 +- server/src/grpc/remote_engine_service/mod.rs | 46 +++++++++----------- 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/server/src/grpc/metrics.rs b/server/src/grpc/metrics.rs index 65f44626ee..0016c898d3 100644 --- a/server/src/grpc/metrics.rs +++ b/server/src/grpc/metrics.rs @@ -44,6 +44,7 @@ make_auto_flush_static_metric! { write_succeeded_row, write_failed_row, query_succeeded_row, + dedupped_stream_query, } pub struct RemoteEngineGrpcHandlerCounterVec: LocalIntCounter { diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 6f903e416f..8abd04c0cd 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -38,7 +38,7 @@ use crate::{ dedup_requests::RequestNotifiers, grpc::{ meta_event_service::MetaServiceImpl, - remote_engine_service::{error, RemoteEngineServiceImpl, RequestKey}, + remote_engine_service::{error, RemoteEngineServiceImpl, StreamReadReqKey}, storage_service::StorageServiceImpl, }, }; @@ -200,7 +200,7 @@ pub struct Builder { cluster: Option, opened_wals: Option, proxy: Option>>, - request_notifiers: Option>>>, + request_notifiers: Option>>>, } impl Builder { diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 0f4b9180f0..ff8a2ff43d 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -51,14 +51,14 @@ const STREAM_QUERY_CHANNEL_LEN: usize = 20; const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024; #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct RequestKey { +pub struct StreamReadReqKey { table: String, predicate: PredicateRef, projection: Option>, order: ReadOrder, } -impl RequestKey { +impl StreamReadReqKey { pub fn new( table: String, predicate: PredicateRef, @@ -104,7 +104,7 @@ impl Drop for ExecutionGuard { pub struct RemoteEngineServiceImpl { pub instance: InstanceRef, pub runtimes: Arc, - pub request_notifiers: Option>>>, + pub request_notifiers: Option>>>, } impl RemoteEngineServiceImpl { @@ -157,6 +157,7 @@ impl RemoteEngineServiceImpl { async fn deduped_stream_read_internal( &self, + request_notifiers: Arc>>, request: Request, ) -> Result>> { let instant = Instant::now(); @@ -172,27 +173,19 @@ impl RemoteEngineServiceImpl { msg: "fail to convert read request", })?; - let request_key = RequestKey::new( + let request_key = StreamReadReqKey::new( table.table, read_request.predicate.clone(), read_request.projected_schema.projection(), read_request.order, ); - let mut guard = match self - .request_notifiers - .as_ref() - .unwrap() - .insert_notifier(request_key.clone(), tx) - { + let mut guard = match request_notifiers.insert_notifier(request_key.clone(), tx) { // The first request, need to handle it, and then notify the other requests. RequestResult::First => { // This is used to remove key when future is cancelled. ExecutionGuard::new(|| { - self.request_notifiers - .as_ref() - .unwrap() - .take_notifiers(&request_key); + request_notifiers.take_notifiers(&request_key); }) } // The request is waiting for the result of first request. @@ -244,18 +237,14 @@ impl RemoteEngineServiceImpl { // We should set cancel to guard, otherwise the key will be removed twice. guard.cancel(); - let notifiers = self - .request_notifiers - .as_ref() - .unwrap() - .take_notifiers(&request_key) - .unwrap(); + let notifiers = request_notifiers.take_notifiers(&request_key).unwrap(); + let num_notifiers = notifiers.len(); let mut num_rows = 0; for batch in batches { match batch { Ok(batch) => { - num_rows += batch.num_rows() * notifiers.len(); + num_rows += batch.num_rows() * num_notifiers; for notifier in ¬ifiers { if let Err(e) = notifier.send(Ok(batch.clone())).await { error!("Failed to send handler result, err:{}.", e); @@ -281,10 +270,13 @@ impl RemoteEngineServiceImpl { REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC .query_succeeded_row .inc_by(num_rows as u64); - + REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC + .dedupped_stream_query + .inc_by((num_notifiers - 1) as u64); REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read .observe(instant.saturating_elapsed().as_secs_f64()); + Ok(ReceiverStream::new(rx)) } @@ -427,10 +419,12 @@ impl RemoteEngineService for RemoteEngineServiceImpl request: Request, ) -> std::result::Result, Status> { REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC.stream_query.inc(); - let result = if self.request_notifiers.is_some() { - self.deduped_stream_read_internal(request).await - } else { - self.stream_read_internal(request).await + let result = match self.request_notifiers.clone() { + Some(request_notifiers) => { + self.deduped_stream_read_internal(request_notifiers, request) + .await + } + None => self.stream_read_internal(request).await, }; match result { From 7a379ca2c4c9c5753f05b6768ab285dd406a7680 Mon Sep 17 00:00:00 2001 From: baojinri Date: Fri, 28 Jul 2023 15:00:01 +0800 Subject: [PATCH 10/10] modify code for review --- server/src/config.rs | 2 +- server/src/dedup_requests.rs | 22 ++++++++++---------- server/src/grpc/remote_engine_service/mod.rs | 3 +++ 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/server/src/config.rs b/server/src/config.rs index ea16617e38..fb2510cc31 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -143,7 +143,7 @@ impl Default for ServerConfig { route_cache: router::RouteCacheConfig::default(), hotspot: hotspot::Config::default(), remote_client: remote_engine_client::Config::default(), - enable_query_dedup: true, + enable_query_dedup: false, } } } diff --git a/server/src/dedup_requests.rs b/server/src/dedup_requests.rs index f9d8baf390..6a90f29be3 100644 --- a/server/src/dedup_requests.rs +++ b/server/src/dedup_requests.rs @@ -29,7 +29,7 @@ pub struct RequestNotifiers where K: PartialEq + Eq + Hash, { - inner: RwLock>>, + notifiers_by_key: RwLock>>, } impl Default for RequestNotifiers @@ -38,7 +38,7 @@ where { fn default() -> Self { Self { - inner: RwLock::new(HashMap::new()), + notifiers_by_key: RwLock::new(HashMap::new()), } } } @@ -51,29 +51,29 @@ where pub fn insert_notifier(&self, key: K, notifier: Notifier) -> RequestResult { // First try to read the notifiers, if the key exists, add the notifier to the // notifiers. - let notifiers = self.inner.read().unwrap(); - if notifiers.contains_key(&key) { - notifiers.get(&key).unwrap().add_notifier(notifier); + let notifiers_by_key = self.notifiers_by_key.read().unwrap(); + if let Some(notifiers) = notifiers_by_key.get(&key) { + notifiers.add_notifier(notifier); return RequestResult::Wait; } - drop(notifiers); + drop(notifiers_by_key); // If the key does not exist, try to write the notifiers. - let mut notifiers = self.inner.write().unwrap(); + let mut notifiers_by_key = self.notifiers_by_key.write().unwrap(); // double check, if the key exists, add the notifier to the notifiers. - if notifiers.contains_key(&key) { - notifiers.get(&key).unwrap().add_notifier(notifier); + if let Some(notifiers) = notifiers_by_key.get(&key) { + notifiers.add_notifier(notifier); return RequestResult::Wait; } //the key is not existed, insert the key and the notifier. - notifiers.insert(key, Notifiers::new(notifier)); + notifiers_by_key.insert(key, Notifiers::new(notifier)); RequestResult::First } /// Take the notifiers for the given key, and remove the key from the map. pub fn take_notifiers(&self, key: &K) -> Option>> { - self.inner + self.notifiers_by_key .write() .unwrap() .remove(key) diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index ff8a2ff43d..342501801f 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -149,6 +149,7 @@ impl RemoteEngineServiceImpl { }); } + // TODO(shuangxiao): this metric is invalid, refactor it. REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read .observe(instant.saturating_elapsed().as_secs_f64()); @@ -190,6 +191,7 @@ impl RemoteEngineServiceImpl { } // The request is waiting for the result of first request. RequestResult::Wait => { + // TODO(shuangxiao): this metric is invalid, refactor it. REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read .observe(instant.saturating_elapsed().as_secs_f64()); @@ -273,6 +275,7 @@ impl RemoteEngineServiceImpl { REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC .dedupped_stream_query .inc_by((num_notifiers - 1) as u64); + // TODO(shuangxiao): this metric is invalid, refactor it. REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC .stream_read .observe(instant.saturating_elapsed().as_secs_f64());