net/webrtc: sink: abort stats collection before stopping the Signaller

In some rare cases, the webrtc-test entered a deadlock while executing
`WebRTCSink::unprepare`. Attaching gdb to a blocked instance showed:

* `gstrswebrtc::signaller:👿:Signaller::stop()` parked, waiting for a
  `Condvar` in `Signaller::stop()`. This was most likely awaiting for the
  receive task to complete while it was locked in `element.end_session()`.
  This code path is triggered from `unprepare` with the `State` `Mutex` locked.
* `webrtcsink:👿:WebRtcSink::process_stats` waiting for a contended `Mutex`,
  which is also the `State` `Mutex`. This prevented completion of the signal
  `gst_webrtc_bin_get_stats`.

This commit aborts the task in charge of periodically collecting stats and
ensures any remaining iteration completes before requesting the Signaller to
stop.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1202>
This commit is contained in:
François Laignel 2023-04-29 20:49:34 +02:00
parent eca269cbf2
commit dc5ddd3022

View file

@ -218,6 +218,7 @@ struct State {
navigation_handler: Option<NavigationEventHandler>,
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
stats_collection_handle: Option<tokio::task::JoinHandle<()>>,
}
fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) {
@ -332,6 +333,7 @@ impl Default for State {
navigation_handler: None,
mids: HashMap::new(),
signaller_signals: Default::default(),
stats_collection_handle: None,
}
}
}
@ -1392,7 +1394,14 @@ impl BaseWebRTCSink {
state.signaller_state = SignallerState::Stopped;
}
let stats_collection_handle = state.stats_collection_handle.take();
drop(state);
if let Some(stats_collection_handle) = stats_collection_handle {
stats_collection_handle.abort();
let _ = RUNTIME.block_on(stats_collection_handle);
}
for session in sessions {
signaller.end_session(&session.id);
}
@ -2539,7 +2548,7 @@ impl BaseWebRTCSink {
let element_clone = element.downgrade();
let webrtcbin = session.webrtcbin.downgrade();
let session_id_clone = session_id.clone();
RUNTIME.spawn(async move {
state.stats_collection_handle = Some(RUNTIME.spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
loop {
@ -2555,7 +2564,7 @@ impl BaseWebRTCSink {
break;
}
}
});
}));
if remove {
state.finalize_session(&mut session);