diff --git a/query_engine/src/datafusion_impl/task_context.rs b/query_engine/src/datafusion_impl/task_context.rs index 6da0848667..5bde854bbd 100644 --- a/query_engine/src/datafusion_impl/task_context.rs +++ b/query_engine/src/datafusion_impl/task_context.rs @@ -122,7 +122,7 @@ impl Preprocessor { .await .box_err() .with_context(|| ExecutorWithCause { - msg: format!("failed to preprocess remote plan"), + msg: Some("failed to preprocess remote plan".to_string()), }) } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index cda1a3bddb..5f4a7e1fdf 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -307,6 +307,7 @@ impl Builder { v.enable.then(|| QueryDedup { config: v.clone(), request_notifiers: Arc::new(RequestNotifiers::default()), + physical_plan_notifiers: Arc::new(RequestNotifiers::default()), }) }) .unwrap_or_default(); diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 7c500eefe8..cfcca52540 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -35,7 +35,10 @@ use ceresdbproto::{ storage::{arrow_payload, ArrowPayload}, }; use common_types::{record_batch::RecordBatch, request_id::RequestId}; -use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt}; +use futures::{ + stream::{self, BoxStream, FuturesUnordered, StreamExt}, + Future, +}; use generic_error::BoxError; use log::{error, info}; use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult}; @@ -97,6 +100,14 @@ impl StreamReadReqKey { pub type StreamReadRequestNotifiers = Arc>>>; +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct PhysicalPlanKey { + encoded_plan: Vec, +} + +pub type PhysicalPlanNotifiers = + Arc>>>; + /// Stream metric trait MetricCollector: 'static + Send + Unpin { fn collect(self); @@ -221,6 +232,7 @@ macro_rules! record_stream_to_response_stream { pub struct QueryDedup { pub config: QueryDedupConfig, pub request_notifiers: StreamReadRequestNotifiers, + pub physical_plan_notifiers: PhysicalPlanNotifiers, } #[derive(Clone)] @@ -286,7 +298,6 @@ impl RemoteEngineServiceImpl { request: Request, ) -> Result> { let metric = StreamReadMetricCollector(Instant::now()); - let (tx, rx) = mpsc::channel(query_dedup.config.notify_queue_cap); let request = request.into_inner(); let table_engine::remote::model::ReadRequest { @@ -303,14 +314,25 @@ impl RemoteEngineServiceImpl { read_request.projected_schema.projection(), ); - match query_dedup - .request_notifiers - .insert_notifier(request_key.clone(), tx) - { + let QueryDedup { + config, + request_notifiers, + .. + } = query_dedup; + + let (tx, rx) = mpsc::channel(config.notify_queue_cap); + match request_notifiers.insert_notifier(request_key.clone(), tx) { // The first request, need to handle it, and then notify the other requests. RequestResult::First => { - self.read_and_send_dedupped_resps(request, request_key, query_dedup) - .await?; + let ctx = self.handler_ctx(); + let query = async move { handle_stream_read(ctx, request).await }; + self.read_and_send_dedupped_resps( + request_key, + query, + request_notifiers.clone(), + config.notify_timeout.0, + ) + .await?; } // The request is waiting for the result of first request. RequestResult::Wait => { @@ -325,22 +347,22 @@ impl RemoteEngineServiceImpl { )) } - async fn read_and_send_dedupped_resps( + async fn read_and_send_dedupped_resps( &self, - request: ReadRequest, - request_key: StreamReadReqKey, - query_dedup: QueryDedup, - ) -> Result<()> { - let ctx = self.handler_ctx(); - + request_key: K, + query: F, + notifiers: Arc>>>, + notify_timeout: Duration, + ) -> Result<()> + where + K: Hash + PartialEq + Eq, + F: Future> + Send + 'static, + { // This is used to remove key when future is cancelled. let mut guard = ExecutionGuard::new(|| { - query_dedup.request_notifiers.take_notifiers(&request_key); + notifiers.take_notifiers(&request_key); }); - let handle = self - .runtimes - .read_runtime - .spawn(async move { handle_stream_read(ctx, request).await }); + let handle = self.runtimes.read_runtime.spawn(query); let streams = handle.await.box_err().context(ErrWithCause { code: StatusCode::Internal, msg: "fail to join task", @@ -379,15 +401,11 @@ impl RemoteEngineServiceImpl { // We should set cancel to guard, otherwise the key will be removed twice. guard.cancel(); - let notifiers = query_dedup - .request_notifiers - .take_notifiers(&request_key) - .unwrap(); + let notifiers = notifiers.take_notifiers(&request_key).unwrap(); // Do send in background to avoid blocking the rpc procedure. - let timeout = query_dedup.config.notify_timeout.0; self.runtimes.read_runtime.spawn(async move { - Self::send_dedupped_resps(resps, notifiers, timeout).await; + Self::send_dedupped_resps(resps, notifiers, notify_timeout).await; }); Ok(()) @@ -564,11 +582,12 @@ impl RemoteEngineServiceImpl { let metric = ExecutePlanMetricCollect(Instant::now()); let request = request.into_inner(); let query_engine = self.instance.query_engine.clone(); + let (ctx, encoded_plan) = extract_plan_from_req(request)?; let stream = self .runtimes .read_runtime - .spawn(async move { handle_execute_plan(request, query_engine).await }) + .spawn(async move { handle_execute_plan(ctx, encoded_plan, query_engine).await }) .await .box_err() .with_context(|| ErrWithCause { @@ -585,6 +604,56 @@ impl RemoteEngineServiceImpl { Ok(StreamWithMetric::new(Box::pin(stream), metric)) } + async fn dedup_execute_physical_plan_internal( + &self, + query_dedup: QueryDedup, + request: Request, + ) -> Result> { + let metric = ExecutePlanMetricCollect(Instant::now()); + let request = request.into_inner(); + + let query_engine = self.instance.query_engine.clone(); + let (ctx, encoded_plan) = extract_plan_from_req(request)?; + let key = PhysicalPlanKey { + encoded_plan: encoded_plan.clone(), + }; + + let QueryDedup { + config, + physical_plan_notifiers, + .. + } = query_dedup; + + let (tx, rx) = mpsc::channel(config.notify_queue_cap); + match physical_plan_notifiers.insert_notifier(key.clone(), tx) { + // The first request, need to handle it, and then notify the other requests. + RequestResult::First => { + let query = async move { + handle_execute_plan(ctx, encoded_plan, query_engine) + .await + .map(PartitionedStreams::one_stream) + }; + self.read_and_send_dedupped_resps( + key, + query, + physical_plan_notifiers, + config.notify_timeout.0, + ) + .await?; + } + // The request is waiting for the result of first request. + RequestResult::Wait => { + // TODO: add metrics to collect the time cost of waited stream + // read. + } + } + + Ok(StreamWithMetric::new( + Box::pin(ReceiverStream::new(rx)), + metric, + )) + } + fn handler_ctx(&self) -> HandlerContext { HandlerContext { catalog_manager: self.instance.catalog_manager.clone(), @@ -652,17 +721,13 @@ impl RemoteEngineService for RemoteEngineServiceImpl { &self, request: Request, ) -> std::result::Result, Status> { - let record_stream_result = - self.execute_physical_plan_internal(request) - .await - .map(|stream| { - stream.map(|batch_result| { - batch_result.box_err().with_context(|| ErrWithCause { - code: StatusCode::Internal, - msg: "failed to poll record batch", - }) - }) - }); + let record_stream_result = match self.query_dedup.clone() { + Some(query_dedup) => { + self.dedup_execute_physical_plan_internal(query_dedup, request) + .await + } + None => self.execute_physical_plan_internal(request).await, + }; record_stream_to_response_stream!(record_stream_result, ExecutePhysicalPlanStream) } @@ -870,22 +935,33 @@ async fn handle_get_table_info( }) } -async fn handle_execute_plan( - request: ExecutePlanRequest, - query_engine: QueryEngineRef, -) -> Result { +fn extract_plan_from_req(request: ExecutePlanRequest) -> Result<(ExecContext, Vec)> { // Build execution context. let ctx_in_req = request.context.with_context(|| ErrNoCause { code: StatusCode::Internal, msg: "execution context not found in physical plan request", })?; + let typed_plan_in_req = request.physical_plan.with_context(|| ErrNoCause { + code: StatusCode::Internal, + msg: "plan not found in physical plan request", + })?; + // FIXME: return the type from query engine. + let valid_plan = check_and_extract_plan(typed_plan_in_req, QueryEngineType::Datafusion)?; + + Ok((ctx_in_req, valid_plan)) +} +async fn handle_execute_plan( + ctx: ExecContext, + encoded_plan: Vec, + query_engine: QueryEngineRef, +) -> Result { let ExecContext { request_id, default_catalog, default_schema, timeout_ms, - } = ctx_in_req; + } = ctx; let request_id = RequestId::from(request_id); let deadline = if timeout_ms >= 0 { @@ -901,16 +977,9 @@ async fn handle_execute_plan( default_schema, }; - // Build physical plan. - let typed_plan_in_req = request.physical_plan.with_context(|| ErrNoCause { - code: StatusCode::Internal, - msg: "plan not found in physical plan request", - })?; - // FIXME: return the type from query engine. - let valid_plan = check_and_extract_plan(typed_plan_in_req, QueryEngineType::Datafusion)?; // TODO: Build remote plan in physical planner. let physical_plan = Box::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote( - valid_plan, + encoded_plan, ))); // Execute plan.