@@ -19,7 +19,7 @@ use std::{
19
19
fs:: File ,
20
20
process:: { Child , Command } ,
21
21
sync:: Arc ,
22
- time:: { Duration , Instant } ,
22
+ time:: Duration ,
23
23
} ;
24
24
25
25
use async_trait:: async_trait;
@@ -30,7 +30,6 @@ use ceresdb_client::{
30
30
} ;
31
31
use reqwest:: { ClientBuilder , Url } ;
32
32
use sqlness:: { Database , QueryContext } ;
33
- use uuid:: Timestamp ;
34
33
35
34
const SERVER_GRPC_ENDPOINT_ENV : & str = "CERESDB_SERVER_GRPC_ENDPOINT" ;
36
35
const SERVER_HTTP_ENDPOINT_ENV : & str = "CERESDB_SERVER_HTTP_ENDPOINT" ;
@@ -83,7 +82,7 @@ pub struct CeresDBCluster {
83
82
84
83
/// Used in meta health check
85
84
db_client : Arc < dyn DbClient > ,
86
- health_check_sql : String ,
85
+ meta_stable_check_sql : String ,
87
86
}
88
87
89
88
impl CeresDBServer {
@@ -123,15 +122,15 @@ impl Backend for CeresDBServer {
123
122
}
124
123
125
124
impl CeresDBCluster {
126
- async fn check_meta_health ( & self ) -> bool {
125
+ async fn check_meta_stable ( & self ) -> bool {
127
126
let query_ctx = RpcContext {
128
127
database : Some ( "public" . to_string ( ) ) ,
129
128
timeout : None ,
130
129
} ;
131
130
132
131
let query_req = Request {
133
132
tables : vec ! [ ] ,
134
- sql : self . health_check_sql . clone ( ) ,
133
+ sql : self . meta_stable_check_sql . clone ( ) ,
135
134
} ;
136
135
137
136
let result = self . db_client . sql_query ( & query_ctx, & query_req) . await ;
@@ -175,51 +174,50 @@ impl Backend for CeresDBCluster {
175
174
let server0 = CeresDBServer :: spawn ( ceresdb_bin. clone ( ) , ceresdb_config_0, stdout0) ;
176
175
let server1 = CeresDBServer :: spawn ( ceresdb_bin, ceresdb_config_1, stdout1) ;
177
176
178
- // Health check context
177
+ // Meta stable check context
179
178
let endpoint = env:: var ( SERVER_GRPC_ENDPOINT_ENV ) . unwrap_or_else ( |_| {
180
179
panic ! ( "Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}" )
181
180
} ) ;
182
181
let db_client = Builder :: new ( endpoint, Mode :: Proxy ) . build ( ) ;
183
182
184
- let health_check_sql = format ! (
185
- r#"CREATE TABLE `health_check_ {}`
183
+ let meta_stable_check_sql = format ! (
184
+ r#"CREATE TABLE `stable_check_ {}`
186
185
(`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t))"# ,
187
- "asfdasfadsfad"
186
+ uuid :: Uuid :: new_v4 ( )
188
187
) ;
189
188
190
189
Self {
191
190
server0,
192
191
server1,
193
192
ceresmeta_process,
194
193
db_client,
195
- health_check_sql ,
194
+ meta_stable_check_sql ,
196
195
}
197
196
}
198
197
199
198
async fn wait_for_ready ( & self ) {
200
- println ! ( "wait for cluster service initialized...\n " ) ;
201
- tokio:: time:: sleep ( Duration :: from_secs (
202
- 20 as u64 ,
203
- ) )
204
- . await ;
205
-
206
- println ! ( "wait for cluster service stable begin...\n " ) ;
199
+ println ! ( "wait for cluster service initialized..." ) ;
200
+ tokio:: time:: sleep ( Duration :: from_secs ( 20_u64 ) ) . await ;
201
+
202
+ println ! ( "wait for cluster service stable begin..." ) ;
207
203
let mut wait_cnt = 0 ;
208
204
let wait_max = 6 ;
209
205
loop {
210
206
if wait_cnt >= wait_max {
211
- println ! ( "wait too long for cluster service stable, maybe somethings went wrong..." ) ;
207
+ println ! (
208
+ "wait too long for cluster service stable, maybe somethings went wrong..."
209
+ ) ;
212
210
return ;
213
211
}
214
212
215
- if self . check_meta_health ( ) . await {
216
- println ! ( "wait cluster service stable finished...\n " ) ;
213
+ if self . check_meta_stable ( ) . await {
214
+ println ! ( "wait for cluster service stable finished..." ) ;
217
215
return ;
218
216
}
219
217
220
218
wait_cnt += 1 ;
221
219
let has_waited = wait_cnt * CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS ;
222
- println ! ( "waiting for cluster service stable, has_waited:{has_waited}s\n " ) ;
220
+ println ! ( "waiting for cluster service stable, has_waited:{has_waited}s" ) ;
223
221
tokio:: time:: sleep ( Duration :: from_secs (
224
222
CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS as u64 ,
225
223
) )
0 commit comments