webrtcsink/signaller: don't call signals while having state/settings locked

It is a recipe for deadlocks if the signal callback calls back into
webrtcsink in some way.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1141>
This commit is contained in:
Matthew Waters 2023-03-29 12:13:42 +11:00
parent 1c61e46f37
commit 4f4e5f0d75

View file

@ -16,7 +16,6 @@ use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::ops::Mul;
use std::sync::Mutex;
use std::sync::MutexGuard;
use super::homegrown_cc::CongestionController;
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
@ -796,17 +795,10 @@ impl State {
}
}
fn maybe_start_signaller(
&mut self,
element: &super::WebRTCSink,
settings: &MutexGuard<Settings>,
) {
if self.signaller_state == SignallerState::Stopped
fn should_start_signaller(&mut self, element: &super::WebRTCSink) -> bool {
self.signaller_state == SignallerState::Stopped
&& element.current_state() >= gst::State::Paused
&& self.codec_discovery_done
{
settings.signaller.start();
}
}
}
@ -1524,14 +1516,17 @@ impl WebRTCSink {
session_id: &str,
) {
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
let state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get(session_id) {
session
.webrtcbin
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
drop(state);
settings.signaller.send_sdp(session_id, &offer);
signaller.send_sdp(session_id, &offer);
}
}
@ -1542,6 +1537,8 @@ impl WebRTCSink {
session_id: &str,
) {
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
let mut state = self.state.lock().unwrap();
if let Some(mut session) = state.sessions.remove(session_id) {
@ -1560,13 +1557,12 @@ impl WebRTCSink {
.webrtcbin
.emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
settings.signaller.send_sdp(session_id, &answer);
let session_id = session.id.clone();
state.sessions.insert(session.id.clone(), session);
drop(state);
drop(settings);
signaller.send_sdp(&session_id, &answer);
self.on_remote_description_set(element, session_id)
}
@ -1877,9 +1873,9 @@ impl WebRTCSink {
candidate: String,
) {
let settings = self.settings.lock().unwrap();
settings
.signaller
.add_ice(&session_id, &candidate, sdp_m_line_index, None)
let signaller = settings.signaller.clone();
drop(settings);
signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None)
}
/// Called by the signaller to add a new session
@ -2239,6 +2235,7 @@ impl WebRTCSink {
let this = element.imp();
let settings_clone = this.settings.lock().unwrap().clone();
let signaller = settings_clone.signaller.clone();
let mut webrtc_pads: HashMap<u32, WebRTCPad> = HashMap::new();
let mut codecs: BTreeMap<i32, Codec> = BTreeMap::new();
@ -2304,6 +2301,7 @@ impl WebRTCSink {
.await;
}
}
drop(settings_clone);
{
let mut state = this.state.lock().unwrap();
@ -2331,9 +2329,7 @@ impl WebRTCSink {
// so that application code can create data channels at the correct
// moment.
element.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
settings_clone
.signaller
.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
signaller.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]);
// We don't connect to on-negotiation-needed, this in order to call the above
// signal without holding the state lock:
@ -2369,6 +2365,8 @@ impl WebRTCSink {
signal: bool,
) -> Result<(), WebRTCSinkError> {
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
let mut state = self.state.lock().unwrap();
if !state.sessions.contains_key(session_id) {
@ -2376,9 +2374,7 @@ impl WebRTCSink {
}
if let Some(session) = state.end_session(session_id) {
let signaller = settings.signaller.clone();
drop(state);
drop(settings);
signaller
.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]);
if signal {
@ -2975,7 +2971,12 @@ impl WebRTCSink {
let settings = this.settings.lock().unwrap();
let mut state = this.state.lock().unwrap();
state.codec_discovery_done = true;
state.maybe_start_signaller(&element, &settings);
let signaller = settings.signaller.clone();
drop(settings);
if state.should_start_signaller(&element) {
drop(state);
signaller.start();
}
}
_ => (),
}
@ -3497,8 +3498,13 @@ impl ElementImpl for WebRTCSink {
}
gst::StateChange::PausedToPlaying => {
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();
drop(settings);
let mut state = self.state.lock().unwrap();
state.maybe_start_signaller(&element, &settings);
if state.should_start_signaller(&element) {
drop(state);
signaller.start();
}
}
_ => (),
}