Skip to content

Commit f8471d2

Browse files
authored
Revert "fix: avoid any updates after table is closed (#998)" (#1034)
This reverts commit 85eb0b7. ## Rationale The changes introduced by #998 are not reasonable. Another fix will address #990. ## Detailed Changes Revert #998
1 parent cb7c907 commit f8471d2

File tree

10 files changed

+161
-219
lines changed

10 files changed

+161
-219
lines changed

analytic_engine/src/compaction/scheduler.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -650,16 +650,7 @@ impl ScheduleWorker {
650650
self.max_unflushed_duration,
651651
);
652652

653-
let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await {
654-
v
655-
} else {
656-
warn!(
657-
"Table is closed, ignore this periodical flush, table:{}",
658-
table_data.name
659-
);
660-
continue;
661-
};
662-
653+
let mut serial_exec = table_data.serial_exec.lock().await;
663654
let flush_scheduler = serial_exec.flush_scheduler();
664655
// Instance flush the table asynchronously.
665656
if let Err(e) = flusher

analytic_engine/src/instance/close.rs

+6-10
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
//! Close table logic of instance
44
55
use log::{info, warn};
6-
use snafu::{OptionExt, ResultExt};
6+
use snafu::ResultExt;
77
use table_engine::engine::CloseTableRequest;
88

99
use crate::{
1010
instance::{
11-
engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result},
11+
engine::{DoManifestSnapshot, FlushTable, Result},
1212
flush_compaction::{Flusher, TableFlushOptions},
1313
},
1414
manifest::{ManifestRef, SnapshotRequest},
@@ -37,11 +37,8 @@ impl Closer {
3737

3838
// Flush table.
3939
let opts = TableFlushOptions::default();
40-
let mut serial_exec_ctx = table_data
41-
.acquire_serial_exec_ctx()
42-
.await
43-
.context(OperateClosedTable)?;
44-
let flush_scheduler = serial_exec_ctx.flush_scheduler();
40+
let mut serial_exec = table_data.serial_exec.lock().await;
41+
let flush_scheduler = serial_exec.flush_scheduler();
4542

4643
self.flusher
4744
.do_flush(flush_scheduler, &table_data, opts)
@@ -70,10 +67,9 @@ impl Closer {
7067
let removed_table = self.space.remove_table(&request.table_name);
7168
assert!(removed_table.is_some());
7269

73-
serial_exec_ctx.invalidate();
7470
info!(
75-
"table:{} has been removed from the space_id:{}, table_id:{}",
76-
table_data.name, self.space.id, table_data.id,
71+
"table:{}-{} has been removed from the space_id:{}",
72+
table_data.name, table_data.id, self.space.id
7773
);
7874
Ok(())
7975
}

analytic_engine/src/instance/drop.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
//! Drop table logic of instance
44
55
use log::{info, warn};
6-
use snafu::{OptionExt, ResultExt};
6+
use snafu::ResultExt;
77
use table_engine::engine::DropTableRequest;
88

99
use crate::{
1010
instance::{
11-
engine::{FlushTable, OperateClosedTable, Result, WriteManifest},
11+
engine::{FlushTable, Result, WriteManifest},
1212
flush_compaction::{Flusher, TableFlushOptions},
1313
SpaceStoreRef,
1414
},
@@ -36,10 +36,7 @@ impl Dropper {
3636
}
3737
};
3838

39-
let mut serial_exec_ctx = table_data
40-
.acquire_serial_exec_ctx()
41-
.await
42-
.context(OperateClosedTable)?;
39+
let mut serial_exec = table_data.serial_exec.lock().await;
4340

4441
if table_data.is_dropped() {
4542
warn!(
@@ -54,7 +51,7 @@ impl Dropper {
5451
// be avoided.
5552

5653
let opts = TableFlushOptions::default();
57-
let flush_scheduler = serial_exec_ctx.flush_scheduler();
54+
let flush_scheduler = serial_exec.flush_scheduler();
5855
self.flusher
5956
.do_flush(flush_scheduler, &table_data, opts)
6057
.await

analytic_engine/src/instance/engine.rs

+73-28
Original file line numberDiff line numberDiff line change
@@ -23,33 +23,44 @@ use crate::{
2323
#[derive(Debug, Snafu)]
2424
#[snafu(visibility(pub(crate)))]
2525
pub enum Error {
26-
#[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))]
26+
#[snafu(display(
27+
"The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}",
28+
space_id,
29+
table,
30+
backtrace,
31+
))]
2732
SpaceNotExist {
2833
space_id: SpaceId,
2934
table: String,
3035
backtrace: Backtrace,
3136
},
3237

33-
#[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))]
38+
#[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))]
3439
ReadMetaUpdate {
3540
table_id: TableId,
3641
source: GenericError,
3742
},
3843

3944
#[snafu(display(
40-
"Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}"
45+
"Failed to recover table data, space_id:{}, table:{}, err:{}",
46+
space_id,
47+
table,
48+
source
4149
))]
4250
RecoverTableData {
4351
space_id: SpaceId,
4452
table: String,
4553
source: crate::table::data::Error,
4654
},
4755

48-
#[snafu(display("Failed to read wal, err:{source}"))]
56+
#[snafu(display("Failed to read wal, err:{}", source))]
4957
ReadWal { source: wal::manager::Error },
5058

5159
#[snafu(display(
52-
"Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}",
60+
"Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}",
61+
table,
62+
table_id,
63+
source
5364
))]
5465
ApplyMemTable {
5566
space_id: SpaceId,
@@ -59,7 +70,11 @@ pub enum Error {
5970
},
6071

6172
#[snafu(display(
62-
"Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
73+
"Flush failed, space_id:{}, table:{}, table_id:{}, err:{}",
74+
space_id,
75+
table,
76+
table_id,
77+
source
6378
))]
6479
FlushTable {
6580
space_id: SpaceId,
@@ -69,7 +84,11 @@ pub enum Error {
6984
},
7085

7186
#[snafu(display(
72-
"Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
87+
"Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}",
88+
space_id,
89+
table,
90+
table_id,
91+
source
7392
))]
7493
WriteManifest {
7594
space_id: SpaceId,
@@ -79,7 +98,11 @@ pub enum Error {
7998
},
8099

81100
#[snafu(display(
82-
"Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
101+
"Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}",
102+
space_id,
103+
table,
104+
table_id,
105+
source
83106
))]
84107
WriteWal {
85108
space_id: SpaceId,
@@ -89,7 +112,11 @@ pub enum Error {
89112
},
90113

91114
#[snafu(display(
92-
"Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
115+
"Invalid options, space_id:{}, table:{}, table_id:{}, err:{}",
116+
space_id,
117+
table,
118+
table_id,
119+
source
93120
))]
94121
InvalidOptions {
95122
space_id: SpaceId,
@@ -99,7 +126,11 @@ pub enum Error {
99126
},
100127

101128
#[snafu(display(
102-
"Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}",
129+
"Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}",
130+
space_id,
131+
table,
132+
table_id,
133+
source
103134
))]
104135
CreateTableData {
105136
space_id: SpaceId,
@@ -109,8 +140,11 @@ pub enum Error {
109140
},
110141

111142
#[snafu(display(
112-
"Try to update schema to elder version, table:{table}, current_version:{current_version}, \
113-
given_version:{given_version}.\nBacktrace:\n{backtrace}",
143+
"Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}",
144+
table,
145+
current_version,
146+
given_version,
147+
backtrace,
114148
))]
115149
InvalidSchemaVersion {
116150
table: String,
@@ -120,8 +154,11 @@ pub enum Error {
120154
},
121155

122156
#[snafu(display(
123-
"Invalid previous schema version, table:{table}, current_version:{current_version}, \
124-
pre_version:{pre_version}.\nBacktrace:\n{backtrace}",
157+
"Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}",
158+
table,
159+
current_version,
160+
pre_version,
161+
backtrace,
125162
))]
126163
InvalidPreVersion {
127164
table: String,
@@ -130,14 +167,21 @@ pub enum Error {
130167
backtrace: Backtrace,
131168
},
132169

133-
#[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))]
170+
#[snafu(display(
171+
"Alter schema of a dropped table:{}.\nBacktrace:\n{}",
172+
table,
173+
backtrace
174+
))]
134175
AlterDroppedTable { table: String, backtrace: Backtrace },
135176

136-
#[snafu(display("Failed to store version edit, err:{source}"))]
177+
#[snafu(display("Failed to store version edit, err:{}", source))]
137178
StoreVersionEdit { source: GenericError },
138179

139180
#[snafu(display(
140-
"Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}"
181+
"Failed to encode payloads, table:{}, wal_location:{:?}, err:{}",
182+
table,
183+
wal_location,
184+
source
141185
))]
142186
EncodePayloads {
143187
table: String,
@@ -146,7 +190,10 @@ pub enum Error {
146190
},
147191

148192
#[snafu(display(
149-
"Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}",
193+
"Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}",
194+
space_id,
195+
table,
196+
source
150197
))]
151198
DoManifestSnapshot {
152199
space_id: SpaceId,
@@ -155,31 +202,30 @@ pub enum Error {
155202
},
156203

157204
#[snafu(display(
158-
"Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}",
205+
"Table open failed and can not be created again, table:{}.\nBacktrace:\n{}",
206+
table,
207+
backtrace,
159208
))]
160209
CreateOpenFailedTable { table: String, backtrace: Backtrace },
161210

162-
#[snafu(display("Failed to open manifest, err:{source}"))]
211+
#[snafu(display("Failed to open manifest, err:{}", source))]
163212
OpenManifest {
164213
source: crate::manifest::details::Error,
165214
},
166215

167-
#[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))]
216+
#[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
168217
TableNotExist { msg: String, backtrace: Backtrace },
169218

170-
#[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))]
219+
#[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
171220
OpenTablesOfShard { msg: String, backtrace: Backtrace },
172221

173-
#[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))]
174-
OperateClosedTable { backtrace: Backtrace },
175-
176-
#[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))]
222+
#[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))]
177223
ReplayWalWithCause {
178224
msg: Option<String>,
179225
source: GenericError,
180226
},
181227

182-
#[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))]
228+
#[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))]
183229
ReplayWalNoCause {
184230
msg: Option<String>,
185231
backtrace: Backtrace,
@@ -218,7 +264,6 @@ impl From<Error> for table_engine::engine::Error {
218264
| Error::TableNotExist { .. }
219265
| Error::OpenTablesOfShard { .. }
220266
| Error::ReplayWalNoCause { .. }
221-
| Error::OperateClosedTable { .. }
222267
| Error::ReplayWalWithCause { .. } => Self::Unexpected {
223268
source: Box::new(err),
224269
},

analytic_engine/src/instance/mod.rs

+3-12
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ use common_util::{
2828
};
2929
use log::{error, info};
3030
use mem_collector::MemUsageCollector;
31-
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
31+
use snafu::{ResultExt, Snafu};
3232
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
3333
use tokio::sync::oneshot::{self, error::RecvError};
3434
use wal::manager::{WalLocation, WalManagerRef};
3535

36+
use self::flush_compaction::{Flusher, TableFlushOptions};
3637
use crate::{
3738
compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest},
38-
instance::flush_compaction::{Flusher, TableFlushOptions},
3939
manifest::ManifestRef,
4040
row_iter::IterOptions,
4141
space::{SpaceId, SpaceRef, SpacesRef},
@@ -66,9 +66,6 @@ pub enum Error {
6666
source: GenericError,
6767
},
6868

69-
#[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))]
70-
OperateClosedTable { table: String, backtrace: Backtrace },
71-
7269
#[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))]
7370
RecvManualOpResult {
7471
op: String,
@@ -198,13 +195,7 @@ impl Instance {
198195
};
199196

200197
let flusher = self.make_flusher();
201-
let mut serial_exec =
202-
table_data
203-
.acquire_serial_exec_ctx()
204-
.await
205-
.context(OperateClosedTable {
206-
table: &table_data.name,
207-
})?;
198+
let mut serial_exec = table_data.serial_exec.lock().await;
208199
let flush_scheduler = serial_exec.flush_scheduler();
209200
flusher
210201
.schedule_flush(flush_scheduler, table_data, flush_opts)

0 commit comments

Comments
 (0)