diff --git a/Cargo.lock b/Cargo.lock index dafe353b15..192c56ad85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1299,6 +1299,7 @@ dependencies = [ "serde", "sqlness", "tokio", + "uuid", ] [[package]] diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 6a9bdd7279..a30a3bbae1 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -28,3 +28,4 @@ reqwest = { workspace = true } serde = { workspace = true } sqlness = "0.4.3" tokio = { workspace = true } +uuid = { version = "1.3", features = ["v4"] } diff --git a/integration_tests/src/database.rs b/integration_tests/src/database.rs index 56244c0b96..96b2267655 100644 --- a/integration_tests/src/database.rs +++ b/integration_tests/src/database.rs @@ -44,6 +44,7 @@ const CERESDB_CONFIG_FILE_0_ENV: &str = "CERESDB_CONFIG_FILE_0"; const CERESDB_CONFIG_FILE_1_ENV: &str = "CERESDB_CONFIG_FILE_1"; const CLUSTER_CERESDB_STDOUT_FILE_0_ENV: &str = "CLUSTER_CERESDB_STDOUT_FILE_0"; const CLUSTER_CERESDB_STDOUT_FILE_1_ENV: &str = "CLUSTER_CERESDB_STDOUT_FILE_1"; +const CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS: usize = 5; const CERESDB_SERVER_ADDR: &str = "CERESDB_SERVER_ADDR"; @@ -63,9 +64,10 @@ impl HttpClient { } } +#[async_trait] pub trait Backend { fn start() -> Self; - fn wait_for_ready(&self); + async fn wait_for_ready(&self); fn stop(&mut self); } @@ -77,6 +79,10 @@ pub struct CeresDBCluster { server0: CeresDBServer, server1: CeresDBServer, ceresmeta_process: Child, + + /// Used in meta health check + db_client: Arc, + meta_stable_check_sql: String, } impl CeresDBServer { @@ -97,6 +103,7 @@ impl CeresDBServer { } } +#[async_trait] impl Backend for CeresDBServer { fn start() -> Self { let config = env::var(CERESDB_CONFIG_FILE_ENV).expect("Cannot parse ceresdb config env"); @@ -105,8 +112,8 @@ impl Backend for CeresDBServer { Self::spawn(bin, config, stdout) } - fn wait_for_ready(&self) { - std::thread::sleep(Duration::from_secs(5)); + async fn wait_for_ready(&self) { + tokio::time::sleep(Duration::from_secs(10)).await } fn stop(&mut self) { @@ -114,6 +121,24 @@ impl Backend for CeresDBServer { } } +impl CeresDBCluster { + async fn check_meta_stable(&self) -> bool { + let query_ctx = RpcContext { + database: Some("public".to_string()), + timeout: None, + }; + + let query_req = Request { + tables: vec![], + sql: self.meta_stable_check_sql.clone(), + }; + + let result = self.db_client.sql_query(&query_ctx, &query_req).await; + result.is_ok() + } +} + +#[async_trait] impl Backend for CeresDBCluster { fn start() -> Self { let ceresmeta_bin = @@ -149,16 +174,55 @@ impl Backend for CeresDBCluster { let server0 = CeresDBServer::spawn(ceresdb_bin.clone(), ceresdb_config_0, stdout0); let server1 = CeresDBServer::spawn(ceresdb_bin, ceresdb_config_1, stdout1); + // Meta stable check context + let endpoint = env::var(SERVER_GRPC_ENDPOINT_ENV).unwrap_or_else(|_| { + panic!("Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}") + }); + let db_client = Builder::new(endpoint, Mode::Proxy).build(); + + let meta_stable_check_sql = format!( + r#"CREATE TABLE `stable_check_{}` + (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t))"#, + uuid::Uuid::new_v4() + ); + Self { server0, server1, ceresmeta_process, + db_client, + meta_stable_check_sql, } } - fn wait_for_ready(&self) { - println!("wait for cluster service ready...\n"); - std::thread::sleep(Duration::from_secs(20)); + async fn wait_for_ready(&self) { + println!("wait for cluster service initialized..."); + tokio::time::sleep(Duration::from_secs(20_u64)).await; + + println!("wait for cluster service stable begin..."); + let mut wait_cnt = 0; + let wait_max = 6; + loop { + if wait_cnt >= wait_max { + println!( + "wait too long for cluster service stable, maybe somethings went wrong..." + ); + return; + } + + if self.check_meta_stable().await { + println!("wait for cluster service stable finished..."); + return; + } + + wait_cnt += 1; + let has_waited = wait_cnt * CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS; + println!("waiting for cluster service stable, has_waited:{has_waited}s"); + tokio::time::sleep(Duration::from_secs( + CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS as u64, + )) + .await; + } } fn stop(&mut self) { @@ -226,9 +290,9 @@ impl Database for CeresDB { } impl CeresDB { - pub fn create() -> CeresDB { + pub async fn create() -> CeresDB { let backend = T::start(); - backend.wait_for_ready(); + backend.wait_for_ready().await; let endpoint = env::var(SERVER_GRPC_ENDPOINT_ENV).unwrap_or_else(|_| { panic!("Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}") diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 7d81214261..87d923ca0c 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -60,8 +60,8 @@ impl EnvController for CeresDBController { async fn start(&self, env: &str, _config: Option<&Path>) -> Self::DB { println!("start with env {env}"); let db = match env { - "local" => Box::new(CeresDB::::create()) as DbRef, - "cluster" => Box::new(CeresDB::::create()) as DbRef, + "local" => Box::new(CeresDB::::create().await) as DbRef, + "cluster" => Box::new(CeresDB::::create().await) as DbRef, _ => panic!("invalid env {env}"), };