net/webrtc: sink: abort stats collection before stopping the Signaller

In some rare cases, the webrtc-test entered a deadlock while executing
`WebRTCSink::unprepare`. Attaching gdb to a blocked instance showed:

* `gstrswebrtc::signaller:👿:Signaller::stop()` parked, waiting for a
  `Condvar` in `Signaller::stop()`. This was most likely awaiting for the
  receive task to complete while it was locked in `element.end_session()`.
  This code path is triggered from `unprepare` with the `State` `Mutex` locked.
* `webrtcsink:👿:WebRtcSink::process_stats` waiting for a contended `Mutex`,
  which is also the `State` `Mutex`. This prevented completion of the signal
  `gst_webrtc_bin_get_stats`.

This commit aborts the task in charge of periodically collecting stats and
ensures any remaining iteration completes before requesting the Signaller to
stop.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1176>
This commit is contained in:
François Laignel 2023-04-29 20:49:34 +02:00
parent 2cb1fd7fc1
commit 0e6b9df932

View file

@ -195,6 +195,7 @@ struct State {
streams: HashMap<String, InputStream>,
navigation_handler: Option<NavigationEventHandler>,
mids: HashMap<String, String>,
stats_collection_handle: Option<tokio::task::JoinHandle<()>>,
}
fn create_navigation_event(sink: &super::WebRTCSink, msg: &str) {
@ -308,6 +309,7 @@ impl Default for State {
streams: HashMap::new(),
navigation_handler: None,
mids: HashMap::new(),
stats_collection_handle: None,
}
}
}
@ -1236,6 +1238,14 @@ impl WebRTCSink {
});
}
if let Some(stats_collection_handle) = state.stats_collection_handle.take() {
stats_collection_handle.abort();
// Make sure any currently running stats collection task completes
drop(state);
let _ = RUNTIME.block_on(stats_collection_handle);
state = self.state.lock().unwrap();
}
state.maybe_stop_signaller(element);
state.codec_discovery_done = false;
@ -1940,7 +1950,7 @@ impl WebRTCSink {
let element_clone = element.downgrade();
let webrtcbin = session.webrtcbin.downgrade();
RUNTIME.spawn(async move {
state.stats_collection_handle = Some(RUNTIME.spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
loop {
@ -1956,7 +1966,7 @@ impl WebRTCSink {
break;
}
}
});
}));
if remove {
state.finalize_session(element, &mut session, true);