mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-05-12 21:32:39 +00:00
net/webrtc: add WHEP server signaller
WHEP server implementation derived from BaseWebRTCSink
This commit is contained in:
parent
b3d3895ae7
commit
4d6e0f707b
|
@ -78,7 +78,7 @@ path = "src/lib.rs"
|
|||
gst-plugin-version-helper.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["v1_22", "aws", "janus", "livekit", "whip"]
|
||||
default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"]
|
||||
static = []
|
||||
capi = []
|
||||
v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
|
||||
|
@ -90,6 +90,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
|
|||
janus = ["dep:http"]
|
||||
livekit = ["dep:livekit-protocol", "dep:livekit-api"]
|
||||
whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"]
|
||||
whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"]
|
||||
|
||||
[package.metadata.capi]
|
||||
min_version = "0.9.21"
|
||||
|
|
|
@ -26,6 +26,8 @@ pub mod webrtcsink;
|
|||
pub mod webrtcsrc;
|
||||
#[cfg(feature = "whip")]
|
||||
mod whip_signaller;
|
||||
#[cfg(feature = "whep")]
|
||||
mod whep_signaller;
|
||||
|
||||
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||
webrtcsink::register(plugin)?;
|
||||
|
|
|
@ -362,6 +362,7 @@ pub fn set_ice_servers(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||
pub fn build_link_header(url_str: &str) -> Result<String, url::ParseError> {
|
||||
let url = url::Url::parse(url_str)?;
|
||||
|
||||
|
|
|
@ -4733,3 +4733,48 @@ pub(super) mod janus {
|
|||
type ParentType = crate::webrtcsink::BaseWebRTCSink;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "whep")]
|
||||
pub(super) mod whep {
|
||||
use super::*;
|
||||
use crate::whep_signaller::WhepServerSignaller;
|
||||
#[derive(Default)]
|
||||
pub struct WhepWebRTCSink {}
|
||||
|
||||
impl ObjectImpl for WhepWebRTCSink {
|
||||
fn constructed(&self) {
|
||||
let element = self.obj();
|
||||
let ws = element.upcast_ref::<crate::webrtcsink::BaseWebRTCSink>().imp();
|
||||
|
||||
let _ = ws.set_signaller(WhepServerSignaller::default().upcast());
|
||||
}
|
||||
}
|
||||
|
||||
impl GstObjectImpl for WhepWebRTCSink {}
|
||||
|
||||
impl ElementImpl for WhepWebRTCSink {
|
||||
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||
gst::subclass::ElementMetadata::new(
|
||||
"WhepWebRTCSink",
|
||||
"Sink/Network/WebRTC",
|
||||
"WebRTC sink with WHEP server signaller",
|
||||
"Taruntej Kanakamalla <taruntej@asymptotic.io>",
|
||||
)
|
||||
});
|
||||
|
||||
Some(&*ELEMENT_METADATA)
|
||||
}
|
||||
}
|
||||
|
||||
impl BinImpl for WhepWebRTCSink {}
|
||||
|
||||
impl BaseWebRTCSinkImpl for WhepWebRTCSink {}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for WhepWebRTCSink {
|
||||
const NAME: &'static str = "GstWhepWebRTCSink";
|
||||
type Type = crate::webrtcsink::WhepWebRTCSink;
|
||||
type ParentType = crate::webrtcsink::BaseWebRTCSink;
|
||||
}
|
||||
}
|
|
@ -72,6 +72,10 @@ glib::wrapper! {
|
|||
glib::wrapper! {
|
||||
pub struct JanusVRWebRTCSink(ObjectSubclass<imp::janus::JanusVRWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
|
||||
}
|
||||
#[cfg(feature = "whep")]
|
||||
glib::wrapper! {
|
||||
pub struct WhepWebRTCSink(ObjectSubclass<imp::whep::WhepWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum WebRTCSinkError {
|
||||
|
@ -229,6 +233,13 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
|||
gst::Rank::NONE,
|
||||
JanusVRWebRTCSink::static_type(),
|
||||
)?;
|
||||
#[cfg(feature = "whep")]
|
||||
gst::Element::register(
|
||||
Some(plugin),
|
||||
"whepserversink",
|
||||
gst::Rank::NONE,
|
||||
WhepWebRTCSink::static_type(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
20
net/webrtc/src/whep_signaller/mod.rs
Normal file
20
net/webrtc/src/whep_signaller/mod.rs
Normal file
|
@ -0,0 +1,20 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::Signallable;
|
||||
use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt};
|
||||
|
||||
mod server;
|
||||
glib::wrapper! {
|
||||
pub struct WhepServerSignaller(ObjectSubclass<server::WhepServer>) @implements Signallable;
|
||||
}
|
||||
|
||||
unsafe impl Send for WhepServerSignaller {}
|
||||
unsafe impl Sync for WhepServerSignaller {}
|
||||
|
||||
impl Default for WhepServerSignaller {
|
||||
fn default() -> Self {
|
||||
let sig: WhepServerSignaller = glib::Object::new();
|
||||
sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready());
|
||||
sig
|
||||
}
|
||||
}
|
580
net/webrtc/src/whep_signaller/server.rs
Normal file
580
net/webrtc/src/whep_signaller/server.rs
Normal file
|
@ -0,0 +1,580 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::{Signallable, SignallableImpl};
|
||||
use crate::utils::{build_link_header, wait_async, WaitError};
|
||||
use crate::RUNTIME;
|
||||
use once_cell::sync::Lazy;
|
||||
use gst::glib::{self, RustClosure};
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
use gst_sdp::SDPMessage;
|
||||
use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderValue;
|
||||
use reqwest::StatusCode;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use tokio::sync::mpsc;
|
||||
use url::Url;
|
||||
use warp::{
|
||||
http,
|
||||
hyper::{
|
||||
header::{CONTENT_TYPE, LINK},
|
||||
Body,
|
||||
},
|
||||
Filter, Reply,
|
||||
};
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"whep-server-signaller",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("WHEP Server Signaller"),
|
||||
)
|
||||
});
|
||||
|
||||
const DEFAULT_TIMEOUT: u32 = 30;
|
||||
|
||||
const ROOT: &str = "whep";
|
||||
const ENDPOINT_PATH: &str = "endpoint";
|
||||
const RESOURCE_PATH: &str = "resource";
|
||||
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:9090";
|
||||
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
|
||||
const CONTENT_SDP: &str = "application/sdp";
|
||||
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
|
||||
|
||||
struct Settings {
|
||||
stun_server: Option<String>,
|
||||
turn_servers: gst::Array,
|
||||
host_addr: Url,
|
||||
timeout: u32,
|
||||
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
server_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
sdp_answer: Option<mpsc::Sender<Option<SDPMessage>>>,
|
||||
}
|
||||
|
||||
impl Default for Settings {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
|
||||
stun_server: DEFAULT_STUN_SERVER.map(String::from),
|
||||
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
|
||||
timeout: DEFAULT_TIMEOUT,
|
||||
shutdown_signal: None,
|
||||
server_handle: None,
|
||||
sdp_answer: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct WhepServer {
|
||||
settings: Mutex<Settings>,
|
||||
canceller: Mutex<Option<futures::future::AbortHandle>>,
|
||||
}
|
||||
|
||||
impl WhepServer {
|
||||
pub fn on_webrtcbin_ready(&self) -> RustClosure {
|
||||
glib::closure!(|signaller: &super::WhepServerSignaller,
|
||||
_consumer_identifier: &str,
|
||||
webrtcbin: &gst::Element| {
|
||||
let obj_weak = signaller.downgrade();
|
||||
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
|
||||
let obj = match obj_weak.upgrade() {
|
||||
Some(obj) => obj,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
|
||||
|
||||
match state {
|
||||
WebRTCICEGatheringState::Gathering => {
|
||||
gst::info!(CAT, obj: obj, "ICE gathering started");
|
||||
}
|
||||
WebRTCICEGatheringState::Complete => {
|
||||
gst::info!(CAT, obj: obj, "ICE gathering complete");
|
||||
let ans: Option<gst_sdp::SDPMessage>;
|
||||
let mut settings = obj.imp().settings.lock().unwrap();
|
||||
if let Some(answer_desc) = webrtcbin
|
||||
.property::<Option<WebRTCSessionDescription>>("local-description")
|
||||
{
|
||||
ans = Some(answer_desc.sdp().to_owned());
|
||||
} else {
|
||||
ans = None;
|
||||
}
|
||||
|
||||
let tx = settings
|
||||
.sdp_answer
|
||||
.take()
|
||||
.expect("SDP answer Sender needs to be valid");
|
||||
|
||||
let obj_weak = obj.downgrade();
|
||||
RUNTIME.spawn( async move {
|
||||
let obj = match obj_weak.upgrade() {
|
||||
Some(obj) => obj,
|
||||
None => return,
|
||||
};
|
||||
|
||||
if let Err(e) = tx.send(ans).await {
|
||||
gst::error!(CAT, obj: obj, "Failed to send SDP {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
async fn patch_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
|
||||
// FIXME: implement ICE Trickle and ICE restart
|
||||
// emit signal `handle-ice` to for ICE trickle
|
||||
let reply = warp::reply::reply();
|
||||
let res = warp::reply::with_status(reply, http::StatusCode::NOT_IMPLEMENTED);
|
||||
Ok(res.into_response())
|
||||
//FIXME: add state checking once ICE trickle is implemented
|
||||
}
|
||||
|
||||
async fn delete_handler(&self, id: String) -> Result<impl warp::Reply, warp::Rejection> {
|
||||
if self
|
||||
.obj()
|
||||
.emit_by_name::<bool>("session-ended", &[&id.as_str()])
|
||||
{
|
||||
//do nothing
|
||||
// FIXME: revisit once the return values are changed in webrtcsink/imp.rs and webrtcsrc/imp.rs
|
||||
}
|
||||
|
||||
gst::info!(CAT, imp:self, "Ended session {id}");
|
||||
Ok(warp::reply::reply().into_response())
|
||||
}
|
||||
|
||||
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
|
||||
let mut links = HeaderMap::new();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
match &settings.stun_server {
|
||||
Some(stun) => match build_link_header(stun.as_str()) {
|
||||
Ok(stun_link) => {
|
||||
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
|
||||
}
|
||||
Err(e) => {
|
||||
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
|
||||
}
|
||||
},
|
||||
None => {}
|
||||
}
|
||||
|
||||
if !settings.turn_servers.is_empty() {
|
||||
for turn_server in settings.turn_servers.iter() {
|
||||
if let Ok(turn) = turn_server.get::<String>() {
|
||||
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
|
||||
match build_link_header(turn.as_str()) {
|
||||
Ok(turn_link) => {
|
||||
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
|
||||
}
|
||||
Err(e) => {
|
||||
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
gst::debug!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut res = http::Response::builder()
|
||||
.header("Access-Post", CONTENT_SDP)
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
let headers = res.headers_mut();
|
||||
headers.extend(links);
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn post_handler(
|
||||
&self,
|
||||
body: warp::hyper::body::Bytes,
|
||||
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
|
||||
let session_id = uuid::Uuid::new_v4().to_string();
|
||||
let (tx, mut rx) = mpsc::channel::<Option<SDPMessage>>(1);
|
||||
|
||||
let wait_timeout = {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
let wait_timeout = settings.timeout;
|
||||
settings.sdp_answer = Some(tx);
|
||||
drop(settings);
|
||||
wait_timeout
|
||||
};
|
||||
|
||||
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
|
||||
Ok(offer_sdp) => {
|
||||
let offer = gst_webrtc::WebRTCSessionDescription::new(
|
||||
gst_webrtc::WebRTCSDPType::Offer,
|
||||
offer_sdp,
|
||||
);
|
||||
self.obj()
|
||||
.emit_by_name::<()>("session-requested", &[&session_id, &session_id, &offer]);
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
|
||||
let reply = warp::reply::reply();
|
||||
let res = warp::reply::with_status(reply, http::StatusCode::NOT_ACCEPTABLE);
|
||||
return Ok(res.into_response());
|
||||
}
|
||||
}
|
||||
|
||||
let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await;
|
||||
|
||||
let answer = match result {
|
||||
Ok(ans) => {
|
||||
match ans {
|
||||
Some(a) => a,
|
||||
None => {
|
||||
let err = "Channel closed, can't receive SDP".to_owned();
|
||||
let res = http::Response::builder()
|
||||
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(Body::from(err))
|
||||
.unwrap();
|
||||
|
||||
return Ok(res);
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let err = match e {
|
||||
WaitError::FutureAborted => {
|
||||
"Aborted".to_owned()
|
||||
}
|
||||
WaitError::FutureError(err) => {
|
||||
err.to_string()
|
||||
}
|
||||
};
|
||||
let res = http::Response::builder()
|
||||
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(Body::from(err))
|
||||
.unwrap();
|
||||
|
||||
return Ok(res);
|
||||
}
|
||||
};
|
||||
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let mut links = HeaderMap::new();
|
||||
|
||||
if let Some(stun) = &settings.stun_server {
|
||||
match build_link_header(stun.as_str()) {
|
||||
Ok(stun_link) => {
|
||||
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
|
||||
}
|
||||
Err(e) => {
|
||||
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !settings.turn_servers.is_empty() {
|
||||
for turn_server in settings.turn_servers.iter() {
|
||||
if let Ok(turn) = turn_server.get::<String>() {
|
||||
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
|
||||
match build_link_header(turn.as_str()) {
|
||||
Ok(turn_link) => {
|
||||
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
|
||||
}
|
||||
Err(e) => {
|
||||
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
gst::error!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Note: including the ETag in the original "201 Created" response is only REQUIRED
|
||||
// if the WHEP resource supports ICE restarts and OPTIONAL otherwise.
|
||||
|
||||
let ans_text: Result<String, String>;
|
||||
if let Some(sdp) = answer {
|
||||
match sdp.as_text() {
|
||||
Ok(text) => {
|
||||
ans_text = Ok(text);
|
||||
gst::debug!(CAT, imp: self, "{ans_text:?}");
|
||||
}
|
||||
Err(e) => {
|
||||
ans_text = Err(format!("Failed to get SDP answer: {e:?}"));
|
||||
gst::error!(CAT, imp: self, "{e:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let e = "SDP Answer is empty!".to_string();
|
||||
gst::error!(CAT, imp: self, "{e:?}");
|
||||
ans_text = Err(e);
|
||||
}
|
||||
|
||||
// If ans_text is an error. Send error code and error string in the response
|
||||
if let Err(e) = ans_text {
|
||||
let res = http::Response::builder()
|
||||
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(Body::from(e))
|
||||
.unwrap();
|
||||
return Ok(res);
|
||||
}
|
||||
|
||||
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id;
|
||||
let mut res = http::Response::builder()
|
||||
.status(StatusCode::CREATED)
|
||||
.header(CONTENT_TYPE, CONTENT_SDP)
|
||||
.header("location", resource_url)
|
||||
.body(Body::from(ans_text.unwrap()))
|
||||
.unwrap();
|
||||
|
||||
let headers = res.headers_mut();
|
||||
headers.extend(links);
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn serve(&self) -> Option<tokio::task::JoinHandle<()>> {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
let addr: SocketAddr;
|
||||
match settings.host_addr.socket_addrs(|| None) {
|
||||
Ok(v) => {
|
||||
// pick the first vector item
|
||||
addr = v[0];
|
||||
gst::info!(CAT, imp:self, "using {addr:?} as address");
|
||||
}
|
||||
Err(e) => {
|
||||
gst::error!(CAT, imp:self, "error getting addr from uri {e:?}");
|
||||
self.obj()
|
||||
.emit_by_name::<()>("error", &[&format!("Unable to start Whep Server: {e:?}")]);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
settings.shutdown_signal = Some(tx);
|
||||
drop(settings);
|
||||
|
||||
let prefix = warp::path(ROOT);
|
||||
|
||||
let self_weak = self.downgrade();
|
||||
|
||||
// POST /endpoint
|
||||
let post_filter = warp::post()
|
||||
.and(warp::path(ENDPOINT_PATH))
|
||||
.and(warp::path::end())
|
||||
.and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP))
|
||||
.and(warp::body::bytes())
|
||||
.and_then(move |body| {
|
||||
let s = self_weak.upgrade();
|
||||
async {
|
||||
let self_ = s.expect("Need to have the ObjectRef");
|
||||
self_.post_handler(body).await
|
||||
}
|
||||
});
|
||||
|
||||
let self_weak = self.downgrade();
|
||||
|
||||
// OPTIONS /endpoint
|
||||
let options_filter = warp::options()
|
||||
.and(warp::path(ENDPOINT_PATH))
|
||||
.and(warp::path::end())
|
||||
.and_then(move || {
|
||||
let s = self_weak.upgrade();
|
||||
async {
|
||||
let self_ = s.expect("Need to have the ObjectRef");
|
||||
self_.options_handler().await
|
||||
}
|
||||
});
|
||||
|
||||
let self_weak = self.downgrade();
|
||||
|
||||
// PATCH /resource/:id
|
||||
let patch_filter = warp::patch()
|
||||
.and(warp::path(RESOURCE_PATH))
|
||||
.and(warp::path::param::<String>())
|
||||
.and(warp::path::end())
|
||||
.and(warp::header::exact(
|
||||
CONTENT_TYPE.as_str(),
|
||||
CONTENT_TRICKLE_ICE,
|
||||
))
|
||||
.and_then(move |id| {
|
||||
let s = self_weak.upgrade();
|
||||
async {
|
||||
let self_ = s.expect("Need to have the ObjectRef");
|
||||
self_.patch_handler(id).await
|
||||
}
|
||||
});
|
||||
|
||||
let self_weak = self.downgrade();
|
||||
|
||||
// DELETE /resource/:id
|
||||
let delete_filter = warp::delete()
|
||||
.and(warp::path(RESOURCE_PATH))
|
||||
.and(warp::path::param::<String>())
|
||||
.and(warp::path::end())
|
||||
.and_then(move |id| {
|
||||
let s = self_weak.upgrade();
|
||||
async {
|
||||
let self_ = s.expect("Need to have the ObjectRef");
|
||||
self_.delete_handler(id).await
|
||||
}
|
||||
});
|
||||
|
||||
let api = prefix
|
||||
.and(post_filter)
|
||||
.or(prefix.and(options_filter))
|
||||
.or(prefix.and(patch_filter))
|
||||
.or(prefix.and(delete_filter));
|
||||
|
||||
let s = warp::serve(api);
|
||||
let f = async move {
|
||||
let (_, server) = s.bind_with_graceful_shutdown(addr, async move {
|
||||
match rx.await {
|
||||
Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
|
||||
Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
|
||||
}
|
||||
});
|
||||
|
||||
server.await;
|
||||
gst::debug!(CAT, "Stopped the server task...");
|
||||
};
|
||||
|
||||
let jh = RUNTIME.spawn(f);
|
||||
|
||||
gst::debug!(CAT, imp: self, "Started the server...");
|
||||
Some(jh)
|
||||
}
|
||||
|
||||
fn set_host_addr(&self, host_addr: &str) -> Result<(), url::ParseError> {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.host_addr = Url::parse(host_addr)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SignallableImpl for WhepServer {
|
||||
fn start(&self) {
|
||||
gst::info!(CAT, imp: self, "starting the WHEP server");
|
||||
let jh = self.serve();
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.server_handle = jh;
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
|
||||
let handle = settings
|
||||
.server_handle
|
||||
.take()
|
||||
.expect("Server handle should be set");
|
||||
|
||||
let tx = settings
|
||||
.shutdown_signal
|
||||
.take()
|
||||
.expect("Shutdown signal Sender needs to be valid");
|
||||
|
||||
if tx.send(()).is_err() {
|
||||
gst::error!(CAT, imp: self, "Failed to send shutdown signal. Receiver dropped");
|
||||
}
|
||||
|
||||
gst::debug!(CAT, imp: self, "Await server handle to join");
|
||||
RUNTIME.block_on(async {
|
||||
if let Err(e) = handle.await {
|
||||
gst::error!(CAT, imp:self, "Failed to join server handle: {e:?}");
|
||||
};
|
||||
});
|
||||
|
||||
gst::info!(CAT, imp: self, "stopped the WHEP server");
|
||||
}
|
||||
|
||||
fn end_session(&self, _session_id: &str) {
|
||||
//FIXME: send any events to the client
|
||||
}
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for WhepServer {
|
||||
const NAME: &'static str = "GstWhepServerSignaller";
|
||||
type Type = super::WhepServerSignaller;
|
||||
type ParentType = glib::Object;
|
||||
type Interfaces = (Signallable,);
|
||||
}
|
||||
|
||||
impl ObjectImpl for WhepServer {
|
||||
fn properties() -> &'static [glib::ParamSpec] {
|
||||
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||
vec![
|
||||
glib::ParamSpecString::builder("host-addr")
|
||||
.nick("Host address")
|
||||
.blurb("The the host address of the WHEP endpoint e.g., http://127.0.0.1:9090")
|
||||
.default_value(DEFAULT_HOST_ADDR)
|
||||
.flags(glib::ParamFlags::READWRITE)
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("stun-server")
|
||||
.nick("STUN Server")
|
||||
.blurb("The STUN server of the form stun://hostname:port")
|
||||
.default_value(DEFAULT_STUN_SERVER)
|
||||
.build(),
|
||||
gst::ParamSpecArray::builder("turn-servers")
|
||||
.nick("List of TURN Servers to user")
|
||||
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
|
||||
.element_spec(&glib::ParamSpecString::builder("turn-server")
|
||||
.nick("TURN Server")
|
||||
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
|
||||
.build()
|
||||
)
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecUInt::builder("timeout")
|
||||
.nick("Timeout")
|
||||
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
|
||||
.maximum(3600)
|
||||
.default_value(DEFAULT_TIMEOUT)
|
||||
.build(),
|
||||
]
|
||||
});
|
||||
PROPERTIES.as_ref()
|
||||
}
|
||||
|
||||
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
||||
match pspec.name() {
|
||||
"host-addr" => {
|
||||
if let Err(e) =
|
||||
self.set_host_addr(value.get::<&str>().expect("type checked upstream"))
|
||||
{
|
||||
gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}");
|
||||
}
|
||||
}
|
||||
"stun-server" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.stun_server = value
|
||||
.get::<Option<String>>()
|
||||
.expect("type checked upstream")
|
||||
}
|
||||
"turn-servers" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.turn_servers = value.get::<gst::Array>().expect("type checked upstream")
|
||||
}
|
||||
"timeout" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.timeout = value.get().unwrap();
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
match pspec.name() {
|
||||
"host-addr" => settings.host_addr.to_string().to_value(),
|
||||
"stun-server" => settings.stun_server.to_value(),
|
||||
"turn-servers" => settings.turn_servers.to_value(),
|
||||
"timeout" => settings.timeout.to_value(),
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue