diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 42a7c5aa..b5d61c28 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -78,7 +78,7 @@ path = "src/lib.rs" gst-plugin-version-helper.workspace = true [features] -default = ["v1_22", "aws", "janus", "livekit", "whip"] +default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"] static = [] capi = [] v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"] @@ -90,6 +90,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s janus = ["dep:http"] livekit = ["dep:livekit-protocol", "dep:livekit-api"] whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"] +whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"] [package.metadata.capi] min_version = "0.9.21" diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs index cb8e993b..257a1a68 100644 --- a/net/webrtc/src/lib.rs +++ b/net/webrtc/src/lib.rs @@ -26,6 +26,8 @@ pub mod webrtcsink; pub mod webrtcsrc; #[cfg(feature = "whip")] mod whip_signaller; +#[cfg(feature = "whep")] +mod whep_signaller; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { webrtcsink::register(plugin)?; diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index 1c806c91..b32371fb 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -362,6 +362,7 @@ pub fn set_ice_servers( Ok(()) } +#[cfg(any(feature = "whip", feature = "whep"))] pub fn build_link_header(url_str: &str) -> Result { let url = url::Url::parse(url_str)?; diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 847352e9..1d00d0e3 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -4733,3 +4733,48 @@ pub(super) mod janus { type ParentType = crate::webrtcsink::BaseWebRTCSink; } } + +#[cfg(feature = "whep")] +pub(super) mod whep { + use super::*; + use crate::whep_signaller::WhepServerSignaller; + #[derive(Default)] + pub struct WhepWebRTCSink {} + + impl ObjectImpl for WhepWebRTCSink { + fn constructed(&self) { + let element = self.obj(); + let ws = element.upcast_ref::().imp(); + + let _ = ws.set_signaller(WhepServerSignaller::default().upcast()); + } + } + + impl GstObjectImpl for WhepWebRTCSink {} + + impl ElementImpl for WhepWebRTCSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "WhepWebRTCSink", + "Sink/Network/WebRTC", + "WebRTC sink with WHEP server signaller", + "Taruntej Kanakamalla ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + } + + impl BinImpl for WhepWebRTCSink {} + + impl BaseWebRTCSinkImpl for WhepWebRTCSink {} + + #[glib::object_subclass] + impl ObjectSubclass for WhepWebRTCSink { + const NAME: &'static str = "GstWhepWebRTCSink"; + type Type = crate::webrtcsink::WhepWebRTCSink; + type ParentType = crate::webrtcsink::BaseWebRTCSink; + } +} \ No newline at end of file diff --git a/net/webrtc/src/webrtcsink/mod.rs b/net/webrtc/src/webrtcsink/mod.rs index 33399467..88ffb18a 100644 --- a/net/webrtc/src/webrtcsink/mod.rs +++ b/net/webrtc/src/webrtcsink/mod.rs @@ -72,6 +72,10 @@ glib::wrapper! { glib::wrapper! { pub struct JanusVRWebRTCSink(ObjectSubclass) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; } +#[cfg(feature = "whep")] +glib::wrapper! { + pub struct WhepWebRTCSink(ObjectSubclass) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; +} #[derive(thiserror::Error, Debug)] pub enum WebRTCSinkError { @@ -229,6 +233,13 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Rank::NONE, JanusVRWebRTCSink::static_type(), )?; + #[cfg(feature = "whep")] + gst::Element::register( + Some(plugin), + "whepserversink", + gst::Rank::NONE, + WhepWebRTCSink::static_type(), + )?; Ok(()) } diff --git a/net/webrtc/src/whep_signaller/mod.rs b/net/webrtc/src/whep_signaller/mod.rs new file mode 100644 index 00000000..7e36c10e --- /dev/null +++ b/net/webrtc/src/whep_signaller/mod.rs @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::Signallable; +use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt}; + +mod server; +glib::wrapper! { + pub struct WhepServerSignaller(ObjectSubclass) @implements Signallable; +} + +unsafe impl Send for WhepServerSignaller {} +unsafe impl Sync for WhepServerSignaller {} + +impl Default for WhepServerSignaller { + fn default() -> Self { + let sig: WhepServerSignaller = glib::Object::new(); + sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready()); + sig + } +} \ No newline at end of file diff --git a/net/webrtc/src/whep_signaller/server.rs b/net/webrtc/src/whep_signaller/server.rs new file mode 100644 index 00000000..64ab9052 --- /dev/null +++ b/net/webrtc/src/whep_signaller/server.rs @@ -0,0 +1,580 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::{Signallable, SignallableImpl}; +use crate::utils::{build_link_header, wait_async, WaitError}; +use crate::RUNTIME; +use once_cell::sync::Lazy; +use gst::glib::{self, RustClosure}; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_sdp::SDPMessage; +use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription}; +use reqwest::header::HeaderMap; +use reqwest::header::HeaderValue; +use reqwest::StatusCode; +use std::sync::Mutex; + +use std::net::SocketAddr; +use tokio::sync::mpsc; +use url::Url; +use warp::{ + http, + hyper::{ + header::{CONTENT_TYPE, LINK}, + Body, + }, + Filter, Reply, +}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "whep-server-signaller", + gst::DebugColorFlags::empty(), + Some("WHEP Server Signaller"), + ) +}); + +const DEFAULT_TIMEOUT: u32 = 30; + +const ROOT: &str = "whep"; +const ENDPOINT_PATH: &str = "endpoint"; +const RESOURCE_PATH: &str = "resource"; +const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:9090"; +const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303"); +const CONTENT_SDP: &str = "application/sdp"; +const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag"; + +struct Settings { + stun_server: Option, + turn_servers: gst::Array, + host_addr: Url, + timeout: u32, + shutdown_signal: Option>, + server_handle: Option>, + sdp_answer: Option>>, +} + +impl Default for Settings { + fn default() -> Self { + Self { + host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(), + stun_server: DEFAULT_STUN_SERVER.map(String::from), + turn_servers: gst::Array::new(Vec::new() as Vec), + timeout: DEFAULT_TIMEOUT, + shutdown_signal: None, + server_handle: None, + sdp_answer: None, + } + } +} + +#[derive(Default)] +pub struct WhepServer { + settings: Mutex, + canceller: Mutex>, +} + +impl WhepServer { + pub fn on_webrtcbin_ready(&self) -> RustClosure { + glib::closure!(|signaller: &super::WhepServerSignaller, + _consumer_identifier: &str, + webrtcbin: &gst::Element| { + let obj_weak = signaller.downgrade(); + webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { + let obj = match obj_weak.upgrade() { + Some(obj) => obj, + None => return, + }; + + let state = webrtcbin.property::("ice-gathering-state"); + + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, obj: obj, "ICE gathering started"); + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, obj: obj, "ICE gathering complete"); + let ans: Option; + let mut settings = obj.imp().settings.lock().unwrap(); + if let Some(answer_desc) = webrtcbin + .property::>("local-description") + { + ans = Some(answer_desc.sdp().to_owned()); + } else { + ans = None; + } + + let tx = settings + .sdp_answer + .take() + .expect("SDP answer Sender needs to be valid"); + + let obj_weak = obj.downgrade(); + RUNTIME.spawn( async move { + let obj = match obj_weak.upgrade() { + Some(obj) => obj, + None => return, + }; + + if let Err(e) = tx.send(ans).await { + gst::error!(CAT, obj: obj, "Failed to send SDP {e}"); + } + }); + } + _ => (), + } + }); + }) + } + + async fn patch_handler(&self, _id: String) -> Result { + // FIXME: implement ICE Trickle and ICE restart + // emit signal `handle-ice` to for ICE trickle + let reply = warp::reply::reply(); + let res = warp::reply::with_status(reply, http::StatusCode::NOT_IMPLEMENTED); + Ok(res.into_response()) + //FIXME: add state checking once ICE trickle is implemented + } + + async fn delete_handler(&self, id: String) -> Result { + if self + .obj() + .emit_by_name::("session-ended", &[&id.as_str()]) + { + //do nothing + // FIXME: revisit once the return values are changed in webrtcsink/imp.rs and webrtcsrc/imp.rs + } + + gst::info!(CAT, imp:self, "Ended session {id}"); + Ok(warp::reply::reply().into_response()) + } + + async fn options_handler(&self) -> Result { + let mut links = HeaderMap::new(); + let settings = self.settings.lock().unwrap(); + + match &settings.stun_server { + Some(stun) => match build_link_header(stun.as_str()) { + Ok(stun_link) => { + links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}"); + } + }, + None => {} + } + + if !settings.turn_servers.is_empty() { + for turn_server in settings.turn_servers.iter() { + if let Ok(turn) = turn_server.get::() { + gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str()); + match build_link_header(turn.as_str()) { + Ok(turn_link) => { + links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}"); + } + } + } else { + gst::debug!(CAT, imp: self, "Failed to get String value of {turn_server:?}"); + } + } + } + + let mut res = http::Response::builder() + .header("Access-Post", CONTENT_SDP) + .body(Body::empty()) + .unwrap(); + + let headers = res.headers_mut(); + headers.extend(links); + + Ok(res) + } + + async fn post_handler( + &self, + body: warp::hyper::body::Bytes, + ) -> Result, warp::Rejection> { + let session_id = uuid::Uuid::new_v4().to_string(); + let (tx, mut rx) = mpsc::channel::>(1); + + let wait_timeout = { + let mut settings = self.settings.lock().unwrap(); + let wait_timeout = settings.timeout; + settings.sdp_answer = Some(tx); + drop(settings); + wait_timeout + }; + + match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) { + Ok(offer_sdp) => { + let offer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + offer_sdp, + ); + self.obj() + .emit_by_name::<()>("session-requested", &[&session_id, &session_id, &offer]); + } + Err(err) => { + gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}"); + let reply = warp::reply::reply(); + let res = warp::reply::with_status(reply, http::StatusCode::NOT_ACCEPTABLE); + return Ok(res.into_response()); + } + } + + let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await; + + let answer = match result { + Ok(ans) => { + match ans { + Some(a) => a, + None => { + let err = "Channel closed, can't receive SDP".to_owned(); + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(err)) + .unwrap(); + + return Ok(res); + }, + } + }, + Err(e) => { + let err = match e { + WaitError::FutureAborted => { + "Aborted".to_owned() + } + WaitError::FutureError(err) => { + err.to_string() + } + }; + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(err)) + .unwrap(); + + return Ok(res); + } + }; + + let settings = self.settings.lock().unwrap(); + let mut links = HeaderMap::new(); + + if let Some(stun) = &settings.stun_server { + match build_link_header(stun.as_str()) { + Ok(stun_link) => { + links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}"); + } + } + } + + if !settings.turn_servers.is_empty() { + for turn_server in settings.turn_servers.iter() { + if let Ok(turn) = turn_server.get::() { + gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str()); + match build_link_header(turn.as_str()) { + Ok(turn_link) => { + links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}"); + } + } + } else { + gst::error!(CAT, imp: self, "Failed to get String value of {turn_server:?}"); + } + } + } + + // Note: including the ETag in the original "201 Created" response is only REQUIRED + // if the WHEP resource supports ICE restarts and OPTIONAL otherwise. + + let ans_text: Result; + if let Some(sdp) = answer { + match sdp.as_text() { + Ok(text) => { + ans_text = Ok(text); + gst::debug!(CAT, imp: self, "{ans_text:?}"); + } + Err(e) => { + ans_text = Err(format!("Failed to get SDP answer: {e:?}")); + gst::error!(CAT, imp: self, "{e:?}"); + } + } + } else { + let e = "SDP Answer is empty!".to_string(); + gst::error!(CAT, imp: self, "{e:?}"); + ans_text = Err(e); + } + + // If ans_text is an error. Send error code and error string in the response + if let Err(e) = ans_text { + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(e)) + .unwrap(); + return Ok(res); + } + + let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id; + let mut res = http::Response::builder() + .status(StatusCode::CREATED) + .header(CONTENT_TYPE, CONTENT_SDP) + .header("location", resource_url) + .body(Body::from(ans_text.unwrap())) + .unwrap(); + + let headers = res.headers_mut(); + headers.extend(links); + + Ok(res) + } + + fn serve(&self) -> Option> { + let mut settings = self.settings.lock().unwrap(); + let addr: SocketAddr; + match settings.host_addr.socket_addrs(|| None) { + Ok(v) => { + // pick the first vector item + addr = v[0]; + gst::info!(CAT, imp:self, "using {addr:?} as address"); + } + Err(e) => { + gst::error!(CAT, imp:self, "error getting addr from uri {e:?}"); + self.obj() + .emit_by_name::<()>("error", &[&format!("Unable to start Whep Server: {e:?}")]); + return None; + } + } + + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + settings.shutdown_signal = Some(tx); + drop(settings); + + let prefix = warp::path(ROOT); + + let self_weak = self.downgrade(); + + // POST /endpoint + let post_filter = warp::post() + .and(warp::path(ENDPOINT_PATH)) + .and(warp::path::end()) + .and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP)) + .and(warp::body::bytes()) + .and_then(move |body| { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.post_handler(body).await + } + }); + + let self_weak = self.downgrade(); + + // OPTIONS /endpoint + let options_filter = warp::options() + .and(warp::path(ENDPOINT_PATH)) + .and(warp::path::end()) + .and_then(move || { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.options_handler().await + } + }); + + let self_weak = self.downgrade(); + + // PATCH /resource/:id + let patch_filter = warp::patch() + .and(warp::path(RESOURCE_PATH)) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(warp::header::exact( + CONTENT_TYPE.as_str(), + CONTENT_TRICKLE_ICE, + )) + .and_then(move |id| { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.patch_handler(id).await + } + }); + + let self_weak = self.downgrade(); + + // DELETE /resource/:id + let delete_filter = warp::delete() + .and(warp::path(RESOURCE_PATH)) + .and(warp::path::param::()) + .and(warp::path::end()) + .and_then(move |id| { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.delete_handler(id).await + } + }); + + let api = prefix + .and(post_filter) + .or(prefix.and(options_filter)) + .or(prefix.and(patch_filter)) + .or(prefix.and(delete_filter)); + + let s = warp::serve(api); + let f = async move { + let (_, server) = s.bind_with_graceful_shutdown(addr, async move { + match rx.await { + Ok(_) => gst::debug!(CAT, "Server shut down signal received"), + Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"), + } + }); + + server.await; + gst::debug!(CAT, "Stopped the server task..."); + }; + + let jh = RUNTIME.spawn(f); + + gst::debug!(CAT, imp: self, "Started the server..."); + Some(jh) + } + + fn set_host_addr(&self, host_addr: &str) -> Result<(), url::ParseError> { + let mut settings = self.settings.lock().unwrap(); + settings.host_addr = Url::parse(host_addr)?; + Ok(()) + } +} + +impl SignallableImpl for WhepServer { + fn start(&self) { + gst::info!(CAT, imp: self, "starting the WHEP server"); + let jh = self.serve(); + let mut settings = self.settings.lock().unwrap(); + settings.server_handle = jh; + } + + fn stop(&self) { + let mut settings = self.settings.lock().unwrap(); + + let handle = settings + .server_handle + .take() + .expect("Server handle should be set"); + + let tx = settings + .shutdown_signal + .take() + .expect("Shutdown signal Sender needs to be valid"); + + if tx.send(()).is_err() { + gst::error!(CAT, imp: self, "Failed to send shutdown signal. Receiver dropped"); + } + + gst::debug!(CAT, imp: self, "Await server handle to join"); + RUNTIME.block_on(async { + if let Err(e) = handle.await { + gst::error!(CAT, imp:self, "Failed to join server handle: {e:?}"); + }; + }); + + gst::info!(CAT, imp: self, "stopped the WHEP server"); + } + + fn end_session(&self, _session_id: &str) { + //FIXME: send any events to the client + } +} + +#[glib::object_subclass] +impl ObjectSubclass for WhepServer { + const NAME: &'static str = "GstWhepServerSignaller"; + type Type = super::WhepServerSignaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl ObjectImpl for WhepServer { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("host-addr") + .nick("Host address") + .blurb("The the host address of the WHEP endpoint e.g., http://127.0.0.1:9090") + .default_value(DEFAULT_HOST_ADDR) + .flags(glib::ParamFlags::READWRITE) + .build(), + glib::ParamSpecString::builder("stun-server") + .nick("STUN Server") + .blurb("The STUN server of the form stun://hostname:port") + .default_value(DEFAULT_STUN_SERVER) + .build(), + gst::ParamSpecArray::builder("turn-servers") + .nick("List of TURN Servers to user") + .blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">") + .element_spec(&glib::ParamSpecString::builder("turn-server") + .nick("TURN Server") + .blurb("The TURN server of the form turn(s)://username:password@host:port.") + .build() + ) + .mutable_ready() + .build(), + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .build(), + ] + }); + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "host-addr" => { + if let Err(e) = + self.set_host_addr(value.get::<&str>().expect("type checked upstream")) + { + gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}"); + } + } + "stun-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.stun_server = value + .get::>() + .expect("type checked upstream") + } + "turn-servers" => { + let mut settings = self.settings.lock().unwrap(); + settings.turn_servers = value.get::().expect("type checked upstream") + } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.timeout = value.get().unwrap(); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "host-addr" => settings.host_addr.to_string().to_value(), + "stun-server" => settings.stun_server.to_value(), + "turn-servers" => settings.turn_servers.to_value(), + "timeout" => settings.timeout.to_value(), + _ => unimplemented!(), + } + } +}