Skip to content

Commit

Permalink
return None in WebRTCListenStream when udp_mux is closed
Browse files Browse the repository at this point in the history
to indicate that no more events will be coming from the stream
  • Loading branch information
melekes committed Jul 26, 2022
1 parent d0c95a4 commit ef5e306
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion transports/webrtc/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ impl Stream for WebRTCListenStream {
}
UDPMuxEvent::Error(e) => {
self.close(Err(Error::UDPMuxError(e)));
return self.poll_next(cx);
return Poll::Ready(None);
}
_ => return self.poll_next(cx),
}
Expand Down
17 changes: 8 additions & 9 deletions transports/webrtc/src/udp_mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::{
const RECEIVE_MTU: usize = 8192;

/// A previously unseen address of a remote who've sent us an ICE binding request.
#[derive(Debug)]
pub struct NewAddr {
pub addr: SocketAddr,
pub ufrag: String,
Expand All @@ -66,6 +67,7 @@ impl UDPMuxParams {
}

/// An event emitted by [`UDPMuxNewAddr`] when it's polled.
#[derive(Debug)]
pub enum UDPMuxEvent {
/// Connection error. UDP mux should be stopped.
Error(Error),
Expand Down Expand Up @@ -93,9 +95,6 @@ pub struct UDPMuxNewAddr {

/// `true` when UDP mux is closed.
is_closed: AtomicBool,

/// Buffer used when reading from the underlying UDP socket.
read_buffer: [u8; RECEIVE_MTU],
}

impl UDPMuxNewAddr {
Expand All @@ -107,7 +106,6 @@ impl UDPMuxNewAddr {
address_map: RwLock::default(),
new_addrs: RwLock::default(),
is_closed: AtomicBool::new(false),
read_buffer: [0u8; RECEIVE_MTU],
})
}

Expand All @@ -133,7 +131,7 @@ impl UDPMuxNewAddr {
conns.get(&ufrag).map(Clone::clone)
}
Err(e) => {
log::debug!("{} (addr: {})", e, addr);
log::debug!("{} (addr={})", e, addr);
None
}
}
Expand All @@ -147,7 +145,8 @@ impl UDPMuxNewAddr {
/// Reads from the underlying UDP socket and either reports a new address or proxies data to the
/// muxed connection.
pub async fn read_from_conn(&self) -> UDPMuxEvent {
let mut buffer = self.read_buffer;
// TODO: avoid reallocating the buffer
let mut buffer = [0u8; RECEIVE_MTU];
let conn = &self.params.conn;

let res = conn.recv_from(&mut buffer).await;
Expand Down Expand Up @@ -176,7 +175,7 @@ impl UDPMuxNewAddr {
match ufrag_from_stun_message(&buffer, false) {
Ok(ufrag) => {
log::trace!(
"Notifying about new address {} from {}",
"Notifying about new address addr={} from ufrag={}",
&addr,
ufrag
);
Expand All @@ -186,7 +185,7 @@ impl UDPMuxNewAddr {
}
Err(e) => {
log::debug!(
"Unknown address {} (non STUN packet: {})",
"Unknown address addr={} (non STUN packet: {})",
&addr,
e
);
Expand All @@ -197,7 +196,7 @@ impl UDPMuxNewAddr {
Some(conn) => {
tokio::spawn(async move {
if let Err(err) = conn.write_packet(&buffer[..len], addr).await {
log::error!("Failed to write packet: {} (addr: {})", err, addr);
log::error!("Failed to write packet: {} (addr={})", err, addr);
}
});
}
Expand Down
3 changes: 2 additions & 1 deletion transports/webrtc/src/webrtc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl WebRTCConnection {
udp_mux: Arc<dyn UDPMux + Send + Sync>,
remote_fingerprint: &str,
) -> Result<Self, Error> {
// TODO: at least 128 bit of entropy
let ufrag: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(64)
Expand Down Expand Up @@ -116,7 +117,7 @@ impl WebRTCConnection {
let client_session_description = render_description(
sdp::CLIENT_SESSION_DESCRIPTION,
addr,
"UNKNOWN", // certificate verification is disabled, so any value is okay.
"NONE", // certificate verification is disabled, so any value is okay.
&remote_ufrag,
);
log::debug!("OFFER: {:?}", client_session_description);
Expand Down

0 comments on commit ef5e306

Please sign in to comment.