Skip to content

Commit 85eb0b7

Browse files
authored
fix: avoid any updates after table is closed (#998)
## Rationale Part of #990. Some background jobs are still allowed to execute after a table is closed, which can corrupt data when the table is migrated between different nodes, because the same table then has multiple writers. ## Detailed Changes Introduce a flag called `invalid` in the table data to denote whether the serial executor is still valid. This flag is protected together with the `TableOpSerialExecutor` in the table data, and the `TableOpSerialExecutor` can't be acquired once the flag is set. That is to say, no table operation (including updating the manifest, altering the table, and so on) can be executed after the flag is set, because these operations require the `TableOpSerialExecutor`. Finally, the flag is set when the table is closed.
1 parent 907058e commit 85eb0b7

File tree

10 files changed

+219
-161
lines changed

10 files changed

+219
-161
lines changed

analytic_engine/src/compaction/scheduler.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,16 @@ impl ScheduleWorker {
650650
self.max_unflushed_duration,
651651
);
652652

653-
let mut serial_exec = table_data.serial_exec.lock().await;
653+
let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await {
654+
v
655+
} else {
656+
warn!(
657+
"Table is closed, ignore this periodical flush, table:{}",
658+
table_data.name
659+
);
660+
continue;
661+
};
662+
654663
let flush_scheduler = serial_exec.flush_scheduler();
655664
// Instance flush the table asynchronously.
656665
if let Err(e) = flusher

analytic_engine/src/instance/close.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
//! Close table logic of instance
44
55
use log::{info, warn};
6-
use snafu::ResultExt;
6+
use snafu::{OptionExt, ResultExt};
77
use table_engine::engine::CloseTableRequest;
88

99
use crate::{
1010
instance::{
11-
engine::{DoManifestSnapshot, FlushTable, Result},
11+
engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result},
1212
flush_compaction::{Flusher, TableFlushOptions},
1313
},
1414
manifest::{ManifestRef, SnapshotRequest},
@@ -37,8 +37,11 @@ impl Closer {
3737

3838
// Flush table.
3939
let opts = TableFlushOptions::default();
40-
let mut serial_exec = table_data.serial_exec.lock().await;
41-
let flush_scheduler = serial_exec.flush_scheduler();
40+
let mut serial_exec_ctx = table_data
41+
.acquire_serial_exec_ctx()
42+
.await
43+
.context(OperateClosedTable)?;
44+
let flush_scheduler = serial_exec_ctx.flush_scheduler();
4245

4346
self.flusher
4447
.do_flush(flush_scheduler, &table_data, opts)
@@ -67,9 +70,10 @@ impl Closer {
6770
let removed_table = self.space.remove_table(&request.table_name);
6871
assert!(removed_table.is_some());
6972

73+
serial_exec_ctx.invalidate();
7074
info!(
71-
"table:{}-{} has been removed from the space_id:{}",
72-
table_data.name, table_data.id, self.space.id
75+
"table:{} has been removed from the space_id:{}, table_id:{}",
76+
table_data.name, self.space.id, table_data.id,
7377
);
7478
Ok(())
7579
}

analytic_engine/src/instance/drop.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
//! Drop table logic of instance
44
55
use log::{info, warn};
6-
use snafu::ResultExt;
6+
use snafu::{OptionExt, ResultExt};
77
use table_engine::engine::DropTableRequest;
88

99
use crate::{
1010
instance::{
11-
engine::{FlushTable, Result, WriteManifest},
11+
engine::{FlushTable, OperateClosedTable, Result, WriteManifest},
1212
flush_compaction::{Flusher, TableFlushOptions},
1313
SpaceStoreRef,
1414
},
@@ -36,7 +36,10 @@ impl Dropper {
3636
}
3737
};
3838

39-
let mut serial_exec = table_data.serial_exec.lock().await;
39+
let mut serial_exec_ctx = table_data
40+
.acquire_serial_exec_ctx()
41+
.await
42+
.context(OperateClosedTable)?;
4043

4144
if table_data.is_dropped() {
4245
warn!(
@@ -51,7 +54,7 @@ impl Dropper {
5154
// be avoided.
5255

5356
let opts = TableFlushOptions::default();
54-
let flush_scheduler = serial_exec.flush_scheduler();
57+
let flush_scheduler = serial_exec_ctx.flush_scheduler();
5558
self.flusher
5659
.do_flush(flush_scheduler, &table_data, opts)
5760
.await

analytic_engine/src/instance/engine.rs

+28-73
Original file line numberDiff line numberDiff line change
@@ -23,44 +23,33 @@ use crate::{
2323
#[derive(Debug, Snafu)]
2424
#[snafu(visibility(pub(crate)))]
2525
pub enum Error {
26-
#[snafu(display(
27-
"The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}",
28-
space_id,
29-
table,
30-
backtrace,
31-
))]
26+
#[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))]
3227
SpaceNotExist {
3328
space_id: SpaceId,
3429
table: String,
3530
backtrace: Backtrace,
3631
},
3732

38-
#[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))]
33+
#[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))]
3934
ReadMetaUpdate {
4035
table_id: TableId,
4136
source: GenericError,
4237
},
4338

4439
#[snafu(display(
45-
"Failed to recover table data, space_id:{}, table:{}, err:{}",
46-
space_id,
47-
table,
48-
source
40+
"Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}"
4941
))]
5042
RecoverTableData {
5143
space_id: SpaceId,
5244
table: String,
5345
source: crate::table::data::Error,
5446
},
5547

56-
#[snafu(display("Failed to read wal, err:{}", source))]
48+
#[snafu(display("Failed to read wal, err:{source}"))]
5749
ReadWal { source: wal::manager::Error },
5850

5951
#[snafu(display(
60-
"Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}",
61-
table,
62-
table_id,
63-
source
52+
"Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}",
6453
))]
6554
ApplyMemTable {
6655
space_id: SpaceId,
@@ -70,11 +59,7 @@ pub enum Error {
7059
},
7160

7261
#[snafu(display(
73-
"Flush failed, space_id:{}, table:{}, table_id:{}, err:{}",
74-
space_id,
75-
table,
76-
table_id,
77-
source
62+
"Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
7863
))]
7964
FlushTable {
8065
space_id: SpaceId,
@@ -84,11 +69,7 @@ pub enum Error {
8469
},
8570

8671
#[snafu(display(
87-
"Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}",
88-
space_id,
89-
table,
90-
table_id,
91-
source
72+
"Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
9273
))]
9374
WriteManifest {
9475
space_id: SpaceId,
@@ -98,11 +79,7 @@ pub enum Error {
9879
},
9980

10081
#[snafu(display(
101-
"Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}",
102-
space_id,
103-
table,
104-
table_id,
105-
source
82+
"Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
10683
))]
10784
WriteWal {
10885
space_id: SpaceId,
@@ -112,11 +89,7 @@ pub enum Error {
11289
},
11390

11491
#[snafu(display(
115-
"Invalid options, space_id:{}, table:{}, table_id:{}, err:{}",
116-
space_id,
117-
table,
118-
table_id,
119-
source
92+
"Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
12093
))]
12194
InvalidOptions {
12295
space_id: SpaceId,
@@ -126,11 +99,7 @@ pub enum Error {
12699
},
127100

128101
#[snafu(display(
129-
"Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}",
130-
space_id,
131-
table,
132-
table_id,
133-
source
102+
"Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
134103
))]
135104
CreateTableData {
136105
space_id: SpaceId,
@@ -140,11 +109,8 @@ pub enum Error {
140109
},
141110

142111
#[snafu(display(
143-
"Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}",
144-
table,
145-
current_version,
146-
given_version,
147-
backtrace,
112+
"Try to update schema to elder version, table:{table}, current_version:{current_version}, \
113+
given_version:{given_version}.\nBacktrace:\n{backtrace}",
148114
))]
149115
InvalidSchemaVersion {
150116
table: String,
@@ -154,11 +120,8 @@ pub enum Error {
154120
},
155121

156122
#[snafu(display(
157-
"Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}",
158-
table,
159-
current_version,
160-
pre_version,
161-
backtrace,
123+
"Invalid previous schema version, table:{table}, current_version:{current_version}, \
124+
pre_version:{pre_version}.\nBacktrace:\n{backtrace}",
162125
))]
163126
InvalidPreVersion {
164127
table: String,
@@ -167,21 +130,14 @@ pub enum Error {
167130
backtrace: Backtrace,
168131
},
169132

170-
#[snafu(display(
171-
"Alter schema of a dropped table:{}.\nBacktrace:\n{}",
172-
table,
173-
backtrace
174-
))]
133+
#[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))]
175134
AlterDroppedTable { table: String, backtrace: Backtrace },
176135

177-
#[snafu(display("Failed to store version edit, err:{}", source))]
136+
#[snafu(display("Failed to store version edit, err:{source}"))]
178137
StoreVersionEdit { source: GenericError },
179138

180139
#[snafu(display(
181-
"Failed to encode payloads, table:{}, wal_location:{:?}, err:{}",
182-
table,
183-
wal_location,
184-
source
140+
"Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}"
185141
))]
186142
EncodePayloads {
187143
table: String,
@@ -190,10 +146,7 @@ pub enum Error {
190146
},
191147

192148
#[snafu(display(
193-
"Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}",
194-
space_id,
195-
table,
196-
source
149+
"Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}",
197150
))]
198151
DoManifestSnapshot {
199152
space_id: SpaceId,
@@ -202,30 +155,31 @@ pub enum Error {
202155
},
203156

204157
#[snafu(display(
205-
"Table open failed and can not be created again, table:{}.\nBacktrace:\n{}",
206-
table,
207-
backtrace,
158+
"Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}",
208159
))]
209160
CreateOpenFailedTable { table: String, backtrace: Backtrace },
210161

211-
#[snafu(display("Failed to open manifest, err:{}", source))]
162+
#[snafu(display("Failed to open manifest, err:{source}"))]
212163
OpenManifest {
213164
source: crate::manifest::details::Error,
214165
},
215166

216-
#[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
167+
#[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))]
217168
TableNotExist { msg: String, backtrace: Backtrace },
218169

219-
#[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
170+
#[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))]
220171
OpenTablesOfShard { msg: String, backtrace: Backtrace },
221172

222-
#[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))]
173+
#[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))]
174+
OperateClosedTable { backtrace: Backtrace },
175+
176+
#[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))]
223177
ReplayWalWithCause {
224178
msg: Option<String>,
225179
source: GenericError,
226180
},
227181

228-
#[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))]
182+
#[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))]
229183
ReplayWalNoCause {
230184
msg: Option<String>,
231185
backtrace: Backtrace,
@@ -264,6 +218,7 @@ impl From<Error> for table_engine::engine::Error {
264218
| Error::TableNotExist { .. }
265219
| Error::OpenTablesOfShard { .. }
266220
| Error::ReplayWalNoCause { .. }
221+
| Error::OperateClosedTable { .. }
267222
| Error::ReplayWalWithCause { .. } => Self::Unexpected {
268223
source: Box::new(err),
269224
},

analytic_engine/src/instance/mod.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ use common_util::{
2828
};
2929
use log::{error, info};
3030
use mem_collector::MemUsageCollector;
31-
use snafu::{ResultExt, Snafu};
31+
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
3232
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
3333
use tokio::sync::oneshot::{self, error::RecvError};
3434
use wal::manager::{WalLocation, WalManagerRef};
3535

36-
use self::flush_compaction::{Flusher, TableFlushOptions};
3736
use crate::{
3837
compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest},
38+
instance::flush_compaction::{Flusher, TableFlushOptions},
3939
manifest::ManifestRef,
4040
row_iter::IterOptions,
4141
space::{SpaceId, SpaceRef, SpacesRef},
@@ -66,6 +66,9 @@ pub enum Error {
6666
source: GenericError,
6767
},
6868

69+
#[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))]
70+
OperateClosedTable { table: String, backtrace: Backtrace },
71+
6972
#[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))]
7073
RecvManualOpResult {
7174
op: String,
@@ -195,7 +198,13 @@ impl Instance {
195198
};
196199

197200
let flusher = self.make_flusher();
198-
let mut serial_exec = table_data.serial_exec.lock().await;
201+
let mut serial_exec =
202+
table_data
203+
.acquire_serial_exec_ctx()
204+
.await
205+
.context(OperateClosedTable {
206+
table: &table_data.name,
207+
})?;
199208
let flush_scheduler = serial_exec.flush_scheduler();
200209
flusher
201210
.schedule_flush(flush_scheduler, table_data, flush_opts)

0 commit comments

Comments
 (0)