Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add meta stable check into integration test #1202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ reqwest = { workspace = true }
serde = { workspace = true }
sqlness = "0.4.3"
tokio = { workspace = true }
uuid = { version = "1.3", features = ["v4"] }
80 changes: 72 additions & 8 deletions integration_tests/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const CERESDB_CONFIG_FILE_0_ENV: &str = "CERESDB_CONFIG_FILE_0";
const CERESDB_CONFIG_FILE_1_ENV: &str = "CERESDB_CONFIG_FILE_1";
const CLUSTER_CERESDB_STDOUT_FILE_0_ENV: &str = "CLUSTER_CERESDB_STDOUT_FILE_0";
const CLUSTER_CERESDB_STDOUT_FILE_1_ENV: &str = "CLUSTER_CERESDB_STDOUT_FILE_1";
const CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS: usize = 5;

const CERESDB_SERVER_ADDR: &str = "CERESDB_SERVER_ADDR";

Expand All @@ -63,9 +64,10 @@ impl HttpClient {
}
}

#[async_trait]
pub trait Backend {
fn start() -> Self;
fn wait_for_ready(&self);
async fn wait_for_ready(&self);
fn stop(&mut self);
}

Expand All @@ -77,6 +79,10 @@ pub struct CeresDBCluster {
server0: CeresDBServer,
server1: CeresDBServer,
ceresmeta_process: Child,

/// Used in meta health check
db_client: Arc<dyn DbClient>,
meta_stable_check_sql: String,
}

impl CeresDBServer {
Expand All @@ -97,6 +103,7 @@ impl CeresDBServer {
}
}

#[async_trait]
impl Backend for CeresDBServer {
fn start() -> Self {
let config = env::var(CERESDB_CONFIG_FILE_ENV).expect("Cannot parse ceresdb config env");
Expand All @@ -105,15 +112,33 @@ impl Backend for CeresDBServer {
Self::spawn(bin, config, stdout)
}

fn wait_for_ready(&self) {
std::thread::sleep(Duration::from_secs(5));
async fn wait_for_ready(&self) {
tokio::time::sleep(Duration::from_secs(10)).await
}

fn stop(&mut self) {
self.server_process.kill().expect("Failed to kill server");
}
}

impl CeresDBCluster {
async fn check_meta_stable(&self) -> bool {
let query_ctx = RpcContext {
database: Some("public".to_string()),
timeout: None,
};

let query_req = Request {
tables: vec![],
sql: self.meta_stable_check_sql.clone(),
};

let result = self.db_client.sql_query(&query_ctx, &query_req).await;
result.is_ok()
}
}

#[async_trait]
impl Backend for CeresDBCluster {
fn start() -> Self {
let ceresmeta_bin =
Expand Down Expand Up @@ -149,16 +174,55 @@ impl Backend for CeresDBCluster {
let server0 = CeresDBServer::spawn(ceresdb_bin.clone(), ceresdb_config_0, stdout0);
let server1 = CeresDBServer::spawn(ceresdb_bin, ceresdb_config_1, stdout1);

// Meta stable check context
let endpoint = env::var(SERVER_GRPC_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}")
});
let db_client = Builder::new(endpoint, Mode::Proxy).build();

let meta_stable_check_sql = format!(
r#"CREATE TABLE `stable_check_{}`
(`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t))"#,
uuid::Uuid::new_v4()
);

Self {
server0,
server1,
ceresmeta_process,
db_client,
meta_stable_check_sql,
}
}

fn wait_for_ready(&self) {
println!("wait for cluster service ready...\n");
std::thread::sleep(Duration::from_secs(20));
async fn wait_for_ready(&self) {
println!("wait for cluster service initialized...");
tokio::time::sleep(Duration::from_secs(20_u64)).await;

println!("wait for cluster service stable begin...");
let mut wait_cnt = 0;
let wait_max = 6;
loop {
if wait_cnt >= wait_max {
println!(
"wait too long for cluster service stable, maybe somethings went wrong..."
);
return;
}

if self.check_meta_stable().await {
println!("wait for cluster service stable finished...");
return;
}

wait_cnt += 1;
let has_waited = wait_cnt * CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS;
println!("waiting for cluster service stable, has_waited:{has_waited}s");
tokio::time::sleep(Duration::from_secs(
CLUSTER_CERESDB_HEALTH_CHECK_INTERVAL_SECONDS as u64,
))
.await;
}
}

fn stop(&mut self) {
Expand Down Expand Up @@ -226,9 +290,9 @@ impl<T: Send + Sync> Database for CeresDB<T> {
}

impl<T: Backend> CeresDB<T> {
pub fn create() -> CeresDB<T> {
pub async fn create() -> CeresDB<T> {
let backend = T::start();
backend.wait_for_ready();
backend.wait_for_ready().await;

let endpoint = env::var(SERVER_GRPC_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}")
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ impl EnvController for CeresDBController {
async fn start(&self, env: &str, _config: Option<&Path>) -> Self::DB {
println!("start with env {env}");
let db = match env {
"local" => Box::new(CeresDB::<CeresDBServer>::create()) as DbRef,
"cluster" => Box::new(CeresDB::<CeresDBCluster>::create()) as DbRef,
"local" => Box::new(CeresDB::<CeresDBServer>::create().await) as DbRef,
"cluster" => Box::new(CeresDB::<CeresDBCluster>::create().await) as DbRef,
_ => panic!("invalid env {env}"),
};

Expand Down