diff --git a/src/peer_connection/mod.rs b/src/peer_connection/mod.rs index 8a4088cc8..4e3a288b3 100644 --- a/src/peer_connection/mod.rs +++ b/src/peer_connection/mod.rs @@ -425,6 +425,10 @@ impl RTCPeerConnection { let current_local_description = params.current_local_description.lock().await; current_local_description.clone() }; + let current_remote_description = { + let current_remote_description = params.current_remote_description.lock().await; + current_remote_description.clone() + }; if let Some(local_desc) = ¤t_local_description { let len_data_channel = { @@ -470,9 +474,7 @@ impl RTCPeerConnection { match local_desc.sdp_type { RTCSdpType::Offer => { // Step 5.3.2 - let current_remote_description = - params.current_remote_description.lock().await; - if let Some(remote_desc) = &*current_remote_description { + if let Some(remote_desc) = ¤t_remote_description { if let Some(rm) = get_by_mid(t.mid().await.as_str(), remote_desc) { @@ -487,8 +489,28 @@ impl RTCPeerConnection { } } RTCSdpType::Answer => { + let remote_desc = match ¤t_remote_description { + Some(d) => d, + None => return true, + }; + let offered_direction = + match get_by_mid(t.mid().await.as_str(), remote_desc) { + Some(d) => { + let dir = get_peer_direction(d); + if dir == RTCRtpTransceiverDirection::Unspecified { + RTCRtpTransceiverDirection::Inactive + } else { + dir + } + } + None => RTCRtpTransceiverDirection::Inactive, + }; + + let current_direction = get_peer_direction(m); // Step 5.3.3 - if m.attribute(t.direction().to_string().as_str()).is_none() { + if current_direction + != t.direction().intersect(offered_direction.reverse()) + { return true; } } @@ -1234,7 +1256,43 @@ impl RTCPeerConnection { let we_answer = desc.sdp_type == RTCSdpType::Answer; let remote_description = self.remote_description().await; + let mut local_transceivers = self.get_transceivers().await; if we_answer { + if let Some(parsed) = desc.parsed { + // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/ + // Section 4.4.1.5 + for media in &parsed.media_descriptions { + if media.media_name.media == MEDIA_SECTION_APPLICATION { + continue; + } + + let kind = RTPCodecType::from(media.media_name.media.as_str()); + let direction = get_peer_direction(media); + if kind == RTPCodecType::Unspecified + || direction == RTCRtpTransceiverDirection::Unspecified + { + continue; + } + + let mid_value = match get_mid_value(media) { + Some(mid) if !mid.is_empty() => mid, + _ => continue, + }; + + let t = match find_by_mid(mid_value, &mut local_transceivers).await { + Some(t) => t, + None => continue, + }; + let previous_direction = t.current_direction(); + // 4.9.1.7.3 applying a local answer or pranswer + // Set transceiver.[[CurrentDirection]] and transceiver.[[FiredDirection]] to direction. + + // TODO: Also set FiredDirection here. + t.set_current_direction(direction); + t.process_new_current_direction(previous_direction).await?; + } + } + if let Some(remote_desc) = remote_description { self.start_rtp_senders().await?; @@ -1305,87 +1363,123 @@ impl RTCPeerConnection { let we_offer = desc.sdp_type == RTCSdpType::Answer; if !we_offer && !detected_plan_b { - if let Some(remote_desc) = remote_description { - if let Some(parsed) = &remote_desc.parsed { - for media in &parsed.media_descriptions { - if let Some(mid_value) = get_mid_value(media) { - if mid_value.is_empty() { + if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) { + for media in &parsed.media_descriptions { + let mid_value = match get_mid_value(media) { + Some(m) => { + if m.is_empty() { return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue); + } else { + m } + } + None => continue, + }; - if media.media_name.media == MEDIA_SECTION_APPLICATION { - continue; - } + if media.media_name.media == MEDIA_SECTION_APPLICATION { + continue; + } - let kind = RTPCodecType::from(media.media_name.media.as_str()); - let direction = get_peer_direction(media); - if kind == RTPCodecType::Unspecified - || direction == RTCRtpTransceiverDirection::Unspecified - { - continue; - } + let kind = RTPCodecType::from(media.media_name.media.as_str()); + let direction = get_peer_direction(media); + if kind == RTPCodecType::Unspecified + || direction == RTCRtpTransceiverDirection::Unspecified + { + continue; + } - let t = if let Some(t) = - find_by_mid(mid_value, &mut local_transceivers).await - { - Some(t) + let t = if let Some(t) = + find_by_mid(mid_value, &mut local_transceivers).await + { + Some(t) + } else { + satisfy_type_and_direction(kind, direction, &mut local_transceivers) + .await + }; + + if let Some(t) = t { + if t.mid().await.is_empty() { + t.set_mid(mid_value.to_owned()).await?; + } + } else { + let receiver = Arc::new(RTCRtpReceiver::new( + self.internal.setting_engine.get_receive_mtu(), + kind, + Arc::clone(&self.internal.dtls_transport), + Arc::clone(&self.internal.media_engine), + Arc::clone(&self.interceptor), + )); + + let local_direction = + if direction == RTCRtpTransceiverDirection::Recvonly { + RTCRtpTransceiverDirection::Sendonly } else { - satisfy_type_and_direction( - kind, - direction, - &mut local_transceivers, - ) - .await + RTCRtpTransceiverDirection::Recvonly }; - if let Some(t) = t { - let previous_direction = t.direction(); - // 4.7.9.2. - // Let direction be an RTCRtpTransceiverDirection value representing the - // direction from the media description, but with the send and receive directions - // reversed to represent this peer's point of view. - // If the media description is rejected, set direction to "inactive". - let new_direction = direction.reverse(); - - if t.mid().await.is_empty() { - t.set_mid(mid_value.to_owned()).await?; - } + let t = RTCRtpTransceiver::new( + Some(receiver), + None, + local_direction, + kind, + vec![], + Arc::clone(&self.internal.media_engine), + ) + .await; - t.set_direction(new_direction); - t.process_new_direction(previous_direction).await?; - } else { - let receiver = Arc::new(RTCRtpReceiver::new( - self.internal.setting_engine.get_receive_mtu(), - kind, - Arc::clone(&self.internal.dtls_transport), - Arc::clone(&self.internal.media_engine), - Arc::clone(&self.interceptor), - )); - - let local_direction = - if direction == RTCRtpTransceiverDirection::Recvonly { - RTCRtpTransceiverDirection::Sendonly - } else { - RTCRtpTransceiverDirection::Recvonly - }; - - let t = RTCRtpTransceiver::new( - Some(receiver), - None, - local_direction, - kind, - vec![], - Arc::clone(&self.internal.media_engine), - ) - .await; + self.internal.add_rtp_transceiver(Arc::clone(&t)).await; - self.internal.add_rtp_transceiver(Arc::clone(&t)).await; + if t.mid().await.is_empty() { + t.set_mid(mid_value.to_owned()).await?; + } + } + } + } + } - if t.mid().await.is_empty() { - t.set_mid(mid_value.to_owned()).await?; - } + if we_offer { + // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/ + // Section 4.4.1.5 + // This is an answer from the remote. + if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) { + for media in &parsed.media_descriptions { + let mid_value = match get_mid_value(media) { + Some(m) => { + if m.is_empty() { + return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue); + } else { + m } } + None => continue, + }; + + if media.media_name.media == MEDIA_SECTION_APPLICATION { + continue; + } + let kind = RTPCodecType::from(media.media_name.media.as_str()); + let direction = get_peer_direction(media); + if kind == RTPCodecType::Unspecified + || direction == RTCRtpTransceiverDirection::Unspecified + { + continue; + } + + if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await { + let previous_direction = t.direction(); + + // 4.9.2.9 + // Let direction be an RTCRtpTransceiverDirection value representing the direction + // from the media description, but with the send and receive directions reversed to + // represent this peer's point of view. If the media description is rejected, + // set direction to "inactive". + let reversed_direction = direction.reverse(); + + // 4.9.2.13.2 + // Set transceiver.[[CurrentDirection]] and transceiver.[[Direction]]s to direction. + t.set_direction(reversed_direction); + t.set_current_direction(reversed_direction); + t.process_new_current_direction(previous_direction).await?; } } } diff --git a/src/peer_connection/peer_connection_internal.rs b/src/peer_connection/peer_connection_internal.rs index 400bb77fb..8bef7d6c1 100644 --- a/src/peer_connection/peer_connection_internal.rs +++ b/src/peer_connection/peer_connection_internal.rs @@ -599,7 +599,7 @@ impl PeerConnectionInternal { .await; } - pub(super) async fn remote_description(self: &Arc) -> Option { + pub(super) async fn remote_description(&self) -> Option { let pending_remote_description = self.pending_remote_description.lock().await; if pending_remote_description.is_some() { pending_remote_description.clone() @@ -800,16 +800,7 @@ impl PeerConnectionInternal { let ice_params = self.ice_gatherer.get_local_parameters().await?; let candidates = self.ice_gatherer.get_local_candidates().await?; - let remote_description = { - let pending_remote_description = self.pending_remote_description.lock().await; - if pending_remote_description.is_some() { - pending_remote_description.clone() - } else { - let current_remote_description = self.current_remote_description.lock().await; - current_remote_description.clone() - } - }; - + let remote_description = self.remote_description().await; let detected_plan_b = description_is_plan_b(remote_description.as_ref())?; let mut media_sections = vec![]; let mut already_have_application_media_section = false; @@ -898,6 +889,7 @@ impl PeerConnectionInternal { id: mid_value.to_owned(), transceivers: media_transceivers, rid_map: get_rids(media), + offered_direction: (!include_unmatched).then(|| direction), ..Default::default() }); } else { diff --git a/src/peer_connection/sdp/mod.rs b/src/peer_connection/sdp/mod.rs index 0fd645f5a..5341aecb0 100644 --- a/src/peer_connection/sdp/mod.rs +++ b/src/peer_connection/sdp/mod.rs @@ -401,6 +401,7 @@ pub(crate) struct AddTransceiverSdpParams { mid_value: String, dtls_role: ConnectionRole, ice_gathering_state: RTCIceGatheringState, + offered_direction: Option, } pub(crate) async fn add_transceiver_sdp( @@ -559,7 +560,45 @@ pub(crate) async fn add_transceiver_sdp( } } - media = media.with_property_attribute(t.direction().to_string()); + let direction = match params.offered_direction { + Some(offered_direction) => { + use RTCRtpTransceiverDirection::*; + let transceiver_direction = t.direction(); + + match offered_direction { + Sendonly | Recvonly => { + // If a stream is offered as sendonly, the corresponding stream MUST be + // marked as recvonly or inactive in the answer. + + // If a media stream is + // listed as recvonly in the offer, the answer MUST be marked as + // sendonly or inactive in the answer. + offered_direction.reverse().intersect(transceiver_direction) + } + // If an offered media stream is + // listed as sendrecv (or if there is no direction attribute at the + // media or session level, in which case the stream is sendrecv by + // default), the corresponding stream in the answer MAY be marked as + // sendonly, recvonly, sendrecv, or inactive + Sendrecv | Unspecified => t.direction(), + // If an offered media + // stream is listed as inactive, it MUST be marked as inactive in the + // answer. + Inactive => Inactive, + } + } + None => { + // If don't have an offered direction to intersect with just use the transceivers + // current direction. + // + // https://datatracker.ietf.org/doc/html/rfc8829#section-4.2.3 + // + // When creating offers, the transceiver direction is directly reflected + // in the output, even for re-offers. + t.direction() + } + }; + media = media.with_property_attribute(direction.to_string()); for fingerprint in dtls_fingerprints { media = media.with_fingerprint( @@ -582,6 +621,7 @@ pub(crate) struct MediaSection { pub(crate) transceivers: Vec>, pub(crate) data: bool, pub(crate) rid_map: HashMap, + pub(crate) offered_direction: Option, } pub(crate) struct PopulateSdpParams { @@ -641,6 +681,7 @@ pub(crate) async fn populate_sdp( mid_value: m.id.clone(), dtls_role: params.connection_role, ice_gathering_state: params.ice_gathering_state, + offered_direction: m.offered_direction, }; let (d1, should_add_id) = add_transceiver_sdp( d, diff --git a/src/peer_connection/sdp/sdp_test.rs b/src/peer_connection/sdp/sdp_test.rs index 399ac1008..64841ef37 100644 --- a/src/peer_connection/sdp/sdp_test.rs +++ b/src/peer_connection/sdp/sdp_test.rs @@ -677,6 +677,7 @@ async fn test_populate_sdp() -> Result<()> { transceivers: vec![tr], data: false, rid_map, + ..Default::default() }]; let d = SessionDescription::default(); @@ -757,6 +758,7 @@ async fn test_populate_sdp() -> Result<()> { transceivers: vec![tr], data: false, rid_map: HashMap::new(), + ..Default::default() }]; let d = SessionDescription::default(); @@ -850,12 +852,14 @@ async fn test_populate_sdp_reject() -> Result<()> { transceivers: vec![trv], data: false, rid_map: HashMap::new(), + ..Default::default() }, MediaSection { id: "audio".to_owned(), transceivers: vec![tra], data: false, rid_map: HashMap::new(), + ..Default::default() }, ]; diff --git a/src/rtp_transceiver/mod.rs b/src/rtp_transceiver/mod.rs index a82dd0a1c..2ec9513c6 100644 --- a/src/rtp_transceiver/mod.rs +++ b/src/rtp_transceiver/mod.rs @@ -14,6 +14,7 @@ use interceptor::{ Attributes, }; +use log::trace; use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; use std::sync::Arc; @@ -168,7 +169,9 @@ pub struct RTCRtpTransceiver { mid: Mutex, //atomic.Value sender: Mutex>>, //atomic.Value receiver: Mutex>>, //atomic.Value - direction: AtomicU8, //RTPTransceiverDirection, //atomic.Value + + direction: AtomicU8, //RTPTransceiverDirection + current_direction: AtomicU8, //RTPTransceiverDirection codecs: Arc>>, // User provided codecs via set_codec_preferences @@ -191,7 +194,10 @@ impl RTCRtpTransceiver { mid: Mutex::new(String::new()), sender: Mutex::new(None), receiver: Mutex::new(None), + direction: AtomicU8::new(direction as u8), + current_direction: AtomicU8::new(RTCRtpTransceiverDirection::Unspecified as u8), + codecs: Arc::new(Mutex::new(codecs)), stopped: AtomicBool::new(false), kind, @@ -304,20 +310,49 @@ impl RTCRtpTransceiver { self.kind } - /// direction returns the RTPTransceiver's current direction + /// direction returns the RTPTransceiver's desired direction. pub fn direction(&self) -> RTCRtpTransceiverDirection { self.direction.load(Ordering::SeqCst).into() } pub(crate) fn set_direction(&self, d: RTCRtpTransceiverDirection) { - self.direction.store(d as u8, Ordering::SeqCst); + let previous: RTCRtpTransceiverDirection = + self.direction.swap(d as u8, Ordering::SeqCst).into(); + trace!( + "Changing direction of transceiver from {} to {}", + previous, + d + ); + } + + /// current_direction returns the RTPTransceiver's current direction as negotiated. + /// + /// If this transceiver has never been negotiated or if it's stopped this returns [`RTCRtpTransceiverDirection::Unspecified`]. + pub fn current_direction(&self) -> RTCRtpTransceiverDirection { + if self.stopped.load(Ordering::SeqCst) { + return RTCRtpTransceiverDirection::Unspecified; + } + + self.current_direction.load(Ordering::SeqCst).into() + } + + pub(crate) fn set_current_direction(&self, d: RTCRtpTransceiverDirection) { + let previous: RTCRtpTransceiverDirection = self + .current_direction + .swap(d as u8, Ordering::SeqCst) + .into(); + trace!( + "Changing current direction of transceiver from {} to {}", + previous, + d, + ); } /// Perform any subsequent actions after altering the transceiver's direction. /// /// After changing the transceiver's direction this method should be called to perform any /// side-effects that results from the new direction, such as pausing/resuming the RTP receiver. - pub(crate) async fn process_new_direction( + pub(crate) async fn process_new_current_direction( &self, previous_direction: RTCRtpTransceiverDirection, ) -> Result<()> { @@ -325,7 +360,12 @@ impl RTCRtpTransceiver { return Ok(()); } - let current_direction = self.direction(); + let current_direction = self.current_direction(); + trace!( + "Processing transceiver direction change from {} to {}", + previous_direction, + current_direction + ); match (previous_direction, current_direction) { (a, b) if a == b => { @@ -334,11 +374,13 @@ impl RTCRtpTransceiver { // All others imply a change (_, RTCRtpTransceiverDirection::Inactive | RTCRtpTransceiverDirection::Sendonly) => { if let Some(receiver) = &*self.receiver.lock().await { + trace!("Pausing receiver {:?}", receiver); receiver.pause().await?; } } (_, RTCRtpTransceiverDirection::Recvonly | RTCRtpTransceiverDirection::Sendrecv) => { if let Some(receiver) = &*self.receiver.lock().await { + trace!("Unpausing receiver {:?}", receiver); receiver.resume().await?; } } diff --git a/src/rtp_transceiver/rtp_receiver/mod.rs b/src/rtp_transceiver/rtp_receiver/mod.rs index a0b9f4c92..fbd1c57c4 100644 --- a/src/rtp_transceiver/rtp_receiver/mod.rs +++ b/src/rtp_transceiver/rtp_receiver/mod.rs @@ -27,19 +27,29 @@ use tokio::sync::{watch, Mutex, RwLock}; #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[repr(u8)] pub enum State { - Initial = 0, - Running = 1, - Paused = 2, - Closed = 3, + /// We haven't started yet. + Unstarted = 0, + /// We haven't started yet and additionally we've been paused. + UnstartedPaused = 1, + + /// We have started and are running. + Started = 2, + + /// We have been paused after starting. + Paused = 3, + + /// We have been stopped. + Stopped = 4, } impl From for State { fn from(value: u8) -> Self { match value { - v if v == State::Initial as u8 => State::Initial, - v if v == State::Running as u8 => State::Running, + v if v == State::Unstarted as u8 => State::Unstarted, + v if v == State::UnstartedPaused as u8 => State::UnstartedPaused, + v if v == State::Started as u8 => State::Started, v if v == State::Paused as u8 => State::Paused, - v if v == State::Closed as u8 => State::Closed, + v if v == State::Stopped as u8 => State::Stopped, _ => unreachable!( "Invalid serialization of {}: {}", std::any::type_name::(), @@ -52,10 +62,11 @@ impl From for State { impl fmt::Display for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - State::Initial => write!(f, "Initial"), - State::Running => write!(f, "Running"), + State::Unstarted => write!(f, "Unstarted"), + State::UnstartedPaused => write!(f, "UnstartedPaused"), + State::Started => write!(f, "Running"), State::Paused => write!(f, "Paused"), - State::Closed => write!(f, "Closed"), + State::Stopped => write!(f, "Closed"), } } } @@ -69,15 +80,23 @@ impl State { } match current { - Self::Initial if matches!(to, Self::Running | Self::Paused | Self::Closed) => { + Self::Unstarted + if matches!(to, Self::Started | Self::Stopped | Self::UnstartedPaused) => + { + let _ = tx.send(to); + return Ok(()); + } + Self::UnstartedPaused + if matches!(to, Self::Unstarted | Self::Stopped | Self::Paused) => + { let _ = tx.send(to); return Ok(()); } - State::Running if matches!(to, Self::Paused | Self::Closed) => { + State::Started if matches!(to, Self::Paused | Self::Stopped) => { let _ = tx.send(to); return Ok(()); } - State::Paused if matches!(to, Self::Running | Self::Closed) => { + State::Paused if matches!(to, Self::Started | Self::Stopped) => { let _ = tx.send(to); return Ok(()); } @@ -93,30 +112,34 @@ impl State { match state { _ if states.contains(&state) => return Ok(()), - State::Closed => { + State::Stopped => { return Err(Error::ErrClosedPipe); } _ => {} } - if let Err(_) = rx.changed().await { + if rx.changed().await.is_err() { return Err(Error::ErrClosedPipe); } } } async fn error_on_close(rx: &mut watch::Receiver) -> Result<()> { - if let Err(_) = rx.changed().await { + if rx.changed().await.is_err() { return Err(Error::ErrClosedPipe); } let state = *rx.borrow(); - if state == State::Closed { + if state == State::Stopped { return Err(Error::ErrClosedPipe); } Ok(()) } + + fn is_started(&self) -> bool { + matches!(self, Self::Started | Self::Paused) + } } pub struct RTPReceiverInternal { @@ -141,7 +164,7 @@ impl RTPReceiverInternal { let mut state_watch_rx = self.state_tx.subscribe(); // Ensure we are running or paused. When paused we still receive RTCP even if RTP traffic // isn't flowing. - State::wait_for(&mut state_watch_rx, &[State::Running, State::Paused]).await?; + State::wait_for(&mut state_watch_rx, &[State::Started, State::Paused]).await?; let tracks = self.tracks.read().await; if let Some(t) = tracks.first() { @@ -173,7 +196,7 @@ impl RTPReceiverInternal { // Ensure we are running or paused. When paused we still recevie RTCP even if RTP traffic // isn't flowing. - State::wait_for(&mut state_watch_rx, &[State::Running, State::Paused]).await?; + State::wait_for(&mut state_watch_rx, &[State::Started, State::Paused]).await?; let tracks = self.tracks.read().await; for t in &*tracks { @@ -235,7 +258,7 @@ impl RTPReceiverInternal { let mut state_watch_rx = self.state_tx.subscribe(); // Ensure we are running. - State::wait_for(&mut state_watch_rx, &[State::Running]).await?; + State::wait_for(&mut state_watch_rx, &[State::Started]).await?; //log::debug!("read_rtp enter tracks tid {}", tid); let mut rtp_interceptor = None; @@ -268,11 +291,8 @@ impl RTPReceiverInternal { _ = state_watch_rx.changed() => { let new_state = *state_watch_rx.borrow(); - match new_state { - State::Closed => { - return Err(Error::ErrClosedPipe); - }, - _ => {}, + if new_state == State::Stopped { + return Err(Error::ErrClosedPipe); } current_state = new_state; } @@ -340,19 +360,31 @@ impl RTPReceiverInternal { } pub(crate) fn start(&self) -> Result<()> { - State::transition(State::Running, &self.state_tx) + State::transition(State::Started, &self.state_tx) } pub(crate) fn pause(&self) -> Result<()> { - State::transition(State::Paused, &self.state_tx) + let current = self.current_state(); + + match current { + State::Unstarted => State::transition(State::UnstartedPaused, &self.state_tx), + State::Started => State::transition(State::Paused, &self.state_tx), + _ => Ok(()), + } } pub(crate) fn resume(&self) -> Result<()> { - State::transition(State::Running, &self.state_tx) + let current = self.current_state(); + + match current { + State::UnstartedPaused => State::transition(State::Unstarted, &self.state_tx), + State::Paused => State::transition(State::Started, &self.state_tx), + _ => Ok(()), + } } pub(crate) fn close(&self) -> Result<()> { - State::transition(State::Closed, &self.state_tx) + State::transition(State::Stopped, &self.state_tx) } } @@ -381,7 +413,7 @@ impl RTCRtpReceiver { media_engine: Arc, interceptor: Arc, ) -> Self { - let (state_tx, state_rx) = watch::channel(State::Initial); + let (state_tx, state_rx) = watch::channel(State::Unstarted); RTCRtpReceiver { receive_mtu, @@ -475,7 +507,8 @@ impl RTCRtpReceiver { pub async fn receive(&self, parameters: &RTCRtpReceiveParameters) -> Result<()> { let receiver = Arc::downgrade(&self.internal); - if self.internal.current_state() != State::Initial { + let current_state = self.internal.current_state(); + if current_state.is_started() { return Err(Error::ErrRTPReceiverReceiveAlreadyCalled); } self.internal.start()?; @@ -613,7 +646,7 @@ impl RTCRtpReceiver { } pub(crate) async fn have_received(&self) -> bool { - self.internal.current_state() != State::Initial + self.internal.current_state().is_started() } pub(crate) async fn start(&self, incoming: &TrackDetails) { @@ -641,9 +674,14 @@ impl RTCRtpReceiver { // set track id and label early so they can be set as new track information // is received from the SDP. + let is_unpaused = self.current_state() == State::Started; for track_remote in &self.tracks().await { track_remote.set_id(incoming.id.clone()).await; track_remote.set_stream_id(incoming.stream_id.clone()).await; + + if is_unpaused { + track_remote.fire_onunmute().await; + } } } @@ -653,7 +691,8 @@ impl RTCRtpReceiver { self.internal.close()?; let mut errs = vec![]; - if previous_state != State::Initial { + let was_ever_started = previous_state.is_started(); + if was_ever_started { let tracks = self.internal.tracks.write().await; for t in &*tracks { if let Some(rtcp_read_stream) = &t.stream.rtcp_read_stream { @@ -775,6 +814,10 @@ impl RTCRtpReceiver { pub(crate) async fn pause(&self) -> Result<()> { self.internal.pause()?; + if !self.internal.current_state().is_started() { + return Ok(()); + } + let streams = self.internal.tracks.read().await; for stream in streams.iter() { @@ -789,6 +832,10 @@ impl RTCRtpReceiver { pub(crate) async fn resume(&self) -> Result<()> { self.internal.resume()?; + if !self.internal.current_state().is_started() { + return Ok(()); + } + let streams = self.internal.tracks.read().await; for stream in streams.iter() { diff --git a/src/rtp_transceiver/rtp_transceiver_direction.rs b/src/rtp_transceiver/rtp_transceiver_direction.rs index a8b9d21e4..4cb04f0b9 100644 --- a/src/rtp_transceiver/rtp_transceiver_direction.rs +++ b/src/rtp_transceiver/rtp_transceiver_direction.rs @@ -80,6 +80,30 @@ impl RTCRtpTransceiverDirection { _ => *self, } } + + pub fn intersect(&self, other: RTCRtpTransceiverDirection) -> RTCRtpTransceiverDirection { + Self::from_send_recv( + self.has_send() && other.has_send(), + self.has_recv() && other.has_recv(), + ) + } + + pub fn from_send_recv(send: bool, recv: bool) -> RTCRtpTransceiverDirection { + match (send, recv) { + (true, true) => Self::Sendrecv, + (true, false) => Self::Sendonly, + (false, true) => Self::Recvonly, + (false, false) => Self::Inactive, + } + } + + fn has_send(&self) -> bool { + matches!(self, Self::Sendrecv | Self::Sendonly) + } + + fn has_recv(&self) -> bool { + matches!(self, Self::Sendrecv | Self::Recvonly) + } } pub(crate) fn have_rtp_transceiver_direction_intersection( @@ -129,4 +153,72 @@ mod test { assert_eq!(expected_string, d.to_string()); } } + + #[test] + fn test_rtp_transceiver_has_send() { + let tests = vec![ + (RTCRtpTransceiverDirection::Unspecified, false), + (RTCRtpTransceiverDirection::Sendrecv, true), + (RTCRtpTransceiverDirection::Sendonly, true), + (RTCRtpTransceiverDirection::Recvonly, false), + (RTCRtpTransceiverDirection::Inactive, false), + ]; + + for (d, expected_value) in tests { + assert_eq!(expected_value, d.has_send()); + } + } + + #[test] + fn test_rtp_transceiver_has_recv() { + let tests = vec![ + (RTCRtpTransceiverDirection::Unspecified, false), + (RTCRtpTransceiverDirection::Sendrecv, true), + (RTCRtpTransceiverDirection::Sendonly, false), + (RTCRtpTransceiverDirection::Recvonly, true), + (RTCRtpTransceiverDirection::Inactive, false), + ]; + + for (d, expected_value) in tests { + assert_eq!(expected_value, d.has_recv()); + } + } + + #[test] + fn test_rtp_transceiver_from_send_recv() { + let tests = vec![ + (RTCRtpTransceiverDirection::Sendrecv, (true, true)), + (RTCRtpTransceiverDirection::Sendonly, (true, false)), + (RTCRtpTransceiverDirection::Recvonly, (false, true)), + (RTCRtpTransceiverDirection::Inactive, (false, false)), + ]; + + for (expected_value, (send, recv)) in tests { + assert_eq!( + expected_value, + RTCRtpTransceiverDirection::from_send_recv(send, recv) + ); + } + } + + #[test] + fn test_rtp_transceiver_intersect() { + use RTCRtpTransceiverDirection::*; + + let tests = vec![ + ((Sendrecv, Recvonly), Recvonly), + ((Sendrecv, Sendonly), Sendonly), + ((Sendrecv, Inactive), Inactive), + ((Sendonly, Inactive), Inactive), + ((Recvonly, Inactive), Inactive), + ((Recvonly, Sendrecv), Recvonly), + ((Sendonly, Sendrecv), Sendonly), + ((Sendonly, Recvonly), Inactive), + ((Recvonly, Recvonly), Recvonly), + ]; + + for ((a, b), expected_direction) in tests { + assert_eq!(expected_direction, a.intersect(b)); + } + } } diff --git a/src/rtp_transceiver/rtp_transceiver_test.rs b/src/rtp_transceiver/rtp_transceiver_test.rs index 246bedfad..bf674d469 100644 --- a/src/rtp_transceiver/rtp_transceiver_test.rs +++ b/src/rtp_transceiver/rtp_transceiver_test.rs @@ -3,7 +3,6 @@ use crate::api::media_engine::{MIME_TYPE_OPUS, MIME_TYPE_VP8, MIME_TYPE_VP9}; use crate::api::APIBuilder; use crate::peer_connection::configuration::RTCConfiguration; use crate::peer_connection::peer_connection_test::{close_pair_now, create_vnet_pair}; -use crate::peer_connection::signaling_state::RTCSignalingState; #[tokio::test] async fn test_rtp_transceiver_set_codec_preferences() -> Result<()> { @@ -227,7 +226,7 @@ async fn test_rtp_transceiver_direction_change() -> Result<()> { .add_transceiver_from_kind(RTPCodecType::Video, &[]) .await?; - let answer_transceiver = answer_pc + let _ = answer_pc .add_transceiver_from_kind(RTPCodecType::Video, &[]) .await?;