Skip to content

Commit d0efc20

Browse files
authored
Merge e717bb7 into f5af256
2 parents f5af256 + e717bb7 commit d0efc20

File tree

9 files changed

+45
-3
lines changed

9 files changed

+45
-3
lines changed

.github/workflows/ci.yml

+4
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,10 @@ jobs:
217217
working-directory: integration_tests
218218
run: |
219219
make run-rust
220+
- name: Run MySQL client tests
221+
working-directory: integration_tests
222+
run: |
223+
make run-mysql
220224
- name: Upload Logs
221225
if: always()
222226
uses: actions/upload-artifact@v3

Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ SHELL = /bin/bash
22

33
DIR=$(shell pwd)
44

5+
.DEFAULT_GOAL := integration-test
6+
57
init:
68
echo "init"
79
echo "Git branch: $GITBRANCH"

integration_tests/Makefile

+3
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,6 @@ run-go:
5555

5656
run-rust:
5757
cd sdk/rust && cargo run
58+
59+
run-mysql:
60+
cd mysql && ./basic.sh

integration_tests/mysql/basic.sh

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#!/usr/bin/env bash
2+
3+
# This only ensure query by mysql protocol is OK,
4+
# Full SQL test in ensured by sqlness tests.
5+
mysql -h 127.0.0.1 -P 3307 -e 'show tables'
6+
7+
mysql -h 127.0.0.1 -P 3307 -e 'select 1, now()'

server/src/mysql/builder.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
use std::{net::SocketAddr, sync::Arc, time::Duration};
44

55
use query_engine::executor::Executor as QueryExecutor;
6+
use router::RouterRef;
67
use snafu::{OptionExt, ResultExt};
78
use table_engine::engine::EngineRuntimes;
89

910
use crate::{
1011
instance::InstanceRef,
1112
mysql::{
12-
error::{MissingInstance, MissingRuntimes, ParseIpAddr, Result},
13+
error::{MissingInstance, MissingRouter, MissingRuntimes, ParseIpAddr, Result},
1314
service::MysqlService,
1415
},
1516
};
@@ -18,6 +19,7 @@ pub struct Builder<Q> {
1819
config: Config,
1920
runtimes: Option<Arc<EngineRuntimes>>,
2021
instance: Option<InstanceRef<Q>>,
22+
router: Option<RouterRef>,
2123
}
2224

2325
#[derive(Debug)]
@@ -33,6 +35,7 @@ impl<Q> Builder<Q> {
3335
config,
3436
runtimes: None,
3537
instance: None,
38+
router: None,
3639
}
3740
}
3841

@@ -45,18 +48,25 @@ impl<Q> Builder<Q> {
4548
self.instance = Some(instance);
4649
self
4750
}
51+
52+
pub fn router(mut self, router: RouterRef) -> Self {
53+
self.router = Some(router);
54+
self
55+
}
4856
}
4957

5058
impl<Q: QueryExecutor + 'static> Builder<Q> {
5159
pub fn build(self) -> Result<MysqlService<Q>> {
5260
let runtimes = self.runtimes.context(MissingRuntimes)?;
5361
let instance = self.instance.context(MissingInstance)?;
62+
let router = self.router.context(MissingRouter)?;
5463

5564
let addr: SocketAddr = format!("{}:{}", self.config.ip, self.config.port)
5665
.parse()
5766
.context(ParseIpAddr { ip: self.config.ip })?;
5867

59-
let mysql_handler = MysqlService::new(instance, runtimes, addr, self.config.timeout);
68+
let mysql_handler =
69+
MysqlService::new(instance, runtimes, router, addr, self.config.timeout);
6070
Ok(mysql_handler)
6171
}
6272
}

server/src/mysql/error.rs

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ pub enum Error {
1212
#[snafu(display("Missing instance to build service.\nBacktrace:\n{}", backtrace))]
1313
MissingInstance { backtrace: Backtrace },
1414

15+
#[snafu(display("Missing router to build service.\nBacktrace:\n{}", backtrace))]
16+
MissingRouter { backtrace: Backtrace },
17+
1518
#[snafu(display(
1619
"Failed to parse ip addr, ip:{}, err:{}.\nBacktrace:\n{}",
1720
ip,

server/src/mysql/service.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use common_util::runtime::JoinHandle;
66
use log::{error, info};
77
use opensrv_mysql::AsyncMysqlIntermediary;
88
use query_engine::executor::Executor as QueryExecutor;
9+
use router::RouterRef;
910
use table_engine::engine::EngineRuntimes;
1011
use tokio::sync::oneshot::{self, Receiver, Sender};
1112

@@ -17,6 +18,7 @@ use crate::{
1718
pub struct MysqlService<Q> {
1819
instance: InstanceRef<Q>,
1920
runtimes: Arc<EngineRuntimes>,
21+
router: RouterRef,
2022
socket_addr: SocketAddr,
2123
join_handler: Option<JoinHandle<()>>,
2224
tx: Option<Sender<()>>,
@@ -27,12 +29,14 @@ impl<Q> MysqlService<Q> {
2729
pub fn new(
2830
instance: Arc<Instance<Q>>,
2931
runtimes: Arc<EngineRuntimes>,
32+
router: RouterRef,
3033
socket_addr: SocketAddr,
3134
timeout: Option<Duration>,
3235
) -> MysqlService<Q> {
3336
Self {
3437
instance,
3538
runtimes,
39+
router,
3640
socket_addr,
3741
join_handler: None,
3842
tx: None,
@@ -53,6 +57,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
5357
self.join_handler = Some(rt.bg_runtime.spawn(Self::loop_accept(
5458
self.instance.clone(),
5559
self.runtimes.clone(),
60+
self.router.clone(),
5661
self.socket_addr,
5762
self.timeout,
5863
rx,
@@ -69,6 +74,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
6974
async fn loop_accept(
7075
instance: InstanceRef<Q>,
7176
runtimes: Arc<EngineRuntimes>,
77+
router: RouterRef,
7278
socket_addr: SocketAddr,
7379
timeout: Option<Duration>,
7480
mut rx: Receiver<()>,
@@ -90,10 +96,11 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
9096
};
9197
let instance = instance.clone();
9298
let runtimes = runtimes.clone();
99+
let router = router.clone();
93100

94101
let rt = runtimes.read_runtime.clone();
95102
rt.spawn(AsyncMysqlIntermediary::run_on(
96-
MysqlWorker::new(instance, runtimes, timeout),
103+
MysqlWorker::new(instance, runtimes, router, timeout),
97104
stream,
98105
));
99106
},

server/src/mysql/worker.rs

+5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use interpreters::interpreter::Output;
66
use log::{error, info};
77
use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter};
88
use query_engine::executor::Executor as QueryExecutor;
9+
use router::RouterRef;
910
use snafu::ResultExt;
1011
use table_engine::engine::EngineRuntimes;
1112

@@ -26,6 +27,7 @@ pub struct MysqlWorker<W: std::io::Write + Send + Sync, Q> {
2627
generic_hold: PhantomData<W>,
2728
instance: Arc<Instance<Q>>,
2829
runtimes: Arc<EngineRuntimes>,
30+
router: RouterRef,
2931
timeout: Option<Duration>,
3032
}
3133

@@ -37,12 +39,14 @@ where
3739
pub fn new(
3840
instance: Arc<Instance<Q>>,
3941
runtimes: Arc<EngineRuntimes>,
42+
router: RouterRef,
4043
timeout: Option<Duration>,
4144
) -> Self {
4245
Self {
4346
generic_hold: PhantomData::default(),
4447
instance,
4548
runtimes,
49+
router,
4650
timeout,
4751
}
4852
}
@@ -144,6 +148,7 @@ where
144148
.runtime(runtime)
145149
.enable_partition_table_access(false)
146150
.timeout(self.timeout)
151+
.router(self.router.clone())
147152
.build()
148153
.context(CreateContext)
149154
}

server/src/server.rs

+1
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
354354
let mysql_service = mysql::Builder::new(mysql_config)
355355
.runtimes(engine_runtimes.clone())
356356
.instance(instance.clone())
357+
.router(router.clone())
357358
.build()
358359
.context(BuildMysqlService)?;
359360

0 commit comments

Comments
 (0)