Skip to content

Commit ede19fa

Browse files
authored
feat: refactor manifest to get snapshot in memory (#825)
* support return snapshot in `TableVersion`. * introdce `TableSnapshotProvider` to get manifest snapshot data from memory. * refactor `Snapshotter` to support from memory or storage. * correct the order between do snapshot and write wal. * fix test in manifest. * let table snapshot provider return None when drop table. * remove `origin_logs_num` from `Snapshot`. * fix compile and clippy. * split `Snapshotter` to `Snapshotter` and `SnapshotRecoverer`, remove the now unused `SnapshotBuilder`. * move `TableSnapshotProviderImpl` to space module.
1 parent f1cfbca commit ede19fa

File tree

8 files changed

+459
-217
lines changed

8 files changed

+459
-217
lines changed

analytic_engine/src/instance/engine.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ pub enum Error {
205205
backtrace,
206206
))]
207207
CreateOpenFailedTable { table: String, backtrace: Backtrace },
208+
209+
#[snafu(display("Failed to open manifest, err:{}", source))]
210+
OpenManifest {
211+
source: crate::manifest::details::Error,
212+
},
208213
}
209214

210215
define_result!(Error);
@@ -234,7 +239,8 @@ impl From<Error> for table_engine::engine::Error {
234239
| Error::StoreVersionEdit { .. }
235240
| Error::EncodePayloads { .. }
236241
| Error::CreateOpenFailedTable { .. }
237-
| Error::DoManifestSnapshot { .. } => Self::Unexpected {
242+
| Error::DoManifestSnapshot { .. }
243+
| Error::OpenManifest { .. } => Self::Unexpected {
238244
source: Box::new(err),
239245
},
240246
}

analytic_engine/src/instance/mod.rs

+3-38
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@ mod read;
1717
pub(crate) mod serial_executor;
1818
pub(crate) mod write;
1919

20-
use std::{
21-
collections::HashMap,
22-
sync::{Arc, RwLock},
23-
};
20+
use std::sync::Arc;
2421

2522
use common_types::table::TableId;
2623
use common_util::{
@@ -40,7 +37,7 @@ use crate::{
4037
compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest},
4138
manifest::ManifestRef,
4239
row_iter::IterOptions,
43-
space::{SpaceId, SpaceRef},
40+
space::{SpaceId, SpaceRef, SpacesRef},
4441
sst::{
4542
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions},
4643
file::FilePurger,
@@ -70,41 +67,9 @@ pub enum Error {
7067

7168
define_result!(Error);
7269

73-
/// Spaces states
74-
#[derive(Default)]
75-
struct Spaces {
76-
/// Id to space
77-
id_to_space: HashMap<SpaceId, SpaceRef>,
78-
}
79-
80-
impl Spaces {
81-
/// Insert space by name, and also insert id to space mapping
82-
fn insert(&mut self, space: SpaceRef) {
83-
let space_id = space.id;
84-
self.id_to_space.insert(space_id, space);
85-
}
86-
87-
fn get_by_id(&self, id: SpaceId) -> Option<&SpaceRef> {
88-
self.id_to_space.get(&id)
89-
}
90-
91-
/// List all tables of all spaces
92-
fn list_all_tables(&self, tables: &mut Vec<TableDataRef>) {
93-
let total_tables = self.id_to_space.values().map(|s| s.table_num()).sum();
94-
tables.reserve(total_tables);
95-
for space in self.id_to_space.values() {
96-
space.list_all_tables(tables);
97-
}
98-
}
99-
100-
fn list_all_spaces(&self) -> Vec<SpaceRef> {
101-
self.id_to_space.values().cloned().collect()
102-
}
103-
}
104-
10570
pub struct SpaceStore {
10671
/// All spaces of the engine.
107-
spaces: RwLock<Spaces>,
72+
spaces: SpacesRef,
10873
/// Manifest (or meta) stores meta data of the engine instance.
10974
manifest: ManifestRef,
11075
/// Wal of all tables

analytic_engine/src/instance/open.rs

+30-8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99

1010
use common_types::schema::IndexInWriterSchema;
1111
use log::{debug, error, info, trace};
12+
use object_store::ObjectStoreRef;
1213
use snafu::ResultExt;
1314
use table_engine::engine::OpenTableRequest;
1415
use wal::{
@@ -22,17 +23,20 @@ use crate::{
2223
engine,
2324
instance::{
2425
self,
25-
engine::{ApplyMemTable, FlushTable, ReadMetaUpdate, ReadWal, RecoverTableData, Result},
26+
engine::{
27+
ApplyMemTable, FlushTable, OpenManifest, ReadMetaUpdate, ReadWal, RecoverTableData,
28+
Result,
29+
},
2630
flush_compaction::TableFlushOptions,
2731
mem_collector::MemUsageCollector,
2832
serial_executor::TableOpSerialExecutor,
2933
write::MemTableWriter,
30-
Instance, SpaceStore, Spaces,
34+
Instance, SpaceStore,
3135
},
32-
manifest::{meta_data::TableManifestData, LoadRequest, ManifestRef},
36+
manifest::{details::ManifestImpl, meta_data::TableManifestData, LoadRequest},
3337
payload::{ReadPayload, WalDecoder},
3438
row_iter::IterOptions,
35-
space::{Space, SpaceContext, SpaceId, SpaceRef},
39+
space::{Space, SpaceContext, SpaceId, SpaceRef, Spaces, TableSnapshotProviderImpl},
3640
sst::{
3741
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions},
3842
file::FilePurger,
@@ -42,18 +46,36 @@ use crate::{
4246

4347
const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;
4448

49+
pub(crate) struct ManifestStorages {
50+
pub wal_manager: WalManagerRef,
51+
pub oss_storage: ObjectStoreRef,
52+
}
53+
4554
impl Instance {
4655
/// Open a new instance
47-
pub async fn open(
56+
pub(crate) async fn open(
4857
ctx: OpenContext,
49-
manifest: ManifestRef,
58+
manifest_storages: ManifestStorages,
5059
wal_manager: WalManagerRef,
5160
store_picker: ObjectStorePickerRef,
5261
sst_factory: SstFactoryRef,
5362
) -> Result<Arc<Self>> {
63+
let spaces: Arc<RwLock<Spaces>> = Arc::new(RwLock::new(Spaces::default()));
64+
let table_snapshot_provider = Arc::new(TableSnapshotProviderImpl {
65+
spaces: spaces.clone(),
66+
});
67+
let manifest = ManifestImpl::open(
68+
ctx.config.manifest.clone(),
69+
manifest_storages.wal_manager,
70+
manifest_storages.oss_storage,
71+
table_snapshot_provider,
72+
)
73+
.await
74+
.context(OpenManifest)?;
75+
5476
let space_store = Arc::new(SpaceStore {
55-
spaces: RwLock::new(Spaces::default()),
56-
manifest,
77+
spaces,
78+
manifest: Arc::new(manifest),
5779
wal_manager: wal_manager.clone(),
5880
store_picker: store_picker.clone(),
5981
sst_factory,

0 commit comments

Comments
 (0)