From 84a33ca7b9fde27aabee41c191ad63763947033e Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Fri, 7 Apr 2023 00:41:16 +0200 Subject: [PATCH] webrtcsink: bring in signalling code from whipsink as a signaller Part-of: --- docs/plugins/gst_plugins_cache.json | 31 ++ net/webrtc/Cargo.toml | 4 + net/webrtc/README.md | 31 +- net/webrtc/src/lib.rs | 1 + net/webrtc/src/utils.rs | 250 +++++++++++ net/webrtc/src/webrtcsink/imp.rs | 41 ++ net/webrtc/src/webrtcsink/mod.rs | 10 + net/webrtc/src/whip_signaller/imp.rs | 612 +++++++++++++++++++++++++++ net/webrtc/src/whip_signaller/mod.rs | 19 + net/webrtchttp/src/whipsink/imp.rs | 2 + 10 files changed, 1000 insertions(+), 1 deletion(-) create mode 100644 net/webrtc/src/whip_signaller/imp.rs create mode 100644 net/webrtc/src/whip_signaller/mod.rs diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 3f382c40..096232e4 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6259,6 +6259,37 @@ "when": "last" } } + }, + "whipwebrtcsink": { + "author": "Taruntej Kanakamalla ", + "description": "WebRTC sink with WHIP signaller", + "hierarchy": [ + "GstWhipWebRTCSink", + "GstBaseWebRTCSink", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy", + "GstNavigation" + ], + "klass": "Sink/Network/WebRTC", + "pad-templates": { + "audio_%%u": { + "caps": "audio/x-raw:\naudio/x-opus:\n", + "direction": "sink", + "presence": "request" + }, + "video_%%u": { + "caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n", + "direction": "sink", + "presence": "request" + } + }, + "rank": "none" } }, "filename": "gstrswebrtc", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 5b330ec0..a69e786f 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -47,6 +47,10 @@ chrono = "0.4" data-encoding = "2.3.3" url-escape = "0.1.1" +reqwest = { version = "0.11", features = ["default-tls"] } +parse_link_header = {version = "0.3", features = ["url"]} +async-recursion = "1.0.0" + [dev-dependencies] tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } diff --git a/net/webrtc/README.md b/net/webrtc/README.md index 1a7ddc12..c5cd45b1 100644 --- a/net/webrtc/README.md +++ b/net/webrtc/README.md @@ -216,6 +216,8 @@ All the rust code in this repository is licensed under the Code in [gstwebrtc-api](gstwebrtc-api) is also licensed under the [Mozilla Public License Version 2.0]. +[Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0 + ## Using the AWS KVS signaller * Setup AWS Kinesis Video Streams @@ -230,4 +232,31 @@ AWS_ACCESS_KEY_ID="XXX" AWS_SECRET_ACCESS_KEY="XXX" gst-launch-1.0 videotestsrc * Connect a viewer @ -[Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0 +## Using the WHIP Signaller + +Testing the whip signaller can be done by setting up janus and +. + +* Set up a [janus] instance with the videoroom plugin configured + to expose a room with ID 1234 (configuration in `janus.plugin.videoroom.jcfg`) + +* Open the web page, click start + and join the room + +* Set up the [simple whip server] as explained in its README + +* Navigate to , create an endpoint named room1234 + pointing to the Janus room with ID 1234 + +* Finally, send a stream to the endpoint with: + +``` shell +gst-launch-1.0 -e uridecodebin uri=file:///home/meh/path/to/video/file ! \ + videoconvert ! video/x-raw ! queue ! \ + whipwebrtcsink name=ws signaller::whip-endpoint="http://127.0.0.1:7080/whip/endpoint/room1234" +``` + +You should see a second video displayed in the videoroomtest web page. + +[janus]: https://github.com/meetecho/janus-gateway +[simple whip server]: https://github.com/meetecho/simple-whip-server/ diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs index d17b825b..1735edc7 100644 --- a/net/webrtc/src/lib.rs +++ b/net/webrtc/src/lib.rs @@ -19,6 +19,7 @@ pub mod signaller; pub mod utils; pub mod webrtcsink; pub mod webrtcsrc; +mod whip_signaller; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { webrtcsink::register(plugin)?; diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index 14d89bca..12e2db27 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -100,6 +100,256 @@ pub fn serialize_json_object(val: &serde_json::Map) - res } +use crate::RUNTIME; +use futures::future; +use futures::prelude::*; +use gst::ErrorMessage; +use reqwest::header::HeaderMap; +use reqwest::redirect::Policy; +use std::sync::Mutex; +use std::time::Duration; + +#[derive(Debug)] +pub enum WaitError { + FutureAborted, + FutureError(ErrorMessage), +} + +pub async fn wait_async( + canceller: &Mutex>, + future: F, + timeout: u32, +) -> Result +where + F: Send + Future, + T: Send + 'static, +{ + let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); + { + let mut canceller_guard = canceller.lock().unwrap(); + canceller_guard.replace(abort_handle); + drop(canceller_guard); + } + + let future = async { + if timeout == 0 { + Ok(future.await) + } else { + let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await; + + match res { + Ok(r) => Ok(r), + Err(e) => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Read, + ["Request timeout, elapsed: {}", e] + ))), + } + } + }; + + let future = async { + match future::Abortable::new(future, abort_registration).await { + Ok(Ok(r)) => Ok(r), + + Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Future resolved with an error {:?}", err] + ))), + + Err(future::Aborted) => Err(WaitError::FutureAborted), + } + }; + + let res = future.await; + + let mut canceller_guard = canceller.lock().unwrap(); + *canceller_guard = None; + + res +} + +pub fn wait( + canceller: &Mutex>, + future: F, + timeout: u32, +) -> Result +where + F: Send + Future>, + T: Send + 'static, +{ + let mut canceller_guard = canceller.lock().unwrap(); + let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); + + if canceller_guard.is_some() { + return Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Old Canceller should not exist"] + ))); + } + + canceller_guard.replace(abort_handle); + drop(canceller_guard); + + let future = async { + if timeout == 0 { + future.await + } else { + let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await; + + match res { + Ok(r) => r, + Err(e) => Err(gst::error_msg!( + gst::ResourceError::Read, + ["Request timeout, elapsed: {}", e.to_string()] + )), + } + } + }; + + let future = async { + match future::Abortable::new(future, abort_registration).await { + Ok(Ok(res)) => Ok(res), + + Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Future resolved with an error {:?}", err] + ))), + + Err(future::Aborted) => Err(WaitError::FutureAborted), + } + }; + + let res = { + let _enter = RUNTIME.enter(); + futures::executor::block_on(future) + }; + + canceller_guard = canceller.lock().unwrap(); + *canceller_guard = None; + + res +} + +pub fn parse_redirect_location( + headermap: &HeaderMap, + old_url: &reqwest::Url, +) -> Result { + let location = match headermap.get(reqwest::header::LOCATION) { + Some(location) => location, + None => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Location header field should be present for WHIP/WHEP resource URL"] + )); + } + }; + + let location = match location.to_str() { + Ok(loc) => loc, + Err(e) => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to convert location to string {}", e] + )); + } + }; + + match reqwest::Url::parse(location) { + Ok(url) => Ok(url), // Location URL is an absolute path + Err(_) => { + // Location URL is a relative path + let new_url = old_url.clone().join(location).map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["URL join operation failed: {:?}", err] + ) + })?; + + Ok(new_url) + } + } +} + +pub fn build_reqwest_client(pol: Policy) -> reqwest::Client { + let client_builder = reqwest::Client::builder(); + client_builder.redirect(pol).build().unwrap() +} + +pub fn set_ice_servers( + webrtcbin: &gst::Element, + headermap: &HeaderMap, +) -> Result<(), ErrorMessage> { + for link in headermap.get_all("link").iter() { + let link = link.to_str().map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + [ + "Header value should contain only visible ASCII strings: {}", + err + ] + ) + })?; + + let item_map = match parse_link_header::parse_with_rel(link) { + Ok(map) => map, + Err(_) => continue, + }; + + let link = match item_map.contains_key("ice-server") { + true => item_map.get("ice-server").unwrap(), + false => continue, // Not a link header we care about + }; + + // Note: webrtcbin needs ice servers to be in the below format + // ://@ + // and the ice-servers (link headers) received from the whip server might be + // in the format : with username and password as separate params. + // Constructing these with 'url' crate also require a format/parse + // for changing : to ://:@. + // So preferred to use the String rather + + // check if uri has :// + let ice_server_url = if link.uri.has_authority() { + // use raw_uri as is + // username and password in the link.uri.params ignored + link.uri.clone() + } else { + // No builder pattern is provided by reqwest::Url. Use string operation. + // construct url as '://@' + let url = format!("{}://{}", link.uri.scheme(), link.uri.path()); + + let mut new_url = match reqwest::Url::parse(url.as_str()) { + Ok(url) => url, + Err(_) => continue, + }; + + if let Some(user) = link.params.get("username") { + new_url.set_username(user.as_str()).unwrap(); + if let Some(pass) = link.params.get("credential") { + new_url.set_password(Some(pass.as_str())).unwrap(); + } + } + + new_url + }; + + // It's nicer to not collapse the `else if` and its inner `if` + #[allow(clippy::collapsible_if)] + if link.uri.scheme() == "stun" { + webrtcbin.set_property_from_str("stun-server", ice_server_url.as_str()); + } else if link.uri.scheme().starts_with("turn") { + if !webrtcbin.emit_by_name::("add-turn-server", &[&ice_server_url.as_str()]) { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to set turn server {}", ice_server_url] + )); + } + } + } + + Ok(()) +} + /// Wrapper around `gst::ElementFactory::make` with a better error /// message pub fn make_element(element: &str, name: Option<&str>) -> Result { diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index bc6f415a..7b776afc 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -23,6 +23,7 @@ use super::homegrown_cc::CongestionController; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use crate::aws_kvs_signaller::AwsKvsSignaller; use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; +use crate::whip_signaller::WhipSignaller; use crate::RUNTIME; use std::collections::{BTreeMap, HashSet}; @@ -3810,3 +3811,43 @@ impl ObjectSubclass for AwsKvsWebRTCSink { type Type = super::AwsKvsWebRTCSink; type ParentType = super::BaseWebRTCSink; } + +#[derive(Default)] +pub struct WhipWebRTCSink {} + +impl ObjectImpl for WhipWebRTCSink { + fn constructed(&self) { + let element = self.obj(); + let ws = element.upcast_ref::().imp(); + + let _ = ws.set_signaller(WhipSignaller::default().upcast()); + } +} + +impl GstObjectImpl for WhipWebRTCSink {} + +impl ElementImpl for WhipWebRTCSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "WhipWebRTCSink", + "Sink/Network/WebRTC", + "WebRTC sink with WHIP signaller", + "Taruntej Kanakamalla ", + ) + }); + + Some(&*ELEMENT_METADATA) + } +} + +impl BinImpl for WhipWebRTCSink {} + +impl BaseWebRTCSinkImpl for WhipWebRTCSink {} + +#[glib::object_subclass] +impl ObjectSubclass for WhipWebRTCSink { + const NAME: &'static str = "GstWhipWebRTCSink"; + type Type = super::WhipWebRTCSink; + type ParentType = super::BaseWebRTCSink; +} diff --git a/net/webrtc/src/webrtcsink/mod.rs b/net/webrtc/src/webrtcsink/mod.rs index 976e28de..fb5e2500 100644 --- a/net/webrtc/src/webrtcsink/mod.rs +++ b/net/webrtc/src/webrtcsink/mod.rs @@ -52,6 +52,10 @@ glib::wrapper! { pub struct AwsKvsWebRTCSink(ObjectSubclass) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; } +glib::wrapper! { + pub struct WhipWebRTCSink(ObjectSubclass) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; +} + #[derive(thiserror::Error, Debug)] pub enum WebRTCSinkError { #[error("no session with id")] @@ -126,6 +130,12 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Rank::None, AwsKvsWebRTCSink::static_type(), )?; + gst::Element::register( + Some(plugin), + "whipwebrtcsink", + gst::Rank::None, + WhipWebRTCSink::static_type(), + )?; Ok(()) } diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs new file mode 100644 index 00000000..a26aba78 --- /dev/null +++ b/net/webrtc/src/whip_signaller/imp.rs @@ -0,0 +1,612 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::{Signallable, SignallableImpl}; +use crate::utils::{ + build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError, +}; +use crate::RUNTIME; +use async_recursion::async_recursion; +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription}; +use once_cell::sync::Lazy; +use reqwest::header::HeaderMap; +use reqwest::header::HeaderValue; +use reqwest::StatusCode; +use std::sync::Mutex; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "webrtc-whip-signaller", + gst::DebugColorFlags::empty(), + Some("WebRTC WHIP signaller"), + ) +}); + +const MAX_REDIRECTS: u8 = 10; +const DEFAULT_TIMEOUT: u32 = 15; + +#[derive(Debug)] +enum State { + Stopped, + Post { redirects: u8 }, + Running { whip_resource_url: String }, +} + +impl Default for State { + fn default() -> Self { + Self::Stopped + } +} + +#[derive(Clone)] +struct Settings { + whip_endpoint: Option, + use_link_headers: bool, + auth_token: Option, + timeout: u32, +} + +impl Default for Settings { + fn default() -> Self { + Self { + whip_endpoint: None, + use_link_headers: false, + auth_token: None, + timeout: DEFAULT_TIMEOUT, + } + } +} + +#[derive(Default)] +pub struct Signaller { + state: Mutex, + settings: Mutex, + canceller: Mutex>, +} + +impl Signaller { + fn raise_error(&self, msg: String) { + self.obj() + .emit_by_name::<()>("error", &[&format!("Error: {msg}")]); + } + + fn handle_future_error(&self, err: WaitError) { + match err { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "Future aborted") + } + WaitError::FutureError(err) => self.raise_error(err.to_string()), + }; + } + + async fn send_offer(&self, webrtcbin: &gst::Element) { + { + let mut state = self.state.lock().unwrap(); + *state = State::Post { redirects: 0 }; + drop(state); + } + + let local_desc = + webrtcbin.property::>("local-description"); + + let offer_sdp = match local_desc { + None => { + self.raise_error("Local description is not set".to_string()); + return; + } + Some(offer) => offer, + }; + + gst::debug!( + CAT, + imp: self, + "Sending offer SDP: {:?}", + offer_sdp.sdp().as_text() + ); + + let timeout; + { + let settings = self.settings.lock().unwrap(); + timeout = settings.timeout; + drop(settings); + } + + if let Err(e) = + wait_async(&self.canceller, self.do_post(offer_sdp, webrtcbin), timeout).await + { + self.handle_future_error(e); + } + } + + #[async_recursion] + async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription, webrtcbin: &gst::Element) { + let auth_token; + let endpoint; + let timeout; + + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); + auth_token = settings.auth_token.clone(); + timeout = settings.timeout; + drop(settings); + } + + #[allow(unused_mut)] + let mut redirects; + + { + let state = self.state.lock().unwrap(); + redirects = match *state { + State::Post { redirects } => redirects, + _ => { + self.raise_error("Trying to do POST in unexpected state".to_string()); + return; + } + }; + drop(state); + } + + // Default policy for redirect does not share the auth token to new location + // So disable inbuilt redirecting and do a recursive call upon 3xx response code + let pol = reqwest::redirect::Policy::none(); + let client = build_reqwest_client(pol); + + let sdp = offer.sdp(); + let body = sdp.as_text().unwrap(); + + gst::debug!(CAT, imp: self, "Using endpoint {}", endpoint.as_str()); + let mut headermap = HeaderMap::new(); + headermap.insert( + reqwest::header::CONTENT_TYPE, + HeaderValue::from_static("application/sdp"), + ); + + if let Some(token) = auth_token.as_ref() { + let bearer_token = "Bearer ".to_owned() + token; + headermap.insert( + reqwest::header::AUTHORIZATION, + HeaderValue::from_str(bearer_token.as_str()) + .expect("Failed to set auth token to header"), + ); + } + + let res = wait_async( + &self.canceller, + client + .request(reqwest::Method::POST, endpoint.clone()) + .headers(headermap) + .body(body) + .send(), + timeout, + ) + .await; + + match res { + Ok(r) => match r { + Ok(resp) => { + if let Err(e) = wait_async( + &self.canceller, + self.parse_endpoint_response(offer, resp, redirects, webrtcbin), + timeout, + ) + .await + { + self.handle_future_error(e); + } + } + Err(err) => self.raise_error(err.to_string()), + }, + Err(err) => self.handle_future_error(err), + } + } + + async fn parse_endpoint_response( + &self, + offer: gst_webrtc::WebRTCSessionDescription, + resp: reqwest::Response, + redirects: u8, + webrtcbin: &gst::Element, + ) { + gst::debug!(CAT, imp: self, "Parsing endpoint response"); + + let endpoint; + let timeout; + let use_link_headers; + + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); + use_link_headers = settings.use_link_headers; + timeout = settings.timeout; + drop(settings); + } + + gst::debug!(CAT, "response status: {}", resp.status()); + + match resp.status() { + StatusCode::OK | StatusCode::CREATED => { + if use_link_headers { + if let Err(e) = set_ice_servers(webrtcbin, resp.headers()) { + self.raise_error(e.to_string()); + return; + }; + } + + // Get the url of the resource from 'location' header. + // The resource created is expected be a relative path + // and not an absolute path + // So we want to construct the full url of the resource + // using the endpoint url i.e., replace the end point path with + // resource path + let location = match resp.headers().get(reqwest::header::LOCATION) { + Some(location) => location, + None => { + self.raise_error( + "Location header field should be present for WHIP resource URL" + .to_string(), + ); + return; + } + }; + + let location = match location.to_str() { + Ok(loc) => loc, + Err(e) => { + self.raise_error(format!("Failed to convert location to string: {e}")); + return; + } + }; + + let url = reqwest::Url::parse(endpoint.as_str()).unwrap(); + + gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location); + + let url = match url.join(location) { + Ok(joined_url) => joined_url, + Err(err) => { + self.raise_error(format!("URL join operation failed: {err:?}")); + return; + } + }; + + { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Running { + whip_resource_url: url.to_string(), + }, + _ => { + self.raise_error("Expected to be in POST state".to_string()); + return; + } + }; + drop(state); + } + + match wait_async(&self.canceller, resp.bytes(), timeout).await { + Ok(res) => match res { + Ok(ans_bytes) => match gst_sdp::SDPMessage::parse_buffer(&ans_bytes) { + Ok(ans_sdp) => { + let answer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + ans_sdp, + ); + self.obj().emit_by_name::<()>( + "session-description", + &[&"unique", &answer], + ); + } + Err(err) => { + self.raise_error(format!("Could not parse answer SDP: {err}")); + } + }, + Err(err) => self.raise_error(err.to_string()), + }, + Err(err) => self.handle_future_error(err), + } + } + + s if s.is_redirection() => { + gst::debug!(CAT, "redirected"); + + if redirects < MAX_REDIRECTS { + match parse_redirect_location(resp.headers(), &endpoint) { + Ok(redirect_url) => { + { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Post { + redirects: redirects + 1, + }, + /* + * As per section 4.6 of the specification, redirection is + * not required to be supported for the PATCH and DELETE + * requests to the final WHEP resource URL. Only the initial + * POST request may support redirection. + */ + State::Running { .. } => { + self.raise_error( + "Unexpected redirection in RUNNING state".to_string(), + ); + return; + } + State::Stopped => unreachable!(), + }; + drop(state); + } + + gst::debug!( + CAT, + imp: self, + "Redirecting endpoint to {}", + redirect_url.as_str() + ); + + if let Err(err) = + wait_async(&self.canceller, self.do_post(offer, webrtcbin), timeout) + .await + { + self.handle_future_error(err); + } + } + Err(e) => self.raise_error(e.to_string()), + } + } else { + self.raise_error("Too many redirects. Unable to connect.".to_string()); + } + } + + s => { + match wait_async(&self.canceller, resp.bytes(), timeout).await { + Ok(r) => { + let res = r + .map(|x| x.escape_ascii().to_string()) + .unwrap_or_else(|_| "(no further details)".to_string()); + + // FIXME: Check and handle 'Retry-After' header in case of server error + self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res)); + } + Err(err) => self.handle_future_error(err), + } + } + } + } + + fn terminate_session(&self) { + let settings = self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + let timeout = settings.timeout; + + let resource_url = match *state { + State::Running { + whip_resource_url: ref resource_url, + } => resource_url.clone(), + _ => { + self.raise_error("Terminated in unexpected state".to_string()); + return; + } + }; + + drop(state); + + let mut headermap = HeaderMap::new(); + if let Some(token) = &settings.auth_token { + let bearer_token = "Bearer ".to_owned() + token.as_str(); + headermap.insert( + reqwest::header::AUTHORIZATION, + HeaderValue::from_str(bearer_token.as_str()) + .expect("Failed to set auth token to header"), + ); + } + + gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url); + let client = build_reqwest_client(reqwest::redirect::Policy::default()); + let future = async { + client + .delete(resource_url.clone()) + .headers(headermap) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["DELETE request failed {}: {:?}", resource_url, err] + ) + }) + }; + + let res = wait(&self.canceller, future, timeout); + match res { + Ok(r) => { + gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); + } + Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "DELETE request aborted") + } + WaitError::FutureError(e) => { + gst::error!(CAT, imp: self, "Error on DELETE request : {}", e) + } + }, + }; + } +} + +impl SignallableImpl for Signaller { + fn start(&self) { + if self.settings.lock().unwrap().whip_endpoint.is_none() { + self.raise_error("WHIP endpoint URL must be set".to_string()); + return; + } + + self.obj().connect_closure( + "consumer-added", + false, + glib::closure!(|signaller: &super::WhipSignaller, + _consumer_identifier: &str, + webrtcbin: &gst::Element| { + let obj_weak = signaller.downgrade(); + webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { + let obj = match obj_weak.upgrade() { + Some(obj) => obj, + None => return, + }; + + let state = + webrtcbin.property::("ice-gathering-state"); + + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, obj: obj, "ICE gathering started"); + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, obj: obj, "ICE gathering complete"); + + let webrtcbin = webrtcbin.clone(); + + RUNTIME.spawn(async move { + /* Note that we check for a valid WHIP endpoint in change_state */ + obj.imp().send_offer(&webrtcbin).await + }); + } + _ => (), + } + }); + }), + ); + + self.obj().emit_by_name::<()>( + "session-requested", + &[ + &"unique", + &"unique", + &None::, + ], + ); + } + + fn stop(&self) { + // Interrupt requests in progress, if any + if let Some(canceller) = &*self.canceller.lock().unwrap() { + canceller.abort(); + } + + let state = self.state.lock().unwrap(); + if let State::Running { .. } = *state { + // Release server-side resources + drop(state); + } + } + + fn end_session(&self, session_id: &str) { + assert_eq!(session_id, "unique"); + + // Interrupt requests in progress, if any + if let Some(canceller) = &*self.canceller.lock().unwrap() { + canceller.abort(); + } + + let state = self.state.lock().unwrap(); + if let State::Running { .. } = *state { + // Release server-side resources + drop(state); + self.terminate_session(); + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for Signaller { + const NAME: &'static str = "GstWHIPWebRTCSinkSignaller"; + type Type = super::WhipSignaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl ObjectImpl for Signaller { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecString::builder("whip-endpoint") + .nick("WHIP Endpoint") + .blurb("The WHIP server endpoint to POST SDP offer to. + e.g.: https://example.com/whip/endpoint/room1234") + .mutable_ready() + .build(), + + glib::ParamSpecBoolean::builder("use-link-headers") + .nick("Use Link Headers") + .blurb("Use link headers to configure ice-servers from the WHIP server response to the POST request. + If set to TRUE and the WHIP server returns valid ice-servers, + this property overrides the ice-servers values set using the stun-server and turn-server properties.") + .mutable_ready() + .build(), + + glib::ParamSpecString::builder("auth-token") + .nick("Authorization Token") + .blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer '") + .mutable_ready() + .build(), + + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "whip-endpoint" => { + let mut settings = self.settings.lock().unwrap(); + settings.whip_endpoint = value.get().expect("WHIP endpoint should be a string"); + } + "use-link-headers" => { + let mut settings = self.settings.lock().unwrap(); + settings.use_link_headers = value + .get() + .expect("use-link-headers should be a boolean value"); + } + "auth-token" => { + let mut settings = self.settings.lock().unwrap(); + settings.auth_token = value.get().expect("Auth token should be a string"); + } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.timeout = value.get().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "whip-endpoint" => { + let settings = self.settings.lock().unwrap(); + settings.whip_endpoint.to_value() + } + "use-link-headers" => { + let settings = self.settings.lock().unwrap(); + settings.use_link_headers.to_value() + } + "auth-token" => { + let settings = self.settings.lock().unwrap(); + settings.auth_token.to_value() + } + "timeout" => { + let settings = self.settings.lock().unwrap(); + settings.timeout.to_value() + } + _ => unimplemented!(), + } + } +} diff --git a/net/webrtc/src/whip_signaller/mod.rs b/net/webrtc/src/whip_signaller/mod.rs new file mode 100644 index 00000000..adffb6b0 --- /dev/null +++ b/net/webrtc/src/whip_signaller/mod.rs @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::Signallable; +use gst::glib; + +mod imp; + +glib::wrapper! { + pub struct WhipSignaller(ObjectSubclass) @implements Signallable; +} + +unsafe impl Send for WhipSignaller {} +unsafe impl Sync for WhipSignaller {} + +impl Default for WhipSignaller { + fn default() -> Self { + glib::Object::new() + } +} diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index c3a08d59..6115fb96 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -366,6 +366,8 @@ impl ObjectImpl for WhipSink { let self_ref = self_.ref_counted(); + gst::info!(CAT, imp: self_, "ICE gathering complete"); + // With tokio's spawn one does not have to .await the // returned JoinHandle to make the provided future start // execution. It will start running in the background