From 9a59763df1287d8b8a0ddfb3b8df711fac5a26ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 23 May 2023 18:51:11 +0200 Subject: [PATCH] webrtcsink: wait for Sessions to end `State::finalize_session()` asynchronously sets the Session pipeline to Null. In some cases, sessions `webrtcbin` could terminate their transition to Null after `webrtcsink` had reached Null. This commit adds a set of `finalizing_sessions`. When the finalization process starts, the session is added to the set. After `webrtcbin` has reached the Null state, the session is removed from the set and a condvar is notified. In `unprepare`, `webrtcsink` loops until the `finalizing_sessions` set is empty, awaiting for the condvar to be notified when it's not. Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 1be780d9..42b36474 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -15,14 +15,14 @@ use anyhow::{anyhow, Error}; use once_cell::sync::Lazy; use std::collections::HashMap; use std::ops::Mul; -use std::sync::Mutex; +use std::sync::{Arc, Condvar, Mutex}; use super::homegrown_cc::CongestionController; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use crate::aws_kvs_signaller::AwsKvsSignaller; use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; use crate::RUNTIME; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -219,6 +219,7 @@ struct State { mids: HashMap, signaller_signals: Option, stats_collection_handle: Option>, + finalizing_sessions: Arc<(Mutex>, Condvar)>, } fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { @@ -334,6 +335,7 @@ impl Default for State { mids: HashMap::new(), signaller_signals: Default::default(), stats_collection_handle: None, + finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), } } } @@ -790,8 +792,18 @@ impl State { session.links.remove(ssrc); } - session.pipeline.call_async(|pipeline| { + let finalizing_sessions = self.finalizing_sessions.clone(); + let session_id = session.id.clone(); + let (sessions, _cvar) = &*finalizing_sessions; + sessions.lock().unwrap().insert(session_id.clone()); + + session.pipeline.call_async(move |pipeline| { let _ = pipeline.set_state(gst::State::Null); + + let (sessions, cvar) = &*finalizing_sessions; + let mut sessions = sessions.lock().unwrap(); + sessions.remove(&session_id); + cvar.notify_one(); }); } @@ -1425,6 +1437,14 @@ impl BaseWebRTCSink { gst::info!(CAT, "Stopped signaller"); } + let finalizing_sessions = self.state.lock().unwrap().finalizing_sessions.clone(); + + let (sessions, cvar) = &*finalizing_sessions; + let mut sessions = sessions.lock().unwrap(); + while !sessions.is_empty() { + sessions = cvar.wait(sessions).unwrap(); + } + Ok(()) }