Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] session stats #204

Merged
merged 10 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ devserver:
echo -n '' > /tmp/rqbit-log && cargo run -- \
--log-file /tmp/rqbit-log \
--log-file-rust-log=debug,librqbit=trace \
server start --fastresume /tmp/scratch/
server start /tmp/scratch/

@PHONY: devserver
devserver-postgres:
Expand Down
50 changes: 47 additions & 3 deletions crates/librqbit/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
session::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId,
},
session_stats::snapshot::SessionStatsSnapshot,
torrent_state::{
peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
FileStream, ManagedTorrentHandle,
Expand Down Expand Up @@ -68,23 +69,62 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {
// Zero-sized serde visitor used to accept either an integer torrent id or an
// info-hash string when deserializing `TorrentIdOrHash`.
struct V<'de> {
    // Ties the visitor to the deserializer's lifetime; no data is stored.
    p: PhantomData<&'de ()>,
}

// Shared body for all the integer `visit_*` methods below: converts the
// visited integer into a `TorrentId` via `TryInto` (surfacing a serde error
// if it does not fit) and wraps it with `TorrentIdOrHash::from`.
macro_rules! visit_int {
    ($v:expr) => {{
        let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?;
        Ok(TorrentIdOrHash::from(tid))
    }};
}

// Visitor accepting either an integer (torrent id) or a string (info hash).
// Used with `deserialize_any`, so every integer width a format might emit
// (i64/i128/u64/u128) is handled and funneled through `visit_int!`.
impl<'de> serde::de::Visitor<'de> for V<'de> {
    type Value = TorrentIdOrHash;

    fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("integer or 40 byte info hash")
    }

    fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        visit_int!(v)
    }

    fn visit_i128<E>(self, v: i128) -> std::result::Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        visit_int!(v)
    }

    fn visit_u128<E>(self, v: u128) -> std::result::Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        visit_int!(v)
    }

    fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        visit_int!(v)
    }

    fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        // Delegates to `TorrentIdOrHash::parse`; the underlying parse error
        // is included in the message to make bad API inputs debuggable.
        TorrentIdOrHash::parse(v).map_err(|e| {
            E::custom(format!(
                "expected integer or 40 byte info hash, couldn't parse string: {e:?}"
            ))
        })
    }
}

deserializer.deserialize_str(V::default())
deserializer.deserialize_any(V::default())
}
}

Expand Down Expand Up @@ -171,6 +211,10 @@ impl Api {
make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref())
}

/// Snapshot of session-wide statistics (current speeds plus aggregated
/// peer counters) in a serializable form, for use by API frontends.
pub fn api_session_stats(&self) -> SessionStatsSnapshot {
    self.session().stats_snapshot()
}

pub fn torrent_file_mime_type(
&self,
idx: TorrentIdOrHash,
Expand Down
1 change: 1 addition & 0 deletions crates/librqbit/src/chunk_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Res
lengths.total_pieces()
);
}

let required_size = lengths.chunk_bitfield_bytes();
let vec = vec![0u8; required_size];
let mut chunk_bf = BF::from_boxed_slice(vec.into_boxed_slice());
Expand Down
10 changes: 10 additions & 0 deletions crates/librqbit/src/http_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,15 @@ impl HttpApi {
"GET /dht/stats": "DHT stats",
"GET /dht/table": "DHT routing table",
"GET /torrents": "List torrents",
"GET /torrents/playlist": "Generate M3U8 playlist for all files in all torrents",
"GET /stats": "Global session stats",
"POST /torrents/resolve_magnet": "Resolve a magnet to torrent file bytes",
"GET /torrents/{id_or_infohash}": "Torrent details",
"GET /torrents/{id_or_infohash}/haves": "The bitfield of have pieces",
"GET /torrents/{id_or_infohash}/playlist": "Generate M3U8 playlist for this torrent",
"GET /torrents/{id_or_infohash}/stats/v1": "Torrent stats",
"GET /torrents/{id_or_infohash}/peer_stats": "Per peer stats",
"GET /torrents/{id_or_infohash}/stream/{file_idx}": "Stream a file. Accepts Range header to seek.",
"POST /torrents/{id_or_infohash}/pause": "Pause torrent",
"POST /torrents/{id_or_infohash}/start": "Resume torrent",
"POST /torrents/{id_or_infohash}/forget": "Forget about the torrent, keep the files",
Expand All @@ -88,6 +93,10 @@ impl HttpApi {
state.api_dht_table().map(axum::Json)
}

/// HTTP handler for `GET /stats`: returns the session-wide stats
/// snapshot serialized as JSON.
async fn session_stats(State(state): State<ApiState>) -> impl IntoResponse {
    let snapshot = state.api_session_stats();
    axum::Json(snapshot)
}

async fn torrents_list(State(state): State<ApiState>) -> impl IntoResponse {
axum::Json(state.api_torrent_list())
}
Expand Down Expand Up @@ -446,6 +455,7 @@ impl HttpApi {
.route("/rust_log", post(set_rust_log))
.route("/dht/stats", get(dht_stats))
.route("/dht/table", get(dht_table))
.route("/stats", get(session_stats))
.route("/torrents", get(torrents_list))
.route("/torrents/:id", get(torrent_details))
.route("/torrents/:id/haves", get(torrent_haves))
Expand Down
1 change: 1 addition & 0 deletions crates/librqbit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod peer_info_reader;
mod read_buf;
mod session;
mod session_persistence;
pub mod session_stats;
mod spawn_utils;
pub mod storage;
mod stream_connect;
Expand Down
71 changes: 46 additions & 25 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore},
session_stats::SessionStats,
spawn_utils::BlockingSpawner,
storage::{
filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage,
Expand Down Expand Up @@ -116,6 +117,8 @@ pub struct Session {

root_span: Option<Span>,

pub(crate) stats: SessionStats,

// This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard,
}
Expand Down Expand Up @@ -509,8 +512,10 @@ impl Session {

async fn persistence_factory(
opts: &SessionOptions,
) -> anyhow::Result<(Option<Arc<dyn SessionPersistenceStore>>, Arc<dyn BitVFactory>)> {

) -> anyhow::Result<(
Option<Arc<dyn SessionPersistenceStore>>,
Arc<dyn BitVFactory>,
)> {
macro_rules! make_result {
($store:expr) => {
if opts.fastresume {
Expand All @@ -535,7 +540,7 @@ impl Session {
);

make_result!(s)
},
}
#[cfg(feature = "postgres")]
Some(SessionPersistenceConfig::Postgres { connection_string }) => {
use crate::session_persistence::postgres::PostgresSessionStorage;
Expand Down Expand Up @@ -602,17 +607,23 @@ impl Session {
reqwest_client,
connector: stream_connector,
root_span: opts.root_span,
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
stats: SessionStats::new(),
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(
opts.concurrent_init_limit.unwrap_or(3),
)),
});

if let Some(mut disk_write_rx) = disk_write_rx {
session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move {
while let Some(work) = disk_write_rx.recv().await {
trace!(disk_write_rx_queue_len = disk_write_rx.len());
spawner.spawn_block_in_place(work);
}
Ok(())
});
session.spawn(
error_span!(parent: session.rs(), "disk_writer"),
async move {
while let Some(work) = disk_write_rx.recv().await {
trace!(disk_write_rx_queue_len = disk_write_rx.len());
spawner.spawn_block_in_place(work);
}
Ok(())
},
);
}

if let Some(tcp_listener) = tcp_listener {
Expand All @@ -639,28 +650,36 @@ impl Session {
let mut futs = FuturesUnordered::new();

while !added_all || !futs.is_empty() {
// NOTE: this closure exists purely to workaround rustfmt screwing up when inlining it.
let add_torrent_span = |info_hash: &Id20| -> tracing::Span {
error_span!(parent: session.rs(), "add_torrent", info_hash=?info_hash)
};
tokio::select! {
Some(res) = futs.next(), if !futs.is_empty() => {
if let Err(e) = res {
error!("error adding torrent to session: {e:?}");
}
},
}
st = ps.next(), if !added_all => {
if let Some(st) = st {
let (id, st) = st?;
let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash());
let (add_torrent, mut opts) = st.into_add_torrent()?;
opts.preferred_id = Some(id);
let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span);
futs.push(fut);
} else {
added_all = true;
}
},
}
match st {
Some(st) => {
let (id, st) = st?;
let span = add_torrent_span(st.info_hash());
let (add_torrent, mut opts) = st.into_add_torrent()?;
opts.preferred_id = Some(id);
let fut = session.add_torrent(add_torrent, Some(opts));
let fut = fut.instrument(span);
futs.push(fut);
},
None => added_all = true
};
}
};
}
}

session.start_speed_estimator_updater();

Ok(session)
}
.boxed()
Expand Down Expand Up @@ -785,7 +804,7 @@ impl Session {
spawn_with_cancel(span, self.cancellation_token.clone(), fut);
}

fn rs(&self) -> Option<tracing::Id> {
pub(crate) fn rs(&self) -> Option<tracing::Id> {
self.root_span.as_ref().and_then(|s| s.id())
}

Expand Down Expand Up @@ -1147,6 +1166,7 @@ impl Session {
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(),
self.stats.atomic.clone(),
)
.context("error starting torrent")?;
}
Expand Down Expand Up @@ -1303,6 +1323,7 @@ impl Session {
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(),
self.stats.atomic.clone(),
)?;
self.try_update_persistence_metadata(handle).await;
Ok(())
Expand Down
10 changes: 10 additions & 0 deletions crates/librqbit/src/session_stats/atomic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use std::sync::atomic::AtomicU64;

use crate::torrent_state::live::peers::stats::atomic::AggregatePeerStatsAtomic;

/// Session-wide lock-free counters. A single instance is shared (behind an
/// `Arc`) between the session and every managed torrent.
#[derive(Default, Debug)]
pub struct AtomicSessionStats {
    // Cumulative bytes downloaded across the session; updated by
    // per-torrent code (a clone of the containing Arc is passed into each
    // torrent at start — see session.rs wiring).
    pub fetched_bytes: AtomicU64,
    // Cumulative bytes uploaded across the session.
    pub uploaded_bytes: AtomicU64,
    // Aggregated peer counters across all torrents; crate-internal.
    pub(crate) peers: AggregatePeerStatsAtomic,
}
62 changes: 62 additions & 0 deletions crates/librqbit/src/session_stats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::{
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};

use atomic::AtomicSessionStats;
use librqbit_core::speed_estimator::SpeedEstimator;
use snapshot::SessionStatsSnapshot;
use tracing::error_span;

use crate::Session;

pub mod atomic;
pub mod snapshot;

/// Session-wide statistics: raw atomic counters plus speed estimators that
/// are refreshed once per second by `start_speed_estimator_updater`.
pub struct SessionStats {
    // Shared counters; a clone of this Arc is handed to each torrent so
    // they account into session totals.
    pub atomic: Arc<AtomicSessionStats>,
    // Download speed estimate, fed from `fetched_bytes` samples.
    pub down_speed_estimator: SpeedEstimator,
    // Upload speed estimate, fed from `uploaded_bytes` samples.
    pub up_speed_estimator: SpeedEstimator,
}

impl SessionStats {
pub fn new() -> Self {
SessionStats {
atomic: Default::default(),
down_speed_estimator: SpeedEstimator::new(5),
up_speed_estimator: SpeedEstimator::new(5),
}
}
}

impl Default for SessionStats {
fn default() -> Self {
Self::new()
}
}

impl Session {
    /// Spawn a background task (bound to the session's cancellation token
    /// via `Session::spawn`) that samples the session byte counters once
    /// per second and feeds them into the up/down speed estimators.
    pub(crate) fn start_speed_estimator_updater(self: &Arc<Self>) {
        self.spawn(error_span!(parent: self.rs(), "speed_estimator"), {
            let s = self.clone();

            async move {
                let mut i = tokio::time::interval(Duration::from_secs(1));
                loop {
                    // First tick completes immediately; later ticks ~1s apart.
                    i.tick().await;
                    let now = Instant::now();
                    let fetched = s.stats.atomic.fetched_bytes.load(Ordering::Relaxed);
                    let uploaded = s.stats.atomic.uploaded_bytes.load(Ordering::Relaxed);
                    // NOTE(review): the second argument is presumably a
                    // "remaining bytes" hint, which has no meaning at session
                    // level — confirm against SpeedEstimator::add_snapshot.
                    s.stats
                        .down_speed_estimator
                        .add_snapshot(fetched, None, now);
                    s.stats.up_speed_estimator.add_snapshot(uploaded, None, now);
                }
            }
        })
    }

    /// Produce a serializable point-in-time snapshot of session stats.
    pub fn stats_snapshot(&self) -> SessionStatsSnapshot {
        SessionStatsSnapshot::from(&self.stats)
    }
}
22 changes: 22 additions & 0 deletions crates/librqbit/src/session_stats/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use serde::Serialize;

use crate::torrent_state::{peers::stats::snapshot::AggregatePeerStats, stats::Speed};

use super::SessionStats;

/// Serializable point-in-time view of session-wide statistics, exposed via
/// the `GET /stats` HTTP endpoint.
#[derive(Debug, Serialize)]
pub struct SessionStatsSnapshot {
    // Current session-wide download speed (from the estimator's mbps()).
    download_speed: Speed,
    // Current session-wide upload speed.
    upload_speed: Speed,
    // Aggregated peer counters across all torrents.
    peers: AggregatePeerStats,
}

impl From<&SessionStats> for SessionStatsSnapshot {
fn from(s: &SessionStats) -> Self {
Self {
download_speed: s.down_speed_estimator.mbps().into(),
upload_speed: s.up_speed_estimator.mbps().into(),
peers: AggregatePeerStats::from(&s.atomic.peers),
}
}
}
Loading