From ab1ec126983f949804684e11e0e58c7cf3b22bc4 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Wed, 8 Mar 2023 11:15:53 -0300 Subject: [PATCH] webrtcsink: Add support for pre encoded streams This is a first step where we try to replicate encoding conditions from the input stream into the discovery pipeline. A second patch will implement using input buffers in the discovery pipelines. This moves discovery to using input buffers directly. Instead of using testsrc to try to replicate the buffers that `webrtcsink` is getting as input, directly run discovery based on the real buffers. This way we are sure we work with the exact right stream type and we don't need encoders in order to support encoded stream inputs. We use the same logic for both encoded and raw input to avoid having several code paths, which also makes it all more correct in any case. Part-of: --- net/webrtc/Cargo.toml | 2 +- net/webrtc/src/utils.rs | 143 ++++- net/webrtc/src/webrtcsink/imp.rs | 1002 +++++++++++++++++------------- 3 files changed, 695 insertions(+), 452 deletions(-) diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index b70411b5..a4cdb971 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -17,6 +17,7 @@ gst-sdp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package gst-rtp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-rtp", features = ["v1_20"] } gst-utils = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-utils" } gst-base = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-base" } +uuid = { version = "1", features = ["v4"] } once_cell = "1.0" anyhow = "1" @@ -50,7 +51,6 @@ url-escape = "0.1.1" tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } tracing-log = "0.1" -uuid = { version = "1", features = ["v4"] } clap = { version = "4", features = ["derive"] } [lib] diff --git 
a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index 75885f0f..8d9ee2e2 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -159,10 +159,10 @@ impl Codec { let encoder = Self::get_encoder_for_caps(caps, encoders); let payloader = Self::get_payloader_for_codec(name, payloaders); - let encoding_info = if encoder.is_some() && payloader.is_some() { + let encoding_info = if let (Some(encoder), Some(payloader)) = (encoder, payloader) { Some(EncodingInfo { - encoder: encoder.unwrap(), - payloader: payloader.unwrap(), + encoder, + payloader, output_filter: None, }) } else { @@ -323,17 +323,54 @@ impl Codec { }) } - pub fn build_payloader(&self) -> Option> { + pub fn build_payloader(&self, pt: u32) -> Option { self.encoding_info.as_ref().map(|info| { - info.payloader.create().build().with_context(|| { - format!( - "Creating payloader {}", - self.encoding_info.as_ref().unwrap().payloader.name() - ) - }) + let mut res = info + .payloader + .create() + .property("mtu", 1200_u32) + .property("pt", pt); + + if ["vp8enc", "vp9enc"].contains(&self.encoder_name().unwrap().as_str()) { + res = res.property_from_str("picture-id-mode", "15-bit"); + } + + res.build().unwrap() }) } + pub fn raw_converter_filter(&self) -> Result { + let caps = if self.is_video() { + let mut structure_builder = gst::Structure::builder("video/x-raw") + .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)); + + if self.encoder_name().map(|e| e.as_str() == "nvh264enc").unwrap_or(false) { + // Quirk: nvh264enc can perform conversion from RGB formats, but + // doesn't advertise / negotiate colorimetry correctly, leading + // to incorrect color display in Chrome (but interestingly not in + // Firefox). In any case, restrict to exclude RGB formats altogether, + // and let videoconvert do the conversion properly if needed. 
+ structure_builder = + structure_builder.field("format", gst::List::new(["NV12", "YV12", "I420"])); + } + + gst::Caps::builder_full_with_any_features() + .structure(structure_builder.build()) + .build() + } else { + gst::Caps::builder("audio/x-raw").build() + }; + + gst::ElementFactory::make("capsfilter") + .property("caps", &caps) + .build() + .with_context(|| "Creating capsfilter caps") + } + + pub fn encoder_factory(&self) -> Option { + self.encoding_info.as_ref().map(|info| info.encoder.clone()) + } + pub fn encoder_name(&self) -> Option { self.encoding_info .as_ref() @@ -341,16 +378,49 @@ impl Codec { } pub fn set_output_filter(&mut self, caps: gst::Caps) { - self.encoding_info - .as_mut() - .map(|info| info.output_filter = Some(caps)); + if let Some(info) = self.encoding_info.as_mut() { + info.output_filter = Some(caps); + } } pub fn output_filter(&self) -> Option { self.encoding_info .as_ref() - .map(|info| info.output_filter.clone()) - .flatten() + .and_then(|info| info.output_filter.clone()) + } + + pub fn build_parser(&self) -> Result, Error> { + match self.name.as_str() { + "H264" => make_element("h264parse", None), + "H265" => make_element("h265parse", None), + _ => return Ok(None), + } + .map(Some) + } + + pub fn parser_caps(&self, force_profile: bool) -> gst::Caps { + let codec_caps_name = self.caps.structure(0).unwrap().name(); + match self.name.as_str() { + "H264" => { + if force_profile { + gst::debug!( + CAT, + "No H264 profile requested, selecting constrained-baseline" + ); + + gst::Caps::builder(codec_caps_name) + .field("stream-format", "avc") + .field("profile", "constrained-baseline") + .build() + } else { + gst::Caps::builder(codec_caps_name) + .field("stream-format", "avc") + .build() + } + } + "H265" => gst::Caps::new_empty_simple("video/x-h265"), + _ => gst::Caps::new_any(), + } } } @@ -382,12 +452,22 @@ impl Deref for Codecs { } impl Codecs { - pub fn to_map(self) -> BTreeMap { + pub fn to_map(&self) -> BTreeMap { self.0 - .into_iter() 
- .map(|codec| (codec.payload().unwrap(), codec)) + .iter() + .map(|codec| (codec.payload().unwrap(), codec.clone())) .collect() } + + pub fn from_map(codecs: &BTreeMap) -> Self { + Self(codecs.values().cloned().collect()) + } + + pub fn find_for_encoded_caps(&self, caps: &gst::Caps) -> Option { + self.iter() + .find(|codec| codec.caps.can_intersect(caps) && codec.encoding_info.is_some()) + .cloned() + } } static CODECS: Lazy = Lazy::new(|| { @@ -455,14 +535,14 @@ impl Codecs { CODECS .iter() .find(|codec| codec.name == encoding_name) - .map(|codec| codec.clone()) + .cloned() } pub fn video_codecs() -> Vec { CODECS .iter() .filter(|codec| codec.stream_type == gst::StreamType::VIDEO) - .map(|codec| codec.clone()) + .cloned() .collect() } @@ -470,7 +550,7 @@ impl Codecs { CODECS .iter() .filter(|codec| codec.stream_type == gst::StreamType::AUDIO) - .map(|codec| codec.clone()) + .cloned() .collect() } @@ -530,3 +610,24 @@ impl Codecs { ) } } + +pub fn is_raw_caps(caps: &gst::Caps) -> bool { + assert!(caps.is_fixed()); + ["video/x-raw", "audio/x-raw"].contains(&caps.structure(0).unwrap().name().as_str()) +} + +pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps { + assert!(caps.is_fixed()); + + if let Some(s) = caps.make_mut().structure_mut(0) { + if ["video/x-h264", "video/x-h265"].contains(&s.name().as_str()) { + s.remove_fields(["codec_data"]); + } else if ["video/x-vp8", "video/x-vp9"].contains(&s.name().as_str()) { + s.remove_fields(["profile"]); + } else if s.name() == "audio/x-opus" { + s.remove_fields(["streamheader"]); + } + } + + caps +} diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index c2a6ef07..c260993a 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MPL-2.0 -use crate::utils::*; +use crate::utils::{cleanup_codec_caps, is_raw_caps, make_element, Codec, Codecs}; use anyhow::Context; use gst::glib; use gst::prelude::*; @@ -15,6 +15,7 
@@ use futures::prelude::*; use anyhow::{anyhow, Error}; use once_cell::sync::Lazy; use std::collections::HashMap; + use std::ops::Mul; use std::sync::{Arc, Condvar, Mutex}; @@ -82,6 +83,50 @@ struct Settings { signaller: Signallable, } +/// Type of discovery, used to differentiate between initial discovery +/// and discovery initiated by client offer +#[derive(Debug, Clone, PartialEq, Eq)] +enum DiscoveryType { + /// Initial discovery of our input streams + Initial, + /// Discovery to select a specific codec as requested by the remote peer + CodecSelection, +} + +#[derive(Debug, Clone)] +struct DiscoveryInfo { + id: String, + type_: DiscoveryType, + caps: gst::Caps, + srcs: Arc>>, +} + +impl DiscoveryInfo { + fn new(type_: DiscoveryType, caps: gst::Caps) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + type_, + caps, + srcs: Default::default(), + } + } + + fn srcs(&self) -> Vec { + self.srcs.lock().unwrap().clone() + } + + fn create_src(&self) -> gst_app::AppSrc { + let src = gst_app::AppSrc::builder() + .caps(&self.caps) + .format(gst::Format::Time) + .build(); + + self.srcs.lock().unwrap().push(src.clone()); + + src + } +} + /// Wrapper around our sink pads #[derive(Debug, Clone)] struct InputStream { @@ -97,6 +142,8 @@ struct InputStream { serial: u32, /// Whether the input stream is video or not is_video: bool, + /// Information about currently running codec discoveries + discoveries: Vec, } /// Wrapper around webrtcbin pads @@ -192,9 +239,9 @@ struct State { sessions: HashMap, codecs: BTreeMap, /// Used to abort codec discovery - codecs_abort_handle: Option, + codecs_abort_handles: Vec, /// Used to wait for the discovery task to fully stop - codecs_done_receiver: Option>, + codecs_done_receivers: Vec>, /// Used to determine whether we can start the signaller when going to Playing, /// or whether we should wait codec_discovery_done: bool, @@ -267,15 +314,11 @@ impl Default for Settings { Self { video_caps: Codecs::video_codecs() .into_iter() - 
.map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) - .flatten() - .into_iter() + .flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) .collect::(), audio_caps: Codecs::audio_codecs() .into_iter() - .map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) - .flatten() - .into_iter() + .flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) .collect::(), stun_server: DEFAULT_STUN_SERVER.map(String::from), turn_servers: gst::Array::new(Vec::new() as Vec), @@ -301,8 +344,8 @@ impl Default for State { signaller_state: SignallerState::Stopped, sessions: HashMap::new(), codecs: BTreeMap::new(), - codecs_abort_handle: None, - codecs_done_receiver: None, + codecs_abort_handles: Vec::new(), + codecs_done_receivers: Vec::new(), codec_discovery_done: false, audio_serial: 0, video_serial: 0, @@ -456,184 +499,186 @@ fn configure_encoder(enc: &gst::Element, start_bitrate: u32) { } } -/// Bit of an awkward function, but the goal here is to keep -/// most of the encoding code for consumers in line with -/// the codec discovery code, and this gets the job done. -#[allow(clippy::too_many_arguments)] // This needs some more refactoring and it will happen soon -fn setup_encoding( - element: &super::BaseWebRTCSink, - pipeline: &gst::Pipeline, - src: &gst::Element, - input_caps: &gst::Caps, - output_caps: &gst::Caps, - codec: &Codec, - mut encoded_filter: Option, +/// Set of elements used in an EncodingChain +struct EncodingChain { + raw_filter: Option, + encoder: Option, + pay_filter: gst::Element, +} + +struct EncodingChainBuilder { + /// Caps of the input chain + input_caps: gst::Caps, + /// Caps expected after the payloader + output_caps: gst::Caps, + /// The Codec representing wanted encoding + codec: Codec, + /// The SSRC to use for the RTP stream if any + /// Filter element between the encoder and the payloader. 
+ encoded_filter: Option, ssrc: Option, + /// The TWCC ID to use for payloaded stream twcc: Option, -) -> Result<(gst::Element, gst::Element, gst::Element), Error> { - gst::trace!( - CAT, - "Setting up encoding, input caps: {input_caps}, \ - output caps: {output_caps}, codec: {codec:?}, twcc: {twcc:?}" - ); +} - let conv = match codec.is_video() { - true => make_converter_for_video_caps(input_caps)?.upcast(), - false => gst::parse_bin_from_description("audioresample ! audioconvert", true)?.upcast(), - }; - - let conv_filter = make_element("capsfilter", None)?; - - let enc = codec - .build_encoder() - .expect("Encoders should always have been set in the CodecInfo we handle")?; - let parse_filter = make_element("capsfilter", None)?; - let pay = codec - .build_payloader() - .expect("Payloaders should always have been set in the CodecInfo we handle")?; - let pay_filter = make_element("capsfilter", None)?; - - pay.set_property("mtu", 1200_u32); - pay.set_property("pt", codec.payload().unwrap() as u32); - - if let Some(ssrc) = ssrc { - pay.set_property("ssrc", ssrc); - } - - pay_filter.set_property("caps", output_caps); - - pipeline - .add_many([&conv, &conv_filter, &enc, &parse_filter, &pay, &pay_filter]) - .unwrap(); - gst::Element::link_many([src, &conv, &conv_filter, &enc]) - .with_context(|| "Linking encoding elements")?; - - let codec_name = codec.caps.structure(0).unwrap().name(); - - let enc_last = if let Some(encoded_filter) = encoded_filter.take() { - pipeline.add(&encoded_filter).unwrap(); - enc.link(&encoded_filter) - .with_context(|| "Linking encoded filter")?; - - encoded_filter - } else { - enc.clone() - }; - - if let Some(parser) = if codec_name == "video/x-h264" { - Some(make_element("h264parse", None)?) - } else if codec_name == "video/x-h265" { - Some(make_element("h265parse", None)?) 
- } else { - None - } { - pipeline.add(&parser).unwrap(); - gst::Element::link_many([&enc_last, &parser, &parse_filter]) - .with_context(|| "Linking encoding elements")?; - } else { - gst::Element::link_many([&enc_last, &parse_filter]) - .with_context(|| "Linking encoding elements")?; - } - - let conv_caps = if codec.is_video() { - let mut structure_builder = gst::Structure::builder("video/x-raw") - .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)); - - if codec.encoder_name().unwrap().as_str() == "nvh264enc" { - // Quirk: nvh264enc can perform conversion from RGB formats, but - // doesn't advertise / negotiate colorimetry correctly, leading - // to incorrect color display in Chrome (but interestingly not in - // Firefox). In any case, restrict to exclude RGB formats altogether, - // and let videoconvert do the conversion properly if needed. - structure_builder = - structure_builder.field("format", gst::List::new(["NV12", "YV12", "I420"])); +impl EncodingChainBuilder { + fn new( + input_caps: &gst::Caps, + output_caps: &gst::Caps, + codec: &Codec, + encoded_filter: Option, + ) -> Self { + Self { + input_caps: input_caps.clone(), + output_caps: output_caps.clone(), + codec: codec.clone(), + encoded_filter, + ssrc: None, + twcc: None, } - - gst::Caps::builder_full_with_any_features() - .structure(structure_builder.build()) - .build() - } else { - gst::Caps::builder("audio/x-raw").build() - }; - - match codec.encoder_name().unwrap().as_str() { - "vp8enc" | "vp9enc" => { - pay.set_property_from_str("picture-id-mode", "15-bit"); - } - _ => (), } - /* We only enforce TWCC in the offer caps, once a remote description - * has been set it will get automatically negotiated. This is necessary - * because the implementor in Firefox had apparently not understood the - * concept of *transport-wide* congestion control, and firefox doesn't - * provide feedback for audio packets. 
- */ - if let Some(idx) = twcc { - let twcc_extension = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI).unwrap(); - twcc_extension.set_id(idx); - pay.emit_by_name::<()>("add-extension", &[&twcc_extension]); + fn ssrc(mut self, ssrc: u32) -> Self { + self.ssrc = Some(ssrc); + self } - conv_filter.set_property("caps", conv_caps); + fn twcc(mut self, twcc: u32) -> Self { + self.twcc = Some(twcc); + self + } - let parse_caps = if codec_name == "video/x-h264" { - if output_caps.is_any() { - gst::debug!( - CAT, - obj: element, - "No H264 profile requested, selecting constrained-baseline" - ); + fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result { + gst::trace!( + CAT, + obj: pipeline, + "Setting up encoding, input caps: {input_caps}, \ + output caps: {output_caps}, codec: {codec:?}, twcc: {twcc:?}", + input_caps = self.input_caps, + output_caps = self.output_caps, + codec = self.codec, + twcc = self.twcc, + ); - gst::Caps::builder(codec_name) - .field("stream-format", "avc") - .field("profile", "constrained-baseline") - .build() + let needs_encoding = is_raw_caps(&self.input_caps); + let mut elements: Vec = Vec::new(); + + let (raw_filter, encoder) = if needs_encoding { + elements.push(match self.codec.is_video() { + true => make_converter_for_video_caps(&self.input_caps)?.upcast(), + false => { + gst::parse_bin_from_description("audioresample ! 
audioconvert", true)?.upcast() + } + }); + + let raw_filter = self.codec.raw_converter_filter()?; + elements.push(raw_filter.clone()); + + let encoder = self + .codec + .build_encoder() + .expect("We should always have an encoder for negotiated codecs")?; + elements.push(encoder.clone()); + elements.push(make_element("capsfilter", None)?); + + (Some(raw_filter), Some(encoder)) } else { - /* When output caps were specified, we are answering an offer - * and should not force a profile */ - gst::Caps::builder(codec_name) - .field("stream-format", "avc") - .build() + (None, None) + }; + + if let Some(parser) = self.codec.build_parser()? { + elements.push(parser); } - } else if codec_name == "video/x-h265" { - gst::Caps::builder(codec_name) - .field("stream-format", "hvc1") + + // Only force the profile when output caps were not specified, either + // through input caps or because we are answering an offer + let force_profile = self.output_caps.is_any() && needs_encoding; + elements.push( + gst::ElementFactory::make("capsfilter") + .property("caps", self.codec.parser_caps(force_profile)) + .build() + .with_context(|| "Failed to make element capsfilter")?, + ); + + if let Some(ref encoded_filter) = self.encoded_filter { + elements.push(encoded_filter.clone()); + } + + let pay = self + .codec + .build_payloader( + self.codec + .payload() + .expect("Negotiated codec should always have pt set") as u32, + ) + .expect("Payloaders should always have been set in the CodecInfo we handle"); + + if let Some(ssrc) = self.ssrc { + pay.set_property("ssrc", ssrc); + } + + /* We only enforce TWCC in the offer caps, once a remote description + * has been set it will get automatically negotiated. This is necessary + * because the implementor in Firefox had apparently not understood the + * concept of *transport-wide* congestion control, and firefox doesn't + * provide feedback for audio packets. 
+ */ + if let Some(idx) = self.twcc { + let twcc_extension = + gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI).unwrap(); + twcc_extension.set_id(idx); + pay.emit_by_name::<()>("add-extension", &[&twcc_extension]); + } + elements.push(pay); + + let pay_filter = gst::ElementFactory::make("capsfilter") + .property("caps", self.output_caps) .build() - } else { - gst::Caps::new_any() - }; + .with_context(|| "Failed to make payloader")?; + elements.push(pay_filter.clone()); - parse_filter.set_property("caps", parse_caps); + for element in &elements { + pipeline.add(element).unwrap(); + } - gst::Element::link_many([&parse_filter, &pay, &pay_filter]) - .with_context(|| "Linking encoding elements")?; + elements.insert(0, src.clone()); + gst::Element::link_many(elements.iter().collect::>().as_slice()) + .with_context(|| "Linking encoding elements")?; - Ok((enc, conv_filter, pay_filter)) + Ok(EncodingChain { + raw_filter, + encoder, + pay_filter, + }) + } } impl VideoEncoder { fn new( - element: gst::Element, - filter: gst::Element, + encoding_elements: &EncodingChain, video_info: gst_video::VideoInfo, peer_id: &str, codec_name: &str, transceiver: gst_webrtc::WebRTCRTPTransceiver, - ) -> Self { + ) -> Option { let halved_framerate = video_info.fps().mul(gst::Fraction::new(1, 2)); - - Self { - factory_name: element.factory().unwrap().name().into(), + Some(Self { + factory_name: encoding_elements + .encoder + .as_ref()? 
+ .factory() + .unwrap() + .name() + .into(), codec_name: codec_name.to_string(), - element, - filter, + element: encoding_elements.encoder.as_ref()?.clone(), + filter: encoding_elements.raw_filter.as_ref()?.clone(), halved_framerate, video_info, session_id: peer_id.to_string(), mitigation_mode: WebRTCSinkMitigationMode::NONE, transceiver, - } + }) } fn bitrate(&self) -> i32 { @@ -756,7 +801,7 @@ impl State { gst::info!(CAT, "Ending session {}", session.id); session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), - format!("removing-peer-{}-", session.peer_id,), + format!("removing-session-{}-", session.peer_id,), ); for ssrc in session.webrtc_pads.keys() { @@ -914,29 +959,23 @@ impl Session { let pay_filter = make_element("capsfilter", None)?; self.pipeline.add(&pay_filter).unwrap(); - let output_caps = codec - .output_filter() - .clone() - .unwrap_or_else(gst::Caps::new_any); + let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any); - let encoded_filter = element.emit_by_name::>( - "request-encoded-filter", - &[&Some(&self.peer_id), &stream_name, &codec.caps], - ); - - let (enc, raw_filter, encoding_sink) = setup_encoding( - element, - &self.pipeline, - &appsrc, + let encoding_chain = EncodingChainBuilder::new( &webrtc_pad.in_caps, &output_caps, &codec, - encoded_filter, - Some(webrtc_pad.ssrc), - None, - )?; + element.emit_by_name::>( + "request-encoded-filter", + &[&Some(&self.peer_id), &stream_name, &codec.caps], + ), + ) + .ssrc(webrtc_pad.ssrc) + .build(&self.pipeline, &appsrc)?; - element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); + if let Some(ref enc) = encoding_chain.encoder { + element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); + } // At this point, the peer has provided its answer, and we want to // let the payloader / encoder perform negotiation according to that. 
@@ -983,43 +1022,42 @@ impl Session { if codec.is_video() { let video_info = gst_video::VideoInfo::from_caps(&webrtc_pad.in_caps)?; - let mut enc = VideoEncoder::new( - enc, - raw_filter, + if let Some(mut enc) = VideoEncoder::new( + &encoding_chain, video_info, &self.peer_id, codec.caps.structure(0).unwrap().name(), transceiver, - ); - - match self.cc_info.heuristic { - WebRTCSinkCongestionControl::Disabled => { - // If congestion control is disabled, we simply use the highest - // known "safe" value for the bitrate. - enc.set_bitrate(element, self.cc_info.max_bitrate as i32); - enc.transceiver.set_property("fec-percentage", 50u32); - } - WebRTCSinkCongestionControl::Homegrown => { - if let Some(congestion_controller) = self.congestion_controller.as_mut() { - congestion_controller.target_bitrate_on_delay += enc.bitrate(); - congestion_controller.target_bitrate_on_loss = - congestion_controller.target_bitrate_on_delay; - enc.transceiver.set_property("fec-percentage", 0u32); - } else { - /* If congestion control is disabled, we simply use the highest - * known "safe" value for the bitrate. */ + ) { + match self.cc_info.heuristic { + WebRTCSinkCongestionControl::Disabled => { + // If congestion control is disabled, we simply use the highest + // known "safe" value for the bitrate. enc.set_bitrate(element, self.cc_info.max_bitrate as i32); enc.transceiver.set_property("fec-percentage", 50u32); } + WebRTCSinkCongestionControl::Homegrown => { + if let Some(congestion_controller) = self.congestion_controller.as_mut() { + congestion_controller.target_bitrate_on_delay += enc.bitrate(); + congestion_controller.target_bitrate_on_loss = + congestion_controller.target_bitrate_on_delay; + enc.transceiver.set_property("fec-percentage", 0u32); + } else { + /* If congestion control is disabled, we simply use the highest + * known "safe" value for the bitrate. 
*/ + enc.set_bitrate(element, self.cc_info.max_bitrate as i32); + enc.transceiver.set_property("fec-percentage", 50u32); + } + } + _ => enc.transceiver.set_property("fec-percentage", 0u32), } - _ => enc.transceiver.set_property("fec-percentage", 0u32), - } - self.encoders.push(enc); + self.encoders.push(enc); - if let Some(rtpgccbwe) = self.rtpgccbwe.as_ref() { - let max_bitrate = self.cc_info.max_bitrate * (self.encoders.len() as u32); - rtpgccbwe.set_property("max-bitrate", max_bitrate); + if let Some(rtpgccbwe) = self.rtpgccbwe.as_ref() { + let max_bitrate = self.cc_info.max_bitrate * (self.encoders.len() as u32); + rtpgccbwe.set_property("max-bitrate", max_bitrate); + } } } @@ -1029,7 +1067,7 @@ impl Session { .sync_children_states() .with_context(|| format!("Connecting input stream for {}", self.peer_id))?; - encoding_sink.link(&pay_filter)?; + encoding_chain.pay_filter.link(&pay_filter)?; let srcpad = pay_filter.static_pad("src").unwrap(); @@ -1096,6 +1134,28 @@ impl InputStream { appsink.set_state(gst::State::Null).unwrap(); } } + + fn create_discovery(&mut self, type_: DiscoveryType) -> DiscoveryInfo { + let discovery_info = DiscoveryInfo::new( + type_, + self.in_caps.clone().expect( + "We should never create a discovery for a stream that doesn't have caps set", + ), + ); + + self.discoveries.push(discovery_info.clone()); + + discovery_info + } + + fn remove_discovery(&mut self, discovery: &DiscoveryInfo) { + let id = self + .discoveries + .iter() + .position(|d| d.id == discovery.id) + .expect("We expect discovery to always be in the list of discoverers when removing"); + self.discoveries.remove(id); + } } impl NavigationEventHandler { @@ -1184,7 +1244,7 @@ impl BaseWebRTCSink { async fn request_webrtcbin_pad( element: &super::BaseWebRTCSink, webrtcbin: &gst::Element, - stream: &InputStream, + stream: &mut InputStream, media: Option<&gst_sdp::SDPMediaRef>, settings: &Settings, webrtc_pads: &mut HashMap, @@ -1195,8 +1255,11 @@ impl BaseWebRTCSink { let mut 
payloader_caps = match media { Some(media) => { + let discovery_info = stream.create_discovery(DiscoveryType::CodecSelection); + let codec = BaseWebRTCSink::select_codec( element, + &discovery_info, media, &stream.in_caps.as_ref().unwrap().clone(), &stream.sink_pad.name(), @@ -1204,6 +1267,8 @@ impl BaseWebRTCSink { ) .await; + stream.remove_discovery(&discovery_info); + match codec { Some(codec) => { gst::debug!( @@ -1315,15 +1380,17 @@ impl BaseWebRTCSink { .iter_mut() .for_each(|(_, stream)| stream.unprepare(element)); - if let Some(handle) = state.codecs_abort_handle.take() { + let codecs_abort_handle = std::mem::take(&mut state.codecs_abort_handles); + codecs_abort_handle.into_iter().for_each(|handle| { handle.abort(); - } + }); - if let Some(receiver) = state.codecs_done_receiver.take() { + let codecs_done_receiver = std::mem::take(&mut state.codecs_done_receivers); + codecs_done_receiver.into_iter().for_each(|receiver| { RUNTIME.spawn(async { let _ = receiver.await; }); - } + }); state.codec_discovery_done = false; state.codecs = BTreeMap::new(); @@ -1582,6 +1649,7 @@ impl BaseWebRTCSink { async fn select_codec( element: &super::BaseWebRTCSink, + discovery_info: &DiscoveryInfo, media: &gst_sdp::SDPMediaRef, in_caps: &gst::Caps, stream_name: &str, @@ -1630,6 +1698,10 @@ impl BaseWebRTCSink { let encoding_name = s.get::("encoding-name").unwrap(); if let Some(mut codec) = Codecs::find(&encoding_name) { + if !codec.can_encode() { + continue; + } + codec.set_pt(payload); for (user_caps, codecs_and_caps) in ordered_codecs_and_caps.iter_mut() { if codec.caps.is_subset(user_caps) { @@ -1662,7 +1734,6 @@ impl BaseWebRTCSink { } } - let in_caps = &in_caps; let futs = ordered_codecs_and_caps .iter() .flat_map(|(_, codecs_and_caps)| codecs_and_caps) @@ -1670,17 +1741,18 @@ impl BaseWebRTCSink { BaseWebRTCSink::run_discovery_pipeline( element, stream_name, - codec, - in_caps, + discovery_info, + codec.clone(), + in_caps.clone(), caps, twcc_idx, ) - .await - .map(|s| { - 
let mut codec = codec.clone(); - codec.set_output_filter([s].into_iter().collect()); - codec - }) + .await + .map(|s| { + let mut codec = codec.clone(); + codec.set_output_filter([s].into_iter().collect()); + codec + }) }); /* Run sequentially to avoid NVENC collisions */ @@ -2175,27 +2247,21 @@ impl BaseWebRTCSink { }; if let Some(idx) = streams.iter().position(|s| { - let stream_is_video = match s - .in_caps - .as_ref() - .unwrap() - .structure(0) - .unwrap() - .name() - .as_str() - { - "video/x-raw" => true, - "audio/x-raw" => false, - _ => unreachable!(), - }; + let structname = + s.in_caps.as_ref().unwrap().structure(0).unwrap().name(); + let stream_is_video = structname.starts_with("video/"); + + if !stream_is_video { + assert!(structname.starts_with("audio/")); + } media_is_video == stream_is_video }) { - let stream = streams.remove(idx); + let mut stream = streams.remove(idx); BaseWebRTCSink::request_webrtcbin_pad( &element, &webrtcbin, - &stream, + &mut stream, Some(media), &settings_clone, &mut webrtc_pads, @@ -2212,11 +2278,11 @@ impl BaseWebRTCSink { } } } else { - for stream in streams { + for mut stream in streams { BaseWebRTCSink::request_webrtcbin_pad( &element, &webrtcbin, - &stream, + &mut stream, None, &settings_clone, &mut webrtc_pads, @@ -2622,97 +2688,111 @@ impl BaseWebRTCSink { async fn run_discovery_pipeline( element: &super::BaseWebRTCSink, stream_name: &str, - codec: &Codec, - caps: &gst::Caps, + discovery_info: &DiscoveryInfo, + codec: Codec, + input_caps: gst::Caps, output_caps: &gst::Caps, twcc: Option, ) -> Result { let pipe = PipelineWrapper(gst::Pipeline::default()); - let src = if codec.is_video() { - make_element("videotestsrc", None)? - } else { - make_element("audiotestsrc", None)? 
- }; - let mut elements = vec![src.clone()]; + let has_raw_input = is_raw_caps(&input_caps); + let src = discovery_info.create_src(); + let mut elements = vec![src.clone().upcast::()]; + let encoding_chain_src = if codec.is_video() && has_raw_input { + elements.push(make_converter_for_video_caps(&input_caps)?); - if codec.is_video() { - elements.push(make_converter_for_video_caps(caps)?); - } + let capsfilter = make_element("capsfilter", Some("raw_capsfilter"))?; + elements.push(capsfilter.clone()); + + capsfilter + } else { + src.clone().upcast::() + }; gst::debug!( CAT, obj: element, - "Running discovery pipeline for input caps {caps} and output caps {output_caps} with codec {codec:?}" + "Running discovery pipeline for input caps {input_caps} and output caps {output_caps} with codec {codec:?}" ); - let capsfilter = make_element("capsfilter", None)?; - elements.push(capsfilter.clone()); + gst::debug!(CAT, obj: element, "Running discovery pipeline"); let elements_slice = &elements.iter().collect::>(); pipe.0.add_many(elements_slice).unwrap(); gst::Element::link_many(elements_slice) - .with_context(|| format!("Running discovery pipeline for caps {caps}"))?; + .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - let encoded_filter = element.emit_by_name::>( - "request-encoded-filter", - &[&Option::::None, &stream_name, &codec.caps], - ); - - let (_, _, encoding_sink) = setup_encoding( - element, - &pipe.0, - &capsfilter, - caps, + let mut encoding_chain_builder = EncodingChainBuilder::new( + &src.caps() + .expect("Caps should always be set when starting discovery"), output_caps, - codec, - encoded_filter, - None, - twcc, - )?; + &codec, + element.emit_by_name::>( + "request-encoded-filter", + &[&Option::::None, &stream_name, &codec.caps], + ), + ); + if let Some(twcc) = twcc { + encoding_chain_builder = encoding_chain_builder.twcc(twcc) + } + let encoding_chain = encoding_chain_builder.build(&pipe.0, &encoding_chain_src)?; - let sink = 
make_element("fakesink", None)?; + let sink = gst_app::AppSink::builder() + .callbacks( + gst_app::AppSinkCallbacks::builder() + .new_event(|sink| { + let obj = sink.pull_object().ok(); + if let Some(event) = obj.and_then(|o| o.downcast::().ok()) { + if let gst::EventView::Caps(caps) = event.view() { + sink.post_message(gst::message::Application::new( + gst::Structure::builder("payloaded_caps") + .field("caps", &caps.caps().to_owned()) + .build(), + )) + .expect("Could not send message"); + } + } - pipe.0.add(&sink).unwrap(); - - encoding_sink + true + }) + .build(), + ) + .build(); + pipe.0.add(sink.upcast_ref::()).unwrap(); + encoding_chain + .pay_filter .link(&sink) - .with_context(|| format!("Running discovery pipeline for caps {caps}"))?; - - capsfilter.set_property("caps", caps); - - src.set_property("num-buffers", 1); + .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; let mut stream = pipe.0.bus().unwrap().stream(); pipe.0 .set_state(gst::State::Playing) - .with_context(|| format!("Running discovery pipeline for caps {caps}"))?; - - let in_caps = caps; + .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; while let Some(msg) = stream.next().await { match msg.view() { gst::MessageView::Error(err) => { + gst::error!(CAT, "Error in discovery pipeline: {err:#?}"); pipe.0.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), "webrtcsink-discovery-error", ); return Err(err.error().into()); } - gst::MessageView::Eos(_) => { - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - "webrtcsink-discovery-done", - ); + gst::MessageView::Application(appmsg) => { + let caps = match appmsg.structure() { + Some(s) => { + if s.name().as_str() != "payloaded_caps" { + continue; + } - let caps = match encoding_sink.static_pad("src").unwrap().current_caps() { - Some(caps) => caps, - None => { - // https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/1713 - return Err(anyhow!("EOS but no caps 
were pushed")); + s.get::("caps").unwrap() } + _ => continue, }; + gst::info!(CAT, "Discovery pipeline got caps {caps:?}"); pipe.0.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), "webrtcsink-discovery-done", @@ -2731,7 +2811,7 @@ impl BaseWebRTCSink { gst::debug!( CAT, obj: element, - "Codec discovery pipeline for caps {in_caps} with codec {codec:?} succeeded: {s}" + "Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}" ); return Ok(s); } else { @@ -2749,36 +2829,61 @@ impl BaseWebRTCSink { async fn lookup_caps( element: &super::BaseWebRTCSink, + discovery_info: DiscoveryInfo, name: String, - in_caps: gst::Caps, output_caps: gst::Caps, codecs: &Codecs, - ) -> (String, gst::Caps) { - let sink_caps = in_caps.as_ref().to_owned(); + ) -> Result<(), Error> { + let futs = if let Some(codec) = codecs.find_for_encoded_caps(&discovery_info.caps) { + let mut caps = discovery_info.caps.clone(); - let is_video = match sink_caps.structure(0).unwrap().name().as_str() { - "video/x-raw" => true, - "audio/x-raw" => false, - _ => unreachable!(), + gst::info!( + CAT, + obj: element, + "Stream is already encoded with codec {}, still need to payload it", + codec.name + ); + + caps = cleanup_codec_caps(caps); + + vec![BaseWebRTCSink::run_discovery_pipeline( + element, + &name, + &discovery_info, + codec, + caps, + &output_caps, + Some(1), + )] + } else { + let sink_caps = discovery_info.caps.clone(); + + let is_video = match sink_caps.structure(0).unwrap().name().as_str() { + "video/x-raw" => true, + "audio/x-raw" => false, + _ => unreachable!(), + }; + + codecs + .iter() + .filter(|codec| codec.is_video() == is_video) + .map(|codec| { + BaseWebRTCSink::run_discovery_pipeline( + element, + &name, + &discovery_info, + codec.clone(), + sink_caps.clone(), + &output_caps, + Some(1), + ) + }) + .collect() }; let mut payloader_caps = gst::Caps::new_empty(); let payloader_caps_mut = payloader_caps.make_mut(); - let futs = codecs - .iter() - 
.filter(|codec| codec.is_video() == is_video) - .map(|codec| { - BaseWebRTCSink::run_discovery_pipeline( - element, - &name, - codec, - &sink_caps, - &output_caps, - Some(1), - ) - }); - for ret in futures::future::join_all(futs).await { match ret { Ok(s) => { @@ -2798,50 +2903,14 @@ impl BaseWebRTCSink { } } - (name, payloader_caps) - } - - async fn lookup_streams_caps(&self, element: &super::BaseWebRTCSink) -> Result<(), Error> { - let codecs = { - let settings = self.settings.lock().unwrap(); - - Codecs::list_encoders(settings.video_caps.iter().chain(settings.audio_caps.iter())) - }; - - gst::debug!(CAT, obj: element, "Looked up codecs {codecs:?}"); - - let futs: Vec<_> = self - .state - .lock() - .unwrap() - .streams - .iter() - .map(|(name, stream)| { - BaseWebRTCSink::lookup_caps( - element, - name.to_owned(), - stream.in_caps.as_ref().unwrap().to_owned(), - gst::Caps::new_any(), - &codecs, - ) - }) - .collect(); - - let caps: Vec<(String, gst::Caps)> = futures::future::join_all(futs).await; - - let mut state = self.state.lock().unwrap(); - - for (name, caps) in caps { - if caps.is_empty() { - return Err(anyhow!("No caps found for stream {}", name)); - } - - if let Some(mut stream) = state.streams.get_mut(&name) { - stream.out_caps = Some(caps); - } + let mut state = element.imp().state.lock().unwrap(); + if let Some(mut stream) = state.streams.get_mut(&name) { + stream.out_caps = Some(payloader_caps.clone()); } - state.codecs = codecs.to_map(); + if payloader_caps.is_empty() { + anyhow::bail!("No caps found for stream {name}"); + } Ok(()) } @@ -2866,87 +2935,144 @@ impl BaseWebRTCSink { ) -> bool { use gst::EventView; - match event.view() { - EventView::Caps(e) => { - if let Some(caps) = pad.current_caps() { - if caps.is_strictly_equal(e.caps()) { - // Nothing changed - true - } else { - gst::error!(CAT, obj: pad, "Renegotiation is not supported"); - false - } + if let EventView::Caps(e) = event.view() { + if let Some(caps) = pad.current_caps() { + if 
caps.is_strictly_equal(e.caps()) { + // Nothing changed + return true; } else { - gst::info!(CAT, obj: pad, "Received caps event {:?}", e); + gst::error!(CAT, obj: pad, "Renegotiation is not supported"); + return false; + } + } else { + gst::info!(CAT, obj: pad, "Received caps event {:?}", e); - let mut all_pads_have_caps = true; + self.state + .lock() + .unwrap() + .streams + .iter_mut() + .for_each(|(_, mut stream)| { + if stream.sink_pad.upcast_ref::() == pad { + stream.in_caps = Some(e.caps().to_owned()); + } + }); + } + } - self.state - .lock() - .unwrap() - .streams - .iter_mut() - .for_each(|(_, mut stream)| { - if stream.sink_pad.upcast_ref::() == pad { - stream.in_caps = Some(e.caps().to_owned()); - } else if stream.in_caps.is_none() { - all_pads_have_caps = false; - } - }); + gst::Pad::event_default(pad, Some(element), event) + } - if all_pads_have_caps { - let element_clone = element.downgrade(); - RUNTIME.spawn(async move { - if let Some(element) = element_clone.upgrade() { - let this = element.imp(); - let (fut, handle) = - futures::future::abortable(this.lookup_streams_caps(&element)); + fn start_stream_discovery_if_needed(&self, stream_name: &str, buffer: &gst::Buffer) { + let (codecs, discovery_info) = { + let mut state = self.state.lock().unwrap(); + let stream = state.streams.get_mut(stream_name).unwrap(); - let (codecs_done_sender, codecs_done_receiver) = - futures::channel::oneshot::channel(); + // Discovery already happened... nothing to do here. + if stream.out_caps.is_some() { + return; + } - // Compiler isn't budged by dropping state before await, - // so let's make a new scope instead. 
- { - let mut state = this.state.lock().unwrap(); - state.codecs_abort_handle = Some(handle); - state.codecs_done_receiver = Some(codecs_done_receiver); - } - - match fut.await { - Ok(Err(err)) => { - gst::error!(CAT, obj: element, "error: {}", err); - gst::element_error!( - element, - gst::StreamError::CodecNotFound, - ["Failed to look up output caps: {}", err] - ); - } - Ok(Ok(_)) => { - let settings = this.settings.lock().unwrap(); - let mut state = this.state.lock().unwrap(); - state.codec_discovery_done = true; - let signaller = settings.signaller.clone(); - drop(settings); - if state.should_start_signaller(&element) { - state.signaller_state = SignallerState::Started; - drop(state); - signaller.start(); - } - } - _ => (), - } - - let _ = codecs_done_sender.send(()); - } - }); + let mut discovery_started = false; + for discovery_info in stream.discoveries.iter() { + if matches!(discovery_info.type_, DiscoveryType::Initial) { + discovery_started = true; + } + for src in discovery_info.srcs() { + if let Err(err) = src.push_buffer(buffer.clone()) { + gst::log!(CAT, obj: src, "Failed to push buffer: {}", err); } - - gst::Pad::event_default(pad, Some(element), event) } } - _ => gst::Pad::event_default(pad, Some(element), event), - } + + if discovery_started { + // Discovery already started, we pushed the buffer to keep it + // going + return; + } + + let discovery_info = stream.create_discovery(DiscoveryType::Initial); + stream.discoveries.push(discovery_info.clone()); + + let codecs = if !state.codecs.is_empty() { + Codecs::from_map(&state.codecs) + } else { + let settings = self.settings.lock().unwrap(); + let codecs = Codecs::list_encoders( + settings.video_caps.iter().chain(settings.audio_caps.iter()), + ); + + state.codecs = codecs.to_map(); + + codecs + }; + + (codecs, discovery_info) + }; + + let stream_name_clone = stream_name.to_owned(); + RUNTIME.spawn(glib::clone!(@weak self as this, @strong discovery_info => async move { + let element = &*this.obj(); + 
let (fut, handle) = futures::future::abortable( + Self::lookup_caps( + element, + discovery_info, + stream_name_clone, + gst::Caps::new_any(), + &codecs, + )); + + let (codecs_done_sender, codecs_done_receiver) = + futures::channel::oneshot::channel(); + + // Compiler isn't budged by dropping state before await, + // so let's make a new scope instead. + { + let mut state = this.state.lock().unwrap(); + state.codecs_abort_handles.push(handle); + state.codecs_done_receivers.push(codecs_done_receiver); + } + + match fut.await { + Ok(Err(err)) => { + gst::error!(CAT, imp: this, "Error running discovery: {err:?}"); + gst::element_error!( + this.obj(), + gst::StreamError::CodecNotFound, + ["Failed to look up output caps: {err:?}"] + ); + } + Ok(Ok(_)) => { + let settings = this.settings.lock().unwrap(); + let mut state = this.state.lock().unwrap(); + state.codec_discovery_done = state.streams.values().all(|stream| stream.out_caps.is_some()); + let signaller = settings.signaller.clone(); + drop(settings); + if state.should_start_signaller(element) { + state.signaller_state = SignallerState::Started; + drop(state); + signaller.start(); + } + } + _ => (), + } + + let _ = codecs_done_sender.send(()); + })); + + let mut state = self.state.lock().unwrap(); + let stream = state.streams.get_mut(stream_name).unwrap(); + stream.remove_discovery(&discovery_info); + } + + fn chain( + &self, + pad: &gst::GhostPad, + buffer: gst::Buffer, + ) -> Result { + self.start_stream_discovery_if_needed(pad.name().as_str(), &buffer); + + gst::ProxyPad::chain_default(pad, Some(&*self.obj()), buffer) } } @@ -3349,7 +3475,7 @@ impl GstObjectImpl for BaseWebRTCSink {} impl ElementImpl for BaseWebRTCSink { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { - let caps = gst::Caps::builder_full() + let mut caps_builder = gst::Caps::builder_full() .structure(gst::Structure::builder("video/x-raw").build()) .structure_with_features( 
gst::Structure::builder("video/x-raw").build(), @@ -3362,22 +3488,30 @@ impl ElementImpl for BaseWebRTCSink { .structure_with_features( gst::Structure::builder("video/x-raw").build(), gst::CapsFeatures::new([NVMM_MEMORY_FEATURE]), - ) - .build(); + ); + + for codec in Codecs::video_codecs() { + caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); + } + let video_pad_template = gst::PadTemplate::new( "video_%u", gst::PadDirection::Sink, gst::PadPresence::Request, - &caps, + &caps_builder.build(), ) .unwrap(); - let caps = gst::Caps::builder("audio/x-raw").build(); + let mut caps_builder = + gst::Caps::builder_full().structure(gst::Structure::builder("audio/x-raw").build()); + for codec in Codecs::audio_codecs() { + caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); + } let audio_pad_template = gst::PadTemplate::new( "audio_%u", gst::PadDirection::Sink, gst::PadPresence::Request, - &caps + &caps_builder.build(), ) .unwrap(); @@ -3418,6 +3552,13 @@ impl ElementImpl for BaseWebRTCSink { let sink_pad = gst::GhostPad::builder_from_template(templ) .name(name.as_str()) + .chain_function(|pad, parent, buffer| { + BaseWebRTCSink::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.chain(pad, buffer), + ) + }) .event_function(|pad, parent, event| { BaseWebRTCSink::catch_panic_pad_function( parent, @@ -3441,6 +3582,7 @@ impl ElementImpl for BaseWebRTCSink { clocksync: None, is_video, serial, + discoveries: Default::default(), }, );