Compare commits

...

8 commits

Author SHA1 Message Date
Taruntej Kanakamalla e8b563a314 Merge branch 'whepclientsrc' into 'main'
Draft: net/webrtc/whep: add WHEP client signaller

See merge request gstreamer/gst-plugins-rs!1545
2024-04-26 22:00:53 +00:00
Maksym Khomenko a87eaa4b79 hrtfrender: use bitmask, not int, to prevent a capsnego failure
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1549>
2024-04-26 20:24:19 +00:00
Philippe Normand 88cbc93338 dav1ddec: Negotiate bt709 colorimetry when values from seq header are unspecified
With unknown range colorimetry validation would fail in video-info. As our
decoder outputs only YUV formats Bt709 should be a reasonable default.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1548>
2024-04-26 19:35:41 +00:00
Taruntej Kanakamalla 23e8b46cee net/webrtc/whep: add WHEP client signaller
Rewrite the whepsrc element with WHEP Client as a signaller
on top of webrtcsrc
2024-04-19 17:37:19 +05:30
Taruntej Kanakamalla f2783fda93 webrtcsrc: add capability to initiate offer and handle answer
add the handler for session-request signal from the signaller
to initiate an offer

in the session-description signal handler, if the sdp is an answer type
handle the answer from the remote peer and create the src pad using
the media caps of sendonly type media
2024-04-19 17:37:13 +05:30
Taruntej Kanakamalla ac9ef0a8d2 net/webrtc: Example for whipserver
rudimentary sample to test multiple WHIP client connections
2024-04-17 17:27:51 +05:30
Taruntej Kanakamalla 4404cb42b8 net/webrtc/whip_signaller: multiple client support in the server
- generate a new session id for every new client
use the session id in the resource url

- remove the producer-peer-id property in the WhipServer signaler as it
is redundant to have producer id in a session having only one producer

- read the 'producer-peer-id' property on the signaller conditionally
if it exists else use the session id as producer id
2024-04-17 17:17:31 +05:30
Taruntej Kanakamalla 6e1aac0d0b net/webrtc: multi producer support in webrtcsrc
- Add a new structure Session
  - manage each producer using a session
  - avoid send EOS when a session terminates, instead keep running
    waiting for any new producer to connect

- Maintain a bin element per session
  - each session bin encapsulates webrtcbin and the decoder if needed
    as well as the parser and filter if requested by the application
    (through request-encoded-filter)
  - this will be helpful to cleanup the session's respective elements
    when the corresponding producer terminates the session
2024-04-17 16:25:07 +05:30
12 changed files with 1665 additions and 540 deletions

35
Cargo.lock generated
View file

@ -1263,15 +1263,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
@ -1370,6 +1361,16 @@ dependencies = [
"cipher",
]
[[package]]
name = "ctrlc"
version = "3.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf"
dependencies = [
"nix 0.27.1",
"windows-sys 0.48.0",
]
[[package]]
name = "darling"
version = "0.20.8"
@ -2944,9 +2945,10 @@ dependencies = [
"aws-smithy-http",
"aws-smithy-types",
"aws-types",
"bytes",
"chrono",
"clap",
"crossbeam-channel",
"ctrlc",
"data-encoding",
"fastrand",
"futures",
@ -4183,7 +4185,7 @@ dependencies = [
"if-addrs",
"log",
"multimap 0.8.3",
"nix",
"nix 0.23.2",
"rand",
"socket2 0.4.10",
"thiserror",
@ -4674,6 +4676,17 @@ dependencies = [
"memoffset 0.6.5",
]
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.5.0",
"cfg-if",
"libc",
]
[[package]]
name = "nnnoiseless"
version = "0.5.1"

View file

@ -649,7 +649,7 @@ impl BaseTransformImpl for HrtfRender {
if direction == gst::PadDirection::Sink {
s.set("channels", 2);
s.set("channel-mask", 0x3);
s.set("channel-mask", gst::Bitmask(0x3));
} else {
let settings = self.settings.lock().unwrap();
if let Some(objs) = &settings.spatial_objects {

View file

@ -58,7 +58,10 @@ livekit-protocol = { version = "0.3", optional = true }
livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true }
warp = {version = "0.3", optional = true }
crossbeam-channel = { version = "0.5", optional = true }
ctrlc = {version = "3.4.0", optional = true }
bytes = "1"
[dev-dependencies]
gst-plugin-rtp = { path = "../rtp" }
@ -78,7 +81,7 @@ path = "src/lib.rs"
gst-plugin-version-helper.workspace = true
[features]
default = ["v1_22", "aws", "janus", "livekit", "whip"]
default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"]
static = []
capi = []
v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
@ -89,7 +92,8 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
"dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"]
janus = ["dep:http"]
livekit = ["dep:livekit-protocol", "dep:livekit-api"]
whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"]
whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"]
whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"]
[package.metadata.capi]
min_version = "0.9.21"
@ -119,3 +123,6 @@ name = "webrtc-precise-sync-send"
[[example]]
name = "webrtc-precise-sync-recv"
[[example]]
name = "whipserver"

View file

@ -0,0 +1,123 @@
use std::process::exit;
use anyhow::Error;
use clap::Parser;
use gst::prelude::*;
#[derive(Parser, Debug)]
struct Args {
host_addr: String,
}
fn link_video(pad: &gst::Pad, pipeline: &gst::Pipeline) {
let q = gst::ElementFactory::make_with_name(
"queue",
Some(format!("queue_{}", pad.name()).as_str()),
)
.unwrap();
// let vconv = gst::ElementFactory::make_with_name("videoconvert", Some(format!("vconv_{}",pad.name()).as_str())).unwrap();
let vsink = gst::ElementFactory::make_with_name(
"autovideosink",
Some(format!("vsink_{}", pad.name()).as_str()),
)
.unwrap();
pipeline.add_many([&q, &vsink]).unwrap();
gst::Element::link_many([&q, &vsink]).unwrap();
let qsinkpad = q.static_pad("sink").unwrap();
pad.link(&qsinkpad).expect("linking should work");
q.sync_state_with_parent().unwrap();
// vconv.sync_state_with_parent().unwrap();
vsink.sync_state_with_parent().unwrap();
}
fn unlink_video(pad: &gst::Pad, pipeline: &gst::Pipeline) {
let q = pipeline
.by_name(format!("queue_{}", pad.name()).as_str())
.unwrap();
// let vconv = pipeline.by_name(format!("vconv_{}",pad.name()).as_str()).unwrap();
let vsink = pipeline
.by_name(format!("vsink_{}", pad.name()).as_str())
.unwrap();
q.set_state(gst::State::Null).unwrap();
// vconv.set_state(gst::State::Null).unwrap();
vsink.set_state(gst::State::Null).unwrap();
pipeline.remove_many([&q, &vsink]).unwrap();
}
fn link_audio(_pad: &gst::Pad) {}
fn main() -> Result<(), Error> {
gst::init()?;
let args = Args::parse();
let pipeline = gst::Pipeline::builder().build();
let ws = gst::ElementFactory::make("whipserversrc").build()?;
ws.dynamic_cast_ref::<gst::ChildProxy>()
.unwrap()
.set_child_property("signaller::host-addr", &args.host_addr);
ws.set_property("enable-data-channel-navigation", true);
let pipe = pipeline.clone();
ws.connect_pad_added(move |_ws, pad| {
if pad.name().contains("video_") {
link_video(pad, &pipe);
} else if pad.name().contains("audio_") {
} else {
println!("unknown pad type {}", pad.name());
}
});
let pipe = pipeline.clone();
ws.connect_pad_removed(move |_ws, pad| {
if pad.name().contains("video_") {
unlink_video(pad, &pipe);
} else if pad.name().contains("audio_") {
} else {
println!("unknown pad type {}", pad.name());
}
});
pipeline.add(&ws)?;
pipeline.set_state(gst::State::Playing)?;
let p = pipeline.clone();
ctrlc::set_handler(move || {
p.set_state(gst::State::Null).unwrap();
exit(0);
})
.expect("Error setting Ctrl-C handler");
let bus = pipeline.bus().expect("Pipeline should have a bus");
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("EOS");
break;
}
MessageView::Error(err) => {
pipeline.set_state(gst::State::Null)?;
eprintln!(
"Got error from {}: {} ({})",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into()),
err.error(),
err.debug().unwrap_or_else(|| "".into()),
);
break;
}
_ => (),
}
}
pipeline.set_state(gst::State::Null)?;
Ok(())
}

View file

@ -24,6 +24,8 @@ pub mod signaller;
pub mod utils;
pub mod webrtcsink;
pub mod webrtcsrc;
#[cfg(feature = "whep")]
mod whep_signaller;
#[cfg(feature = "whip")]
mod whip_signaller;

View file

@ -104,9 +104,9 @@ use crate::RUNTIME;
use futures::future;
use futures::prelude::*;
use gst::ErrorMessage;
#[cfg(feature = "whip")]
#[cfg(any(feature = "whip", feature = "whep"))]
use reqwest::header::HeaderMap;
#[cfg(feature = "whip")]
#[cfg(any(feature = "whip", feature = "whep"))]
use reqwest::redirect::Policy;
use std::sync::Mutex;
use std::time::Duration;
@ -239,7 +239,7 @@ where
res
}
#[cfg(feature = "whip")]
#[cfg(any(feature = "whip", feature = "whep"))]
pub fn parse_redirect_location(
headermap: &HeaderMap,
old_url: &reqwest::Url,
@ -280,13 +280,13 @@ pub fn parse_redirect_location(
}
}
#[cfg(feature = "whip")]
#[cfg(any(feature = "whip", feature = "whep"))]
pub fn build_reqwest_client(pol: Policy) -> reqwest::Client {
let client_builder = reqwest::Client::builder();
client_builder.redirect(pol).build().unwrap()
}
#[cfg(feature = "whip")]
#[cfg(any(feature = "whip", feature = "whep"))]
pub fn set_ice_servers(
webrtcbin: &gst::Element,
headermap: &HeaderMap,

File diff suppressed because it is too large Load diff

View file

@ -59,6 +59,11 @@ glib::wrapper! {
pub struct LiveKitWebRTCSrc(ObjectSubclass<imp::livekit::LiveKitWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy;
}
#[cfg(feature = "whep")]
glib::wrapper! {
pub struct WhepClientSrc(ObjectSubclass<imp::whep::WhepClientSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
@ -139,5 +144,13 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
LiveKitWebRTCSrc::static_type(),
)?;
#[cfg(feature = "whep")]
gst::Element::register(
plugin,
"whepclientsrc",
gst::Rank::PRIMARY,
WhepClientSrc::static_type(),
)?;
Ok(())
}

View file

@ -0,0 +1,600 @@
// Copyright (C) 2022, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::{Signallable, SignallableImpl};
use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
};
use crate::RUNTIME;
use async_recursion::async_recursion;
use bytes::Bytes;
use futures::future;
use gst::glib::RustClosure;
use gst::{glib, prelude::*, subclass::prelude::*};
use gst_sdp::*;
use gst_webrtc::*;
use once_cell::sync::Lazy;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::StatusCode;
use std::sync::Mutex;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"whep-client-signaller",
gst::DebugColorFlags::empty(),
Some("WHEP Client Signaller"),
)
});
const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15;
const SESSION_ID: &str = "whep-client";
#[derive(Debug, Clone)]
struct Settings {
whep_endpoint: Option<String>,
auth_token: Option<String>,
use_link_headers: bool,
timeout: u32,
}
#[allow(clippy::derivable_impls)]
impl Default for Settings {
fn default() -> Self {
Self {
whep_endpoint: None,
auth_token: None,
use_link_headers: false,
timeout: DEFAULT_TIMEOUT,
}
}
}
#[derive(Debug)]
enum State {
Stopped,
Post { redirects: u8 },
Running { whep_resource: String },
}
impl Default for State {
fn default() -> Self {
Self::Stopped
}
}
pub struct WhepClient {
settings: Mutex<Settings>,
state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>,
client: reqwest::Client,
}
impl Default for WhepClient {
fn default() -> Self {
// We'll handle redirects manually since the default redirect handler does not
// reuse the authentication token on the redirected server
let pol = reqwest::redirect::Policy::none();
let client = build_reqwest_client(pol);
Self {
settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()),
canceller: Mutex::new(None),
client,
}
}
}
impl ObjectImpl for WhepClient {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("whep-endpoint")
.nick("WHEP Endpoint")
.blurb("The WHEP server endpoint to POST SDP offer to.")
.build(),
glib::ParamSpecBoolean::builder("use-link-headers")
.nick("Use Link Headers")
.blurb("Use link headers to configure STUN/TURN servers if present in WHEP endpoint response.")
.build(),
glib::ParamSpecString::builder("auth-token")
.nick("Authorization Token")
.blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer <auth-token>'")
.build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.readwrite()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"whep-endpoint" => {
let mut settings = self.settings.lock().unwrap();
settings.whep_endpoint = value.get().expect("WHEP endpoint should be a string");
}
"use-link-headers" => {
let mut settings = self.settings.lock().unwrap();
settings.use_link_headers = value
.get()
.expect("use-link-headers should be a boolean value");
}
"auth-token" => {
let mut settings = self.settings.lock().unwrap();
settings.auth_token = value.get().expect("Auth token should be a string");
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"whep-endpoint" => {
let settings = self.settings.lock().unwrap();
settings.whep_endpoint.to_value()
}
"use-link-headers" => {
let settings = self.settings.lock().unwrap();
settings.use_link_headers.to_value()
}
"auth-token" => {
let settings = self.settings.lock().unwrap();
settings.auth_token.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
_ => unimplemented!(),
}
}
}
#[glib::object_subclass]
impl ObjectSubclass for WhepClient {
const NAME: &'static str = "GstWhepClientSignaller";
type Type = super::WhepClientSignaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl WhepClient {
fn raise_error(&self, msg: String) {
self.obj()
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
}
fn handle_future_error(&self, err: WaitError) {
match err {
WaitError::FutureAborted => {
gst::warning!(CAT, imp: self, "Future aborted")
}
WaitError::FutureError(err) => self.raise_error(err.to_string()),
};
}
fn sdp_message_parse(&self, sdp_bytes: Bytes, _webrtcbin: &gst::Element) {
let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) {
Ok(sdp) => sdp,
Err(_) => {
self.raise_error("Could not parse answer SDP".to_string());
return;
}
};
let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp);
self.obj()
.emit_by_name::<()>("session-description", &[&SESSION_ID, &remote_sdp]);
}
async fn parse_endpoint_response(
&self,
sess_desc: WebRTCSessionDescription,
resp: reqwest::Response,
redirects: u8,
webrtcbin: gst::Element,
) {
let endpoint;
let use_link_headers;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
use_link_headers = settings.use_link_headers;
drop(settings);
}
match resp.status() {
StatusCode::OK | StatusCode::NO_CONTENT => {
gst::info!(CAT, imp: self, "SDP offer successfully send");
}
StatusCode::CREATED => {
gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers());
if use_link_headers {
if let Err(e) = set_ice_servers(&webrtcbin, resp.headers()) {
self.raise_error(e.to_string());
return;
};
}
/* See section 4.2 of the WHEP specification */
let location = match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => location,
None => {
self.raise_error(
"Location header field should be present for WHEP resource URL"
.to_string(),
);
return;
}
};
let location = match location.to_str() {
Ok(loc) => loc,
Err(e) => {
self.raise_error(format!("Failed to convert location to string: {e}"));
return;
}
};
let url = reqwest::Url::parse(endpoint.as_str()).unwrap();
gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location);
let url = match url.join(location) {
Ok(joined_url) => joined_url,
Err(err) => {
self.raise_error(format!("URL join operation failed: {err:?}"));
return;
}
};
match resp.bytes().await {
Ok(ans_bytes) => {
let mut state = self.state.lock().unwrap();
*state = match *state {
State::Post { redirects: _r } => State::Running {
whep_resource: url.to_string(),
},
_ => {
self.raise_error("Expected to be in POST state".to_string());
return;
}
};
drop(state);
self.sdp_message_parse(ans_bytes, &webrtcbin)
}
Err(err) => self.raise_error(err.to_string()),
}
}
status if status.is_redirection() => {
if redirects < MAX_REDIRECTS {
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
{
let mut state = self.state.lock().unwrap();
*state = match *state {
State::Post { redirects: _r } => State::Post {
redirects: redirects + 1,
},
/*
* As per section 4.6 of the specification, redirection is
* not required to be supported for the PATCH and DELETE
* requests to the final WHEP resource URL. Only the initial
* POST request may support redirection.
*/
State::Running { .. } => {
self.raise_error(
"Unexpected redirection in RUNNING state".to_string(),
);
return;
}
State::Stopped => unreachable!(),
};
drop(state);
}
gst::warning!(
CAT,
imp: self,
"Redirecting endpoint to {}",
redirect_url.as_str()
);
self.do_post(sess_desc, webrtcbin, redirect_url).await
}
Err(e) => self.raise_error(e.to_string()),
}
} else {
self.raise_error("Too many redirects. Unable to connect.".to_string());
}
}
s => {
match resp.bytes().await {
Ok(r) => {
let res = r.escape_ascii().to_string();
// FIXME: Check and handle 'Retry-After' header in case of server error
self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res));
}
Err(err) => self.raise_error(err.to_string()),
}
}
}
}
async fn whep_offer(&self, webrtcbin: gst::Element) {
let local_desc =
webrtcbin.property::<Option<WebRTCSessionDescription>>("local-description");
let sess_desc = match local_desc {
None => {
self.raise_error("Local description is not set".to_string());
return;
}
Some(mut local_desc) => {
local_desc.set_type(WebRTCSDPType::Offer);
local_desc
}
};
gst::debug!(
CAT,
imp: self,
"Sending offer SDP: {:?}",
sess_desc.sdp().as_text()
);
let timeout;
let endpoint;
{
let settings = self.settings.lock().unwrap();
timeout = settings.timeout;
endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
drop(settings);
}
if let Err(e) = wait_async(
&self.canceller,
self.do_post(sess_desc, webrtcbin, endpoint),
timeout,
)
.await
{
self.handle_future_error(e);
}
}
#[async_recursion]
async fn do_post(
&self,
offer: WebRTCSessionDescription,
webrtcbin: gst::Element,
endpoint: reqwest::Url,
) {
let auth_token;
{
let settings = self.settings.lock().unwrap();
auth_token = settings.auth_token.clone();
drop(settings);
}
let sdp = offer.sdp();
let body = sdp.as_text().unwrap();
gst::info!(CAT, imp: self, "Using endpoint {}", endpoint.as_str());
let mut headermap = HeaderMap::new();
headermap.insert(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/sdp"),
);
if let Some(token) = auth_token.as_ref() {
let bearer_token = "Bearer ".to_owned() + token.as_str();
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str())
.expect("Failed to set auth token to header"),
);
}
gst::debug!(
CAT,
imp: self,
"Url for HTTP POST request: {}",
endpoint.as_str()
);
let resp = self
.client
.request(reqwest::Method::POST, endpoint.clone())
.headers(headermap)
.body(body)
.send()
.await;
match resp {
Ok(r) => {
#[allow(unused_mut)]
let mut redirects;
{
let state = self.state.lock().unwrap();
redirects = match *state {
State::Post { redirects } => redirects,
_ => {
self.raise_error("Trying to do POST in unexpected state".to_string());
return;
}
};
drop(state);
}
self.parse_endpoint_response(offer, r, redirects, webrtcbin)
.await
}
Err(err) => self.raise_error(err.to_string()),
}
}
fn terminate_session(&self) {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
let timeout = settings.timeout;
let resource_url = match *state {
State::Running {
whep_resource: ref whep_resource_url,
} => whep_resource_url.clone(),
_ => {
self.raise_error("Terminated in unexpected state".to_string());
return;
}
};
drop(state);
let mut headermap = HeaderMap::new();
if let Some(token) = &settings.auth_token {
let bearer_token = "Bearer ".to_owned() + token.as_str();
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str())
.expect("Failed to set auth token to header"),
);
}
drop(settings);
gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url);
/* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */
let client = build_reqwest_client(reqwest::redirect::Policy::default());
let future = async {
client
.delete(resource_url.clone())
.headers(headermap)
.send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["DELETE request failed {}: {:?}", resource_url, err]
)
})
};
let res = wait(&self.canceller, future, timeout);
match res {
Ok(r) => {
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
}
Err(e) => match e {
WaitError::FutureAborted => {
gst::warning!(CAT, imp: self, "DELETE request aborted")
}
WaitError::FutureError(e) => {
gst::error!(CAT, imp: self, "Error on DELETE request : {}", e)
}
},
};
}
pub fn on_webrtcbin_ready(&self) -> RustClosure {
glib::closure!(|signaller: &super::WhepClientSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
let obj_weak = signaller.downgrade();
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
let Some(obj) = obj_weak.upgrade() else {
return;
};
let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: obj, "ICE gathering started");
}
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete");
let webrtcbin = webrtcbin.clone();
RUNTIME.spawn(async move { obj.imp().whep_offer(webrtcbin).await });
}
_ => (),
}
});
})
}
}
impl SignallableImpl for WhepClient {
fn start(&self) {
if self.settings.lock().unwrap().whep_endpoint.is_none() {
self.raise_error("WHEP endpoint URL must be set".to_string());
return;
}
let mut state = self.state.lock().unwrap();
*state = State::Post { redirects: 0 };
drop(state);
self.obj()
.emit_by_name::<()>("session-started", &[&SESSION_ID, &SESSION_ID]);
self.obj().emit_by_name::<()>(
"session-requested",
&[
&SESSION_ID,
&SESSION_ID,
&None::<gst_webrtc::WebRTCSessionDescription>,
],
);
}
fn stop(&self) {}
fn end_session(&self, _session_id: &str) {
// Interrupt requests in progress, if any
if let Some(canceller) = &*self.canceller.lock().unwrap() {
canceller.abort();
}
let state = self.state.lock().unwrap();
if let State::Running { .. } = *state {
// Release server-side resources
drop(state);
self.terminate_session();
}
}
}

View file

@ -0,0 +1,21 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::Signallable;
use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt};
mod client;
glib::wrapper! {
pub struct WhepClientSignaller(ObjectSubclass<client::WhepClient>) @implements Signallable;
}
unsafe impl Send for WhepClientSignaller {}
unsafe impl Sync for WhepClientSignaller {}
impl Default for WhepClientSignaller {
fn default() -> Self {
let sig: WhepClientSignaller = glib::Object::new();
sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready());
sig
}
}

View file

@ -18,9 +18,8 @@ use reqwest::header::HeaderValue;
use reqwest::StatusCode;
use std::sync::Mutex;
use core::time::Duration;
use crossbeam_channel::unbounded;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use url::Url;
use warp::{
http,
@ -47,7 +46,6 @@ const ENDPOINT_PATH: &str = "endpoint";
const RESOURCE_PATH: &str = "resource";
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080";
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client");
const CONTENT_SDP: &str = "application/sdp";
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
@ -193,7 +191,7 @@ impl WhipClient {
let mut headermap = HeaderMap::new();
headermap.insert(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/sdp"),
HeaderValue::from_static(CONTENT_SDP),
);
if let Some(token) = auth_token.as_ref() {
@ -616,27 +614,14 @@ impl ObjectImpl for WhipClient {
// WHIP server implementation
#[derive(Debug)]
enum WhipServerState {
Idle,
Negotiating,
Ready,
}
impl Default for WhipServerState {
fn default() -> Self {
Self::Idle
}
}
struct WhipServerSettings {
stun_server: Option<String>,
turn_servers: gst::Array,
host_addr: Url,
producer_peer_id: Option<String>,
timeout: u32,
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
server_handle: Option<tokio::task::JoinHandle<()>>,
sdp_answer: Option<crossbeam_channel::Sender<Option<SDPMessage>>>,
sdp_answer: Option<mpsc::Sender<Option<SDPMessage>>>,
}
impl Default for WhipServerSettings {
@ -645,7 +630,6 @@ impl Default for WhipServerSettings {
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
stun_server: DEFAULT_STUN_SERVER.map(String::from),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from),
timeout: DEFAULT_TIMEOUT,
shutdown_signal: None,
server_handle: None,
@ -654,18 +638,10 @@ impl Default for WhipServerSettings {
}
}
#[derive(Default)]
pub struct WhipServer {
state: Mutex<WhipServerState>,
settings: Mutex<WhipServerSettings>,
}
impl Default for WhipServer {
fn default() -> Self {
Self {
settings: Mutex::new(WhipServerSettings::default()),
state: Mutex::new(WhipServerState::default()),
}
}
canceller: Mutex<Option<futures::future::AbortHandle>>,
}
#[derive(Debug)]
@ -694,7 +670,7 @@ impl WhipServer {
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete");
let ans: Option<gst_sdp::SDPMessage>;
let settings = obj.imp().settings.lock().unwrap();
let mut settings = obj.imp().settings.lock().unwrap();
if let Some(answer_desc) = webrtcbin
.property::<Option<WebRTCSessionDescription>>("local-description")
{
@ -702,9 +678,22 @@ impl WhipServer {
} else {
ans = None;
}
if let Some(tx) = &settings.sdp_answer {
tx.send(ans).unwrap()
}
let tx = settings
.sdp_answer
.take()
.expect("SDP answer Sender needs to be valid");
let obj_weak = obj.downgrade();
RUNTIME.spawn(async move {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
};
if let Err(e) = tx.send(ans).await {
gst::error!(CAT, obj: obj, "Failed to send SDP {e}");
}
});
}
_ => (),
}
@ -722,57 +711,23 @@ impl WhipServer {
//FIXME: add state checking once ICE trickle is implemented
}
async fn delete_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Ready => {
// FIXME: session-ended will make webrtcsrc send EOS
// and producer-removed is not handled
// Need to address the usecase where when the client terminates
// the webrtcsrc should be running without sending EOS and reset
// for next client connection like a usual server
self.obj().emit_by_name::<bool>("session-ended", &[&ROOT]);
gst::info!(CAT, imp:self, "Ending session");
*state = WhipServerState::Idle;
Ok(warp::reply::reply().into_response())
}
_ => {
gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session not Ready")))
.unwrap();
Ok(res)
}
async fn delete_handler(&self, id: String) -> Result<impl warp::Reply, warp::Rejection> {
if self
.obj()
.emit_by_name::<bool>("session-ended", &[&id.as_str()])
{
gst::info!(CAT, imp:self, "Ended session {id}");
} else {
gst::info!(CAT, imp:self, "Failed to End session {id}");
// FIXME: Do we send a different response
}
Ok(warp::reply::reply().into_response())
}
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
let settings = self.settings.lock().unwrap();
let peer_id = settings.producer_peer_id.clone().unwrap();
drop(settings);
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Idle => {
self.obj()
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
*state = WhipServerState::Negotiating
}
WhipServerState::Ready => {
gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session active already")))
.unwrap();
return Ok(res);
}
_ => {}
};
drop(state);
let mut links = HeaderMap::new();
let settings = self.settings.lock().unwrap();
match &settings.stun_server {
@ -806,7 +761,7 @@ impl WhipServer {
}
let mut res = http::Response::builder()
.header("Access-Post", "application/sdp")
.header("Access-Post", CONTENT_SDP)
.body(Body::empty())
.unwrap();
@ -820,31 +775,15 @@ impl WhipServer {
&self,
body: warp::hyper::body::Bytes,
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
let mut settings = self.settings.lock().unwrap();
let peer_id = settings.producer_peer_id.clone().unwrap();
let wait_timeout = settings.timeout;
let (tx, rx) = unbounded::<Option<SDPMessage>>();
settings.sdp_answer = Some(tx);
drop(settings);
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Idle => {
self.obj()
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
*state = WhipServerState::Negotiating
}
WhipServerState::Ready => {
gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session active already")))
.unwrap();
return Ok(res);
}
_ => {}
let session_id = uuid::Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel::<Option<SDPMessage>>(1);
let wait_timeout = {
let mut settings = self.settings.lock().unwrap();
let wait_timeout = settings.timeout;
settings.sdp_answer = Some(tx);
drop(settings);
wait_timeout
};
drop(state);
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
Ok(offer_sdp) => {
@ -854,7 +793,9 @@ impl WhipServer {
);
self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &offer]);
.emit_by_name::<()>("session-started", &[&session_id, &session_id]);
self.obj()
.emit_by_name::<()>("session-description", &[&session_id, &offer]);
}
Err(err) => {
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
@ -864,20 +805,32 @@ impl WhipServer {
}
}
// We don't want to wait infinitely for the ice gathering to complete.
let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) {
Ok(a) => a,
Err(e) => {
let reply = warp::reply::reply();
let res;
if e.is_timeout() {
res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT);
gst::error!(CAT, imp: self, "Timedout waiting for SDP answer");
} else {
res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR);
gst::error!(CAT, imp: self, "Channel got disconnected");
let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await;
let answer = match result {
Ok(ans) => match ans {
Some(a) => a,
None => {
let err = "Channel closed, can't receive SDP".to_owned();
let res = http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(err))
.unwrap();
return Ok(res);
}
return Ok(res.into_response());
},
Err(e) => {
let err = match e {
WaitError::FutureAborted => "Aborted".to_owned(),
WaitError::FutureError(err) => err.to_string(),
};
let res = http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(err))
.unwrap();
return Ok(res);
}
};
@ -947,10 +900,10 @@ impl WhipServer {
drop(settings);
// Got SDP answer, send answer in the response
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id;
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id;
let mut res = http::Response::builder()
.status(StatusCode::CREATED)
.header(CONTENT_TYPE, "application/sdp")
.header(CONTENT_TYPE, CONTENT_SDP)
.header("location", resource_url)
.body(Body::from(ans_text.unwrap()))
.unwrap();
@ -958,10 +911,6 @@ impl WhipServer {
let headers = res.headers_mut();
headers.extend(links);
let mut state = self.state.lock().unwrap();
*state = WhipServerState::Ready;
drop(state);
Ok(res)
}
@ -1117,7 +1066,8 @@ impl SignallableImpl for WhipServer {
gst::info!(CAT, imp: self, "stopped the WHIP server");
}
fn end_session(&self, _session_id: &str) {
fn end_session(&self, session_id: &str) {
gst::info!(CAT, imp: self, "Session {session_id} ended");
//FIXME: send any events to the client
}
}
@ -1140,11 +1090,6 @@ impl ObjectImpl for WhipServer {
.default_value(DEFAULT_HOST_ADDR)
.flags(glib::ParamFlags::READWRITE)
.build(),
// needed by webrtcsrc in handle_webrtc_src_pad
glib::ParamSpecString::builder("producer-peer-id")
.default_value(DEFAULT_PRODUCER_PEER_ID)
.flags(glib::ParamFlags::READABLE)
.build(),
glib::ParamSpecString::builder("stun-server")
.nick("STUN Server")
.blurb("The STUN server of the form stun://hostname:port")
@ -1204,7 +1149,6 @@ impl ObjectImpl for WhipServer {
"host-addr" => settings.host_addr.to_string().to_value(),
"stun-server" => settings.stun_server.to_value(),
"turn-servers" => settings.turn_servers.to_value(),
"producer-peer-id" => settings.producer_peer_id.to_value(),
"timeout" => settings.timeout.to_value(),
_ => unimplemented!(),
}

View file

@ -134,7 +134,7 @@ impl Dav1dDec {
let matrix = match pic.matrix_coefficients() {
pixel::MatrixCoefficients::Identity => gst_video::VideoColorMatrix::Rgb,
pixel::MatrixCoefficients::BT709 => gst_video::VideoColorMatrix::Bt709,
pixel::MatrixCoefficients::Unspecified => gst_video::VideoColorMatrix::Unknown,
pixel::MatrixCoefficients::Unspecified => gst_video::VideoColorMatrix::Bt709,
pixel::MatrixCoefficients::BT470M => gst_video::VideoColorMatrix::Fcc,
pixel::MatrixCoefficients::BT470BG => gst_video::VideoColorMatrix::Bt601,
pixel::MatrixCoefficients::ST240M => gst_video::VideoColorMatrix::Smpte240m,
@ -149,7 +149,7 @@ impl Dav1dDec {
let transfer = match pic.transfer_characteristic() {
pixel::TransferCharacteristic::BT1886 => gst_video::VideoTransferFunction::Bt709,
pixel::TransferCharacteristic::Unspecified => gst_video::VideoTransferFunction::Unknown,
pixel::TransferCharacteristic::Unspecified => gst_video::VideoTransferFunction::Bt709,
pixel::TransferCharacteristic::BT470M => gst_video::VideoTransferFunction::Bt709,
pixel::TransferCharacteristic::BT470BG => gst_video::VideoTransferFunction::Gamma28,
pixel::TransferCharacteristic::ST170M => gst_video::VideoTransferFunction::Bt601,
@ -180,7 +180,7 @@ impl Dav1dDec {
let primaries = match pic.color_primaries() {
pixel::ColorPrimaries::BT709 => gst_video::VideoColorPrimaries::Bt709,
pixel::ColorPrimaries::Unspecified => gst_video::VideoColorPrimaries::Unknown,
pixel::ColorPrimaries::Unspecified => gst_video::VideoColorPrimaries::Bt709,
pixel::ColorPrimaries::BT470M => gst_video::VideoColorPrimaries::Bt470m,
pixel::ColorPrimaries::BT470BG => gst_video::VideoColorPrimaries::Bt470bg,
pixel::ColorPrimaries::ST240M => gst_video::VideoColorPrimaries::Smpte240m,