diff --git a/Cargo.lock b/Cargo.lock
index 0a5e507a..a3b9a6b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2021,7 +2021,7 @@ checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
 dependencies = [
  "hermit-abi",
  "libc",
- "wasi",
+ "wasi 0.11.0+wasi-snapshot-preview1",
  "windows-sys 0.52.0",
 ]
 
diff --git a/examples/sync-preview/dolos.toml b/examples/sync-preview/dolos.toml
index bcafe688..13e08816 100644
--- a/examples/sync-preview/dolos.toml
+++ b/examples/sync-preview/dolos.toml
@@ -5,6 +5,7 @@ is_testnet = true
 
 [storage]
 path = "data"
+max_wal_history = 10000
 
 [genesis]
 byron_path = "byron.json"
diff --git a/src/bin/dolos/common.rs b/src/bin/dolos/common.rs
index b3a8e7ca..b2238c30 100644
--- a/src/bin/dolos/common.rs
+++ b/src/bin/dolos/common.rs
@@ -20,8 +20,12 @@ pub fn open_wal(config: &crate::Config) -> Result {
 
     std::fs::create_dir_all(root).map_err(Error::storage)?;
 
-    let wal = wal::redb::WalStore::open(root.join("wal"), config.storage.wal_cache)
-        .map_err(Error::storage)?;
+    let wal = wal::redb::WalStore::open(
+        root.join("wal"),
+        config.storage.wal_cache,
+        config.storage.max_wal_history,
+    )
+    .map_err(Error::storage)?;
 
     Ok(wal)
 }
@@ -40,8 +44,12 @@ pub fn open_data_stores(config: &crate::Config) -> Result {
 
     std::fs::create_dir_all(root).map_err(Error::storage)?;
 
-    let wal = wal::redb::WalStore::open(root.join("wal"), config.storage.wal_cache)
-        .map_err(Error::storage)?;
+    let wal = wal::redb::WalStore::open(
+        root.join("wal"),
+        config.storage.wal_cache,
+        config.storage.max_wal_history,
+    )
+    .map_err(Error::storage)?;
 
     let ledger = state::redb::LedgerStore::open(root.join("ledger"), config.storage.ledger_cache)
         .map_err(Error::storage)?
diff --git a/src/bin/dolos/data/copy_wal.rs b/src/bin/dolos/data/copy_wal.rs
index 581487ff..3ffeb6bd 100644
--- a/src/bin/dolos/data/copy_wal.rs
+++ b/src/bin/dolos/data/copy_wal.rs
@@ -24,13 +24,13 @@ pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {
 
     let (source, _) = crate::common::open_data_stores(config).context("opening data stores")?;
 
-    let mut target = dolos::wal::redb::WalStore::open(&args.output, None)
+    let mut target = dolos::wal::redb::WalStore::open(&args.output, None, None)
         .into_diagnostic()
         .context("opening target WAL")?;
 
     let since = match args.since {
         Some(slot) => source
-            .approximate_slot(slot, 100)
+            .approximate_slot(slot, slot..slot.saturating_add(200))
             .into_diagnostic()
             .context("finding initial slot")?,
         None => None,
@@ -38,7 +38,7 @@ pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {
 
     let until = match args.until {
         Some(slot) => source
-            .approximate_slot(slot, 100)
+            .approximate_slot(slot, slot.saturating_sub(200)..=slot)
             .into_diagnostic()
             .context("finding final slot")?,
         None => None,
diff --git a/src/bin/dolos/main.rs b/src/bin/dolos/main.rs
index 82865069..810f683b 100644
--- a/src/bin/dolos/main.rs
+++ b/src/bin/dolos/main.rs
@@ -73,8 +73,8 @@ pub struct StorageConfig {
     /// Size (in Mb) of memory allocated for ledger caching
     ledger_cache: Option,
 
-    #[allow(dead_code)]
-    wal_size: Option,
+    /// Maximum number of slots (not blocks) to keep in the WAL
+    max_wal_history: Option,
 }
 
 impl Default for StorageConfig {
@@ -83,7 +83,7 @@ impl Default for StorageConfig {
             path: PathBuf::from("data"),
             wal_cache: None,
             ledger_cache: None,
-            wal_size: None,
+            max_wal_history: None,
         }
     }
 }
diff --git a/src/sync/roll.rs b/src/sync/roll.rs
index 8bdb0e98..881ab856 100644
--- a/src/sync/roll.rs
+++ b/src/sync/roll.rs
@@ -10,8 +10,19 @@ pub type Cursor = (BlockSlot, BlockHash);
 pub type UpstreamPort = gasket::messaging::InputPort;
 pub type DownstreamPort = gasket::messaging::OutputPort;
 
+/// Period of the timer that schedules WAL housekeeping in the roll stage.
+const HOUSEKEEPING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
+
+/// Unit of work handled by the roll stage worker.
+pub enum WorkUnit {
+    /// An event received from the upstream port.
+    PullEvent(PullEvent),
+    /// A timer tick asking the stage to run WAL housekeeping.
+    Housekeeping,
+}
+
 #[derive(Stage)]
-#[stage(name = "roll", unit = "PullEvent", worker = "Worker")]
+#[stage(name = "roll", unit = "WorkUnit", worker = "Worker")]
 pub struct Stage {
     store: WalStore,
 
@@ -36,7 +47,7 @@ impl Stage {
         }
     }
 
-    fn process_pull_event(&mut self, unit: &PullEvent) -> Result<(), WorkerError> {
+    async fn process_pull_event(&mut self, unit: &PullEvent) -> Result<(), WorkerError> {
         match unit {
             PullEvent::RollForward(block) => {
                 let block = wal::RawBlock {
@@ -64,39 +75,49 @@ impl Stage {
             }
         }
 
+        self.downstream
+            .send(RollEvent::TipChanged.into())
+            .await
+            .or_panic()?;
+
         Ok(())
     }
 }
 
-pub struct Worker;
+pub struct Worker {
+    housekeeping_timer: tokio::time::Interval,
+}
 
 impl Worker {}
 
 #[async_trait::async_trait(?Send)]
 impl gasket::framework::Worker for Worker {
     async fn bootstrap(_stage: &Stage) -> Result {
-        Ok(Worker)
+        Ok(Worker {
+            // TODO: make this interval user-configurable
+            // NOTE: a tokio interval completes its first tick immediately, so
+            // housekeeping is also scheduled once right after bootstrap.
+            housekeeping_timer: tokio::time::interval(HOUSEKEEPING_INTERVAL),
+        })
     }
 
-    async fn schedule(
-        &mut self,
-        stage: &mut Stage,
-    ) -> Result, WorkerError> {
-        // TODO: define a pruning strategy for the WAL here
-
-        let msg = stage.upstream.recv().await.or_panic()?;
-
-        Ok(WorkSchedule::Unit(msg.payload))
+    async fn schedule(&mut self, stage: &mut Stage) -> Result, WorkerError> {
+        tokio::select! {
+            msg = stage.upstream.recv() => {
+                let msg = msg.or_panic()?;
+                Ok(WorkSchedule::Unit(WorkUnit::PullEvent(msg.payload)))
+            }
+            _ = self.housekeeping_timer.tick() => {
+                Ok(WorkSchedule::Unit(WorkUnit::Housekeeping))
+            }
+        }
     }
 
-    async fn execute(&mut self, unit: &PullEvent, stage: &mut Stage) -> Result<(), WorkerError> {
-        stage.process_pull_event(unit)?;
-
-        stage
-            .downstream
-            .send(RollEvent::TipChanged.into())
-            .await
-            .or_panic()?;
+    async fn execute(&mut self, unit: &WorkUnit, stage: &mut Stage) -> Result<(), WorkerError> {
+        match unit {
+            WorkUnit::PullEvent(pull) => stage.process_pull_event(pull).await?,
+            WorkUnit::Housekeeping => stage.store.housekeeping().or_panic()?,
+        }
 
         Ok(())
     }
diff --git a/src/tests/upstream.rs b/src/tests/upstream.rs
index 6f63e8bc..bc2578ae 100644
--- a/src/tests/upstream.rs
+++ b/src/tests/upstream.rs
@@ -50,7 +50,7 @@ fn test_mainnet_upstream() {
     )
     .unwrap();
 
-    let mut wal = WalStore::memory().unwrap();
+    let mut wal = WalStore::memory(None).unwrap();
 
     wal.initialize_from_origin().unwrap();
 
diff --git a/src/wal/reader.rs b/src/wal/reader.rs
index 1c2be1ae..fbcfd8c5 100644
--- a/src/wal/reader.rs
+++ b/src/wal/reader.rs
@@ -53,6 +53,18 @@ pub trait WalReader: Clone {
             .ok_or(WalError::PointNotFound(point.clone()))
     }
 
+    /// Returns the first forward entry found when crawling the WAL from `None`,
+    /// as a `(sequence, chain point)` pair, or `None` when there is no such entry.
+    fn find_start(&self) -> Result, WalError> {
+        let start = self
+            .crawl_from(None)?
+            .filter_forward()
+            .map(|(seq, log)| (seq, (&log).into()))
+            .next();
+
+        Ok(start)
+    }
+
     fn find_tip(&self) -> Result, WalError> {
         let tip = self
             .crawl_from(None)?
diff --git a/src/wal/redb.rs b/src/wal/redb.rs
index 22d8198a..d7e7c9ca 100644
--- a/src/wal/redb.rs
+++ b/src/wal/redb.rs
@@ -1,9 +1,8 @@
 use bincode;
 use itertools::Itertools;
-use log::info;
 use redb::{Range, ReadableTable, TableDefinition};
 use std::{path::Path, sync::Arc};
-use tracing::{debug, warn};
+use tracing::{debug, info, warn};
 
 use super::{
     BlockSlot, ChainPoint, LogEntry, LogSeq, LogValue, RawBlock, WalError, WalReader, WalWriter,
@@ -11,7 +10,10 @@ use super::{
 
 impl redb::Value for LogValue {
     type SelfType<'a> = Self;
-    type AsBytes<'a> = Vec where Self: 'a;
+    type AsBytes<'a>
+        = Vec
+    where
+        Self: 'a;
 
     fn fixed_width() -> Option {
         None
@@ -86,6 +88,7 @@ const DEFAULT_CACHE_SIZE_MB: usize = 50;
 #[derive(Clone)]
 pub struct WalStore {
     db: Arc,
+    max_slots: Option,
     tip_change: Arc,
 }
 
@@ -119,19 +122,24 @@ impl WalStore {
         Ok(())
     }
 
-    pub fn memory() -> Result {
+    pub fn memory(max_slots: Option) -> Result {
         let db =
             redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
 
         let out = Self {
             db: Arc::new(db),
             tip_change: Arc::new(tokio::sync::Notify::new()),
+            max_slots,
         };
 
         Ok(out)
     }
 
-    pub fn open(path: impl AsRef, cache_size: Option) -> Result {
+    pub fn open(
+        path: impl AsRef,
+        cache_size: Option,
+        max_slots: Option,
+    ) -> Result {
         let inner = redb::Database::builder()
             .set_repair_callback(|x| warn!(progress = x.progress() * 100f64, "wal db is repairing"))
             .set_cache_size(1024 * 1024 * cache_size.unwrap_or(DEFAULT_CACHE_SIZE_MB))
@@ -140,6 +148,7 @@ impl WalStore {
         let out = Self {
             db: Arc::new(inner),
             tip_change: Arc::new(tokio::sync::Notify::new()),
+            max_slots,
         };
 
         Ok(out)
@@ -186,6 +195,88 @@ impl WalStore {
         Ok(())
     }
 
+    const MAX_PRUNE_SLOTS_PER_PASS: u64 = 10_000;
+
+    /// Prunes the WAL history to maintain a maximum number of slots.
+    ///
+    /// This method attempts to remove older entries from the WAL to keep the
+    /// total number of slots within the specified `max_slots` limit. It
+    /// operates as follows:
+    ///
+    /// 1. Determines the start and last slots in the WAL.
+    /// 2. Calculates the number of slots that exceed the `max_slots` limit.
+    /// 3. If pruning is necessary, it removes entries older than a calculated
+    ///    cutoff slot.
+    /// 4. Pruning is limited to a maximum of `MAX_PRUNE_SLOTS_PER_PASS` slots
+    ///    per invocation to avoid long-running operations.
+    ///
+    /// # Arguments
+    ///
+    /// * `max_slots` - The maximum number of slots to retain in the WAL.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` if the operation was successful, or a `WalError` if an
+    /// error occurred.
+    ///
+    /// # Notes
+    ///
+    /// - If the WAL doesn't exceed the `max_slots` limit, no pruning occurs.
+    /// - This method is typically called periodically as part of housekeeping
+    ///   operations.
+    /// - The actual number of slots pruned may be less than the calculated
+    ///   excess to avoid long-running operations.
+    pub fn prune_history(&mut self, max_slots: u64) -> Result<(), WalError> {
+        let start_slot = match self.find_start()? {
+            Some((_, ChainPoint::Origin)) => 0,
+            Some((_, ChainPoint::Specific(slot, _))) => slot,
+            _ => {
+                debug!("no start point found, skipping housekeeping");
+                return Ok(());
+            }
+        };
+
+        let last_slot = match self.find_tip()? {
+            Some((_, ChainPoint::Specific(slot, _))) => slot,
+            _ => {
+                debug!("no tip found, skipping housekeeping");
+                return Ok(());
+            }
+        };
+
+        // Number of slots held beyond the retention window. Saturating
+        // arithmetic keeps a WAL shorter than `max_slots` from underflowing.
+        let delta = last_slot.saturating_sub(start_slot).saturating_sub(max_slots);
+
+        debug!(delta, last_slot, start_slot, "wal history delta computed");
+
+        if delta == 0 {
+            debug!(delta, max_slots, "no pruning necessary");
+            return Ok(());
+        }
+
+        let max_prune = core::cmp::min(delta, Self::MAX_PRUNE_SLOTS_PER_PASS);
+
+        let prune_before = start_slot + max_prune;
+
+        info!(cutoff_slot = prune_before, "pruning wal for excess history");
+
+        self.remove_before(prune_before)?;
+
+        Ok(())
+    }
+
+    /// Runs periodic maintenance: prunes history when `max_slots` is
+    /// configured and does nothing otherwise.
+    pub fn housekeeping(&mut self) -> Result<(), WalError> {
+        if let Some(max_slots) = self.max_slots {
+            debug!(max_slots, "checking wal for excess history");
+            self.prune_history(max_slots)?;
+        }
+
+        Ok(())
+    }
+
     /// Approximates the LogSeq for a given BlockSlot within a specified delta
     /// range.
     ///
@@ -211,10 +302,19 @@ impl WalStore {
     pub fn approximate_slot(
         &self,
         target: BlockSlot,
-        max_delta: BlockSlot,
+        search_range: impl std::ops::RangeBounds,
     ) -> Result, WalError> {
-        let min_slot = (target - max_delta) as i128;
-        let max_slot = (target + max_delta) as i128;
+        let min_slot = match search_range.start_bound() {
+            std::ops::Bound::Included(x) => *x as i128,
+            std::ops::Bound::Excluded(x) => *x as i128 + 1,
+            std::ops::Bound::Unbounded => i128::MIN,
+        };
+
+        let max_slot = match search_range.end_bound() {
+            std::ops::Bound::Included(x) => *x as i128,
+            std::ops::Bound::Excluded(x) => *x as i128 - 1,
+            std::ops::Bound::Unbounded => i128::MAX,
+        };
 
         let rx = self.db.begin_read()?;
         let table = rx.open_table(POS)?;
@@ -230,6 +330,61 @@ impl WalStore {
         Ok(seq)
     }
 
+    /// Attempts to find an approximate LogSeq for a given BlockSlot with
+    /// retries.
+    ///
+    /// This function repeatedly calls `approximate_slot` with an expanding
+    /// search range until a suitable LogSeq is found or the maximum number
+    /// of retries is reached.
+    ///
+    /// # Arguments
+    ///
+    /// * `target` - The target BlockSlot to approximate.
+    /// * `search_range` - A closure that takes the current retry count and
+    ///   returns a range of BlockSlots to search within. This allows for
+    ///   dynamic expansion of the search range.
+    ///
+    /// # Returns
+    ///
+    /// Returns a Result containing an Option. If a suitable entry is
+    /// found within any of the attempted search ranges, it returns
+    /// Some(LogSeq), otherwise None. Returns an error if there's an issue
+    /// accessing the database.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if there's an issue with database
+    /// operations during any of the approximation attempts.
+    ///
+    /// # Examples
+    ///
+    /// ```ignore
+    /// let result = wal.approximate_slot_with_retry(
+    ///     slot,
+    ///     |retry| slot.saturating_sub(100 * retry as u64)..=slot + 100 * retry as u64
+    /// )?;
+    /// ```
+    pub fn approximate_slot_with_retry(
+        &self,
+        target: BlockSlot,
+        search_range: F,
+    ) -> Result, WalError>
+    where
+        F: Fn(usize) -> R,
+        R: std::ops::RangeBounds,
+    {
+        for i in 1..10 {
+            let search_range = search_range(i);
+            let seq = self.approximate_slot(target, search_range)?;
+
+            if let Some(seq) = seq {
+                return Ok(Some(seq));
+            }
+        }
+
+        Ok(None)
+    }
+
     /// Removes all entries from the WAL before the specified slot.
     ///
     /// This function is used to trim the WAL by removing all entries that are
@@ -261,7 +416,10 @@ impl WalStore {
 
         {
             let last_seq = self
-                .approximate_slot(slot, 40)?
+                .approximate_slot_with_retry(slot, |attempt| {
+                    let start = slot.saturating_sub(20 * attempt as u64);
+                    start..=slot
+                })?
                 .ok_or(WalError::SlotNotFound(slot))?;
 
             debug!(last_seq, "found max sequence to remove");
diff --git a/src/wal/stream.rs b/src/wal/stream.rs
index 817d8b0e..bb430c94 100644
--- a/src/wal/stream.rs
+++ b/src/wal/stream.rs
@@ -52,7 +52,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_stream_waiting() {
-        let mut db = WalStore::memory().unwrap();
+        let mut db = WalStore::memory(None).unwrap();
 
         db.initialize_from_origin().unwrap();
 
diff --git a/src/wal/testing.rs b/src/wal/testing.rs
index 489ac2e5..be8fe7ec 100644
--- a/src/wal/testing.rs
+++ b/src/wal/testing.rs
@@ -21,7 +21,7 @@ pub fn dummy_block_from_slot(slot: u64) -> RawBlock {
 }
 
 pub fn empty_db() -> redb::WalStore {
-    let mut wal = super::redb::WalStore::memory().unwrap();
+    let mut wal = super::redb::WalStore::memory(None).unwrap();
 
     wal.initialize_from_origin().unwrap();
 