Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: loop all sub tables to get table info #1224

Merged
merged 2 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 52 additions & 41 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{CreateTableParams, EngineRuntimes, TableState},
remote::model::{GetTableInfoRequest, TableIdentifier},
partition::PartitionInfo,
remote::model::{GetTableInfoRequest, TableIdentifier, TableInfo},
table::{TableId, TableRef},
PARTITION_TABLE_ENGINE_TYPE,
};
Expand Down Expand Up @@ -314,27 +315,52 @@ impl Proxy {
Ok(table)
}

async fn maybe_open_partition_table_if_not_exist(
async fn get_partition_table_info(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<()> {
if let Err(e) = self
.open_partition_table_inner(catalog_name, schema_name, table_name)
.await
{
warn!("Open partition table failed, err:{e:?}");
base_name: &str,
part_info: &PartitionInfo,
) -> Result<TableInfo> {
let get_inner = |i| async move {
// TODO: the remote engine should provide a method to get all sub table names.
let sub_partition_table_name = util::get_sub_partition_name(base_name, part_info, i);
let table = self
.instance
.remote_engine_ref
.get_table_info(GetTableInfoRequest {
table: TableIdentifier {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: sub_partition_table_name,
},
})
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to get table",
})?;

Ok(table)
};

let part_num = part_info.get_partition_num();
// Loop all sub tables to get table info in case of some of them has problems.
for i in 0..part_num - 1 {
let ret = get_inner(i).await;
if let Err(e) = ret {
warn!("Failed to get table info, err:{e:?}");
} else {
return ret;
}
}

// When open remote table failed, we currently don't return error outside.
// This is because when sub_table[0] is unhealthy, we can not drop the partition
// table.
// TODO: maybe we can find a more elegant way to deal with this issue.
Ok(())
// return the last sub table's get result to outside
get_inner(part_num - 1).await
}

async fn open_partition_table_inner(
async fn maybe_open_partition_table_if_not_exist(
&self,
catalog_name: &str,
schema_name: &str,
Expand Down Expand Up @@ -403,39 +429,24 @@ impl Proxy {
let partition_table_info = table_info_in_meta.unwrap();

// If table not exists, open it.
// Get table_schema from first sub partition table.
let first_sub_partition_table_name = util::get_sub_partition_name(
&partition_table_info.name,
partition_table_info.partition_info.as_ref().unwrap(),
0usize,
);
let table = self
.instance
.remote_engine_ref
.get_table_info(GetTableInfoRequest {
table: TableIdentifier {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: first_sub_partition_table_name,
},
})
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to get table",
})?;

let table_info = self
.get_partition_table_info(
catalog_name,
schema_name,
&partition_table_info.name,
partition_table_info.partition_info.as_ref().unwrap(),
)
.await?;
// Partition table is a virtual table, so we need to create it manually.
// Partition info is stored in ceresmeta, so we need to use create_table_request
// to create it.
let params = CreateTableParams {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: partition_table_info.name,
table_schema: table.table_schema,
engine: table.engine,
table_options: table.options,
table_schema: table_info.table_schema,
engine: table_info.engine,
table_options: table_info.options,
partition_info: partition_table_info.partition_info,
};
let create_table_request = CreateTableRequest {
Expand Down
10 changes: 10 additions & 0 deletions table_engine/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,23 @@ pub enum PartitionInfo {
}

impl PartitionInfo {
#[inline]
pub fn get_definitions(&self) -> Vec<PartitionDefinition> {
match self {
Self::Random(v) => v.definitions.clone(),
Self::Hash(v) => v.definitions.clone(),
Self::Key(v) => v.definitions.clone(),
}
}

#[inline]
pub fn get_partition_num(&self) -> usize {
match self {
Self::Random(v) => v.definitions.len(),
Self::Hash(v) => v.definitions.len(),
Self::Key(v) => v.definitions.len(),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Default)]
Expand Down