diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml
index b70411b5..a4cdb971 100644
--- a/net/webrtc/Cargo.toml
+++ b/net/webrtc/Cargo.toml
@@ -17,6 +17,7 @@ gst-sdp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package
 gst-rtp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-rtp", features = ["v1_20"] }
 gst-utils = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-utils" }
 gst-base = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-base" }
+uuid = { version = "1", features = ["v4"] }
 once_cell = "1.0"
 anyhow = "1"
 
@@ -50,7 +51,6 @@ url-escape = "0.1.1"
 tracing = { version = "0.1", features = ["log"] }
 tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
 tracing-log = "0.1"
-uuid = { version = "1", features = ["v4"] }
 clap = { version = "4", features = ["derive"] }
 
 [lib]
diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs
index 75885f0f..8d9ee2e2 100644
--- a/net/webrtc/src/utils.rs
+++ b/net/webrtc/src/utils.rs
@@ -159,10 +159,10 @@ impl Codec {
         let encoder = Self::get_encoder_for_caps(caps, encoders);
         let payloader = Self::get_payloader_for_codec(name, payloaders);
 
-        let encoding_info = if encoder.is_some() && payloader.is_some() {
+        let encoding_info = if let (Some(encoder), Some(payloader)) = (encoder, payloader) {
             Some(EncodingInfo {
-                encoder: encoder.unwrap(),
-                payloader: payloader.unwrap(),
+                encoder,
+                payloader,
                 output_filter: None,
             })
         } else {
@@ -323,17 +323,54 @@ impl Codec {
         })
     }
 
-    pub fn build_payloader(&self) -> Option<Result<gst::Element, Error>> {
+    pub fn build_payloader(&self, pt: u32) -> Option<gst::Element> {
         self.encoding_info.as_ref().map(|info| {
-            info.payloader.create().build().with_context(|| {
-                format!(
-                    "Creating payloader {}",
-                    self.encoding_info.as_ref().unwrap().payloader.name()
-                )
-            })
+            let mut res = info
+                .payloader
+                .create()
+                .property("mtu", 1200_u32)
+                .property("pt", pt);
+
+            if ["vp8enc", "vp9enc"].contains(&self.encoder_name().unwrap().as_str()) {
+                res = res.property_from_str("picture-id-mode", "15-bit");
+            }
+
+            res.build().unwrap()
         })
     }
 
+    pub fn raw_converter_filter(&self) -> Result<gst::Element, Error> {
+        let caps = if self.is_video() {
+            let mut structure_builder = gst::Structure::builder("video/x-raw")
+                .field("pixel-aspect-ratio", gst::Fraction::new(1, 1));
+
+            if self.encoder_name().map(|e| e.as_str() == "nvh264enc").unwrap_or(false) {
+                // Quirk: nvh264enc can perform conversion from RGB formats, but
+                // doesn't advertise / negotiate colorimetry correctly, leading
+                // to incorrect color display in Chrome (but interestingly not in
+                // Firefox). In any case, restrict to exclude RGB formats altogether,
+                // and let videoconvert do the conversion properly if needed.
+ structure_builder = + structure_builder.field("format", gst::List::new(["NV12", "YV12", "I420"])); + } + + gst::Caps::builder_full_with_any_features() + .structure(structure_builder.build()) + .build() + } else { + gst::Caps::builder("audio/x-raw").build() + }; + + gst::ElementFactory::make("capsfilter") + .property("caps", &caps) + .build() + .with_context(|| "Creating capsfilter caps") + } + + pub fn encoder_factory(&self) -> Option { + self.encoding_info.as_ref().map(|info| info.encoder.clone()) + } + pub fn encoder_name(&self) -> Option { self.encoding_info .as_ref() @@ -341,16 +378,49 @@ impl Codec { } pub fn set_output_filter(&mut self, caps: gst::Caps) { - self.encoding_info - .as_mut() - .map(|info| info.output_filter = Some(caps)); + if let Some(info) = self.encoding_info.as_mut() { + info.output_filter = Some(caps); + } } pub fn output_filter(&self) -> Option { self.encoding_info .as_ref() - .map(|info| info.output_filter.clone()) - .flatten() + .and_then(|info| info.output_filter.clone()) + } + + pub fn build_parser(&self) -> Result, Error> { + match self.name.as_str() { + "H264" => make_element("h264parse", None), + "H265" => make_element("h265parse", None), + _ => return Ok(None), + } + .map(Some) + } + + pub fn parser_caps(&self, force_profile: bool) -> gst::Caps { + let codec_caps_name = self.caps.structure(0).unwrap().name(); + match self.name.as_str() { + "H264" => { + if force_profile { + gst::debug!( + CAT, + "No H264 profile requested, selecting constrained-baseline" + ); + + gst::Caps::builder(codec_caps_name) + .field("stream-format", "avc") + .field("profile", "constrained-baseline") + .build() + } else { + gst::Caps::builder(codec_caps_name) + .field("stream-format", "avc") + .build() + } + } + "H265" => gst::Caps::new_empty_simple("video/x-h265"), + _ => gst::Caps::new_any(), + } } } @@ -382,12 +452,22 @@ impl Deref for Codecs { } impl Codecs { - pub fn to_map(self) -> BTreeMap { + pub fn to_map(&self) -> BTreeMap { self.0 - .into_iter() - .map(|codec| (codec.payload().unwrap(), codec)) + .iter() + .map(|codec| (codec.payload().unwrap(), codec.clone())) .collect() } + + pub fn from_map(codecs: &BTreeMap) -> Self { + Self(codecs.values().cloned().collect()) + } + + pub fn find_for_encoded_caps(&self, caps: &gst::Caps) -> Option { + self.iter() + .find(|codec| codec.caps.can_intersect(caps) && codec.encoding_info.is_some()) + .cloned() + } } static CODECS: Lazy = Lazy::new(|| { @@ -455,14 +535,14 @@ impl Codecs { CODECS .iter() .find(|codec| codec.name == encoding_name) - .map(|codec| codec.clone()) + .cloned() } pub fn video_codecs() -> Vec { CODECS .iter() .filter(|codec| codec.stream_type == gst::StreamType::VIDEO) - .map(|codec| codec.clone()) + .cloned() .collect() } @@ -470,7 +550,7 @@ impl Codecs { CODECS .iter() .filter(|codec| codec.stream_type == gst::StreamType::AUDIO) - .map(|codec| codec.clone()) + .cloned() .collect() } @@ -530,3 +610,24 @@ impl Codecs { ) } } + +pub fn is_raw_caps(caps: &gst::Caps) -> bool { + assert!(caps.is_fixed()); + ["video/x-raw", "audio/x-raw"].contains(&caps.structure(0).unwrap().name().as_str()) +} + +pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps { + assert!(caps.is_fixed()); + + if let Some(s) = caps.make_mut().structure_mut(0) { + if ["video/x-h264", "video/x-h265"].contains(&s.name().as_str()) { + s.remove_fields(["codec_data"]); + } else if ["video/x-vp8", "video/x-vp9"].contains(&s.name().as_str()) { + s.remove_fields(["profile"]); + } else if s.name() == "audio/x-opus" { + 
s.remove_fields(["streamheader"]); + } + } + + caps +} diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index c2a6ef07..c260993a 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MPL-2.0 -use crate::utils::*; +use crate::utils::{cleanup_codec_caps, is_raw_caps, make_element, Codec, Codecs}; use anyhow::Context; use gst::glib; use gst::prelude::*; @@ -15,6 +15,7 @@ use futures::prelude::*; use anyhow::{anyhow, Error}; use once_cell::sync::Lazy; use std::collections::HashMap; + use std::ops::Mul; use std::sync::{Arc, Condvar, Mutex}; @@ -82,6 +83,50 @@ struct Settings { signaller: Signallable, } +/// Type of discovery, used to differentiate between initial discovery +/// and discovery initiated by client offer +#[derive(Debug, Clone, PartialEq, Eq)] +enum DiscoveryType { + /// Initial discovery of our input streams + Initial, + /// Discovery to select a specific codec as requested by the remote peer + CodecSelection, +} + +#[derive(Debug, Clone)] +struct DiscoveryInfo { + id: String, + type_: DiscoveryType, + caps: gst::Caps, + srcs: Arc>>, +} + +impl DiscoveryInfo { + fn new(type_: DiscoveryType, caps: gst::Caps) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + type_, + caps, + srcs: Default::default(), + } + } + + fn srcs(&self) -> Vec { + self.srcs.lock().unwrap().clone() + } + + fn create_src(&self) -> gst_app::AppSrc { + let src = gst_app::AppSrc::builder() + .caps(&self.caps) + .format(gst::Format::Time) + .build(); + + self.srcs.lock().unwrap().push(src.clone()); + + src + } +} + /// Wrapper around our sink pads #[derive(Debug, Clone)] struct InputStream { @@ -97,6 +142,8 @@ struct InputStream { serial: u32, /// Whether the input stream is video or not is_video: bool, + /// Information about currently running codec discoveries + discoveries: Vec, } /// Wrapper around webrtcbin pads @@ -192,9 +239,9 @@ struct State { sessions: HashMap, codecs: BTreeMap, /// Used to abort codec discovery - codecs_abort_handle: Option, + codecs_abort_handles: Vec, /// Used to wait for the discovery task to fully stop - codecs_done_receiver: Option>, + codecs_done_receivers: Vec>, /// Used to determine whether we can start the signaller when going to Playing, /// or whether we should wait codec_discovery_done: bool, @@ -267,15 +314,11 @@ impl Default for Settings { Self { video_caps: Codecs::video_codecs() .into_iter() - .map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) - .flatten() - .into_iter() + .flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) .collect::(), audio_caps: Codecs::audio_codecs() .into_iter() - .map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) - .flatten() - .into_iter() + .flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) .collect::(), stun_server: DEFAULT_STUN_SERVER.map(String::from), turn_servers: gst::Array::new(Vec::new() as Vec), @@ -301,8 +344,8 @@ impl Default for State { signaller_state: SignallerState::Stopped, sessions: HashMap::new(), codecs: BTreeMap::new(), - codecs_abort_handle: None, - codecs_done_receiver: None, + codecs_abort_handles: Vec::new(), + codecs_done_receivers: Vec::new(), codec_discovery_done: false, audio_serial: 0, video_serial: 0, @@ -456,184 +499,186 @@ fn configure_encoder(enc: &gst::Element, start_bitrate: u32) { } } -/// Bit of an awkward function, but the goal here is to keep -/// most of the encoding code for consumers in line with -/// the codec 
discovery code, and this gets the job done. -#[allow(clippy::too_many_arguments)] // This needs some more refactoring and it will happen soon -fn setup_encoding( - element: &super::BaseWebRTCSink, - pipeline: &gst::Pipeline, - src: &gst::Element, - input_caps: &gst::Caps, - output_caps: &gst::Caps, - codec: &Codec, - mut encoded_filter: Option, +/// Set of elements used in an EncodingChain +struct EncodingChain { + raw_filter: Option, + encoder: Option, + pay_filter: gst::Element, +} + +struct EncodingChainBuilder { + /// Caps of the input chain + input_caps: gst::Caps, + //// Caps expected after the payloader + output_caps: gst::Caps, + /// The Codec representing wanted encoding + codec: Codec, + /// The SSRC to use for the RTP stream if any + /// Filter element between the encoder and the payloader. + encoded_filter: Option, ssrc: Option, + /// The TWCC ID to use for payloaded stream twcc: Option, -) -> Result<(gst::Element, gst::Element, gst::Element), Error> { - gst::trace!( - CAT, - "Setting up encoding, input caps: {input_caps}, \ - output caps: {output_caps}, codec: {codec:?}, twcc: {twcc:?}" - ); +} - let conv = match codec.is_video() { - true => make_converter_for_video_caps(input_caps)?.upcast(), - false => gst::parse_bin_from_description("audioresample ! audioconvert", true)?.upcast(), - }; - - let conv_filter = make_element("capsfilter", None)?; - - let enc = codec - .build_encoder() - .expect("Encoders should always have been set in the CodecInfo we handle")?; - let parse_filter = make_element("capsfilter", None)?; - let pay = codec - .build_payloader() - .expect("Payloaders should always have been set in the CodecInfo we handle")?; - let pay_filter = make_element("capsfilter", None)?; - - pay.set_property("mtu", 1200_u32); - pay.set_property("pt", codec.payload().unwrap() as u32); - - if let Some(ssrc) = ssrc { - pay.set_property("ssrc", ssrc); - } - - pay_filter.set_property("caps", output_caps); - - pipeline - .add_many([&conv, &conv_filter, &enc, &parse_filter, &pay, &pay_filter]) - .unwrap(); - gst::Element::link_many([src, &conv, &conv_filter, &enc]) - .with_context(|| "Linking encoding elements")?; - - let codec_name = codec.caps.structure(0).unwrap().name(); - - let enc_last = if let Some(encoded_filter) = encoded_filter.take() { - pipeline.add(&encoded_filter).unwrap(); - enc.link(&encoded_filter) - .with_context(|| "Linking encoded filter")?; - - encoded_filter - } else { - enc.clone() - }; - - if let Some(parser) = if codec_name == "video/x-h264" { - Some(make_element("h264parse", None)?) - } else if codec_name == "video/x-h265" { - Some(make_element("h265parse", None)?) - } else { - None - } { - pipeline.add(&parser).unwrap(); - gst::Element::link_many([&enc_last, &parser, &parse_filter]) - .with_context(|| "Linking encoding elements")?; - } else { - gst::Element::link_many([&enc_last, &parse_filter]) - .with_context(|| "Linking encoding elements")?; - } - - let conv_caps = if codec.is_video() { - let mut structure_builder = gst::Structure::builder("video/x-raw") - .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)); - - if codec.encoder_name().unwrap().as_str() == "nvh264enc" { - // Quirk: nvh264enc can perform conversion from RGB formats, but - // doesn't advertise / negotiate colorimetry correctly, leading - // to incorrect color display in Chrome (but interestingly not in - // Firefox). In any case, restrict to exclude RGB formats altogether, - // and let videoconvert do the conversion properly if needed. 
- structure_builder = - structure_builder.field("format", gst::List::new(["NV12", "YV12", "I420"])); +impl EncodingChainBuilder { + fn new( + input_caps: &gst::Caps, + output_caps: &gst::Caps, + codec: &Codec, + encoded_filter: Option, + ) -> Self { + Self { + input_caps: input_caps.clone(), + output_caps: output_caps.clone(), + codec: codec.clone(), + encoded_filter, + ssrc: None, + twcc: None, } - - gst::Caps::builder_full_with_any_features() - .structure(structure_builder.build()) - .build() - } else { - gst::Caps::builder("audio/x-raw").build() - }; - - match codec.encoder_name().unwrap().as_str() { - "vp8enc" | "vp9enc" => { - pay.set_property_from_str("picture-id-mode", "15-bit"); - } - _ => (), } - /* We only enforce TWCC in the offer caps, once a remote description - * has been set it will get automatically negotiated. This is necessary - * because the implementor in Firefox had apparently not understood the - * concept of *transport-wide* congestion control, and firefox doesn't - * provide feedback for audio packets. - */ - if let Some(idx) = twcc { - let twcc_extension = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI).unwrap(); - twcc_extension.set_id(idx); - pay.emit_by_name::<()>("add-extension", &[&twcc_extension]); + fn ssrc(mut self, ssrc: u32) -> Self { + self.ssrc = Some(ssrc); + self } - conv_filter.set_property("caps", conv_caps); + fn twcc(mut self, twcc: u32) -> Self { + self.twcc = Some(twcc); + self + } - let parse_caps = if codec_name == "video/x-h264" { - if output_caps.is_any() { - gst::debug!( - CAT, - obj: element, - "No H264 profile requested, selecting constrained-baseline" - ); + fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result { + gst::trace!( + CAT, + obj: pipeline, + "Setting up encoding, input caps: {input_caps}, \ + output caps: {output_caps}, codec: {codec:?}, twcc: {twcc:?}", + input_caps = self.input_caps, + output_caps = self.output_caps, + codec = self.codec, + twcc = self.twcc, + ); - gst::Caps::builder(codec_name) - .field("stream-format", "avc") - .field("profile", "constrained-baseline") - .build() + let needs_encoding = is_raw_caps(&self.input_caps); + let mut elements: Vec = Vec::new(); + + let (raw_filter, encoder) = if needs_encoding { + elements.push(match self.codec.is_video() { + true => make_converter_for_video_caps(&self.input_caps)?.upcast(), + false => { + gst::parse_bin_from_description("audioresample ! audioconvert", true)?.upcast() + } + }); + + let raw_filter = self.codec.raw_converter_filter()?; + elements.push(raw_filter.clone()); + + let encoder = self + .codec + .build_encoder() + .expect("We should always have an encoder for negotiated codecs")?; + elements.push(encoder.clone()); + elements.push(make_element("capsfilter", None)?); + + (Some(raw_filter), Some(encoder)) } else { - /* When output caps were specified, we are answering an offer - * and should not force a profile */ - gst::Caps::builder(codec_name) - .field("stream-format", "avc") - .build() + (None, None) + }; + + if let Some(parser) = self.codec.build_parser()? 
{ + elements.push(parser); } - } else if codec_name == "video/x-h265" { - gst::Caps::builder(codec_name) - .field("stream-format", "hvc1") + + // Only force the profile when output caps were not specified, either + // through input caps or because we are answering an offer + let force_profile = self.output_caps.is_any() && needs_encoding; + elements.push( + gst::ElementFactory::make("capsfilter") + .property("caps", self.codec.parser_caps(force_profile)) + .build() + .with_context(|| "Failed to make element capsfilter")?, + ); + + if let Some(ref encoded_filter) = self.encoded_filter { + elements.push(encoded_filter.clone()); + } + + let pay = self + .codec + .build_payloader( + self.codec + .payload() + .expect("Negotiated codec should always have pt set") as u32, + ) + .expect("Payloaders should always have been set in the CodecInfo we handle"); + + if let Some(ssrc) = self.ssrc { + pay.set_property("ssrc", ssrc); + } + + /* We only enforce TWCC in the offer caps, once a remote description + * has been set it will get automatically negotiated. This is necessary + * because the implementor in Firefox had apparently not understood the + * concept of *transport-wide* congestion control, and firefox doesn't + * provide feedback for audio packets. + */ + if let Some(idx) = self.twcc { + let twcc_extension = + gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI).unwrap(); + twcc_extension.set_id(idx); + pay.emit_by_name::<()>("add-extension", &[&twcc_extension]); + } + elements.push(pay); + + let pay_filter = gst::ElementFactory::make("capsfilter") + .property("caps", self.output_caps) .build() - } else { - gst::Caps::new_any() - }; + .with_context(|| "Failed to make payloader")?; + elements.push(pay_filter.clone()); - parse_filter.set_property("caps", parse_caps); + for element in &elements { + pipeline.add(element).unwrap(); + } - gst::Element::link_many([&parse_filter, &pay, &pay_filter]) - .with_context(|| "Linking encoding elements")?; + elements.insert(0, src.clone()); + gst::Element::link_many(elements.iter().collect::>().as_slice()) + .with_context(|| "Linking encoding elements")?; - Ok((enc, conv_filter, pay_filter)) + Ok(EncodingChain { + raw_filter, + encoder, + pay_filter, + }) + } } impl VideoEncoder { fn new( - element: gst::Element, - filter: gst::Element, + encoding_elements: &EncodingChain, video_info: gst_video::VideoInfo, peer_id: &str, codec_name: &str, transceiver: gst_webrtc::WebRTCRTPTransceiver, - ) -> Self { + ) -> Option { let halved_framerate = video_info.fps().mul(gst::Fraction::new(1, 2)); - - Self { - factory_name: element.factory().unwrap().name().into(), + Some(Self { + factory_name: encoding_elements + .encoder + .as_ref()? 
+ .factory() + .unwrap() + .name() + .into(), codec_name: codec_name.to_string(), - element, - filter, + element: encoding_elements.encoder.as_ref()?.clone(), + filter: encoding_elements.raw_filter.as_ref()?.clone(), halved_framerate, video_info, session_id: peer_id.to_string(), mitigation_mode: WebRTCSinkMitigationMode::NONE, transceiver, - } + }) } fn bitrate(&self) -> i32 { @@ -756,7 +801,7 @@ impl State { gst::info!(CAT, "Ending session {}", session.id); session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), - format!("removing-peer-{}-", session.peer_id,), + format!("removing-session-{}-", session.peer_id,), ); for ssrc in session.webrtc_pads.keys() { @@ -914,29 +959,23 @@ impl Session { let pay_filter = make_element("capsfilter", None)?; self.pipeline.add(&pay_filter).unwrap(); - let output_caps = codec - .output_filter() - .clone() - .unwrap_or_else(gst::Caps::new_any); + let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any); - let encoded_filter = element.emit_by_name::>( - "request-encoded-filter", - &[&Some(&self.peer_id), &stream_name, &codec.caps], - ); - - let (enc, raw_filter, encoding_sink) = setup_encoding( - element, - &self.pipeline, - &appsrc, + let encoding_chain = EncodingChainBuilder::new( &webrtc_pad.in_caps, &output_caps, &codec, - encoded_filter, - Some(webrtc_pad.ssrc), - None, - )?; + element.emit_by_name::>( + "request-encoded-filter", + &[&Some(&self.peer_id), &stream_name, &codec.caps], + ), + ) + .ssrc(webrtc_pad.ssrc) + .build(&self.pipeline, &appsrc)?; - element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); + if let Some(ref enc) = encoding_chain.encoder { + element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); + } // At this point, the peer has provided its answer, and we want to // let the payloader / encoder perform negotiation according to that. @@ -983,43 +1022,42 @@ impl Session { if codec.is_video() { let video_info = gst_video::VideoInfo::from_caps(&webrtc_pad.in_caps)?; - let mut enc = VideoEncoder::new( - enc, - raw_filter, + if let Some(mut enc) = VideoEncoder::new( + &encoding_chain, video_info, &self.peer_id, codec.caps.structure(0).unwrap().name(), transceiver, - ); - - match self.cc_info.heuristic { - WebRTCSinkCongestionControl::Disabled => { - // If congestion control is disabled, we simply use the highest - // known "safe" value for the bitrate. - enc.set_bitrate(element, self.cc_info.max_bitrate as i32); - enc.transceiver.set_property("fec-percentage", 50u32); - } - WebRTCSinkCongestionControl::Homegrown => { - if let Some(congestion_controller) = self.congestion_controller.as_mut() { - congestion_controller.target_bitrate_on_delay += enc.bitrate(); - congestion_controller.target_bitrate_on_loss = - congestion_controller.target_bitrate_on_delay; - enc.transceiver.set_property("fec-percentage", 0u32); - } else { - /* If congestion control is disabled, we simply use the highest - * known "safe" value for the bitrate. */ + ) { + match self.cc_info.heuristic { + WebRTCSinkCongestionControl::Disabled => { + // If congestion control is disabled, we simply use the highest + // known "safe" value for the bitrate. 
enc.set_bitrate(element, self.cc_info.max_bitrate as i32); enc.transceiver.set_property("fec-percentage", 50u32); } + WebRTCSinkCongestionControl::Homegrown => { + if let Some(congestion_controller) = self.congestion_controller.as_mut() { + congestion_controller.target_bitrate_on_delay += enc.bitrate(); + congestion_controller.target_bitrate_on_loss = + congestion_controller.target_bitrate_on_delay; + enc.transceiver.set_property("fec-percentage", 0u32); + } else { + /* If congestion control is disabled, we simply use the highest + * known "safe" value for the bitrate. */ + enc.set_bitrate(element, self.cc_info.max_bitrate as i32); + enc.transceiver.set_property("fec-percentage", 50u32); + } + } + _ => enc.transceiver.set_property("fec-percentage", 0u32), } - _ => enc.transceiver.set_property("fec-percentage", 0u32), - } - self.encoders.push(enc); + self.encoders.push(enc); - if let Some(rtpgccbwe) = self.rtpgccbwe.as_ref() { - let max_bitrate = self.cc_info.max_bitrate * (self.encoders.len() as u32); - rtpgccbwe.set_property("max-bitrate", max_bitrate); + if let Some(rtpgccbwe) = self.rtpgccbwe.as_ref() { + let max_bitrate = self.cc_info.max_bitrate * (self.encoders.len() as u32); + rtpgccbwe.set_property("max-bitrate", max_bitrate); + } } } @@ -1029,7 +1067,7 @@ impl Session { .sync_children_states() .with_context(|| format!("Connecting input stream for {}", self.peer_id))?; - encoding_sink.link(&pay_filter)?; + encoding_chain.pay_filter.link(&pay_filter)?; let srcpad = pay_filter.static_pad("src").unwrap(); @@ -1096,6 +1134,28 @@ impl InputStream { appsink.set_state(gst::State::Null).unwrap(); } } + + fn create_discovery(&mut self, type_: DiscoveryType) -> DiscoveryInfo { + let discovery_info = DiscoveryInfo::new( + type_, + self.in_caps.clone().expect( + "We should never create a discovery for a stream that doesn't have caps set", + ), + ); + + self.discoveries.push(discovery_info.clone()); + + discovery_info + } + + fn remove_discovery(&mut self, discovery: &DiscoveryInfo) { + let id = self + .discoveries + .iter() + .position(|d| d.id == discovery.id) + .expect("We expect discovery to always be in the list of discoverers when removing"); + self.discoveries.remove(id); + } } impl NavigationEventHandler { @@ -1184,7 +1244,7 @@ impl BaseWebRTCSink { async fn request_webrtcbin_pad( element: &super::BaseWebRTCSink, webrtcbin: &gst::Element, - stream: &InputStream, + stream: &mut InputStream, media: Option<&gst_sdp::SDPMediaRef>, settings: &Settings, webrtc_pads: &mut HashMap, @@ -1195,8 +1255,11 @@ impl BaseWebRTCSink { let mut payloader_caps = match media { Some(media) => { + let discovery_info = stream.create_discovery(DiscoveryType::CodecSelection); + let codec = BaseWebRTCSink::select_codec( element, + &discovery_info, media, &stream.in_caps.as_ref().unwrap().clone(), &stream.sink_pad.name(), @@ -1204,6 +1267,8 @@ impl BaseWebRTCSink { ) .await; + stream.remove_discovery(&discovery_info); + match codec { Some(codec) => { gst::debug!( @@ -1315,15 +1380,17 @@ impl BaseWebRTCSink { .iter_mut() .for_each(|(_, stream)| stream.unprepare(element)); - if let Some(handle) = state.codecs_abort_handle.take() { + let codecs_abort_handle = std::mem::take(&mut state.codecs_abort_handles); + codecs_abort_handle.into_iter().for_each(|handle| { handle.abort(); - } + }); - if let Some(receiver) = state.codecs_done_receiver.take() { + let codecs_done_receiver = std::mem::take(&mut state.codecs_done_receivers); + codecs_done_receiver.into_iter().for_each(|receiver| { RUNTIME.spawn(async { let _ = 
receiver.await; }); - } + }); state.codec_discovery_done = false; state.codecs = BTreeMap::new(); @@ -1582,6 +1649,7 @@ impl BaseWebRTCSink { async fn select_codec( element: &super::BaseWebRTCSink, + discovery_info: &DiscoveryInfo, media: &gst_sdp::SDPMediaRef, in_caps: &gst::Caps, stream_name: &str, @@ -1630,6 +1698,10 @@ impl BaseWebRTCSink { let encoding_name = s.get::("encoding-name").unwrap(); if let Some(mut codec) = Codecs::find(&encoding_name) { + if !codec.can_encode() { + continue; + } + codec.set_pt(payload); for (user_caps, codecs_and_caps) in ordered_codecs_and_caps.iter_mut() { if codec.caps.is_subset(user_caps) { @@ -1662,7 +1734,6 @@ impl BaseWebRTCSink { } } - let in_caps = &in_caps; let futs = ordered_codecs_and_caps .iter() .flat_map(|(_, codecs_and_caps)| codecs_and_caps) @@ -1670,17 +1741,18 @@ impl BaseWebRTCSink { BaseWebRTCSink::run_discovery_pipeline( element, stream_name, - codec, - in_caps, + discovery_info, + codec.clone(), + in_caps.clone(), caps, twcc_idx, ) - .await - .map(|s| { - let mut codec = codec.clone(); - codec.set_output_filter([s].into_iter().collect()); - codec - }) + .await + .map(|s| { + let mut codec = codec.clone(); + codec.set_output_filter([s].into_iter().collect()); + codec + }) }); /* Run sequentially to avoid NVENC collisions */ @@ -2175,27 +2247,21 @@ impl BaseWebRTCSink { }; if let Some(idx) = streams.iter().position(|s| { - let stream_is_video = match s - .in_caps - .as_ref() - .unwrap() - .structure(0) - .unwrap() - .name() - .as_str() - { - "video/x-raw" => true, - "audio/x-raw" => false, - _ => unreachable!(), - }; + let structname = + s.in_caps.as_ref().unwrap().structure(0).unwrap().name(); + let stream_is_video = structname.starts_with("video/"); + + if !stream_is_video { + assert!(structname.starts_with("audio/")); + } media_is_video == stream_is_video }) { - let stream = streams.remove(idx); + let mut stream = streams.remove(idx); BaseWebRTCSink::request_webrtcbin_pad( &element, &webrtcbin, - &stream, + &mut stream, Some(media), &settings_clone, &mut webrtc_pads, @@ -2212,11 +2278,11 @@ impl BaseWebRTCSink { } } } else { - for stream in streams { + for mut stream in streams { BaseWebRTCSink::request_webrtcbin_pad( &element, &webrtcbin, - &stream, + &mut stream, None, &settings_clone, &mut webrtc_pads, @@ -2622,97 +2688,111 @@ impl BaseWebRTCSink { async fn run_discovery_pipeline( element: &super::BaseWebRTCSink, stream_name: &str, - codec: &Codec, - caps: &gst::Caps, + discovery_info: &DiscoveryInfo, + codec: Codec, + input_caps: gst::Caps, output_caps: &gst::Caps, twcc: Option, ) -> Result { let pipe = PipelineWrapper(gst::Pipeline::default()); - let src = if codec.is_video() { - make_element("videotestsrc", None)? - } else { - make_element("audiotestsrc", None)? 
- }; - let mut elements = vec![src.clone()]; + let has_raw_input = is_raw_caps(&input_caps); + let src = discovery_info.create_src(); + let mut elements = vec![src.clone().upcast::()]; + let encoding_chain_src = if codec.is_video() && has_raw_input { + elements.push(make_converter_for_video_caps(&input_caps)?); - if codec.is_video() { - elements.push(make_converter_for_video_caps(caps)?); - } + let capsfilter = make_element("capsfilter", Some("raw_capsfilter"))?; + elements.push(capsfilter.clone()); + + capsfilter + } else { + src.clone().upcast::() + }; gst::debug!( CAT, obj: element, - "Running discovery pipeline for input caps {caps} and output caps {output_caps} with codec {codec:?}" + "Running discovery pipeline for input caps {input_caps} and output caps {output_caps} with codec {codec:?}" ); - let capsfilter = make_element("capsfilter", None)?; - elements.push(capsfilter.clone()); + gst::debug!(CAT, obj: element, "Running discovery pipeline"); let elements_slice = &elements.iter().collect::>(); pipe.0.add_many(elements_slice).unwrap(); gst::Element::link_many(elements_slice) - .with_context(|| format!("Running discovery pipeline for caps {caps}"))?; + .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - let encoded_filter = element.emit_by_name::>( - "request-encoded-filter", - &[&Option::::None, &stream_name, &codec.caps], - ); - - let (_, _, encoding_sink) = setup_encoding( - element, - &pipe.0, - &capsfilter, - caps, + let mut encoding_chain_builder = EncodingChainBuilder::new( + &src.caps() + .expect("Caps should always be set when starting discovery"), output_caps, - codec, - encoded_filter, - None, - twcc, - )?; + &codec, + element.emit_by_name::>( + "request-encoded-filter", + &[&Option::::None, &stream_name, &codec.caps], + ), + ); + if let Some(twcc) = twcc { + encoding_chain_builder = encoding_chain_builder.twcc(twcc) + } + let encoding_chain = encoding_chain_builder.build(&pipe.0, &encoding_chain_src)?; - let sink = make_element("fakesink", None)?; + let sink = gst_app::AppSink::builder() + .callbacks( + gst_app::AppSinkCallbacks::builder() + .new_event(|sink| { + let obj = sink.pull_object().ok(); + if let Some(event) = obj.and_then(|o| o.downcast::().ok()) { + if let gst::EventView::Caps(caps) = event.view() { + sink.post_message(gst::message::Application::new( + gst::Structure::builder("payloaded_caps") + .field("caps", &caps.caps().to_owned()) + .build(), + )) + .expect("Could not send message"); + } + } - pipe.0.add(&sink).unwrap(); - - encoding_sink + true + }) + .build(), + ) + .build(); + pipe.0.add(sink.upcast_ref::()).unwrap(); + encoding_chain + .pay_filter .link(&sink) - .with_context(|| format!("Running discovery pipeline for caps {caps}"))?; - - capsfilter.set_property("caps", caps); - - src.set_property("num-buffers", 1); + .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; let mut stream = pipe.0.bus().unwrap().stream(); pipe.0 .set_state(gst::State::Playing) - .with_context(|| format!("Running discovery pipeline for caps {caps}"))?; - - let in_caps = caps; + .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; while let Some(msg) = stream.next().await { match msg.view() { gst::MessageView::Error(err) => { + gst::error!(CAT, "Error in discovery pipeline: {err:#?}"); pipe.0.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), "webrtcsink-discovery-error", ); return Err(err.error().into()); } - gst::MessageView::Eos(_) => { - pipe.0.debug_to_dot_file_with_ts( - 
gst::DebugGraphDetails::all(), - "webrtcsink-discovery-done", - ); + gst::MessageView::Application(appmsg) => { + let caps = match appmsg.structure() { + Some(s) => { + if s.name().as_str() != "payloaded_caps" { + continue; + } - let caps = match encoding_sink.static_pad("src").unwrap().current_caps() { - Some(caps) => caps, - None => { - // https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/1713 - return Err(anyhow!("EOS but no caps were pushed")); + s.get::("caps").unwrap() } + _ => continue, }; + gst::info!(CAT, "Discovery pipeline got caps {caps:?}"); pipe.0.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), "webrtcsink-discovery-done", @@ -2731,7 +2811,7 @@ impl BaseWebRTCSink { gst::debug!( CAT, obj: element, - "Codec discovery pipeline for caps {in_caps} with codec {codec:?} succeeded: {s}" + "Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}" ); return Ok(s); } else { @@ -2749,36 +2829,61 @@ impl BaseWebRTCSink { async fn lookup_caps( element: &super::BaseWebRTCSink, + discovery_info: DiscoveryInfo, name: String, - in_caps: gst::Caps, output_caps: gst::Caps, codecs: &Codecs, - ) -> (String, gst::Caps) { - let sink_caps = in_caps.as_ref().to_owned(); + ) -> Result<(), Error> { + let futs = if let Some(codec) = codecs.find_for_encoded_caps(&discovery_info.caps) { + let mut caps = discovery_info.caps.clone(); - let is_video = match sink_caps.structure(0).unwrap().name().as_str() { - "video/x-raw" => true, - "audio/x-raw" => false, - _ => unreachable!(), + gst::info!( + CAT, + obj: element, + "Stream is already encoded with codec {}, still need to payload it", + codec.name + ); + + caps = cleanup_codec_caps(caps); + + vec![BaseWebRTCSink::run_discovery_pipeline( + element, + &name, + &discovery_info, + codec, + caps, + &output_caps, + Some(1), + )] + } else { + let sink_caps = discovery_info.caps.clone(); + + let is_video = match sink_caps.structure(0).unwrap().name().as_str() { + "video/x-raw" => true, + "audio/x-raw" => false, + _ => unreachable!(), + }; + + codecs + .iter() + .filter(|codec| codec.is_video() == is_video) + .map(|codec| { + BaseWebRTCSink::run_discovery_pipeline( + element, + &name, + &discovery_info, + codec.clone(), + sink_caps.clone(), + &output_caps, + Some(1), + ) + }) + .collect() }; let mut payloader_caps = gst::Caps::new_empty(); let payloader_caps_mut = payloader_caps.make_mut(); - let futs = codecs - .iter() - .filter(|codec| codec.is_video() == is_video) - .map(|codec| { - BaseWebRTCSink::run_discovery_pipeline( - element, - &name, - codec, - &sink_caps, - &output_caps, - Some(1), - ) - }); - for ret in futures::future::join_all(futs).await { match ret { Ok(s) => { @@ -2798,50 +2903,14 @@ impl BaseWebRTCSink { } } - (name, payloader_caps) - } - - async fn lookup_streams_caps(&self, element: &super::BaseWebRTCSink) -> Result<(), Error> { - let codecs = { - let settings = self.settings.lock().unwrap(); - - Codecs::list_encoders(settings.video_caps.iter().chain(settings.audio_caps.iter())) - }; - - gst::debug!(CAT, obj: element, "Looked up codecs {codecs:?}"); - - let futs: Vec<_> = self - .state - .lock() - .unwrap() - .streams - .iter() - .map(|(name, stream)| { - BaseWebRTCSink::lookup_caps( - element, - name.to_owned(), - stream.in_caps.as_ref().unwrap().to_owned(), - gst::Caps::new_any(), - &codecs, - ) - }) - .collect(); - - let caps: Vec<(String, gst::Caps)> = futures::future::join_all(futs).await; - - let mut state = self.state.lock().unwrap(); - - for (name, caps) in caps { - if caps.is_empty() { - 
return Err(anyhow!("No caps found for stream {}", name)); - } - - if let Some(mut stream) = state.streams.get_mut(&name) { - stream.out_caps = Some(caps); - } + let mut state = element.imp().state.lock().unwrap(); + if let Some(mut stream) = state.streams.get_mut(&name) { + stream.out_caps = Some(payloader_caps.clone()); } - state.codecs = codecs.to_map(); + if payloader_caps.is_empty() { + anyhow::bail!("No caps found for stream {name}"); + } Ok(()) } @@ -2866,87 +2935,144 @@ impl BaseWebRTCSink { ) -> bool { use gst::EventView; - match event.view() { - EventView::Caps(e) => { - if let Some(caps) = pad.current_caps() { - if caps.is_strictly_equal(e.caps()) { - // Nothing changed - true - } else { - gst::error!(CAT, obj: pad, "Renegotiation is not supported"); - false - } + if let EventView::Caps(e) = event.view() { + if let Some(caps) = pad.current_caps() { + if caps.is_strictly_equal(e.caps()) { + // Nothing changed + return true; } else { - gst::info!(CAT, obj: pad, "Received caps event {:?}", e); + gst::error!(CAT, obj: pad, "Renegotiation is not supported"); + return false; + } + } else { + gst::info!(CAT, obj: pad, "Received caps event {:?}", e); - let mut all_pads_have_caps = true; + self.state + .lock() + .unwrap() + .streams + .iter_mut() + .for_each(|(_, mut stream)| { + if stream.sink_pad.upcast_ref::() == pad { + stream.in_caps = Some(e.caps().to_owned()); + } + }); + } + } - self.state - .lock() - .unwrap() - .streams - .iter_mut() - .for_each(|(_, mut stream)| { - if stream.sink_pad.upcast_ref::() == pad { - stream.in_caps = Some(e.caps().to_owned()); - } else if stream.in_caps.is_none() { - all_pads_have_caps = false; - } - }); + gst::Pad::event_default(pad, Some(element), event) + } - if all_pads_have_caps { - let element_clone = element.downgrade(); - RUNTIME.spawn(async move { - if let Some(element) = element_clone.upgrade() { - let this = element.imp(); - let (fut, handle) = - futures::future::abortable(this.lookup_streams_caps(&element)); + fn start_stream_discovery_if_needed(&self, stream_name: &str, buffer: &gst::Buffer) { + let (codecs, discovery_info) = { + let mut state = self.state.lock().unwrap(); + let stream = state.streams.get_mut(stream_name).unwrap(); - let (codecs_done_sender, codecs_done_receiver) = - futures::channel::oneshot::channel(); + // Discovery already happened... nothing to do here. + if stream.out_caps.is_some() { + return; + } - // Compiler isn't budged by dropping state before await, - // so let's make a new scope instead. 
- { - let mut state = this.state.lock().unwrap(); - state.codecs_abort_handle = Some(handle); - state.codecs_done_receiver = Some(codecs_done_receiver); - } - - match fut.await { - Ok(Err(err)) => { - gst::error!(CAT, obj: element, "error: {}", err); - gst::element_error!( - element, - gst::StreamError::CodecNotFound, - ["Failed to look up output caps: {}", err] - ); - } - Ok(Ok(_)) => { - let settings = this.settings.lock().unwrap(); - let mut state = this.state.lock().unwrap(); - state.codec_discovery_done = true; - let signaller = settings.signaller.clone(); - drop(settings); - if state.should_start_signaller(&element) { - state.signaller_state = SignallerState::Started; - drop(state); - signaller.start(); - } - } - _ => (), - } - - let _ = codecs_done_sender.send(()); - } - }); + let mut discovery_started = false; + for discovery_info in stream.discoveries.iter() { + if matches!(discovery_info.type_, DiscoveryType::Initial) { + discovery_started = true; + } + for src in discovery_info.srcs() { + if let Err(err) = src.push_buffer(buffer.clone()) { + gst::log!(CAT, obj: src, "Failed to push buffer: {}", err); } - - gst::Pad::event_default(pad, Some(element), event) } } - _ => gst::Pad::event_default(pad, Some(element), event), - } + + if discovery_started { + // Discovery already started, we pushed the buffer to keep it + // going + return; + } + + let discovery_info = stream.create_discovery(DiscoveryType::Initial); + stream.discoveries.push(discovery_info.clone()); + + let codecs = if !state.codecs.is_empty() { + Codecs::from_map(&state.codecs) + } else { + let settings = self.settings.lock().unwrap(); + let codecs = Codecs::list_encoders( + settings.video_caps.iter().chain(settings.audio_caps.iter()), + ); + + state.codecs = codecs.to_map(); + + codecs + }; + + (codecs, discovery_info) + }; + + let stream_name_clone = stream_name.to_owned(); + RUNTIME.spawn(glib::clone!(@weak self as this, @strong discovery_info => async move { + let element = &*this.obj(); + let (fut, handle) = futures::future::abortable( + Self::lookup_caps( + element, + discovery_info, + stream_name_clone, + gst::Caps::new_any(), + &codecs, + )); + + let (codecs_done_sender, codecs_done_receiver) = + futures::channel::oneshot::channel(); + + // Compiler isn't budged by dropping state before await, + // so let's make a new scope instead. 
+ { + let mut state = this.state.lock().unwrap(); + state.codecs_abort_handles.push(handle); + state.codecs_done_receivers.push(codecs_done_receiver); + } + + match fut.await { + Ok(Err(err)) => { + gst::error!(CAT, imp: this, "Error running discovery: {err:?}"); + gst::element_error!( + this.obj(), + gst::StreamError::CodecNotFound, + ["Failed to look up output caps: {err:?}"] + ); + } + Ok(Ok(_)) => { + let settings = this.settings.lock().unwrap(); + let mut state = this.state.lock().unwrap(); + state.codec_discovery_done = state.streams.values().all(|stream| stream.out_caps.is_some()); + let signaller = settings.signaller.clone(); + drop(settings); + if state.should_start_signaller(element) { + state.signaller_state = SignallerState::Started; + drop(state); + signaller.start(); + } + } + _ => (), + } + + let _ = codecs_done_sender.send(()); + })); + + let mut state = self.state.lock().unwrap(); + let stream = state.streams.get_mut(stream_name).unwrap(); + stream.remove_discovery(&discovery_info); + } + + fn chain( + &self, + pad: &gst::GhostPad, + buffer: gst::Buffer, + ) -> Result { + self.start_stream_discovery_if_needed(pad.name().as_str(), &buffer); + + gst::ProxyPad::chain_default(pad, Some(&*self.obj()), buffer) } } @@ -3349,7 +3475,7 @@ impl GstObjectImpl for BaseWebRTCSink {} impl ElementImpl for BaseWebRTCSink { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { - let caps = gst::Caps::builder_full() + let mut caps_builder = gst::Caps::builder_full() .structure(gst::Structure::builder("video/x-raw").build()) .structure_with_features( gst::Structure::builder("video/x-raw").build(), @@ -3362,22 +3488,30 @@ impl ElementImpl for BaseWebRTCSink { .structure_with_features( gst::Structure::builder("video/x-raw").build(), gst::CapsFeatures::new([NVMM_MEMORY_FEATURE]), - ) - .build(); + ); + + for codec in Codecs::video_codecs() { + caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); + } + let video_pad_template = gst::PadTemplate::new( "video_%u", gst::PadDirection::Sink, gst::PadPresence::Request, - &caps, + &caps_builder.build(), ) .unwrap(); - let caps = gst::Caps::builder("audio/x-raw").build(); + let mut caps_builder = + gst::Caps::builder_full().structure(gst::Structure::builder("audio/x-raw").build()); + for codec in Codecs::audio_codecs() { + caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); + } let audio_pad_template = gst::PadTemplate::new( "audio_%u", gst::PadDirection::Sink, gst::PadPresence::Request, - &caps + &caps_builder.build(), ) .unwrap(); @@ -3418,6 +3552,13 @@ impl ElementImpl for BaseWebRTCSink { let sink_pad = gst::GhostPad::builder_from_template(templ) .name(name.as_str()) + .chain_function(|pad, parent, buffer| { + BaseWebRTCSink::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.chain(pad, buffer), + ) + }) .event_function(|pad, parent, event| { BaseWebRTCSink::catch_panic_pad_function( parent, @@ -3441,6 +3582,7 @@ impl ElementImpl for BaseWebRTCSink { clocksync: None, is_video, serial, + discoveries: Default::default(), }, );
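Reviewer note, not part of the patch: with the pad templates above now advertising the encoded codec caps alongside raw caps, webrtcsink can be fed an already-encoded stream, and the buffer-driven discovery added here (DiscoveryInfo + appsrc) should pick a matching Codec via find_for_encoded_caps() and build only the parsing/payloading part of the EncodingChain. A minimal downstream sketch follows; videotestsrc, x264enc and h264parse are ordinary GStreamer elements, the example assumes a signalling server is reachable at webrtcsink's default address, and on newer gstreamer-rs gst::parse_launch is spelled gst::parse::launch.

use gst::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;

    // Feed pre-encoded H.264 into webrtcsink; with this patch the stream
    // should only be parsed and payloaded, not re-encoded, before being
    // handed to webrtcbin.
    let pipeline = gst::parse_launch(
        "videotestsrc is-live=true ! video/x-raw,width=1280,height=720,framerate=30/1 \
         ! x264enc tune=zerolatency ! h264parse ! webrtcsink",
    )?;

    pipeline.set_state(gst::State::Playing)?;

    let bus = pipeline.bus().unwrap();
    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        match msg.view() {
            gst::MessageView::Eos(..) => break,
            gst::MessageView::Error(err) => {
                eprintln!("Error: {} ({:?})", err.error(), err.debug());
                break;
            }
            _ => (),
        }
    }

    pipeline.set_state(gst::State::Null)?;
    Ok(())
}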