From 19cd3469849f068c55065110eaa796e09b319525 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 20 Sep 2023 16:30:37 +0800 Subject: [PATCH 1/2] fix: loop all sub tables to get table info --- proxy/src/lib.rs | 88 ++++++++++++++++++------------- table_engine/src/partition/mod.rs | 10 ++++ 2 files changed, 61 insertions(+), 37 deletions(-) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 6593db42fa..840abab768 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -75,7 +75,8 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use table_engine::{ engine::{CreateTableParams, EngineRuntimes, TableState}, - remote::model::{GetTableInfoRequest, TableIdentifier}, + partition::PartitionInfo, + remote::model::{GetTableInfoRequest, TableIdentifier, TableInfo}, table::{TableId, TableRef}, PARTITION_TABLE_ENGINE_TYPE, }; @@ -314,27 +315,55 @@ impl Proxy { Ok(table) } - async fn maybe_open_partition_table_if_not_exist( + async fn get_partition_table_info( &self, catalog_name: &str, schema_name: &str, - table_name: &str, - ) -> Result<()> { - if let Err(e) = self - .open_partition_table_inner(catalog_name, schema_name, table_name) - .await - { - warn!("Open partition table failed, err:{e:?}"); + base_name: &str, + part_info: &PartitionInfo, + ) -> Result { + let get_inner = |i| async move { + let sub_partition_table_name = util::get_sub_partition_name(base_name, part_info, i); + let table = self + .instance + .remote_engine_ref + .get_table_info(GetTableInfoRequest { + table: TableIdentifier { + catalog: catalog_name.to_string(), + schema: schema_name.to_string(), + table: sub_partition_table_name, + }, + }) + .await + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Failed to get table", + })?; + + Ok(table) + }; + + let part_num = part_info.get_definition_num(); + if part_num == 1 { + return get_inner(0).await; } - // When open remote table failed, we currently don't return error outside. - // This is because when sub_table[0] is unhealthy, we can not drop the partition - // table. - // TODO: maybe we can find a more elegant way to deal with this issue. - Ok(()) + // Loop get sub tables to get table info in case of some of them has problems. + for i in 0..part_info.get_definition_num() - 1 { + let ret = get_inner(i).await; + if let Err(err) = ret { + warn!("Failed to get table info, err:{err:?}"); + } else { + return ret; + } + } + + // return the last sub table's get result to outside + get_inner(part_num - 1).await } - async fn open_partition_table_inner( + async fn maybe_open_partition_table_if_not_exist( &self, catalog_name: &str, schema_name: &str, @@ -403,29 +432,14 @@ impl Proxy { let partition_table_info = table_info_in_meta.unwrap(); // If table not exists, open it. - // Get table_schema from first sub partition table. - let first_sub_partition_table_name = util::get_sub_partition_name( - &partition_table_info.name, - partition_table_info.partition_info.as_ref().unwrap(), - 0usize, - ); let table = self - .instance - .remote_engine_ref - .get_table_info(GetTableInfoRequest { - table: TableIdentifier { - catalog: catalog_name.to_string(), - schema: schema_name.to_string(), - table: first_sub_partition_table_name, - }, - }) - .await - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Failed to get table", - })?; - + .get_partition_table_info( + catalog_name, + schema_name, + &partition_table_info.name, + partition_table_info.partition_info.as_ref().unwrap(), + ) + .await?; // Partition table is a virtual table, so we need to create it manually. // Partition info is stored in ceresmeta, so we need to use create_table_request // to create it. diff --git a/table_engine/src/partition/mod.rs b/table_engine/src/partition/mod.rs index 7dca5792f6..e8ba154d24 100644 --- a/table_engine/src/partition/mod.rs +++ b/table_engine/src/partition/mod.rs @@ -90,6 +90,7 @@ pub enum PartitionInfo { } impl PartitionInfo { + #[inline] pub fn get_definitions(&self) -> Vec { match self { Self::Random(v) => v.definitions.clone(), @@ -97,6 +98,15 @@ impl PartitionInfo { Self::Key(v) => v.definitions.clone(), } } + + #[inline] + pub fn get_definition_num(&self) -> usize { + match self { + Self::Random(v) => v.definitions.len(), + Self::Hash(v) => v.definitions.len(), + Self::Key(v) => v.definitions.len(), + } + } } #[derive(Clone, Debug, PartialEq, Eq, Default)] From f059485d57cf3137254e3dc95fc35f7ec11c5b5f Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 20 Sep 2023 17:06:29 +0800 Subject: [PATCH 2/2] fix CR --- proxy/src/lib.rs | 23 ++++++++++------------- table_engine/src/partition/mod.rs | 2 +- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 840abab768..237aac5f62 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -323,6 +323,7 @@ impl Proxy { part_info: &PartitionInfo, ) -> Result { let get_inner = |i| async move { + // TODO: the remote engine should provide a method to get all sub table names. let sub_partition_table_name = util::get_sub_partition_name(base_name, part_info, i); let table = self .instance @@ -344,16 +345,12 @@ impl Proxy { Ok(table) }; - let part_num = part_info.get_definition_num(); - if part_num == 1 { - return get_inner(0).await; - } - - // Loop get sub tables to get table info in case of some of them has problems. - for i in 0..part_info.get_definition_num() - 1 { + let part_num = part_info.get_partition_num(); + // Loop all sub tables to get table info in case of some of them has problems. + for i in 0..part_num - 1 { let ret = get_inner(i).await; - if let Err(err) = ret { - warn!("Failed to get table info, err:{err:?}"); + if let Err(e) = ret { + warn!("Failed to get table info, err:{e:?}"); } else { return ret; } @@ -432,7 +429,7 @@ impl Proxy { let partition_table_info = table_info_in_meta.unwrap(); // If table not exists, open it. - let table = self + let table_info = self .get_partition_table_info( catalog_name, schema_name, @@ -447,9 +444,9 @@ impl Proxy { catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), table_name: partition_table_info.name, - table_schema: table.table_schema, - engine: table.engine, - table_options: table.options, + table_schema: table_info.table_schema, + engine: table_info.engine, + table_options: table_info.options, partition_info: partition_table_info.partition_info, }; let create_table_request = CreateTableRequest { diff --git a/table_engine/src/partition/mod.rs b/table_engine/src/partition/mod.rs index e8ba154d24..29b8756f53 100644 --- a/table_engine/src/partition/mod.rs +++ b/table_engine/src/partition/mod.rs @@ -100,7 +100,7 @@ impl PartitionInfo { } #[inline] - pub fn get_definition_num(&self) -> usize { + pub fn get_partition_num(&self) -> usize { match self { Self::Random(v) => v.definitions.len(), Self::Hash(v) => v.definitions.len(),