Skip to content

Commit

Permalink
Rework transceiver direction support further (#210)
Browse files Browse the repository at this point in the history
The previous change to transceiver direction behaviour was a step in the
right direction(lol) but it wasn't correct. In particular we were making
changes when applying a remote offer that should happen when applying a
remote or local answer. The state machine for receivers was also
incorrect.
  • Loading branch information
lookback-hugotunius authored Jul 1, 2022
1 parent 0061742 commit 9db7d2e
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 124 deletions.
238 changes: 166 additions & 72 deletions src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ impl RTCPeerConnection {
let current_local_description = params.current_local_description.lock().await;
current_local_description.clone()
};
let current_remote_description = {
let current_remote_description = params.current_remote_description.lock().await;
current_remote_description.clone()
};

if let Some(local_desc) = &current_local_description {
let len_data_channel = {
Expand Down Expand Up @@ -470,9 +474,7 @@ impl RTCPeerConnection {
match local_desc.sdp_type {
RTCSdpType::Offer => {
// Step 5.3.2
let current_remote_description =
params.current_remote_description.lock().await;
if let Some(remote_desc) = &*current_remote_description {
if let Some(remote_desc) = &current_remote_description {
if let Some(rm) =
get_by_mid(t.mid().await.as_str(), remote_desc)
{
Expand All @@ -487,8 +489,28 @@ impl RTCPeerConnection {
}
}
RTCSdpType::Answer => {
let remote_desc = match &current_remote_description {
Some(d) => d,
None => return true,
};
let offered_direction =
match get_by_mid(t.mid().await.as_str(), remote_desc) {
Some(d) => {
let dir = get_peer_direction(d);
if dir == RTCRtpTransceiverDirection::Unspecified {
RTCRtpTransceiverDirection::Inactive
} else {
dir
}
}
None => RTCRtpTransceiverDirection::Inactive,
};

let current_direction = get_peer_direction(m);
// Step 5.3.3
if m.attribute(t.direction().to_string().as_str()).is_none() {
if current_direction
!= t.direction().intersect(offered_direction.reverse())
{
return true;
}
}
Expand Down Expand Up @@ -1234,7 +1256,43 @@ impl RTCPeerConnection {

let we_answer = desc.sdp_type == RTCSdpType::Answer;
let remote_description = self.remote_description().await;
let mut local_transceivers = self.get_transceivers().await;
if we_answer {
if let Some(parsed) = desc.parsed {
// WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
// Section 4.4.1.5
for media in &parsed.media_descriptions {
if media.media_name.media == MEDIA_SECTION_APPLICATION {
continue;
}

let kind = RTPCodecType::from(media.media_name.media.as_str());
let direction = get_peer_direction(media);
if kind == RTPCodecType::Unspecified
|| direction == RTCRtpTransceiverDirection::Unspecified
{
continue;
}

let mid_value = match get_mid_value(media) {
Some(mid) if !mid.is_empty() => mid,
_ => continue,
};

let t = match find_by_mid(mid_value, &mut local_transceivers).await {
Some(t) => t,
None => continue,
};
let previous_direction = t.current_direction();
// 4.9.1.7.3 applying a local answer or pranswer
// Set transceiver.[[CurrentDirection]] and transceiver.[[FiredDirection]] to direction.

// TODO: Also set FiredDirection here.
t.set_current_direction(direction);
t.process_new_current_direction(previous_direction).await?;
}
}

if let Some(remote_desc) = remote_description {
self.start_rtp_senders().await?;

Expand Down Expand Up @@ -1305,87 +1363,123 @@ impl RTCPeerConnection {
let we_offer = desc.sdp_type == RTCSdpType::Answer;

if !we_offer && !detected_plan_b {
if let Some(remote_desc) = remote_description {
if let Some(parsed) = &remote_desc.parsed {
for media in &parsed.media_descriptions {
if let Some(mid_value) = get_mid_value(media) {
if mid_value.is_empty() {
if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
for media in &parsed.media_descriptions {
let mid_value = match get_mid_value(media) {
Some(m) => {
if m.is_empty() {
return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
} else {
m
}
}
None => continue,
};

if media.media_name.media == MEDIA_SECTION_APPLICATION {
continue;
}
if media.media_name.media == MEDIA_SECTION_APPLICATION {
continue;
}

let kind = RTPCodecType::from(media.media_name.media.as_str());
let direction = get_peer_direction(media);
if kind == RTPCodecType::Unspecified
|| direction == RTCRtpTransceiverDirection::Unspecified
{
continue;
}
let kind = RTPCodecType::from(media.media_name.media.as_str());
let direction = get_peer_direction(media);
if kind == RTPCodecType::Unspecified
|| direction == RTCRtpTransceiverDirection::Unspecified
{
continue;
}

let t = if let Some(t) =
find_by_mid(mid_value, &mut local_transceivers).await
{
Some(t)
let t = if let Some(t) =
find_by_mid(mid_value, &mut local_transceivers).await
{
Some(t)
} else {
satisfy_type_and_direction(kind, direction, &mut local_transceivers)
.await
};

if let Some(t) = t {
if t.mid().await.is_empty() {
t.set_mid(mid_value.to_owned()).await?;
}
} else {
let receiver = Arc::new(RTCRtpReceiver::new(
self.internal.setting_engine.get_receive_mtu(),
kind,
Arc::clone(&self.internal.dtls_transport),
Arc::clone(&self.internal.media_engine),
Arc::clone(&self.interceptor),
));

let local_direction =
if direction == RTCRtpTransceiverDirection::Recvonly {
RTCRtpTransceiverDirection::Sendonly
} else {
satisfy_type_and_direction(
kind,
direction,
&mut local_transceivers,
)
.await
RTCRtpTransceiverDirection::Recvonly
};

if let Some(t) = t {
let previous_direction = t.direction();
// 4.7.9.2.
// Let direction be an RTCRtpTransceiverDirection value representing the
// direction from the media description, but with the send and receive directions
// reversed to represent this peer's point of view.
// If the media description is rejected, set direction to "inactive".
let new_direction = direction.reverse();

if t.mid().await.is_empty() {
t.set_mid(mid_value.to_owned()).await?;
}
let t = RTCRtpTransceiver::new(
Some(receiver),
None,
local_direction,
kind,
vec![],
Arc::clone(&self.internal.media_engine),
)
.await;

t.set_direction(new_direction);
t.process_new_direction(previous_direction).await?;
} else {
let receiver = Arc::new(RTCRtpReceiver::new(
self.internal.setting_engine.get_receive_mtu(),
kind,
Arc::clone(&self.internal.dtls_transport),
Arc::clone(&self.internal.media_engine),
Arc::clone(&self.interceptor),
));

let local_direction =
if direction == RTCRtpTransceiverDirection::Recvonly {
RTCRtpTransceiverDirection::Sendonly
} else {
RTCRtpTransceiverDirection::Recvonly
};

let t = RTCRtpTransceiver::new(
Some(receiver),
None,
local_direction,
kind,
vec![],
Arc::clone(&self.internal.media_engine),
)
.await;
self.internal.add_rtp_transceiver(Arc::clone(&t)).await;

self.internal.add_rtp_transceiver(Arc::clone(&t)).await;
if t.mid().await.is_empty() {
t.set_mid(mid_value.to_owned()).await?;
}
}
}
}
}

if t.mid().await.is_empty() {
t.set_mid(mid_value.to_owned()).await?;
}
if we_offer {
// WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
// Section 4.4.1.5
// This is an answer from the remote.
if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
for media in &parsed.media_descriptions {
let mid_value = match get_mid_value(media) {
Some(m) => {
if m.is_empty() {
return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
} else {
m
}
}
None => continue,
};

if media.media_name.media == MEDIA_SECTION_APPLICATION {
continue;
}
let kind = RTPCodecType::from(media.media_name.media.as_str());
let direction = get_peer_direction(media);
if kind == RTPCodecType::Unspecified
|| direction == RTCRtpTransceiverDirection::Unspecified
{
continue;
}

if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
let previous_direction = t.direction();

// 4.9.2.9
// Let direction be an RTCRtpTransceiverDirection value representing the direction
// from the media description, but with the send and receive directions reversed to
// represent this peer's point of view. If the media description is rejected,
// set direction to "inactive".
let reversed_direction = direction.reverse();

// 4.9.2.13.2
// Set transceiver.[[CurrentDirection]] and transceiver.[[Direction]]s to direction.
t.set_direction(reversed_direction);
t.set_current_direction(reversed_direction);
t.process_new_current_direction(previous_direction).await?;
}
}
}
Expand Down
14 changes: 3 additions & 11 deletions src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ impl PeerConnectionInternal {
.await;
}

pub(super) async fn remote_description(self: &Arc<Self>) -> Option<RTCSessionDescription> {
pub(super) async fn remote_description(&self) -> Option<RTCSessionDescription> {
let pending_remote_description = self.pending_remote_description.lock().await;
if pending_remote_description.is_some() {
pending_remote_description.clone()
Expand Down Expand Up @@ -800,16 +800,7 @@ impl PeerConnectionInternal {
let ice_params = self.ice_gatherer.get_local_parameters().await?;
let candidates = self.ice_gatherer.get_local_candidates().await?;

let remote_description = {
let pending_remote_description = self.pending_remote_description.lock().await;
if pending_remote_description.is_some() {
pending_remote_description.clone()
} else {
let current_remote_description = self.current_remote_description.lock().await;
current_remote_description.clone()
}
};

let remote_description = self.remote_description().await;
let detected_plan_b = description_is_plan_b(remote_description.as_ref())?;
let mut media_sections = vec![];
let mut already_have_application_media_section = false;
Expand Down Expand Up @@ -898,6 +889,7 @@ impl PeerConnectionInternal {
id: mid_value.to_owned(),
transceivers: media_transceivers,
rid_map: get_rids(media),
offered_direction: (!include_unmatched).then(|| direction),
..Default::default()
});
} else {
Expand Down
43 changes: 42 additions & 1 deletion src/peer_connection/sdp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ pub(crate) struct AddTransceiverSdpParams {
mid_value: String,
dtls_role: ConnectionRole,
ice_gathering_state: RTCIceGatheringState,
offered_direction: Option<RTCRtpTransceiverDirection>,
}

pub(crate) async fn add_transceiver_sdp(
Expand Down Expand Up @@ -559,7 +560,45 @@ pub(crate) async fn add_transceiver_sdp(
}
}

media = media.with_property_attribute(t.direction().to_string());
let direction = match params.offered_direction {
Some(offered_direction) => {
use RTCRtpTransceiverDirection::*;
let transceiver_direction = t.direction();

match offered_direction {
Sendonly | Recvonly => {
// If a stream is offered as sendonly, the corresponding stream MUST be
// marked as recvonly or inactive in the answer.

// If a media stream is
// listed as recvonly in the offer, the answer MUST be marked as
// sendonly or inactive in the answer.
offered_direction.reverse().intersect(transceiver_direction)
}
// If an offered media stream is
// listed as sendrecv (or if there is no direction attribute at the
// media or session level, in which case the stream is sendrecv by
// default), the corresponding stream in the answer MAY be marked as
// sendonly, recvonly, sendrecv, or inactive
Sendrecv | Unspecified => t.direction(),
// If an offered media
// stream is listed as inactive, it MUST be marked as inactive in the
// answer.
Inactive => Inactive,
}
}
None => {
// If don't have an offered direction to intersect with just use the transceivers
// current direction.
//
// https://datatracker.ietf.org/doc/html/rfc8829#section-4.2.3
//
// When creating offers, the transceiver direction is directly reflected
// in the output, even for re-offers.
t.direction()
}
};
media = media.with_property_attribute(direction.to_string());

for fingerprint in dtls_fingerprints {
media = media.with_fingerprint(
Expand All @@ -582,6 +621,7 @@ pub(crate) struct MediaSection {
pub(crate) transceivers: Vec<Arc<RTCRtpTransceiver>>,
pub(crate) data: bool,
pub(crate) rid_map: HashMap<String, String>,
pub(crate) offered_direction: Option<RTCRtpTransceiverDirection>,
}

pub(crate) struct PopulateSdpParams {
Expand Down Expand Up @@ -641,6 +681,7 @@ pub(crate) async fn populate_sdp(
mid_value: m.id.clone(),
dtls_role: params.connection_role,
ice_gathering_state: params.ice_gathering_state,
offered_direction: m.offered_direction,
};
let (d1, should_add_id) = add_transceiver_sdp(
d,
Expand Down
Loading

0 comments on commit 9db7d2e

Please sign in to comment.