mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-05-12 21:32:39 +00:00
net/webrtc/whep: add WHEP client signaller
Rewrite the whepsrc element with WHEP Client as a signaller on top of webrtcsrc
This commit is contained in:
parent
f2783fda93
commit
23e8b46cee
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -2956,6 +2956,7 @@ dependencies = [
|
|||
"aws-smithy-http",
|
||||
"aws-smithy-types",
|
||||
"aws-types",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap",
|
||||
"ctrlc",
|
||||
|
|
|
@ -61,6 +61,8 @@ warp = {version = "0.3", optional = true }
|
|||
ctrlc = {version = "3.4.0", optional = true }
|
||||
|
||||
|
||||
bytes = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
gst-plugin-rtp = { path = "../rtp" }
|
||||
tokio = { version = "1", features = ["signal"] }
|
||||
|
@ -79,7 +81,7 @@ path = "src/lib.rs"
|
|||
gst-plugin-version-helper.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["v1_22", "aws", "janus", "livekit", "whip"]
|
||||
default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"]
|
||||
static = []
|
||||
capi = []
|
||||
v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
|
||||
|
@ -91,6 +93,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
|
|||
janus = ["dep:http"]
|
||||
livekit = ["dep:livekit-protocol", "dep:livekit-api"]
|
||||
whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"]
|
||||
whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"]
|
||||
|
||||
[package.metadata.capi]
|
||||
min_version = "0.9.21"
|
||||
|
|
|
@ -24,6 +24,8 @@ pub mod signaller;
|
|||
pub mod utils;
|
||||
pub mod webrtcsink;
|
||||
pub mod webrtcsrc;
|
||||
#[cfg(feature = "whep")]
|
||||
mod whep_signaller;
|
||||
#[cfg(feature = "whip")]
|
||||
mod whip_signaller;
|
||||
|
||||
|
|
|
@ -104,9 +104,9 @@ use crate::RUNTIME;
|
|||
use futures::future;
|
||||
use futures::prelude::*;
|
||||
use gst::ErrorMessage;
|
||||
#[cfg(feature = "whip")]
|
||||
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||
use reqwest::header::HeaderMap;
|
||||
#[cfg(feature = "whip")]
|
||||
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||
use reqwest::redirect::Policy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
@ -239,7 +239,7 @@ where
|
|||
res
|
||||
}
|
||||
|
||||
#[cfg(feature = "whip")]
|
||||
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||
pub fn parse_redirect_location(
|
||||
headermap: &HeaderMap,
|
||||
old_url: &reqwest::Url,
|
||||
|
@ -280,13 +280,13 @@ pub fn parse_redirect_location(
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "whip")]
|
||||
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||
pub fn build_reqwest_client(pol: Policy) -> reqwest::Client {
|
||||
let client_builder = reqwest::Client::builder();
|
||||
client_builder.redirect(pol).build().unwrap()
|
||||
}
|
||||
|
||||
#[cfg(feature = "whip")]
|
||||
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||
pub fn set_ice_servers(
|
||||
webrtcbin: &gst::Element,
|
||||
headermap: &HeaderMap,
|
||||
|
|
|
@ -586,7 +586,7 @@ impl Session {
|
|||
.map(|codec| {
|
||||
let name = &codec.name;
|
||||
|
||||
let (media, clock_rate, pt) = if codec.stream_type == gst::StreamType::AUDIO {
|
||||
let (media, clock_rate, pt) = if codec.stream_type == gst::StreamType::AUDIO {
|
||||
("audio", 48000, 96)
|
||||
} else {
|
||||
//video stream type
|
||||
|
@ -740,7 +740,7 @@ impl Session {
|
|||
|
||||
filtered_s.extend(s.iter().filter_map(|(key, value)| {
|
||||
if key.starts_with("extmap-") {
|
||||
return Some((key, value.to_owned()));
|
||||
return Some((key, value.to_owned()));
|
||||
}
|
||||
|
||||
None
|
||||
|
@ -774,7 +774,6 @@ impl Session {
|
|||
}
|
||||
|
||||
webrtcbin.emit_by_name::<()>("set-remote-description", &[&answer, &None::<gst::Promise>]);
|
||||
|
||||
}
|
||||
|
||||
fn handle_offer(
|
||||
|
@ -1707,3 +1706,57 @@ pub(super) mod livekit {
|
|||
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "whep")]
|
||||
pub(super) mod whep {
|
||||
use super::*;
|
||||
use crate::whep_signaller::WhepClientSignaller;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct WhepClientSrc {}
|
||||
|
||||
impl ObjectImpl for WhepClientSrc {
|
||||
fn constructed(&self) {
|
||||
self.parent_constructed();
|
||||
let element = self.obj();
|
||||
let ws = element
|
||||
.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>()
|
||||
.imp();
|
||||
|
||||
let _ = ws.set_signaller(WhepClientSignaller::default().upcast());
|
||||
|
||||
let obj = &*self.obj();
|
||||
|
||||
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
|
||||
obj.set_element_flags(gst::ElementFlags::SOURCE);
|
||||
}
|
||||
}
|
||||
|
||||
impl GstObjectImpl for WhepClientSrc {}
|
||||
|
||||
impl BinImpl for WhepClientSrc {}
|
||||
|
||||
impl ElementImpl for WhepClientSrc {
|
||||
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||
gst::subclass::ElementMetadata::new(
|
||||
"WhepClientSrc",
|
||||
"Source/Network/WebRTC",
|
||||
"WebRTC source element using WHEP Client as the signaller",
|
||||
"Sanchayan Maity <sanchayan@asymptotic.io>",
|
||||
)
|
||||
});
|
||||
|
||||
Some(&*ELEMENT_METADATA)
|
||||
}
|
||||
}
|
||||
|
||||
impl BaseWebRTCSrcImpl for WhepClientSrc {}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for WhepClientSrc {
|
||||
const NAME: &'static str = "GstWhepClientSrc";
|
||||
type Type = crate::webrtcsrc::WhepClientSrc;
|
||||
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,6 +59,11 @@ glib::wrapper! {
|
|||
pub struct LiveKitWebRTCSrc(ObjectSubclass<imp::livekit::LiveKitWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy;
|
||||
}
|
||||
|
||||
#[cfg(feature = "whep")]
|
||||
glib::wrapper! {
|
||||
pub struct WhepClientSrc(ObjectSubclass<imp::whep::WhepClientSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
|
||||
}
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
|
||||
}
|
||||
|
@ -139,5 +144,13 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
|
|||
LiveKitWebRTCSrc::static_type(),
|
||||
)?;
|
||||
|
||||
#[cfg(feature = "whep")]
|
||||
gst::Element::register(
|
||||
plugin,
|
||||
"whepclientsrc",
|
||||
gst::Rank::PRIMARY,
|
||||
WhepClientSrc::static_type(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
600
net/webrtc/src/whep_signaller/client.rs
Normal file
600
net/webrtc/src/whep_signaller/client.rs
Normal file
|
@ -0,0 +1,600 @@
|
|||
// Copyright (C) 2022, Asymptotic Inc.
|
||||
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||
// <https://mozilla.org/MPL/2.0/>.
|
||||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::{Signallable, SignallableImpl};
|
||||
use crate::utils::{
|
||||
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
|
||||
};
|
||||
use crate::RUNTIME;
|
||||
use async_recursion::async_recursion;
|
||||
use bytes::Bytes;
|
||||
use futures::future;
|
||||
use gst::glib::RustClosure;
|
||||
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||
use gst_sdp::*;
|
||||
use gst_webrtc::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use reqwest::header::{HeaderMap, HeaderValue};
|
||||
use reqwest::StatusCode;
|
||||
use std::sync::Mutex;
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"whep-client-signaller",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("WHEP Client Signaller"),
|
||||
)
|
||||
});
|
||||
|
||||
const MAX_REDIRECTS: u8 = 10;
|
||||
const DEFAULT_TIMEOUT: u32 = 15;
|
||||
const SESSION_ID: &str = "whep-client";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Settings {
|
||||
whep_endpoint: Option<String>,
|
||||
auth_token: Option<String>,
|
||||
use_link_headers: bool,
|
||||
timeout: u32,
|
||||
}
|
||||
|
||||
#[allow(clippy::derivable_impls)]
|
||||
impl Default for Settings {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
whep_endpoint: None,
|
||||
auth_token: None,
|
||||
use_link_headers: false,
|
||||
timeout: DEFAULT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum State {
|
||||
Stopped,
|
||||
Post { redirects: u8 },
|
||||
Running { whep_resource: String },
|
||||
}
|
||||
|
||||
impl Default for State {
|
||||
fn default() -> Self {
|
||||
Self::Stopped
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WhepClient {
|
||||
settings: Mutex<Settings>,
|
||||
state: Mutex<State>,
|
||||
canceller: Mutex<Option<future::AbortHandle>>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl Default for WhepClient {
|
||||
fn default() -> Self {
|
||||
// We'll handle redirects manually since the default redirect handler does not
|
||||
// reuse the authentication token on the redirected server
|
||||
let pol = reqwest::redirect::Policy::none();
|
||||
let client = build_reqwest_client(pol);
|
||||
|
||||
Self {
|
||||
settings: Mutex::new(Settings::default()),
|
||||
state: Mutex::new(State::default()),
|
||||
canceller: Mutex::new(None),
|
||||
client,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectImpl for WhepClient {
|
||||
fn properties() -> &'static [glib::ParamSpec] {
|
||||
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||
vec![
|
||||
glib::ParamSpecString::builder("whep-endpoint")
|
||||
.nick("WHEP Endpoint")
|
||||
.blurb("The WHEP server endpoint to POST SDP offer to.")
|
||||
.build(),
|
||||
glib::ParamSpecBoolean::builder("use-link-headers")
|
||||
.nick("Use Link Headers")
|
||||
.blurb("Use link headers to configure STUN/TURN servers if present in WHEP endpoint response.")
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("auth-token")
|
||||
.nick("Authorization Token")
|
||||
.blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer <auth-token>'")
|
||||
.build(),
|
||||
glib::ParamSpecUInt::builder("timeout")
|
||||
.nick("Timeout")
|
||||
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
|
||||
.maximum(3600)
|
||||
.default_value(DEFAULT_TIMEOUT)
|
||||
.readwrite()
|
||||
.build(),
|
||||
]
|
||||
});
|
||||
PROPERTIES.as_ref()
|
||||
}
|
||||
|
||||
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
||||
match pspec.name() {
|
||||
"whep-endpoint" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.whep_endpoint = value.get().expect("WHEP endpoint should be a string");
|
||||
}
|
||||
"use-link-headers" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.use_link_headers = value
|
||||
.get()
|
||||
.expect("use-link-headers should be a boolean value");
|
||||
}
|
||||
"auth-token" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.auth_token = value.get().expect("Auth token should be a string");
|
||||
}
|
||||
"timeout" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.timeout = value.get().expect("type checked upstream");
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||
match pspec.name() {
|
||||
"whep-endpoint" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.whep_endpoint.to_value()
|
||||
}
|
||||
"use-link-headers" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.use_link_headers.to_value()
|
||||
}
|
||||
"auth-token" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.auth_token.to_value()
|
||||
}
|
||||
"timeout" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.timeout.to_value()
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for WhepClient {
|
||||
const NAME: &'static str = "GstWhepClientSignaller";
|
||||
type Type = super::WhepClientSignaller;
|
||||
type ParentType = glib::Object;
|
||||
type Interfaces = (Signallable,);
|
||||
}
|
||||
|
||||
impl WhepClient {
|
||||
fn raise_error(&self, msg: String) {
|
||||
self.obj()
|
||||
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
|
||||
}
|
||||
|
||||
fn handle_future_error(&self, err: WaitError) {
|
||||
match err {
|
||||
WaitError::FutureAborted => {
|
||||
gst::warning!(CAT, imp: self, "Future aborted")
|
||||
}
|
||||
WaitError::FutureError(err) => self.raise_error(err.to_string()),
|
||||
};
|
||||
}
|
||||
|
||||
fn sdp_message_parse(&self, sdp_bytes: Bytes, _webrtcbin: &gst::Element) {
|
||||
let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) {
|
||||
Ok(sdp) => sdp,
|
||||
Err(_) => {
|
||||
self.raise_error("Could not parse answer SDP".to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp);
|
||||
|
||||
self.obj()
|
||||
.emit_by_name::<()>("session-description", &[&SESSION_ID, &remote_sdp]);
|
||||
}
|
||||
|
||||
async fn parse_endpoint_response(
|
||||
&self,
|
||||
sess_desc: WebRTCSessionDescription,
|
||||
resp: reqwest::Response,
|
||||
redirects: u8,
|
||||
webrtcbin: gst::Element,
|
||||
) {
|
||||
let endpoint;
|
||||
let use_link_headers;
|
||||
|
||||
{
|
||||
let settings = self.settings.lock().unwrap();
|
||||
endpoint =
|
||||
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
|
||||
use_link_headers = settings.use_link_headers;
|
||||
drop(settings);
|
||||
}
|
||||
|
||||
match resp.status() {
|
||||
StatusCode::OK | StatusCode::NO_CONTENT => {
|
||||
gst::info!(CAT, imp: self, "SDP offer successfully send");
|
||||
}
|
||||
|
||||
StatusCode::CREATED => {
|
||||
gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers());
|
||||
|
||||
if use_link_headers {
|
||||
if let Err(e) = set_ice_servers(&webrtcbin, resp.headers()) {
|
||||
self.raise_error(e.to_string());
|
||||
return;
|
||||
};
|
||||
}
|
||||
|
||||
/* See section 4.2 of the WHEP specification */
|
||||
let location = match resp.headers().get(reqwest::header::LOCATION) {
|
||||
Some(location) => location,
|
||||
None => {
|
||||
self.raise_error(
|
||||
"Location header field should be present for WHEP resource URL"
|
||||
.to_string(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let location = match location.to_str() {
|
||||
Ok(loc) => loc,
|
||||
Err(e) => {
|
||||
self.raise_error(format!("Failed to convert location to string: {e}"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let url = reqwest::Url::parse(endpoint.as_str()).unwrap();
|
||||
|
||||
gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location);
|
||||
|
||||
let url = match url.join(location) {
|
||||
Ok(joined_url) => joined_url,
|
||||
Err(err) => {
|
||||
self.raise_error(format!("URL join operation failed: {err:?}"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match resp.bytes().await {
|
||||
Ok(ans_bytes) => {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = match *state {
|
||||
State::Post { redirects: _r } => State::Running {
|
||||
whep_resource: url.to_string(),
|
||||
},
|
||||
_ => {
|
||||
self.raise_error("Expected to be in POST state".to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
drop(state);
|
||||
|
||||
self.sdp_message_parse(ans_bytes, &webrtcbin)
|
||||
}
|
||||
Err(err) => self.raise_error(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
status if status.is_redirection() => {
|
||||
if redirects < MAX_REDIRECTS {
|
||||
match parse_redirect_location(resp.headers(), &endpoint) {
|
||||
Ok(redirect_url) => {
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = match *state {
|
||||
State::Post { redirects: _r } => State::Post {
|
||||
redirects: redirects + 1,
|
||||
},
|
||||
/*
|
||||
* As per section 4.6 of the specification, redirection is
|
||||
* not required to be supported for the PATCH and DELETE
|
||||
* requests to the final WHEP resource URL. Only the initial
|
||||
* POST request may support redirection.
|
||||
*/
|
||||
State::Running { .. } => {
|
||||
self.raise_error(
|
||||
"Unexpected redirection in RUNNING state".to_string(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
State::Stopped => unreachable!(),
|
||||
};
|
||||
drop(state);
|
||||
}
|
||||
|
||||
gst::warning!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Redirecting endpoint to {}",
|
||||
redirect_url.as_str()
|
||||
);
|
||||
|
||||
self.do_post(sess_desc, webrtcbin, redirect_url).await
|
||||
}
|
||||
Err(e) => self.raise_error(e.to_string()),
|
||||
}
|
||||
} else {
|
||||
self.raise_error("Too many redirects. Unable to connect.".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
s => {
|
||||
match resp.bytes().await {
|
||||
Ok(r) => {
|
||||
let res = r.escape_ascii().to_string();
|
||||
|
||||
// FIXME: Check and handle 'Retry-After' header in case of server error
|
||||
self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res));
|
||||
}
|
||||
Err(err) => self.raise_error(err.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn whep_offer(&self, webrtcbin: gst::Element) {
|
||||
let local_desc =
|
||||
webrtcbin.property::<Option<WebRTCSessionDescription>>("local-description");
|
||||
|
||||
let sess_desc = match local_desc {
|
||||
None => {
|
||||
self.raise_error("Local description is not set".to_string());
|
||||
return;
|
||||
}
|
||||
Some(mut local_desc) => {
|
||||
local_desc.set_type(WebRTCSDPType::Offer);
|
||||
local_desc
|
||||
}
|
||||
};
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Sending offer SDP: {:?}",
|
||||
sess_desc.sdp().as_text()
|
||||
);
|
||||
|
||||
let timeout;
|
||||
let endpoint;
|
||||
|
||||
{
|
||||
let settings = self.settings.lock().unwrap();
|
||||
timeout = settings.timeout;
|
||||
endpoint =
|
||||
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
|
||||
drop(settings);
|
||||
}
|
||||
|
||||
if let Err(e) = wait_async(
|
||||
&self.canceller,
|
||||
self.do_post(sess_desc, webrtcbin, endpoint),
|
||||
timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
self.handle_future_error(e);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_recursion]
|
||||
async fn do_post(
|
||||
&self,
|
||||
offer: WebRTCSessionDescription,
|
||||
webrtcbin: gst::Element,
|
||||
endpoint: reqwest::Url,
|
||||
) {
|
||||
let auth_token;
|
||||
|
||||
{
|
||||
let settings = self.settings.lock().unwrap();
|
||||
auth_token = settings.auth_token.clone();
|
||||
drop(settings);
|
||||
}
|
||||
|
||||
let sdp = offer.sdp();
|
||||
let body = sdp.as_text().unwrap();
|
||||
|
||||
gst::info!(CAT, imp: self, "Using endpoint {}", endpoint.as_str());
|
||||
|
||||
let mut headermap = HeaderMap::new();
|
||||
headermap.insert(
|
||||
reqwest::header::CONTENT_TYPE,
|
||||
HeaderValue::from_static("application/sdp"),
|
||||
);
|
||||
|
||||
if let Some(token) = auth_token.as_ref() {
|
||||
let bearer_token = "Bearer ".to_owned() + token.as_str();
|
||||
headermap.insert(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
HeaderValue::from_str(bearer_token.as_str())
|
||||
.expect("Failed to set auth token to header"),
|
||||
);
|
||||
}
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Url for HTTP POST request: {}",
|
||||
endpoint.as_str()
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.client
|
||||
.request(reqwest::Method::POST, endpoint.clone())
|
||||
.headers(headermap)
|
||||
.body(body)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match resp {
|
||||
Ok(r) => {
|
||||
#[allow(unused_mut)]
|
||||
let mut redirects;
|
||||
|
||||
{
|
||||
let state = self.state.lock().unwrap();
|
||||
redirects = match *state {
|
||||
State::Post { redirects } => redirects,
|
||||
_ => {
|
||||
self.raise_error("Trying to do POST in unexpected state".to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
drop(state);
|
||||
}
|
||||
|
||||
self.parse_endpoint_response(offer, r, redirects, webrtcbin)
|
||||
.await
|
||||
}
|
||||
Err(err) => self.raise_error(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn terminate_session(&self) {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let state = self.state.lock().unwrap();
|
||||
let timeout = settings.timeout;
|
||||
|
||||
let resource_url = match *state {
|
||||
State::Running {
|
||||
whep_resource: ref whep_resource_url,
|
||||
} => whep_resource_url.clone(),
|
||||
_ => {
|
||||
self.raise_error("Terminated in unexpected state".to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
drop(state);
|
||||
|
||||
let mut headermap = HeaderMap::new();
|
||||
if let Some(token) = &settings.auth_token {
|
||||
let bearer_token = "Bearer ".to_owned() + token.as_str();
|
||||
headermap.insert(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
HeaderValue::from_str(bearer_token.as_str())
|
||||
.expect("Failed to set auth token to header"),
|
||||
);
|
||||
}
|
||||
|
||||
drop(settings);
|
||||
|
||||
gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url);
|
||||
|
||||
/* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */
|
||||
let client = build_reqwest_client(reqwest::redirect::Policy::default());
|
||||
let future = async {
|
||||
client
|
||||
.delete(resource_url.clone())
|
||||
.headers(headermap)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["DELETE request failed {}: {:?}", resource_url, err]
|
||||
)
|
||||
})
|
||||
};
|
||||
|
||||
let res = wait(&self.canceller, future, timeout);
|
||||
match res {
|
||||
Ok(r) => {
|
||||
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
|
||||
}
|
||||
Err(e) => match e {
|
||||
WaitError::FutureAborted => {
|
||||
gst::warning!(CAT, imp: self, "DELETE request aborted")
|
||||
}
|
||||
WaitError::FutureError(e) => {
|
||||
gst::error!(CAT, imp: self, "Error on DELETE request : {}", e)
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
pub fn on_webrtcbin_ready(&self) -> RustClosure {
|
||||
glib::closure!(|signaller: &super::WhepClientSignaller,
|
||||
_consumer_identifier: &str,
|
||||
webrtcbin: &gst::Element| {
|
||||
let obj_weak = signaller.downgrade();
|
||||
|
||||
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
|
||||
let Some(obj) = obj_weak.upgrade() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
|
||||
|
||||
match state {
|
||||
WebRTCICEGatheringState::Gathering => {
|
||||
gst::info!(CAT, obj: obj, "ICE gathering started");
|
||||
}
|
||||
WebRTCICEGatheringState::Complete => {
|
||||
gst::info!(CAT, obj: obj, "ICE gathering complete");
|
||||
|
||||
let webrtcbin = webrtcbin.clone();
|
||||
|
||||
RUNTIME.spawn(async move { obj.imp().whep_offer(webrtcbin).await });
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl SignallableImpl for WhepClient {
|
||||
fn start(&self) {
|
||||
if self.settings.lock().unwrap().whep_endpoint.is_none() {
|
||||
self.raise_error("WHEP endpoint URL must be set".to_string());
|
||||
return;
|
||||
}
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = State::Post { redirects: 0 };
|
||||
drop(state);
|
||||
|
||||
self.obj()
|
||||
.emit_by_name::<()>("session-started", &[&SESSION_ID, &SESSION_ID]);
|
||||
self.obj().emit_by_name::<()>(
|
||||
"session-requested",
|
||||
&[
|
||||
&SESSION_ID,
|
||||
&SESSION_ID,
|
||||
&None::<gst_webrtc::WebRTCSessionDescription>,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
fn stop(&self) {}
|
||||
|
||||
fn end_session(&self, _session_id: &str) {
|
||||
// Interrupt requests in progress, if any
|
||||
if let Some(canceller) = &*self.canceller.lock().unwrap() {
|
||||
canceller.abort();
|
||||
}
|
||||
|
||||
let state = self.state.lock().unwrap();
|
||||
if let State::Running { .. } = *state {
|
||||
// Release server-side resources
|
||||
drop(state);
|
||||
self.terminate_session();
|
||||
}
|
||||
}
|
||||
}
|
21
net/webrtc/src/whep_signaller/mod.rs
Normal file
21
net/webrtc/src/whep_signaller/mod.rs
Normal file
|
@ -0,0 +1,21 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::Signallable;
|
||||
use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt};
|
||||
|
||||
mod client;
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct WhepClientSignaller(ObjectSubclass<client::WhepClient>) @implements Signallable;
|
||||
}
|
||||
|
||||
unsafe impl Send for WhepClientSignaller {}
|
||||
unsafe impl Sync for WhepClientSignaller {}
|
||||
|
||||
impl Default for WhepClientSignaller {
|
||||
fn default() -> Self {
|
||||
let sig: WhepClientSignaller = glib::Object::new();
|
||||
sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready());
|
||||
sig
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue