Skip to content

Commit

Permalink
Merge branch 'feature/offload-old-states' into holesky-recover-offloa…
Browse files Browse the repository at this point in the history
…d-states
  • Loading branch information
povi committed Feb 28, 2025
2 parents e7caccb + 57c8876 commit 4289cf3
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 112 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ binary_utils = { workspace = true }
bls = { workspace = true }
clock = { workspace = true }
criterion = { workspace = true }
database = { workspace = true }
easy-ext = { workspace = true }
eth2_cache_utils = { workspace = true }
eth2_libp2p = { workspace = true }
Expand Down
19 changes: 16 additions & 3 deletions benches/benches/fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use allocator as _;
use anyhow::Result;
use clock::Tick;
use criterion::{BatchSize, Criterion, Throughput};
use database::Database;
use easy_ext::ext;
use eth2_cache_utils::holesky::{self, CAPELLA_BEACON_STATE};
use execution_engine::NullExecutionEngine;
use fork_choice_control::{Storage, StorageMode, DEFAULT_ARCHIVAL_EPOCH_INTERVAL};
use fork_choice_store::{
ApplyBlockChanges, ApplyTickChanges, AttestationAction, AttestationItem, AttestationOrigin,
BlockAction, Store, StoreConfig, ValidAttestation,
Expand Down Expand Up @@ -63,11 +65,19 @@ impl Criterion {
.into_iter()
.exactly_one()?;

let storage = Arc::new(Storage::new(
config.clone_arc(),
Database::in_memory(),
DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
StorageMode::Standard,
));

let mut store = Store::new(
config.clone_arc(),
StoreConfig::default(),
anchor_block,
anchor_state,
storage,
false,
false,
);
Expand Down Expand Up @@ -138,7 +148,7 @@ impl Criterion {
}
}

fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
fn process_slot<P: Preset>(store: &mut Store<P, Storage<P>>, slot: Slot) -> Result<()> {
let Some(changes) = store.apply_tick(Tick::start_of_slot(slot))? else {
panic!("tick at slot {slot} should be later than the current one")
};
Expand All @@ -150,7 +160,10 @@ fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
Ok(())
}

fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<P>>) -> Result<()> {
fn process_block<P: Preset>(
store: &mut Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
) -> Result<()> {
let slot = block.message().slot();

let block_action = store.validate_block(
Expand Down Expand Up @@ -192,7 +205,7 @@ fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<
}

fn process_attestation<P: Preset>(
store: &mut Store<P>,
store: &mut Store<P, Storage<P>>,
attestation: Arc<Attestation<P>>,
) -> Result<()> {
let slot = attestation.data().slot;
Expand Down
6 changes: 4 additions & 2 deletions fork_choice_control/src/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use types::{
traits::{BeaconBlock as _, SignedBeaconBlock as _},
};

use crate::Storage;

#[derive(Constructor)]
pub struct BlockProcessor<P: Preset> {
chain_config: Arc<ChainConfig>,
Expand Down Expand Up @@ -158,7 +160,7 @@ impl<P: Preset> BlockProcessor<P> {

pub fn validate_block_for_gossip(
&self,
store: &Store<P>,
store: &Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
) -> Result<Option<BlockAction<P>>> {
store.validate_block_for_gossip(block, |parent| {
Expand All @@ -178,7 +180,7 @@ impl<P: Preset> BlockProcessor<P> {

pub fn validate_block<E: ExecutionEngine<P> + Send>(
&self,
store: &Store<P>,
store: &Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
state_root_policy: StateRootPolicy,
execution_engine: E,
Expand Down
7 changes: 4 additions & 3 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::{

pub struct Controller<P: Preset, E, A, W: Wait> {
// The latest consistent snapshot of the store.
store_snapshot: Arc<ArcSwap<Store<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
block_processor: Arc<BlockProcessor<P>>,
execution_engine: E,
state_cache: Arc<StateCacheProcessor<P>>,
Expand Down Expand Up @@ -115,6 +115,7 @@ where
store_config,
anchor_block,
anchor_state,
storage.clone_arc(),
finished_initial_forward_sync,
finished_back_sync,
);
Expand Down Expand Up @@ -546,11 +547,11 @@ where
self.store_snapshot().store_config()
}

pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P>>> {
pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P, Storage<P>>>> {
self.store_snapshot.load()
}

pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P>> {
pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P, Storage<P>>> {
self.store_snapshot.load_full()
}

Expand Down
46 changes: 40 additions & 6 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ use crate::{

#[expect(clippy::struct_field_names)]
pub struct Mutator<P: Preset, E, W, TS, PS, LS, NS, SS, VS> {
store: Arc<Store<P>>,
store_snapshot: Arc<ArcSwap<Store<P>>>,
store: Arc<Store<P, Storage<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
block_processor: Arc<BlockProcessor<P>>,
event_channels: Arc<EventChannels>,
Expand Down Expand Up @@ -136,7 +136,7 @@ where
{
#[expect(clippy::too_many_arguments)]
pub fn new(
store_snapshot: Arc<ArcSwap<Store<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
block_processor: Arc<BlockProcessor<P>>,
event_channels: Arc<EventChannels>,
Expand Down Expand Up @@ -1468,8 +1468,42 @@ where
if misc::is_epoch_start::<P>(block.message().slot()) {
info!("unloading old beacon states (head slot: {head_slot})");

self.store_mut()
let unloaded = self
.store_mut()
.unload_old_states(unfinalized_states_in_memory);

let unloaded_checkpoint_states = self
.store_mut()
.unload_checkpoint_states(unfinalized_states_in_memory);

let store = self.owned_store();
let storage = self.storage.clone_arc();
let wait_group = wait_group.clone();

Builder::new()
.name("store-unloader".to_owned())
.spawn(move || {
debug!("persisting unloaded old beacon states…");

let states_with_block_roots = unloaded
.iter()
.map(|chain_link| (chain_link.state(&store), chain_link.block_root))
.chain(unloaded_checkpoint_states);

match storage.append_states(states_with_block_roots) {
Ok(slots) => {
debug!(
"unloaded old beacon states persisted \
(state slots: {slots:?})",
)
}
Err(error) => {
error!("persisting unloaded old beacon states to storage failed: {error:?}")
}
}

drop(wait_group);
})?;
}

let processing_duration = insertion_time.duration_since(submission_time);
Expand Down Expand Up @@ -2431,7 +2465,7 @@ where
self.thread_pool.spawn(task);
}

fn store_mut(&mut self) -> &mut Store<P> {
fn store_mut(&mut self) -> &mut Store<P, Storage<P>> {
self.store.make_mut()
}

Expand All @@ -2440,7 +2474,7 @@ where
// faster to clone a `Store` with all the `Arc`s inside it and allocate another `Arc`.
//
// As a result, this method should only be called when `Mutator.store` is in a consistent state.
fn owned_store(&self) -> Arc<Store<P>> {
fn owned_store(&self) -> Arc<Store<P, Storage<P>>> {
self.store.clone_arc()
}

Expand Down
2 changes: 1 addition & 1 deletion fork_choice_control/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ pub struct BlockWithRoot<P: Preset> {
pub struct Snapshot<'storage, P: Preset> {
// Use a `Guard` instead of an owned snapshot unlike in tasks based on the intuition that
// `Snapshot`s will be less common than tasks.
store_snapshot: Guard<Arc<Store<P>>>,
store_snapshot: Guard<Arc<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
storage: &'storage Storage<P>,
}
Expand Down
40 changes: 33 additions & 7 deletions fork_choice_control/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,26 @@ pub enum StateLoadStrategy<P: Preset> {
}

#[expect(clippy::struct_field_names)]
#[derive(Clone)]
pub struct Storage<P> {
config: Arc<Config>,
pub(crate) database: Database,
pub(crate) database: Arc<Database>,
pub(crate) archival_epoch_interval: NonZeroU64,
storage_mode: StorageMode,
phantom: PhantomData<P>,
}

impl<P: Preset> Storage<P> {
#[must_use]
pub const fn new(
pub fn new(
config: Arc<Config>,
database: Database,
archival_epoch_interval: NonZeroU64,
storage_mode: StorageMode,
) -> Self {
Self {
config,
database,
database: Arc::new(database),
archival_epoch_interval,
storage_mode,
phantom: PhantomData,
Expand Down Expand Up @@ -232,7 +233,7 @@ impl<P: Preset> Storage<P> {
&self,
unfinalized: impl Iterator<Item = &'cl ChainLink<P>>,
finalized: impl DoubleEndedIterator<Item = &'cl ChainLink<P>>,
store: &Store<P>,
store: &Store<P, Self>,
) -> Result<AppendedBlockSlots> {
let mut slots = AppendedBlockSlots::default();
let mut store_head_slot = 0;
Expand Down Expand Up @@ -363,6 +364,25 @@ impl<P: Preset> Storage<P> {
Ok(persisted_blob_ids)
}

pub(crate) fn append_states(
&self,
states_with_block_roots: impl Iterator<Item = (Arc<BeaconState<P>>, H256)>,
) -> Result<Vec<Slot>> {
let mut slots = vec![];
let mut batch = vec![];

for (state, block_root) in states_with_block_roots {
if !self.contains_key(StateByBlockRoot(block_root))? {
slots.push(state.slot());
batch.push(serialize(StateByBlockRoot(block_root), state)?);
}
}

self.database.put_batch(batch)?;

Ok(slots)
}

pub(crate) fn blob_sidecar_by_id(
&self,
blob_id: BlobIdentifier,
Expand Down Expand Up @@ -480,7 +500,7 @@ impl<P: Preset> Storage<P> {
Ok(None)
}

pub(crate) fn genesis_block_root(&self, store: &Store<P>) -> Result<H256> {
pub(crate) fn genesis_block_root(&self, store: &Store<P, Self>) -> Result<H256> {
self.block_root_by_slot_with_store(store, GENESIS_SLOT)?
.ok_or(Error::GenesisBlockRootNotFound)
.map_err(Into::into)
Expand Down Expand Up @@ -523,7 +543,7 @@ impl<P: Preset> Storage<P> {
// Like `block_root_by_slot`, but looks for the root in `store` first.
pub(crate) fn block_root_by_slot_with_store(
&self,
store: &Store<P>,
store: &Store<P, Self>,
slot: Slot,
) -> Result<Option<H256>> {
if let Some(chain_link) = store.chain_link_before_or_at(slot) {
Expand Down Expand Up @@ -641,7 +661,7 @@ impl<P: Preset> Storage<P> {

pub(crate) fn dependent_root(
&self,
store: &Store<P>,
store: &Store<P, Self>,
state: &BeaconState<P>,
epoch: Epoch,
) -> Result<H256> {
Expand Down Expand Up @@ -880,6 +900,12 @@ impl<P: Preset> Storage<P> {
}
}

impl<P: Preset> fork_choice_store::Storage<P> for Storage<P> {
fn stored_state_by_block_root(&self, block_root: H256) -> Result<Option<Arc<BeaconState<P>>>> {
self.state_by_block_root(block_root)
}
}

#[derive(Default, Debug)]
pub struct AppendedBlockSlots {
pub finalized: Vec<Slot>,
Expand Down
Loading

0 comments on commit 4289cf3

Please sign in to comment.