@@ -6,6 +6,7 @@ use common_util::runtime::JoinHandle;
6
6
use log:: { error, info} ;
7
7
use opensrv_mysql:: AsyncMysqlIntermediary ;
8
8
use query_engine:: executor:: Executor as QueryExecutor ;
9
+ use router:: RouterRef ;
9
10
use table_engine:: engine:: EngineRuntimes ;
10
11
use tokio:: sync:: oneshot:: { self , Receiver , Sender } ;
11
12
@@ -17,6 +18,7 @@ use crate::{
17
18
pub struct MysqlService < Q > {
18
19
instance : InstanceRef < Q > ,
19
20
runtimes : Arc < EngineRuntimes > ,
21
+ router : RouterRef ,
20
22
socket_addr : SocketAddr ,
21
23
join_handler : Option < JoinHandle < ( ) > > ,
22
24
tx : Option < Sender < ( ) > > ,
@@ -27,12 +29,14 @@ impl<Q> MysqlService<Q> {
27
29
pub fn new (
28
30
instance : Arc < Instance < Q > > ,
29
31
runtimes : Arc < EngineRuntimes > ,
32
+ router : RouterRef ,
30
33
socket_addr : SocketAddr ,
31
34
timeout : Option < Duration > ,
32
35
) -> MysqlService < Q > {
33
36
Self {
34
37
instance,
35
38
runtimes,
39
+ router,
36
40
socket_addr,
37
41
join_handler : None ,
38
42
tx : None ,
@@ -53,6 +57,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
53
57
self . join_handler = Some ( rt. bg_runtime . spawn ( Self :: loop_accept (
54
58
self . instance . clone ( ) ,
55
59
self . runtimes . clone ( ) ,
60
+ self . router . clone ( ) ,
56
61
self . socket_addr ,
57
62
self . timeout ,
58
63
rx,
@@ -69,6 +74,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
69
74
async fn loop_accept (
70
75
instance : InstanceRef < Q > ,
71
76
runtimes : Arc < EngineRuntimes > ,
77
+ router : RouterRef ,
72
78
socket_addr : SocketAddr ,
73
79
timeout : Option < Duration > ,
74
80
mut rx : Receiver < ( ) > ,
@@ -90,10 +96,11 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
90
96
} ;
91
97
let instance = instance. clone( ) ;
92
98
let runtimes = runtimes. clone( ) ;
99
+ let router = router. clone( ) ;
93
100
94
101
let rt = runtimes. read_runtime. clone( ) ;
95
102
rt. spawn( AsyncMysqlIntermediary :: run_on(
96
- MysqlWorker :: new( instance, runtimes, timeout) ,
103
+ MysqlWorker :: new( instance, runtimes, router , timeout) ,
97
104
stream,
98
105
) ) ;
99
106
} ,
0 commit comments