From c021e2b69f905a53c6eb80a7a52cf84e309b2649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 18 Sep 2023 16:59:27 +0200 Subject: [PATCH] net/webrtcsink: don't miss ice candidates During `on_remote_description_set()` processing, current session is removed from the sessions `HashMap`. If an ice candidate is submitted to `handle_ice()` by that time, the session can't be found and the candidate is ignored. This commit wraps the Session in the sessions `HashMap` so an entry is kept while `on_remote_description_set()` is running. Incoming candidates received by `handle_ice()` will be processed immediately or enqueued and handled when the session is restored by `on_remote_description_set()`. Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 195 ++++++++++++++++++++++++++++--- 1 file changed, 176 insertions(+), 19 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index f6b83ae6..ca4b393c 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -277,10 +277,141 @@ struct SignallerSignals { shutdown: glib::SignalHandlerId, } +struct IceCandidate { + sdp_m_line_index: u32, + candidate: String, +} + +/// Wrapper around `Session`. +/// +/// This makes it possible for the `Session` to be taken out of the `State`, +/// without removing the entry in the `sessions` `HashMap`, thus allowing +/// the `State` lock to be released, e.g. before calling a `Signal`. +/// +/// Taking the `Session`, replaces it with a placeholder which can enqueue +/// items (currently ICE candidates) received while the `Session` is taken. +/// In which case, the enqueued items will be processed when the `Session` is +/// restored. +enum SessionWrapper { + /// The `Session` is available in the `SessionWrapper`. + InPlace(Session), + /// The `Session` was taken out the `SessionWrapper`. + Taken(Vec), +} + +impl SessionWrapper { + /// Unwraps a reference to the `Session` of this `SessionWrapper`. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn unwrap(&self) -> &Session { + match self { + SessionWrapper::InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Unwraps a mutable reference to the `Session` of this `SessionWrapper`. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn unwrap_mut(&mut self) -> &mut Session { + match self { + SessionWrapper::InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Consumes the `SessionWrapper`, returning the wrapped `Session`. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn into_inner(self) -> Session { + match self { + SessionWrapper::InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Takes the `Session` out of this `SessionWrapper`, leaving it in the `Taken` state. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn take(&mut self) -> Session { + use SessionWrapper::*; + match std::mem::replace(self, Taken(Vec::new())) { + InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Restores a `Session` to this `SessionWrapper`. + /// + /// Processes any pending items enqueued while the `Session` was taken. + /// + /// # Panics + /// + /// Panics is the `Session` is already in place. + fn restore(&mut self, session: Session) { + let SessionWrapper::Taken(ref cands) = self else { + panic!("Session is already in place"); + }; + + if !cands.is_empty() { + gst::trace!( + CAT, + "handling {} pending ice candidates for session {}", + cands.len(), + session.id, + ); + for cand in cands { + session.webrtcbin.emit_by_name::<()>( + "add-ice-candidate", + &[&cand.sdp_m_line_index, &cand.candidate], + ); + } + } + + *self = SessionWrapper::InPlace(session); + } + + /// Adds an ICE candidate to this `SessionWrapper`. + /// + /// If the `Session` is in place, the ICE candidate is added immediately, + /// otherwise, it will be added when the `Session` is restored. + fn add_ice_candidate(&mut self, session_id: &str, sdp_m_line_index: u32, candidate: &str) { + match self { + SessionWrapper::InPlace(session) => { + gst::trace!(CAT, "adding ice candidate for session {session_id}"); + session + .webrtcbin + .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); + } + SessionWrapper::Taken(cands) => { + gst::trace!(CAT, "queuing ice candidate for session {session_id}"); + cands.push(IceCandidate { + sdp_m_line_index, + candidate: candidate.to_string(), + }); + } + } + } +} + +impl From for SessionWrapper { + fn from(session: Session) -> Self { + SessionWrapper::InPlace(session) + } +} + /* Our internal state */ struct State { signaller_state: SignallerState, - sessions: HashMap, + sessions: HashMap, codecs: BTreeMap, /// Used to abort codec discovery codecs_abort_handles: Vec, @@ -957,7 +1088,8 @@ impl State { } fn end_session(&mut self, session_id: &str) -> Option { - if let Some(mut session) = self.sessions.remove(session_id) { + if let Some(session) = self.sessions.remove(session_id) { + let mut session = session.into_inner(); self.finalize_session(&mut session); Some(session) } else { @@ -1674,6 +1806,7 @@ impl BaseWebRTCSink { if let Some(session) = state.sessions.get(session_id) { session + .unwrap() .webrtcbin .emit_by_name::<()>("set-local-description", &[&offer, &None::]); drop(state); @@ -1693,7 +1826,8 @@ impl BaseWebRTCSink { drop(settings); let mut state = self.state.lock().unwrap(); - if let Some(mut session) = state.sessions.remove(session_id) { + if let Some(session) = state.sessions.get_mut(session_id) { + let mut session = session.take(); let sdp = answer.sdp(); session.sdp = Some(sdp.to_owned()); @@ -1705,13 +1839,19 @@ impl BaseWebRTCSink { .and_then(|format| format.parse::().ok()); } + // FIXME I think the intention was to drop(state) and re-acquire the lock after the call + session .webrtcbin .emit_by_name::<()>("set-local-description", &[&answer, &None::]); let session_id = session.id.clone(); - state.sessions.insert(session.id.clone(), session); + if let Some(session_wrapper) = state.sessions.get_mut(&session_id) { + session_wrapper.restore(session); + } else { + gst::warning!(CAT, "Session {session_id} was removed"); + } drop(state); signaller.send_sdp(&session_id, &answer); @@ -1776,6 +1916,7 @@ impl BaseWebRTCSink { }); session + .unwrap() .webrtcbin .emit_by_name::<()>("create-answer", &[&None::, &promise]); } @@ -1910,6 +2051,7 @@ impl BaseWebRTCSink { gst::debug!(CAT, obj: element, "Negotiating for session {}", session_id); if let Some(session) = state.sessions.get(session_id) { + let session = session.unwrap(); gst::trace!(CAT, "WebRTC pads: {:?}", session.webrtc_pads); if let Some(offer) = offer { @@ -2211,7 +2353,7 @@ impl BaseWebRTCSink { let state = this.state.lock().unwrap(); if let Some(session) = state.sessions.get(&session_id_clone) { - for webrtc_pad in session.webrtc_pads.values() { + for webrtc_pad in session.unwrap().webrtc_pads.values() { if let Some(srcpad) = webrtc_pad.pad.peer() { srcpad.send_event( gst_video::UpstreamForceKeyUnitEvent::builder() @@ -2277,7 +2419,7 @@ impl BaseWebRTCSink { let element = element.expect("on-new-ssrc emitted when webrtcsink has been disposed?"); let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(&session_id_str) { - + let session = session.unwrap_mut(); if session.stats_sigid.is_none() { let session_id_str = session_id_str.clone(); let element = element.downgrade(); @@ -2357,7 +2499,9 @@ impl BaseWebRTCSink { } }); - state.sessions.insert(session_id.to_string(), session); + state + .sessions + .insert(session_id.to_string(), session.into()); let mut streams: Vec = state.streams.values().cloned().collect(); @@ -2437,12 +2581,12 @@ impl BaseWebRTCSink { { let mut state = this.state.lock().unwrap(); - if let Some(mut session) = state.sessions.remove(&session_id) { + if let Some(session) = state.sessions.get_mut(&session_id) { + let session = session.unwrap_mut(); session.webrtc_pads = webrtc_pads; if offer_clone.is_some() { session.codecs = Some(codecs); } - state.sessions.insert(session_id.to_owned(), session); } } @@ -2532,6 +2676,7 @@ impl BaseWebRTCSink { ) { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { + let session = session.unwrap_mut(); if let Some(congestion_controller) = session.congestion_controller.as_mut() { congestion_controller.loss_control(element, stats, &mut session.encoders); } @@ -2552,6 +2697,7 @@ impl BaseWebRTCSink { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(&session_id) { + let session = session.unwrap_mut(); if let Some(congestion_controller) = session.congestion_controller.as_mut() { congestion_controller.delay_control(&element, stats, &mut session.encoders,); } @@ -2573,7 +2719,7 @@ impl BaseWebRTCSink { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { - session.rtprtxsend = Some(rtprtxsend); + session.unwrap_mut().rtprtxsend = Some(rtprtxsend); } } @@ -2582,6 +2728,8 @@ impl BaseWebRTCSink { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { + let session = session.unwrap_mut(); + let n_encoders = session.encoders.len(); let fec_ratio = { @@ -2616,7 +2764,9 @@ impl BaseWebRTCSink { let mut remove = false; let codecs = state.codecs.clone(); - if let Some(mut session) = state.sessions.remove(&session_id) { + if let Some(session) = state.sessions.get_mut(&session_id) { + let mut session = session.take(); + for webrtc_pad in session.webrtc_pads.clone().values() { let transceiver = webrtc_pad .pad @@ -2691,14 +2841,17 @@ impl BaseWebRTCSink { })); if remove { + let _ = state.sessions.remove(&session_id); state.finalize_session(&mut session); drop(state); let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); drop(settings); signaller.end_session(&session_id); + } else if let Some(session_wrapper) = state.sessions.get_mut(&session_id) { + session_wrapper.restore(session); } else { - state.sessions.insert(session.id.clone(), session); + gst::warning!(CAT, "Session {session_id} was removed"); } } } @@ -2711,7 +2864,7 @@ impl BaseWebRTCSink { _sdp_mid: Option, candidate: &str, ) { - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let sdp_m_line_index = match sdp_m_line_index { Some(sdp_m_line_index) => sdp_m_line_index, @@ -2721,11 +2874,8 @@ impl BaseWebRTCSink { } }; - if let Some(session) = state.sessions.get(session_id) { - gst::trace!(CAT, "adding ice candidate for session {}", session_id); - session - .webrtcbin - .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); + if let Some(session_wrapper) = state.sessions.get_mut(session_id) { + session_wrapper.add_ice_candidate(session_id, sdp_m_line_index, candidate); } else { gst::warning!(CAT, "No consumer with ID {session_id}"); } @@ -2740,6 +2890,8 @@ impl BaseWebRTCSink { let mut state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { + let session = session.unwrap_mut(); + let sdp = desc.sdp(); session.sdp = Some(sdp.to_owned()); @@ -3100,7 +3252,12 @@ impl BaseWebRTCSink { .unwrap() .sessions .iter() - .map(|(name, consumer)| (name.as_str(), consumer.gather_stats().to_send_value())), + .map(|(name, consumer)| { + ( + name.as_str(), + consumer.unwrap().gather_stats().to_send_value(), + ) + }), ) }