Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node)!: Implement graceful shutdown #343

Merged
merged 20 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ celestia-tendermint-proto = "0.32.1"
#libp2p-core = { path = "../../rust-libp2p/core" }
#libp2p-swarm = { path = "../../rust-libp2p/swarm" }

blockstore = { git = "https://github.com/oblique/blockstore", rev = "73ed78bd138041451432354f20ef19b1a73fd866" }

# Uncomment this if you need debug symbols in release.
# Also check node-wasm's `Cargo.toml`.
#[profile.release]
Expand Down
3 changes: 2 additions & 1 deletion node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::{BroadcastChannel, MessageEvent, SharedWorker};

use lumina_node::blockstore::IndexedDbBlockstore;
use lumina_node::node::{Node, SyncingInfo};
use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store};

Expand Down Expand Up @@ -49,7 +50,7 @@ pub enum WorkerError {
}

struct NodeWorker {
node: Node<IndexedDbStore>,
node: Node<IndexedDbBlockstore, IndexedDbStore>,
events_channel_name: String,
}

Expand Down
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ thiserror = "1.0.61"
tokio = { version = "1.38.0", features = ["macros", "sync"] }
tokio-util = "0.7.11"
tracing = "0.1.40"
void = "1.0.2"
web-time = "1.1.0"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
18 changes: 13 additions & 5 deletions node/src/daser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use tracing::{debug, error, warn};
use web_time::{Duration, Instant};

use crate::events::{EventPublisher, NodeEvent};
use crate::executor::spawn;
use crate::executor::{spawn, JoinHandle};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{BlockRanges, SamplingStatus, Store, StoreError};
Expand Down Expand Up @@ -71,6 +71,7 @@ pub enum DaserError {
/// Component responsible for data availability sampling of blocks from the network.
pub(crate) struct Daser {
cancellation_token: CancellationToken,
join_handle: JoinHandle,
}

/// Arguments used to configure the [`Daser`].
Expand All @@ -96,7 +97,7 @@ impl Daser {
let event_pub = args.event_pub.clone();
let mut worker = Worker::new(args, cancellation_token.child_token())?;

spawn(async move {
let join_handle = spawn(async move {
if let Err(e) = worker.run().await {
error!("Daser stopped because of a fatal error: {e}");

Expand All @@ -106,20 +107,27 @@ impl Daser {
}
});

Ok(Daser { cancellation_token })
Ok(Daser {
cancellation_token,
join_handle,
})
}

/// Stop the worker.
pub(crate) fn stop(&self) {
// Singal the Worker to stop.
// TODO: Should we wait for the Worker to stop?
self.cancellation_token.cancel();
}

/// Wait until worker is completely stopped.
pub(crate) async fn join(&self) {
self.join_handle.join().await;
}
}

impl Drop for Daser {
fn drop(&mut self) {
self.cancellation_token.cancel();
self.stop();
}
}

Expand Down
5 changes: 0 additions & 5 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ impl EventPublisher {
file_line: location.line(),
});
}

/// Returns if there are any active subscribers or not.
pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl EventSubscriber {
Expand Down
164 changes: 125 additions & 39 deletions node/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
use std::fmt::{self, Debug};
use std::future::Future;

use tokio::select;
use tokio_util::sync::CancellationToken;

use crate::utils::Token;

#[allow(unused_imports)]
pub(crate) use self::imp::{
sleep, spawn, spawn_cancellable, timeout, yield_now, Elapsed, Interval,
};

/// Naive `JoinHandle` implementation.
pub(crate) struct JoinHandle(Token);

impl JoinHandle {
pub(crate) async fn join(&self) {
self.0.triggered().await;
}
}

impl Debug for JoinHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("JoinHandle { .. }")
}
}

#[cfg(not(target_arch = "wasm32"))]
mod imp {
use super::*;
Expand All @@ -17,28 +35,48 @@ mod imp {
pub(crate) use tokio::time::{sleep, timeout};

#[track_caller]
pub(crate) fn spawn<F>(future: F)
pub(crate) fn spawn<F>(future: F) -> JoinHandle
where
F: Future<Output = ()> + Send + 'static,
{
tokio::spawn(future);
let token = Token::new();
let guard = token.trigger_drop_guard();

tokio::spawn(async move {
let _guard = guard;
future.await;
});

JoinHandle(token)
}

/// Spawn a cancellable task.
///
/// This will cancel the task in the highest layer and should not be used
/// if cancellation must happen in a point.
#[track_caller]
pub(crate) fn spawn_cancellable<F>(cancelation_token: CancellationToken, future: F)
pub(crate) fn spawn_cancellable<F>(
cancelation_token: CancellationToken,
future: F,
) -> JoinHandle
where
F: Future<Output = ()> + Send + 'static,
{
let token = Token::new();
let guard = token.trigger_drop_guard();

tokio::spawn(async move {
let _guard = guard;
select! {
// Run branches in order.
biased;

_ = cancelation_token.cancelled() => {}
_ = future => {}
}
});

JoinHandle(token)
}

pub(crate) struct Interval(tokio::time::Interval);
Expand Down Expand Up @@ -80,27 +118,47 @@ mod imp {

use super::*;

pub(crate) fn spawn<F>(future: F)
pub(crate) fn spawn<F>(future: F) -> JoinHandle
where
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future);
let token = Token::new();
let guard = token.trigger_drop_guard();

wasm_bindgen_futures::spawn_local(async move {
let _guard = guard;
future.await;
});

JoinHandle(token)
}

/// Spawn a cancellable task.
///
/// This will cancel the task in the highest layer and should not be used
/// if cancellation must happen in a point.
pub(crate) fn spawn_cancellable<F>(cancelation_token: CancellationToken, future: F)
pub(crate) fn spawn_cancellable<F>(
cancelation_token: CancellationToken,
future: F,
) -> JoinHandle
where
F: Future<Output = ()> + 'static,
{
let token = Token::new();
let guard = token.trigger_drop_guard();

wasm_bindgen_futures::spawn_local(async move {
let _guard = guard;
select! {
// Run branches in order.
biased;

_ = cancelation_token.cancelled() => {}
_ = future => {}
}
});

JoinHandle(token)
}

pub(crate) struct Interval(SendWrapper<IntervalStream>);
Expand Down Expand Up @@ -179,41 +237,69 @@ mod imp {
fn setTimeout(closure: &Closure<dyn FnMut()>, timeout: u32);
}

let yielded = Rc::new(Cell::new(false));
let waker = Rc::new(RefCell::new(None::<Waker>));

let wake_closure = {
let yielded = yielded.clone();
let waker = waker.clone();

Closure::new(move || {
yielded.set(true);
waker.borrow_mut().take().unwrap().wake();
let fut = async move {
let yielded = Rc::new(Cell::new(false));
let waker = Rc::new(RefCell::new(None::<Waker>));

let wake_closure = {
let yielded = yielded.clone();
let waker = waker.clone();

Closure::new(move || {
yielded.set(true);
waker.borrow_mut().take().unwrap().wake();
})
};

// Unlike `queueMicrotask` or a naive yield_now implementation (i.e. `wake()`
// and return `Poll::Pending` once), `setTimeout` closure will be executed by
// JavaScript's event loop.
//
// This has two main benefits:
//
// * Garbage collector will be executed.
// * We give time to JavaScript's tasks too.
//
// Ref: https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html
setTimeout(&wake_closure, 0);

debug_assert!(!yielded.get(), "Closure called before reaching event loop");

poll_fn(|cx| {
if yielded.get() {
Poll::Ready(())
} else {
*waker.borrow_mut() = Some(cx.waker().to_owned());
Poll::Pending
}
})
.await;
};

// Unlike `queueMicrotask` or a naive yield_now implementation (i.e. `wake()`
// and return `Poll::Pending` once), `setTimeout` closure will be executed by
// JavaScript's event loop.
//
// This has two main benefits:
//
// * Garbage collector will be executed.
// * We give time to JavaScript's tasks too.
//
// Ref: https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html
setTimeout(&wake_closure, 0);

debug_assert!(!yielded.get(), "Closure called before reaching event loop");

poll_fn(|cx| {
if yielded.get() {
Poll::Ready(())
} else {
*waker.borrow_mut() = Some(cx.waker().to_owned());
Poll::Pending
}
})
.await;
let fut = SendWrapper::new(fut);
fut.await;
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::async_test;
use std::time::Duration;
use web_time::Instant;

#[async_test]
async fn join_handle() {
let now = Instant::now();

let join_handle = spawn(async {
sleep(Duration::from_millis(10)).await;
});

join_handle.join().await;
assert!(now.elapsed() >= Duration::from_millis(10));

// This must return immediately.
join_handle.join().await;
}
}
Loading
Loading