Skip to content

Commit

Permalink
make spawns cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique committed Jul 20, 2024
1 parent 54b5007 commit c4856f5
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 26 deletions.
5 changes: 0 additions & 5 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ impl EventPublisher {
file_line: location.line(),
});
}

/// Returns if there are any active subscribers or not.
pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl EventSubscriber {
Expand Down
35 changes: 14 additions & 21 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ use celestia_types::ExtendedHeader;
use libp2p::identity::Keypair;
use libp2p::swarm::NetworkInfo;
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::daser::{Daser, DaserArgs};
use crate::events::{EventChannel, EventSubscriber, NodeEvent};
use crate::executor::spawn;
use crate::executor::spawn_cancellable;
use crate::p2p::{P2p, P2pArgs};
use crate::store::{SamplingMetadata, Store, StoreError};
use crate::syncer::{Syncer, SyncerArgs};
Expand Down Expand Up @@ -141,32 +140,26 @@ where
event_pub: event_channel.publisher(),
})?);

// spawn the task that will stop the services when the fraud is detected
let network_compromised_token = p2p.get_network_compromised_token().await?;
let tasks_cancellation_token = CancellationToken::new();

spawn({
// spawn the task that will stop the services when the fraud is detected
spawn_cancellable(tasks_cancellation_token.child_token(), {
let network_compromised_token = p2p.get_network_compromised_token().await?;
let syncer = syncer.clone();
let daser = daser.clone();
let tasks_cancellation_token = tasks_cancellation_token.child_token();
let event_pub = event_channel.publisher();

async move {
select! {
_ = tasks_cancellation_token.cancelled() => (),
_ = network_compromised_token.triggered() => {
syncer.stop();
daser.stop();

if event_pub.has_subscribers() {
event_pub.send(NodeEvent::NetworkCompromised);
} else {
// This is a very important message and we want to log it if user
// does not consume our events.
warn!("{}", NodeEvent::NetworkCompromised);
}
}
}
network_compromised_token.triggered().await;

// Network compromised! Stop Syncer and Daser.
syncer.stop();
daser.stop();

event_pub.send(NodeEvent::NetworkCompromised);
// This is a very important message and we want to log it even
// if user consumes our events.
warn!("{}", NodeEvent::NetworkCompromised);
}
});

Expand Down
2 changes: 2 additions & 0 deletions node/src/p2p/header_ex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ where
// Choose the best HEAD.
//
// Algorithm: https://github.com/celestiaorg/go-header/blob/e50090545cc7e049d2f965d2b5c773eaa4a2c0b2/p2p/exchange.go#L357-L381
// TODO: make it cancellable
spawn(async move {
let mut resps: Vec<_> = join_all(rxs)
.await
Expand Down Expand Up @@ -200,6 +201,7 @@ where
state.request.amount
);

// TODO: make it cancellable. return err to the user on cancel.
spawn(async move {
match decode_and_verify_responses(&state.request, &responses).await {
Ok(headers) => {
Expand Down
4 changes: 4 additions & 0 deletions node/src/p2p/header_ex/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ where
let store = self.store.clone();
let tx = self.tx.clone();

// TODO: make it cancellable
spawn(async move {
let response = store
.get_head()
Expand All @@ -132,6 +133,7 @@ where
let store = self.store.clone();
let tx = self.tx.clone();

// TODO: make it cancellable
spawn(async move {
let response = store
.get_by_hash(&hash)
Expand All @@ -147,6 +149,7 @@ where
let store = self.store.clone();
let tx = self.tx.clone();

// TODO: make it cancellable
spawn(async move {
let amount = amount.min(MAX_HEADERS_AMOUNT_RESPONSE);
let mut responses = vec![];
Expand Down Expand Up @@ -179,6 +182,7 @@ where
{
let tx = self.tx.clone();

// TODO: make it cancellable
spawn(async move {
let _ = tx.send(response).await;
});
Expand Down

0 comments on commit c4856f5

Please sign in to comment.