diff --git a/Makefile b/Makefile index aa5a6d32..ebed1aca 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ devserver: echo -n '' > /tmp/rqbit-log && cargo run -- \ --log-file /tmp/rqbit-log \ --log-file-rust-log=debug,librqbit=trace \ - server start --fastresume /tmp/scratch/ + server start /tmp/scratch/ @PHONY: devserver devserver-postgres: diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 60039665..17e2fcf5 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -14,6 +14,7 @@ use crate::{ session::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId, }, + session_stats::snapshot::SessionStatsSnapshot, torrent_state::{ peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, FileStream, ManagedTorrentHandle, @@ -68,6 +69,14 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash { struct V<'de> { p: PhantomData<&'de ()>, } + + macro_rules! visit_int { + ($v:expr) => {{ + let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?; + Ok(TorrentIdOrHash::from(tid)) + }}; + } + impl<'de> serde::de::Visitor<'de> for V<'de> { type Value = TorrentIdOrHash; @@ -75,16 +84,47 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash { f.write_str("integer or 40 byte info hash") } + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + visit_int!(v) + } + + fn visit_i128(self, v: i128) -> std::result::Result + where + E: serde::de::Error, + { + visit_int!(v) + } + + fn visit_u128(self, v: u128) -> std::result::Result + where + E: serde::de::Error, + { + visit_int!(v) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + visit_int!(v) + } + fn visit_str(self, v: &str) -> std::result::Result where E: serde::de::Error, { - TorrentIdOrHash::parse(v) - .map_err(|_| E::custom("expected integer or 40 byte info hash")) + TorrentIdOrHash::parse(v).map_err(|e| { + E::custom(format!( + "expected integer or 40 byte info hash, couldn't parse string: {e:?}" + )) + }) } } - deserializer.deserialize_str(V::default()) + deserializer.deserialize_any(V::default()) } } @@ -171,6 +211,10 @@ impl Api { make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) } + pub fn api_session_stats(&self) -> SessionStatsSnapshot { + self.session().stats_snapshot() + } + pub fn torrent_file_mime_type( &self, idx: TorrentIdOrHash, diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 7c25458c..4d0a48f7 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -79,6 +79,7 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Res lengths.total_pieces() ); } + let required_size = lengths.chunk_bitfield_bytes(); let vec = vec![0u8; required_size]; let mut chunk_bf = BF::from_boxed_slice(vec.into_boxed_slice()); diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 49de3b45..f72b95be 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -62,10 +62,15 @@ impl HttpApi { "GET /dht/stats": "DHT stats", "GET /dht/table": "DHT routing table", "GET /torrents": "List torrents", + "GET /torrents/playlist": "Generate M3U8 playlist for all files in all torrents", + "GET /stats": "Global session stats", + "POST /torrents/resolve_magnet": "Resolve a magnet to torrent file bytes", "GET /torrents/{id_or_infohash}": "Torrent details", "GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces", + "GET /torrents/{id_or_infohash}/playlist": "Generate M3U8 playlist for this torrent", "GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats", "GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats", + "GET /torrents/{id_or_infohash}/stream/{file_idx}": "Stream a file. Accepts Range header to seek.", "POST /torrents/{id_or_infohash}/pause": "Pause torrent", "POST /torrents/{id_or_infohash}/start": "Resume torrent", "POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files", @@ -88,6 +93,10 @@ impl HttpApi { state.api_dht_table().map(axum::Json) } + async fn session_stats(State(state): State) -> impl IntoResponse { + axum::Json(state.api_session_stats()) + } + async fn torrents_list(State(state): State) -> impl IntoResponse { axum::Json(state.api_torrent_list()) } @@ -446,6 +455,7 @@ impl HttpApi { .route("/rust_log", post(set_rust_log)) .route("/dht/stats", get(dht_stats)) .route("/dht/table", get(dht_table)) + .route("/stats", get(session_stats)) .route("/torrents", get(torrents_list)) .route("/torrents/:id", get(torrent_details)) .route("/torrents/:id/haves", get(torrent_haves)) diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index b6028c73..4dda659c 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -57,6 +57,7 @@ mod peer_info_reader; mod read_buf; mod session; mod session_persistence; +pub mod session_stats; mod spawn_utils; pub mod storage; mod stream_connect; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 01ac3ef2..670cd8fa 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -16,6 +16,7 @@ use crate::{ peer_connection::PeerConnectionOptions, read_buf::ReadBuf, session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore}, + session_stats::SessionStats, spawn_utils::BlockingSpawner, storage::{ filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, @@ -116,6 +117,8 @@ pub struct Session { root_span: Option, + pub(crate) stats: SessionStats, + // This is stored for all tasks to stop when session is dropped. _cancellation_token_drop_guard: DropGuard, } @@ -509,8 +512,10 @@ impl Session { async fn persistence_factory( opts: &SessionOptions, - ) -> anyhow::Result<(Option>, Arc)> { - + ) -> anyhow::Result<( + Option>, + Arc, + )> { macro_rules! make_result { ($store:expr) => { if opts.fastresume { @@ -535,7 +540,7 @@ impl Session { ); make_result!(s) - }, + } #[cfg(feature = "postgres")] Some(SessionPersistenceConfig::Postgres { connection_string }) => { use crate::session_persistence::postgres::PostgresSessionStorage; @@ -602,17 +607,23 @@ impl Session { reqwest_client, connector: stream_connector, root_span: opts.root_span, - concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) + stats: SessionStats::new(), + concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new( + opts.concurrent_init_limit.unwrap_or(3), + )), }); if let Some(mut disk_write_rx) = disk_write_rx { - session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move { - while let Some(work) = disk_write_rx.recv().await { - trace!(disk_write_rx_queue_len = disk_write_rx.len()); - spawner.spawn_block_in_place(work); - } - Ok(()) - }); + session.spawn( + error_span!(parent: session.rs(), "disk_writer"), + async move { + while let Some(work) = disk_write_rx.recv().await { + trace!(disk_write_rx_queue_len = disk_write_rx.len()); + spawner.spawn_block_in_place(work); + } + Ok(()) + }, + ); } if let Some(tcp_listener) = tcp_listener { @@ -639,28 +650,36 @@ impl Session { let mut futs = FuturesUnordered::new(); while !added_all || !futs.is_empty() { + // NOTE: this closure exists purely to workaround rustfmt screwing up when inlining it. + let add_torrent_span = |info_hash: &Id20| -> tracing::Span { + error_span!(parent: session.rs(), "add_torrent", info_hash=?info_hash) + }; tokio::select! { Some(res) = futs.next(), if !futs.is_empty() => { if let Err(e) = res { error!("error adding torrent to session: {e:?}"); } - }, + } st = ps.next(), if !added_all => { - if let Some(st) = st { - let (id, st) = st?; - let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash()); - let (add_torrent, mut opts) = st.into_add_torrent()?; - opts.preferred_id = Some(id); - let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span); - futs.push(fut); - } else { - added_all = true; - } - }, - } + match st { + Some(st) => { + let (id, st) = st?; + let span = add_torrent_span(st.info_hash()); + let (add_torrent, mut opts) = st.into_add_torrent()?; + opts.preferred_id = Some(id); + let fut = session.add_torrent(add_torrent, Some(opts)); + let fut = fut.instrument(span); + futs.push(fut); + }, + None => added_all = true + }; + } + }; } } + session.start_speed_estimator_updater(); + Ok(session) } .boxed() @@ -785,7 +804,7 @@ impl Session { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } - fn rs(&self) -> Option { + pub(crate) fn rs(&self) -> Option { self.root_span.as_ref().and_then(|s| s.id()) } @@ -1147,6 +1166,7 @@ impl Session { self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), self.bitv_factory.clone(), + self.stats.atomic.clone(), ) .context("error starting torrent")?; } @@ -1303,6 +1323,7 @@ impl Session { self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), self.bitv_factory.clone(), + self.stats.atomic.clone(), )?; self.try_update_persistence_metadata(handle).await; Ok(()) diff --git a/crates/librqbit/src/session_stats/atomic.rs b/crates/librqbit/src/session_stats/atomic.rs new file mode 100644 index 00000000..416521c5 --- /dev/null +++ b/crates/librqbit/src/session_stats/atomic.rs @@ -0,0 +1,10 @@ +use std::sync::atomic::AtomicU64; + +use crate::torrent_state::live::peers::stats::atomic::AggregatePeerStatsAtomic; + +#[derive(Default, Debug)] +pub struct AtomicSessionStats { + pub fetched_bytes: AtomicU64, + pub uploaded_bytes: AtomicU64, + pub(crate) peers: AggregatePeerStatsAtomic, +} diff --git a/crates/librqbit/src/session_stats/mod.rs b/crates/librqbit/src/session_stats/mod.rs new file mode 100644 index 00000000..ef3d8149 --- /dev/null +++ b/crates/librqbit/src/session_stats/mod.rs @@ -0,0 +1,62 @@ +use std::{ + sync::{atomic::Ordering, Arc}, + time::{Duration, Instant}, +}; + +use atomic::AtomicSessionStats; +use librqbit_core::speed_estimator::SpeedEstimator; +use snapshot::SessionStatsSnapshot; +use tracing::error_span; + +use crate::Session; + +pub mod atomic; +pub mod snapshot; + +pub struct SessionStats { + pub atomic: Arc, + pub down_speed_estimator: SpeedEstimator, + pub up_speed_estimator: SpeedEstimator, +} + +impl SessionStats { + pub fn new() -> Self { + SessionStats { + atomic: Default::default(), + down_speed_estimator: SpeedEstimator::new(5), + up_speed_estimator: SpeedEstimator::new(5), + } + } +} + +impl Default for SessionStats { + fn default() -> Self { + Self::new() + } +} + +impl Session { + pub(crate) fn start_speed_estimator_updater(self: &Arc) { + self.spawn(error_span!(parent: self.rs(), "speed_estimator"), { + let s = self.clone(); + + async move { + let mut i = tokio::time::interval(Duration::from_secs(1)); + loop { + i.tick().await; + let now = Instant::now(); + let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed); + let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed); + s.stats + .down_speed_estimator + .add_snapshot(fetched, None, now); + s.stats.up_speed_estimator.add_snapshot(uploaded, None, now); + } + } + }) + } + + pub fn stats_snapshot(&self) -> SessionStatsSnapshot { + SessionStatsSnapshot::from(&self.stats) + } +} diff --git a/crates/librqbit/src/session_stats/snapshot.rs b/crates/librqbit/src/session_stats/snapshot.rs new file mode 100644 index 00000000..b2feb2d4 --- /dev/null +++ b/crates/librqbit/src/session_stats/snapshot.rs @@ -0,0 +1,22 @@ +use serde::Serialize; + +use crate::torrent_state::{peers::stats::snapshot::AggregatePeerStats, stats::Speed}; + +use super::SessionStats; + +#[derive(Debug, Serialize)] +pub struct SessionStatsSnapshot { + download_speed: Speed, + upload_speed: Speed, + peers: AggregatePeerStats, +} + +impl From<&SessionStats> for SessionStatsSnapshot { + fn from(s: &SessionStats) -> Self { + Self { + download_speed: s.down_speed_estimator.mbps().into(), + upload_speed: s.up_speed_estimator.mbps().into(), + peers: AggregatePeerStats::from(&s.atomic.peers), + } + } +} diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index a537fd05..e9da863d 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -70,6 +70,7 @@ use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, Handshake, Message, MessageOwned, Piece, Request, }; +use peers::stats::atomic::AggregatePeerStatsAtomic; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, Notify, OwnedSemaphorePermit, Semaphore, @@ -84,6 +85,7 @@ use crate::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, session::CheckedIncomingConnection, + session_stats::atomic::AtomicSessionStats, torrent_state::{peer::Peer, utils::atomic_inc}, type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF}, }; @@ -201,6 +203,8 @@ pub struct TorrentStateLive { up_speed_estimator: SpeedEstimator, cancellation_token: CancellationToken, + session_stats: Arc, + pub(crate) streams: Arc, have_broadcast_tx: tokio::sync::broadcast::Sender, } @@ -210,6 +214,7 @@ impl TorrentStateLive { paused: TorrentStatePaused, fatal_errors_tx: tokio::sync::oneshot::Sender, cancellation_token: CancellationToken, + session_stats: Arc, ) -> anyhow::Result> { let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); @@ -237,7 +242,11 @@ impl TorrentStateLive { let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), - peers: Default::default(), + peers: PeerStates { + session_stats: session_stats.clone(), + stats: Default::default(), + states: Default::default(), + }, locked: RwLock::new(TorrentStateLocked { chunks: Some(paused.chunk_tracker), // TODO: move under per_piece_locks? @@ -260,6 +269,7 @@ impl TorrentStateLive { up_speed_estimator, cancellation_token, have_broadcast_tx, + session_stats, streams: paused.streams, per_piece_locks: (0..lengths.total_pieces()) .map(|_| RwLock::new(())) @@ -307,6 +317,10 @@ impl TorrentStateLive { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } + fn peer_stats(&self) -> [&AggregatePeerStatsAtomic; 2] { + [&self.peers.stats, &self.peers.session_stats.peers] + } + pub fn down_speed_estimator(&self) -> &SpeedEstimator { &self.down_speed_estimator } @@ -343,7 +357,7 @@ impl TorrentStateLive { .incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peers.stats, + &self.peer_stats(), ) .context("peer already existed")?; peer.stats.counters.clone() @@ -353,7 +367,7 @@ impl TorrentStateLive { let peer = Peer::new_live_for_incoming_connection( Id20::new(checked_peer.handshake.peer_id), tx.clone(), - &self.peers.stats, + &self.peer_stats(), ); let counters = peer.stats.counters.clone(); vac.insert(peer); @@ -562,7 +576,7 @@ impl TorrentStateLive { fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state - .connecting_to_live(Id20::new(h.peer_id), &self.peers.stats); + .connecting_to_live(Id20::new(h.peer_id), &self.peer_stats()); }); } @@ -750,7 +764,7 @@ impl TorrentStateLive { for mut pe in self.peers.states.iter_mut() { if let PeerState::Live(l) = pe.value().state.get() { if l.has_full_torrent(self.lengths.total_pieces() as usize) { - let prev = pe.value_mut().state.set_not_needed(&self.peers.stats); + let prev = pe.value_mut().state.set_not_needed(&self.peer_stats()); let _ = prev .take_live_no_counters() .unwrap() @@ -763,7 +777,7 @@ impl TorrentStateLive { pub(crate) fn reconnect_all_not_needed_peers(&self) { for mut pe in self.peers.states.iter_mut() { - if pe.state.not_needed_to_queued(&self.peers.stats) + if pe.state.not_needed_to_queued(&self.peer_stats()) && self.peer_queue_tx.send(*pe.key()).is_err() { return; @@ -883,6 +897,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { .stats .uploaded_bytes .fetch_add(bytes as u64, Ordering::Relaxed); + self.state + .session_stats + .uploaded_bytes + .fetch_add(bytes as u64, Ordering::Relaxed); } fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> { @@ -925,7 +943,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { impl PeerHandler { fn on_peer_died(self, error: Option) -> anyhow::Result<()> { let peers = &self.state.peers; - let pstats = &peers.stats; + let pstats = self.state.peer_stats(); let handle = self.addr; let mut pe = match peers.states.get_mut(&handle) { Some(peer) => TimedExistence::new(peer, "on_peer_died"), @@ -934,7 +952,7 @@ impl PeerHandler { return Ok(()); } }; - let prev = pe.value_mut().state.take(pstats); + let prev = pe.value_mut().state.take(&pstats); match prev { PeerState::Connecting(_) => {} @@ -953,7 +971,7 @@ impl PeerHandler { } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().state.set(PeerState::NotNeeded, &pstats); return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { @@ -969,7 +987,7 @@ impl PeerHandler { Some(e) => e, None => { trace!("peer died without errors, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().state.set(PeerState::NotNeeded, &pstats); return Ok(()); } }; @@ -978,11 +996,11 @@ impl PeerHandler { if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); - pe.value_mut().state.set(PeerState::NotNeeded, pstats); + pe.value_mut().state.set(PeerState::NotNeeded, &pstats); return Ok(()); } - pe.value_mut().state.set(PeerState::Dead, pstats); + pe.value_mut().state.set(PeerState::Dead, &pstats); let backoff = pe.value_mut().stats.backoff.next_backoff(); @@ -1006,7 +1024,7 @@ impl PeerHandler { .with_peer_mut(handle, "dead_to_queued", |peer| { match peer.state.get() { PeerState::Dead => { - peer.state.set(PeerState::Queued, &self.state.peers.stats) + peer.state.set(PeerState::Queued, &self.state.peer_stats()) } other => bail!( "peer is in unexpected state: {}. Expected dead", @@ -1415,6 +1433,10 @@ impl PeerHandler { .stats .fetched_bytes .fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed); + self.state + .session_stats + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); fn write_to_disk( state: &TorrentStateLive, diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 8299fc12..9efd6b0b 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -26,10 +26,12 @@ impl Peer { pub fn new_live_for_incoming_connection( peer_id: Id20, tx: PeerTx, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> Self { let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); - counters.inc(&state.0); + for counter in counters { + counter.inc(&state.0); + } Self { state, stats: Default::default(), @@ -85,12 +87,20 @@ impl PeerStateNoMut { &self.0 } - pub fn take(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + pub fn take(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { self.set(Default::default(), counters) } - pub fn set(&mut self, new: PeerState, counters: &AggregatePeerStatsAtomic) -> PeerState { - counters.incdec(&self.0, &new); + pub fn destroy(self, counters: &[&AggregatePeerStatsAtomic]) { + for counter in counters { + counter.dec(&self.0); + } + } + + pub fn set(&mut self, new: PeerState, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { + for counter in counters { + counter.incdec(&self.0, &new); + } std::mem::replace(&mut self.0, new) } @@ -110,7 +120,7 @@ impl PeerStateNoMut { pub fn idle_to_connecting( &mut self, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> Option<(PeerRx, PeerTx)> { match &self.0 { PeerState::Queued | PeerState::NotNeeded => { @@ -123,7 +133,7 @@ impl PeerStateNoMut { } } - pub fn not_needed_to_queued(&mut self, counters: &AggregatePeerStatsAtomic) -> bool { + pub fn not_needed_to_queued(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> bool { if let PeerState::NotNeeded = &self.0 { self.set(PeerState::Queued, counters); return true; @@ -135,7 +145,7 @@ impl PeerStateNoMut { &mut self, peer_id: Id20, tx: PeerTx, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> anyhow::Result<()> { if matches!(&self.0, PeerState::Connecting(..) | PeerState::Live(..)) { anyhow::bail!("peer already active"); @@ -155,7 +165,7 @@ impl PeerStateNoMut { pub fn connecting_to_live( &mut self, peer_id: Id20, - counters: &AggregatePeerStatsAtomic, + counters: &[&AggregatePeerStatsAtomic], ) -> Option<&mut LivePeerState> { if let PeerState::Connecting(_) = &self.0 { let tx = match self.take(counters) { @@ -172,7 +182,7 @@ impl PeerStateNoMut { } } - pub fn set_not_needed(&mut self, counters: &AggregatePeerStatsAtomic) -> PeerState { + pub fn set_not_needed(&mut self, counters: &[&AggregatePeerStatsAtomic]) -> PeerState { self.set(PeerState::NotNeeded, counters) } } diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index edbd8ff2..6478f306 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; use backoff::backoff::Backoff; @@ -8,6 +8,7 @@ use peer_binary_protocol::{Message, Request}; use crate::{ peer_connection::WriterRequest, + session_stats::atomic::AtomicSessionStats, torrent_state::utils::{atomic_inc, TimedExistence}, type_aliases::{PeerHandle, BF}, }; @@ -18,12 +19,20 @@ use super::peer::{LivePeerState, Peer, PeerRx, PeerState, PeerTx}; pub mod stats; -#[derive(Default)] pub(crate) struct PeerStates { + pub session_stats: Arc, pub stats: AggregatePeerStatsAtomic, pub states: DashMap, } +impl Drop for PeerStates { + fn drop(&mut self) { + for (_, p) in std::mem::take(&mut self.states).into_iter() { + p.state.destroy(&[&self.session_stats.peers]); + } + } +} + impl PeerStates { pub fn stats(&self) -> AggregatePeerStats { AggregatePeerStats::from(&self.stats) @@ -36,7 +45,10 @@ impl PeerStates { Entry::Vacant(vac) => { vac.insert(Default::default()); atomic_inc(&self.stats.queued); + atomic_inc(&self.session_stats.peers.queued); + atomic_inc(&self.stats.seen); + atomic_inc(&self.session_stats.peers.seen); Some(addr) } } @@ -73,7 +85,10 @@ impl PeerStates { pub fn drop_peer(&self, handle: PeerHandle) -> Option { let p = self.states.remove(&handle).map(|r| r.1)?; - self.stats.dec(p.state.get()); + let s = p.state.get(); + self.stats.dec(s); + self.session_stats.peers.dec(s); + Some(p) } @@ -99,7 +114,7 @@ impl PeerStates { let rx = self .with_peer_mut(h, "mark_peer_connecting", |peer| { peer.state - .idle_to_connecting(&self.stats) + .idle_to_connecting(&[&self.stats, &self.session_stats.peers]) .context("invalid peer state") }) .context("peer not found in states")??; @@ -114,7 +129,8 @@ impl PeerStates { pub fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { let prev = self.with_peer_mut(handle, "mark_peer_not_needed", |peer| { - peer.state.set_not_needed(&self.stats) + peer.state + .set_not_needed(&[&self.stats, &self.session_stats.peers]) })?; Some(prev) } @@ -132,6 +148,7 @@ impl PeerStates { atomic_inc(&p.stats.counters.times_stolen_from_me); }); self.stats.inc_steals(); + self.session_stats.peers.inc_steals(); self.with_live_mut(from_peer, "send_cancellations", |live| { let to_remove = live diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 2002924b..0b506abe 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -38,6 +38,7 @@ use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; use crate::file_info::FileInfo; use crate::session::TorrentId; +use crate::session_stats::atomic::AtomicSessionStats; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; use crate::stream_connect::StreamConnector; @@ -211,6 +212,7 @@ impl ManagedTorrent { live_cancellation_token: CancellationToken, init_semaphore: Arc, bitv_factory: Arc, + session_stats: Arc, ) -> anyhow::Result<()> { let mut g = self.locked.write(); @@ -319,8 +321,12 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = - TorrentStateLive::new(paused, tx, live_cancellation_token)?; + let live = TorrentStateLive::new( + paused, + tx, + live_cancellation_token, + session_stats, + )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -345,7 +351,12 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?; + let live = TorrentStateLive::new( + paused, + tx, + live_cancellation_token.clone(), + session_stats, + )?; g.state = ManagedTorrentState::Live(live.clone()); drop(g); @@ -371,6 +382,7 @@ impl ManagedTorrent { live_cancellation_token, init_semaphore, bitv_factory, + session_stats, ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), diff --git a/crates/librqbit/webui/src/api-types.ts b/crates/librqbit/webui/src/api-types.ts index 45be3132..3d9488e7 100644 --- a/crates/librqbit/webui/src/api-types.ts +++ b/crates/librqbit/webui/src/api-types.ts @@ -34,6 +34,21 @@ export interface Speed { human_readable: string; } +export interface AggregatePeerStats { + queued: number; + connecting: number; + live: number; + seen: number; + dead: number; + not_needed: number; +} + +export interface SessionStats { + download_speed: Speed; + upload_speed: Speed; + peers: AggregatePeerStats; +} + // Interface for the Torrent Stats API response export interface LiveTorrentStats { snapshot: { @@ -46,14 +61,7 @@ export interface LiveTorrentStats { remaining_bytes: number; total_bytes: number; total_piece_download_ms: number; - peer_stats: { - queued: number; - connecting: number; - live: number; - seen: number; - dead: number; - not_needed: number; - }; + peer_stats: AggregatePeerStats; }; average_piece_download_time: { secs: number; @@ -182,4 +190,5 @@ export interface RqbitAPI { start: (index: number) => Promise; forget: (index: number) => Promise; delete: (index: number) => Promise; + stats: () => Promise; } diff --git a/crates/librqbit/webui/src/components/Footer.tsx b/crates/librqbit/webui/src/components/Footer.tsx new file mode 100644 index 00000000..186cab60 --- /dev/null +++ b/crates/librqbit/webui/src/components/Footer.tsx @@ -0,0 +1,12 @@ +import { useStatsStore } from "../stores/statsStore"; +import { Speed } from "./Speed"; + +export const Footer: React.FC<{}> = () => { + let stats = useStatsStore((stats) => stats.stats); + return ( +
+
↓ {stats.download_speed.human_readable}
+
↑ {stats.upload_speed.human_readable}
+
+ ); +}; diff --git a/crates/librqbit/webui/src/components/TorrentsList.tsx b/crates/librqbit/webui/src/components/TorrentsList.tsx index 51361375..1b9410b7 100644 --- a/crates/librqbit/webui/src/components/TorrentsList.tsx +++ b/crates/librqbit/webui/src/components/TorrentsList.tsx @@ -19,7 +19,9 @@ export const TorrentsList = (props: {

No existing torrents found.

) : ( props.torrents.map((t: TorrentId) => ( - + <> + + )) )} diff --git a/crates/librqbit/webui/src/context.tsx b/crates/librqbit/webui/src/context.tsx index 58a63a8f..7bf10b80 100644 --- a/crates/librqbit/webui/src/context.tsx +++ b/crates/librqbit/webui/src/context.tsx @@ -1,5 +1,5 @@ import { createContext } from "react"; -import { RqbitAPI } from "./api-types"; +import { RqbitAPI, SessionStats } from "./api-types"; export const APIContext = createContext({ listTorrents: () => { @@ -38,5 +38,8 @@ export const APIContext = createContext({ getPlaylistUrl: function (index: number): string | null { throw new Error("Function not implemented."); }, + stats: function (): Promise { + throw new Error("Function not implemented."); + }, }); export const RefreshTorrentStatsContext = createContext({ refresh: () => {} }); diff --git a/crates/librqbit/webui/src/http-api.ts b/crates/librqbit/webui/src/http-api.ts index ef6af660..3bf66d81 100644 --- a/crates/librqbit/webui/src/http-api.ts +++ b/crates/librqbit/webui/src/http-api.ts @@ -3,6 +3,7 @@ import { ErrorDetails, ListTorrentsResponse, RqbitAPI, + SessionStats, TorrentDetails, TorrentStats, } from "./api-types"; @@ -82,6 +83,9 @@ export const API: RqbitAPI & { getVersion: () => Promise } = { getTorrentStats: (index: number): Promise => { return makeRequest("GET", `/torrents/${index}/stats/v1`); }, + stats: (): Promise => { + return makeRequest("GET", "/stats"); + }, uploadTorrent: (data, opts): Promise => { let url = "/torrents?&overwrite=true"; @@ -152,6 +156,6 @@ export const API: RqbitAPI & { getVersion: () => Promise } = { return url; }, getPlaylistUrl: (index: number) => { - return (apiUrl || window.origin) + `/torrents/${index}/playlist`; + return (apiUrl || window.origin) + `/torrents/${index}/playlist`; }, }; diff --git a/crates/librqbit/webui/src/rqbit-web.tsx b/crates/librqbit/webui/src/rqbit-web.tsx index e2ac3a3a..2542cce4 100644 --- a/crates/librqbit/webui/src/rqbit-web.tsx +++ b/crates/librqbit/webui/src/rqbit-web.tsx @@ -11,6 +11,8 @@ import { DarkMode } from "./helper/darkMode"; import { useTorrentStore } from "./stores/torrentStore"; import { useErrorStore } from "./stores/errorStore"; import { AlertModal } from "./components/modal/AlertModal"; +import { useStatsStore } from "./stores/statsStore"; +import { Footer } from "./components/Footer"; export interface ErrorWithLabel { text: string; @@ -49,6 +51,8 @@ export const RqbitWebUI = (props: { }; setRefreshTorrents(refreshTorrents); + const setStats = useStatsStore((state) => state.setStats); + useEffect(() => { return customSetInterval( async () => @@ -67,8 +71,25 @@ export const RqbitWebUI = (props: { ); }, []); + useEffect(() => { + return customSetInterval( + async () => + API.stats().then( + (stats) => { + setStats(stats); + return 1000; + }, + (e) => { + console.error(e); + return 5000; + } + ), + 0 + ); + }, []); + return ( -
+
{/* Menu buttons */} @@ -82,10 +103,14 @@ export const RqbitWebUI = (props: {
+
+
+
+ setLogsOpened(false)} />
diff --git a/crates/librqbit/webui/src/stores/statsStore.ts b/crates/librqbit/webui/src/stores/statsStore.ts new file mode 100644 index 00000000..ebf2b4cb --- /dev/null +++ b/crates/librqbit/webui/src/stores/statsStore.ts @@ -0,0 +1,26 @@ +import { create } from "zustand"; + +import { SessionStats } from "../api-types"; + +export interface StatsStore { + stats: SessionStats; + setStats: (stats: SessionStats) => void; +} + +export const useStatsStore = create((set) => ({ + stats: { + download_speed: { human_readable: "N/A", mbps: 0 }, + upload_speed: { human_readable: "N/A", mbps: 0 }, + peers: { + connecting: 0, + dead: 0, + live: 0, + not_needed: 0, + queued: 0, + seen: 0, + }, + }, + setStats: (stats) => { + set({ stats }); + }, +})); diff --git a/desktop/src-tauri/src/main.rs b/desktop/src-tauri/src/main.rs index a8e43067..97f2e7c1 100644 --- a/desktop/src-tauri/src/main.rs +++ b/desktop/src-tauri/src/main.rs @@ -19,6 +19,7 @@ use librqbit::{ TorrentListResponse, TorrentStats, }, dht::PersistentDhtConfig, + session_stats::snapshot::SessionStatsSnapshot, tracing_subscriber_config_utils::{init_logging, InitLoggingOptions, InitLoggingResult}, AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, SessionOptions, SessionPersistenceConfig, @@ -318,6 +319,11 @@ async fn torrent_action_configure( .await } +#[tauri::command] +async fn stats(state: tauri::State<'_, State>) -> Result { + Ok(state.api()?.api_session_stats()) +} + #[tauri::command] fn get_version() -> &'static str { env!("CARGO_PKG_VERSION") @@ -352,6 +358,7 @@ async fn start() { torrent_action_start, torrent_action_configure, torrent_create_from_base64_file, + stats, get_version, config_default, config_current, diff --git a/desktop/src/api.tsx b/desktop/src/api.tsx index 6e0f9e87..51138a09 100644 --- a/desktop/src/api.tsx +++ b/desktop/src/api.tsx @@ -6,6 +6,7 @@ import { TorrentDetails, TorrentStats, ErrorDetails, + SessionStats, } from "rqbit-webui/src/api-types"; import { InvokeArgs, invoke } from "@tauri-apps/api/tauri"; @@ -141,5 +142,8 @@ export const makeAPI = (configuration: RqbitDesktopConfig): RqbitAPI => { } return `${httpBase}/torrents/${index}/playlist`; }, + stats: () => { + return invokeAPI("stats"); + }, }; };