Skip to content

Commit

Permalink
feat: automate WAL housekeeping procedure (#347)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Oct 12, 2024
1 parent 0fab1d1 commit 4384530
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/sync-preview/dolos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ is_testnet = true

[storage]
path = "data"
max_wal_history = 10000

[genesis]
byron_path = "byron.json"
Expand Down
16 changes: 12 additions & 4 deletions src/bin/dolos/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ pub fn open_wal(config: &crate::Config) -> Result<wal::redb::WalStore, Error> {

std::fs::create_dir_all(root).map_err(Error::storage)?;

let wal = wal::redb::WalStore::open(root.join("wal"), config.storage.wal_cache)
.map_err(Error::storage)?;
let wal = wal::redb::WalStore::open(
root.join("wal"),
config.storage.wal_cache,
config.storage.max_wal_history,
)
.map_err(Error::storage)?;

Ok(wal)
}
Expand All @@ -40,8 +44,12 @@ pub fn open_data_stores(config: &crate::Config) -> Result<Stores, Error> {

std::fs::create_dir_all(root).map_err(Error::storage)?;

let wal = wal::redb::WalStore::open(root.join("wal"), config.storage.wal_cache)
.map_err(Error::storage)?;
let wal = wal::redb::WalStore::open(
root.join("wal"),
config.storage.wal_cache,
config.storage.max_wal_history,
)
.map_err(Error::storage)?;

let ledger = state::redb::LedgerStore::open(root.join("ledger"), config.storage.ledger_cache)
.map_err(Error::storage)?
Expand Down
6 changes: 3 additions & 3 deletions src/bin/dolos/data/copy_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {

let (source, _) = crate::common::open_data_stores(config).context("opening data stores")?;

let mut target = dolos::wal::redb::WalStore::open(&args.output, None)
let mut target = dolos::wal::redb::WalStore::open(&args.output, None, None)
.into_diagnostic()
.context("opening target WAL")?;

let since = match args.since {
Some(slot) => source
.approximate_slot(slot, 100)
.approximate_slot(slot, slot..slot + 200)
.into_diagnostic()
.context("finding initial slot")?,
None => None,
};

let until = match args.until {
Some(slot) => source
.approximate_slot(slot, 100)
.approximate_slot(slot, slot - 200..=slot)
.into_diagnostic()
.context("finding final slot")?,
None => None,
Expand Down
6 changes: 3 additions & 3 deletions src/bin/dolos/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ pub struct StorageConfig {
/// Size (in Mb) of memory allocated for ledger caching
ledger_cache: Option<usize>,

#[allow(dead_code)]
wal_size: Option<u64>,
/// Maximum number of slots (not blocks) to keep in the WAL
max_wal_history: Option<u64>,
}

impl Default for StorageConfig {
Expand All @@ -83,7 +83,7 @@ impl Default for StorageConfig {
path: PathBuf::from("data"),
wal_cache: None,
ledger_cache: None,
wal_size: None,
max_wal_history: None,
}
}
}
Expand Down
57 changes: 36 additions & 21 deletions src/sync/roll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ pub type Cursor = (BlockSlot, BlockHash);
pub type UpstreamPort = gasket::messaging::InputPort<PullEvent>;
pub type DownstreamPort = gasket::messaging::OutputPort<RollEvent>;

const HOUSEKEEPING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);

pub enum WorkUnit {
PullEvent(PullEvent),
Housekeeping,
}

#[derive(Stage)]
#[stage(name = "roll", unit = "PullEvent", worker = "Worker")]
#[stage(name = "roll", unit = "WorkUnit", worker = "Worker")]
pub struct Stage {
store: WalStore,

Expand All @@ -36,7 +43,7 @@ impl Stage {
}
}

fn process_pull_event(&mut self, unit: &PullEvent) -> Result<(), WorkerError> {
async fn process_pull_event(&mut self, unit: &PullEvent) -> Result<(), WorkerError> {
match unit {
PullEvent::RollForward(block) => {
let block = wal::RawBlock {
Expand Down Expand Up @@ -64,39 +71,47 @@ impl Stage {
}
}

self.downstream
.send(RollEvent::TipChanged.into())
.await
.or_panic()?;

Ok(())
}
}

pub struct Worker;
pub struct Worker {
housekeeping_timer: tokio::time::Interval,
}

impl Worker {}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(_stage: &Stage) -> Result<Self, WorkerError> {
Ok(Worker)
Ok(Worker {
// TODO: make this interval user-configurable
housekeeping_timer: tokio::time::interval(HOUSEKEEPING_INTERVAL),
})
}

async fn schedule(
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<PullEvent>, WorkerError> {
// TODO: define a pruning strategy for the WAL here

let msg = stage.upstream.recv().await.or_panic()?;

Ok(WorkSchedule::Unit(msg.payload))
async fn schedule(&mut self, stage: &mut Stage) -> Result<WorkSchedule<WorkUnit>, WorkerError> {
tokio::select! {
msg = stage.upstream.recv() => {
let msg = msg.or_panic()?;
Ok(WorkSchedule::Unit(WorkUnit::PullEvent(msg.payload)))
}
_ = self.housekeeping_timer.tick() => {
Ok(WorkSchedule::Unit(WorkUnit::Housekeeping))
}
}
}

async fn execute(&mut self, unit: &PullEvent, stage: &mut Stage) -> Result<(), WorkerError> {
stage.process_pull_event(unit)?;

stage
.downstream
.send(RollEvent::TipChanged.into())
.await
.or_panic()?;
async fn execute(&mut self, unit: &WorkUnit, stage: &mut Stage) -> Result<(), WorkerError> {
match unit {
WorkUnit::PullEvent(pull) => stage.process_pull_event(pull).await?,
WorkUnit::Housekeeping => stage.store.housekeeping().or_panic()?,
}

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/tests/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn test_mainnet_upstream() {
)
.unwrap();

let mut wal = WalStore::memory().unwrap();
let mut wal = WalStore::memory(None).unwrap();

wal.initialize_from_origin().unwrap();

Expand Down
10 changes: 10 additions & 0 deletions src/wal/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ pub trait WalReader: Clone {
.ok_or(WalError::PointNotFound(point.clone()))
}

fn find_start(&self) -> Result<Option<(LogSeq, ChainPoint)>, WalError> {
let start = self
.crawl_from(None)?
.filter_forward()
.map(|(seq, log)| (seq, (&log).into()))
.next();

Ok(start)
}

fn find_tip(&self) -> Result<Option<(LogSeq, ChainPoint)>, WalError> {
let tip = self
.crawl_from(None)?
Expand Down
Loading

0 comments on commit 4384530

Please sign in to comment.