@@ -44,6 +44,7 @@ const CERESDB_CONFIG_FILE_0_ENV: &str = "CERESDB_CONFIG_FILE_0";
44
44
const CERESDB_CONFIG_FILE_1_ENV : & str = "CERESDB_CONFIG_FILE_1" ;
45
45
const CLUSTER_CERESDB_STDOUT_FILE_0_ENV : & str = "CLUSTER_CERESDB_STDOUT_FILE_0" ;
46
46
const CLUSTER_CERESDB_STDOUT_FILE_1_ENV : & str = "CLUSTER_CERESDB_STDOUT_FILE_1" ;
47
/// Seconds to sleep between two consecutive cluster stable-check probes.
+ const CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS : usize = 5 ;
47
48
48
49
const CERESDB_SERVER_ADDR : & str = "CERESDB_SERVER_ADDR" ;
49
50
@@ -63,9 +64,10 @@ impl HttpClient {
63
64
}
64
65
}
65
66
67
+ #[ async_trait]
66
68
pub trait Backend {
67
69
fn start ( ) -> Self ;
68
- fn wait_for_ready ( & self ) ;
70
+ async fn wait_for_ready ( & self ) ;
69
71
fn stop ( & mut self ) ;
70
72
}
71
73
@@ -77,6 +79,10 @@ pub struct CeresDBCluster {
77
79
server0 : CeresDBServer ,
78
80
server1 : CeresDBServer ,
79
81
ceresmeta_process : Child ,
82
+
83
+ /// Used in meta health check
84
+ db_client : Arc < dyn DbClient > ,
85
+ meta_stable_check_sql : String ,
80
86
}
81
87
82
88
impl CeresDBServer {
@@ -97,6 +103,7 @@ impl CeresDBServer {
97
103
}
98
104
}
99
105
106
+ #[ async_trait]
100
107
impl Backend for CeresDBServer {
101
108
fn start ( ) -> Self {
102
109
let config = env:: var ( CERESDB_CONFIG_FILE_ENV ) . expect ( "Cannot parse ceresdb config env" ) ;
@@ -105,15 +112,33 @@ impl Backend for CeresDBServer {
105
112
Self :: spawn ( bin, config, stdout)
106
113
}
107
114
108
- fn wait_for_ready ( & self ) {
109
- std :: thread :: sleep ( Duration :: from_secs ( 5 ) ) ;
115
+ async fn wait_for_ready ( & self ) {
116
+ tokio :: time :: sleep ( Duration :: from_secs ( 10 ) ) . await
110
117
}
111
118
112
119
fn stop ( & mut self ) {
113
120
self . server_process . kill ( ) . expect ( "Failed to kill server" ) ;
114
121
}
115
122
}
116
123
124
+ impl CeresDBCluster {
125
+ async fn check_meta_stable ( & self ) -> bool {
126
+ let query_ctx = RpcContext {
127
+ database : Some ( "public" . to_string ( ) ) ,
128
+ timeout : None ,
129
+ } ;
130
+
131
+ let query_req = Request {
132
+ tables : vec ! [ ] ,
133
+ sql : self . meta_stable_check_sql . clone ( ) ,
134
+ } ;
135
+
136
+ let result = self . db_client . sql_query ( & query_ctx, & query_req) . await ;
137
+ result. is_ok ( )
138
+ }
139
+ }
140
+
141
+ #[ async_trait]
117
142
impl Backend for CeresDBCluster {
118
143
fn start ( ) -> Self {
119
144
let ceresmeta_bin =
@@ -149,16 +174,55 @@ impl Backend for CeresDBCluster {
149
174
let server0 = CeresDBServer :: spawn ( ceresdb_bin. clone ( ) , ceresdb_config_0, stdout0) ;
150
175
let server1 = CeresDBServer :: spawn ( ceresdb_bin, ceresdb_config_1, stdout1) ;
151
176
177
+ // Meta stable check context
178
+ let endpoint = env:: var ( SERVER_GRPC_ENDPOINT_ENV ) . unwrap_or_else ( |_| {
179
+ panic ! ( "Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}" )
180
+ } ) ;
181
+ let db_client = Builder :: new ( endpoint, Mode :: Proxy ) . build ( ) ;
182
+
183
+ let meta_stable_check_sql = format ! (
184
+ r#"CREATE TABLE `stable_check_{}`
185
+ (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t))"# ,
186
+ uuid:: Uuid :: new_v4( )
187
+ ) ;
188
+
152
189
Self {
153
190
server0,
154
191
server1,
155
192
ceresmeta_process,
193
+ db_client,
194
+ meta_stable_check_sql,
156
195
}
157
196
}
158
197
159
- fn wait_for_ready ( & self ) {
160
- println ! ( "wait for cluster service ready...\n " ) ;
161
- std:: thread:: sleep ( Duration :: from_secs ( 20 ) ) ;
198
+ async fn wait_for_ready ( & self ) {
199
+ println ! ( "wait for cluster service initialized..." ) ;
200
+ tokio:: time:: sleep ( Duration :: from_secs ( 20_u64 ) ) . await ;
201
+
202
+ println ! ( "wait for cluster service stable begin..." ) ;
203
+ let mut wait_cnt = 0 ;
204
+ let wait_max = 6 ;
205
+ loop {
206
+ if wait_cnt >= wait_max {
207
+ println ! (
208
+ "wait too long for cluster service stable, maybe somethings went wrong..."
209
+ ) ;
210
+ return ;
211
+ }
212
+
213
+ if self . check_meta_stable ( ) . await {
214
+ println ! ( "wait for cluster service stable finished..." ) ;
215
+ return ;
216
+ }
217
+
218
+ wait_cnt += 1 ;
219
+ let has_waited = wait_cnt * CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS ;
220
+ println ! ( "waiting for cluster service stable, has_waited:{has_waited}s" ) ;
221
+ tokio:: time:: sleep ( Duration :: from_secs (
222
+ CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS as u64 ,
223
+ ) )
224
+ . await ;
225
+ }
162
226
}
163
227
164
228
fn stop ( & mut self ) {
@@ -226,9 +290,9 @@ impl<T: Send + Sync> Database for CeresDB<T> {
226
290
}
227
291
228
292
impl < T : Backend > CeresDB < T > {
229
- pub fn create ( ) -> CeresDB < T > {
293
+ pub async fn create ( ) -> CeresDB < T > {
230
294
let backend = T :: start ( ) ;
231
- backend. wait_for_ready ( ) ;
295
+ backend. wait_for_ready ( ) . await ;
232
296
233
297
let endpoint = env:: var ( SERVER_GRPC_ENDPOINT_ENV ) . unwrap_or_else ( |_| {
234
298
panic ! ( "Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}" )
0 commit comments