
Commit 642934a

refactor!: refactor shard version logic (#1286)
## Rationale
For details, see: apache/incubator-horaedb-meta#263

## Detailed Changes
* Modify the return values of `CreateTableOnShard` and `DropTableOnShard` to return the latest shard version.

## Test Plan
Pass all unit tests and integration tests.

---------
Co-authored-by: xikai.wxk <xikai.wxk@antgroup.com>
Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
1 parent c33ab01 commit 642934a
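The breaking piece (hence `refactor!`) is the return type: per the diff below, `ShardOperator::create_table` and `ShardOperator::drop_table` now return `Result<ShardVersion>` instead of `Result<()>`. A minimal sketch of why a caller wants that value, assuming `ShardVersion` is an integer alias; everything here other than those two method names is hypothetical, simplified from the real async, context-driven code:

```rust
type ShardVersion = u64; // assumption: integer alias from common_types::table

// Hypothetical stand-in for the real ShardOperator.
struct MockShardOperator {
    version: ShardVersion,
}

impl MockShardOperator {
    // Each successful table change advances the shard version and returns it.
    fn create_table(&mut self) -> Result<ShardVersion, String> {
        self.version += 1;
        Ok(self.version)
    }
}

// Hypothetical caller-side view: a handler for CreateTableOnShard can now
// forward the version returned by the operator instead of re-reading it.
fn handle_create_table_on_shard(
    op: &mut MockShardOperator,
) -> Result<ShardVersion, String> {
    let latest_version = op.create_table()?; // was Result<()> before #1286
    Ok(latest_version)
}
```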

File tree: 8 files changed (+243 −120 lines)

Cargo.lock: +2 −2 (generated file; diff not rendered by default)

Cargo.toml: +6 −6

```diff
@@ -93,9 +93,8 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.21"
+ceresdbproto = "1.0.22"
 codec = { path = "components/codec" }
-notifier = { path = "components/notifier" }
 chrono = "0.4"
 clap = "3.0"
 clru = "0.6.1"
@@ -114,21 +113,22 @@ generic_error = { path = "components/generic_error" }
 hash_ext = { path = "components/hash_ext" }
 hex = "0.4.3"
 hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
-lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
-lazy_static = "1.4.0"
-logger = { path = "components/logger" }
-lru = "0.7.6"
 id_allocator = { path = "components/id_allocator" }
 influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query_influxql" }
 influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "influxdb_influxql_parser" }
 influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query" }
 influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
 interpreters = { path = "interpreters" }
 itertools = "0.10.5"
+lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
+lazy_static = "1.4.0"
+logger = { path = "components/logger" }
+lru = "0.7.6"
 macros = { path = "components/macros" }
 message_queue = { path = "components/message_queue" }
 meta_client = { path = "meta_client" }
 metric_ext = { path = "components/metric_ext" }
+notifier = { path = "components/notifier" }
 object_store = { path = "components/object_store" }
 panic_ext = { path = "components/panic_ext" }
 partitioned_lock = { path = "components/partitioned_lock" }
```
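The `ceresdbproto` bump (1.0.21 → 1.0.22) carries the protocol-level half of this change: per the commit message, the `CreateTableOnShard` and `DropTableOnShard` RPCs now report the latest shard version. A hypothetical sketch of the response shape; the field name here is an assumption, not taken from the actual proto definitions:

```rust
// Hypothetical response shape; the real generated types live in the
// ceresdbproto crate and may differ in naming and nesting.
type ShardVersion = u64;

struct CreateTableOnShardResponse {
    // The value this refactor relies on: the shard version after the change.
    latest_shard_version: ShardVersion,
}
```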

catalog_impls/src/volatile.rs: +4 −12

```diff
@@ -326,20 +326,12 @@ impl Schema for SchemaImpl {
         // Do real create table.
         // Partition table is not stored in ShardTableManager.
         if request.params.partition_info.is_none() {
-            let shard =
-                self.shard_set
-                    .get(request.shard_id)
-                    .with_context(|| schema::CreateTable {
-                        request: request.clone(),
-                        msg: "shard not found".to_string(),
-                    })?;
-
-            // TODO: seems unnecessary?
-            let _ = shard
-                .find_table(&request.params.schema_name, &request.params.table_name)
+            let _ = self
+                .shard_set
+                .get(request.shard_id)
                 .with_context(|| schema::CreateTable {
                     request: request.clone(),
-                    msg: "table not found in shard".to_string(),
+                    msg: "shard not found".to_string(),
                 })?;
         }
         let request = request.into_engine_create_request(None, self.schema_id);
```
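The old code resolved the shard and then probed `find_table` for a table that is only about to be created; the `TODO` already flagged that probe as unnecessary, so only the shard-existence check survives. A minimal sketch of the remaining check, using a plain `Option` to `Result` conversion in place of the crate's snafu `with_context`; all names here are hypothetical stand-ins:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the real ShardSet and shard handle.
struct Shard;
struct ShardSet {
    shards: HashMap<u32, Shard>,
}

impl ShardSet {
    fn get(&self, shard_id: u32) -> Option<&Shard> {
        self.shards.get(&shard_id)
    }
}

// Creating a non-partitioned table now only verifies that the target shard
// exists; the lookup result itself is discarded, as in the diff's `let _`.
fn ensure_shard_exists(set: &ShardSet, shard_id: u32) -> Result<(), String> {
    set.get(shard_id)
        .map(|_| ())
        .ok_or_else(|| format!("shard not found, shard_id:{shard_id}"))
}
```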

cluster/src/shard_operator.rs: +94 −60

```diff
@@ -21,6 +21,7 @@ use catalog::{
     },
     table_operator::TableOperator,
 };
+use common_types::table::ShardVersion;
 use generic_error::BoxError;
 use logger::info;
 use snafu::ResultExt;
@@ -219,24 +220,16 @@ impl ShardOperator {
         Ok(())
     }
 
-    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<ShardVersion> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator create table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator create table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having created table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_insert_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| CreateTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
         // Create the table by operator afterwards.
         let (table_engine, partition_info) = match table_info.partition_info.clone() {
             Some(v) => (ctx.partition_table_engine.clone(), Some(v)),
@@ -275,30 +268,37 @@ impl ShardOperator {
             })?;
 
         info!(
-            "ShardOperator create table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is created by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name,
         );
 
-        Ok(())
+        let latest_version = {
+            let mut data = self.data.write().unwrap();
+            data.try_create_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| CreateTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?
+        };
+
+        info!(
+            "ShardOperator create table sequentially finish, shard_id:{}, shard_version:{}, table:{}",
+            shard_info.id, shard_info.version, table_info.name,
+        );
+
+        Ok(latest_version)
     }
 
-    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<ShardVersion> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator drop table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator drop table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having dropped table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_remove_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| DropTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
         // Drop the table by operator afterwards.
         let drop_table_request = DropTableRequest {
             catalog_name: ctx.catalog,
@@ -319,31 +319,41 @@ impl ShardOperator {
             })?;
 
         info!(
-            "ShardOperator drop table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is dropped, shard_id:{}, table:{}",
+            shard_info.id, table_info.name,
         );
 
-        Ok(())
+        // Update the shard info after the table is dropped.
+        let latest_version = {
+            let mut data = self.data.write().unwrap();
+            data.try_drop_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| DropTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?
+        };
+
+        info!(
+            "ShardOperator drop table sequentially finish, latest_version:{latest_version}, shard_id:{}, old_shard_version:{}, table:{}",
+            shard_info.id,
+            shard_info.version,
+            table_info.name,
+        );
+
+        Ok(latest_version)
     }
 
     pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator open table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator open table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having opened table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_insert_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| OpenTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
-        // Open the table by operator afterwards.
+        // Open the table by operator.
         let open_table_request = OpenTableRequest {
             catalog_name: ctx.catalog,
             schema_name: table_info.schema_name.clone(),
@@ -366,28 +376,34 @@ impl ShardOperator {
             })?;
 
         info!(
-            "ShardOperator open table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is opened by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
         );
 
-        Ok(())
-    }
-
-    pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
-
-        info!("ShardOperator close table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}");
-
-        // FIXME: maybe should remove table from cluster after having closed table.
+        // Update the shard info after the table is opened.
         {
             let mut data = self.data.write().unwrap();
-            data.try_remove_table(ctx.updated_table_info)
+            data.try_open_table(ctx.updated_table_info.clone())
                 .box_err()
-                .with_context(|| CloseTableWithCause {
+                .with_context(|| OpenTableWithCause {
                     msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
                 })?;
         }
 
+        info!(
+            "ShardOperator open table sequentially finish, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
+
+        Ok(())
+    }
+
+    pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
+
+        info!("ShardOperator close table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", shard_info.id, table_info.name);
+
         // Close the table by catalog manager afterwards.
         let close_table_request = CloseTableRequest {
             catalog_name: ctx.catalog,
@@ -409,7 +425,25 @@ impl ShardOperator {
             msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
         })?;
 
-        info!("ShardOperator close table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}");
+        info!(
+            "ShardOperator table is closed by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
+
+        // Update the shard info after the table is closed.
+        {
+            let mut data = self.data.write().unwrap();
+            data.try_close_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| CloseTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?;
+        }
+
+        info!(
+            "ShardOperator close table sequentially finish, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
 
         Ok(())
     }
```
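Besides the new return values, the commit reverses the ordering in all four operations: the `FIXME` blocks that mutated shard data *before* calling the table engine are gone, and the bookkeeping (`try_create_table`, `try_drop_table`, `try_open_table`, `try_close_table`) now runs only after the engine call succeeds. A minimal sketch of that pattern, assuming `ShardVersion = u64` and a simplified error type; the real `ShardData` methods are not shown in this diff, so the bodies below are illustrative only:

```rust
use std::sync::RwLock;

type ShardVersion = u64; // assumption: integer alias

#[derive(Default)]
struct ShardData {
    version: ShardVersion,
    tables: Vec<String>,
}

impl ShardData {
    // Hypothetical stand-in for try_create_table: record the table and
    // return the bumped version for the caller to report upstream.
    fn try_create_table(&mut self, table: String) -> Result<ShardVersion, String> {
        self.tables.push(table);
        self.version += 1;
        Ok(self.version)
    }
}

fn create_in_engine(_table: &str) -> Result<(), String> {
    Ok(()) // placeholder for the real TableOperator call
}

// Engine call first; only a successful create advances the shard version,
// so a failed engine operation leaves the shard metadata untouched.
fn create_table(data: &RwLock<ShardData>, table: &str) -> Result<ShardVersion, String> {
    create_in_engine(table)?;
    let mut guard = data.write().unwrap(); // write lock held only for bookkeeping
    guard.try_create_table(table.to_string())
}
```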
