Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add shard related methods to table engine #897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::sync::Arc;

use async_trait::async_trait;
use common_util::error::BoxError;
use log::info;
use log::{error, info};
use snafu::ResultExt;
use table_engine::{
engine::{
Close, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
TableEngine,
Close, CloseShardRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenShardRequest, OpenTableRequest, Result, TableEngine,
},
table::{SchemaId, TableRef},
ANALYTIC_ENGINE_TYPE,
Expand All @@ -37,6 +37,63 @@ impl TableEngineImpl {
pub fn new(instance: InstanceRef) -> Self {
Self { instance }
}

async fn open_tables_of_shard(
&self,
open_requests: Vec<table_engine::engine::OpenTableRequest>,
) -> Vec<table_engine::engine::Result<Option<TableRef>>> {
if open_requests.is_empty() {
return Vec::new();
}

let mut open_results = Vec::with_capacity(open_requests.len());
for request in open_requests {
let result = self
.open_table(request.clone())
.await
.map_err(|e| {
error!("Failed to open table, open_request:{request:?}, err:{e}");
e
})
.map(|table_opt| {
if table_opt.is_none() {
error!(
"Table engine returns none when opening table, open_request:{request:?}"
);
}
table_opt
});

open_results.push(result);
}

open_results
}

async fn close_tables_of_shard(
&self,
close_requests: Vec<table_engine::engine::CloseTableRequest>,
) -> Vec<table_engine::engine::Result<String>> {
if close_requests.is_empty() {
return Vec::new();
}

let mut close_results = Vec::with_capacity(close_requests.len());
for request in close_requests {
let result = self
.close_table(request.clone())
.await
.map_err(|e| {
error!("Failed to close table, close_request:{request:?}, err:{e}");
e
})
.map(|_| request.table_name);

close_results.push(result);
}

close_results
}
}

impl Drop for TableEngineImpl {
Expand Down Expand Up @@ -132,6 +189,44 @@ impl TableEngine for TableEngineImpl {

Ok(())
}

async fn open_shard(&self, request: OpenShardRequest) -> Vec<Result<Option<TableRef>>> {
let table_defs = request.table_defs;
let open_requests = table_defs
.into_iter()
.map(|def| OpenTableRequest {
catalog_name: def.catalog_name,
schema_name: def.schema_name,
schema_id: def.schema_id,
table_name: def.name,
table_id: def.id,
engine: request.engine.clone(),
shard_id: request.shard_id,
})
.collect();

self.open_tables_of_shard(open_requests).await
}

async fn close_shard(
&self,
request: CloseShardRequest,
) -> Vec<table_engine::engine::Result<String>> {
let table_defs = request.table_defs;
let close_requests = table_defs
.into_iter()
.map(|def| CloseTableRequest {
catalog_name: def.catalog_name,
schema_name: def.schema_name,
schema_id: def.schema_id,
table_name: def.name,
table_id: def.id,
engine: request.engine.clone(),
})
.collect();

self.close_tables_of_shard(close_requests).await
}
}

/// Generate the space id from the schema id with assumption schema id is unique
Expand Down
34 changes: 34 additions & 0 deletions catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,40 @@ pub struct AlterTableRequest {
pub operations: Vec<AlterTableOperation>,
}

#[derive(Debug, Clone)]
pub struct OpenShardRequest {
/// Shard id
pub shard_id: ShardId,

/// Table infos
pub table_defs: Vec<TableDef>,

/// Table engine type
pub engine: String,
}

#[derive(Clone, Debug)]
pub struct TableDef {
pub catalog_name: String,
pub schema_name: String,
pub id: TableId,
pub name: String,
}

impl TableDef {
pub fn into_engine_table_def(self, schema_id: SchemaId) -> engine::TableDef {
engine::TableDef {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
id: self.id,
name: self.name,
}
}
}

pub type CloseShardRequest = OpenShardRequest;

/// Schema manage tables.
#[async_trait]
pub trait Schema {
Expand Down
147 changes: 34 additions & 113 deletions catalog/src/table_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@
use std::time::Instant;

use common_util::{error::BoxError, time::InstantExt};
use log::{error, info, warn};
use log::{info, warn};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{CloseShardRequest, OpenShardRequest, TableEngineRef},
table::TableRef,
};
use table_engine::{engine, table::TableRef};

use crate::{
manager::ManagerRef,
schema::{
CloseOptions, CloseTableRequest, CreateOptions, CreateTableRequest, DropOptions,
DropTableRequest, OpenOptions, OpenTableRequest, SchemaRef,
CloseOptions, CloseShardRequest, CloseTableRequest, CreateOptions, CreateTableRequest,
DropOptions, DropTableRequest, OpenOptions, OpenShardRequest, OpenTableRequest, SchemaRef,
},
Result, TableOperatorNoCause, TableOperatorWithCause,
};
Expand All @@ -39,40 +36,30 @@ impl TableOperator {
let shard_id = request.shard_id;

// Generate open requests.
let table_infos = request.table_defs;
let schemas_and_requests = table_infos
.into_iter()
.map(|table| {
let schema_res = self.schema_by_name(&table.catalog_name, &table.schema_name);

schema_res.map(|schema| {
let request = table_engine::engine::OpenTableRequest {
catalog_name: table.catalog_name,
schema_name: table.schema_name,
schema_id: schema.id(),
table_name: table.name.clone(),
table_id: table.id,
engine: request.engine.clone(),
shard_id: request.shard_id,
};

(schema, request)
})
})
.collect::<Result<Vec<_>>>()?;
let (schemas, requests): (Vec<_>, Vec<_>) = schemas_and_requests.into_iter().unzip();
let mut schemas = Vec::with_capacity(request.table_defs.len());
let mut engine_table_defs = Vec::with_capacity(request.table_defs.len());
for open_ctx in request.table_defs {
let schema = self.schema_by_name(&open_ctx.catalog_name, &open_ctx.schema_name)?;
engine_table_defs.push(open_ctx.into_engine_table_def(schema.id()));
schemas.push(schema);
}

// Open tables by table engine.
// TODO: add the `open_shard` method into table engine.
let open_res = open_tables_of_shard(table_engine, requests).await;
let engine_open_shard_req = engine::OpenShardRequest {
shard_id: request.shard_id,
table_defs: engine_table_defs,
engine: request.engine,
};
let open_results = table_engine.open_shard(engine_open_shard_req).await;

// Check and register successful opened table into schema.
let mut success_count = 0_u32;
let mut no_table_count = 0_u32;
let mut open_err_count = 0_u32;

for (schema, open_res) in schemas.into_iter().zip(open_res.into_iter()) {
match open_res {
for (schema, open_result) in schemas.into_iter().zip(open_results.into_iter()) {
match open_result {
Ok(Some(table)) => {
schema.register_table(table);
success_count += 1;
Expand Down Expand Up @@ -112,38 +99,29 @@ impl TableOperator {
let shard_id = request.shard_id;

// Generate open requests.
let table_defs = request.table_defs;
let schemas_and_requests = table_defs
.into_iter()
.map(|def| {
let schema_res = self.schema_by_name(&def.catalog_name, &def.schema_name);

schema_res.map(|schema| {
let request = table_engine::engine::CloseTableRequest {
catalog_name: def.catalog_name,
schema_name: def.schema_name,
schema_id: schema.id(),
table_name: def.name.clone(),
table_id: def.id,
engine: request.engine.clone(),
};

(schema, request)
})
})
.collect::<Result<Vec<_>>>()?;
let (schemas, requests): (Vec<_>, Vec<_>) = schemas_and_requests.into_iter().unzip();
let mut schemas = Vec::with_capacity(request.table_defs.len());
let mut engine_table_defs = Vec::with_capacity(request.table_defs.len());
for table_def in request.table_defs {
let schema = self.schema_by_name(&table_def.catalog_name, &table_def.schema_name)?;
engine_table_defs.push(table_def.into_engine_table_def(schema.id()));
schemas.push(schema);
}

// Close tables by table engine.
// TODO: add the `close_shard` method into table engine.
let results = close_tables_of_shard(table_engine, requests).await;
let engine_close_shard_req = engine::CloseShardRequest {
shard_id: request.shard_id,
table_defs: engine_table_defs,
engine: request.engine,
};
let close_results = table_engine.close_shard(engine_close_shard_req).await;

// Check and unregister successful closed table from schema.
let mut success_count = 0_u32;
let mut close_err_count = 0_u32;

for (schema, result) in schemas.into_iter().zip(results.into_iter()) {
match result {
for (schema, close_result) in schemas.into_iter().zip(close_results.into_iter()) {
match close_result {
Ok(table_name) => {
schema.unregister_table(&table_name);
success_count += 1;
Expand Down Expand Up @@ -286,60 +264,3 @@ impl TableOperator {
})
}
}

async fn open_tables_of_shard(
table_engine: TableEngineRef,
open_requests: Vec<table_engine::engine::OpenTableRequest>,
) -> Vec<table_engine::engine::Result<Option<TableRef>>> {
if open_requests.is_empty() {
return Vec::new();
}

let mut open_results = Vec::with_capacity(open_requests.len());
for request in open_requests {
let result = table_engine
.open_table(request.clone())
.await
.map_err(|e| {
error!("Failed to open table, open_request:{request:?}, err:{e}");
e
})
.map(|table_opt| {
if table_opt.is_none() {
error!(
"Table engine returns none when opening table, open_request:{request:?}"
);
}
table_opt
});

open_results.push(result);
}

open_results
}

async fn close_tables_of_shard(
table_engine: TableEngineRef,
close_requests: Vec<table_engine::engine::CloseTableRequest>,
) -> Vec<table_engine::engine::Result<String>> {
if close_requests.is_empty() {
return Vec::new();
}

let mut close_results = Vec::with_capacity(close_requests.len());
for request in close_requests {
let result = table_engine
.close_table(request.clone())
.await
.map_err(|e| {
error!("Failed to close table, close_request:{request:?}, err:{e}");
e
})
.map(|_| request.table_name);

close_results.push(result);
}

close_results
}
12 changes: 10 additions & 2 deletions partition_table_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use common_util::error::BoxError;
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
TableEngine, Unexpected, UnexpectedNoCause,
CloseShardRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenShardRequest, OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause,
},
remote::RemoteEngineRef,
table::TableRef,
Expand Down Expand Up @@ -75,4 +75,12 @@ impl TableEngine for PartitionTableEngine {
async fn close_table(&self, _request: CloseTableRequest) -> Result<()> {
Ok(())
}

async fn open_shard(&self, _request: OpenShardRequest) -> Vec<Result<Option<TableRef>>> {
vec![Ok(None)]
}

async fn close_shard(&self, _request: CloseShardRequest) -> Vec<Result<String>> {
vec![Ok("".to_string())]
}
}
Loading