diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 1b9febb0..cf55efd7 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7038,6 +7038,24 @@ "return-type": "GStrv", "when": "last" }, + "payloader-setup": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + }, + { + "name": "arg1", + "type": "gchararray" + }, + { + "name": "arg2", + "type": "GstElement" + } + ], + "return-type": "gboolean", + "when": "last" + }, "request-encoded-filter": { "args": [ { diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index 808bc09a..46176ec6 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, ops::Deref, sync::atomic::{AtomicBool, Ordering}, }; @@ -611,28 +611,10 @@ impl Codec { }) } - pub fn build_payloader(&self, pt: u32) -> Option { - self.encoding_info.as_ref().map(|info| { - let mut res = info - .payloader - .create() - .property("mtu", 1200_u32) - .property("pt", pt); - - match info.payloader.name().as_str() { - "rtpvp8pay" | "rtpvp9pay" => { - res = res.property_from_str("picture-id-mode", "15-bit"); - } - "rtph264pay" | "rtph265pay" => { - res = res - .property_from_str("aggregate-mode", "zero-latency") - .property("config-interval", -1i32); - } - _ => (), - } - - res.build().unwrap() - }) + pub fn create_payloader(&self) -> Option { + self.encoding_info + .as_ref() + .map(|info| info.payloader.create().build().unwrap()) } pub fn raw_converter_filter(&self) -> Result { @@ -938,3 +920,39 @@ pub struct NavigationEvent { #[serde(flatten)] pub event: gst_video::NavigationEvent, } + +pub fn find_smallest_available_ext_id(ids: impl IntoIterator) -> u32 { + let used_numbers: HashSet<_> = ids.into_iter().collect(); + (1..).find(|&num| !used_numbers.contains(&num)).unwrap() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_find_smallest_available_ext_id_case( + ids: impl IntoIterator, + expected: u32, + ) -> Result<(), String> { + let actual = find_smallest_available_ext_id(ids); + + if actual != expected { + return Err(format!("Expected {}, got {}", expected, actual)); + } + + Ok(()) + } + + #[test] + fn test_find_smallest_available_ext_id() -> Result<(), String> { + [ + (vec![], 1u32), + (vec![2u32, 3u32, 4u32], 1u32), + (vec![1u32, 3u32, 4u32], 2u32), + (vec![4u32, 1u32, 3u32], 2u32), + (vec![1u32, 2u32, 3u32], 4u32), + ] + .into_iter() + .try_for_each(|(input, expected)| test_find_smallest_available_ext_id_case(input, expected)) + } +} diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 85195da5..8371bd5d 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -25,7 +25,7 @@ use crate::aws_kvs_signaller::AwsKvsSignaller; use crate::livekit_signaller::LiveKitSignaller; use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; use crate::whip_signaller::WhipClientSignaller; -use crate::RUNTIME; +use crate::{utils, RUNTIME}; use std::collections::{BTreeMap, HashSet}; static CAT: Lazy = Lazy::new(|| { @@ -749,6 +749,36 @@ fn configure_encoder(enc: &gst::Element, start_bitrate: u32) { } } +/// Default configuration for known payloaders, can be disabled +/// by returning True from an payloader-setup handler. +fn configure_payloader(pay: &gst::Element) { + pay.set_property("mtu", 1200_u32); + + match pay.factory().unwrap().name().as_str() { + "rtpvp8pay" | "rtpvp9pay" => { + pay.set_property_from_str("picture-id-mode", "15-bit"); + } + "rtph264pay" | "rtph265pay" => { + pay.set_property_from_str("aggregate-mode", "zero-latency"); + pay.set_property("config-interval", -1i32); + } + _ => (), + } +} + +fn setup_signal_accumulator( + _hint: &glib::subclass::SignalInvocationHint, + ret: &mut glib::Value, + value: &glib::Value, +) -> bool { + let is_configured = value.get::().unwrap(); + let continue_emission = !is_configured; + + *ret = value.clone(); + + continue_emission +} + /// Set of elements used in an EncodingChain struct EncodingChain { raw_filter: Option, @@ -756,22 +786,24 @@ struct EncodingChain { pay_filter: gst::Element, } -struct EncodingChainBuilder { +/// A set of elements that transform raw data into RTP packets +struct PayloadChain { + encoding_chain: EncodingChain, + payloader: gst::Element, +} + +struct PayloadChainBuilder { /// Caps of the input chain input_caps: gst::Caps, - //// Caps expected after the payloader + /// Caps expected after the payloader output_caps: gst::Caps, /// The Codec representing wanted encoding codec: Codec, - /// The SSRC to use for the RTP stream if any /// Filter element between the encoder and the payloader. encoded_filter: Option, - ssrc: Option, - /// The TWCC ID to use for payloaded stream - twcc: Option, } -impl EncodingChainBuilder { +impl PayloadChainBuilder { fn new( input_caps: &gst::Caps, output_caps: &gst::Caps, @@ -783,31 +815,18 @@ impl EncodingChainBuilder { output_caps: output_caps.clone(), codec: codec.clone(), encoded_filter, - ssrc: None, - twcc: None, } } - fn ssrc(mut self, ssrc: u32) -> Self { - self.ssrc = Some(ssrc); - self - } - - fn twcc(mut self, twcc: u32) -> Self { - self.twcc = Some(twcc); - self - } - - fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result { + fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result { gst::trace!( CAT, obj: pipeline, "Setting up encoding, input caps: {input_caps}, \ - output caps: {output_caps}, codec: {codec:?}, twcc: {twcc:?}", + output caps: {output_caps}, codec: {codec:?}", input_caps = self.input_caps, output_caps = self.output_caps, codec = self.codec, - twcc = self.twcc, ); let needs_encoding = is_raw_caps(&self.input_caps); @@ -856,33 +875,10 @@ impl EncodingChainBuilder { let pay = self .codec - .build_payloader( - self.codec - .payload() - .expect("Negotiated codec should always have pt set") as u32, - ) + .create_payloader() .expect("Payloaders should always have been set in the CodecInfo we handle"); - if let Some(ssrc) = self.ssrc { - pay.set_property("ssrc", ssrc); - } - - /* We only enforce TWCC in the offer caps, once a remote description - * has been set it will get automatically negotiated. This is necessary - * because the implementor in Firefox had apparently not understood the - * concept of *transport-wide* congestion control, and firefox doesn't - * provide feedback for audio packets. - */ - if let Some(idx) = self.twcc { - if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI) - { - twcc_extension.set_id(idx); - pay.emit_by_name::<()>("add-extension", &[&twcc_extension]); - } else { - anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed"); - } - } - elements.push(pay); + elements.push(pay.clone()); let pay_filter = gst::ElementFactory::make("capsfilter") .property("caps", self.output_caps) @@ -898,10 +894,13 @@ impl EncodingChainBuilder { gst::Element::link_many(elements.iter().collect::>().as_slice()) .with_context(|| "Linking encoding elements")?; - Ok(EncodingChain { - raw_filter, - encoder, - pay_filter, + Ok(PayloadChain { + encoding_chain: EncodingChain { + raw_filter, + encoder, + pay_filter, + }, + payloader: pay, }) } } @@ -1236,7 +1235,10 @@ impl Session { let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any); - let encoding_chain = EncodingChainBuilder::new( + let PayloadChain { + payloader, + encoding_chain, + } = PayloadChainBuilder::new( &webrtc_pad.in_caps, &output_caps, &codec, @@ -1245,13 +1247,21 @@ impl Session { &[&Some(&self.peer_id), &stream_name, &codec.caps], ), ) - .ssrc(webrtc_pad.ssrc) .build(&self.pipeline, &appsrc)?; if let Some(ref enc) = encoding_chain.encoder { element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); } + element.imp().configure_payloader( + &self.peer_id, + stream_name, + &payloader, + &codec, + Some(webrtc_pad.ssrc), + ExtensionConfigurationType::Skip, + )?; + // At this point, the peer has provided its answer, and we want to // let the payloader / encoder perform negotiation according to that. // @@ -1448,7 +1458,103 @@ impl NavigationEventHandler { } } +/// How to configure RTP extensions for payloaders, if at all +enum ExtensionConfigurationType { + /// Skip configuration, do not add any extensions + Skip, + /// Configure extensions and assign IDs automatically, based on already enabled extensions + Auto, + /// Configure extensions, use specific ids that were provided + Apply { twcc_id: u32 }, +} + impl BaseWebRTCSink { + fn configure_congestion_control( + &self, + payloader: &gst::Element, + extension_configuration_type: ExtensionConfigurationType, + ) -> Result<(), Error> { + if let ExtensionConfigurationType::Skip = extension_configuration_type { + return Ok(()); + } + + let settings = self.settings.lock().unwrap(); + + if settings.cc_info.heuristic == WebRTCSinkCongestionControl::Disabled { + return Ok(()); + } + + let enabled_extensions: gst::Array = payloader.property("extensions"); + let twcc = enabled_extensions + .iter() + .find(|value| { + let value = value.get::().unwrap(); + + match value.uri() { + Some(v) => v == RTP_TWCC_URI, + None => false, + } + }) + .map(|value| value.get::().unwrap()); + + if let Some(ext) = twcc { + gst::debug!(CAT, obj: payloader, "TWCC extension is already mapped to id {} by application", ext.id()); + return Ok(()); + } + + let twcc_id = match extension_configuration_type { + ExtensionConfigurationType::Auto => utils::find_smallest_available_ext_id( + enabled_extensions + .iter() + .map(|value| value.get::().unwrap().id()), + ), + ExtensionConfigurationType::Apply { twcc_id } => twcc_id, + ExtensionConfigurationType::Skip => unreachable!(), + }; + gst::debug!(CAT, obj: payloader, "Mapping TWCC extension to ID {}", twcc_id); + + /* We only enforce TWCC in the offer caps, once a remote description + * has been set it will get automatically negotiated. This is necessary + * because the implementor in Firefox had apparently not understood the + * concept of *transport-wide* congestion control, and firefox doesn't + * provide feedback for audio packets. + */ + if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI) { + twcc_extension.set_id(twcc_id); + payloader.emit_by_name::<()>("add-extension", &[&twcc_extension]); + } else { + anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed"); + } + + Ok(()) + } + + fn configure_payloader( + &self, + peer_id: &str, + stream_name: &str, + payloader: &gst::Element, + codec: &Codec, + ssrc: Option, + extension_configuration_type: ExtensionConfigurationType, + ) -> Result<(), Error> { + self.obj() + .emit_by_name::("payloader-setup", &[&peer_id, &stream_name, &payloader]); + + payloader.set_property( + "pt", + codec + .payload() + .expect("Negotiated codec should always have pt set") as u32, + ); + + if let Some(ssrc) = ssrc { + payloader.set_property("ssrc", ssrc); + } + + self.configure_congestion_control(payloader, extension_configuration_type) + } + fn generate_ssrc( element: &super::BaseWebRTCSink, webrtc_pads: &HashMap, @@ -2026,6 +2132,10 @@ impl BaseWebRTCSink { .iter() .flat_map(|(_, codecs_and_caps)| codecs_and_caps) .map(|(codec, caps)| async move { + let extension_configuration_type = twcc_idx + .map(|twcc_id| ExtensionConfigurationType::Apply { twcc_id }) + .unwrap_or(ExtensionConfigurationType::Skip); + BaseWebRTCSink::run_discovery_pipeline( element, stream_name, @@ -2033,7 +2143,7 @@ impl BaseWebRTCSink { codec.clone(), in_caps.clone(), caps, - twcc_idx, + extension_configuration_type, ) .await .map(|s| { @@ -2999,7 +3109,7 @@ impl BaseWebRTCSink { codec: Codec, input_caps: gst::Caps, output_caps: &gst::Caps, - twcc: Option, + extension_configuration_type: ExtensionConfigurationType, ) -> Result { let pipe = PipelineWrapper(gst::Pipeline::default()); @@ -3029,7 +3139,7 @@ impl BaseWebRTCSink { gst::Element::link_many(elements_slice) .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - let mut encoding_chain_builder = EncodingChainBuilder::new( + let payload_chain_builder = PayloadChainBuilder::new( &src.caps() .expect("Caps should always be set when starting discovery"), output_caps, @@ -3039,10 +3149,11 @@ impl BaseWebRTCSink { &[&Option::::None, &stream_name, &codec.caps], ), ); - if let Some(twcc) = twcc { - encoding_chain_builder = encoding_chain_builder.twcc(twcc) - } - let encoding_chain = encoding_chain_builder.build(&pipe.0, &encoding_chain_src)?; + + let PayloadChain { + payloader, + encoding_chain, + } = payload_chain_builder.build(&pipe.0, &encoding_chain_src)?; if let Some(ref enc) = encoding_chain.encoder { element.emit_by_name::( @@ -3051,6 +3162,15 @@ impl BaseWebRTCSink { ); } + element.imp().configure_payloader( + "discovery", + stream_name, + &payloader, + &codec, + None, + extension_configuration_type, + )?; + let sink = gst_app::AppSink::builder() .callbacks( gst_app::AppSinkCallbacks::builder() @@ -3196,7 +3316,7 @@ impl BaseWebRTCSink { codec, caps, &output_caps, - Some(1), + ExtensionConfigurationType::Auto, )] } else { let sink_caps = discovery_info.caps.clone(); @@ -3218,7 +3338,7 @@ impl BaseWebRTCSink { codec.clone(), sink_caps.clone(), &output_caps, - Some(1), + ExtensionConfigurationType::Auto, ) }) .collect() @@ -3775,7 +3895,7 @@ impl ObjectImpl for BaseWebRTCSink { gst::Element::static_type(), ]) .return_type::() - .accumulator(|_hint, _ret, value| !value.get::().unwrap()) + .accumulator(setup_signal_accumulator) .class_handler(|_, args| { let element = args[0].get::().expect("signal arg"); let enc = args[3].get::().unwrap(); @@ -3795,6 +3915,43 @@ impl ObjectImpl for BaseWebRTCSink { Some(false.to_value()) }) .build(), + /** + * RsBaseWebRTCSink::payloader-setup: + * @consumer_id: Identifier of the consumer, or "discovery" + * when the payloader is used in a discovery pipeline. + * @pad_name: The name of the corresponding input pad + * @payloader: The constructed payloader for selected codec + * + * This signal can be used to tweak @payloader properties, in particular, adding + * additional extensions. + * + * Note that payload type and ssrc settings are managed by webrtcsink element and + * trying to change them from an application handler will have no effect. + * + * Returns: True if the encoder is entirely configured, + * False to let other handlers run. Note that unless your intent is to enforce + * your custom settings, it's recommended to let the default handler run + * (by returning true), which would apply the optimal settings. + */ + glib::subclass::Signal::builder("payloader-setup") + .param_types([ + String::static_type(), + String::static_type(), + gst::Element::static_type(), + ]) + .return_type::() + .accumulator(setup_signal_accumulator) + .class_handler(|_, args| { + let pay = args[3].get::().unwrap(); + + configure_payloader(&pay); + + // The default handler is no-op: the whole configuration logic happens + // in BaseWebRTCSink::configure_payloader, which is where this signal + // is invoked from + Some(false.to_value()) + }) + .build(), /** * RsWebRTCSink::request-encoded-filter: * @consumer_id: Identifier of the consumer