diff --git a/Cargo.lock b/Cargo.lock index c97817b2..a1f6a027 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2956,6 +2956,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", + "bytes", "chrono", "clap", "ctrlc", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 9a8a3cc5..a07d4eda 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -61,6 +61,8 @@ warp = {version = "0.3", optional = true } ctrlc = {version = "3.4.0", optional = true } +bytes = "1" + [dev-dependencies] gst-plugin-rtp = { path = "../rtp" } tokio = { version = "1", features = ["signal"] } @@ -79,7 +81,7 @@ path = "src/lib.rs" gst-plugin-version-helper.workspace = true [features] -default = ["v1_22", "aws", "janus", "livekit", "whip"] +default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"] static = [] capi = [] v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"] @@ -91,6 +93,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s janus = ["dep:http"] livekit = ["dep:livekit-protocol", "dep:livekit-api"] whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"] +whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"] [package.metadata.capi] min_version = "0.9.21" diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs index cb8e993b..0d716b31 100644 --- a/net/webrtc/src/lib.rs +++ b/net/webrtc/src/lib.rs @@ -24,6 +24,8 @@ pub mod signaller; pub mod utils; pub mod webrtcsink; pub mod webrtcsrc; +#[cfg(feature = "whep")] +mod whep_signaller; #[cfg(feature = "whip")] mod whip_signaller; diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index 1c806c91..dacb30f9 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -104,9 +104,9 @@ use crate::RUNTIME; use futures::future; use futures::prelude::*; use gst::ErrorMessage; -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] use reqwest::header::HeaderMap; -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] use reqwest::redirect::Policy; use std::sync::Mutex; use std::time::Duration; @@ -239,7 +239,7 @@ where res } -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] pub fn parse_redirect_location( headermap: &HeaderMap, old_url: &reqwest::Url, @@ -280,13 +280,13 @@ pub fn parse_redirect_location( } } -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] pub fn build_reqwest_client(pol: Policy) -> reqwest::Client { let client_builder = reqwest::Client::builder(); client_builder.redirect(pol).build().unwrap() } -#[cfg(feature = "whip")] +#[cfg(any(feature = "whip", feature = "whep"))] pub fn set_ice_servers( webrtcbin: &gst::Element, headermap: &HeaderMap, diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index c07ffb2f..ca4e2ed6 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -586,7 +586,7 @@ impl Session { .map(|codec| { let name = &codec.name; - let (media, clock_rate, pt) = if codec.stream_type == gst::StreamType::AUDIO { + let (media, clock_rate, pt) = if codec.stream_type == gst::StreamType::AUDIO { ("audio", 48000, 96) } else { //video stream type @@ -740,7 +740,7 @@ impl Session { filtered_s.extend(s.iter().filter_map(|(key, value)| { if key.starts_with("extmap-") { - return Some((key, value.to_owned())); + return Some((key, value.to_owned())); } None @@ -774,7 +774,6 @@ impl Session { } webrtcbin.emit_by_name::<()>("set-remote-description", &[&answer, &None::]); - } fn handle_offer( @@ -1707,3 +1706,57 @@ pub(super) mod livekit { type ParentType = crate::webrtcsrc::BaseWebRTCSrc; } } + +#[cfg(feature = "whep")] +pub(super) mod whep { + use super::*; + use crate::whep_signaller::WhepClientSignaller; + + #[derive(Default)] + pub struct WhepClientSrc {} + + impl ObjectImpl for WhepClientSrc { + fn constructed(&self) { + self.parent_constructed(); + let element = self.obj(); + let ws = element + .upcast_ref::() + .imp(); + + let _ = ws.set_signaller(WhepClientSignaller::default().upcast()); + + let obj = &*self.obj(); + + obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); + } + } + + impl GstObjectImpl for WhepClientSrc {} + + impl BinImpl for WhepClientSrc {} + + impl ElementImpl for WhepClientSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "WhepClientSrc", + "Source/Network/WebRTC", + "WebRTC source element using WHEP Client as the signaller", + "Sanchayan Maity ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + } + + impl BaseWebRTCSrcImpl for WhepClientSrc {} + + #[glib::object_subclass] + impl ObjectSubclass for WhepClientSrc { + const NAME: &'static str = "GstWhepClientSrc"; + type Type = crate::webrtcsrc::WhepClientSrc; + type ParentType = crate::webrtcsrc::BaseWebRTCSrc; + } +} diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs index fe1b3e50..d6fabad7 100644 --- a/net/webrtc/src/webrtcsrc/mod.rs +++ b/net/webrtc/src/webrtcsrc/mod.rs @@ -59,6 +59,11 @@ glib::wrapper! { pub struct LiveKitWebRTCSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy; } +#[cfg(feature = "whep")] +glib::wrapper! { + pub struct WhepClientSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; +} + glib::wrapper! { pub struct WebRTCSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; } @@ -139,5 +144,13 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> { LiveKitWebRTCSrc::static_type(), )?; + #[cfg(feature = "whep")] + gst::Element::register( + plugin, + "whepclientsrc", + gst::Rank::PRIMARY, + WhepClientSrc::static_type(), + )?; + Ok(()) } diff --git a/net/webrtc/src/whep_signaller/client.rs b/net/webrtc/src/whep_signaller/client.rs new file mode 100644 index 00000000..7769f708 --- /dev/null +++ b/net/webrtc/src/whep_signaller/client.rs @@ -0,0 +1,600 @@ +// Copyright (C) 2022, Asymptotic Inc. +// Author: Sanchayan Maity +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::{Signallable, SignallableImpl}; +use crate::utils::{ + build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError, +}; +use crate::RUNTIME; +use async_recursion::async_recursion; +use bytes::Bytes; +use futures::future; +use gst::glib::RustClosure; +use gst::{glib, prelude::*, subclass::prelude::*}; +use gst_sdp::*; +use gst_webrtc::*; +use once_cell::sync::Lazy; +use reqwest::header::{HeaderMap, HeaderValue}; +use reqwest::StatusCode; +use std::sync::Mutex; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "whep-client-signaller", + gst::DebugColorFlags::empty(), + Some("WHEP Client Signaller"), + ) +}); + +const MAX_REDIRECTS: u8 = 10; +const DEFAULT_TIMEOUT: u32 = 15; +const SESSION_ID: &str = "whep-client"; + +#[derive(Debug, Clone)] +struct Settings { + whep_endpoint: Option, + auth_token: Option, + use_link_headers: bool, + timeout: u32, +} + +#[allow(clippy::derivable_impls)] +impl Default for Settings { + fn default() -> Self { + Self { + whep_endpoint: None, + auth_token: None, + use_link_headers: false, + timeout: DEFAULT_TIMEOUT, + } + } +} + +#[derive(Debug)] +enum State { + Stopped, + Post { redirects: u8 }, + Running { whep_resource: String }, +} + +impl Default for State { + fn default() -> Self { + Self::Stopped + } +} + +pub struct WhepClient { + settings: Mutex, + state: Mutex, + canceller: Mutex>, + client: reqwest::Client, +} + +impl Default for WhepClient { + fn default() -> Self { + // We'll handle redirects manually since the default redirect handler does not + // reuse the authentication token on the redirected server + let pol = reqwest::redirect::Policy::none(); + let client = build_reqwest_client(pol); + + Self { + settings: Mutex::new(Settings::default()), + state: Mutex::new(State::default()), + canceller: Mutex::new(None), + client, + } + } +} + +impl ObjectImpl for WhepClient { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("whep-endpoint") + .nick("WHEP Endpoint") + .blurb("The WHEP server endpoint to POST SDP offer to.") + .build(), + glib::ParamSpecBoolean::builder("use-link-headers") + .nick("Use Link Headers") + .blurb("Use link headers to configure STUN/TURN servers if present in WHEP endpoint response.") + .build(), + glib::ParamSpecString::builder("auth-token") + .nick("Authorization Token") + .blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer '") + .build(), + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .readwrite() + .build(), + ] + }); + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "whep-endpoint" => { + let mut settings = self.settings.lock().unwrap(); + settings.whep_endpoint = value.get().expect("WHEP endpoint should be a string"); + } + "use-link-headers" => { + let mut settings = self.settings.lock().unwrap(); + settings.use_link_headers = value + .get() + .expect("use-link-headers should be a boolean value"); + } + "auth-token" => { + let mut settings = self.settings.lock().unwrap(); + settings.auth_token = value.get().expect("Auth token should be a string"); + } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.timeout = value.get().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "whep-endpoint" => { + let settings = self.settings.lock().unwrap(); + settings.whep_endpoint.to_value() + } + "use-link-headers" => { + let settings = self.settings.lock().unwrap(); + settings.use_link_headers.to_value() + } + "auth-token" => { + let settings = self.settings.lock().unwrap(); + settings.auth_token.to_value() + } + "timeout" => { + let settings = self.settings.lock().unwrap(); + settings.timeout.to_value() + } + _ => unimplemented!(), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for WhepClient { + const NAME: &'static str = "GstWhepClientSignaller"; + type Type = super::WhepClientSignaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl WhepClient { + fn raise_error(&self, msg: String) { + self.obj() + .emit_by_name::<()>("error", &[&format!("Error: {msg}")]); + } + + fn handle_future_error(&self, err: WaitError) { + match err { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "Future aborted") + } + WaitError::FutureError(err) => self.raise_error(err.to_string()), + }; + } + + fn sdp_message_parse(&self, sdp_bytes: Bytes, _webrtcbin: &gst::Element) { + let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) { + Ok(sdp) => sdp, + Err(_) => { + self.raise_error("Could not parse answer SDP".to_string()); + return; + } + }; + + let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp); + + self.obj() + .emit_by_name::<()>("session-description", &[&SESSION_ID, &remote_sdp]); + } + + async fn parse_endpoint_response( + &self, + sess_desc: WebRTCSessionDescription, + resp: reqwest::Response, + redirects: u8, + webrtcbin: gst::Element, + ) { + let endpoint; + let use_link_headers; + + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); + use_link_headers = settings.use_link_headers; + drop(settings); + } + + match resp.status() { + StatusCode::OK | StatusCode::NO_CONTENT => { + gst::info!(CAT, imp: self, "SDP offer successfully send"); + } + + StatusCode::CREATED => { + gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers()); + + if use_link_headers { + if let Err(e) = set_ice_servers(&webrtcbin, resp.headers()) { + self.raise_error(e.to_string()); + return; + }; + } + + /* See section 4.2 of the WHEP specification */ + let location = match resp.headers().get(reqwest::header::LOCATION) { + Some(location) => location, + None => { + self.raise_error( + "Location header field should be present for WHEP resource URL" + .to_string(), + ); + return; + } + }; + + let location = match location.to_str() { + Ok(loc) => loc, + Err(e) => { + self.raise_error(format!("Failed to convert location to string: {e}")); + return; + } + }; + + let url = reqwest::Url::parse(endpoint.as_str()).unwrap(); + + gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location); + + let url = match url.join(location) { + Ok(joined_url) => joined_url, + Err(err) => { + self.raise_error(format!("URL join operation failed: {err:?}")); + return; + } + }; + + match resp.bytes().await { + Ok(ans_bytes) => { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Running { + whep_resource: url.to_string(), + }, + _ => { + self.raise_error("Expected to be in POST state".to_string()); + return; + } + }; + drop(state); + + self.sdp_message_parse(ans_bytes, &webrtcbin) + } + Err(err) => self.raise_error(err.to_string()), + } + } + + status if status.is_redirection() => { + if redirects < MAX_REDIRECTS { + match parse_redirect_location(resp.headers(), &endpoint) { + Ok(redirect_url) => { + { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Post { + redirects: redirects + 1, + }, + /* + * As per section 4.6 of the specification, redirection is + * not required to be supported for the PATCH and DELETE + * requests to the final WHEP resource URL. Only the initial + * POST request may support redirection. + */ + State::Running { .. } => { + self.raise_error( + "Unexpected redirection in RUNNING state".to_string(), + ); + return; + } + State::Stopped => unreachable!(), + }; + drop(state); + } + + gst::warning!( + CAT, + imp: self, + "Redirecting endpoint to {}", + redirect_url.as_str() + ); + + self.do_post(sess_desc, webrtcbin, redirect_url).await + } + Err(e) => self.raise_error(e.to_string()), + } + } else { + self.raise_error("Too many redirects. Unable to connect.".to_string()); + } + } + + s => { + match resp.bytes().await { + Ok(r) => { + let res = r.escape_ascii().to_string(); + + // FIXME: Check and handle 'Retry-After' header in case of server error + self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res)); + } + Err(err) => self.raise_error(err.to_string()), + } + } + } + } + + async fn whep_offer(&self, webrtcbin: gst::Element) { + let local_desc = + webrtcbin.property::>("local-description"); + + let sess_desc = match local_desc { + None => { + self.raise_error("Local description is not set".to_string()); + return; + } + Some(mut local_desc) => { + local_desc.set_type(WebRTCSDPType::Offer); + local_desc + } + }; + + gst::debug!( + CAT, + imp: self, + "Sending offer SDP: {:?}", + sess_desc.sdp().as_text() + ); + + let timeout; + let endpoint; + + { + let settings = self.settings.lock().unwrap(); + timeout = settings.timeout; + endpoint = + reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); + drop(settings); + } + + if let Err(e) = wait_async( + &self.canceller, + self.do_post(sess_desc, webrtcbin, endpoint), + timeout, + ) + .await + { + self.handle_future_error(e); + } + } + + #[async_recursion] + async fn do_post( + &self, + offer: WebRTCSessionDescription, + webrtcbin: gst::Element, + endpoint: reqwest::Url, + ) { + let auth_token; + + { + let settings = self.settings.lock().unwrap(); + auth_token = settings.auth_token.clone(); + drop(settings); + } + + let sdp = offer.sdp(); + let body = sdp.as_text().unwrap(); + + gst::info!(CAT, imp: self, "Using endpoint {}", endpoint.as_str()); + + let mut headermap = HeaderMap::new(); + headermap.insert( + reqwest::header::CONTENT_TYPE, + HeaderValue::from_static("application/sdp"), + ); + + if let Some(token) = auth_token.as_ref() { + let bearer_token = "Bearer ".to_owned() + token.as_str(); + headermap.insert( + reqwest::header::AUTHORIZATION, + HeaderValue::from_str(bearer_token.as_str()) + .expect("Failed to set auth token to header"), + ); + } + + gst::debug!( + CAT, + imp: self, + "Url for HTTP POST request: {}", + endpoint.as_str() + ); + + let resp = self + .client + .request(reqwest::Method::POST, endpoint.clone()) + .headers(headermap) + .body(body) + .send() + .await; + + match resp { + Ok(r) => { + #[allow(unused_mut)] + let mut redirects; + + { + let state = self.state.lock().unwrap(); + redirects = match *state { + State::Post { redirects } => redirects, + _ => { + self.raise_error("Trying to do POST in unexpected state".to_string()); + return; + } + }; + drop(state); + } + + self.parse_endpoint_response(offer, r, redirects, webrtcbin) + .await + } + Err(err) => self.raise_error(err.to_string()), + } + } + + fn terminate_session(&self) { + let settings = self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + let timeout = settings.timeout; + + let resource_url = match *state { + State::Running { + whep_resource: ref whep_resource_url, + } => whep_resource_url.clone(), + _ => { + self.raise_error("Terminated in unexpected state".to_string()); + return; + } + }; + + drop(state); + + let mut headermap = HeaderMap::new(); + if let Some(token) = &settings.auth_token { + let bearer_token = "Bearer ".to_owned() + token.as_str(); + headermap.insert( + reqwest::header::AUTHORIZATION, + HeaderValue::from_str(bearer_token.as_str()) + .expect("Failed to set auth token to header"), + ); + } + + drop(settings); + + gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url); + + /* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */ + let client = build_reqwest_client(reqwest::redirect::Policy::default()); + let future = async { + client + .delete(resource_url.clone()) + .headers(headermap) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["DELETE request failed {}: {:?}", resource_url, err] + ) + }) + }; + + let res = wait(&self.canceller, future, timeout); + match res { + Ok(r) => { + gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); + } + Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "DELETE request aborted") + } + WaitError::FutureError(e) => { + gst::error!(CAT, imp: self, "Error on DELETE request : {}", e) + } + }, + }; + } + + pub fn on_webrtcbin_ready(&self) -> RustClosure { + glib::closure!(|signaller: &super::WhepClientSignaller, + _consumer_identifier: &str, + webrtcbin: &gst::Element| { + let obj_weak = signaller.downgrade(); + + webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { + let Some(obj) = obj_weak.upgrade() else { + return; + }; + + let state = webrtcbin.property::("ice-gathering-state"); + + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, obj: obj, "ICE gathering started"); + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, obj: obj, "ICE gathering complete"); + + let webrtcbin = webrtcbin.clone(); + + RUNTIME.spawn(async move { obj.imp().whep_offer(webrtcbin).await }); + } + _ => (), + } + }); + }) + } +} + +impl SignallableImpl for WhepClient { + fn start(&self) { + if self.settings.lock().unwrap().whep_endpoint.is_none() { + self.raise_error("WHEP endpoint URL must be set".to_string()); + return; + } + + let mut state = self.state.lock().unwrap(); + *state = State::Post { redirects: 0 }; + drop(state); + + self.obj() + .emit_by_name::<()>("session-started", &[&SESSION_ID, &SESSION_ID]); + self.obj().emit_by_name::<()>( + "session-requested", + &[ + &SESSION_ID, + &SESSION_ID, + &None::, + ], + ); + } + + fn stop(&self) {} + + fn end_session(&self, _session_id: &str) { + // Interrupt requests in progress, if any + if let Some(canceller) = &*self.canceller.lock().unwrap() { + canceller.abort(); + } + + let state = self.state.lock().unwrap(); + if let State::Running { .. } = *state { + // Release server-side resources + drop(state); + self.terminate_session(); + } + } +} diff --git a/net/webrtc/src/whep_signaller/mod.rs b/net/webrtc/src/whep_signaller/mod.rs new file mode 100644 index 00000000..e9a2908c --- /dev/null +++ b/net/webrtc/src/whep_signaller/mod.rs @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::Signallable; +use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt}; + +mod client; + +glib::wrapper! { + pub struct WhepClientSignaller(ObjectSubclass) @implements Signallable; +} + +unsafe impl Send for WhepClientSignaller {} +unsafe impl Sync for WhepClientSignaller {} + +impl Default for WhepClientSignaller { + fn default() -> Self { + let sig: WhepClientSignaller = glib::Object::new(); + sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready()); + sig + } +}