Skip to content

Commit

Permalink
feat(node): Add tail header removal from store (#328)
Browse files Browse the repository at this point in the history
Signed-off-by: Mikołaj Florkiewicz <mikolaj@florkiewicz.me>
Co-authored-by: Yiannis Marangos <psyberbits@gmail.com>
  • Loading branch information
fl0rek and oblique authored Aug 13, 2024
1 parent 1579079 commit 7ba4d0b
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 125 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ blockstore = { workspace = true, features = ["indexeddb"] }
celestia-types = { workspace = true, features = ["wasm-bindgen"] }
getrandom = { version = "0.2.15", features = ["js"] }
gloo-timers = { version = "0.3.0", features = ["futures"] }
js-sys = "0.3.69"
libp2p = { workspace = true, features = [
"noise",
"wasm-bindgen",
Expand Down
69 changes: 66 additions & 3 deletions node/src/block_ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ impl BlockRanges {
Some(head)
}

/// Returns the tail (lowest) height and removes it from the ranges.
pub fn pop_tail(&mut self) -> Option<u64> {
let first = self.0.first_mut()?;
let tail = *first.start();

if first.len() == 1 {
self.0.remove(0);
} else {
*first = *first.start() + 1..=*first.end();
}

Some(tail)
}

/// Insert a new range.
///
/// This fails only if `range` is not valid. It allows inserting an overlapping range.
Expand Down Expand Up @@ -478,6 +492,20 @@ impl Sub<&BlockRanges> for BlockRanges {
}
}

impl Iterator for BlockRanges {
type Item = u64;

fn next(&mut self) -> Option<Self::Item> {
self.pop_tail()
}
}

impl DoubleEndedIterator for BlockRanges {
fn next_back(&mut self) -> Option<Self::Item> {
self.pop_head()
}
}

impl Display for BlockRanges {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
Expand Down Expand Up @@ -506,9 +534,7 @@ fn calc_overlap(
mod tests {
use super::*;

fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
}
use crate::test_utils::new_block_ranges;

#[test]
fn range_len() {
Expand Down Expand Up @@ -704,6 +730,43 @@ mod tests {
assert_eq!(ranges.pop_head(), None);
}

#[test]
fn pop_tail() {
let mut ranges = new_block_ranges([]);
assert_eq!(ranges.pop_tail(), None);

let mut ranges = new_block_ranges([1..=4, 6..=8, 10..=10]);
assert_eq!(ranges.pop_tail(), Some(1));
assert_eq!(ranges.pop_tail(), Some(2));
assert_eq!(ranges.pop_tail(), Some(3));
assert_eq!(ranges.pop_tail(), Some(4));
assert_eq!(ranges.pop_tail(), Some(6));
assert_eq!(ranges.pop_tail(), Some(7));
assert_eq!(ranges.pop_tail(), Some(8));
assert_eq!(ranges.pop_tail(), Some(10));
assert_eq!(ranges.pop_tail(), None);
}

#[test]
fn block_ranges_iterator() {
let ranges = new_block_ranges([1..=5, 10..=15]);
let heights: Vec<_> = ranges.collect();
assert_eq!(heights, vec![1, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15]);

let empty_heights: Vec<u64> = new_block_ranges([]).collect();
assert_eq!(empty_heights, Vec::<u64>::new())
}

#[test]
fn block_ranges_double_ended_iterator() {
let ranges = new_block_ranges([1..=5, 10..=15]);
let heights: Vec<_> = ranges.rev().collect();
assert_eq!(heights, vec![15, 14, 13, 12, 11, 10, 5, 4, 3, 2, 1]);

let empty_heights: Vec<u64> = new_block_ranges([]).collect();
assert_eq!(empty_heights, Vec::<u64>::new())
}

#[test]
fn validate_check() {
(1..=1).validate().unwrap();
Expand Down
108 changes: 107 additions & 1 deletion node/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ pub trait Store: Send + Sync + Debug {

/// Returns a list of accepted sampling ranges currently held in store.
async fn get_accepted_sampling_ranges(&self) -> Result<BlockRanges>;

/// Remove header with lowest height from the store.
async fn remove_last(&self) -> Result<u64>;
}

/// Representation of all the errors that can occur when interacting with the [`Store`].
Expand Down Expand Up @@ -230,7 +233,7 @@ impl From<tokio::task::JoinError> for StoreError {
// Needed for `Into<VerifiedExtendedHeaders>`
impl From<Infallible> for StoreError {
fn from(_: Infallible) -> Self {
// Infallable should not be possible to construct
// Infallible should not be possible to construct
unreachable!("Infallible failed")
}
}
Expand Down Expand Up @@ -331,6 +334,7 @@ mod tests {
// rstest only supports attributes which last segment is `test`
// https://docs.rs/rstest/0.18.2/rstest/attr.rstest.html#inject-test-attribute
use crate::test_utils::async_test as test;
use crate::test_utils::new_block_ranges;

#[test]
async fn converts_bounded_ranges() {
Expand Down Expand Up @@ -1015,6 +1019,81 @@ mod tests {
store.insert(fork.next()).await.unwrap_err();
}

#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn tail_removal_partial_range<S: Store>(
#[case]
#[future(awt)]
s: S,
) {
let store = s;
let headers = ExtendedHeaderGenerator::new().next_many(128);

store.insert(&headers[0..64]).await.unwrap();
store.insert(&headers[96..128]).await.unwrap();
assert_store(&store, &headers, new_block_ranges([1..=64, 97..=128])).await;

assert_eq!(store.remove_last().await.unwrap(), 1);
assert_store(&store, &headers, new_block_ranges([2..=64, 97..=128])).await;
}

#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn tail_removal_full_range<S: Store>(
#[case]
#[future(awt)]
s: S,
) {
let store = s;
let headers = ExtendedHeaderGenerator::new().next_many(128);

store.insert(&headers[0..1]).await.unwrap();
store.insert(&headers[65..128]).await.unwrap();
assert_store(&store, &headers, new_block_ranges([1..=1, 66..=128])).await;

assert_eq!(store.remove_last().await.unwrap(), 1);
assert_store(&store, &headers, new_block_ranges([66..=128])).await;
}

#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn tail_removal_remove_all<S: Store>(
#[case]
#[future(awt)]
s: S,
) {
let store = s;
let headers = ExtendedHeaderGenerator::new().next_many(66);

store.insert(&headers[..]).await.unwrap();
assert_store(&store, &headers, new_block_ranges([1..=66])).await;

for i in 1..=66 {
assert_eq!(store.remove_last().await.unwrap(), i);
}

assert!(matches!(
store.remove_last().await.unwrap_err(),
StoreError::NotFound
));

let stored_ranges = store.get_stored_header_ranges().await.unwrap();
assert!(stored_ranges.is_empty());

for h in 1..=66 {
assert!(!store.has_at(h).await);
}
}

/// Fills an empty store
async fn fill_store<S: Store>(store: &mut S, amount: u64) -> ExtendedHeaderGenerator {
assert!(!store.has_at(1).await, "Store is not empty");
Expand All @@ -1033,6 +1112,33 @@ mod tests {
InMemoryStore::new()
}

pub(crate) async fn assert_store<S: Store>(
store: &S,
headers: &[ExtendedHeader],
expected_ranges: BlockRanges,
) {
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
expected_ranges
);
for header in headers {
let height = header.height().value();
if expected_ranges.contains(height) {
assert_eq!(&store.get_by_height(height).await.unwrap(), header);
assert_eq!(&store.get_by_hash(&header.hash()).await.unwrap(), header);
} else {
assert!(matches!(
store.get_by_height(height).await.unwrap_err(),
StoreError::NotFound
));
assert!(matches!(
store.get_by_hash(&header.hash()).await.unwrap_err(),
StoreError::NotFound
));
}
}
}

#[cfg(not(target_arch = "wasm32"))]
async fn new_redb_store() -> RedbStore {
RedbStore::in_memory().await.unwrap()
Expand Down
46 changes: 42 additions & 4 deletions node/src/store/in_memory_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::Display;
use std::pin::pin;
Expand Down Expand Up @@ -133,6 +133,11 @@ impl InMemoryStore {
header_added_notifier: Notify::new(),
}
}

async fn remove_last(&self) -> Result<u64> {
let mut inner = self.inner.write().await;
inner.remove_last()
}
}

impl InMemoryStoreInner {
Expand Down Expand Up @@ -196,7 +201,7 @@ impl InMemoryStoreInner {
"inconsistency between headers table and ranges table"
);

let HashMapEntry::Vacant(headers_entry) = self.headers.entry(hash) else {
let Entry::Vacant(headers_entry) = self.headers.entry(hash) else {
// TODO: Remove this when we implement type-safe validation on insertion.
return Err(StoreInsertionError::HashExists(hash).into());
};
Expand Down Expand Up @@ -261,10 +266,10 @@ impl InMemoryStoreInner {
}

match self.sampling_data.entry(height) {
HashMapEntry::Vacant(entry) => {
Entry::Vacant(entry) => {
entry.insert(SamplingMetadata { status, cids });
}
HashMapEntry::Occupied(mut entry) => {
Entry::Occupied(mut entry) => {
let metadata = entry.get_mut();
metadata.status = status;

Expand Down Expand Up @@ -301,6 +306,35 @@ impl InMemoryStoreInner {

Ok(Some(metadata.clone()))
}

fn remove_last(&mut self) -> Result<u64> {
let Some(height) = self.header_ranges.tail() else {
return Err(StoreError::NotFound);
};

let Entry::Occupied(height_to_hash) = self.height_to_hash.entry(height) else {
return Err(StoreError::StoredDataError(format!(
"inconsistency between ranges and height_to_hash tables, height {height}"
)));
};

let hash = height_to_hash.get();
let Entry::Occupied(header) = self.headers.entry(*hash) else {
return Err(StoreError::StoredDataError(format!(
"inconsistency between header and height_to_hash tables, hash {hash}"
)));
};

// sampling data may or may not be there
self.sampling_data.remove(&height);

height_to_hash.remove_entry();
header.remove_entry();

self.header_ranges.pop_tail();

Ok(height)
}
}

#[async_trait]
Expand Down Expand Up @@ -392,6 +426,10 @@ impl Store for InMemoryStore {
async fn get_accepted_sampling_ranges(&self) -> Result<BlockRanges> {
Ok(self.get_accepted_sampling_ranges().await)
}

async fn remove_last(&self) -> Result<u64> {
self.remove_last().await
}
}

impl Default for InMemoryStore {
Expand Down
Loading

0 comments on commit 7ba4d0b

Please sign in to comment.