Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: loop all sub tables to get table info #1224

Merged
merged 2 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{CreateTableParams, EngineRuntimes, TableState},
remote::model::{GetTableInfoRequest, TableIdentifier},
partition::PartitionInfo,
remote::model::{GetTableInfoRequest, TableIdentifier, TableInfo},
table::{TableId, TableRef},
PARTITION_TABLE_ENGINE_TYPE,
};
Expand Down Expand Up @@ -314,27 +315,55 @@ impl Proxy {
Ok(table)
}

async fn maybe_open_partition_table_if_not_exist(
async fn get_partition_table_info(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<()> {
if let Err(e) = self
.open_partition_table_inner(catalog_name, schema_name, table_name)
.await
{
warn!("Open partition table failed, err:{e:?}");
base_name: &str,
part_info: &PartitionInfo,
) -> Result<TableInfo> {
let get_inner = |i| async move {
let sub_partition_table_name = util::get_sub_partition_name(base_name, part_info, i);
let table = self
.instance
.remote_engine_ref
.get_table_info(GetTableInfoRequest {
table: TableIdentifier {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: sub_partition_table_name,
},
})
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to get table",
})?;

Ok(table)
};

let part_num = part_info.get_definition_num();
if part_num == 1 {
return get_inner(0).await;
}

// When open remote table failed, we currently don't return error outside.
// This is because when sub_table[0] is unhealthy, we can not drop the partition
// table.
// TODO: maybe we can find a more elegant way to deal with this issue.
Ok(())
// Loop get sub tables to get table info in case of some of them has problems.
for i in 0..part_info.get_definition_num() - 1 {
let ret = get_inner(i).await;
if let Err(err) = ret {
warn!("Failed to get table info, err:{err:?}");
} else {
return ret;
}
}

// return the last sub table's get result to outside
get_inner(part_num - 1).await
}

async fn open_partition_table_inner(
async fn maybe_open_partition_table_if_not_exist(
&self,
catalog_name: &str,
schema_name: &str,
Expand Down Expand Up @@ -403,29 +432,14 @@ impl Proxy {
let partition_table_info = table_info_in_meta.unwrap();

// If table not exists, open it.
// Get table_schema from first sub partition table.
let first_sub_partition_table_name = util::get_sub_partition_name(
&partition_table_info.name,
partition_table_info.partition_info.as_ref().unwrap(),
0usize,
);
let table = self
.instance
.remote_engine_ref
.get_table_info(GetTableInfoRequest {
table: TableIdentifier {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: first_sub_partition_table_name,
},
})
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to get table",
})?;

.get_partition_table_info(
catalog_name,
schema_name,
&partition_table_info.name,
partition_table_info.partition_info.as_ref().unwrap(),
)
.await?;
// Partition table is a virtual table, so we need to create it manually.
// Partition info is stored in ceresmeta, so we need to use create_table_request
// to create it.
Expand Down
10 changes: 10 additions & 0 deletions table_engine/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,23 @@ pub enum PartitionInfo {
}

impl PartitionInfo {
#[inline]
pub fn get_definitions(&self) -> Vec<PartitionDefinition> {
match self {
Self::Random(v) => v.definitions.clone(),
Self::Hash(v) => v.definitions.clone(),
Self::Key(v) => v.definitions.clone(),
}
}

#[inline]
pub fn get_definition_num(&self) -> usize {
match self {
Self::Random(v) => v.definitions.len(),
Self::Hash(v) => v.definitions.len(),
Self::Key(v) => v.definitions.len(),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Default)]
Expand Down