diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 6bbb2a92..9d15dbf2 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -47,7 +47,7 @@ use std::{ collections::{HashMap, HashSet}, net::SocketAddr, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, time::{Duration, Instant}, @@ -371,6 +371,7 @@ impl TorrentStateLive { state: self.clone(), tx, counters, + first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { connect_timeout: self.meta.options.peer_connect_timeout, @@ -434,6 +435,7 @@ impl TorrentStateLive { state: state.clone(), tx, counters, + first_message_received: AtomicBool::new(false), }; let options = PeerConnectionOptions { connect_timeout: state.meta.options.peer_connect_timeout, @@ -486,7 +488,7 @@ impl TorrentStateLive { let state = self; loop { let addr = peer_queue_rx.recv().await.context("torrent closed")?; - if state.is_finished_and_dont_need_peers() { + if state.is_finished_and_no_active_streams() { debug!("ignoring peer {} as we are finished", addr); state.peers.mark_peer_not_needed(addr); continue; @@ -656,6 +658,7 @@ impl TorrentStateLive { Ok(()) } + // If we have all selected pieces but not necessarily all pieces. pub(crate) fn is_finished(&self) -> bool { self.get_hns().map(|h| h.finished()).unwrap_or_default() } @@ -670,7 +673,9 @@ impl TorrentStateLive { .any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id])) } - fn is_finished_and_dont_need_peers(&self) -> bool { + // We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite + // them being selected, we aren't fully "finished". + fn is_finished_and_no_active_streams(&self) -> bool { self.is_finished() && !self.has_active_streams_unfinished_files( &self.lock_read("is_finished_and_dont_need_peers"), @@ -766,6 +771,8 @@ struct PeerHandler { addr: SocketAddr, tx: PeerTx, + + first_message_received: AtomicBool, } impl<'a> PeerConnectionHandler for &'a PeerHandler { @@ -780,6 +787,14 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } async fn on_received_message(&self, message: Message>) -> anyhow::Result<()> { + // The first message must be "bitfield", but if it's not sent, + // assume the bitfield is all zeroes and was sent. + if !matches!(&message, Message::Bitfield(..)) + && !self.first_message_received.swap(true, Ordering::Relaxed) + { + self.on_bitfield_notify.notify_waiters(); + } + match message { Message::Request(request) => { self.on_download_request(request) @@ -931,7 +946,7 @@ impl PeerHandler { self.counters.errors.fetch_add(1, Ordering::Relaxed); - if self.state.is_finished_and_dont_need_peers() { + if self.state.is_finished_and_no_active_streams() { debug!("torrent finished, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); @@ -953,7 +968,9 @@ impl PeerHandler { duration = format!("{dur:?}") ), async move { + debug!("waiting to reconnect again"); tokio::time::sleep(dur).await; + debug!("finished waiting"); self.state .peers .with_peer_mut(handle, "dead_to_queued", |peer| { @@ -1184,43 +1201,50 @@ impl PeerHandler { .await; } + // The job of this is to request chunks and also to keep peer alive. + // The moment this ends, the peer is disconnected. async fn task_peer_chunk_requester(&self) -> anyhow::Result<()> { let handle = self.addr; self.wait_for_bitfield().await; - // TODO: this check needs to happen more often, we need to update our - // interested state with the other side, for now we send it only once. - if self.state.is_finished_and_dont_need_peers() { - self.tx - .send(WriterRequest::Message(MessageOwned::NotInterested))?; - - if self - .state - .peers - .with_live(self.addr, |l| { - l.has_full_torrent(self.state.lengths.total_pieces() as usize) - }) - .unwrap_or_default() - { - debug!("both peer and us have full torrent, disconnecting"); - self.tx.send(WriterRequest::Disconnect(Ok(())))?; - // Sleep a bit to ensure this gets written to the network by manage_peer - tokio::time::sleep(Duration::from_millis(100)).await; - return Ok(()); + let mut update_interest = { + let mut current = false; + move |h: &PeerHandler, new_value: bool| -> anyhow::Result<()> { + if new_value != current { + h.tx.send(if new_value { + WriterRequest::Message(MessageOwned::Interested) + } else { + WriterRequest::Message(MessageOwned::NotInterested) + })?; + current = new_value; + } + Ok(()) } - } else { - self.tx - .send(WriterRequest::Message(MessageOwned::Interested))?; - } + }; loop { aframe!(self.wait_for_unchoke()).await; - if self.state.is_finished_and_dont_need_peers() { - debug!("nothing left to do, disconnecting peer"); - return Ok(()); + // If we have full torrent, we don't need to request more pieces. + // However we might still need to seed them to the peer. + if self.state.is_finished_and_no_active_streams() { + update_interest(self, false)?; + if !self.state.peers.is_peer_interested(self.addr) { + debug!("nothing left to do, neither of us is interested, disconnecting peer"); + self.tx.send(WriterRequest::Disconnect(Ok(())))?; + // wait until the receiver gets the message so that it doesn't finish with an error. + tokio::time::sleep(Duration::from_millis(100)).await; + return Ok(()); + } else { + // TODO: wait for a notification of interest, e.g. update of selected files or new streams or change + // in peer interest. + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } } + update_interest(self, true)?; + // Try steal a pice from a very slow peer first. Otherwise we might wait too long // to download early pieces. // Then try get the next one in queue. @@ -1235,7 +1259,8 @@ impl PeerHandler { None => { debug!("no pieces to request"); match aframe!(tokio::time::timeout( - Duration::from_secs(10), + // Half of default rw timeout not to race with it. + Duration::from_secs(5), new_piece_notify )) .await @@ -1277,7 +1302,7 @@ impl PeerHandler { loop { match aframe!(tokio::time::timeout( - Duration::from_secs(10), + Duration::from_secs(5), aframe!(self.requests_sem.acquire()) )) .await diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 818483ed..8299fc12 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -28,7 +28,7 @@ impl Peer { tx: PeerTx, counters: &AggregatePeerStatsAtomic, ) -> Self { - let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx))); + let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true))); counters.inc(&state.0); Self { state, @@ -142,7 +142,10 @@ impl PeerStateNoMut { } match self.take(counters) { PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { - self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters); + self.set( + PeerState::Live(LivePeerState::new(peer_id, tx, true)), + counters, + ); } PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(), } @@ -159,7 +162,10 @@ impl PeerStateNoMut { PeerState::Connecting(tx) => tx, _ => unreachable!(), }; - self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters); + self.set( + PeerState::Live(LivePeerState::new(peer_id, tx, false)), + counters, + ); self.get_live_mut() } else { None @@ -189,10 +195,10 @@ pub(crate) struct LivePeerState { } impl LivePeerState { - pub fn new(peer_id: Id20, tx: PeerTx) -> Self { + pub fn new(peer_id: Id20, tx: PeerTx, initial_interested: bool) -> Self { LivePeerState { peer_id, - peer_interested: false, + peer_interested: initial_interested, bitfield: BF::default(), inflight_requests: Default::default(), tx, diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index bc4cbd69..edbd8ff2 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -77,6 +77,11 @@ impl PeerStates { Some(p) } + pub fn is_peer_interested(&self, handle: PeerHandle) -> bool { + self.with_live(handle, |live| live.peer_interested) + .unwrap_or(false) + } + pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option { self.with_live_mut(handle, "mark_peer_interested", |live| { let prev = live.peer_interested; @@ -84,6 +89,7 @@ impl PeerStates { prev }) } + pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Box<[u8]>) -> Option<()> { self.with_live_mut(handle, "update_bitfield_from_vec", |live| { live.bitfield = BF::from_boxed_slice(bitfield);