Merge branch 'whepsink' into 'main'

Draft: net/webrtc: add WHEP server signaller

See merge request gstreamer/gst-plugins-rs!1538
This commit is contained in:
Taruntej Kanakamalla 2024-04-27 05:04:40 +00:00
commit b90e84cfba
7 changed files with 661 additions and 1 deletions

View file

@ -78,7 +78,7 @@ path = "src/lib.rs"
gst-plugin-version-helper.workspace = true
[features]
default = ["v1_22", "aws", "janus", "livekit", "whip"]
default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"]
static = []
capi = []
v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
@ -90,6 +90,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
janus = ["dep:http"]
livekit = ["dep:livekit-protocol", "dep:livekit-api"]
whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"]
whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"]
[package.metadata.capi]
min_version = "0.9.21"

View file

@ -26,6 +26,8 @@ pub mod webrtcsink;
pub mod webrtcsrc;
#[cfg(feature = "whip")]
mod whip_signaller;
#[cfg(feature = "whep")]
mod whep_signaller;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
webrtcsink::register(plugin)?;

View file

@ -362,6 +362,7 @@ pub fn set_ice_servers(
Ok(())
}
#[cfg(any(feature = "whip", feature = "whep"))]
pub fn build_link_header(url_str: &str) -> Result<String, url::ParseError> {
let url = url::Url::parse(url_str)?;

View file

@ -4733,3 +4733,48 @@ pub(super) mod janus {
type ParentType = crate::webrtcsink::BaseWebRTCSink;
}
}
#[cfg(feature = "whep")]
pub(super) mod whep {
use super::*;
use crate::whep_signaller::WhepServerSignaller;
#[derive(Default)]
pub struct WhepWebRTCSink {}
impl ObjectImpl for WhepWebRTCSink {
fn constructed(&self) {
let element = self.obj();
let ws = element.upcast_ref::<crate::webrtcsink::BaseWebRTCSink>().imp();
let _ = ws.set_signaller(WhepServerSignaller::default().upcast());
}
}
impl GstObjectImpl for WhepWebRTCSink {}
impl ElementImpl for WhepWebRTCSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"WhepWebRTCSink",
"Sink/Network/WebRTC",
"WebRTC sink with WHEP server signaller",
"Taruntej Kanakamalla <taruntej@asymptotic.io>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BinImpl for WhepWebRTCSink {}
impl BaseWebRTCSinkImpl for WhepWebRTCSink {}
#[glib::object_subclass]
impl ObjectSubclass for WhepWebRTCSink {
const NAME: &'static str = "GstWhepWebRTCSink";
type Type = crate::webrtcsink::WhepWebRTCSink;
type ParentType = crate::webrtcsink::BaseWebRTCSink;
}
}

View file

@ -72,6 +72,10 @@ glib::wrapper! {
glib::wrapper! {
pub struct JanusVRWebRTCSink(ObjectSubclass<imp::janus::JanusVRWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
}
#[cfg(feature = "whep")]
glib::wrapper! {
pub struct WhepWebRTCSink(ObjectSubclass<imp::whep::WhepWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
}
#[derive(thiserror::Error, Debug)]
pub enum WebRTCSinkError {
@ -229,6 +233,13 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Rank::NONE,
JanusVRWebRTCSink::static_type(),
)?;
#[cfg(feature = "whep")]
gst::Element::register(
Some(plugin),
"whepserversink",
gst::Rank::NONE,
WhepWebRTCSink::static_type(),
)?;
Ok(())
}

View file

@ -0,0 +1,20 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::Signallable;
use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt};
mod server;
glib::wrapper! {
pub struct WhepServerSignaller(ObjectSubclass<server::WhepServer>) @implements Signallable;
}
unsafe impl Send for WhepServerSignaller {}
unsafe impl Sync for WhepServerSignaller {}
impl Default for WhepServerSignaller {
fn default() -> Self {
let sig: WhepServerSignaller = glib::Object::new();
sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready());
sig
}
}

View file

@ -0,0 +1,580 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::{Signallable, SignallableImpl};
use crate::utils::{build_link_header, wait_async, WaitError};
use crate::RUNTIME;
use once_cell::sync::Lazy;
use gst::glib::{self, RustClosure};
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_sdp::SDPMessage;
use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::StatusCode;
use std::sync::Mutex;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use url::Url;
use warp::{
http,
hyper::{
header::{CONTENT_TYPE, LINK},
Body,
},
Filter, Reply,
};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"whep-server-signaller",
gst::DebugColorFlags::empty(),
Some("WHEP Server Signaller"),
)
});
const DEFAULT_TIMEOUT: u32 = 30;
const ROOT: &str = "whep";
const ENDPOINT_PATH: &str = "endpoint";
const RESOURCE_PATH: &str = "resource";
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:9090";
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
const CONTENT_SDP: &str = "application/sdp";
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
struct Settings {
stun_server: Option<String>,
turn_servers: gst::Array,
host_addr: Url,
timeout: u32,
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
server_handle: Option<tokio::task::JoinHandle<()>>,
sdp_answer: Option<mpsc::Sender<Option<SDPMessage>>>,
}
impl Default for Settings {
fn default() -> Self {
Self {
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
stun_server: DEFAULT_STUN_SERVER.map(String::from),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
timeout: DEFAULT_TIMEOUT,
shutdown_signal: None,
server_handle: None,
sdp_answer: None,
}
}
}
#[derive(Default)]
pub struct WhepServer {
settings: Mutex<Settings>,
canceller: Mutex<Option<futures::future::AbortHandle>>,
}
impl WhepServer {
pub fn on_webrtcbin_ready(&self) -> RustClosure {
glib::closure!(|signaller: &super::WhepServerSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
let obj_weak = signaller.downgrade();
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
};
let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: obj, "ICE gathering started");
}
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete");
let ans: Option<gst_sdp::SDPMessage>;
let mut settings = obj.imp().settings.lock().unwrap();
if let Some(answer_desc) = webrtcbin
.property::<Option<WebRTCSessionDescription>>("local-description")
{
ans = Some(answer_desc.sdp().to_owned());
} else {
ans = None;
}
let tx = settings
.sdp_answer
.take()
.expect("SDP answer Sender needs to be valid");
let obj_weak = obj.downgrade();
RUNTIME.spawn( async move {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
};
if let Err(e) = tx.send(ans).await {
gst::error!(CAT, obj: obj, "Failed to send SDP {e}");
}
});
}
_ => (),
}
});
})
}
async fn patch_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
// FIXME: implement ICE Trickle and ICE restart
// emit signal `handle-ice` to for ICE trickle
let reply = warp::reply::reply();
let res = warp::reply::with_status(reply, http::StatusCode::NOT_IMPLEMENTED);
Ok(res.into_response())
//FIXME: add state checking once ICE trickle is implemented
}
async fn delete_handler(&self, id: String) -> Result<impl warp::Reply, warp::Rejection> {
if self
.obj()
.emit_by_name::<bool>("session-ended", &[&id.as_str()])
{
//do nothing
// FIXME: revisit once the return values are changed in webrtcsink/imp.rs and webrtcsrc/imp.rs
}
gst::info!(CAT, imp:self, "Ended session {id}");
Ok(warp::reply::reply().into_response())
}
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
let mut links = HeaderMap::new();
let settings = self.settings.lock().unwrap();
match &settings.stun_server {
Some(stun) => match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
}
},
None => {}
}
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
}
}
} else {
gst::debug!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
}
}
}
let mut res = http::Response::builder()
.header("Access-Post", CONTENT_SDP)
.body(Body::empty())
.unwrap();
let headers = res.headers_mut();
headers.extend(links);
Ok(res)
}
async fn post_handler(
&self,
body: warp::hyper::body::Bytes,
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
let session_id = uuid::Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel::<Option<SDPMessage>>(1);
let wait_timeout = {
let mut settings = self.settings.lock().unwrap();
let wait_timeout = settings.timeout;
settings.sdp_answer = Some(tx);
drop(settings);
wait_timeout
};
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
Ok(offer_sdp) => {
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
offer_sdp,
);
self.obj()
.emit_by_name::<()>("session-requested", &[&session_id, &session_id, &offer]);
}
Err(err) => {
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
let reply = warp::reply::reply();
let res = warp::reply::with_status(reply, http::StatusCode::NOT_ACCEPTABLE);
return Ok(res.into_response());
}
}
let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await;
let answer = match result {
Ok(ans) => {
match ans {
Some(a) => a,
None => {
let err = "Channel closed, can't receive SDP".to_owned();
let res = http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(err))
.unwrap();
return Ok(res);
},
}
},
Err(e) => {
let err = match e {
WaitError::FutureAborted => {
"Aborted".to_owned()
}
WaitError::FutureError(err) => {
err.to_string()
}
};
let res = http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(err))
.unwrap();
return Ok(res);
}
};
let settings = self.settings.lock().unwrap();
let mut links = HeaderMap::new();
if let Some(stun) = &settings.stun_server {
match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
}
}
}
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
}
}
} else {
gst::error!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
}
}
}
// Note: including the ETag in the original "201 Created" response is only REQUIRED
// if the WHEP resource supports ICE restarts and OPTIONAL otherwise.
let ans_text: Result<String, String>;
if let Some(sdp) = answer {
match sdp.as_text() {
Ok(text) => {
ans_text = Ok(text);
gst::debug!(CAT, imp: self, "{ans_text:?}");
}
Err(e) => {
ans_text = Err(format!("Failed to get SDP answer: {e:?}"));
gst::error!(CAT, imp: self, "{e:?}");
}
}
} else {
let e = "SDP Answer is empty!".to_string();
gst::error!(CAT, imp: self, "{e:?}");
ans_text = Err(e);
}
// If ans_text is an error. Send error code and error string in the response
if let Err(e) = ans_text {
let res = http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(e))
.unwrap();
return Ok(res);
}
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id;
let mut res = http::Response::builder()
.status(StatusCode::CREATED)
.header(CONTENT_TYPE, CONTENT_SDP)
.header("location", resource_url)
.body(Body::from(ans_text.unwrap()))
.unwrap();
let headers = res.headers_mut();
headers.extend(links);
Ok(res)
}
fn serve(&self) -> Option<tokio::task::JoinHandle<()>> {
let mut settings = self.settings.lock().unwrap();
let addr: SocketAddr;
match settings.host_addr.socket_addrs(|| None) {
Ok(v) => {
// pick the first vector item
addr = v[0];
gst::info!(CAT, imp:self, "using {addr:?} as address");
}
Err(e) => {
gst::error!(CAT, imp:self, "error getting addr from uri {e:?}");
self.obj()
.emit_by_name::<()>("error", &[&format!("Unable to start Whep Server: {e:?}")]);
return None;
}
}
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
settings.shutdown_signal = Some(tx);
drop(settings);
let prefix = warp::path(ROOT);
let self_weak = self.downgrade();
// POST /endpoint
let post_filter = warp::post()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP))
.and(warp::body::bytes())
.and_then(move |body| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.post_handler(body).await
}
});
let self_weak = self.downgrade();
// OPTIONS /endpoint
let options_filter = warp::options()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and_then(move || {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.options_handler().await
}
});
let self_weak = self.downgrade();
// PATCH /resource/:id
let patch_filter = warp::patch()
.and(warp::path(RESOURCE_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and(warp::header::exact(
CONTENT_TYPE.as_str(),
CONTENT_TRICKLE_ICE,
))
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.patch_handler(id).await
}
});
let self_weak = self.downgrade();
// DELETE /resource/:id
let delete_filter = warp::delete()
.and(warp::path(RESOURCE_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.delete_handler(id).await
}
});
let api = prefix
.and(post_filter)
.or(prefix.and(options_filter))
.or(prefix.and(patch_filter))
.or(prefix.and(delete_filter));
let s = warp::serve(api);
let f = async move {
let (_, server) = s.bind_with_graceful_shutdown(addr, async move {
match rx.await {
Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
}
});
server.await;
gst::debug!(CAT, "Stopped the server task...");
};
let jh = RUNTIME.spawn(f);
gst::debug!(CAT, imp: self, "Started the server...");
Some(jh)
}
fn set_host_addr(&self, host_addr: &str) -> Result<(), url::ParseError> {
let mut settings = self.settings.lock().unwrap();
settings.host_addr = Url::parse(host_addr)?;
Ok(())
}
}
impl SignallableImpl for WhepServer {
fn start(&self) {
gst::info!(CAT, imp: self, "starting the WHEP server");
let jh = self.serve();
let mut settings = self.settings.lock().unwrap();
settings.server_handle = jh;
}
fn stop(&self) {
let mut settings = self.settings.lock().unwrap();
let handle = settings
.server_handle
.take()
.expect("Server handle should be set");
let tx = settings
.shutdown_signal
.take()
.expect("Shutdown signal Sender needs to be valid");
if tx.send(()).is_err() {
gst::error!(CAT, imp: self, "Failed to send shutdown signal. Receiver dropped");
}
gst::debug!(CAT, imp: self, "Await server handle to join");
RUNTIME.block_on(async {
if let Err(e) = handle.await {
gst::error!(CAT, imp:self, "Failed to join server handle: {e:?}");
};
});
gst::info!(CAT, imp: self, "stopped the WHEP server");
}
fn end_session(&self, _session_id: &str) {
//FIXME: send any events to the client
}
}
#[glib::object_subclass]
impl ObjectSubclass for WhepServer {
const NAME: &'static str = "GstWhepServerSignaller";
type Type = super::WhepServerSignaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl ObjectImpl for WhepServer {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("host-addr")
.nick("Host address")
.blurb("The the host address of the WHEP endpoint e.g., http://127.0.0.1:9090")
.default_value(DEFAULT_HOST_ADDR)
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("stun-server")
.nick("STUN Server")
.blurb("The STUN server of the form stun://hostname:port")
.default_value(DEFAULT_STUN_SERVER)
.build(),
gst::ParamSpecArray::builder("turn-servers")
.nick("List of TURN Servers to user")
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
.element_spec(&glib::ParamSpecString::builder("turn-server")
.nick("TURN Server")
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
.build()
)
.mutable_ready()
.build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"host-addr" => {
if let Err(e) =
self.set_host_addr(value.get::<&str>().expect("type checked upstream"))
{
gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}");
}
}
"stun-server" => {
let mut settings = self.settings.lock().unwrap();
settings.stun_server = value
.get::<Option<String>>()
.expect("type checked upstream")
}
"turn-servers" => {
let mut settings = self.settings.lock().unwrap();
settings.turn_servers = value.get::<gst::Array>().expect("type checked upstream")
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().unwrap();
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"host-addr" => settings.host_addr.to_string().to_value(),
"stun-server" => settings.stun_server.to_value(),
"turn-servers" => settings.turn_servers.to_value(),
"timeout" => settings.timeout.to_value(),
_ => unimplemented!(),
}
}
}