-
-
Notifications
You must be signed in to change notification settings - Fork 348
/
Copy patharchiveStates.ts
124 lines (111 loc) Β· 4.7 KB
/
archiveStates.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import {Logger} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {Slot, Epoch} from "@lodestar/types";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";
/**
* Minimum number of epochs between single temp archived states
* These states will be pruned once a new state is persisted
*/
const PERSIST_TEMP_STATE_EVERY_EPOCHS = 32;
export interface StatesArchiverOpts {
/**
* Minimum number of epochs between archived states
*/
archiveStateEpochFrequency: number;
}
/**
* Archives finalized states from active bucket to archive bucket.
*
* Only the new finalized state is stored to disk
*/
export class StatesArchiver {
constructor(
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
) {}
/**
* Persist states every some epochs to
* - Minimize disk space, storing the least states possible
* - Minimize the sync progress lost on unexpected crash, storing temp state every few epochs
*
* At epoch `e` there will be states peristed at intervals of `PERSIST_STATE_EVERY_EPOCHS` = 32
* and one at `PERSIST_TEMP_STATE_EVERY_EPOCHS` = 1024
* ```
* | | | .
* epoch - 1024*2 epoch - 1024 epoch - 32 epoch
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;
if (finalized.epoch - lastStoredEpoch > Math.min(PERSIST_TEMP_STATE_EVERY_EPOCHS, archiveStateEpochFrequency)) {
await this.archiveState(finalized);
// Only check the current and previous intervals
const minEpoch = Math.max(
0,
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);
const storedStateSlots = await this.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});
const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
}
// More logs to investigate the rss spike issue https://github.com/ChainSafe/lodestar/issues/5591
this.logger.verbose("Archived state completed", {
finalizedEpoch: finalized.epoch,
minEpoch,
storedStateSlots: storedStateSlots.join(","),
statesSlotsToDelete: statesSlotsToDelete.join(","),
});
}
}
/**
* Archives finalized states from active bucket to archive bucket.
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
// the finalized state could be from to disk
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {finalizedEpoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state", {epoch: finalized.epoch, root: rootHex});
}
// don't delete states before the finalized state, auto-prune will take care of it
}
}
/**
* Keeps first epoch per interval of persistEveryEpochs, deletes the rest
*/
export function computeStateSlotsToDelete(storedStateSlots: Slot[], persistEveryEpochs: Epoch): Slot[] {
const persistEverySlots = persistEveryEpochs * SLOTS_PER_EPOCH;
const intervalsWithStates = new Set<number>();
const stateSlotsToDelete = new Set<number>();
for (const slot of storedStateSlots) {
const interval = Math.floor(slot / persistEverySlots);
if (intervalsWithStates.has(interval)) {
stateSlotsToDelete.add(slot);
} else {
intervalsWithStates.add(interval);
}
}
return Array.from(stateSlotsToDelete.values());
}