webrtchttp: Do not block webrtcbin signal handlers for sending candidates

While at it, drop the OPTIONS request in WHIP sink. This was not really
required. See section 4.4 of the spec
https://www.ietf.org/archive/id/draft-ietf-wish-whip-01.html#name-stun-turn-server-configurat

Also introduce a new error type and distinguish between a future being
aborted or returning an error.

We call abort only during shutdown and hence except for the DELETE
resource request being aborted, other waits on future should not
be fatal.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/949>
This commit is contained in:
Sanchayan Maity 2022-11-22 16:13:35 +05:30
parent db39370701
commit 0b1b8b91b9
3 changed files with 280 additions and 233 deletions

View file

@ -2,13 +2,18 @@ use futures::future;
use futures::prelude::*;
use gst::{prelude::*, ErrorMessage};
use once_cell::sync::Lazy;
use parse_link_header;
use reqwest::header::HeaderMap;
use reqwest::redirect::Policy;
use std::sync::Mutex;
use std::time::Duration;
use tokio::runtime;
#[derive(Debug)]
pub enum WaitError {
FutureAborted,
FutureError(ErrorMessage),
}
pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
@ -21,8 +26,8 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
pub fn wait<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,
timeout: u32
) -> Result<T, ErrorMessage>
timeout: u32,
) -> Result<T, WaitError>
where
F: Send + Future<Output = Result<T, ErrorMessage>>,
T: Send + 'static,
@ -31,10 +36,10 @@ where
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
if canceller_guard.is_some() {
return Err(gst::error_msg!(
return Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Old Canceller should not exist"]
));
)));
}
canceller_guard.replace(abort_handle);
@ -60,15 +65,12 @@ where
match future::Abortable::new(future, abort_registration).await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(gst::error_msg!(
Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Future resolved with an error {}", err.to_string()]
)),
["Future resolved with an error {:?}", err]
))),
Err(future::Aborted) => Err(gst::error_msg!(
gst::ResourceError::Failed,
["Canceller called before future resolved"]
)),
Err(future::Aborted) => Err(WaitError::FutureAborted),
}
};

View file

@ -7,7 +7,9 @@
//
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait};
use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError,
};
use crate::GstRsWebRTCICETransportPolicy;
use bytes::Bytes;
use futures::future;
@ -18,6 +20,7 @@ use once_cell::sync::Lazy;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::StatusCode;
use std::sync::Mutex;
use std::thread::{spawn, JoinHandle};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@ -74,11 +77,17 @@ impl Default for Settings {
}
}
#[derive(Debug, Clone)]
#[derive(Debug)]
enum State {
Stopped,
Post { redirects: u8 },
Running { whep_resource: String },
Post {
redirects: u8,
thread_handle: Option<JoinHandle<()>>,
},
Running {
whep_resource: String,
thread_handle: Option<JoinHandle<()>>,
},
}
impl Default for State {
@ -203,9 +212,7 @@ impl ElementImpl for WhepSrc {
}
}
let ret = self.parent_change_state(transition);
ret
self.parent_change_state(transition)
}
}
@ -423,7 +430,17 @@ impl WhepSrc {
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, imp: self_, "ICE gathering completed");
self_.whep_offer();
let mut state = self_.state.lock().unwrap();
let self_ref = self_.ref_counted();
gst::debug!(CAT, imp: self_, "Spawning thread to send offer");
let handle = spawn(move || self_ref.whep_offer());
*state = State::Post {
redirects: 0,
thread_handle: Some(handle),
};
drop(state);
}
_ => (),
}
@ -544,7 +561,10 @@ impl WhepSrc {
drop(settings);
let mut state = self_.state.lock().unwrap();
*state = State::Post { redirects: 0 };
*state = State::Post {
redirects: 0,
thread_handle: None,
};
drop(state);
if let Err(e) = self_.initial_post_request(endpoint.unwrap()) {
@ -562,11 +582,8 @@ impl WhepSrc {
}
fn sdp_message_parse(&self, sdp_bytes: Bytes) -> Result<(), ErrorMessage> {
let sdp = sdp_message::SDPMessage::parse_buffer(&sdp_bytes).or_else(|_| {
Err(gst::error_msg!(
gst::ResourceError::Failed,
["Could not parse answer SDP"]
))
let sdp = sdp_message::SDPMessage::parse_buffer(&sdp_bytes).map_err(|_| {
gst::error_msg!(gst::ResourceError::Failed, ["Could not parse answer SDP"])
})?;
let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp);
@ -660,8 +677,20 @@ impl WhepSrc {
})?;
let mut state = self.state.lock().unwrap();
*state = State::Running {
whep_resource: url.to_string(),
*state = match *state {
State::Post {
redirects: _r,
thread_handle: ref mut h,
} => State::Running {
whep_resource: url.to_string(),
thread_handle: h.take(),
},
_ => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Expected to be in POST state"]
));
}
};
drop(state);
@ -674,29 +703,39 @@ impl WhepSrc {
})
};
let ans_bytes = wait(&self.canceller, future, timeout)?;
self.sdp_message_parse(ans_bytes)
match wait(&self.canceller, future, timeout) {
Ok(ans_bytes) => self.sdp_message_parse(ans_bytes),
Err(err) => match err {
WaitError::FutureAborted => Ok(()),
WaitError::FutureError(e) => Err(e),
},
}
}
status if status.is_redirection() => {
if redirects < MAX_REDIRECTS {
let mut state = self.state.lock().unwrap();
/*
* As per section 4.6 of the specification, redirection is
* not required to be supported for the PATCH and DELETE
* requests to the final WHEP resource URL. Only the initial
* POST request may support redirection.
*/
if let State::Running { .. } = *state {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Unexpected redirection in RUNNING state"]
));
}
*state = State::Post {
redirects: redirects + 1,
*state = match *state {
State::Post {
redirects: _r,
thread_handle: ref mut h,
} => State::Post {
redirects: redirects + 1,
thread_handle: h.take(),
},
/*
* As per section 4.6 of the specification, redirection is
* not required to be supported for the PATCH and DELETE
* requests to the final WHEP resource URL. Only the initial
* POST request may support redirection.
*/
State::Running { .. } => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Unexpected redirection in RUNNING state"]
));
}
State::Stopped => unreachable!(),
};
drop(state);
@ -845,8 +884,8 @@ impl WhepSrc {
gst::info!(CAT, imp: self, "WHEP endpoint url: {}", endpoint.as_str());
let _ = match *state {
State::Post { redirects } => redirects,
match *state {
State::Post { .. } => (),
_ => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
@ -964,20 +1003,36 @@ impl WhepSrc {
})
};
let resp = wait(&self.canceller, future, timeout)?;
self.parse_endpoint_response(endpoint, 0, resp)
match wait(&self.canceller, future, timeout) {
Ok(resp) => self.parse_endpoint_response(endpoint, 0, resp),
Err(err) => match err {
WaitError::FutureAborted => Ok(()),
WaitError::FutureError(e) => Err(e),
},
}
}
fn terminate_session(&self) {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
let timeout = settings.timeout;
let resource_url;
let resource_url = match *state {
(*state, resource_url) = match *state {
State::Running {
whep_resource: ref whep_resource_url,
} => whep_resource_url.clone(),
thread_handle: ref mut h,
} => {
if let Some(th) = h.take() {
match th.join() {
Ok(_) => {
gst::debug!(CAT, imp: self, "Send offer thread joined successfully");
}
Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e),
}
}
(State::Stopped, whep_resource_url.clone())
}
_ => {
element_imp_error!(
self,
@ -1025,9 +1080,14 @@ impl WhepSrc {
Ok(r) => {
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
}
Err(e) => {
gst::error!(CAT, imp: self, "Error on DELETE request : {}", e);
}
Err(e) => match e {
WaitError::FutureAborted => {
gst::warning!(CAT, imp: self, "DELETE request aborted")
}
WaitError::FutureError(e) => {
gst::error!(CAT, imp: self, "Error on DELETE request : {}", e)
}
},
};
}
}

View file

@ -7,7 +7,9 @@
//
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait};
use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError,
};
use crate::GstRsWebRTCICETransportPolicy;
use futures::future;
use gst::element_imp_error;
@ -22,6 +24,7 @@ use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::StatusCode;
use std::sync::Mutex;
use std::thread::{spawn, JoinHandle};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink"))
@ -58,12 +61,17 @@ impl Default for Settings {
}
}
#[derive(Debug, Clone)]
#[derive(Debug)]
enum State {
Stopped,
Options { redirects: u8 },
Post { redirects: u8 },
Running { whip_resource_url: String },
Post {
redirects: u8,
thread_handle: Option<JoinHandle<()>>,
},
Running {
whip_resource_url: String,
thread_handle: Option<JoinHandle<()>>,
},
}
impl Default for State {
@ -191,9 +199,7 @@ impl ElementImpl for WhipSink {
}
}
let ret = self.parent_change_state(transition);
ret
self.parent_change_state(transition)
}
}
@ -209,7 +215,7 @@ impl ObjectImpl for WhipSink {
glib::ParamSpecBoolean::builder("use-link-headers")
.nick("Use Link Headers")
.blurb("Use link headers to configure ice-servers from the WHIP server response to the POST or OPTIONS request.
.blurb("Use link headers to configure ice-servers from the WHIP server response to the POST request.
If set to TRUE and the WHIP server returns valid ice-servers,
this property overrides the ice-servers values set using the stun-server and turn-server properties.")
.mutable_ready()
@ -363,7 +369,17 @@ impl ObjectImpl for WhipSink {
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, imp: self_, "ICE gathering completed");
self_.send_offer();
let mut state = self_.state.lock().unwrap();
let self_ref = self_.ref_counted();
gst::debug!(CAT, imp: self_, "Spawning thread to send offer");
let handle = spawn(move || self_ref.send_offer());
*state = State::Post {
redirects: 0,
thread_handle: Some(handle),
};
drop(state);
}
_ => (),
}
@ -403,17 +419,6 @@ impl ObjectImpl for WhipSink {
}
drop(settings);
let mut state = whipsink.state.lock().unwrap();
*state = State::Options { redirects: 0 };
drop(state);
if let Err(e) = whipsink.lookup_ice_servers(endpoint.unwrap()) {
gst::element_error!(
ele,
gst::ResourceError::Failed,
["Error in 'lookup_ice_servers' - {}", e.to_string()]
);
}
// Promise for 'create-offer' signal emitted to webrtcbin
// Closure is called when the promise is fulfilled
@ -487,115 +492,6 @@ impl ObjectSubclass for WhipSink {
}
impl WhipSink {
fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
let timeout = settings.timeout;
let redirects = match *state {
State::Options { redirects } => redirects,
_ => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Trying to do OPTIONS in unexpected state"]
));
}
};
drop(state);
if !settings.use_link_headers {
// We're not configured to use OPTIONS, so we're done
return Ok(());
}
// We'll handle redirects manually since the default redirect handler does not
// reuse the authentication token on the redirected server
let pol = reqwest::redirect::Policy::none();
let client = build_reqwest_client(pol);
let mut headermap = HeaderMap::new();
if let Some(token) = &settings.auth_token {
let bearer_token = "Bearer ".to_owned() + token;
drop(settings);
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(&bearer_token)
.expect("Auth token should only contain characters valid for an HTTP header"),
);
}
let future = async {
client
.request(reqwest::Method::OPTIONS, endpoint.as_ref())
.headers(headermap)
.send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["OPTIONS request failed: {:?}", err]
)
})
};
let resp = wait(&self.canceller, future, timeout)?;
match resp.status() {
StatusCode::NO_CONTENT => {
set_ice_servers(&self.webrtcbin, resp.headers())?;
Ok(())
}
status if status.is_redirection() => {
if redirects < MAX_REDIRECTS {
let mut state = self.state.lock().unwrap();
*state = State::Options {
redirects: redirects + 1,
};
drop(state);
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
gst::debug!(
CAT,
imp: self,
"Redirecting endpoint to {}",
redirect_url.as_str()
);
self.lookup_ice_servers(redirect_url)
}
Err(e) => Err(e),
}
} else {
Err(gst::error_msg!(
gst::ResourceError::Failed,
["Too many redirects. Unable to connect to do OPTIONS request"]
))
}
}
status => {
let future = async {
resp.bytes().await.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
let res = wait(&self.canceller, future, timeout);
Err(gst::error_msg!(
gst::ResourceError::Failed,
[
"lookup_ice_servers - Unexpected response {} {:?}",
status,
res
]
))
}
}
}
fn send_offer(&self) {
let settings = self.settings.lock().unwrap();
@ -612,7 +508,10 @@ impl WhipSink {
drop(settings);
let mut state = self.state.lock().unwrap();
*state = State::Post { redirects: 0 };
*state = State::Post {
redirects: 0,
thread_handle: None,
};
drop(state);
let local_desc = self
@ -639,12 +538,7 @@ impl WhipSink {
);
match self.do_post(offer_sdp, endpoint.unwrap()) {
Ok(answer) => {
self.webrtcbin.emit_by_name::<()>(
"set-remote-description",
&[&answer, &None::<gst::Promise>],
);
}
Ok(_) => (),
Err(e) => {
element_imp_error!(
self,
@ -659,13 +553,16 @@ impl WhipSink {
&self,
offer: gst_webrtc::WebRTCSessionDescription,
endpoint: reqwest::Url,
) -> Result<gst_webrtc::WebRTCSessionDescription, gst::ErrorMessage> {
) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
let timeout = settings.timeout;
let redirects = match *state {
State::Post { redirects } => redirects,
State::Post {
redirects,
thread_handle: ref _h,
} => redirects,
_ => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
@ -692,7 +589,6 @@ impl WhipSink {
if let Some(token) = &settings.auth_token {
let bearer_token = "Bearer ".to_owned() + token;
drop(settings);
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str())
@ -715,14 +611,36 @@ impl WhipSink {
})
};
let resp = wait(&self.canceller, future, timeout)?;
drop(settings);
let res = match resp.status() {
match wait(&self.canceller, future, timeout) {
Ok(resp) => self.parse_endpoint_response(offer, endpoint, redirects, resp),
Err(err) => match err {
WaitError::FutureAborted => Ok(()),
WaitError::FutureError(e) => Err(e),
},
}
}
fn parse_endpoint_response(
&self,
offer: gst_webrtc::WebRTCSessionDescription,
endpoint: reqwest::Url,
redirects: u8,
resp: reqwest::Response,
) -> Result<(), ErrorMessage> {
match resp.status() {
StatusCode::OK | StatusCode::CREATED => {
// XXX: this is a long function, we should factor it out.
// Note: Not taking care of 'Link' headers in POST response
// because we do a mandatory OPTIONS request before 'create-offer'
// and update the ICE servers from response to OPTIONS
let settings = self
.settings
.lock()
.expect("Failed to acquire settings lock");
let timeout = settings.timeout;
if settings.use_link_headers {
set_ice_servers(&self.webrtcbin, resp.headers())?;
}
drop(settings);
// Get the url of the resource from 'location' header.
// The resource created is expected be a relative path
@ -762,8 +680,20 @@ impl WhipSink {
})?;
let mut state = self.state.lock().unwrap();
*state = State::Running {
whip_resource_url: url.to_string(),
*state = match *state {
State::Post {
redirects: _r,
thread_handle: ref mut h,
} => State::Running {
whip_resource_url: url.to_string(),
thread_handle: h.take(),
},
_ => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Expected to be in POST state"]
));
}
};
drop(state);
@ -776,29 +706,56 @@ impl WhipSink {
})
};
let ans_bytes = wait(&self.canceller, future, timeout)?;
match wait(&self.canceller, future, timeout) {
Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Answer,
ans_sdp,
);
self.webrtcbin.emit_by_name::<()>(
"set-remote-description",
&[&answer, &None::<gst::Promise>],
);
Ok(())
}
match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Answer,
ans_sdp,
);
Ok(answer)
}
Err(e) => Err(gst::error_msg!(
gst::ResourceError::Failed,
["Could not parse answer SDP: {}", e]
)),
Err(e) => Err(gst::error_msg!(
gst::ResourceError::Failed,
["Could not parse answer SDP: {}", e]
)),
},
Err(err) => match err {
WaitError::FutureAborted => Ok(()),
WaitError::FutureError(e) => Err(e),
},
}
}
s if s.is_redirection() => {
if redirects < MAX_REDIRECTS {
let mut state = self.state.lock().unwrap();
*state = State::Post {
redirects: redirects + 1,
*state = match *state {
State::Post {
redirects: _r,
thread_handle: ref mut h,
} => State::Post {
redirects: redirects + 1,
thread_handle: h.take(),
},
/*
* As per section 4.6 of the specification, redirection is
* not required to be supported for the PATCH and DELETE
* requests to the final WHEP resource URL. Only the initial
* POST request may support redirection.
*/
State::Running { .. } => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Unexpected redirection in RUNNING state"]
));
}
State::Stopped => unreachable!(),
};
drop(state);
@ -812,7 +769,7 @@ impl WhipSink {
);
self.do_post(offer, redirect_url)
}
Err(e) => return Err(e),
Err(e) => Err(e),
}
} else {
Err(gst::error_msg!(
@ -832,6 +789,13 @@ impl WhipSink {
})
};
let settings = self
.settings
.lock()
.expect("Failed to acquire settings lock");
let timeout = settings.timeout;
drop(settings);
let resp = wait(&self.canceller, future, timeout)
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string());
@ -842,24 +806,40 @@ impl WhipSink {
["Server returned error: {} - {}", s.as_str(), resp]
))
}
};
res
}
}
fn terminate_session(&self) {
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
let timeout = settings.timeout;
let resource_url = match *state {
let resource_url;
(*state, resource_url) = match *state {
State::Running {
ref whip_resource_url,
} => whip_resource_url.clone(),
whip_resource_url: ref resource_url,
thread_handle: ref mut h,
} => {
if let Some(th) = h.take() {
match th.join() {
Ok(_) => {
gst::debug!(CAT, imp: self, "Send offer thread joined successfully");
}
Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e),
}
}
(State::Stopped, resource_url.clone())
}
_ => {
gst::error!(CAT, imp: self, "Terminated in unexpected state");
element_imp_error!(
self,
gst::ResourceError::Failed,
["Terminated in unexpected state"]
);
return;
}
};
drop(state);
let mut headermap = HeaderMap::new();
@ -893,9 +873,14 @@ impl WhipSink {
Ok(r) => {
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
}
Err(e) => {
gst::error!(CAT, imp: self, "{}", e);
}
Err(e) => match e {
WaitError::FutureAborted => {
gst::warning!(CAT, imp: self, "DELETE request aborted")
}
WaitError::FutureError(e) => {
gst::error!(CAT, imp: self, "Error on DELETE request : {}", e)
}
},
};
}
}