webrtcsink: Added video simulcast support

This is designed to work with LiveKit server but hopefully will not
require much work for other systems supporting simulcasts
This commit is contained in:
Jordan Yelloz 2024-02-02 15:40:26 -07:00
parent 15061eb584
commit 04757ea61a
No known key found for this signature in database

View file

@ -42,6 +42,8 @@ const D3D11_MEMORY_FEATURE: &str = "memory:D3D11Memory";
const RTP_TWCC_URI: &str =
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
const RTP_MID_URI: &str = "urn:ietf:params:rtp-hdrext:sdes:mid";
const RTP_RID_URI: &str = "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id";
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302");
const DEFAULT_MIN_BITRATE: u32 = 1000;
@ -66,6 +68,190 @@ const DEFAULT_START_BITRATE: u32 = 2048000;
#[cfg(feature = "v1_22")]
const DO_FEC_THRESHOLD: u32 = 2000000;
/// Internally unique default IDs for RTP Extensions
#[derive(Debug, Clone, Copy, Hash, PartialOrd, Ord, PartialEq, Eq)]
#[repr(u32)]
enum RTPExtensionId {
Twcc = 1,
Mid,
Rid,
}
impl RTPExtensionId {
pub const fn default_id(self) -> u32 {
self as u32
}
pub const fn uri(self) -> &'static str {
match self {
Self::Twcc => RTP_TWCC_URI,
Self::Mid => RTP_MID_URI,
Self::Rid => RTP_RID_URI,
}
}
pub fn create(&self) -> Option<gst_rtp::RTPHeaderExtension> {
gst_rtp::RTPHeaderExtension::create_from_uri(self.uri())
}
}
struct Simulcast {
mid: String,
funnel: gst::Element,
capsfilter: gst::Element,
webrtcbin_pad: gst::Pad,
}
impl Simulcast {
pub fn new(
mid: &str,
webrtcbin: &gst::Element,
parent: &gst::Bin,
settings: &Settings,
) -> anyhow::Result<Self> {
let webrtcbin_pad = webrtcbin
.request_pad_simple("sink_%u")
.ok_or_else(|| anyhow::anyhow!("failed to request sinkpad from webrtcbin"))?;
let funnel = make_element("rtpfunnel", Some(&format!("simulcast_{mid}")))?;
let capsfilter = make_element("capsfilter", Some(&format!("caps_{mid}")))?;
parent.add_many([&funnel, &capsfilter])?;
funnel.link(&capsfilter)?;
let capsfilter_srcpad = capsfilter.static_pad("src").unwrap();
capsfilter_srcpad.link(&webrtcbin_pad)?;
let transceiver = webrtcbin_pad.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver");
transceiver.set_direction(gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly);
settings.configure_video_transceiver(&transceiver);
Ok(Self {
mid: mid.to_string(),
funnel,
capsfilter,
webrtcbin_pad,
})
}
fn transceiver(&self) -> gst_webrtc::WebRTCRTPTransceiver {
self.webrtcbin_pad.property("transceiver")
}
fn set_caps(&self, caps: gst::Caps) {
self.transceiver().set_codec_preferences(Some(&caps));
self.capsfilter.set_property("caps", &caps);
}
/// This should be set once the WebRTCPads are built for each InputStream
pub fn set_msid_from_pads<'a, P: Iterator<Item = &'a WebRTCPad>>(&self, pads: P) {
let msids: HashSet<&str> = pads.filter_map(WebRTCPad::stream_msid).collect();
if msids.len() > 1 {
gst::warning!(
CAT,
"multiple MSIDs {msids:?} for members of {}, ignoring",
self.mid,
);
return;
}
if let Some(msid) = msids.into_iter().next() {
self.webrtcbin_pad.set_property("msid", msid);
}
}
/// This should be set once the WebRTCPads are built for each InputStream
pub fn set_caps_from_pads<'a, P: Iterator<Item = &'a WebRTCPad>>(&self, pads: P) {
let simulcast_members = pads
.filter(|pad| pad.stream_mid() == Some(&self.mid))
.filter_map(|pad| pad.stream_rid.clone().zip(Some(pad.payloader_caps.copy())))
.collect::<Vec<_>>();
let mut caps = gst::Caps::new_empty();
let caps_mut = caps.make_mut();
let mut simulcast_rids = vec![];
for (rid, rid_caps) in simulcast_members {
caps_mut.append(rid_caps);
simulcast_rids.push(rid);
}
let simulcast_rids = simulcast_rids.join(";");
let simulcast_field = format!("send {simulcast_rids}");
caps_mut.set("a-mid", &self.mid);
caps_mut.set("a-simulcast", simulcast_field);
self.set_caps(caps);
}
/// Creates video height and width RID restriction parameters from video caps
///
/// This provides information about the video dimensions over SDP but also
/// can be used by the signalling client to inform the server about each
/// simulcast component's dimensions.
/// https://datatracker.ietf.org/doc/html/rfc8851#sec-rid_level_restrictions
fn video_caps_to_restrictions(structure: &gst::StructureRef) -> String {
let max_width = structure
.get::<i32>("width")
.ok()
.map(|width| format!("max-width={width}"));
let max_height = structure
.get::<i32>("height")
.ok()
.map(|height| format!("max-height={height}"));
max_width
.into_iter()
.chain(max_height)
.collect::<Vec<_>>()
.join(";")
}
pub fn add_rid_to_caps_structure(
rid: &str,
caps: &mut gst::Structure,
video_caps: Option<&gst::Caps>,
) {
caps.set(
format!("rid-{rid}"),
Some(&Self::format_rid_restrictions(video_caps)),
);
}
pub fn add_rid_to_caps(rid: &str, caps: &mut gst::CapsRef, video_caps: Option<&gst::Caps>) {
caps.set(
format!("rid-{rid}"),
Some(&Self::format_rid_restrictions(video_caps)),
);
}
pub fn format_rid_restrictions(video_caps: Option<&gst::Caps>) -> String {
if let Some(caps) = video_caps {
let restrictions = caps
.structure(0)
.map(Self::video_caps_to_restrictions)
.unwrap_or_default();
format!("send {restrictions}")
} else {
"send".to_string()
}
}
}
/// Expresses a group of Pads that contribute to a single track.
/// Components of each track are grouped by MID and discriminated by RID.
#[derive(Default)]
struct Simulcasts {
simulcasts: HashMap<String, Simulcast>,
}
impl Simulcasts {
pub fn get(&self, mid: &str) -> Option<&Simulcast> {
self.simulcasts.get(mid)
}
pub fn funnel(&self, mid: &str) -> Option<gst::Element> {
self.get(mid).map(|simulcast| simulcast.funnel.clone())
}
pub fn add(
&mut self,
mid: &str,
webrtcbin: &gst::Element,
parent: &gst::Bin,
settings: &Settings,
) -> anyhow::Result<()> {
if !self.simulcasts.contains_key(mid) {
let simulcast = Simulcast::new(mid, webrtcbin, parent, settings)?;
self.simulcasts.insert(mid.to_string(), simulcast);
}
Ok(())
}
pub fn iter(&self) -> impl Iterator<Item = (&String, &Simulcast)> {
self.simulcasts.iter()
}
}
#[derive(Debug, Clone, Copy)]
struct CCInfo {
heuristic: WebRTCSinkCongestionControl,
@ -91,6 +277,15 @@ struct Settings {
signaller: Signallable,
}
impl Settings {
pub fn configure_video_transceiver(&self, trans: &gst_webrtc::WebRTCRTPTransceiver) {
if self.do_fec {
trans.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
}
trans.set_property("do-nack", self.do_retransmission);
}
}
#[derive(Debug, Clone)]
struct DiscoveryInfo {
id: String,
@ -200,6 +395,8 @@ struct InputStream {
is_video: bool,
/// Whether initial discovery has started
initial_discovery_started: bool,
/// The index of the SDP media associated with this stream
mline_index: Option<u32>,
}
/// Wrapper around webrtcbin pads
@ -215,8 +412,35 @@ struct WebRTCPad {
/// When None, the pad was only created to mark its transceiver
/// as inactive (in the case where we answer an offer).
stream_name: Option<String>,
/// The user-specified MSID value
stream_msid: Option<String>,
/// The user-specified MID value for Simulcasts
stream_mid: Option<String>,
/// The user-specified RID value for Simulcasts
stream_rid: Option<String>,
/// The payload selected in the answer, None at first
payload: Option<i32>,
/// The caps produced by the payloader.
/// Used by simulcasts to combine caps from multiple payloaders
payloader_caps: gst::Caps,
}
impl WebRTCPad {
fn transceiver(&self) -> gst_webrtc::WebRTCRTPTransceiver {
self.pad.property("transceiver")
}
fn mid(&self) -> Option<String> {
self.transceiver().mid().map(String::from)
}
fn stream_msid(&self) -> Option<&str> {
self.stream_msid.as_deref()
}
fn stream_mid(&self) -> Option<&str> {
self.stream_mid.as_deref()
}
fn stream_rid(&self) -> Option<&str> {
self.stream_rid.as_deref()
}
}
/// Wrapper around GStreamer encoder element, keeps track of factory
@ -265,6 +489,8 @@ struct Session {
codecs: Option<BTreeMap<i32, Codec>>,
stats_collection_handle: Option<tokio::task::JoinHandle<()>>,
simulcasts: Simulcasts,
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
@ -436,6 +662,9 @@ struct State {
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>,
/// Used in simulcasts to re-apply the MID/RID extensions when connecting
/// each payloader to the webrtcbin
rtp_header_extensions: HashMap<RTPExtensionId, u32>,
}
fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) {
@ -541,6 +770,7 @@ impl Default for State {
mids: HashMap::new(),
signaller_signals: Default::default(),
finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())),
rtp_header_extensions: HashMap::new(),
}
}
}
@ -1165,6 +1395,10 @@ impl State {
discos.remove(position);
}
}
fn register_rtp_header_extension(&mut self, ext: RTPExtensionId, id: u32) -> u32 {
*self.rtp_header_extensions.entry(ext).or_insert(id)
}
}
impl Session {
@ -1195,6 +1429,7 @@ impl Session {
stats_sigid: None,
codecs: None,
stats_collection_handle: None,
simulcasts: Simulcasts::default(),
}
}
@ -1319,6 +1554,8 @@ impl Session {
&payloader,
&codec,
Some(webrtc_pad.ssrc),
webrtc_pad.stream_mid(),
webrtc_pad.stream_rid(),
Some(&caps),
ExtensionConfigurationType::Skip,
)?;
@ -1338,14 +1575,15 @@ impl Session {
let s = caps.structure(0).unwrap();
let mut filtered_s = gst::Structure::new_empty("application/x-rtp");
filtered_s.extend(s.iter().filter_map(|(key, value)| {
if key.starts_with("a-") {
None
} else {
Some((key, value.to_owned()))
}
filtered_s.extend(s.iter().filter_map(|(key, value)| match key {
key if key.starts_with("a-") => None,
key if key.starts_with("rid-") => None,
key => Some((key, value.to_owned())),
}));
filtered_s.set("ssrc", webrtc_pad.ssrc);
if let Some(rid) = &webrtc_pad.stream_rid {
Simulcast::add_rid_to_caps_structure(rid, &mut filtered_s, Some(&webrtc_pad.in_caps));
}
let caps = gst::Caps::builder_full().structure(filtered_s).build();
@ -1405,8 +1643,19 @@ impl Session {
let srcpad = pay_filter.static_pad("src").unwrap();
let sink_pad = if let Some(funnel) = webrtc_pad
.mid()
.and_then(|mid| self.simulcasts.funnel(&mid))
{
funnel
.request_pad_simple("sink_%u")
.ok_or_else(|| anyhow::anyhow!("Failed to request funnel pad"))?
} else {
webrtc_pad.pad.clone()
};
srcpad
.link(&webrtc_pad.pad)
.link(&sink_pad)
.with_context(|| format!("Connecting input stream for {}", self.peer_id))?;
match producer.add_consumer(&appsrc) {
@ -1480,6 +1729,41 @@ impl InputStream {
fn msid(&self) -> Option<String> {
self.sink_pad.property("msid")
}
fn mid(&self) -> Option<String> {
self.sink_pad.property("mid")
}
fn rid(&self) -> Option<String> {
self.sink_pad.property("rid")
}
fn mid_and_rid(&self) -> Option<(String, String)> {
self.mid().zip(self.rid())
}
/// When grouping pads/streams by MID for simulcasts, each member will also
/// share the same mline index, causing them to contribute to the same
/// SDP media section.
/// The index will only be incremented for new groups or the default case of
/// ungrouped pads.
fn compute_mline_indexes(streams: &mut [Self]) {
let mut mlines_by_mid = HashMap::new();
let mut index = 0;
for stream in streams {
if let Some(mid) = stream.mid() {
let entry = mlines_by_mid.entry(mid.clone()).or_insert_with(|| {
let current_index = index;
index += 1;
current_index
});
stream.mline_index = Some(*entry);
} else {
stream.mline_index = Some(index);
index += 1;
}
}
}
}
impl NavigationEventHandler {
@ -1512,6 +1796,7 @@ impl NavigationEventHandler {
}
/// How to configure RTP extensions for payloaders, if at all
#[derive(Clone, Copy)]
enum ExtensionConfigurationType {
/// Skip configuration, do not add any extensions
Skip,
@ -1548,7 +1833,9 @@ impl BaseWebRTCSink {
return Ok(());
}
let Some(twcc_id) = self.pick_twcc_extension_id(payloader, extension_configuration_type)
let ext_id = RTPExtensionId::Twcc;
let Some(twcc_id) = self.pick_extension_id(payloader, ext_id, extension_configuration_type)
else {
return Ok(());
};
@ -1561,9 +1848,10 @@ impl BaseWebRTCSink {
* concept of *transport-wide* congestion control, and firefox doesn't
* provide feedback for audio packets.
*/
if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI) {
if let Some(twcc_extension) = RTPExtensionId::Twcc.create() {
twcc_extension.set_id(twcc_id);
payloader.emit_by_name::<()>("add-extension", &[&twcc_extension]);
self.register_rtp_header_extension(ext_id, twcc_id);
} else {
anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed");
}
@ -1585,41 +1873,112 @@ impl BaseWebRTCSink {
)
}
/// Returns Some with an available ID for TWCC extension or None if it's already configured
fn pick_twcc_extension_id(
fn configure_mid(
&self,
payloader: &gst::Element,
mid: &str,
extension_configuration_type: ExtensionConfigurationType,
) -> anyhow::Result<()> {
if let ExtensionConfigurationType::Skip = extension_configuration_type {
return Ok(());
}
let ext_id = RTPExtensionId::Mid;
let Some(id) = self.pick_extension_id(payloader, ext_id, extension_configuration_type)
else {
return Ok(());
};
gst::debug!(CAT, obj: payloader, "Mapping MID extension to ID {id}, mid={mid}");
if let Some(ext) = RTPExtensionId::Mid.create() {
ext.set_id(id);
ext.set_property("mid", mid);
payloader.emit_by_name::<()>("add-extension", &[&ext]);
self.register_rtp_header_extension(ext_id, id);
} else {
anyhow::bail!(
"Failed to add MID extension, make sure 'gst-plugins-good:rtpmanager' is installed"
);
}
Ok(())
}
fn configure_rid(
&self,
payloader: &gst::Element,
rid: &str,
extension_configuration_type: ExtensionConfigurationType,
) -> anyhow::Result<()> {
if let ExtensionConfigurationType::Skip = extension_configuration_type {
return Ok(());
}
let ext_id = RTPExtensionId::Rid;
let Some(id) = self.pick_extension_id(payloader, ext_id, extension_configuration_type)
else {
return Ok(());
};
gst::debug!(CAT, obj: payloader, "Mapping RID extension to ID {id}, rid={rid}");
if let Some(ext) = RTPExtensionId::Rid.create() {
ext.set_id(id);
ext.set_property("rid", rid);
payloader.emit_by_name::<()>("add-extension", &[&ext]);
self.register_rtp_header_extension(ext_id, id);
} else {
anyhow::bail!(
"Failed to add RID extension, make sure 'gst-plugins-good:rtpmanager' is installed"
);
}
Ok(())
}
/// Returns Some with an available ID for the extension or None if it's already configured
fn pick_extension_id(
&self,
payloader: &gst::Element,
ext: RTPExtensionId,
config: ExtensionConfigurationType,
) -> Option<u32> {
match extension_configuration_type {
gst::debug!(CAT, obj: payloader, "Picking extension id ext={ext:?}");
match config {
ExtensionConfigurationType::Skip => unreachable!(),
ExtensionConfigurationType::Apply { twcc_id } => Some(twcc_id),
ExtensionConfigurationType::Auto => {
// GstRTPBasePayload::extensions property is only available since GStreamer 1.24
if !payloader.has_property("extensions", Some(gst::Array::static_type())) {
let default_id = ext.default_id();
if self.has_connected_payloader_setup_slots() {
gst::warning!(CAT, "'extensions' property is not available: TWCC extension ID will default to 1. \
Application code must ensure to pick non-conflicting IDs for any additionally configured extensions. \
Please consider updating GStreamer to 1.24.");
gst::warning!(CAT, obj: payloader, "'extensions' property is not available: {ext:?} extension ID will default to {default_id}. \
Application code must ensure to pick non-conflicting IDs for any additionally configured extensions. \
Please consider updating GStreamer to 1.24.");
}
return Some(1);
gst::debug!(CAT, obj: payloader, "Using default {default_id} for {ext:?}");
return Some(default_id);
}
let enabled_extensions: gst::Array = payloader.property("extensions");
let twcc = enabled_extensions
let existing_extension = enabled_extensions
.iter()
.find(|value| {
let value = value.get::<gst_rtp::RTPHeaderExtension>().unwrap();
match value.uri() {
Some(v) => v == RTP_TWCC_URI,
None => false,
.filter_map(|value| value.get::<gst_rtp::RTPHeaderExtension>().ok())
.find(|element| {
if let Some(uri) = element.uri() {
uri == ext.uri()
} else {
false
}
})
.map(|value| value.get::<gst_rtp::RTPHeaderExtension>().unwrap());
});
if let Some(ext) = twcc {
gst::debug!(CAT, obj: payloader, "TWCC extension is already mapped to id {} by application", ext.id());
if let Some(ext) = existing_extension {
gst::debug!(CAT, obj: payloader, "{ext:?} extension is already mapped to id {} by application", ext.id());
return None;
}
@ -1631,11 +1990,16 @@ impl BaseWebRTCSink {
Some(ext_id)
}
ExtensionConfigurationType::Apply { twcc_id } => Some(twcc_id),
ExtensionConfigurationType::Skip => unreachable!(),
}
}
fn register_rtp_header_extension(&self, extension: RTPExtensionId, id: u32) -> u32 {
self.state
.lock()
.unwrap()
.register_rtp_header_extension(extension, id)
}
#[allow(clippy::too_many_arguments)]
fn configure_payloader(
&self,
@ -1644,6 +2008,8 @@ impl BaseWebRTCSink {
payloader: &gst::Element,
codec: &Codec,
ssrc: Option<u32>,
mid: Option<&str>,
rid: Option<&str>,
caps: Option<&gst::Caps>,
extension_configuration_type: ExtensionConfigurationType,
) -> Result<(), Error> {
@ -1696,7 +2062,45 @@ impl BaseWebRTCSink {
}
}
self.configure_congestion_control(payloader, codec, extension_configuration_type)
self.configure_congestion_control(payloader, codec, extension_configuration_type)?;
if let Some(mid) = mid {
self.configure_mid(payloader, mid, extension_configuration_type)?;
}
if let Some(rid) = rid {
self.configure_rid(payloader, rid, extension_configuration_type)?;
}
if matches!(
extension_configuration_type,
ExtensionConfigurationType::Skip
) {
gst::debug!(CAT, obj: payloader, "Re-applying stored extension configurations");
for (ext, id) in &self.state.lock().unwrap().rtp_header_extensions {
let Some(rtp_ext) = ext.create() else {
continue;
};
rtp_ext.set_id(*id);
match ext {
RTPExtensionId::Mid => {
if let Some(mid) = mid {
gst::debug!(CAT, obj: payloader, "Re-applying mid={mid:?}");
rtp_ext.set_property("mid", mid);
}
}
RTPExtensionId::Rid => {
if let Some(rid) = rid {
gst::debug!(CAT, obj: payloader, "Re-applying rid={rid:?}");
rtp_ext.set_property("rid", rid);
}
}
_ => {}
};
payloader.emit_by_name::<()>("add-extension", &[&rtp_ext]);
}
}
Ok(())
}
fn generate_ssrc(
@ -1718,9 +2122,9 @@ impl BaseWebRTCSink {
webrtcbin: &gst::Element,
webrtc_pads: &mut HashMap<u32, WebRTCPad>,
is_video: bool,
media_idx: u32,
) {
let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads);
let media_idx = webrtc_pads.len() as i32;
let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else {
gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin");
@ -1750,14 +2154,19 @@ impl BaseWebRTCSink {
WebRTCPad {
pad,
in_caps: gst::Caps::new_empty(),
media_idx: media_idx as u32,
media_idx,
ssrc,
stream_name: None,
stream_msid: None,
stream_mid: None,
stream_rid: None,
payload: None,
payloader_caps,
},
);
}
#[allow(clippy::too_many_arguments)]
async fn request_webrtcbin_pad(
element: &super::BaseWebRTCSink,
webrtcbin: &gst::Element,
@ -1766,9 +2175,10 @@ impl BaseWebRTCSink {
settings: &Settings,
webrtc_pads: &mut HashMap<u32, WebRTCPad>,
codecs: &mut BTreeMap<i32, Codec>,
media_idx: u32,
simulcasts: &Simulcasts,
) {
let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads);
let media_idx = webrtc_pads.len() as i32;
let mut payloader_caps = match media {
Some(media) => {
@ -1776,6 +2186,7 @@ impl BaseWebRTCSink {
let codec = BaseWebRTCSink::select_codec(
element,
&stream.sink_pad,
&discovery_info,
media,
&stream.in_caps.as_ref().unwrap().clone(),
@ -1811,6 +2222,7 @@ impl BaseWebRTCSink {
webrtcbin,
webrtc_pads,
stream.is_video,
media_idx,
);
} else {
let payloader_caps_mut = payloader_caps.make_mut();
@ -1875,47 +2287,70 @@ impl BaseWebRTCSink {
payloader_caps
);
let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else {
gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin");
gst::element_error!(
element,
gst::StreamError::Failed,
["Failed to request pad from webrtcbin"]
let mid = stream.mid();
let pad = if let Some((rid, simulcast)) = stream
.mid_and_rid()
.filter(|_| stream.is_video)
.and_then(|(mid, rid)| Some(rid).zip(simulcasts.get(&mid)))
{
let Simulcast {
webrtcbin_pad: pad, ..
} = simulcast;
let payloader_caps_mut = payloader_caps.make_mut();
Simulcast::add_rid_to_caps(&rid, payloader_caps_mut, stream.in_caps.as_ref());
gst::debug!(
CAT,
obj: element,
"Have simulcast pad {:?}, media_idx={media_idx}, ssrc={ssrc}, mid={mid:?}, rid={rid}",
pad.name(),
);
return;
};
pad.clone()
} else {
gst::debug!(CAT, obj: element, "Requesting singlecast pad {media_idx}");
let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else {
gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin");
gst::element_error!(
element,
gst::StreamError::Failed,
["Failed to request pad from webrtcbin"]
);
return;
};
if let Some(msid) = stream.msid() {
gst::trace!(CAT, obj: element, "forwarding msid={msid:?} to webrtcbin sinkpad");
pad.set_property("msid", &msid);
}
let transceiver = pad.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver");
transceiver.set_property(
"direction",
gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly,
);
transceiver.set_property("codec-preferences", &payloader_caps);
if stream.sink_pad.name().starts_with("video_") {
if settings.do_fec {
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
if let Some(msid) = stream.msid() {
gst::trace!(CAT, obj: element, "forwarding msid={msid:?} to webrtcbin sinkpad");
pad.set_property("msid", &msid);
}
transceiver.set_property("do-nack", settings.do_retransmission);
}
let transceiver = pad.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver");
transceiver.set_property(
"direction",
gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly,
);
transceiver.set_property("codec-preferences", &payloader_caps);
if stream.sink_pad.name().starts_with("video_") {
settings.configure_video_transceiver(&transceiver);
}
pad
};
webrtc_pads.insert(
ssrc,
WebRTCPad {
pad,
in_caps: stream.in_caps.as_ref().unwrap().clone(),
media_idx: media_idx as u32,
media_idx,
ssrc,
stream_name: Some(stream.sink_pad.name().to_string()),
stream_msid: stream.msid(),
stream_mid: stream.mid(),
stream_rid: stream.rid(),
payload: None,
payloader_caps,
},
);
}
@ -2244,6 +2679,7 @@ impl BaseWebRTCSink {
async fn select_codec(
element: &super::BaseWebRTCSink,
pad: &WebRTCSinkPad,
discovery_info: &DiscoveryInfo,
media: &gst_sdp::SDPMediaRef,
in_caps: &gst::Caps,
@ -2339,6 +2775,7 @@ impl BaseWebRTCSink {
BaseWebRTCSink::run_discovery_pipeline(
element,
pad,
stream_name,
discovery_info,
codec.clone(),
@ -2831,6 +3268,7 @@ impl BaseWebRTCSink {
let mut streams: Vec<InputStream> = state.streams.values().cloned().collect();
streams.sort_by_key(|s| s.serial);
InputStream::compute_mline_indexes(&mut streams);
let element_clone = element.downgrade();
let offer_clone = offer.cloned();
@ -2843,9 +3281,10 @@ impl BaseWebRTCSink {
let mut webrtc_pads: HashMap<u32, WebRTCPad> = HashMap::new();
let mut codecs: BTreeMap<i32, Codec> = BTreeMap::new();
let mut simulcasts = Simulcasts::default();
if let Some(ref offer) = offer_clone {
for media in offer.sdp().medias() {
for (i, media) in offer.sdp().medias().enumerate() {
let media_is_video = match media.media() {
Some("audio") => false,
Some("video") => true,
@ -2874,6 +3313,8 @@ impl BaseWebRTCSink {
&settings_clone,
&mut webrtc_pads,
&mut codecs,
i as u32,
&simulcasts,
)
.await;
} else {
@ -2882,22 +3323,41 @@ impl BaseWebRTCSink {
&webrtcbin,
&mut webrtc_pads,
media_is_video,
i as u32,
);
}
}
} else {
for mut stream in streams {
for mid in streams.iter().filter(|s| s.is_video).filter_map(|s| s.mid()) {
if let Err(e) = simulcasts.add(&mid, &webrtcbin, pipeline.upcast_ref(), &settings_clone) {
gst::error!(CAT, obj: element, "Failed to add simulcast for {mid}: {e:?}");
gst::element_error!(
element,
gst::StreamError::Failed,
["Failed to create simulcast for {}", mid]
);
return;
}
}
for stream in &mut streams {
let media_idx = stream.mline_index.unwrap();
BaseWebRTCSink::request_webrtcbin_pad(
&element,
&webrtcbin,
&mut stream,
stream,
None,
&settings_clone,
&mut webrtc_pads,
&mut codecs,
media_idx,
&simulcasts,
)
.await;
}
for (_, simulcast) in simulcasts.iter() {
simulcast.set_msid_from_pads(webrtc_pads.values());
simulcast.set_caps_from_pads(webrtc_pads.values());
}
}
let enable_data_channel_navigation = settings_clone.enable_data_channel_navigation;
@ -2909,6 +3369,7 @@ impl BaseWebRTCSink {
if let Some(session) = state.sessions.get_mut(&session_id) {
let session = session.unwrap_mut();
session.webrtc_pads = webrtc_pads;
session.simulcasts = simulcasts;
if offer_clone.is_some() {
session.codecs = Some(codecs);
}
@ -3307,8 +3768,10 @@ impl BaseWebRTCSink {
}
}
#[allow(clippy::too_many_arguments)]
async fn run_discovery_pipeline(
element: &super::BaseWebRTCSink,
pad: &WebRTCSinkPad,
stream_name: &str,
discovery_info: &DiscoveryInfo,
codec: Codec,
@ -3367,12 +3830,16 @@ impl BaseWebRTCSink {
);
}
let mid = pad.property::<Option<String>>("mid");
let rid = pad.property::<Option<String>>("rid");
element.imp().configure_payloader(
"discovery",
stream_name,
&payloader,
&codec,
None,
mid.as_deref(),
rid.as_deref(),
None,
extension_configuration_type,
)?;
@ -3498,6 +3965,7 @@ impl BaseWebRTCSink {
async fn lookup_caps(
element: &super::BaseWebRTCSink,
pad: &WebRTCSinkPad,
discovery_info: DiscoveryInfo,
name: String,
output_caps: gst::Caps,
@ -3517,6 +3985,7 @@ impl BaseWebRTCSink {
vec![BaseWebRTCSink::run_discovery_pipeline(
element,
pad,
&name,
&discovery_info,
codec,
@ -3539,6 +4008,7 @@ impl BaseWebRTCSink {
.map(|codec| {
BaseWebRTCSink::run_discovery_pipeline(
element,
pad,
&name,
&discovery_info,
codec.clone(),
@ -3710,7 +4180,7 @@ impl BaseWebRTCSink {
}
}
fn start_stream_discovery_if_needed(&self, stream_name: &str) {
fn start_stream_discovery_if_needed(&self, pad: &WebRTCSinkPad, stream_name: &str) {
let (codecs, discovery_info) = {
let mut state = self.state.lock().unwrap();
@ -3744,11 +4214,12 @@ impl BaseWebRTCSink {
};
let stream_name_clone = stream_name.to_owned();
RUNTIME.spawn(glib::clone!(@weak self as this, @strong discovery_info => async move {
RUNTIME.spawn(glib::clone!(@weak self as this, @strong pad, @strong discovery_info => async move {
let element = &*this.obj();
let (fut, handle) = futures::future::abortable(
Self::lookup_caps(
element,
&pad,
discovery_info.clone(),
stream_name_clone.clone(),
gst::Caps::new_any(),
@ -3797,10 +4268,10 @@ impl BaseWebRTCSink {
fn chain(
&self,
pad: &gst::GhostPad,
pad: &WebRTCSinkPad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.start_stream_discovery_if_needed(pad.name().as_str());
self.start_stream_discovery_if_needed(pad, pad.name().as_str());
self.feed_discoveries(pad.name().as_str(), &buffer);
gst::ProxyPad::chain_default(pad, Some(&*self.obj()), buffer)
@ -4345,7 +4816,7 @@ impl ElementImpl for BaseWebRTCSink {
BaseWebRTCSink::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.chain(pad.upcast_ref(), buffer),
|this| this.chain(pad, buffer),
)
})
.event_function(|pad, parent, event| {
@ -4372,6 +4843,7 @@ impl ElementImpl for BaseWebRTCSink {
is_video,
serial,
initial_discovery_started: false,
mline_index: None,
},
);