webrtcsink: wait for Sessions to end

`State::finalize_session()` asynchronously sets the Session pipeline to Null.
In some cases, sessions `webrtcbin` could terminate their transition to Null
after `webrtcsink` had reached Null.

This commit adds a set of `finalizing_sessions`. When the finalization process
starts, the session is added to the set. After `webrtcbin` has reached the Null
state, the session is removed from the set and a condvar is notified.

In `unprepare`, `webrtcsink` loops until the `finalizing_sessions` set is
empty, awaiting for the condvar to be notified when it's not.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1221>
This commit is contained in:
François Laignel 2023-05-23 18:51:11 +02:00
parent b68e2a1ed0
commit 9a59763df1

View file

@ -15,14 +15,14 @@ use anyhow::{anyhow, Error};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::ops::Mul;
use std::sync::Mutex;
use std::sync::{Arc, Condvar, Mutex};
use super::homegrown_cc::CongestionController;
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
use crate::aws_kvs_signaller::AwsKvsSignaller;
use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
use crate::RUNTIME;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@ -219,6 +219,7 @@ struct State {
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
stats_collection_handle: Option<tokio::task::JoinHandle<()>>,
finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>,
}
fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) {
@ -334,6 +335,7 @@ impl Default for State {
mids: HashMap::new(),
signaller_signals: Default::default(),
stats_collection_handle: None,
finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())),
}
}
}
@ -790,8 +792,18 @@ impl State {
session.links.remove(ssrc);
}
session.pipeline.call_async(|pipeline| {
let finalizing_sessions = self.finalizing_sessions.clone();
let session_id = session.id.clone();
let (sessions, _cvar) = &*finalizing_sessions;
sessions.lock().unwrap().insert(session_id.clone());
session.pipeline.call_async(move |pipeline| {
let _ = pipeline.set_state(gst::State::Null);
let (sessions, cvar) = &*finalizing_sessions;
let mut sessions = sessions.lock().unwrap();
sessions.remove(&session_id);
cvar.notify_one();
});
}
@ -1425,6 +1437,14 @@ impl BaseWebRTCSink {
gst::info!(CAT, "Stopped signaller");
}
let finalizing_sessions = self.state.lock().unwrap().finalizing_sessions.clone();
let (sessions, cvar) = &*finalizing_sessions;
let mut sessions = sessions.lock().unwrap();
while !sessions.is_empty() {
sessions = cvar.wait(sessions).unwrap();
}
Ok(())
}