From 7d5f0e31411de2787092ce5102f8e16d1d11b9d7 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 24 Sep 2024 16:09:08 +0300 Subject: [PATCH] feat(node-wasm)!: Add `stop()` in `NodeClient` --- Cargo.lock | 1 - cli/Cargo.toml | 1 - cli/src/common.rs | 4 +- cli/src/server.rs | 47 ++------------ cli/static/index.html | 16 ++++- cli/static/lumina_node.js | 20 ++++++ cli/static/run_node.js | 126 ++++++++++++++++---------------------- cli/static/worker.js | 25 +++----- node-wasm/src/client.rs | 24 ++++---- node-wasm/src/commands.rs | 4 +- node-wasm/src/worker.rs | 25 +++++--- node/src/events.rs | 9 ++- node/src/executor.rs | 26 +++++++- node/src/node.rs | 2 + 14 files changed, 167 insertions(+), 163 deletions(-) create mode 100644 cli/static/lumina_node.js diff --git a/Cargo.lock b/Cargo.lock index 699a2d25b..4b256da52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3227,7 +3227,6 @@ dependencies = [ "redb", "rust-embed", "serde", - "serde_repr", "tokio", "tracing", "tracing-appender", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index a945d3e19..5336e6977 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -38,7 +38,6 @@ mime_guess = "2.0.4" redb = "2.1.1" rust-embed = { version = "8.4.0", features = ["interpolate-folder-path"] } serde = "1.0.203" -serde_repr = "0.1.19" tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } tracing = "0.1.40" tracing-appender = "0.2.3" diff --git a/cli/src/common.rs b/cli/src/common.rs index bd12d9f9a..d25222c38 100644 --- a/cli/src/common.rs +++ b/cli/src/common.rs @@ -3,14 +3,12 @@ use std::env::current_exe; use anyhow::Result; use clap::{Parser, ValueEnum}; use lumina_node::network::Network; -use serde_repr::Serialize_repr; use crate::native; #[cfg(feature = "browser-node")] use crate::server; -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum, Serialize_repr)] -#[repr(u8)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)] pub(crate) enum ArgNetwork { #[default] Mainnet, diff --git a/cli/src/server.rs b/cli/src/server.rs index 8689d15eb..aad01aaf1 100644 --- a/cli/src/server.rs +++ b/cli/src/server.rs @@ -2,29 +2,18 @@ use std::net::SocketAddr; use anyhow::Result; use axum::body::Body; -use axum::extract::{Path, State}; +use axum::extract::Path; use axum::http::{header, StatusCode}; use axum::response::Response; use axum::routing::get; -use axum::{Json, Router}; +use axum::Router; use clap::Args; -use libp2p::Multiaddr; -use lumina_node::network::canonical_network_bootnodes; use rust_embed::RustEmbed; -use serde::Serialize; use tokio::net::TcpListener; use tracing::info; -use crate::common::ArgNetwork; - const SERVER_DEFAULT_BIND_ADDR: &str = "127.0.0.1:9876"; -#[derive(Debug, Clone, Serialize)] -struct WasmNodeArgs { - pub network: ArgNetwork, - pub bootnodes: Vec, -} - #[derive(RustEmbed)] #[folder = "$WASM_NODE_OUT_DIR"] struct WasmPackage; @@ -35,40 +24,18 @@ struct StaticResources; #[derive(Debug, Args)] pub(crate) struct Params { - /// Network to connect. - #[arg(short, long, value_enum, default_value_t)] - pub(crate) network: ArgNetwork, - - /// Listening addresses. Can be used multiple times. + /// Listening address. #[arg(short, long = "listen", default_value = SERVER_DEFAULT_BIND_ADDR)] pub(crate) listen_addr: SocketAddr, - - /// Bootnode multiaddr, including peer id. Can be used multiple times. - #[arg(short, long = "bootnode")] - pub(crate) bootnodes: Vec, } pub(crate) async fn run(args: Params) -> Result<()> { - let network = args.network.into(); - let bootnodes = if args.bootnodes.is_empty() { - canonical_network_bootnodes(network).collect() - } else { - args.bootnodes - }; - - let state = WasmNodeArgs { - network: args.network, - bootnodes, - }; - let app = Router::new() .route("/", get(serve_index_html)) .route("/js/*path", get(serve_embedded_path::)) - .route("/wasm/*path", get(serve_embedded_path::)) - .route("/cfg.json", get(serve_config)) - .with_state(state); + .route("/wasm/*path", get(serve_embedded_path::)); - info!("listening on {}", args.listen_addr); + info!("Listening on http://{}", args.listen_addr); let listener = TcpListener::bind(&args.listen_addr).await?; Ok(axum::serve(listener, app.into_make_service()).await?) @@ -91,7 +58,3 @@ async fn serve_embedded_path( Err(StatusCode::NOT_FOUND) } } - -async fn serve_config(state: State) -> Json { - Json(state.0) -} diff --git a/cli/static/index.html b/cli/static/index.html index becc32ede..bb80e9c0e 100644 --- a/cli/static/index.html +++ b/cli/static/index.html @@ -53,6 +53,18 @@ outline: none; } + .button { + background-color: var(--bg1); + color: var(--fg1); + border: 1px solid var(--border); + } + + .button:focus, + .button:focus-visible { + border: 1px solid var(--fg1); + outline: none; + } + .status { margin: 1rem 0 1rem 0.5rem; visibility: hidden; @@ -79,6 +91,8 @@ margin-left: 0.5rem; color: var(--fg1); } + + @@ -97,7 +111,7 @@

Bootnodes

- +

Event Logs

diff --git a/cli/static/lumina_node.js b/cli/static/lumina_node.js new file mode 100644 index 000000000..c0ce51d40 --- /dev/null +++ b/cli/static/lumina_node.js @@ -0,0 +1,20 @@ +// Equivalent to `../../node-wasm/js/index.js` but loads `/wasm/lumina_node_wasm.js` + +import init, { NodeClient } from "/wasm/lumina_node_wasm.js" + +/** +* Spawn a worker running lumina node and get the `NodeClient` connected to it. +*/ +export async function spawnNode() { + await init(); + let worker = new Worker(new URL("/js/worker.js", import.meta.url), { type: 'module' }); + let client = await new NodeClient(worker); + + // Workaround + await (new Promise(resolve => setTimeout(resolve, 500))); + + return client; +} + +export * from "/wasm/lumina_node_wasm.js"; +export default init; diff --git a/cli/static/run_node.js b/cli/static/run_node.js index 46f021be8..3de82ebe3 100644 --- a/cli/static/run_node.js +++ b/cli/static/run_node.js @@ -1,32 +1,18 @@ Error.stackTraceLimit = 99; // rust stack traces can get pretty big, increase the default -import init, { NodeConfig, NodeClient } from "/wasm/lumina_node_wasm.js"; - -async function fetch_config() { - const response = await fetch('/cfg.json'); - const json = await response.json(); - - console.log("Received config:", json); - - let config = NodeConfig.default(json.network); - if (json.bootnodes.length !== 0) { - config.bootnodes = json.bootnodes; - } - - return config; -} +import init, { NodeConfig, spawnNode } from "/js/lumina_node.js"; async function show_stats(node) { - if (!node || !await node.is_running()) { + if (!node || !await node.isRunning()) { return; } - const info = await node.syncer_info(); + const info = await node.syncerInfo(); document.getElementById("stored-ranges").innerText = info.stored_headers.map((range) => { return `${range.start}..${range.end}`; }).join(", "); let peers_ul = document.createElement('ul'); - (await node.connected_peers()).forEach(peer => { + (await node.connectedPeers()).forEach(peer => { var li = document.createElement("li"); li.innerText = peer; li.classList.add("mono"); @@ -35,7 +21,7 @@ async function show_stats(node) { document.getElementById("peers").replaceChildren(peers_ul); - const network_head = await node.get_network_head_header(); + const network_head = await node.getNetworkHeadHeader(); if (network_head == null) { return } @@ -48,44 +34,6 @@ async function show_stats(node) { document.getElementById("block-data-square").innerText = `${square_rows}x${square_cols} shares`; } -function bind_config(data) { - const network_div = document.getElementById("network-id"); - const bootnodes_div = document.getElementById("bootnodes"); - - const update_config_elements = () => { - network_div.value = window.config.network; - bootnodes_div.value = window.config.bootnodes.join("\n"); - } - - let proxy = { - set: function(obj, prop, value) { - if (prop == "network") { - const config = NodeConfig.default(Number(value)); - obj.network = config.network; - obj.bootnodes = config.bootnodes; - } else if (prop == "bootnodes") { - obj[prop] = value; - } else { - return Reflect.set(obj, prop, value); - } - - update_config_elements() - - return true; - } - }; - - window.config = new Proxy(data, proxy); - update_config_elements(); - - network_div.addEventListener("change", event => { - window.config.network = Number(event.target.value.trim()); - }); - bootnodes_div.addEventListener("change", event => { - window.config.bootnodes = event.target.value.trim().split("\n").map(multiaddr => multiaddr.trim()); - }); -} - function log_event(event) { // Skip noisy events if (event.data.get("event").type == "share_sampling_result") { @@ -105,30 +53,64 @@ function log_event(event) { textarea.scrollTop = textarea.scrollHeight; } -async function main(document, window) { - await init(); +function starting(document) { + document.getElementById("start-stop").disabled = true; + document.querySelectorAll('.config').forEach(elem => elem.disabled = true); +} + +async function started(document, window) { + document.getElementById("peer-id").innerText = await window.node.localPeerId(); + document.querySelectorAll(".status").forEach(elem => elem.style.visibility = "visible"); + document.getElementById("start-stop").innerText = "Stop"; + document.getElementById("start-stop").disabled = false; +} - window.node = await new NodeClient("/js/worker.js"); +function stopping(document) { + document.getElementById("start-stop").disabled = true; + document.querySelectorAll('.config').forEach(elem => elem.disabled = true); +} + +function stopped(document) { + document.querySelectorAll(".status").forEach(elem => elem.style.visibility = "hidden"); + document.getElementById("start-stop").innerText = "Start"; + document.querySelectorAll('.config').forEach(elem => elem.disabled = false); + document.getElementById("start-stop").disabled = false; +} + +async function main(document, window) { + window.node = await spawnNode(); - window.events = await window.node.events_channel(); + window.events = await window.node.eventsChannel(); window.events.onmessage = (event) => { log_event(event); }; - bind_config(await fetch_config()); + const network_id_div = document.getElementById("network-id"); + const bootnodes_div = document.getElementById("bootnodes"); + const start_stop_div = document.getElementById("start-stop"); - if (await window.node.is_running() === true) { - document.querySelectorAll('.config').forEach(elem => elem.disabled = true); - document.getElementById("peer-id").innerText = await window.node.local_peer_id(); - document.querySelectorAll(".status").forEach(elem => elem.style.visibility = "visible"); - } + window.config = NodeConfig.default(0); + bootnodes_div.value = window.config.bootnodes.join("\n"); - document.getElementById("start").addEventListener("click", async () => { - document.querySelectorAll('.config').forEach(elem => elem.disabled = true); + network_id_div.addEventListener("change", event => { + window.config = NodeConfig.default(Number(event.target.value)); + bootnodes_div.value = window.config.bootnodes.join("\n"); + }); - await window.node.start(window.config); - document.getElementById("peer-id").innerText = await window.node.local_peer_id(); - document.querySelectorAll(".status").forEach(elem => elem.style.visibility = "visible"); + bootnodes_div.addEventListener("change", event => { + window.config.bootnodes = event.target.value.trim().split("\n").map(multiaddr => multiaddr.trim()); + }); + + start_stop_div.addEventListener("click", async () => { + if (await window.node.isRunning() === true) { + stopping(document); + await window.node.stop(); + stopped(document); + } else { + starting(document); + await window.node.start(window.config); + await started(document, window); + } }); setInterval(async () => await show_stats(window.node), 1000) diff --git a/cli/static/worker.js b/cli/static/worker.js index 196ec2622..97e78862d 100644 --- a/cli/static/worker.js +++ b/cli/static/worker.js @@ -1,21 +1,12 @@ -import init, { run_worker } from '/wasm/lumina_node_wasm.js'; +// Equivalent to `../../node-wasm/js/worker.js` but loads `/wasm/lumina_node_wasm.js` + +import init, { NodeWorker } from "/wasm/lumina_node_wasm.js"; Error.stackTraceLimit = 99; -// for SharedWorker we queue incoming connections -// for dedicated Worker we queue incoming messages (coming from the single client) -let queued = []; -if (typeof SharedWorkerGlobalScope !== 'undefined' && self instanceof SharedWorkerGlobalScope) { - onconnect = (event) => { - queued.push(event) - } -} else { - onmessage = (event) => { - queued.push(event); - } -} +init().then(async () => { + let worker = new NodeWorker(self); + console.log("starting worker: ", worker); -init().then(() => { - console.log("starting worker, queued messages: ", queued.length); - run_worker(queued); -}) + await worker.run(); +}); diff --git a/node-wasm/src/client.rs b/node-wasm/src/client.rs index e4dd05ed1..a579a9898 100644 --- a/node-wasm/src/client.rs +++ b/node-wasm/src/client.rs @@ -25,7 +25,7 @@ use crate::wrapper::node::{PeerTrackerInfoSnapshot, SyncingInfoSnapshot}; /// Config for the lumina wasm node. #[wasm_bindgen(inspectable, js_name = NodeConfig)] -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct WasmNodeConfig { /// A network to connect to. pub network: Network, @@ -94,14 +94,22 @@ impl NodeClient { } /// Start a node with the provided config, if it's not running - pub async fn start(&self, config: WasmNodeConfig) -> Result<()> { - let command = NodeCommand::StartNode(config); + pub async fn start(&self, config: &WasmNodeConfig) -> Result<()> { + let command = NodeCommand::StartNode(config.to_owned()); let response = self.worker.exec(command).await?; response.into_node_started().check_variant()??; Ok(()) } + pub async fn stop(&self) -> Result<()> { + let command = NodeCommand::StopNode; + let response = self.worker.exec(command).await?; + response.into_node_stopped().check_variant()?; + + Ok(()) + } + /// Get node's local peer ID. #[wasm_bindgen(js_name = localPeerId)] pub async fn local_peer_id(&self) -> Result { @@ -345,16 +353,6 @@ impl NodeClient { Ok(to_value(&metadata?)?) } - /// Requests SharedWorker running lumina to close. Any events received afterwards wont - /// be processed and new NodeClient needs to be created to restart a node. - pub async fn close(&self) -> Result<()> { - let command = NodeCommand::CloseWorker; - let response = self.worker.exec(command).await?; - response.into_worker_closed().check_variant()?; - - Ok(()) - } - /// Returns a [`BroadcastChannel`] for events generated by [`Node`]. #[wasm_bindgen(js_name = eventsChannel)] pub async fn events_channel(&self) -> Result { diff --git a/node-wasm/src/commands.rs b/node-wasm/src/commands.rs index 82ff5404f..f7b359d15 100644 --- a/node-wasm/src/commands.rs +++ b/node-wasm/src/commands.rs @@ -23,6 +23,7 @@ pub(crate) enum NodeCommand { InternalPing, IsRunning, StartNode(WasmNodeConfig), + StopNode, GetEventsChannelName, GetLocalPeerId, GetSyncerInfo, @@ -52,7 +53,6 @@ pub(crate) enum NodeCommand { GetSamplingMetadata { height: u64, }, - CloseWorker, } #[derive(Serialize, Deserialize, Debug)] @@ -68,6 +68,7 @@ pub(crate) enum WorkerResponse { NodeNotRunning, IsRunning(bool), NodeStarted(Result<()>), + NodeStopped(()), EventsChannelName(String), LocalPeerId(String), SyncerInfo(Result), @@ -81,7 +82,6 @@ pub(crate) enum WorkerResponse { Headers(JsResult), LastSeenNetworkHead(JsResult), SamplingMetadata(Result>), - WorkerClosed(()), } pub(crate) trait CheckableResponseExt { diff --git a/node-wasm/src/worker.rs b/node-wasm/src/worker.rs index c0bce2e30..2149b77f5 100644 --- a/node-wasm/src/worker.rs +++ b/node-wasm/src/worker.rs @@ -9,7 +9,7 @@ use tokio::sync::mpsc; use tracing::{error, info, warn}; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::spawn_local; -use web_sys::{BroadcastChannel, SharedWorker}; +use web_sys::BroadcastChannel; use lumina_node::blockstore::IndexedDbBlockstore; use lumina_node::events::{EventSubscriber, NodeEventInfo}; @@ -20,7 +20,7 @@ use crate::client::WasmNodeConfig; use crate::commands::{NodeCommand, SingleHeaderQuery, WorkerResponse}; use crate::error::{Context, Error, Result}; use crate::ports::{ClientMessage, WorkerServer}; -use crate::utils::{random_id, WorkerSelf}; +use crate::utils::random_id; use crate::wrapper::libp2p::NetworkInfoSnapshot; #[derive(Debug, Serialize, Deserialize, Error)] @@ -78,6 +78,16 @@ impl NodeWorker { loop { let (client_id, command) = self.request_server.recv().await?; + // StopNode needs special handling because `NodeWorkerInstance` needs to be consumed. + if matches!(&command, NodeCommand::StopNode) { + if let Some(node) = self.node.take() { + node.stop().await; + self.request_server + .respond_to(client_id, WorkerResponse::NodeStopped(())); + continue; + } + } + let response = match &mut self.node { Some(node) => node.process_command(command).await, node @ None => match command { @@ -130,6 +140,10 @@ impl NodeWorkerInstance { }) } + async fn stop(self) { + self.node.stop().await; + } + async fn get_syncer_info(&mut self) -> Result { Ok(self.node.syncer_info().await?) } @@ -234,6 +248,7 @@ impl NodeWorkerInstance { NodeCommand::StartNode(_) => { WorkerResponse::NodeStarted(Err(Error::new("Node already started"))) } + NodeCommand::StopNode => unreachable!("StopNode is handled in `run()`"), NodeCommand::GetLocalPeerId => { WorkerResponse::LocalPeerId(self.node.local_peer_id().to_string()) } @@ -282,10 +297,6 @@ impl NodeWorkerInstance { NodeCommand::GetSamplingMetadata { height } => { WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await) } - NodeCommand::CloseWorker => { - SharedWorker::worker_self().close(); - WorkerResponse::WorkerClosed(()) - } NodeCommand::InternalPing => WorkerResponse::InternalPong, } } @@ -313,6 +324,4 @@ async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: B } } } - - events_channel.close(); } diff --git a/node/src/events.rs b/node/src/events.rs index fea553343..ea3672794 100644 --- a/node/src/events.rs +++ b/node/src/events.rs @@ -295,6 +295,9 @@ pub enum NodeEvent { /// In case of compromised network, syncing and data sampling will /// stop immediately. NetworkCompromised, + + /// Node stopped. + NodeStopped, } impl NodeEvent { @@ -317,7 +320,8 @@ impl NodeEvent { | NodeEvent::FetchingHeadHeaderFinished { .. } | NodeEvent::FetchingHeadersStarted { .. } | NodeEvent::FetchingHeadersFinished { .. } - | NodeEvent::PrunedHeaders { .. } => false, + | NodeEvent::PrunedHeaders { .. } + | NodeEvent::NodeStopped => false, } } } @@ -432,6 +436,9 @@ impl fmt::Display for NodeEvent { write!(f, "The network is compromised and should not be trusted. ")?; write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.") } + NodeEvent::NodeStopped => { + write!(f, "Node stopped") + } } } } diff --git a/node/src/executor.rs b/node/src/executor.rs index d294590c0..59f7ec503 100644 --- a/node/src/executor.rs +++ b/node/src/executor.rs @@ -234,7 +234,26 @@ mod imp { #[wasm_bindgen] extern "C" { #[wasm_bindgen] - fn setTimeout(closure: &Closure, timeout: u32); + fn setTimeout(closure: &Closure, timeout: u32) -> i32; + + #[wasm_bindgen] + fn clearTimeout(id: i32); + } + + struct ClearTimeoutOnCancel(Option); + + impl ClearTimeoutOnCancel { + fn disarm(mut self) { + self.0.take(); + } + } + + impl Drop for ClearTimeoutOnCancel { + fn drop(&mut self) { + if let Some(id) = self.0.take() { + clearTimeout(id); + } + } } let fut = async move { @@ -261,7 +280,8 @@ mod imp { // * We give time to JavaScript's tasks too. // // Ref: https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html - setTimeout(&wake_closure, 0); + let id = setTimeout(&wake_closure, 0); + let guard = ClearTimeoutOnCancel(Some(id)); debug_assert!(!yielded.get(), "Closure called before reaching event loop"); @@ -274,6 +294,8 @@ mod imp { } }) .await; + + guard.disarm(); }; let fut = SendWrapper::new(fut); diff --git a/node/src/node.rs b/node/src/node.rs index b381563a5..612e5b6e0 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -230,6 +230,8 @@ where if let Err(e) = store.close().await { warn!("Store failed to close: {e}"); } + + self.event_channel.publisher().send(NodeEvent::NodeStopped); } fn syncer(&self) -> &Syncer {