diff --git a/Cargo.lock b/Cargo.lock index fb42c823..8ae5a0b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1263,15 +1263,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -1370,6 +1361,16 @@ dependencies = [ "cipher", ] +[[package]] +name = "ctrlc" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf" +dependencies = [ + "nix 0.27.1", + "windows-sys 0.48.0", +] + [[package]] name = "darling" version = "0.20.8" @@ -2944,9 +2945,10 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", + "bytes", "chrono", "clap", - "crossbeam-channel", + "ctrlc", "data-encoding", "fastrand", "futures", @@ -4183,7 +4185,7 @@ dependencies = [ "if-addrs", "log", "multimap 0.8.3", - "nix", + "nix 0.23.2", "rand", "socket2 0.4.10", "thiserror", @@ -4674,6 +4676,17 @@ dependencies = [ "memoffset 0.6.5", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.5.0", + "cfg-if", + "libc", +] + [[package]] name = "nnnoiseless" version = "0.5.1" diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 42a7c5aa..a07d4eda 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -58,7 +58,10 @@ livekit-protocol = { version = "0.3", optional = true } livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true } warp = {version = "0.3", optional = true } -crossbeam-channel = { version = "0.5", optional = true } +ctrlc = {version = "3.4.0", optional = true } + + +bytes = "1" [dev-dependencies] gst-plugin-rtp = { path = "../rtp" } @@ -78,7 +81,7 @@ path = "src/lib.rs" gst-plugin-version-helper.workspace = true [features] -default = ["v1_22", "aws", "janus", "livekit", "whip"] +default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"] static = [] capi = [] v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"] @@ -89,7 +92,8 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s "dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"] janus = ["dep:http"] livekit = ["dep:livekit-protocol", "dep:livekit-api"] -whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"] +whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"] +whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"] [package.metadata.capi] min_version = "0.9.21" @@ -119,3 +123,6 @@ name = "webrtc-precise-sync-send" [[example]] name = "webrtc-precise-sync-recv" + +[[example]] +name = "whipserver" diff --git a/net/webrtc/examples/whipserver.rs b/net/webrtc/examples/whipserver.rs new file mode 100644 index 00000000..22a4a6ac --- /dev/null +++ b/net/webrtc/examples/whipserver.rs @@ -0,0 +1,123 @@ +use std::process::exit; + +use anyhow::Error; +use clap::Parser; +use gst::prelude::*; + +#[derive(Parser, Debug)] +struct Args { + host_addr: String, +} + +fn link_video(pad: &gst::Pad, pipeline: &gst::Pipeline) { + let q = gst::ElementFactory::make_with_name( + "queue", + Some(format!("queue_{}", pad.name()).as_str()), + ) + .unwrap(); + // let vconv = gst::ElementFactory::make_with_name("videoconvert", Some(format!("vconv_{}",pad.name()).as_str())).unwrap(); + let vsink = gst::ElementFactory::make_with_name( + "autovideosink", + Some(format!("vsink_{}", pad.name()).as_str()), + ) + .unwrap(); + + pipeline.add_many([&q, &vsink]).unwrap(); + gst::Element::link_many([&q, &vsink]).unwrap(); + let qsinkpad = q.static_pad("sink").unwrap(); + pad.link(&qsinkpad).expect("linking should work"); + + q.sync_state_with_parent().unwrap(); + // vconv.sync_state_with_parent().unwrap(); + vsink.sync_state_with_parent().unwrap(); +} + +fn unlink_video(pad: &gst::Pad, pipeline: &gst::Pipeline) { + let q = pipeline + .by_name(format!("queue_{}", pad.name()).as_str()) + .unwrap(); + // let vconv = pipeline.by_name(format!("vconv_{}",pad.name()).as_str()).unwrap(); + let vsink = pipeline + .by_name(format!("vsink_{}", pad.name()).as_str()) + .unwrap(); + + q.set_state(gst::State::Null).unwrap(); + // vconv.set_state(gst::State::Null).unwrap(); + vsink.set_state(gst::State::Null).unwrap(); + + pipeline.remove_many([&q, &vsink]).unwrap(); +} + +fn link_audio(_pad: &gst::Pad) {} + +fn main() -> Result<(), Error> { + gst::init()?; + + let args = Args::parse(); + + let pipeline = gst::Pipeline::builder().build(); + let ws = gst::ElementFactory::make("whipserversrc").build()?; + ws.dynamic_cast_ref::() + .unwrap() + .set_child_property("signaller::host-addr", &args.host_addr); + + ws.set_property("enable-data-channel-navigation", true); + + let pipe = pipeline.clone(); + ws.connect_pad_added(move |_ws, pad| { + if pad.name().contains("video_") { + link_video(pad, &pipe); + } else if pad.name().contains("audio_") { + } else { + println!("unknown pad type {}", pad.name()); + } + }); + + let pipe = pipeline.clone(); + ws.connect_pad_removed(move |_ws, pad| { + if pad.name().contains("video_") { + unlink_video(pad, &pipe); + } else if pad.name().contains("audio_") { + } else { + println!("unknown pad type {}", pad.name()); + } + }); + pipeline.add(&ws)?; + pipeline.set_state(gst::State::Playing)?; + + let p = pipeline.clone(); + ctrlc::set_handler(move || { + p.set_state(gst::State::Null).unwrap(); + exit(0); + }) + .expect("Error setting Ctrl-C handler"); + + let bus = pipeline.bus().expect("Pipeline should have a bus"); + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs index cb8e993b..0d716b31 100644 --- a/net/webrtc/src/lib.rs +++ b/net/webrtc/src/lib.rs @@ -24,6 +24,8 @@ pub mod signaller; pub mod utils; pub mod webrtcsink; pub mod webrtcsrc; +#[cfg(feature = "whep")] +mod whep_signaller; #[cfg(feature = "whip")] mod whip_signaller; diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index 1c806c91..dacb30f9 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -104,9 +104,9 @@ use crate::RUNTIME; use futures::future; use futures::prelude::*; use gst::ErrorMessage; -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] use reqwest::header::HeaderMap; -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] use reqwest::redirect::Policy; use std::sync::Mutex; use std::time::Duration; @@ -239,7 +239,7 @@ where res } -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] pub fn parse_redirect_location( headermap: &HeaderMap, old_url: &reqwest::Url, @@ -280,13 +280,13 @@ pub fn parse_redirect_location( } } -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] pub fn build_reqwest_client(pol: Policy) -> reqwest::Client { let client_builder = reqwest::Client::builder(); client_builder.redirect(pol).build().unwrap() } -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] pub fn set_ice_servers( webrtcbin: &gst::Element, headermap: &HeaderMap, diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 8a9edf3b..ca4e2ed6 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -11,7 +11,7 @@ use gst::subclass::prelude::*; use gst_webrtc::WebRTCDataChannel; use once_cell::sync::Lazy; use std::borrow::BorrowMut; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::atomic::AtomicU16; use std::sync::atomic::Ordering; @@ -44,8 +44,6 @@ struct Settings { #[derive(Default)] pub struct BaseWebRTCSrc { settings: Mutex, - n_video_pads: AtomicU16, - n_audio_pads: AtomicU16, state: Mutex, } @@ -294,34 +292,58 @@ struct SignallerSignals { request_meta: glib::SignalHandlerId, session_description: glib::SignalHandlerId, handle_ice: glib::SignalHandlerId, + session_requested: glib::SignalHandlerId, } -impl BaseWebRTCSrc { +impl Session { fn webrtcbin(&self) -> gst::Bin { - let state = self.state.lock().unwrap(); - let webrtcbin = state - .webrtcbin - .as_ref() - .expect("We should never call `.webrtcbin()` when state not > Ready") - .clone() - .downcast::() - .unwrap(); - - webrtcbin + self.webrtcbin.clone().downcast::().unwrap() } - fn signaller(&self) -> Signallable { - self.settings.lock().unwrap().signaller.clone() + fn new(session_id: &str) -> Result { + let webrtcbin = gst::ElementFactory::make("webrtcbin") + .property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle) + .build() + .with_context(|| "Failed to make element webrtcbin".to_string())?; + + Ok(Self { + id: session_id.to_string(), + webrtcbin, + data_channel: None, + n_video_pads: AtomicU16::new(0), + n_audio_pads: AtomicU16::new(0), + flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()), + }) + } + + fn get_stream_id( + &self, + transceiver: Option, + mline: Option, + ) -> Option { + let mline = transceiver.map_or(mline, |t| Some(t.mlineindex())); + + // making a hash of the session ID and adding `:`, + // here the ID is the mline of the stream in the SDP. + mline.map(|mline| { + let mut cs = glib::Checksum::new(glib::ChecksumType::Sha256).unwrap(); + cs.update(self.id.as_bytes()); + format!("{}:{mline}", cs.string().unwrap()) + }) } // Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID. - fn get_src_pad_from_webrtcbin_pad(&self, webrtcbin_src: &gst::Pad) -> Option { + fn get_src_pad_from_webrtcbin_pad( + &self, + webrtcbin_src: &gst::Pad, + element: &super::BaseWebRTCSrc, + ) -> Option { self.get_stream_id( Some(webrtcbin_src.property::("transceiver")), None, ) .and_then(|stream_id| { - self.obj().iterate_src_pads().into_iter().find_map(|s| { + element.iterate_src_pads().into_iter().find_map(|s| { let pad = s.ok()?.downcast::().unwrap(); if pad.imp().stream_id() == stream_id { Some(pad) @@ -332,61 +354,86 @@ impl BaseWebRTCSrc { }) } - fn send_navigation_event(&self, evt: gst_video::NavigationEvent) { - if let Some(data_channel) = &self.state.lock().unwrap().data_channel.borrow_mut() { + fn send_navigation_event( + &mut self, + evt: gst_video::NavigationEvent, + element: &super::BaseWebRTCSrc, + ) { + if let Some(data_channel) = &self.data_channel.borrow_mut() { let nav_event = NavigationEvent { mid: None, event: evt, }; match serde_json::to_string(&nav_event).ok() { Some(str) => { - gst::trace!(CAT, imp: self, "Sending navigation event to peer"); + gst::trace!(CAT, obj: element, "Sending navigation event to peer for session {}", self.id); data_channel.send_string(Some(str.as_str())); } None => { - gst::error!(CAT, imp: self, "Could not serialize navigation event"); + gst::error!(CAT, obj: element, "Could not serialize navigation event for session {}", self.id); } } } } - fn handle_webrtc_src_pad(&self, bin: &gst::Bin, pad: &gst::Pad) { - let srcpad = self.get_src_pad_from_webrtcbin_pad(pad); + // Creates a bin which contains the webrtcbin, encoded filter (if requested) plus parser + // and decoder (if needed) for every session + // + // The ghostpad of the session's bin will be the target pad of the webrtcsrc's srcpad + // corresponding to the session. + // + // The target pad for the session's bin ghostpad will be + // - the decoder's srcpad, if decoder is needed + // - otherwise, encoded filter's srcpad, if requested + // - otherwise, webrtcbin's src pad. + fn handle_webrtc_src_pad( + &self, + bin: &gst::Bin, + webrtcbin_pad: &gst::Pad, + element: &super::BaseWebRTCSrc, + ) { + let srcpad = self.get_src_pad_from_webrtcbin_pad(webrtcbin_pad, element); if let Some(ref srcpad) = srcpad { let stream_id = srcpad.imp().stream_id(); let mut builder = gst::event::StreamStart::builder(&stream_id); - if let Some(stream_start) = pad.sticky_event::(0) { + if let Some(stream_start) = webrtcbin_pad.sticky_event::(0) { builder = builder .seqnum(stream_start.seqnum()) .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next)); } - gst::debug!(CAT, imp: self, "Storing id {stream_id} on {pad:?}"); - pad.store_sticky_event(&builder.build()).ok(); + gst::debug!(CAT, obj: element, "Storing id {stream_id} on {webrtcbin_pad:?} for session {}", self.id); + webrtcbin_pad.store_sticky_event(&builder.build()).ok(); } let ghostpad = gst::GhostPad::builder(gst::PadDirection::Src) - .with_target(pad) - .unwrap() - .proxy_pad_chain_function(glib::clone!(@weak self as this => @default-panic, move + .proxy_pad_chain_function(glib::clone!(@weak element, @strong self.id as sess_id => @default-panic, move |pad, parent, buffer| { let padret = gst::ProxyPad::chain_default(pad, parent, buffer); - let ret = this.state.lock().unwrap().flow_combiner.update_flow(padret); - - ret + let state = element.imp().state.lock().unwrap(); + let Some (session) = state.sessions.get(&sess_id) else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + return padret; + }; + let f = session.flow_combiner.lock().unwrap().update_flow(padret); + f } )) - .proxy_pad_event_function(glib::clone!(@weak self as this => @default-panic, move |pad, parent, event| { + .proxy_pad_event_function(glib::clone!(@weak element , @weak webrtcbin_pad as webrtcpad, @strong self.id as sess_id => @default-panic, move |pad, parent, event| { let event = if let gst::EventView::StreamStart(stream_start) = event.view() { - let webrtcpad = pad.peer().unwrap(); - - this.get_src_pad_from_webrtcbin_pad(&webrtcpad) + let state = element.imp().state.lock().unwrap(); + if let Some(session) = state.sessions.get(&sess_id) { + session.get_src_pad_from_webrtcbin_pad(&webrtcpad, &element) .map(|srcpad| { gst::event::StreamStart::builder(&srcpad.imp().stream_id()) .seqnum(stream_start.seqnum()) .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next)) .build() }).unwrap_or(event) + } else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + event + } } else { event }; @@ -395,19 +442,26 @@ impl BaseWebRTCSrc { })) .build(); - if self.settings.lock().unwrap().enable_data_channel_navigation { - pad.add_probe( - gst::PadProbeType::EVENT_UPSTREAM, - glib::clone!(@weak self as this => @default-panic, move |_pad, info| { - let Some(ev) = info.event() else { - return gst::PadProbeReturn::Ok; - }; - if ev.type_() != gst::EventType::Navigation { - return gst::PadProbeReturn::Ok; - }; - - this.send_navigation_event (gst_video::NavigationEvent::parse(ev).unwrap()); - + let sess_id = self.id.clone(); + if element + .imp() + .settings + .lock() + .unwrap() + .enable_data_channel_navigation + { + webrtcbin_pad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, + glib::clone!(@weak element => @default-panic, move |_pad, info| { + if let Some(gst::PadProbeData::Event(ref ev)) = info.data { + if let gst::EventView::Navigation(ev) = ev.view() { + let mut state = element.imp().state.lock().unwrap(); + if let Some(session) = state.sessions.get_mut(&sess_id) { + session.send_navigation_event (gst_video::NavigationEvent::parse(ev).unwrap(), &element); + } else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + } + } + } gst::PadProbeReturn::Ok }), ); @@ -417,12 +471,18 @@ impl BaseWebRTCSrc { .expect("Adding ghostpad to the bin should always work"); if let Some(srcpad) = srcpad { - let producer_id = self - .signaller() - .property::>("producer-peer-id") - .or_else(|| pad.property("msid")); + let signaller = element.imp().signaller(); + let producer_id = if signaller + .has_property("producer-peer-id", Some(Option::::static_type())) + { + signaller + .property::>("producer-peer-id") + .or_else(|| webrtcbin_pad.property("msid")) + } else { + Some(self.id.clone()) + }; - let encoded_filter = self.obj().emit_by_name::>( + let encoded_filter = element.emit_by_name::>( "request-encoded-filter", &[&producer_id, &srcpad.name(), &srcpad.allowed_caps()], ); @@ -431,19 +491,19 @@ impl BaseWebRTCSrc { let decodebin = gst::ElementFactory::make("decodebin3") .build() .expect("decodebin3 needs to be present!"); - self.obj().add(&decodebin).unwrap(); + bin.add(&decodebin).unwrap(); decodebin.sync_state_with_parent().unwrap(); decodebin.connect_pad_added( - glib::clone!(@weak self as this, @weak srcpad => move |_webrtcbin, pad| { + glib::clone!(@weak element as this, @weak ghostpad as ghostpad => move |_webrtcbin, pad| { if pad.direction() == gst::PadDirection::Sink { return; } - srcpad.set_target(Some(pad)).unwrap(); + ghostpad.set_target(Some(pad)).unwrap(); }), ); - gst::debug!(CAT, imp: self, "Decoding for {}", srcpad.imp().stream_id()); + gst::debug!(CAT, obj: element, "Decoding for {}", srcpad.imp().stream_id()); if let Some(encoded_filter) = encoded_filter { let filter_sink_pad = encoded_filter @@ -453,7 +513,7 @@ impl BaseWebRTCSrc { let parsebin = gst::ElementFactory::make("parsebin") .build() .expect("parsebin needs to be present!"); - self.obj().add_many([&parsebin, &encoded_filter]).unwrap(); + bin.add_many([&parsebin, &encoded_filter]).unwrap(); parsebin.connect_pad_added(move |_, pad| { pad.link(&filter_sink_pad) @@ -465,7 +525,7 @@ impl BaseWebRTCSrc { encoded_filter.sync_state_with_parent().unwrap(); }); - ghostpad + webrtcbin_pad .link(&parsebin.static_pad("sink").unwrap()) .expect("webrtcbin ! parsebin linking failed"); @@ -474,14 +534,14 @@ impl BaseWebRTCSrc { let sinkpad = decodebin .static_pad("sink") .expect("decodebin has a sink pad"); - ghostpad + webrtcbin_pad .link(&sinkpad) .expect("webrtcbin ! decodebin3 linking failed"); } } else { gst::debug!( CAT, - imp: self, + obj: element, "NO decoding for {}", srcpad.imp().stream_id() ); @@ -494,291 +554,241 @@ impl BaseWebRTCSrc { .static_pad("src") .expect("encoded filter must expose a static src pad"); - self.obj().add(&encoded_filter).unwrap(); + bin.add(&encoded_filter).unwrap(); - ghostpad + webrtcbin_pad .link(&filter_sink_pad) .expect("webrtcbin ! encoded_filter linking failed"); - srcpad.set_target(Some(&filter_src_pad)).unwrap(); encoded_filter.sync_state_with_parent().unwrap(); + ghostpad.set_target(Some(&filter_src_pad)).unwrap(); } else { - srcpad.set_target(Some(&ghostpad)).unwrap(); + // No decoder or filter + ghostpad.set_target(Some(webrtcbin_pad)).unwrap(); } } + srcpad.set_target(Some(&ghostpad)).unwrap(); } else { - gst::debug!(CAT, imp: self, "Unused webrtcbin pad {pad:?}"); + gst::debug!(CAT, obj: element, "Unused webrtcbin pad {webrtcbin_pad:?}"); } } - fn prepare(&self) -> Result<(), Error> { - let webrtcbin = gst::ElementFactory::make("webrtcbin") - .property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle) - .build() - .with_context(|| "Failed to make element webrtcbin".to_string())?; + fn generate_offer(&self, element: &super::BaseWebRTCSrc) { + let sess_id = self.id.clone(); + let webrtcbin = self.webrtcbin(); + let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly; - { - let settings = self.settings.lock().unwrap(); + let settings = element.imp().settings.lock().unwrap(); + let caps = settings + .video_codecs + .iter() + .chain(settings.audio_codecs.iter()) + .map(|codec| { + let name = &codec.name; - if let Some(stun_server) = settings.stun_server.as_ref() { - webrtcbin.set_property("stun-server", stun_server); - } + let (media, clock_rate, pt) = if codec.stream_type == gst::StreamType::AUDIO { + ("audio", 48000, 96) + } else { + //video stream type + ("video", 90000, 101) + }; - for turn_server in settings.turn_servers.iter() { - webrtcbin.emit_by_name::("add-turn-server", &[&turn_server]); - } + let mut caps = gst::Caps::new_empty(); + { + let caps = caps.get_mut().unwrap(); + let s = gst::Structure::builder("application/x-rtp") + .field("media", media) + .field("payload", pt) + .field("encoding-name", name.as_str()) + .field("clock-rate", clock_rate) + .build(); + caps.append_structure(s); + } + caps + }); + + for c in caps { + gst::info!( + CAT, + obj: element, + "Adding transceiver with caps: {c:#?}" + ); + let transceiver = webrtcbin.emit_by_name::( + "add-transceiver", + &[&direction, &c], + ); + + transceiver.set_property("do_nack", settings.do_retransmission); + transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); } - let bin = gst::Bin::new(); - bin.connect_pad_removed(glib::clone!(@weak self as this => move |_, pad| - this.state.lock().unwrap().flow_combiner.remove_pad(pad); - )); - bin.connect_pad_added(glib::clone!(@weak self as this => move |_, pad| - this.state.lock().unwrap().flow_combiner.add_pad(pad); - )); - webrtcbin.connect_pad_added( - glib::clone!(@weak self as this, @weak bin, => move |_webrtcbin, pad| { - if pad.direction() == gst::PadDirection::Sink { + let webrtcbin_weak = webrtcbin.downgrade(); + let promise = gst::Promise::with_change_func( + glib::clone!(@weak element as ele => move |reply| { + + let Some(webrtcbin) = webrtcbin_weak.upgrade() else { return; + }; + + let reply = match reply { + Ok(Some(reply)) => reply, + Ok(None) => { + gst::error!(CAT, obj: ele, "generate offer::Promise returned with no reply"); + return; + } + Err(e) => { + gst::error!(CAT, obj: ele, "generate offer::Promise returned with error {:?}", e); + return; + } + }; + + if let Ok(offer_sdp) = reply + .value("offer") + .map(|offer| offer.get::().unwrap()) + { + gst::debug!( + CAT, + obj: ele, + "Setting local description: {}", + offer_sdp.sdp().to_string() + ); + + webrtcbin.emit_by_name::<()>( + "set-local-description", + &[&offer_sdp, &None::], + ); + + gst::log!(CAT, obj: ele, "Sending SDP, {}", offer_sdp.sdp().to_string()); + let signaller = ele.imp().signaller(); + signaller.send_sdp(sess_id.as_str(), &offer_sdp); + } else { + let error = reply + .value("error") + .expect("structure must have an error value") + .get::() + .expect("value must be a GLib error"); + + gst::error!(CAT, obj: ele, "generate offer::Promise returned with error: {}", error); } - - this.handle_webrtc_src_pad(&bin, pad); }), ); - webrtcbin.connect_closure( - "on-ice-candidate", - false, - glib::closure!(@weak-allow-none self as this => move | - _webrtcbin: gst::Bin, - sdp_m_line_index: u32, - candidate: String| { - this.unwrap().on_ice_candidate(sdp_m_line_index, candidate); - }), - ); - - webrtcbin.connect_closure( - "on-data-channel", - false, - glib::closure!(@weak-allow-none self as this => move | - _webrtcbin: gst::Bin, - data_channel: glib::Object| { - this.unwrap().on_data_channel(data_channel); - }), - ); - - self.signaller() - .emit_by_name::<()>("webrtcbin-ready", &[&"none", &webrtcbin]); - - bin.add(&webrtcbin).unwrap(); - self.obj().add(&bin).context("Could not add `webrtcbin`")?; - - let mut state = self.state.lock().unwrap(); - state.webrtcbin.replace(webrtcbin); - - Ok(()) + webrtcbin + .clone() + .emit_by_name::<()>("create-offer", &[&None::, &promise]); } - fn get_stream_id( + fn handle_answer( &self, - transceiver: Option, - mline: Option, - ) -> Option { - let mline = transceiver.map_or(mline, |t| Some(t.mlineindex())); + answer: &gst_webrtc::WebRTCSessionDescription, + element: &super::BaseWebRTCSrc, + ) { + //FIXME: refactor the common parts of this function and handle_offer() + gst::debug!( + CAT, + obj: element, + "Setting remote description: {}", + answer.sdp().to_string() + ); - // Same logic as gst_pad_create_stream_id and friends, making a hash of - // the URI (session id, if URI doesn't exist) and adding `:`, here the ID is the mline of the - // stream in the SDP. - mline.map(|mline| { - let mut cs = glib::Checksum::new(glib::ChecksumType::Sha256).unwrap(); - - let data: String = if self - .signaller() - .has_property("uri", Some(String::static_type())) - { - self.signaller().property::>("uri").unwrap() - } else { - // use the session id - self.state.lock().unwrap().session_id.clone().unwrap() + let webrtcbin = self.webrtcbin(); + for (i, media) in answer.sdp().medias().enumerate() { + let codec_names = { + let settings = element.imp().settings.lock().unwrap(); + settings + .video_codecs + .iter() + .chain(settings.audio_codecs.iter()) + .map(|codec| codec.name.clone()) + .collect::>() }; + let caps = media + .formats() + .filter_map(|format| { + format.parse::().ok().and_then(|pt| { + let mut mediacaps = media.caps_from_media(pt)?; + let s = mediacaps.structure(0).unwrap(); + if !codec_names.contains(s.get::<&str>("encoding-name").ok()?) { + return None; + } - cs.update(data.as_bytes()); + // filter the remote media whose direction is not sendonly + media.attribute_val("sendonly")?; - format!("{}:{mline}", cs.string().unwrap()) - }) - } + let mut filtered_s = gst::Structure::new_empty("application/x-rtp"); + filtered_s.extend(s.iter().filter_map(|(key, value)| { + if key.starts_with("rtcp-") { + None + } else { + Some((key, value.to_owned())) + } + })); - fn unprepare(&self) -> Result<(), Error> { - gst::info!(CAT, imp: self, "unpreparing"); + if media + .attributes_to_caps(mediacaps.get_mut().unwrap()) + .is_err() + { + gst::warning!( + CAT, + obj: element, + "Failed to retrieve attributes from media!" + ); + return None; + } - let obj = self.obj(); - self.maybe_stop_signaller(); - self.state.lock().unwrap().session_id = None; - for pad in obj.src_pads() { - obj.remove_pad(&pad) - .map_err(|err| anyhow::anyhow!("Couldn't remove pad? {err:?}"))?; - } + let s = mediacaps.structure(0).unwrap(); - self.n_video_pads.store(0, Ordering::SeqCst); - self.n_audio_pads.store(0, Ordering::SeqCst); + filtered_s.extend(s.iter().filter_map(|(key, value)| { + if key.starts_with("extmap-") { + return Some((key, value.to_owned())); + } - Ok(()) - } + None + })); - fn connect_signaller(&self, signaller: &Signallable) { - let instance = &*self.obj(); + Some(filtered_s) + }) + }) + .collect::(); - let _ = self - .state - .lock() - .unwrap() - .signaller_signals - .insert(SignallerSignals { - error: signaller.connect_closure( - "error", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, error: String| { - gst::element_error!( - instance, - gst::StreamError::Failed, - ["Signalling error: {}", error] + if !caps.is_empty() { + let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap(); + if !element + .imp() + .create_and_probe_src_pad(&caps, &stream_id, self) + { + gst::error!( + CAT, + obj: element, + "Failed to create src pad with caps {:?}", + caps ); - }), - ), - - session_started: signaller.connect_closure( - "session-started", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, - session_id: &str, - _peer_id: &str| { - let imp = instance.imp(); - gst::info!(CAT, imp: imp, "Session started: {session_id}"); - imp.state.lock().unwrap().session_id = - Some(session_id.to_string()); - }), - ), - - session_ended: signaller.connect_closure( - "session-ended", - false, - glib::closure!(@watch instance => move |_signaler: glib::Object, _session_id: &str|{ - instance.imp().state.lock().unwrap().session_id = None; - instance.iterate_src_pads().into_iter().for_each(|pad| - { if let Err(e) = pad.map(|pad| pad.push_event(gst::event::Eos::new())) { - gst::error!(CAT, "Could not send EOS: {e:?}"); - }} - ); - - false - }), - ), - - request_meta: signaller.connect_closure( - "request-meta", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object| -> Option { - instance.imp().settings.lock().unwrap().meta.clone() - }), - ), - - session_description: signaller.connect_closure( - "session-description", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, - _peer_id: &str, - desc: &gst_webrtc::WebRTCSessionDescription| { - assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer); - - instance.imp().handle_offer(desc); - }), - ), - - // sdp_mid is exposed for future proofing, see - // https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, - // at the moment sdp_m_line_index must be Some - handle_ice: signaller.connect_closure( - "handle-ice", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, - peer_id: &str, - sdp_m_line_index: u32, - _sdp_mid: Option, - candidate: &str| { - instance.imp().handle_ice(peer_id, Some(sdp_m_line_index), None, candidate); - }), - ), - }); - - // previous signals are disconnected when dropping the old structure - } - - // Creates and adds our `WebRTCSrcPad` source pad, returning caps accepted - // downstream - fn create_and_probe_src_pad(&self, caps: &gst::Caps, stream_id: &str) -> bool { - gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}"); - - let obj = self.obj(); - let media_type = caps - .structure(0) - .expect("Passing empty caps is invalid") - .get::<&str>("media") - .expect("Only caps with a `media` field are expected when creating the pad"); - - let (template, name, raw_caps) = if media_type == "video" { - ( - obj.pad_template("video_%u").unwrap(), - format!("video_{}", self.n_video_pads.fetch_add(1, Ordering::SeqCst)), - VIDEO_CAPS.to_owned(), - ) - } else if media_type == "audio" { - ( - obj.pad_template("audio_%u").unwrap(), - format!("audio_{}", self.n_audio_pads.fetch_add(1, Ordering::SeqCst)), - AUDIO_CAPS.to_owned(), - ) - } else { - gst::info!(CAT, imp: self, "Not an audio or video media {media_type:?}"); - - return false; - }; - - let caps_with_raw = [caps.clone(), raw_caps.clone()] - .into_iter() - .collect::(); - let ghost = gst::GhostPad::builder_from_template(&template) - .name(name) - .build() - .downcast::() - .unwrap(); - ghost.imp().set_stream_id(stream_id); - obj.add_pad(&ghost) - .expect("Adding ghost pad should never fail"); - - let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw)); - if let Some(first_struct) = downstream_caps.structure(0) { - if first_struct.has_name(raw_caps.structure(0).unwrap().name()) { - ghost.imp().set_needs_decoding(true) + } + } else { + gst::info!( + CAT, + obj: element, + "Not using media: {media:#?} as it doesn't match our codec restrictions" + ); } } - true + webrtcbin.emit_by_name::<()>("set-remote-description", &[&answer, &None::]); } - fn handle_offer(&self, offer: &gst_webrtc::WebRTCSessionDescription) { - gst::log!(CAT, imp: self, "Got offer {}", offer.sdp().to_string()); + fn handle_offer( + &self, + offer: &gst_webrtc::WebRTCSessionDescription, + element: &super::BaseWebRTCSrc, + ) -> (gst::Promise, gst::Bin) { + gst::log!(CAT, obj: element, "Got offer {}", offer.sdp().to_string()); let sdp = offer.sdp(); let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly; let webrtcbin = self.webrtcbin(); for (i, media) in sdp.medias().enumerate() { let (codec_names, do_retransmission) = { - let settings = self.settings.lock().unwrap(); + let settings = element.imp().settings.lock().unwrap(); ( settings .video_codecs @@ -814,7 +824,7 @@ impl BaseWebRTCSrc { { gst::warning!( CAT, - imp: self, + obj: element, "Failed to retrieve attributes from media!" ); return None; @@ -837,10 +847,13 @@ impl BaseWebRTCSrc { if !caps.is_empty() { let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap(); - if self.create_and_probe_src_pad(&caps, &stream_id) { + if element + .imp() + .create_and_probe_src_pad(&caps, &stream_id, self) + { gst::info!( CAT, - imp: self, + obj: element, "Adding transceiver for {stream_id} with caps: {caps:#?}" ); let transceiver = webrtcbin.emit_by_name::( @@ -848,12 +861,13 @@ impl BaseWebRTCSrc { &[&direction, &caps], ); - transceiver.set_property("do-nack", do_retransmission); + transceiver.set_property("do_nack", do_retransmission); transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); } } else { gst::info!( CAT, + obj: element, "Not using media: {media:#?} as it doesn't match our codec restrictions" ); } @@ -861,19 +875,36 @@ impl BaseWebRTCSrc { webrtcbin.emit_by_name::<()>("set-remote-description", &[&offer, &None::]); - let obj = self.obj(); - obj.no_more_pads(); + gst::info!(CAT, obj: element, "Set remote description"); + let obj = element.clone(); + + let session_id = self.id.clone(); let promise = - gst::Promise::with_change_func(glib::clone!(@weak self as this => move |reply| { - this.on_answer_created(reply); + gst::Promise::with_change_func(glib::clone!(@weak element as ele => move |reply| { + let state = ele.imp().state.lock().unwrap(); + gst::info!(CAT, obj: ele, "got answer for session {session_id:?}"); + let Some(session) = state.sessions.get(&session_id) else { + gst::error!(CAT, obj: ele , "no session {session_id:?}"); + return + }; + session.on_answer_created(reply, &obj); } )); - webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + // We cannot emit `create-answer` from here. The promise function + // of the answer needs the state lock which is held by the caller + // of `handle_offer`. So return the promise to the caller so that + // the it can drop the `state` and safely emit `create-answer` + + (promise, webrtcbin.clone()) } - fn on_answer_created(&self, reply: Result, gst::PromiseError>) { + fn on_answer_created( + &self, + reply: Result, gst::PromiseError>, + element: &super::BaseWebRTCSrc, + ) { let reply = match reply { Ok(Some(reply)) => { if !reply.has_field_with_type( @@ -881,14 +912,14 @@ impl BaseWebRTCSrc { gst_webrtc::WebRTCSessionDescription::static_type(), ) { gst::element_error!( - self.obj(), + element, gst::StreamError::Failed, ["create-answer::Promise returned with no reply"] ); return; } else if reply.has_field_with_type("error", glib::Error::static_type()) { gst::element_error!( - self.obj(), + element, gst::LibraryError::Failed, ["create-offer::Promise returned with error: {:?}", reply] ); @@ -899,7 +930,7 @@ impl BaseWebRTCSrc { } Ok(None) => { gst::element_error!( - self.obj(), + element, gst::StreamError::Failed, ["create-answer::Promise returned with no reply"] ); @@ -908,7 +939,7 @@ impl BaseWebRTCSrc { } Err(err) => { gst::element_error!( - self.obj(), + element, gst::LibraryError::Failed, ["create-answer::Promise returned with error {:?}", err] ); @@ -923,83 +954,291 @@ impl BaseWebRTCSrc { .get::() .expect("Invalid argument"); - self.webrtcbin() + self.webrtcbin .emit_by_name::<()>("set-local-description", &[&answer, &None::]); - let session_id = { - let state = self.state.lock().unwrap(); - match &state.session_id { - Some(id) => id.to_owned(), - _ => { - gst::element_error!( - self.obj(), - gst::StreamError::Failed, - ["Signalling error, no session started while requesting to send an SDP offer"] - ); - - return; - } - } - }; - - gst::log!(CAT, imp: self, "Sending SDP, {}", answer.sdp().to_string()); - let signaller = self.signaller(); - signaller.send_sdp(&session_id, &answer); + gst::log!(CAT, obj: element, "Sending SDP, {}", answer.sdp().to_string()); + let signaller = element.imp().signaller(); + signaller.send_sdp(&self.id, &answer); } - fn on_data_channel(&self, data_channel: glib::Object) { - gst::info!(CAT, imp: self, "Received data channel {data_channel:?}"); - let mut state = self.state.lock().unwrap(); - state.data_channel = data_channel.dynamic_cast::().ok(); + fn on_data_channel(&mut self, data_channel: glib::Object, element: &super::BaseWebRTCSrc) { + gst::info!(CAT, obj: element, "Received data channel {data_channel:?}"); + self.data_channel = data_channel.dynamic_cast::().ok(); } - fn on_ice_candidate(&self, sdp_m_line_index: u32, candidate: String) { - let signaller = self.signaller(); - let session_id = match self.state.lock().unwrap().session_id.as_ref() { - Some(id) => id.to_string(), - _ => { - gst::element_error!( - self.obj(), - gst::StreamError::Failed, - ["Signalling error, no session started while requesting to propose ice candidates"] - ); - - return; - } - }; - signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None::); + fn on_ice_candidate( + &self, + sdp_m_line_index: u32, + candidate: String, + element: &super::BaseWebRTCSrc, + ) { + let signaller = element.imp().signaller(); + signaller.add_ice(&self.id, &candidate, sdp_m_line_index, None::); } /// Called by the signaller with an ice candidate fn handle_ice( &self, - peer_id: &str, sdp_m_line_index: Option, _sdp_mid: Option, candidate: &str, + element: &super::BaseWebRTCSrc, ) { let sdp_m_line_index = match sdp_m_line_index { Some(m_line) => m_line, None => { - gst::error!(CAT, imp: self, "No mandatory mline"); + gst::error!(CAT, obj: element, "No mandatory mline"); return; } }; - gst::log!(CAT, imp: self, "Got ice from {peer_id}: {candidate}"); + gst::log!(CAT, obj: element, "Got ice candidate for {}: {candidate}", self.id); self.webrtcbin() .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); } +} + +impl BaseWebRTCSrc { + fn signaller(&self) -> Signallable { + self.settings.lock().unwrap().signaller.clone() + } + + fn unprepare(&self) -> Result<(), Error> { + gst::info!(CAT, imp: self, "unpreparing"); + + let mut state = self.state.lock().unwrap(); + for (_, s) in state.sessions.iter() { + if let Err(e) = self.end_session(s) { + gst::error!(CAT, imp: self , "Error ending session : {e}"); + } + } + state.sessions.clear(); + drop(state); + + self.maybe_stop_signaller(); + + Ok(()) + } + + fn connect_signaller(&self, signaller: &Signallable) { + let instance = &*self.obj(); + + let _ = self + .state + .lock() + .unwrap() + .signaller_signals + .insert(SignallerSignals { + error: signaller.connect_closure( + "error", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, error: String| { + gst::element_error!( + instance, + gst::StreamError::Failed, + ["Signalling error: {}", error] + ); + }), + ), + + session_started: signaller.connect_closure( + "session-started", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, + session_id: &str, + _peer_id: &str| { + let imp = instance.imp(); + gst::info!(CAT, imp: imp, "Session started: {session_id}"); + let _ = imp.start_session(session_id); + }), + ), + + session_ended: signaller.connect_closure( + "session-ended", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str|{ + let this = instance.imp(); + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this , " Failed to find session {session_id}"); + return false; + }; + if let Err(e) = this.end_session(session) { + gst::error!(CAT, imp: this , " Failed to end session {session_id}: {e}"); + return false; + } + state.sessions.remove(session_id); + true + }), + ), + + request_meta: signaller.connect_closure( + "request-meta", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object| -> Option { + instance.imp().settings.lock().unwrap().meta.clone() + }), + ), + + session_description: signaller.connect_closure( + "session-description", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, + session_id: &str, + desc: &gst_webrtc::WebRTCSessionDescription| { + match desc.type_() { + gst_webrtc::WebRTCSDPType::Offer =>{ + let this = instance.imp(); + gst::info!(CAT, imp: this, "got sdp offer"); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + + let (promise, webrtcbin) = session.handle_offer(desc, &this.obj()); + drop (state); + webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + }, + + gst_webrtc::WebRTCSDPType::Answer => { + let this = instance.imp(); + gst::info!(CAT, imp: this, "got sdp answer"); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.handle_answer(desc, &this.obj()); + }, + _ => {}, + } + }), + ), + + session_requested: signaller.connect_closure( + "session-requested", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str, _peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{ + if offer.is_none() { + let this = instance.imp(); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.generate_offer(&this.obj()); + } + }), + ), + + // sdp_mid is exposed for future proofing, see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, + // at the moment sdp_m_line_index must be Some + handle_ice: signaller.connect_closure( + "handle-ice", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, + session_id: &str, + sdp_m_line_index: u32, + _sdp_mid: Option, + candidate: &str| { + let this = instance.imp(); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.handle_ice(Some(sdp_m_line_index), None, candidate, &this.obj()); + }), + ), + }); + + // previous signals are disconnected when dropping the old structure + } + + // Creates and adds our `WebRTCSrcPad` source pad, returning caps accepted + // downstream + fn create_and_probe_src_pad( + &self, + caps: &gst::Caps, + stream_id: &str, + session: &Session, + ) -> bool { + gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}"); + + let obj = self.obj(); + let media_type = caps + .structure(0) + .expect("Passing empty caps is invalid") + .get::<&str>("media") + .expect("Only caps with a `media` field are expected when creating the pad"); + + let (template, name, raw_caps) = if media_type == "video" { + ( + obj.pad_template("video_%u").unwrap(), + format!( + "video_{}_{}", + session.id, + session.n_video_pads.fetch_add(1, Ordering::SeqCst) + ), + VIDEO_CAPS.to_owned(), + ) + } else if media_type == "audio" { + ( + obj.pad_template("audio_%u").unwrap(), + format!( + "audio_{}_{}", + session.id, + session.n_audio_pads.fetch_add(1, Ordering::SeqCst) + ), + AUDIO_CAPS.to_owned(), + ) + } else { + gst::info!(CAT, imp: self, "Not an audio or video media {media_type:?}"); + + return false; + }; + + let caps_with_raw = [caps.clone(), raw_caps.clone()] + .into_iter() + .collect::(); + let ghost = gst::GhostPad::builder_from_template(&template) + .name(name) + .build() + .downcast::() + .unwrap(); + ghost.imp().set_stream_id(stream_id); + obj.add_pad(&ghost) + .expect("Adding ghost pad should never fail"); + + let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw)); + if let Some(first_struct) = downstream_caps.structure(0) { + if first_struct.has_name(raw_caps.structure(0).unwrap().name()) { + ghost.imp().set_needs_decoding(true) + } + } + + true + } fn maybe_start_signaller(&self) { let obj = self.obj(); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); if state.signaller_state == SignallerState::Stopped && obj.current_state() >= gst::State::Paused { + drop(state); self.signaller().start(); gst::info!(CAT, imp: self, "Started signaller"); + let mut state = self.state.lock().unwrap(); state.signaller_state = SignallerState::Started; } } @@ -1022,6 +1261,133 @@ impl BaseWebRTCSrc { Ok(()) } + + fn start_session(&self, session_id: &str) -> Result<(), Error> { + let state = self.state.lock().unwrap(); + if state.sessions.get(&session_id.to_string()).is_some() { + return Err(anyhow::anyhow!( + "session with id {session_id} already exists" + )); + }; + drop(state); + + let session = Session::new(session_id)?; + + let webrtcbin = session.webrtcbin(); + + { + let settings = self.settings.lock().unwrap(); + + if let Some(stun_server) = settings.stun_server.as_ref() { + webrtcbin.set_property("stun-server", stun_server); + } + + for turn_server in settings.turn_servers.iter() { + webrtcbin.emit_by_name::("add-turn-server", &[&turn_server]); + } + } + + let bin = gst::Bin::new(); + + webrtcbin.connect_pad_added( + glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| { + if pad.direction() == gst::PadDirection::Sink { + return; + } + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.flow_combiner.lock().unwrap().add_pad(pad); + session.handle_webrtc_src_pad(&bin, pad, &this.obj()) + }), + ); + + webrtcbin.connect_pad_removed( + glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| { + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.flow_combiner.lock().unwrap().remove_pad(pad); + }), + ); + + webrtcbin.connect_closure( + "on-ice-candidate", + false, + glib::closure!(@weak-allow-none self as this, @to-owned session_id => move | + _webrtcbin: gst::Bin, + sdp_m_line_index: u32, + candidate: String| { + if let Some(ele) = this { + let mut state = ele.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: ele, "session {session_id:?} not found"); + return + }; + session.on_ice_candidate(sdp_m_line_index, candidate, &ele.obj()); + } + }), + ); + + webrtcbin.connect_closure( + "on-data-channel", + false, + glib::closure!(@weak-allow-none self as this, @to-owned session_id => move | + _webrtcbin: gst::Bin, + data_channel: glib::Object| { + if let Some(ele) = this { + let mut state = ele.state.lock().unwrap(); + + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: ele, "session {session_id:?} not found"); + return + }; + session.on_data_channel(data_channel, &ele.obj()); + } + }), + ); + + bin.add(&webrtcbin).unwrap(); + self.obj().add(&bin).context("Could not add `webrtcbin`")?; + bin.sync_state_with_parent().unwrap(); + + self.signaller() + .emit_by_name::<()>("webrtcbin-ready", &[&session_id, &webrtcbin]); + + let mut state = self.state.lock().unwrap(); + state.sessions.insert(session_id.to_string(), session); + + Ok(()) + } + + fn end_session(&self, session: &Session) -> Result<(), Error> { + let obj = self.obj(); + let bin = session + .webrtcbin + .parent() + .and_downcast::() + .unwrap(); + + // set the session's bin to Null and remove it + bin.set_state(gst::State::Null)?; + obj.remove(&bin)?; + + for pad in obj.src_pads() { + if pad.name().contains(session.id.as_str()) { + if !pad.push_event(gst::event::Eos::new()) { + gst::error!(CAT, imp: self, "failed to send EOS on {}", pad.name()); + } + obj.remove_pad(&pad) + .map_err(|err| anyhow::anyhow!("Couldn't remove pad? {err:?}"))?; + } + } + self.signaller().end_session(session.id.as_str()); + Ok(()) + } } impl ElementImpl for BaseWebRTCSrc { @@ -1073,16 +1439,6 @@ impl ElementImpl for BaseWebRTCSrc { transition: gst::StateChange, ) -> Result { let obj = &*self.obj(); - if let gst::StateChange::NullToReady = transition { - if let Err(err) = self.prepare() { - gst::element_error!( - obj, - gst::StreamError::Failed, - ["Failed to prepare: {}", err] - ); - return Err(gst::StateChangeError); - } - } let mut ret = self.parent_change_state(transition); @@ -1111,16 +1467,6 @@ impl ElementImpl for BaseWebRTCSrc { ret } - - fn send_event(&self, event: gst::Event) -> bool { - match event.view() { - gst::EventView::Navigation(ev) => { - self.send_navigation_event(gst_video::NavigationEvent::parse(ev).unwrap()); - true - } - _ => true, - } - } } impl GstObjectImpl for BaseWebRTCSrc {} @@ -1157,24 +1503,26 @@ enum SignallerState { Stopped, } -struct State { - session_id: Option, - signaller_state: SignallerState, - webrtcbin: Option, - flow_combiner: gst_base::UniqueFlowCombiner, - signaller_signals: Option, +struct Session { + id: String, + webrtcbin: gst::Element, data_channel: Option, + n_video_pads: AtomicU16, + n_audio_pads: AtomicU16, + flow_combiner: Mutex, +} +struct State { + sessions: HashMap, + signaller_state: SignallerState, + signaller_signals: Option, } impl Default for State { fn default() -> Self { Self { signaller_state: SignallerState::Stopped, - session_id: None, - webrtcbin: None, - flow_combiner: Default::default(), + sessions: HashMap::new(), signaller_signals: Default::default(), - data_channel: None, } } } @@ -1358,3 +1706,57 @@ pub(super) mod livekit { type ParentType = crate::webrtcsrc::BaseWebRTCSrc; } } + +#[cfg(feature = "whep")] +pub(super) mod whep { + use super::*; + use crate::whep_signaller::WhepClientSignaller; + + #[derive(Default)] + pub struct WhepClientSrc {} + + impl ObjectImpl for WhepClientSrc { + fn constructed(&self) { + self.parent_constructed(); + let element = self.obj(); + let ws = element + .upcast_ref::() + .imp(); + + let _ = ws.set_signaller(WhepClientSignaller::default().upcast()); + + let obj = &*self.obj(); + + obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); + } + } + + impl GstObjectImpl for WhepClientSrc {} + + impl BinImpl for WhepClientSrc {} + + impl ElementImpl for WhepClientSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "WhepClientSrc", + "Source/Network/WebRTC", + "WebRTC source element using WHEP Client as the signaller", + "Sanchayan Maity ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + } + + impl BaseWebRTCSrcImpl for WhepClientSrc {} + + #[glib::object_subclass] + impl ObjectSubclass for WhepClientSrc { + const NAME: &'static str = "GstWhepClientSrc"; + type Type = crate::webrtcsrc::WhepClientSrc; + type ParentType = crate::webrtcsrc::BaseWebRTCSrc; + } +} diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs index fe1b3e50..d6fabad7 100644 --- a/net/webrtc/src/webrtcsrc/mod.rs +++ b/net/webrtc/src/webrtcsrc/mod.rs @@ -59,6 +59,11 @@ glib::wrapper! { pub struct LiveKitWebRTCSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy; } +#[cfg(feature = "whep")] +glib::wrapper! { + pub struct WhepClientSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; +} + glib::wrapper! { pub struct WebRTCSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; } @@ -139,5 +144,13 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> { LiveKitWebRTCSrc::static_type(), )?; + #[cfg(feature = "whep")] + gst::Element::register( + plugin, + "whepclientsrc", + gst::Rank::PRIMARY, + WhepClientSrc::static_type(), + )?; + Ok(()) } diff --git a/net/webrtc/src/whep_signaller/client.rs b/net/webrtc/src/whep_signaller/client.rs new file mode 100644 index 00000000..7769f708 --- /dev/null +++ b/net/webrtc/src/whep_signaller/client.rs @@ -0,0 +1,600 @@ +// Copyright (C) 2022, Asymptotic Inc. +// Author: Sanchayan Maity +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::{Signallable, SignallableImpl}; +use crate::utils::{ + build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError, +}; +use crate::RUNTIME; +use async_recursion::async_recursion; +use bytes::Bytes; +use futures::future; +use gst::glib::RustClosure; +use gst::{glib, prelude::*, subclass::prelude::*}; +use gst_sdp::*; +use gst_webrtc::*; +use once_cell::sync::Lazy; +use reqwest::header::{HeaderMap, HeaderValue}; +use reqwest::StatusCode; +use std::sync::Mutex; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "whep-client-signaller", + gst::DebugColorFlags::empty(), + Some("WHEP Client Signaller"), + ) +}); + +const MAX_REDIRECTS: u8 = 10; +const DEFAULT_TIMEOUT: u32 = 15; +const SESSION_ID: &str = "whep-client"; + +#[derive(Debug, Clone)] +struct Settings { + whep_endpoint: Option, + auth_token: Option, + use_link_headers: bool, + timeout: u32, +} + +#[allow(clippy::derivable_impls)] +impl Default for Settings { + fn default() -> Self { + Self { + whep_endpoint: None, + auth_token: None, + use_link_headers: false, + timeout: DEFAULT_TIMEOUT, + } + } +} + +#[derive(Debug)] +enum State { + Stopped, + Post { redirects: u8 }, + Running { whep_resource: String }, +} + +impl Default for State { + fn default() -> Self { + Self::Stopped + } +} + +pub struct WhepClient { + settings: Mutex, + state: Mutex, + canceller: Mutex>, + client: reqwest::Client, +} + +impl Default for WhepClient { + fn default() -> Self { + // We'll handle redirects manually since the default redirect handler does not + // reuse the authentication token on the redirected server + let pol = reqwest::redirect::Policy::none(); + let client = build_reqwest_client(pol); + + Self { + settings: Mutex::new(Settings::default()), + state: Mutex::new(State::default()), + canceller: Mutex::new(None), + client, + } + } +} + +impl ObjectImpl for WhepClient { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("whep-endpoint") + .nick("WHEP Endpoint") + .blurb("The WHEP server endpoint to POST SDP offer to.") + .build(), + glib::ParamSpecBoolean::builder("use-link-headers") + .nick("Use Link Headers") + .blurb("Use link headers to configure STUN/TURN servers if present in WHEP endpoint response.") + .build(), + glib::ParamSpecString::builder("auth-token") + .nick("Authorization Token") + .blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer '") + .build(), + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .readwrite() + .build(), + ] + }); + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "whep-endpoint" => { + let mut settings = self.settings.lock().unwrap(); + settings.whep_endpoint = value.get().expect("WHEP endpoint should be a string"); + } + "use-link-headers" => { + let mut settings = self.settings.lock().unwrap(); + settings.use_link_headers = value + .get() + .expect("use-link-headers should be a boolean value"); + } + "auth-token" => { + let mut settings = self.settings.lock().unwrap(); + settings.auth_token = value.get().expect("Auth token should be a string"); + } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.timeout = value.get().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "whep-endpoint" => { + let settings = self.settings.lock().unwrap(); + settings.whep_endpoint.to_value() + } + "use-link-headers" => { + let settings = self.settings.lock().unwrap(); + settings.use_link_headers.to_value() + } + "auth-token" => { + let settings = self.settings.lock().unwrap(); + settings.auth_token.to_value() + } + "timeout" => { + let settings = self.settings.lock().unwrap(); + settings.timeout.to_value() + } + _ => unimplemented!(), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for WhepClient { + const NAME: &'static str = "GstWhepClientSignaller"; + type Type = super::WhepClientSignaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl WhepClient { + fn raise_error(&self, msg: String) { + self.obj() + .emit_by_name::<()>("error", &[&format!("Error: {msg}")]); + } + + fn handle_future_error(&self, err: WaitError) { + match err { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "Future aborted") + } + WaitError::FutureError(err) => self.raise_error(err.to_string()), + }; + } + + fn sdp_message_parse(&self, sdp_bytes: Bytes, _webrtcbin: &gst::Element) { + let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) { + Ok(sdp) => sdp, + Err(_) => { + self.raise_error("Could not parse answer SDP".to_string()); + return; + } + }; + + let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp); + + self.obj() + .emit_by_name::<()>("session-description", &[&SESSION_ID, &remote_sdp]); + } + + async fn parse_endpoint_response( + &self, + sess_desc: WebRTCSessionDescription, + resp: reqwest::Response, + redirects: u8, + webrtcbin: gst::Element, + ) { + let endpoint; + let use_link_headers; + + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); + use_link_headers = settings.use_link_headers; + drop(settings); + } + + match resp.status() { + StatusCode::OK | StatusCode::NO_CONTENT => { + gst::info!(CAT, imp: self, "SDP offer successfully send"); + } + + StatusCode::CREATED => { + gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers()); + + if use_link_headers { + if let Err(e) = set_ice_servers(&webrtcbin, resp.headers()) { + self.raise_error(e.to_string()); + return; + }; + } + + /* See section 4.2 of the WHEP specification */ + let location = match resp.headers().get(reqwest::header::LOCATION) { + Some(location) => location, + None => { + self.raise_error( + "Location header field should be present for WHEP resource URL" + .to_string(), + ); + return; + } + }; + + let location = match location.to_str() { + Ok(loc) => loc, + Err(e) => { + self.raise_error(format!("Failed to convert location to string: {e}")); + return; + } + }; + + let url = reqwest::Url::parse(endpoint.as_str()).unwrap(); + + gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location); + + let url = match url.join(location) { + Ok(joined_url) => joined_url, + Err(err) => { + self.raise_error(format!("URL join operation failed: {err:?}")); + return; + } + }; + + match resp.bytes().await { + Ok(ans_bytes) => { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Running { + whep_resource: url.to_string(), + }, + _ => { + self.raise_error("Expected to be in POST state".to_string()); + return; + } + }; + drop(state); + + self.sdp_message_parse(ans_bytes, &webrtcbin) + } + Err(err) => self.raise_error(err.to_string()), + } + } + + status if status.is_redirection() => { + if redirects < MAX_REDIRECTS { + match parse_redirect_location(resp.headers(), &endpoint) { + Ok(redirect_url) => { + { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Post { + redirects: redirects + 1, + }, + /* + * As per section 4.6 of the specification, redirection is + * not required to be supported for the PATCH and DELETE + * requests to the final WHEP resource URL. Only the initial + * POST request may support redirection. + */ + State::Running { .. } => { + self.raise_error( + "Unexpected redirection in RUNNING state".to_string(), + ); + return; + } + State::Stopped => unreachable!(), + }; + drop(state); + } + + gst::warning!( + CAT, + imp: self, + "Redirecting endpoint to {}", + redirect_url.as_str() + ); + + self.do_post(sess_desc, webrtcbin, redirect_url).await + } + Err(e) => self.raise_error(e.to_string()), + } + } else { + self.raise_error("Too many redirects. Unable to connect.".to_string()); + } + } + + s => { + match resp.bytes().await { + Ok(r) => { + let res = r.escape_ascii().to_string(); + + // FIXME: Check and handle 'Retry-After' header in case of server error + self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res)); + } + Err(err) => self.raise_error(err.to_string()), + } + } + } + } + + async fn whep_offer(&self, webrtcbin: gst::Element) { + let local_desc = + webrtcbin.property::>("local-description"); + + let sess_desc = match local_desc { + None => { + self.raise_error("Local description is not set".to_string()); + return; + } + Some(mut local_desc) => { + local_desc.set_type(WebRTCSDPType::Offer); + local_desc + } + }; + + gst::debug!( + CAT, + imp: self, + "Sending offer SDP: {:?}", + sess_desc.sdp().as_text() + ); + + let timeout; + let endpoint; + + { + let settings = self.settings.lock().unwrap(); + timeout = settings.timeout; + endpoint = + reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); + drop(settings); + } + + if let Err(e) = wait_async( + &self.canceller, + self.do_post(sess_desc, webrtcbin, endpoint), + timeout, + ) + .await + { + self.handle_future_error(e); + } + } + + #[async_recursion] + async fn do_post( + &self, + offer: WebRTCSessionDescription, + webrtcbin: gst::Element, + endpoint: reqwest::Url, + ) { + let auth_token; + + { + let settings = self.settings.lock().unwrap(); + auth_token = settings.auth_token.clone(); + drop(settings); + } + + let sdp = offer.sdp(); + let body = sdp.as_text().unwrap(); + + gst::info!(CAT, imp: self, "Using endpoint {}", endpoint.as_str()); + + let mut headermap = HeaderMap::new(); + headermap.insert( + reqwest::header::CONTENT_TYPE, + HeaderValue::from_static("application/sdp"), + ); + + if let Some(token) = auth_token.as_ref() { + let bearer_token = "Bearer ".to_owned() + token.as_str(); + headermap.insert( + reqwest::header::AUTHORIZATION, + HeaderValue::from_str(bearer_token.as_str()) + .expect("Failed to set auth token to header"), + ); + } + + gst::debug!( + CAT, + imp: self, + "Url for HTTP POST request: {}", + endpoint.as_str() + ); + + let resp = self + .client + .request(reqwest::Method::POST, endpoint.clone()) + .headers(headermap) + .body(body) + .send() + .await; + + match resp { + Ok(r) => { + #[allow(unused_mut)] + let mut redirects; + + { + let state = self.state.lock().unwrap(); + redirects = match *state { + State::Post { redirects } => redirects, + _ => { + self.raise_error("Trying to do POST in unexpected state".to_string()); + return; + } + }; + drop(state); + } + + self.parse_endpoint_response(offer, r, redirects, webrtcbin) + .await + } + Err(err) => self.raise_error(err.to_string()), + } + } + + fn terminate_session(&self) { + let settings = self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + let timeout = settings.timeout; + + let resource_url = match *state { + State::Running { + whep_resource: ref whep_resource_url, + } => whep_resource_url.clone(), + _ => { + self.raise_error("Terminated in unexpected state".to_string()); + return; + } + }; + + drop(state); + + let mut headermap = HeaderMap::new(); + if let Some(token) = &settings.auth_token { + let bearer_token = "Bearer ".to_owned() + token.as_str(); + headermap.insert( + reqwest::header::AUTHORIZATION, + HeaderValue::from_str(bearer_token.as_str()) + .expect("Failed to set auth token to header"), + ); + } + + drop(settings); + + gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url); + + /* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */ + let client = build_reqwest_client(reqwest::redirect::Policy::default()); + let future = async { + client + .delete(resource_url.clone()) + .headers(headermap) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["DELETE request failed {}: {:?}", resource_url, err] + ) + }) + }; + + let res = wait(&self.canceller, future, timeout); + match res { + Ok(r) => { + gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); + } + Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "DELETE request aborted") + } + WaitError::FutureError(e) => { + gst::error!(CAT, imp: self, "Error on DELETE request : {}", e) + } + }, + }; + } + + pub fn on_webrtcbin_ready(&self) -> RustClosure { + glib::closure!(|signaller: &super::WhepClientSignaller, + _consumer_identifier: &str, + webrtcbin: &gst::Element| { + let obj_weak = signaller.downgrade(); + + webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { + let Some(obj) = obj_weak.upgrade() else { + return; + }; + + let state = webrtcbin.property::("ice-gathering-state"); + + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, obj: obj, "ICE gathering started"); + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, obj: obj, "ICE gathering complete"); + + let webrtcbin = webrtcbin.clone(); + + RUNTIME.spawn(async move { obj.imp().whep_offer(webrtcbin).await }); + } + _ => (), + } + }); + }) + } +} + +impl SignallableImpl for WhepClient { + fn start(&self) { + if self.settings.lock().unwrap().whep_endpoint.is_none() { + self.raise_error("WHEP endpoint URL must be set".to_string()); + return; + } + + let mut state = self.state.lock().unwrap(); + *state = State::Post { redirects: 0 }; + drop(state); + + self.obj() + .emit_by_name::<()>("session-started", &[&SESSION_ID, &SESSION_ID]); + self.obj().emit_by_name::<()>( + "session-requested", + &[ + &SESSION_ID, + &SESSION_ID, + &None::, + ], + ); + } + + fn stop(&self) {} + + fn end_session(&self, _session_id: &str) { + // Interrupt requests in progress, if any + if let Some(canceller) = &*self.canceller.lock().unwrap() { + canceller.abort(); + } + + let state = self.state.lock().unwrap(); + if let State::Running { .. } = *state { + // Release server-side resources + drop(state); + self.terminate_session(); + } + } +} diff --git a/net/webrtc/src/whep_signaller/mod.rs b/net/webrtc/src/whep_signaller/mod.rs new file mode 100644 index 00000000..e9a2908c --- /dev/null +++ b/net/webrtc/src/whep_signaller/mod.rs @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::Signallable; +use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt}; + +mod client; + +glib::wrapper! { + pub struct WhepClientSignaller(ObjectSubclass) @implements Signallable; +} + +unsafe impl Send for WhepClientSignaller {} +unsafe impl Sync for WhepClientSignaller {} + +impl Default for WhepClientSignaller { + fn default() -> Self { + let sig: WhepClientSignaller = glib::Object::new(); + sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready()); + sig + } +} diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs index 9f7adc4f..c6047207 100644 --- a/net/webrtc/src/whip_signaller/imp.rs +++ b/net/webrtc/src/whip_signaller/imp.rs @@ -18,9 +18,8 @@ use reqwest::header::HeaderValue; use reqwest::StatusCode; use std::sync::Mutex; -use core::time::Duration; -use crossbeam_channel::unbounded; use std::net::SocketAddr; +use tokio::sync::mpsc; use url::Url; use warp::{ http, @@ -47,7 +46,6 @@ const ENDPOINT_PATH: &str = "endpoint"; const RESOURCE_PATH: &str = "resource"; const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080"; const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303"); -const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client"); const CONTENT_SDP: &str = "application/sdp"; const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag"; @@ -193,7 +191,7 @@ impl WhipClient { let mut headermap = HeaderMap::new(); headermap.insert( reqwest::header::CONTENT_TYPE, - HeaderValue::from_static("application/sdp"), + HeaderValue::from_static(CONTENT_SDP), ); if let Some(token) = auth_token.as_ref() { @@ -616,27 +614,14 @@ impl ObjectImpl for WhipClient { // WHIP server implementation #[derive(Debug)] -enum WhipServerState { - Idle, - Negotiating, - Ready, -} - -impl Default for WhipServerState { - fn default() -> Self { - Self::Idle - } -} - struct WhipServerSettings { stun_server: Option, turn_servers: gst::Array, host_addr: Url, - producer_peer_id: Option, timeout: u32, shutdown_signal: Option>, server_handle: Option>, - sdp_answer: Option>>, + sdp_answer: Option>>, } impl Default for WhipServerSettings { @@ -645,7 +630,6 @@ impl Default for WhipServerSettings { host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(), stun_server: DEFAULT_STUN_SERVER.map(String::from), turn_servers: gst::Array::new(Vec::new() as Vec), - producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from), timeout: DEFAULT_TIMEOUT, shutdown_signal: None, server_handle: None, @@ -654,18 +638,10 @@ impl Default for WhipServerSettings { } } +#[derive(Default)] pub struct WhipServer { - state: Mutex, settings: Mutex, -} - -impl Default for WhipServer { - fn default() -> Self { - Self { - settings: Mutex::new(WhipServerSettings::default()), - state: Mutex::new(WhipServerState::default()), - } - } + canceller: Mutex>, } #[derive(Debug)] @@ -694,7 +670,7 @@ impl WhipServer { WebRTCICEGatheringState::Complete => { gst::info!(CAT, obj: obj, "ICE gathering complete"); let ans: Option; - let settings = obj.imp().settings.lock().unwrap(); + let mut settings = obj.imp().settings.lock().unwrap(); if let Some(answer_desc) = webrtcbin .property::>("local-description") { @@ -702,9 +678,22 @@ impl WhipServer { } else { ans = None; } - if let Some(tx) = &settings.sdp_answer { - tx.send(ans).unwrap() - } + let tx = settings + .sdp_answer + .take() + .expect("SDP answer Sender needs to be valid"); + + let obj_weak = obj.downgrade(); + RUNTIME.spawn(async move { + let obj = match obj_weak.upgrade() { + Some(obj) => obj, + None => return, + }; + + if let Err(e) = tx.send(ans).await { + gst::error!(CAT, obj: obj, "Failed to send SDP {e}"); + } + }); } _ => (), } @@ -722,57 +711,23 @@ impl WhipServer { //FIXME: add state checking once ICE trickle is implemented } - async fn delete_handler(&self, _id: String) -> Result { - let mut state = self.state.lock().unwrap(); - match *state { - WhipServerState::Ready => { - // FIXME: session-ended will make webrtcsrc send EOS - // and producer-removed is not handled - // Need to address the usecase where when the client terminates - // the webrtcsrc should be running without sending EOS and reset - // for next client connection like a usual server - - self.obj().emit_by_name::("session-ended", &[&ROOT]); - - gst::info!(CAT, imp:self, "Ending session"); - *state = WhipServerState::Idle; - Ok(warp::reply::reply().into_response()) - } - _ => { - gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed"); - let res = http::Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Body::from(String::from("Session not Ready"))) - .unwrap(); - Ok(res) - } + async fn delete_handler(&self, id: String) -> Result { + if self + .obj() + .emit_by_name::("session-ended", &[&id.as_str()]) + { + gst::info!(CAT, imp:self, "Ended session {id}"); + } else { + gst::info!(CAT, imp:self, "Failed to End session {id}"); + // FIXME: Do we send a different response } + Ok(warp::reply::reply().into_response()) } async fn options_handler(&self) -> Result { let settings = self.settings.lock().unwrap(); - let peer_id = settings.producer_peer_id.clone().unwrap(); drop(settings); - let mut state = self.state.lock().unwrap(); - match *state { - WhipServerState::Idle => { - self.obj() - .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]); - *state = WhipServerState::Negotiating - } - WhipServerState::Ready => { - gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed"); - let res = http::Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Body::from(String::from("Session active already"))) - .unwrap(); - return Ok(res); - } - _ => {} - }; - drop(state); - let mut links = HeaderMap::new(); let settings = self.settings.lock().unwrap(); match &settings.stun_server { @@ -806,7 +761,7 @@ impl WhipServer { } let mut res = http::Response::builder() - .header("Access-Post", "application/sdp") + .header("Access-Post", CONTENT_SDP) .body(Body::empty()) .unwrap(); @@ -820,31 +775,15 @@ impl WhipServer { &self, body: warp::hyper::body::Bytes, ) -> Result, warp::Rejection> { - let mut settings = self.settings.lock().unwrap(); - let peer_id = settings.producer_peer_id.clone().unwrap(); - let wait_timeout = settings.timeout; - let (tx, rx) = unbounded::>(); - settings.sdp_answer = Some(tx); - drop(settings); - - let mut state = self.state.lock().unwrap(); - match *state { - WhipServerState::Idle => { - self.obj() - .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]); - *state = WhipServerState::Negotiating - } - WhipServerState::Ready => { - gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed"); - let res = http::Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Body::from(String::from("Session active already"))) - .unwrap(); - return Ok(res); - } - _ => {} + let session_id = uuid::Uuid::new_v4().to_string(); + let (tx, mut rx) = mpsc::channel::>(1); + let wait_timeout = { + let mut settings = self.settings.lock().unwrap(); + let wait_timeout = settings.timeout; + settings.sdp_answer = Some(tx); + drop(settings); + wait_timeout }; - drop(state); match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) { Ok(offer_sdp) => { @@ -854,7 +793,9 @@ impl WhipServer { ); self.obj() - .emit_by_name::<()>("session-description", &[&"unique", &offer]); + .emit_by_name::<()>("session-started", &[&session_id, &session_id]); + self.obj() + .emit_by_name::<()>("session-description", &[&session_id, &offer]); } Err(err) => { gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}"); @@ -864,20 +805,32 @@ impl WhipServer { } } - // We don't want to wait infinitely for the ice gathering to complete. - let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) { - Ok(a) => a, - Err(e) => { - let reply = warp::reply::reply(); - let res; - if e.is_timeout() { - res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT); - gst::error!(CAT, imp: self, "Timedout waiting for SDP answer"); - } else { - res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR); - gst::error!(CAT, imp: self, "Channel got disconnected"); + let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await; + + let answer = match result { + Ok(ans) => match ans { + Some(a) => a, + None => { + let err = "Channel closed, can't receive SDP".to_owned(); + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(err)) + .unwrap(); + + return Ok(res); } - return Ok(res.into_response()); + }, + Err(e) => { + let err = match e { + WaitError::FutureAborted => "Aborted".to_owned(), + WaitError::FutureError(err) => err.to_string(), + }; + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(err)) + .unwrap(); + + return Ok(res); } }; @@ -947,10 +900,10 @@ impl WhipServer { drop(settings); // Got SDP answer, send answer in the response - let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id; + let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id; let mut res = http::Response::builder() .status(StatusCode::CREATED) - .header(CONTENT_TYPE, "application/sdp") + .header(CONTENT_TYPE, CONTENT_SDP) .header("location", resource_url) .body(Body::from(ans_text.unwrap())) .unwrap(); @@ -958,10 +911,6 @@ impl WhipServer { let headers = res.headers_mut(); headers.extend(links); - let mut state = self.state.lock().unwrap(); - *state = WhipServerState::Ready; - drop(state); - Ok(res) } @@ -1117,7 +1066,8 @@ impl SignallableImpl for WhipServer { gst::info!(CAT, imp: self, "stopped the WHIP server"); } - fn end_session(&self, _session_id: &str) { + fn end_session(&self, session_id: &str) { + gst::info!(CAT, imp: self, "Session {session_id} ended"); //FIXME: send any events to the client } } @@ -1140,11 +1090,6 @@ impl ObjectImpl for WhipServer { .default_value(DEFAULT_HOST_ADDR) .flags(glib::ParamFlags::READWRITE) .build(), - // needed by webrtcsrc in handle_webrtc_src_pad - glib::ParamSpecString::builder("producer-peer-id") - .default_value(DEFAULT_PRODUCER_PEER_ID) - .flags(glib::ParamFlags::READABLE) - .build(), glib::ParamSpecString::builder("stun-server") .nick("STUN Server") .blurb("The STUN server of the form stun://hostname:port") @@ -1204,7 +1149,6 @@ impl ObjectImpl for WhipServer { "host-addr" => settings.host_addr.to_string().to_value(), "stun-server" => settings.stun_server.to_value(), "turn-servers" => settings.turn_servers.to_value(), - "producer-peer-id" => settings.producer_peer_id.to_value(), "timeout" => settings.timeout.to_value(), _ => unimplemented!(), }