Skip to content

Commit

Permalink
refactor: use NetworkHandle to access num connected peers (#1541)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Feb 24, 2023
1 parent 5844ce1 commit 3589879
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 106 deletions.
4 changes: 2 additions & 2 deletions bin/reth/src/chain/import.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
dirs::{ConfigPath, DbPath, PlatformPath},
node::{handle_events, NodeEvent},
node::events::{handle_events, NodeEvent},
};
use clap::{crate_version, Parser};
use eyre::Context;
Expand Down Expand Up @@ -109,7 +109,7 @@ impl ImportCommand {
let (mut pipeline, events) =
self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?;

tokio::spawn(handle_events(events));
tokio::spawn(handle_events(None, events));

// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");
Expand Down
118 changes: 118 additions & 0 deletions bin/reth/src/node/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//! Support for handling events emitted by node components.
use futures::{Stream, StreamExt};
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_primitives::BlockNumber;
use reth_stages::{PipelineEvent, StageId};
use std::time::Duration;
use tracing::{info, warn};

/// The current high-level state of the node.
struct NodeState {
/// Connection to the network
network: Option<NetworkHandle>,
/// The stage currently being executed.
current_stage: Option<StageId>,
/// The current checkpoint of the executing stage.
current_checkpoint: BlockNumber,
}

impl NodeState {
fn new(network: Option<NetworkHandle>) -> Self {
Self { network, current_stage: None, current_checkpoint: 0 }
}

fn num_connected_peers(&self) -> usize {
self.network.as_ref().map(|net| net.num_connected_peers()).unwrap_or_default()
}

/// Processes an event emitted by the pipeline
async fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Running { stage_id, stage_progress } => {
let notable = self.current_stage.is_none();
self.current_stage = Some(stage_id);
self.current_checkpoint = stage_progress.unwrap_or_default();

if notable {
info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage");
}
}
PipelineEvent::Ran { stage_id, result } => {
let notable = result.stage_progress > self.current_checkpoint;
self.current_checkpoint = result.stage_progress;
if result.done {
self.current_stage = None;
info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing");
} else if notable {
info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress");
}
}
_ => (),
}
}

async fn handle_network_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
info!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, best_block = %status.blockhash, "Peer connected");
}
NetworkEvent::SessionClosed { peer_id, reason } => {
let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string());
warn!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, %reason, "Peer disconnected.");
}
_ => (),
}
}
}

/// A node event.
pub enum NodeEvent {
/// A network event.
Network(NetworkEvent),
/// A sync pipeline event.
Pipeline(PipelineEvent),
}

impl From<NetworkEvent> for NodeEvent {
fn from(evt: NetworkEvent) -> NodeEvent {
NodeEvent::Network(evt)
}
}

impl From<PipelineEvent> for NodeEvent {
fn from(evt: PipelineEvent) -> NodeEvent {
NodeEvent::Pipeline(evt)
}
}

/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events(
network: Option<NetworkHandle>,
mut events: impl Stream<Item = NodeEvent> + Unpin,
) {
let mut state = NodeState::new(network);

let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
Some(event) = events.next() => {
match event {
NodeEvent::Network(event) => {
state.handle_network_event(event).await;
},
NodeEvent::Pipeline(event) => {
state.handle_pipeline_event(event).await;
}
}
},
_ = interval.tick() => {
let stage = state.current_stage.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string());
info!(target: "reth::cli", connected_peers = state.num_connected_peers(), %stage, checkpoint = state.current_checkpoint, "Status");
}
}
}
}
111 changes: 7 additions & 104 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
utils::get_single_header,
};
use clap::{crate_version, Parser};
use events::NodeEvent;
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
Expand All @@ -32,10 +33,10 @@ use reth_interfaces::{
sync::SyncStateUpdater,
};
use reth_network::{
error::NetworkError, FetchClient, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
error::NetworkError, FetchClient, NetworkConfig, NetworkHandle, NetworkManager,
};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, Head, H256};
use reth_primitives::{BlockHashOrNumber, ChainSpec, Head, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_rpc_engine_api::{EngineApi, EngineApiHandle};
use reth_staged_sync::{
Expand All @@ -51,10 +52,12 @@ use reth_stages::{
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
};
use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, watch};
use tracing::*;

pub mod events;

/// Start the node
#[derive(Debug, Parser)]
pub struct Command {
Expand Down Expand Up @@ -181,7 +184,7 @@ impl Command {
)
.await?;

ctx.task_executor.spawn(handle_events(events));
ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events));

// Run pipeline
let (rx, tx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -477,106 +480,6 @@ async fn run_network_until_shutdown<C>(
}
}

/// The current high-level state of the node.
#[derive(Default)]
struct NodeState {
/// The number of connected peers.
connected_peers: usize,
/// The stage currently being executed.
current_stage: Option<StageId>,
/// The current checkpoint of the executing stage.
current_checkpoint: BlockNumber,
}

impl NodeState {
async fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Running { stage_id, stage_progress } => {
let notable = self.current_stage.is_none();
self.current_stage = Some(stage_id);
self.current_checkpoint = stage_progress.unwrap_or_default();

if notable {
info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage");
}
}
PipelineEvent::Ran { stage_id, result } => {
let notable = result.stage_progress > self.current_checkpoint;
self.current_checkpoint = result.stage_progress;
if result.done {
self.current_stage = None;
info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing");
} else if notable {
info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress");
}
}
_ => (),
}
}

async fn handle_network_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
self.connected_peers += 1;
info!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, best_block = %status.blockhash, "Peer connected");
}
NetworkEvent::SessionClosed { peer_id, reason } => {
self.connected_peers -= 1;
let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string());
warn!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, %reason, "Peer disconnected.");
}
_ => (),
}
}
}

/// A node event.
pub enum NodeEvent {
/// A network event.
Network(NetworkEvent),
/// A sync pipeline event.
Pipeline(PipelineEvent),
}

impl From<NetworkEvent> for NodeEvent {
fn from(evt: NetworkEvent) -> NodeEvent {
NodeEvent::Network(evt)
}
}

impl From<PipelineEvent> for NodeEvent {
fn from(evt: PipelineEvent) -> NodeEvent {
NodeEvent::Pipeline(evt)
}
}

/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) {
let mut state = NodeState::default();

let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
Some(event) = events.next() => {
match event {
NodeEvent::Network(event) => {
state.handle_network_event(event).await;
},
NodeEvent::Pipeline(event) => {
state.handle_pipeline_event(event).await;
}
}
},
_ = interval.tick() => {
let stage = state.current_stage.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string());
info!(target: "reth::cli", connected_peers = state.connected_peers, %stage, checkpoint = state.current_checkpoint, "Status");
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 3589879

Please sign in to comment.