@@ -35,7 +35,10 @@ use ceresdbproto::{
35
35
storage:: { arrow_payload, ArrowPayload } ,
36
36
} ;
37
37
use common_types:: { record_batch:: RecordBatch , request_id:: RequestId } ;
38
- use futures:: stream:: { self , BoxStream , FuturesUnordered , StreamExt } ;
38
+ use futures:: {
39
+ stream:: { self , BoxStream , FuturesUnordered , StreamExt } ,
40
+ Future ,
41
+ } ;
39
42
use generic_error:: BoxError ;
40
43
use log:: { error, info} ;
41
44
use notifier:: notifier:: { ExecutionGuard , RequestNotifiers , RequestResult } ;
@@ -97,6 +100,14 @@ impl StreamReadReqKey {
97
100
pub type StreamReadRequestNotifiers =
98
101
Arc < RequestNotifiers < StreamReadReqKey , mpsc:: Sender < Result < RecordBatch > > > > ;
99
102
103
+ #[ derive( Clone , Debug , PartialEq , Eq , Hash ) ]
104
+ pub struct PhysicalPlanKey {
105
+ encoded_plan : Vec < u8 > ,
106
+ }
107
+
108
+ pub type PhysicalPlanNotifiers =
109
+ Arc < RequestNotifiers < PhysicalPlanKey , mpsc:: Sender < Result < RecordBatch > > > > ;
110
+
100
111
/// Stream metric
101
112
trait MetricCollector : ' static + Send + Unpin {
102
113
fn collect ( self ) ;
@@ -221,6 +232,7 @@ macro_rules! record_stream_to_response_stream {
221
232
pub struct QueryDedup {
222
233
pub config : QueryDedupConfig ,
223
234
pub request_notifiers : StreamReadRequestNotifiers ,
235
+ pub physical_plan_notifiers : PhysicalPlanNotifiers ,
224
236
}
225
237
226
238
#[ derive( Clone ) ]
@@ -286,7 +298,6 @@ impl RemoteEngineServiceImpl {
286
298
request : Request < ReadRequest > ,
287
299
) -> Result < StreamWithMetric < StreamReadMetricCollector > > {
288
300
let metric = StreamReadMetricCollector ( Instant :: now ( ) ) ;
289
- let ( tx, rx) = mpsc:: channel ( query_dedup. config . notify_queue_cap ) ;
290
301
291
302
let request = request. into_inner ( ) ;
292
303
let table_engine:: remote:: model:: ReadRequest {
@@ -303,14 +314,25 @@ impl RemoteEngineServiceImpl {
303
314
read_request. projected_schema . projection ( ) ,
304
315
) ;
305
316
306
- match query_dedup
307
- . request_notifiers
308
- . insert_notifier ( request_key. clone ( ) , tx)
309
- {
317
+ let QueryDedup {
318
+ config,
319
+ request_notifiers,
320
+ ..
321
+ } = query_dedup;
322
+
323
+ let ( tx, rx) = mpsc:: channel ( config. notify_queue_cap ) ;
324
+ match request_notifiers. insert_notifier ( request_key. clone ( ) , tx) {
310
325
// The first request, need to handle it, and then notify the other requests.
311
326
RequestResult :: First => {
312
- self . read_and_send_dedupped_resps ( request, request_key, query_dedup)
313
- . await ?;
327
+ let ctx = self . handler_ctx ( ) ;
328
+ let query = async move { handle_stream_read ( ctx, request) . await } ;
329
+ self . read_and_send_dedupped_resps (
330
+ request_key,
331
+ query,
332
+ request_notifiers. clone ( ) ,
333
+ config. notify_timeout . 0 ,
334
+ )
335
+ . await ?;
314
336
}
315
337
// The request is waiting for the result of first request.
316
338
RequestResult :: Wait => {
@@ -325,22 +347,22 @@ impl RemoteEngineServiceImpl {
325
347
) )
326
348
}
327
349
328
- async fn read_and_send_dedupped_resps (
350
+ async fn read_and_send_dedupped_resps < K , F > (
329
351
& self ,
330
- request : ReadRequest ,
331
- request_key : StreamReadReqKey ,
332
- query_dedup : QueryDedup ,
333
- ) -> Result < ( ) > {
334
- let ctx = self . handler_ctx ( ) ;
335
-
352
+ request_key : K ,
353
+ query : F ,
354
+ notifiers : Arc < RequestNotifiers < K , mpsc:: Sender < Result < RecordBatch > > > > ,
355
+ notify_timeout : Duration ,
356
+ ) -> Result < ( ) >
357
+ where
358
+ K : Hash + PartialEq + Eq ,
359
+ F : Future < Output = Result < PartitionedStreams > > + Send + ' static ,
360
+ {
336
361
// This is used to remove key when future is cancelled.
337
362
let mut guard = ExecutionGuard :: new ( || {
338
- query_dedup . request_notifiers . take_notifiers ( & request_key) ;
363
+ notifiers . take_notifiers ( & request_key) ;
339
364
} ) ;
340
- let handle = self
341
- . runtimes
342
- . read_runtime
343
- . spawn ( async move { handle_stream_read ( ctx, request) . await } ) ;
365
+ let handle = self . runtimes . read_runtime . spawn ( query) ;
344
366
let streams = handle. await . box_err ( ) . context ( ErrWithCause {
345
367
code : StatusCode :: Internal ,
346
368
msg : "fail to join task" ,
@@ -379,15 +401,11 @@ impl RemoteEngineServiceImpl {
379
401
380
402
// We should set cancel to guard, otherwise the key will be removed twice.
381
403
guard. cancel ( ) ;
382
- let notifiers = query_dedup
383
- . request_notifiers
384
- . take_notifiers ( & request_key)
385
- . unwrap ( ) ;
404
+ let notifiers = notifiers. take_notifiers ( & request_key) . unwrap ( ) ;
386
405
387
406
// Do send in background to avoid blocking the rpc procedure.
388
- let timeout = query_dedup. config . notify_timeout . 0 ;
389
407
self . runtimes . read_runtime . spawn ( async move {
390
- Self :: send_dedupped_resps ( resps, notifiers, timeout ) . await ;
408
+ Self :: send_dedupped_resps ( resps, notifiers, notify_timeout ) . await ;
391
409
} ) ;
392
410
393
411
Ok ( ( ) )
@@ -564,11 +582,12 @@ impl RemoteEngineServiceImpl {
564
582
let metric = ExecutePlanMetricCollect ( Instant :: now ( ) ) ;
565
583
let request = request. into_inner ( ) ;
566
584
let query_engine = self . instance . query_engine . clone ( ) ;
585
+ let ( ctx, encoded_plan) = extract_plan_from_req ( request) ?;
567
586
568
587
let stream = self
569
588
. runtimes
570
589
. read_runtime
571
- . spawn ( async move { handle_execute_plan ( request , query_engine) . await } )
590
+ . spawn ( async move { handle_execute_plan ( ctx , encoded_plan , query_engine) . await } )
572
591
. await
573
592
. box_err ( )
574
593
. with_context ( || ErrWithCause {
@@ -585,6 +604,56 @@ impl RemoteEngineServiceImpl {
585
604
Ok ( StreamWithMetric :: new ( Box :: pin ( stream) , metric) )
586
605
}
587
606
607
+ async fn dedup_execute_physical_plan_internal (
608
+ & self ,
609
+ query_dedup : QueryDedup ,
610
+ request : Request < ExecutePlanRequest > ,
611
+ ) -> Result < StreamWithMetric < ExecutePlanMetricCollect > > {
612
+ let metric = ExecutePlanMetricCollect ( Instant :: now ( ) ) ;
613
+ let request = request. into_inner ( ) ;
614
+
615
+ let query_engine = self . instance . query_engine . clone ( ) ;
616
+ let ( ctx, encoded_plan) = extract_plan_from_req ( request) ?;
617
+ let key = PhysicalPlanKey {
618
+ encoded_plan : encoded_plan. clone ( ) ,
619
+ } ;
620
+
621
+ let QueryDedup {
622
+ config,
623
+ physical_plan_notifiers,
624
+ ..
625
+ } = query_dedup;
626
+
627
+ let ( tx, rx) = mpsc:: channel ( config. notify_queue_cap ) ;
628
+ match physical_plan_notifiers. insert_notifier ( key. clone ( ) , tx) {
629
+ // The first request, need to handle it, and then notify the other requests.
630
+ RequestResult :: First => {
631
+ let query = async move {
632
+ handle_execute_plan ( ctx, encoded_plan, query_engine)
633
+ . await
634
+ . map ( PartitionedStreams :: one_stream)
635
+ } ;
636
+ self . read_and_send_dedupped_resps (
637
+ key,
638
+ query,
639
+ physical_plan_notifiers,
640
+ config. notify_timeout . 0 ,
641
+ )
642
+ . await ?;
643
+ }
644
+ // The request is waiting for the result of first request.
645
+ RequestResult :: Wait => {
646
+ // TODO: add metrics to collect the time cost of waited stream
647
+ // read.
648
+ }
649
+ }
650
+
651
+ Ok ( StreamWithMetric :: new (
652
+ Box :: pin ( ReceiverStream :: new ( rx) ) ,
653
+ metric,
654
+ ) )
655
+ }
656
+
588
657
fn handler_ctx ( & self ) -> HandlerContext {
589
658
HandlerContext {
590
659
catalog_manager : self . instance . catalog_manager . clone ( ) ,
@@ -652,17 +721,13 @@ impl RemoteEngineService for RemoteEngineServiceImpl {
652
721
& self ,
653
722
request : Request < ExecutePlanRequest > ,
654
723
) -> std:: result:: Result < Response < Self :: ExecutePhysicalPlanStream > , Status > {
655
- let record_stream_result =
656
- self . execute_physical_plan_internal ( request)
657
- . await
658
- . map ( |stream| {
659
- stream. map ( |batch_result| {
660
- batch_result. box_err ( ) . with_context ( || ErrWithCause {
661
- code : StatusCode :: Internal ,
662
- msg : "failed to poll record batch" ,
663
- } )
664
- } )
665
- } ) ;
724
+ let record_stream_result = match self . query_dedup . clone ( ) {
725
+ Some ( query_dedup) => {
726
+ self . dedup_execute_physical_plan_internal ( query_dedup, request)
727
+ . await
728
+ }
729
+ None => self . execute_physical_plan_internal ( request) . await ,
730
+ } ;
666
731
667
732
record_stream_to_response_stream ! ( record_stream_result, ExecutePhysicalPlanStream )
668
733
}
@@ -870,22 +935,33 @@ async fn handle_get_table_info(
870
935
} )
871
936
}
872
937
873
- async fn handle_execute_plan (
874
- request : ExecutePlanRequest ,
875
- query_engine : QueryEngineRef ,
876
- ) -> Result < SendableRecordBatchStream > {
938
+ fn extract_plan_from_req ( request : ExecutePlanRequest ) -> Result < ( ExecContext , Vec < u8 > ) > {
877
939
// Build execution context.
878
940
let ctx_in_req = request. context . with_context ( || ErrNoCause {
879
941
code : StatusCode :: Internal ,
880
942
msg : "execution context not found in physical plan request" ,
881
943
} ) ?;
944
+ let typed_plan_in_req = request. physical_plan . with_context ( || ErrNoCause {
945
+ code : StatusCode :: Internal ,
946
+ msg : "plan not found in physical plan request" ,
947
+ } ) ?;
948
+ // FIXME: return the type from query engine.
949
+ let valid_plan = check_and_extract_plan ( typed_plan_in_req, QueryEngineType :: Datafusion ) ?;
950
+
951
+ Ok ( ( ctx_in_req, valid_plan) )
952
+ }
882
953
954
+ async fn handle_execute_plan (
955
+ ctx : ExecContext ,
956
+ encoded_plan : Vec < u8 > ,
957
+ query_engine : QueryEngineRef ,
958
+ ) -> Result < SendableRecordBatchStream > {
883
959
let ExecContext {
884
960
request_id,
885
961
default_catalog,
886
962
default_schema,
887
963
timeout_ms,
888
- } = ctx_in_req ;
964
+ } = ctx ;
889
965
890
966
let request_id = RequestId :: from ( request_id) ;
891
967
let deadline = if timeout_ms >= 0 {
@@ -901,16 +977,9 @@ async fn handle_execute_plan(
901
977
default_schema,
902
978
} ;
903
979
904
- // Build physical plan.
905
- let typed_plan_in_req = request. physical_plan . with_context ( || ErrNoCause {
906
- code : StatusCode :: Internal ,
907
- msg : "plan not found in physical plan request" ,
908
- } ) ?;
909
- // FIXME: return the type from query engine.
910
- let valid_plan = check_and_extract_plan ( typed_plan_in_req, QueryEngineType :: Datafusion ) ?;
911
980
// TODO: Build remote plan in physical planner.
912
981
let physical_plan = Box :: new ( DataFusionPhysicalPlanAdapter :: new ( TypedPlan :: Remote (
913
- valid_plan ,
982
+ encoded_plan ,
914
983
) ) ) ;
915
984
916
985
// Execute plan.
0 commit comments