Skip to content

Commit 8ff80ed

Browse files
committed
split Snapshotter to Snapshotter and SnapshotRecoverer, remove the now unused SnapshotBuilder.
1 parent 564352e commit 8ff80ed

File tree

3 files changed

+43
-76
lines changed

3 files changed

+43
-76
lines changed

analytic_engine/src/instance/open.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ use crate::{
5151
const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;
5252

5353
pub(crate) struct ManifestStorages {
54-
pub(crate) wal_manager: WalManagerRef,
55-
pub(crate) oss_storage: ObjectStoreRef,
54+
pub wal_manager: WalManagerRef,
55+
pub oss_storage: ObjectStoreRef,
5656
}
5757

5858
impl Instance {

analytic_engine/src/manifest/details.rs

+41-67
Original file line numberDiff line numberDiff line change
@@ -234,24 +234,30 @@ impl TableSnapshotProvider for TableSnapshotProviderImpl {
234234
}
235235
}
236236

237-
/// Snapshot builder
238-
#[async_trait]
239-
trait SnapshotBuilder {
240-
async fn build(&self) -> Result<Option<Snapshot>>;
241-
}
242-
243-
/// Storage based snapshot builder
237+
/// Snapshot recoverer
238+
///
239+
/// Usually, it will recover the snapshot from storage(like disk, oss, etc).
240+
// TODO: remove `LogStore` and related operations, it should be called directly but not in the
241+
// `SnapshotReoverer`.
244242
#[derive(Debug, Clone)]
245-
struct StorageSnapshotBuilder<LogStore, SnapshotStore> {
243+
struct SnapshotRecoverer<LogStore, SnapshotStore> {
246244
log_store: LogStore,
247245
snapshot_store: SnapshotStore,
248246
}
249247

250-
impl<LogStore, SnapshotStore> StorageSnapshotBuilder<LogStore, SnapshotStore>
248+
impl<LogStore, SnapshotStore> SnapshotRecoverer<LogStore, SnapshotStore>
251249
where
252250
LogStore: MetaUpdateLogStore + Send + Sync,
253251
SnapshotStore: MetaUpdateSnapshotStore + Send + Sync,
254252
{
253+
async fn recover(&self) -> Result<Option<Snapshot>> {
254+
// Load the current snapshot first.
255+
match self.snapshot_store.load().await? {
256+
Some(v) => Ok(Some(self.create_latest_snapshot_with_prev(v).await?)),
257+
None => self.create_latest_snapshot_without_prev().await,
258+
}
259+
}
260+
255261
async fn create_latest_snapshot_with_prev(&self, prev_snapshot: Snapshot) -> Result<Snapshot> {
256262
let log_start_boundary = ReadBoundary::Excluded(prev_snapshot.end_seq);
257263
let mut reader = self.log_store.scan(log_start_boundary).await?;
@@ -279,37 +285,34 @@ where
279285

280286
let mut latest_seq = SequenceNumber::MIN;
281287
let mut manifest_data_builder = TableManifestDataBuilder::default();
288+
let mut has_logs = false;
282289
while let Some((seq, update)) = reader.next_update().await? {
283290
latest_seq = seq;
284291
manifest_data_builder
285292
.apply_update(update)
286293
.context(ApplyUpdate)?;
294+
has_logs = true;
287295
}
288296

289-
Ok(Some(Snapshot {
290-
end_seq: latest_seq,
291-
data: manifest_data_builder.build(),
292-
}))
293-
}
294-
}
295-
296-
#[async_trait]
297-
impl<LogStore, SnapshotStore> SnapshotBuilder for StorageSnapshotBuilder<LogStore, SnapshotStore>
298-
where
299-
LogStore: MetaUpdateLogStore + Send + Sync,
300-
SnapshotStore: MetaUpdateSnapshotStore + Send + Sync,
301-
{
302-
async fn build(&self) -> Result<Option<Snapshot>> {
303-
// Load the current snapshot first.
304-
match self.snapshot_store.load().await? {
305-
Some(v) => Ok(Some(self.create_latest_snapshot_with_prev(v).await?)),
306-
None => self.create_latest_snapshot_without_prev().await,
297+
if has_logs {
298+
Ok(Some(Snapshot {
299+
end_seq: latest_seq,
300+
data: manifest_data_builder.build(),
301+
}))
302+
} else {
303+
Ok(None)
307304
}
308305
}
309306
}
310-
/// Memory based snapshot builder
307+
308+
/// Snapshot creator
309+
///
310+
/// Usually, it will get snapshot from memory, and store them to storage(like
311+
/// disk, oss, etc).
312+
// TODO: remove `LogStore` and related operations, it should be called directly but not in the
313+
// `Snapshotter`.
311314
#[derive(Debug, Clone)]
312-
struct MemorySnapshotBuilder<LogStore, SnapshotStore> {
315+
struct Snapshotter<LogStore, SnapshotStore> {
313316
log_store: LogStore,
314317
snapshot_store: SnapshotStore,
315318
end_seq: SequenceNumber,
@@ -318,14 +321,13 @@ struct MemorySnapshotBuilder<LogStore, SnapshotStore> {
318321
table_id: TableId,
319322
}
320323

321-
#[async_trait]
322-
impl<LogStore, SnapshotStore> SnapshotBuilder for MemorySnapshotBuilder<LogStore, SnapshotStore>
324+
impl<LogStore, SnapshotStore> Snapshotter<LogStore, SnapshotStore>
323325
where
324326
LogStore: MetaUpdateLogStore + Send + Sync,
325327
SnapshotStore: MetaUpdateSnapshotStore + Send + Sync,
326328
{
327329
/// Create a latest snapshot of the current logs.
328-
async fn build(&self) -> Result<Option<Snapshot>> {
330+
async fn snapshot(&self) -> Result<Option<Snapshot>> {
329331
// Get snapshot data from memory.
330332
let table_snapshot_opt = self
331333
.snapshot_data_provider
@@ -454,17 +456,15 @@ impl ManifestImpl {
454456
let snapshot_store =
455457
ObjectStoreBasedSnapshotStore::new(space_id, table_id, self.store.clone());
456458
let end_seq = self.wal_manager.sequence_num(location).await.unwrap();
457-
let memory_snapshot_builder = MemorySnapshotBuilder {
459+
let snapshotter = Snapshotter {
458460
log_store,
459461
snapshot_store,
460462
end_seq,
461463
snapshot_data_provider: self.snap_data_provider.clone(),
462464
space_id,
463465
table_id,
464466
};
465-
let snapshotter = Snapshotter {
466-
snapshot_builder: memory_snapshot_builder,
467-
};
467+
468468
let snapshot = snapshotter.snapshot().await?.map(|v| {
469469
self.decrease_num_updates();
470470
v
@@ -524,14 +524,11 @@ impl Manifest for ManifestImpl {
524524
load_req.table_id,
525525
self.store.clone(),
526526
);
527-
let storage_snapshot_builder = StorageSnapshotBuilder {
527+
let reoverer = SnapshotRecoverer {
528528
log_store,
529529
snapshot_store,
530530
};
531-
let snapshotter = Snapshotter {
532-
snapshot_builder: storage_snapshot_builder,
533-
};
534-
let snapshot = snapshotter.snapshot().await?;
531+
let snapshot = reoverer.recover().await?;
535532

536533
Ok(snapshot.and_then(|v| v.data))
537534
}
@@ -708,11 +705,6 @@ impl MetaUpdateLogStore for WalBasedLogStore {
708705
}
709706
}
710707

711-
#[derive(Debug, Clone)]
712-
struct Snapshotter<Builder> {
713-
snapshot_builder: Builder,
714-
}
715-
716708
/// The snapshot for the current logs.
717709
#[derive(Debug, Clone, PartialEq)]
718710
struct Snapshot {
@@ -791,18 +783,6 @@ impl From<Snapshot> for manifest_pb::Snapshot {
791783
}
792784
}
793785

794-
impl<Builder> Snapshotter<Builder>
795-
where
796-
Builder: SnapshotBuilder + Send + Sync,
797-
{
798-
/// Do snapshot for the current logs including:
799-
/// - saving the snapshot.
800-
/// - deleting the expired logs.
801-
pub async fn snapshot(&self) -> Result<Option<Snapshot>> {
802-
self.snapshot_builder.build().await
803-
}
804-
}
805-
806786
#[cfg(test)]
807787
mod tests {
808788
use std::{path::PathBuf, sync::Arc, vec};
@@ -1529,32 +1509,26 @@ mod tests {
15291509
table_id: TableId,
15301510
) -> Option<Snapshot> {
15311511
let end_seq = log_store.next_seq() - 1;
1532-
let memory_builder = MemorySnapshotBuilder {
1512+
let snapshotter = Snapshotter {
15331513
log_store: log_store.clone(),
15341514
snapshot_store: snapshot_store.clone(),
15351515
end_seq,
15361516
snapshot_data_provider: snapshot_provider,
15371517
space_id: 0,
15381518
table_id,
15391519
};
1540-
let snapshotter = Snapshotter {
1541-
snapshot_builder: memory_builder,
1542-
};
15431520
snapshotter.snapshot().await.unwrap()
15441521
}
15451522

15461523
async fn recover_snapshot(
15471524
log_store: &MemLogStore,
15481525
snapshot_store: &MemSnapshotStore,
15491526
) -> Option<Snapshot> {
1550-
let storage_builder = StorageSnapshotBuilder {
1527+
let recoverer = SnapshotRecoverer {
15511528
log_store: log_store.clone(),
15521529
snapshot_store: snapshot_store.clone(),
15531530
};
1554-
let snapshotter = Snapshotter {
1555-
snapshot_builder: storage_builder,
1556-
};
1557-
snapshotter.snapshot().await.unwrap()
1531+
recoverer.recover().await.unwrap()
15581532
}
15591533

15601534
#[test]

analytic_engine/src/setup.rs

-7
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,6 @@ pub struct EngineBuilder<'a> {
111111
impl<'a> EngineBuilder<'a> {
112112
pub async fn build(self) -> Result<TableEngineRef> {
113113
let opened_storages = open_storage(self.config.storage.clone()).await?;
114-
// let manifest = ManifestImpl::open(
115-
// self.config.manifest.clone(),
116-
// self.opened_wals.manifest_wal.clone(),
117-
// opened_storages.default_store().clone(),
118-
// )
119-
// .await
120-
// .context(OpenManifest)?;
121114
let manifest_storages = ManifestStorages {
122115
wal_manager: self.opened_wals.manifest_wal.clone(),
123116
oss_storage: opened_storages.default_store().clone(),

0 commit comments

Comments
 (0)