From 8236f3e5e765667cb905c1af851201990e6b73a7 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Fri, 18 Nov 2022 21:43:03 -0300 Subject: [PATCH] webrtcsink: Port to the 'webrtcsrc' signaller object/interface With contributions from: Matthew Waters Part-of: --- net/webrtc/src/aws_kvs_signaller/imp.rs | 175 ++-- net/webrtc/src/aws_kvs_signaller/mod.rs | 49 +- net/webrtc/src/lib.rs | 2 +- .../src/{webrtcsrc => }/signaller/iface.rs | 34 +- net/webrtc/src/signaller/imp.rs | 860 ++++++++++-------- net/webrtc/src/signaller/mod.rs | 86 +- net/webrtc/src/webrtcsink/imp.rs | 285 +++--- net/webrtc/src/webrtcsink/mod.rs | 95 +- net/webrtc/src/webrtcsrc/imp.rs | 2 +- net/webrtc/src/webrtcsrc/mod.rs | 13 +- net/webrtc/src/webrtcsrc/signaller/imp.rs | 584 ------------ net/webrtc/src/webrtcsrc/signaller/mod.rs | 46 - 12 files changed, 825 insertions(+), 1406 deletions(-) rename net/webrtc/src/{webrtcsrc => }/signaller/iface.rs (90%) delete mode 100644 net/webrtc/src/webrtcsrc/signaller/imp.rs delete mode 100644 net/webrtc/src/webrtcsrc/signaller/mod.rs diff --git a/net/webrtc/src/aws_kvs_signaller/imp.rs b/net/webrtc/src/aws_kvs_signaller/imp.rs index e08468a2..54f84d0d 100644 --- a/net/webrtc/src/aws_kvs_signaller/imp.rs +++ b/net/webrtc/src/aws_kvs_signaller/imp.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: MPL-2.0 use super::protocol as p; -use crate::webrtcsink::WebRTCSink; +use crate::signaller::{Signallable, SignallableImpl}; use crate::RUNTIME; use anyhow::{anyhow, Error}; use async_tungstenite::tungstenite::Message as WsMessage; @@ -84,7 +84,7 @@ pub struct Signaller { } impl Signaller { - fn handle_message(element: &WebRTCSink, msg: String) { + fn handle_message(&self, msg: String) { if let Ok(msg) = serde_json::from_str::(&msg) { match BASE64.decode(&msg.message_payload.into_bytes()) { Ok(payload) => { @@ -98,21 +98,27 @@ impl Signaller { msg.sender_client_id, sdp_msg.sdp ); - if let Err(err) = element.start_session( - &msg.sender_client_id, - &msg.sender_client_id, - Some(&gst_webrtc::WebRTCSessionDescription::new( - gst_webrtc::WebRTCSDPType::Offer, - gst_sdp::SDPMessage::parse_buffer(sdp_msg.sdp.as_bytes()) + self.obj().emit_by_name::<()>( + "session-requested", + &[&msg.sender_client_id, &msg.sender_client_id], + ); + self.obj().emit_by_name::<()>( + "session-description", + &[ + &msg.sender_client_id, + &gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + gst_sdp::SDPMessage::parse_buffer( + sdp_msg.sdp.as_bytes(), + ) .unwrap(), - )), - ) { - gst::warning!(CAT, obj: element, "{err}"); - } + ), + ], + ); } else { gst::warning!( CAT, - obj: element, + imp: self, "Failed to parse SDP_OFFER: {payload}" ); } @@ -127,18 +133,19 @@ impl Signaller { ice_msg.sdp_m_line_index, ice_msg.sdp_mid ); - if let Err(err) = element.handle_ice( - &msg.sender_client_id, - Some(ice_msg.sdp_m_line_index), - Some(ice_msg.sdp_mid), - &ice_msg.candidate, - ) { - gst::warning!(CAT, obj: element, "{err}"); - } + self.obj().emit_by_name::<()>( + "handle-ice", + &[ + &msg.sender_client_id, + &ice_msg.sdp_m_line_index, + &Some(ice_msg.sdp_mid), + &ice_msg.candidate, + ], + ); } else { gst::warning!( CAT, - obj: element, + imp: self, "Failed to parse ICE_CANDIDATE: {payload}" ); } @@ -146,7 +153,7 @@ impl Signaller { _ => { gst::log!( CAT, - obj: element, + imp: self, "Ignoring unsupported message type {}", msg.message_type ); @@ -156,20 +163,24 @@ impl Signaller { Err(e) => { gst::error!( CAT, - obj: element, + imp: self, "Failed to decode message payload from server: {e}" ); - element.handle_signalling_error( - anyhow!("Failed to decode message payload from server: {e}").into(), + self.obj().emit_by_name::<()>( + "error", + &[&format!( + "{:?}", + anyhow!("Failed to decode message payload from server: {e}") + )], ); } } } else { - gst::log!(CAT, obj: element, "Unknown message from server: [{msg}]"); + gst::log!(CAT, imp: self, "Unknown message from server: [{msg}]"); } } - async fn connect(&self, element: &WebRTCSink) -> Result<(), Error> { + async fn connect(&self) -> Result<(), Error> { let settings = self.settings.lock().unwrap().clone(); let connector = if let Some(path) = settings.cafile { @@ -341,8 +352,7 @@ impl Signaller { .collect(); gst::info!(CAT, "Ice servers: {:?}", ice_servers); - - element.connect_closure( + self.obj().connect_closure( "consumer-added", false, glib::closure!(|_webrtcsink: &gst::Element, @@ -409,7 +419,7 @@ impl Signaller { let (ws, _) = async_tungstenite::tokio::connect_async_with_tls_connector(url, connector).await?; - gst::info!(CAT, obj: element, "connected"); + gst::info!(CAT, imp: self, "connected"); // Channel for asynchronously sending out websocket message let (mut ws_sink, mut ws_stream) = ws.split(); @@ -418,7 +428,7 @@ impl Signaller { // up of messages as with unbounded let (mut _websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); - let element_clone = element.downgrade(); + let imp = self.downgrade(); let ping_timeout = settings.ping_timeout; let send_task_handle = task::spawn(async move { loop { @@ -429,10 +439,10 @@ impl Signaller { .await { Ok(Some(msg)) => { - if let Some(element) = element_clone.upgrade() { + if let Some(imp) = imp.upgrade() { gst::trace!( CAT, - obj: element, + imp: imp, "Sending websocket message {}", serde_json::to_string(&msg).unwrap() ); @@ -450,8 +460,8 @@ impl Signaller { } } - if let Some(element) = element_clone.upgrade() { - gst::info!(CAT, obj: element, "Done sending"); + if let Some(imp) = imp.upgrade() { + gst::info!(CAT, imp: imp, "Done sending"); } ws_sink.send(WsMessage::Close(None)).await?; @@ -460,29 +470,26 @@ impl Signaller { Ok::<(), Error>(()) }); - let element_clone = element.downgrade(); + let imp = self.downgrade(); let receive_task_handle = task::spawn(async move { while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { - if let Some(element) = element_clone.upgrade() { + if let Some(imp) = imp.upgrade() { match msg { Ok(WsMessage::Text(msg)) => { gst::trace!(CAT, "received message [{msg}]"); - Signaller::handle_message(&element, msg); + imp.handle_message(msg); } Ok(WsMessage::Close(reason)) => { - gst::info!( - CAT, - obj: element, - "websocket connection closed: {:?}", - reason - ); - element.shutdown(); + gst::info!(CAT, imp: imp, "websocket connection closed: {:?}", reason); + imp.obj().emit_by_name::<()>("shutdown", &[]); break; } Ok(_) => (), Err(err) => { - element - .handle_signalling_error(anyhow!("Error receiving: {err}").into()); + imp.obj().emit_by_name::<()>( + "error", + &[&format!("{:?}", anyhow!("Error receiving: {err}"))], + ); break; } } @@ -491,8 +498,8 @@ impl Signaller { } } - if let Some(element) = element_clone.upgrade() { - gst::info!(CAT, obj: element, "Stopped websocket receiving"); + if let Some(imp) = imp.upgrade() { + gst::info!(CAT, imp: imp, "Stopped websocket receiving"); } }); @@ -503,24 +510,22 @@ impl Signaller { Ok(()) } +} - pub fn start(&self, element: &WebRTCSink) { +impl SignallableImpl for Signaller { + fn start(&self) { let this = self.obj().clone(); - let element_clone = element.clone(); + let imp = self.downgrade(); task::spawn(async move { - let this = this.imp(); - if let Err(err) = this.connect(&element_clone).await { - element_clone.handle_signalling_error(err.into()); + if let Some(imp) = imp.upgrade() { + if let Err(err) = imp.connect().await { + this.emit_by_name::<()>("error", &[&format!("{:?}", anyhow!(err))]); + } } }); } - pub fn handle_sdp( - &self, - element: &WebRTCSink, - session_id: &str, - sdp: &gst_webrtc::WebRTCSessionDescription, - ) { + fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { let state = self.state.lock().unwrap(); let msg = p::OutgoingMessage { @@ -537,20 +542,22 @@ impl Signaller { }; if let Some(mut sender) = state.websocket_sender.clone() { - let element = element.downgrade(); + let imp = self.downgrade(); RUNTIME.spawn(async move { if let Err(err) = sender.send(msg).await { - if let Some(element) = element.upgrade() { - element.handle_signalling_error(anyhow!("Error: {err}").into()); + if let Some(imp) = imp.upgrade() { + imp.obj().emit_by_name::<()>( + "error", + &[&format!("{:?}", anyhow!("Error: {err}"))], + ); } } }); } } - pub fn handle_ice( + fn add_ice( &self, - element: &WebRTCSink, session_id: &str, candidate: &str, sdp_m_line_index: Option, @@ -573,48 +580,43 @@ impl Signaller { }; if let Some(mut sender) = state.websocket_sender.clone() { - let element = element.downgrade(); + let imp = self.downgrade(); RUNTIME.spawn(async move { if let Err(err) = sender.send(msg).await { - if let Some(element) = element.upgrade() { - element.handle_signalling_error(anyhow!("Error: {err}").into()); + if let Some(imp) = imp.upgrade() { + imp.obj().emit_by_name::<()>( + "error", + &[&format!("{:?}", anyhow!("Error: {err}"))], + ); } } }); } } - pub fn stop(&self, element: &WebRTCSink) { - gst::info!(CAT, obj: element, "Stopping now"); + fn stop(&self) { + gst::info!(CAT, imp: self, "Stopping now"); let mut state = self.state.lock().unwrap(); let send_task_handle = state.send_task_handle.take(); let receive_task_handle = state.receive_task_handle.take(); if let Some(mut sender) = state.websocket_sender.take() { - let element = element.downgrade(); + let imp = self.downgrade(); RUNTIME.block_on(async move { sender.close_channel(); if let Some(handle) = send_task_handle { if let Err(err) = handle.await { - if let Some(element) = element.upgrade() { - gst::warning!( - CAT, - obj: element, - "Error while joining send task: {err}" - ); + if let Some(imp) = imp.upgrade() { + gst::warning!(CAT, imp: imp, "Error while joining send task: {err}"); } } } if let Some(handle) = receive_task_handle { if let Err(err) = handle.await { - if let Some(element) = element.upgrade() { - gst::warning!( - CAT, - obj: element, - "Error while joining receive task: {err}" - ); + if let Some(imp) = imp.upgrade() { + gst::warning!(CAT, imp: imp, "Error while joining receive task: {err}"); } } } @@ -622,8 +624,8 @@ impl Signaller { } } - pub fn end_session(&self, element: &WebRTCSink, session_id: &str) { - gst::info!(CAT, obj: element, "Signalling session {session_id} ended"); + fn end_session(&self, session_id: &str) { + gst::info!(CAT, imp: self, "Signalling session {session_id} ended"); // We can seemingly not do anything beyond that } @@ -634,6 +636,7 @@ impl ObjectSubclass for Signaller { const NAME: &'static str = "GstAwsKvsWebRTCSinkSignaller"; type Type = super::AwsKvsSignaller; type ParentType = glib::Object; + type Interfaces = (Signallable,); } impl ObjectImpl for Signaller { diff --git a/net/webrtc/src/aws_kvs_signaller/mod.rs b/net/webrtc/src/aws_kvs_signaller/mod.rs index b5cb35c3..b4fddbfb 100644 --- a/net/webrtc/src/aws_kvs_signaller/mod.rs +++ b/net/webrtc/src/aws_kvs_signaller/mod.rs @@ -1,63 +1,18 @@ // SPDX-License-Identifier: MPL-2.0 -use crate::webrtcsink::{Signallable, WebRTCSink}; +use crate::signaller::Signallable; use gst::glib; -use gst::subclass::prelude::*; -use std::error::Error; mod imp; mod protocol; glib::wrapper! { - pub struct AwsKvsSignaller(ObjectSubclass); + pub struct AwsKvsSignaller(ObjectSubclass) @implements Signallable; } unsafe impl Send for AwsKvsSignaller {} unsafe impl Sync for AwsKvsSignaller {} -impl Signallable for AwsKvsSignaller { - fn start(&mut self, element: &WebRTCSink) -> Result<(), Box> { - let signaller = self.imp(); - signaller.start(element); - - Ok(()) - } - - fn handle_sdp( - &mut self, - element: &WebRTCSink, - peer_id: &str, - sdp: &gst_webrtc::WebRTCSessionDescription, - ) -> Result<(), Box> { - let signaller = self.imp(); - signaller.handle_sdp(element, peer_id, sdp); - Ok(()) - } - - fn handle_ice( - &mut self, - element: &WebRTCSink, - session_id: &str, - candidate: &str, - sdp_mline_index: Option, - sdp_mid: Option, - ) -> Result<(), Box> { - let signaller = self.imp(); - signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid); - Ok(()) - } - - fn stop(&mut self, element: &WebRTCSink) { - let signaller = self.imp(); - signaller.stop(element); - } - - fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) { - let signaller = self.imp(); - signaller.end_session(element, session_id); - } -} - impl Default for AwsKvsSignaller { fn default() -> Self { glib::Object::new() diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs index f3a97317..9b0376c4 100644 --- a/net/webrtc/src/lib.rs +++ b/net/webrtc/src/lib.rs @@ -11,7 +11,7 @@ use once_cell::sync::Lazy; use tokio::runtime; mod aws_kvs_signaller; -mod signaller; +pub mod signaller; pub mod utils; pub mod webrtcsink; pub mod webrtcsrc; diff --git a/net/webrtc/src/webrtcsrc/signaller/iface.rs b/net/webrtc/src/signaller/iface.rs similarity index 90% rename from net/webrtc/src/webrtcsrc/signaller/iface.rs rename to net/webrtc/src/signaller/iface.rs index 5da1bed2..d8a1d6bd 100644 --- a/net/webrtc/src/webrtcsrc/signaller/iface.rs +++ b/net/webrtc/src/signaller/iface.rs @@ -102,7 +102,11 @@ unsafe impl prelude::ObjectInterface for Signallable { * session */ Signal::builder("session-requested") - .param_types([str::static_type(), str::static_type()]) + .param_types([ + str::static_type(), + str::static_type(), + gst_webrtc::WebRTCSessionDescription::static_type(), + ]) .build(), /** * GstRSWebRTCSignallableIface::error: @@ -197,6 +201,34 @@ unsafe impl prelude::ObjectInterface for Signallable { None }) .build(), + /** + * GstRSWebRTCSignallableIface::shutdown: + * @self: The object implementing #GstRSWebRTCSignallableIface + */ + Signal::builder("shutdown").build(), + /** + * GstRSWebRTCSignallableIface::consumer-added: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @peer_id: Id of the consumer + * @webrtcbin: The internal WebRTCBin element + * + * This signal can be used to tweak @webrtcbin, creating a data + * channel for example. + */ + Signal::builder("consumer-added") + .param_types([String::static_type(), gst::Element::static_type()]) + .build(), + /** + * GstRSWebRTCSignallableIface::consumer-removed: + * @consumer_id: Identifier of the consumer that was removed + * @webrtcbin: The webrtcbin connected to the newly removed consumer + * + * This signal is emitted right after the connection with a consumer + * has been dropped. + */ + glib::subclass::Signal::builder("consumer-removed") + .param_types([String::static_type(), gst::Element::static_type()]) + .build(), ] }); SIGNALS.as_ref() diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 8995f280..650fb335 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -1,48 +1,51 @@ // SPDX-License-Identifier: MPL-2.0 -use crate::{webrtcsink::WebRTCSink, RUNTIME}; +use crate::signaller::{prelude::*, Signallable}; +use crate::utils::{gvalue_to_json, serialize_json_object}; +use crate::RUNTIME; use anyhow::{anyhow, Error}; use async_tungstenite::tungstenite::Message as WsMessage; use futures::channel::mpsc; use futures::prelude::*; +use gst::glib; use gst::glib::prelude::*; -use gst::glib::{self, Type}; -use gst::prelude::*; use gst::subclass::prelude::*; use gst_plugin_webrtc_protocol as p; use once_cell::sync::Lazy; -use std::collections::HashMap; -use std::path::PathBuf; +use std::collections::HashSet; +use std::ops::ControlFlow; +use std::str::FromStr; use std::sync::Mutex; -use tokio::task; +use std::time::Duration; +use tokio::{task, time::timeout}; +use url::Url; -static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - "webrtcsink-signaller", - gst::DebugColorFlags::empty(), - Some("WebRTC sink signaller"), - ) -}); +use super::CAT; -#[derive(Default)] -struct State { - /// Sender for the websocket messages - websocket_sender: Option>, - send_task_handle: Option>>, - receive_task_handle: Option>, +#[derive(Debug, Eq, PartialEq, Clone, Copy, glib::Enum, Default)] +#[repr(u32)] +#[enum_type(name = "GstRSWebRTCSignallerRole")] +pub enum WebRTCSignallerRole { + #[default] + Consumer, + Producer, + Listener, } -#[derive(Clone)] -struct Settings { - address: Option, - cafile: Option, +pub struct Settings { + uri: Url, + producer_peer_id: Option, + cafile: Option, + role: WebRTCSignallerRole, } impl Default for Settings { fn default() -> Self { Self { - address: Some("ws://127.0.0.1:8443".to_string()), - cafile: None, + uri: Url::from_str("ws://127.0.0.1:8443").unwrap(), + producer_peer_id: None, + cafile: Default::default(), + role: Default::default(), } } } @@ -53,11 +56,61 @@ pub struct Signaller { settings: Mutex, } -impl Signaller { - async fn connect(&self, element: &WebRTCSink) -> Result<(), Error> { - let settings = self.settings.lock().unwrap().clone(); +#[derive(Default)] +struct State { + /// Sender for the websocket messages + websocket_sender: Option>, + send_task_handle: Option>>, + receive_task_handle: Option>, + producers: HashSet, +} - let connector = if let Some(path) = settings.cafile { +impl Signaller { + fn uri(&self) -> Url { + self.settings.lock().unwrap().uri.clone() + } + + fn set_uri(&self, uri: &str) -> Result<(), Error> { + let mut settings = self.settings.lock().unwrap(); + let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?; + + if let Some(peer_id) = uri + .query_pairs() + .find(|(k, _)| k == "peer-id") + .map(|v| v.1.to_string()) + { + if !matches!(settings.role, WebRTCSignallerRole::Consumer) { + gst::warning!( + CAT, + "Setting peer-id doesn't make sense for {:?}", + settings.role + ); + } else { + settings.producer_peer_id = Some(peer_id); + } + } + + if let Some(peer_id) = &settings.producer_peer_id { + uri.query_pairs_mut() + .clear() + .append_pair("peer-id", peer_id); + } + + settings.uri = uri; + + Ok(()) + } + + async fn connect(&self) -> Result<(), Error> { + let obj = self.obj(); + + let role = self.settings.lock().unwrap().role; + if let super::WebRTCSignallerRole::Consumer = role { + self.producer_peer_id() + .ok_or_else(|| anyhow!("No target producer peer id set"))?; + } + + let connector = if let Some(path) = obj.property::>("cafile") { let cert = tokio::fs::read_to_string(&path).await?; let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?; let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder(); @@ -67,178 +120,68 @@ impl Signaller { None }; - let (ws, _) = async_tungstenite::tokio::connect_async_with_tls_connector( - settings.address.unwrap(), - connector, + let mut uri = self.uri(); + uri.set_query(None); + let (ws, _) = timeout( + // FIXME: Make the timeout configurable + Duration::from_secs(20), + async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector), ) - .await?; + .await??; - gst::info!(CAT, obj: element, "connected"); + gst::info!(CAT, imp: self, "connected"); // Channel for asynchronously sending out websocket message let (mut ws_sink, mut ws_stream) = ws.split(); // 1000 is completely arbitrary, we simply don't want infinite piling // up of messages as with unbounded - let (mut websocket_sender, mut websocket_receiver) = - mpsc::channel::(1000); - let element_clone = element.downgrade(); - let send_task_handle = task::spawn(async move { - while let Some(msg) = websocket_receiver.next().await { - if let Some(element) = element_clone.upgrade() { - gst::trace!(CAT, obj: element, "Sending websocket message {:?}", msg); + let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); + let send_task_handle = + RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + while let Some(msg) = websocket_receiver.next().await { + gst::log!(CAT, "Sending websocket message {:?}", msg); + ws_sink + .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) + .await?; } - ws_sink - .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; - } - if let Some(element) = element_clone.upgrade() { - gst::info!(CAT, obj: element, "Done sending"); - } + let msg = "Done sending"; + this.map_or_else(|| gst::info!(CAT, "{msg}"), + |this| gst::info!(CAT, imp: this, "{msg}") + ); - ws_sink.send(WsMessage::Close(None)).await?; - ws_sink.close().await?; + ws_sink.send(WsMessage::Close(None)).await?; + ws_sink.close().await?; - Ok::<(), Error>(()) - }); + Ok::<(), Error>(()) + })); - let meta = if let Some(meta) = element.property::>("meta") { - serialize_value(&meta.to_value()) - } else { - None - }; - websocket_sender - .send(p::IncomingMessage::SetPeerStatus(p::PeerStatus { - roles: vec![p::PeerRole::Producer], - meta, - peer_id: None, - })) - .await?; + let obj = self.obj(); + let meta = + if let Some(meta) = obj.emit_by_name::>("request-meta", &[]) { + gvalue_to_json(&meta.to_value()) + } else { + None + }; - let element_clone = element.downgrade(); - let receive_task_handle = task::spawn(async move { - while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { - if let Some(element) = element_clone.upgrade() { - match msg { - Ok(WsMessage::Text(msg)) => { - gst::trace!(CAT, obj: element, "Received message {}", msg); - - if let Ok(msg) = serde_json::from_str::(&msg) { - match msg { - p::OutgoingMessage::Welcome { peer_id } => { - gst::info!( - CAT, - obj: element, - "We are registered with the server, our peer id is {}", - peer_id - ); - } - p::OutgoingMessage::StartSession { - session_id, - peer_id, - } => { - if let Err(err) = - element.start_session(&session_id, &peer_id, None) - { - gst::warning!(CAT, obj: element, "{}", err); - } - } - p::OutgoingMessage::EndSession(session_info) => { - if let Err(err) = - element.end_session(&session_info.session_id) - { - gst::warning!(CAT, obj: element, "{}", err); - } - } - p::OutgoingMessage::Peer(p::PeerMessage { - session_id, - peer_message, - }) => match peer_message { - p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => { - if let Err(err) = element.handle_sdp( - &session_id, - &gst_webrtc::WebRTCSessionDescription::new( - gst_webrtc::WebRTCSDPType::Answer, - gst_sdp::SDPMessage::parse_buffer( - sdp.as_bytes(), - ) - .unwrap(), - ), - ) { - gst::warning!(CAT, obj: element, "{}", err); - } - } - p::PeerMessageInner::Sdp(p::SdpMessage::Offer { - .. - }) => { - gst::warning!( - CAT, - obj: element, - "Ignoring offer from peer" - ); - } - p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - } => { - if let Err(err) = element.handle_ice( - &session_id, - Some(sdp_m_line_index), - None, - &candidate, - ) { - gst::warning!(CAT, obj: element, "{}", err); - } - } - }, - _ => { - gst::warning!( - CAT, - obj: element, - "Ignoring unsupported message {:?}", - msg - ); - } - } - } else { - gst::error!( - CAT, - obj: element, - "Unknown message from server: {}", - msg - ); - element.handle_signalling_error( - anyhow!("Unknown message from server: {}", msg).into(), - ); - } - } - Ok(WsMessage::Close(reason)) => { - gst::info!( - CAT, - obj: element, - "websocket connection closed: {:?}", - reason - ); - break; - } - Ok(_) => (), - Err(err) => { - element.handle_signalling_error( - anyhow!("Error receiving: {}", err).into(), - ); + let receive_task_handle = + RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { + if let Some(ref this) = this { + if let ControlFlow::Break(_) = this.handle_message(msg, &meta) { break; } + } else { + break; } - } else { - break; } - } - if let Some(element) = element_clone.upgrade() { - gst::info!(CAT, obj: element, "Stopped websocket receiving"); - } - }); + let msg = "Stopped websocket receiving"; + this.map_or_else(|| gst::info!(CAT, "{msg}"), + |this| gst::info!(CAT, imp: this, "{msg}") + ); + })); let mut state = self.state.lock().unwrap(); state.websocket_sender = Some(websocket_sender); @@ -248,53 +191,372 @@ impl Signaller { Ok(()) } - pub fn start(&self, element: &WebRTCSink) { - let this = self.obj().clone(); - let element_clone = element.clone(); - task::spawn(async move { - let this = this.imp(); - if let Err(err) = this.connect(&element_clone).await { - element_clone.handle_signalling_error(err.into()); - } - }); + fn set_status(&self, meta: &Option, peer_id: &str) { + let role = self.settings.lock().unwrap().role; + self.send(p::IncomingMessage::SetPeerStatus(match role { + super::WebRTCSignallerRole::Consumer => p::PeerStatus { + meta: meta.clone(), + peer_id: Some(peer_id.to_string()), + roles: vec![], + }, + super::WebRTCSignallerRole::Producer => p::PeerStatus { + meta: meta.clone(), + peer_id: Some(peer_id.to_string()), + roles: vec![p::PeerRole::Producer], + }, + super::WebRTCSignallerRole::Listener => p::PeerStatus { + meta: meta.clone(), + peer_id: Some(peer_id.to_string()), + roles: vec![p::PeerRole::Listener], + }, + })); } - pub fn handle_sdp( - &self, - element: &WebRTCSink, - session_id: &str, - sdp: &gst_webrtc::WebRTCSessionDescription, - ) { - let state = self.state.lock().unwrap(); + fn producer_peer_id(&self) -> Option { + let settings = self.settings.lock().unwrap(); - let msg = p::IncomingMessage::Peer(p::PeerMessage { - session_id: session_id.to_string(), - peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { - sdp: sdp.sdp().as_text().unwrap(), - }), + settings.producer_peer_id.clone() + } + + fn send(&self, msg: p::IncomingMessage) { + let state = self.state.lock().unwrap(); + if let Some(mut sender) = state.websocket_sender.clone() { + RUNTIME.spawn(glib::clone!(@weak self as this => async move { + if let Err(err) = sender.send(msg).await { + this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); + } + })); + } + } + + pub fn start_session(&self) { + let role = self.settings.lock().unwrap().role; + if matches!(role, super::WebRTCSignallerRole::Consumer) { + let target_producer = self.producer_peer_id().unwrap(); + + self.send(p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: target_producer.clone(), + })); + + gst::info!( + CAT, + imp: self, + "Started session with producer peer id {target_producer}", + ); + } + } + + fn handle_message( + &self, + msg: Result, + meta: &Option, + ) -> ControlFlow<()> { + match msg { + Ok(WsMessage::Text(msg)) => { + gst::trace!(CAT, imp: self, "Received message {}", msg); + + if let Ok(msg) = serde_json::from_str::(&msg) { + match msg { + p::OutgoingMessage::Welcome { peer_id } => { + self.set_status(meta, &peer_id); + self.start_session(); + } + p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { + meta, + roles, + peer_id, + }) => { + let meta = meta.and_then(|m| match m { + serde_json::Value::Object(v) => Some(serialize_json_object(&v)), + _ => { + gst::error!(CAT, imp: self, "Invalid json value: {m:?}"); + None + } + }); + + let peer_id = + peer_id.expect("Status changed should always contain a peer ID"); + let mut state = self.state.lock().unwrap(); + if roles.iter().any(|r| matches!(r, p::PeerRole::Producer)) { + if !state.producers.contains(&peer_id) { + state.producers.insert(peer_id.clone()); + drop(state); + + self.obj() + .emit_by_name::<()>("producer-added", &[&peer_id, &meta]); + } + } else if state.producers.remove(&peer_id) { + drop(state); + + self.obj() + .emit_by_name::<()>("producer-removed", &[&peer_id, &meta]); + } + } + p::OutgoingMessage::SessionStarted { + peer_id, + session_id, + } => { + self.obj() + .emit_by_name::<()>("session-started", &[&session_id, &peer_id]); + } + p::OutgoingMessage::StartSession { + session_id, + peer_id, + } => { + assert!(matches!( + self.obj().property::("role"), + super::WebRTCSignallerRole::Producer + )); + + self.obj().emit_by_name::<()>( + "session-requested", + &[ + &session_id, + &peer_id, + &None::, + ], + ); + } + p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) => { + gst::info!(CAT, imp: self, "Session {session_id} ended"); + + self.obj() + .emit_by_name::<()>("session-ended", &[&session_id]); + } + p::OutgoingMessage::Peer(p::PeerMessage { + session_id, + peer_message, + }) => match peer_message { + p::PeerMessageInner::Sdp(reply) => { + let (sdp, desc_type) = match reply { + p::SdpMessage::Answer { sdp } => { + (sdp, gst_webrtc::WebRTCSDPType::Answer) + } + p::SdpMessage::Offer { sdp } => { + (sdp, gst_webrtc::WebRTCSDPType::Offer) + } + }; + let sdp = match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) { + Ok(sdp) => sdp, + Err(err) => { + self.obj().emit_by_name::<()>( + "error", + &[&format!("Error parsing SDP: {sdp} {err:?}")], + ); + + return ControlFlow::Break(()); + } + }; + + let desc = + gst_webrtc::WebRTCSessionDescription::new(desc_type, sdp); + self.obj().emit_by_name::<()>( + "session-description", + &[&session_id, &desc], + ); + } + p::PeerMessageInner::Ice { + candidate, + sdp_m_line_index, + } => { + let sdp_mid: Option = None; + self.obj().emit_by_name::<()>( + "handle-ice", + &[&session_id, &sdp_m_line_index, &sdp_mid, &candidate], + ); + } + }, + p::OutgoingMessage::Error { details } => { + self.obj().emit_by_name::<()>( + "error", + &[&format!("Error message from server: {details}")], + ); + } + _ => { + gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg); + } + } + } else { + gst::error!(CAT, imp: self, "Unknown message from server: {}", msg); + + self.obj().emit_by_name::<()>( + "error", + &[&format!("Unknown message from server: {}", msg)], + ); + } + } + Ok(WsMessage::Close(reason)) => { + gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason); + return ControlFlow::Break(()); + } + Ok(_) => (), + Err(err) => { + self.obj() + .emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); + return ControlFlow::Break(()); + } + } + ControlFlow::Continue(()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for Signaller { + const NAME: &'static str = "GstWebRTCSignaller"; + type Type = super::Signaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl ObjectImpl for Signaller { + fn properties() -> &'static [glib::ParamSpec] { + static PROPS: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("uri") + .flags(glib::ParamFlags::READWRITE) + .build(), + glib::ParamSpecString::builder("producer-peer-id") + .flags(glib::ParamFlags::READWRITE) + .build(), + glib::ParamSpecString::builder("cafile") + .flags(glib::ParamFlags::READWRITE) + .build(), + glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::Consumer) + .flags(glib::ParamFlags::READWRITE) + .build(), + ] }); - if let Some(mut sender) = state.websocket_sender.clone() { - let element = element.downgrade(); - RUNTIME.spawn(async move { - if let Err(err) = sender.send(msg).await { - if let Some(element) = element.upgrade() { - element.handle_signalling_error(anyhow!("Error: {}", err).into()); + PROPS.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "uri" => { + if let Err(e) = self.set_uri(value.get::<&str>().expect("type checked upstream")) { + gst::error!(CAT, "Couldn't set URI: {e:?}"); + } + } + "producer-peer-id" => { + let mut settings = self.settings.lock().unwrap(); + + if !matches!(settings.role, WebRTCSignallerRole::Consumer) { + gst::warning!( + CAT, + "Setting `producer-peer-id` doesn't make sense for {:?}", + settings.role + ); + } else { + settings.producer_peer_id = value + .get::>() + .expect("type checked upstream"); + } + } + "cafile" => { + self.settings.lock().unwrap().cafile = value + .get::>() + .expect("type checked upstream") + } + "role" => { + self.settings.lock().unwrap().role = value + .get::() + .expect("type checked upstream") + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "uri" => settings.uri.to_string().to_value(), + "producer-peer-id" => { + if !matches!(settings.role, WebRTCSignallerRole::Consumer) { + gst::warning!( + CAT, + "`producer-peer-id` doesn't make sense for {:?}", + settings.role + ); + } + + settings.producer_peer_id.to_value() + } + "cafile" => settings.cafile.to_value(), + "role" => settings.role.to_value(), + _ => unimplemented!(), + } + } +} + +impl SignallableImpl for Signaller { + fn start(&self) { + gst::info!(CAT, imp: self, "Starting"); + RUNTIME.spawn(glib::clone!(@weak self as this => async move { + if let Err(err) = this.connect().await { + this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); + } + })); + } + + fn stop(&self) { + gst::info!(CAT, imp: self, "Stopping now"); + + let mut state = self.state.lock().unwrap(); + let send_task_handle = state.send_task_handle.take(); + let receive_task_handle = state.receive_task_handle.take(); + if let Some(mut sender) = state.websocket_sender.take() { + RUNTIME.block_on(async move { + sender.close_channel(); + + if let Some(handle) = send_task_handle { + if let Err(err) = handle.await { + gst::warning!(CAT, imp: self, "Error while joining send task: {}", err); + } + } + + if let Some(handle) = receive_task_handle { + if let Err(err) = handle.await { + gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err); } } }); } } - pub fn handle_ice( + fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { + gst::debug!(CAT, imp: self, "Sending SDP {sdp:#?}"); + + let role = self.settings.lock().unwrap().role; + let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer); + + let msg = p::IncomingMessage::Peer(p::PeerMessage { + session_id: session_id.to_owned(), + peer_message: p::PeerMessageInner::Sdp(if is_consumer { + p::SdpMessage::Answer { + sdp: sdp.sdp().as_text().unwrap(), + } + } else { + p::SdpMessage::Offer { + sdp: sdp.sdp().as_text().unwrap(), + } + }), + }); + + self.send(msg); + } + + fn add_ice( &self, - element: &WebRTCSink, session_id: &str, candidate: &str, sdp_m_line_index: Option, _sdp_mid: Option, ) { - let state = self.state.lock().unwrap(); + gst::debug!( + CAT, + imp: self, + "Adding ice candidate {candidate:?} for {sdp_m_line_index:?} on session {session_id}" + ); let msg = p::IncomingMessage::Peer(p::PeerMessage { session_id: session_id.to_string(), @@ -304,185 +566,27 @@ impl Signaller { }, }); - if let Some(mut sender) = state.websocket_sender.clone() { - let element = element.downgrade(); - RUNTIME.spawn(async move { - if let Err(err) = sender.send(msg).await { - if let Some(element) = element.upgrade() { - element.handle_signalling_error(anyhow!("Error: {}", err).into()); - } - } - }); - } + self.send(msg); } - pub fn stop(&self, element: &WebRTCSink) { - gst::info!(CAT, obj: element, "Stopping now"); - - let mut state = self.state.lock().unwrap(); - let send_task_handle = state.send_task_handle.take(); - let receive_task_handle = state.receive_task_handle.take(); - if let Some(mut sender) = state.websocket_sender.take() { - let element = element.downgrade(); - RUNTIME.block_on(async move { - sender.close_channel(); - - if let Some(handle) = send_task_handle { - if let Err(err) = handle.await { - if let Some(element) = element.upgrade() { - gst::warning!( - CAT, - obj: element, - "Error while joining send task: {}", - err - ); - } - } - } - - if let Some(handle) = receive_task_handle { - if let Err(err) = handle.await { - if let Some(element) = element.upgrade() { - gst::warning!( - CAT, - obj: element, - "Error while joining receive task: {}", - err - ); - } - } - } - }); - } - } - - pub fn end_session(&self, element: &WebRTCSink, session_id: &str) { - gst::debug!(CAT, obj: element, "Signalling session {} ended", session_id); + fn end_session(&self, session_id: &str) { + gst::debug!(CAT, imp: self, "Signalling session done {}", session_id); let state = self.state.lock().unwrap(); let session_id = session_id.to_string(); - let element = element.downgrade(); if let Some(mut sender) = state.websocket_sender.clone() { - RUNTIME.spawn(async move { + RUNTIME.spawn(glib::clone!(@weak self as this => async move { if let Err(err) = sender .send(p::IncomingMessage::EndSession(p::EndSessionMessage { - session_id: session_id.to_string(), + session_id, })) .await { - if let Some(element) = element.upgrade() { - element.handle_signalling_error(anyhow!("Error: {}", err).into()); - } + this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); } - }); + })); } } } -#[glib::object_subclass] -impl ObjectSubclass for Signaller { - const NAME: &'static str = "GstWebRTCSinkSignaller"; - type Type = super::Signaller; - type ParentType = glib::Object; -} - -impl ObjectImpl for Signaller { - fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy> = Lazy::new(|| { - vec![ - glib::ParamSpecString::builder("address") - .nick("Address") - .blurb("Address of the signalling server") - .default_value("ws://127.0.0.1:8443") - .build(), - glib::ParamSpecString::builder("cafile") - .nick("CA file") - .blurb("Path to a Certificate file to add to the set of roots the TLS connector will trust") - .build(), - ] - }); - - PROPERTIES.as_ref() - } - - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - match pspec.name() { - "address" => { - let address: Option<_> = value.get().expect("type checked upstream"); - - if let Some(address) = address { - gst::info!(CAT, "Signaller address set to {}", address); - - let mut settings = self.settings.lock().unwrap(); - settings.address = Some(address); - } else { - gst::error!(CAT, "address can't be None"); - } - } - "cafile" => { - let value: String = value.get().unwrap(); - let mut settings = self.settings.lock().unwrap(); - settings.cafile = Some(value.into()); - } - _ => unimplemented!(), - } - } - - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - match pspec.name() { - "address" => self.settings.lock().unwrap().address.to_value(), - "cafile" => { - let settings = self.settings.lock().unwrap(); - let cafile = settings.cafile.as_ref(); - cafile.and_then(|file| file.to_str()).to_value() - } - _ => unimplemented!(), - } - } -} - -fn serialize_value(val: &gst::glib::Value) -> Option { - match val.type_() { - Type::STRING => Some(val.get::().unwrap().into()), - Type::BOOL => Some(val.get::().unwrap().into()), - Type::I32 => Some(val.get::().unwrap().into()), - Type::U32 => Some(val.get::().unwrap().into()), - Type::I_LONG | Type::I64 => Some(val.get::().unwrap().into()), - Type::U_LONG | Type::U64 => Some(val.get::().unwrap().into()), - Type::F32 => Some(val.get::().unwrap().into()), - Type::F64 => Some(val.get::().unwrap().into()), - _ => { - if let Ok(s) = val.get::() { - serde_json::to_value( - s.iter() - .filter_map(|(name, value)| { - serialize_value(value).map(|value| (name.to_string(), value)) - }) - .collect::>(), - ) - .ok() - } else if let Ok(a) = val.get::() { - serde_json::to_value( - a.iter() - .filter_map(|value| serialize_value(value)) - .collect::>(), - ) - .ok() - } else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) { - Some( - values - .iter() - .map(|value| value.nick()) - .collect::>() - .join("+") - .into(), - ) - } else if let Ok(value) = val.serialize() { - Some(value.as_str().into()) - } else { - gst::warning!(CAT, "Can't convert {} to json", val.type_().name()); - None - } - } - } -} +impl GstObjectImpl for Signaller {} diff --git a/net/webrtc/src/signaller/mod.rs b/net/webrtc/src/signaller/mod.rs index 5aac4fae..ef337be3 100644 --- a/net/webrtc/src/signaller/mod.rs +++ b/net/webrtc/src/signaller/mod.rs @@ -1,64 +1,46 @@ -// SPDX-License-Identifier: MPL-2.0 - -use crate::webrtcsink::{Signallable, WebRTCSink}; -use gst::glib; -use gst::subclass::prelude::*; -use std::error::Error; - +mod iface; mod imp; +use gst::glib; -glib::wrapper! { - pub struct Signaller(ObjectSubclass); +use once_cell::sync::Lazy; +// Expose traits and objects from the module itself so it exactly looks like +// generated bindings +pub use imp::WebRTCSignallerRole; +pub mod prelude { + pub use {super::SignallableExt, super::SignallableImpl}; } -unsafe impl Send for Signaller {} -unsafe impl Sync for Signaller {} +pub static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "webrtcsrc-signaller", + gst::DebugColorFlags::empty(), + Some("WebRTC src signaller"), + ) +}); -impl Signallable for Signaller { - fn start(&mut self, element: &WebRTCSink) -> Result<(), Box> { - let signaller = self.imp(); - signaller.start(element); +glib::wrapper! { + pub struct Signallable(ObjectInterface); +} - Ok(()) - } - - fn handle_sdp( - &mut self, - element: &WebRTCSink, - peer_id: &str, - sdp: &gst_webrtc::WebRTCSessionDescription, - ) -> Result<(), Box> { - let signaller = self.imp(); - signaller.handle_sdp(element, peer_id, sdp); - Ok(()) - } - - fn handle_ice( - &mut self, - element: &WebRTCSink, - session_id: &str, - candidate: &str, - sdp_mline_index: Option, - sdp_mid: Option, - ) -> Result<(), Box> { - let signaller = self.imp(); - signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid); - Ok(()) - } - - fn stop(&mut self, element: &WebRTCSink) { - let signaller = self.imp(); - signaller.stop(element); - } - - fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) { - let signaller = self.imp(); - signaller.end_session(element, session_id); - } +glib::wrapper! { + pub struct Signaller(ObjectSubclass ) @implements Signallable; } impl Default for Signaller { fn default() -> Self { - glib::Object::new() + glib::Object::builder().build() } } + +impl Signaller { + pub fn new(mode: WebRTCSignallerRole) -> Self { + glib::Object::builder().property("role", &mode).build() + } +} + +pub use iface::SignallableExt; +pub use iface::SignallableImpl; +pub use iface::SignallableImplExt; + +unsafe impl Send for Signallable {} +unsafe impl Sync for Signallable {} diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 621c7b56..ee391621 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -20,7 +20,7 @@ use std::sync::Mutex; use super::homegrown_cc::CongestionController; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use crate::aws_kvs_signaller::AwsKvsSignaller; -use crate::signaller::Signaller as DefaultSignaller; +use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; use crate::RUNTIME; use std::collections::BTreeMap; @@ -187,9 +187,21 @@ struct NavigationEvent { event: gst_video::NavigationEvent, } +// Used to ensure signal are disconnected when a new signaller is is +#[allow(dead_code)] +struct SignallerSignals { + error: glib::SignalHandlerId, + request_meta: glib::SignalHandlerId, + session_requested: glib::SignalHandlerId, + session_ended: glib::SignalHandlerId, + session_description: glib::SignalHandlerId, + handle_ice: glib::SignalHandlerId, + shutdown: glib::SignalHandlerId, +} + /* Our internal state */ struct State { - signaller: Box, + signaller: Signallable, signaller_state: SignallerState, sessions: HashMap, codecs: BTreeMap, @@ -205,6 +217,7 @@ struct State { streams: HashMap, navigation_handler: Option, mids: HashMap, + signaller_signals: Option, } fn create_navigation_event(sink: &super::WebRTCSink, msg: &str) { @@ -303,10 +316,10 @@ impl Default for Settings { impl Default for State { fn default() -> Self { - let signaller = DefaultSignaller::default(); + let signaller = Signaller::new(WebRTCSignallerRole::Producer); Self { - signaller: Box::new(signaller), + signaller: signaller.upcast(), signaller_state: SignallerState::Stopped, sessions: HashMap::new(), codecs: BTreeMap::new(), @@ -318,6 +331,7 @@ impl Default for State { streams: HashMap::new(), navigation_handler: None, mids: HashMap::new(), + signaller_signals: Default::default(), } } } @@ -756,12 +770,7 @@ impl VideoEncoder { } impl State { - fn finalize_session( - &mut self, - element: &super::WebRTCSink, - session: &mut Session, - signal: bool, - ) { + fn finalize_session(&mut self, session: &mut Session, signal: bool) { gst::info!(CAT, "Ending session {}", session.id); session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), @@ -777,18 +786,13 @@ impl State { }); if signal { - self.signaller.session_ended(element, &session.peer_id); + self.signaller.end_session(&session.id); } } - fn end_session( - &mut self, - element: &super::WebRTCSink, - session_id: &str, - signal: bool, - ) -> Option { + fn end_session(&mut self, session_id: &str, signal: bool) -> Option { if let Some(mut session) = self.sessions.remove(session_id) { - self.finalize_session(element, &mut session, signal); + self.finalize_session(&mut session, signal); Some(session) } else { None @@ -800,23 +804,13 @@ impl State { && element.current_state() >= gst::State::Paused && self.codec_discovery_done { - if let Err(err) = self.signaller.start(element) { - gst::error!(CAT, obj: element, "error: {}", err); - gst::element_error!( - element, - gst::StreamError::Failed, - ["Failed to start signaller {}", err] - ); - } else { - gst::info!(CAT, "Started signaller"); - self.signaller_state = SignallerState::Started; - } + self.signaller.start(); } } - fn maybe_stop_signaller(&mut self, element: &super::WebRTCSink) { + fn maybe_stop_signaller(&mut self, _element: &super::WebRTCSink) { if self.signaller_state == SignallerState::Started { - self.signaller.stop(element); + self.signaller.stop(); self.signaller_state = SignallerState::Stopped; gst::info!(CAT, "Stopped signaller"); } @@ -1380,7 +1374,7 @@ impl WebRTCSink { let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); for id in session_ids { - state.end_session(element, &id, true); + state.end_session(&id, true); } state @@ -1406,10 +1400,100 @@ impl WebRTCSink { Ok(()) } + fn connect_signaller(&self, signaler: &Signallable) { + let instance = &*self.obj(); + + let _ = self.state.lock().unwrap().signaller_signals.insert(SignallerSignals { + error: signaler.connect_closure( + "error", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object, error: String| { + gst::element_error!( + instance, + gst::StreamError::Failed, + ["Signalling error: {}", error] + ); + }) + ), + + request_meta: signaler.connect_closure( + "request-meta", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object| -> Option { + let meta = instance.imp().settings.lock().unwrap().meta.clone(); + + meta + }) + ), + + session_requested: signaler.connect_closure( + "session-requested", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str, peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{ + if let Err(err) = instance.imp().start_session(session_id, peer_id, offer) { + gst::warning!(CAT, "{}", err); + } + }) + ), + + session_description: signaler.connect_closure( + "session-description", + false, + glib::closure!(@watch instance => move | + _signaler: glib::Object, + peer_id: &str, + session_description: &gst_webrtc::WebRTCSessionDescription| { + + if session_description.type_() == gst_webrtc::WebRTCSDPType::Answer { + instance.imp().handle_sdp_answer(instance, peer_id, session_description); + } else { + gst::error!(CAT, obj: instance, "Unsupported SDP Type"); + } + } + ), + ), + + handle_ice: signaler.connect_closure( + "handle-ice", + false, + glib::closure!(@watch instance => move | + _signaler: glib::Object, + session_id: &str, + sdp_m_line_index: u32, + _sdp_mid: Option, + candidate: &str| { + instance + .imp() + .handle_ice(session_id, Some(sdp_m_line_index), None, candidate); + }), + ), + + session_ended: signaler.connect_closure( + "session-ended", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str|{ + if let Err(err) = instance.imp().remove_session(instance, session_id, false) { + gst::warning!(CAT, "{}", err); + } + }) + ), + + shutdown: signaler.connect_closure( + "shutdown", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object|{ + instance.imp().shutdown(instance); + }) + ), + }); + } + /// When using a custom signaller - pub fn set_signaller(&self, signaller: Box) -> Result<(), Error> { + pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> { + let sigobj = signaller.clone(); let mut state = self.state.lock().unwrap(); + self.connect_signaller(&sigobj); state.signaller = signaller; Ok(()) @@ -1434,27 +1518,18 @@ impl WebRTCSink { fn on_offer_created( &self, - element: &super::WebRTCSink, + _element: &super::WebRTCSink, offer: gst_webrtc::WebRTCSessionDescription, session_id: &str, ) { - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get(session_id) { session .webrtcbin .emit_by_name::<()>("set-local-description", &[&offer, &None::]); - if let Err(err) = state.signaller.handle_sdp(element, session_id, &offer) { - gst::warning!( - CAT, - "Failed to handle SDP for session {}: {}", - session_id, - err - ); - - state.end_session(element, session_id, true); - } + state.signaller.send_sdp(session_id, &offer); } } @@ -1482,24 +1557,14 @@ impl WebRTCSink { .webrtcbin .emit_by_name::<()>("set-local-description", &[&answer, &None::]); - if let Err(err) = state.signaller.handle_sdp(element, session_id, &answer) { - gst::warning!( - CAT, - "Failed to handle SDP for session {}: {}", - session_id, - err - ); + state.signaller.send_sdp(session_id, &answer); + let session_id = session.id.clone(); - state.finalize_session(element, &mut session, true); - } else { - let session_id = session.id.clone(); + state.sessions.insert(session.id.clone(), session); - state.sessions.insert(session.id.clone(), session); + drop(state); - drop(state); - - self.on_remote_description_set(element, session_id) - } + self.on_remote_description_set(element, session_id) } } @@ -1802,34 +1867,20 @@ impl WebRTCSink { fn on_ice_candidate( &self, - element: &super::WebRTCSink, + _element: &super::WebRTCSink, session_id: String, sdp_m_line_index: u32, candidate: String, ) { - let mut state = self.state.lock().unwrap(); - if let Err(err) = state.signaller.handle_ice( - element, - &session_id, - &candidate, - Some(sdp_m_line_index), - None, - ) { - gst::warning!( - CAT, - "Failed to handle ICE in session {}: {}", - session_id, - err - ); - - state.end_session(element, &session_id, true); - } + let state = self.state.lock().unwrap(); + state + .signaller + .add_ice(&session_id, &candidate, Some(sdp_m_line_index), None) } /// Called by the signaller to add a new session pub fn start_session( &self, - element: &super::WebRTCSink, session_id: &str, peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>, @@ -1838,6 +1889,7 @@ impl WebRTCSink { let mut state = self.state.lock().unwrap(); let peer_id = peer_id.to_string(); let session_id = session_id.to_string(); + let element = self.obj().clone(); if state.sessions.contains_key(&session_id) { return Err(WebRTCSinkError::DuplicateSessionId(session_id)); @@ -2167,16 +2219,16 @@ impl WebRTCSink { }); if settings.enable_data_channel_navigation { - state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin)); + state.navigation_handler = Some(NavigationEventHandler::new(&element, &webrtcbin)); } state.sessions.insert(session_id.to_string(), session); - let element_clone = element.downgrade(); let mut streams: Vec = state.streams.values().cloned().collect(); streams.sort_by_key(|s| s.serial); + let element_clone = element.downgrade(); let offer_clone = offer.cloned(); RUNTIME.spawn(async move { if let Some(element) = element_clone.upgrade() { @@ -2275,6 +2327,12 @@ impl WebRTCSink { // so that application code can create data channels at the correct // moment. element.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); + { + let state = this.state.lock().unwrap(); + state + .signaller + .emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); + } // We don't connect to on-negotiation-needed, this in order to call the above // signal without holding the state lock: @@ -2315,7 +2373,10 @@ impl WebRTCSink { return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())); } - if let Some(session) = state.end_session(element, session_id, signal) { + if let Some(session) = state.end_session(session_id, signal) { + state + .signaller + .emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); drop(state); element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); } @@ -2482,7 +2543,7 @@ impl WebRTCSink { }); if remove { - state.finalize_session(element, &mut session, true); + state.finalize_session(&mut session, true); } else { state.sessions.insert(session.id.clone(), session); } @@ -2492,34 +2553,37 @@ impl WebRTCSink { /// Called by the signaller with an ice candidate pub fn handle_ice( &self, - _element: &super::WebRTCSink, session_id: &str, sdp_m_line_index: Option, _sdp_mid: Option, candidate: &str, - ) -> Result<(), WebRTCSinkError> { + ) { let state = self.state.lock().unwrap(); - let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?; + let sdp_m_line_index = match sdp_m_line_index { + Some(sdp_m_line_index) => sdp_m_line_index, + None => { + gst::warning!(CAT, "No mandatory SDP m-line index"); + return; + } + }; if let Some(session) = state.sessions.get(session_id) { gst::trace!(CAT, "adding ice candidate for session {}", session_id); session .webrtcbin .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); - Ok(()) } else { - Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) + gst::warning!(CAT, "No consumer with ID {session_id}"); } } - /// Called by the signaller with an answer to our offer - pub fn handle_sdp( + pub fn handle_sdp_answer( &self, element: &super::WebRTCSink, session_id: &str, desc: &gst_webrtc::WebRTCSessionDescription, - ) -> Result<(), WebRTCSinkError> { + ) { let mut state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { @@ -2544,12 +2608,14 @@ impl WebRTCSink { media_idx, media_str ); - state.end_session(element, session_id, true); + state.end_session(session_id, true); - return Err(WebRTCSinkError::ConsumerRefusedMedia { - session_id: session_id.to_string(), - media_idx, - }); + gst::warning!( + CAT, + obj: element, + "Consumer refused media {session_id}, {media_idx}" + ); + return; } } @@ -2568,12 +2634,10 @@ impl WebRTCSink { session_id, ); - state.end_session(element, session_id, true); + state.end_session(session_id, true); - return Err(WebRTCSinkError::ConsumerNoValidPayload { - session_id: session_id.to_string(), - media_idx, - }); + gst::warning!(CAT, obj: element, "Consumer did not provide valid payload for media sesion: {session_id} media_ix: {media_idx}"); + return; } } @@ -2592,10 +2656,8 @@ impl WebRTCSink { session .webrtcbin .emit_by_name::<()>("set-remote-description", &[desc, &promise]); - - Ok(()) } else { - Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) + gst::warning!(CAT, "No consumer with ID {session_id}"); } } @@ -3246,6 +3308,9 @@ impl ObjectImpl for WebRTCSink { fn constructed(&self) { self.parent_constructed(); + let signaller = self.state.lock().unwrap().signaller.clone(); + + self.connect_signaller(&signaller); let obj = self.obj(); obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); @@ -3424,15 +3489,7 @@ impl ChildProxyImpl for WebRTCSink { fn child_by_name(&self, name: &str) -> Option { match name { - "signaller" => Some( - self.state - .lock() - .unwrap() - .signaller - .as_ref() - .as_ref() - .clone(), - ), + "signaller" => Some(self.state.lock().unwrap().signaller.clone().upcast()), _ => None, } } @@ -3463,7 +3520,7 @@ impl ObjectImpl for AwsKvsWebRTCSink { let element = self.obj(); let ws = element.upcast_ref::().imp(); - let _ = ws.set_signaller(Box::::default()); + let _ = ws.set_signaller(AwsKvsSignaller::default().upcast()); } } diff --git a/net/webrtc/src/webrtcsink/mod.rs b/net/webrtc/src/webrtcsink/mod.rs index 16afeb10..55508496 100644 --- a/net/webrtc/src/webrtcsink/mod.rs +++ b/net/webrtc/src/webrtcsink/mod.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: MPL-2.0 +use crate::signaller::Signallable; /** * element-webrtcsink: * @@ -9,9 +10,9 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; -use std::error::Error; mod homegrown_cc; + mod imp; glib::wrapper! { @@ -42,101 +43,21 @@ pub enum WebRTCSinkError { }, } -pub trait Signallable: Sync + Send + 'static { - fn start(&mut self, element: &WebRTCSink) -> Result<(), Box>; - - fn handle_sdp( - &mut self, - element: &WebRTCSink, - session_id: &str, - sdp: &gst_webrtc::WebRTCSessionDescription, - ) -> Result<(), Box>; - - /// sdp_mid is exposed for future proofing, see - /// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, - /// at the moment sdp_m_line_index will always be Some and sdp_mid will always - /// be None - fn handle_ice( - &mut self, - element: &WebRTCSink, - session_id: &str, - candidate: &str, - sdp_m_line_index: Option, - sdp_mid: Option, - ) -> Result<(), Box>; - - fn session_ended(&mut self, element: &WebRTCSink, session_id: &str); - - fn stop(&mut self, element: &WebRTCSink); +impl Default for WebRTCSink { + fn default() -> Self { + glib::Object::new() + } } -/// When providing a signaller, we expect it to both be a GObject -/// and be Signallable. This is arguably a bit strange, but exposing -/// a GInterface from rust is at the moment a bit awkward, so I went -/// for a rust interface for now. The reason the signaller needs to be -/// a GObject is to make its properties available through the GstChildProxy -/// interface. -pub trait SignallableObject: AsRef + Signallable {} - -impl + Signallable> SignallableObject for T {} - impl WebRTCSink { - pub fn with_signaller(signaller: Box) -> Self { - let ret = glib::Object::new::(); + pub fn with_signaller(signaller: Signallable) -> Self { + let ret: WebRTCSink = glib::Object::new(); let ws = ret.imp(); ws.set_signaller(signaller).unwrap(); ret } - - pub fn handle_sdp( - &self, - session_id: &str, - sdp: &gst_webrtc::WebRTCSessionDescription, - ) -> Result<(), WebRTCSinkError> { - let ws = self.imp(); - ws.handle_sdp(self, session_id, sdp) - } - - /// sdp_mid is exposed for future proofing, see - /// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, - /// at the moment sdp_m_line_index must be Some - pub fn handle_ice( - &self, - session_id: &str, - sdp_m_line_index: Option, - sdp_mid: Option, - candidate: &str, - ) -> Result<(), WebRTCSinkError> { - let ws = self.imp(); - ws.handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate) - } - - pub fn handle_signalling_error(&self, error: Box) { - let ws = self.imp(); - ws.handle_signalling_error(self, anyhow::anyhow!(error)); - } - - pub fn shutdown(&self) { - let ws = self.imp(); - ws.shutdown(self); - } - - pub fn start_session( - &self, - session_id: &str, - peer_id: &str, - offer: Option<&gst_webrtc::WebRTCSessionDescription>, - ) -> Result<(), WebRTCSinkError> { - let ws = self.imp(); - ws.start_session(self, session_id, peer_id, offer) - } - - pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> { - let ws = self.imp(); - ws.remove_session(self, session_id, false) - } } #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 0330b57e..7821dd38 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -2,7 +2,7 @@ use gst::prelude::*; -use crate::webrtcsrc::signaller::{prelude::*, Signallable, Signaller}; +use crate::signaller::{prelude::*, Signallable, Signaller}; use crate::webrtcsrc::WebRTCSrcPad; use anyhow::{Context, Error}; use core::ops::Deref; diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs index c869ebcd..955b2201 100644 --- a/net/webrtc/src/webrtcsrc/mod.rs +++ b/net/webrtc/src/webrtcsrc/mod.rs @@ -1,9 +1,4 @@ // SPDX-License-Identifier: MPL-2.0 - -use crate::webrtcsrc::signaller::WebRTCSignallerRole; -use gst::prelude::*; -use gst::{glib, prelude::StaticType}; - /** * element-webrtcsrc: * @@ -38,11 +33,11 @@ use gst::{glib, prelude::StaticType}; */ mod imp; mod pad; -pub mod signaller; -pub use signaller::{SignallableImpl, SignallableImplExt}; - -use self::signaller::Signallable; +use crate::signaller::Signallable; +use crate::signaller::WebRTCSignallerRole; +use gst::prelude::*; +use gst::{glib, prelude::StaticType}; glib::wrapper! { pub struct WebRTCSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; diff --git a/net/webrtc/src/webrtcsrc/signaller/imp.rs b/net/webrtc/src/webrtcsrc/signaller/imp.rs deleted file mode 100644 index 8f9715a1..00000000 --- a/net/webrtc/src/webrtcsrc/signaller/imp.rs +++ /dev/null @@ -1,584 +0,0 @@ -use crate::utils::{gvalue_to_json, serialize_json_object}; -use crate::webrtcsrc::signaller::{prelude::*, Signallable}; -use crate::RUNTIME; -use anyhow::{anyhow, Error}; -use async_tungstenite::tungstenite::Message as WsMessage; -use futures::channel::mpsc; -use futures::prelude::*; -use gst::glib; -use gst::glib::prelude::*; -use gst::subclass::prelude::*; -use gst_plugin_webrtc_protocol as p; -use once_cell::sync::Lazy; -use std::collections::HashSet; -use std::ops::ControlFlow; -use std::str::FromStr; -use std::sync::Mutex; -use std::time::Duration; -use tokio::{task, time::timeout}; -use url::Url; - -use super::CAT; - -#[derive(Debug, Eq, PartialEq, Clone, Copy, glib::Enum, Default)] -#[repr(u32)] -#[enum_type(name = "GstRSWebRTCSignallerRole")] -pub enum WebRTCSignallerRole { - #[default] - Consumer, - Producer, - Listener, -} - -pub struct Settings { - uri: Url, - producer_peer_id: Option, - cafile: Option, - role: WebRTCSignallerRole, -} - -impl Default for Settings { - fn default() -> Self { - Self { - uri: Url::from_str("ws://127.0.0.1:8443").unwrap(), - producer_peer_id: None, - cafile: Default::default(), - role: Default::default(), - } - } -} - -#[derive(Default)] -pub struct Signaller { - state: Mutex, - settings: Mutex, -} - -#[derive(Default)] -struct State { - /// Sender for the websocket messages - websocket_sender: Option>, - send_task_handle: Option>>, - receive_task_handle: Option>, - producers: HashSet, -} - -impl Signaller { - fn uri(&self) -> Url { - self.settings.lock().unwrap().uri.clone() - } - - fn set_uri(&self, uri: &str) -> Result<(), Error> { - let mut settings = self.settings.lock().unwrap(); - let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?; - - if let Some(peer_id) = uri - .query_pairs() - .find(|(k, _)| k == "peer-id") - .map(|v| v.1.to_string()) - { - if !matches!(settings.role, WebRTCSignallerRole::Consumer) { - gst::warning!( - CAT, - "Setting peer-id doesn't make sense for {:?}", - settings.role - ); - } else { - settings.producer_peer_id = Some(peer_id); - } - } - - if let Some(peer_id) = &settings.producer_peer_id { - uri.query_pairs_mut() - .clear() - .append_pair("peer-id", peer_id); - } - - settings.uri = uri; - - Ok(()) - } - - async fn connect(&self) -> Result<(), Error> { - let obj = self.obj(); - - let role = self.settings.lock().unwrap().role; - if let super::WebRTCSignallerRole::Consumer = role { - self.producer_peer_id() - .ok_or_else(|| anyhow!("No target producer peer id set"))?; - } - - let connector = if let Some(path) = obj.property::>("cafile") { - let cert = tokio::fs::read_to_string(&path).await?; - let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?; - let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder(); - let connector = connector_builder.add_root_certificate(cert).build()?; - Some(tokio_native_tls::TlsConnector::from(connector)) - } else { - None - }; - - let mut uri = self.uri(); - uri.set_query(None); - let (ws, _) = timeout( - // FIXME: Make the timeout configurable - Duration::from_secs(20), - async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector), - ) - .await??; - - gst::info!(CAT, imp: self, "connected"); - - // Channel for asynchronously sending out websocket message - let (mut ws_sink, mut ws_stream) = ws.split(); - - // 1000 is completely arbitrary, we simply don't want infinite piling - // up of messages as with unbounded - let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); - let send_task_handle = - RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { - while let Some(msg) = websocket_receiver.next().await { - gst::log!(CAT, "Sending websocket message {:?}", msg); - ws_sink - .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; - } - - let msg = "Done sending"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") - ); - - ws_sink.send(WsMessage::Close(None)).await?; - ws_sink.close().await?; - - Ok::<(), Error>(()) - })); - - let obj = self.obj(); - let meta = - if let Some(meta) = obj.emit_by_name::>("request-meta", &[]) { - gvalue_to_json(&meta.to_value()) - } else { - None - }; - - let receive_task_handle = - RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { - while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { - if let Some(ref this) = this { - if let ControlFlow::Break(_) = this.handle_message(msg, &meta) { - break; - } - } else { - break; - } - } - - let msg = "Stopped websocket receiving"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") - ); - })); - - let mut state = self.state.lock().unwrap(); - state.websocket_sender = Some(websocket_sender); - state.send_task_handle = Some(send_task_handle); - state.receive_task_handle = Some(receive_task_handle); - - Ok(()) - } - - fn set_status(&self, meta: &Option, peer_id: &str) { - let role = self.settings.lock().unwrap().role; - self.send(p::IncomingMessage::SetPeerStatus(match role { - super::WebRTCSignallerRole::Consumer => p::PeerStatus { - meta: meta.clone(), - peer_id: Some(peer_id.to_string()), - roles: vec![], - }, - super::WebRTCSignallerRole::Producer => p::PeerStatus { - meta: meta.clone(), - peer_id: Some(peer_id.to_string()), - roles: vec![p::PeerRole::Producer], - }, - super::WebRTCSignallerRole::Listener => p::PeerStatus { - meta: meta.clone(), - peer_id: Some(peer_id.to_string()), - roles: vec![p::PeerRole::Listener], - }, - })); - } - - fn producer_peer_id(&self) -> Option { - let settings = self.settings.lock().unwrap(); - - settings.producer_peer_id.clone() - } - - fn send(&self, msg: p::IncomingMessage) { - let state = self.state.lock().unwrap(); - if let Some(mut sender) = state.websocket_sender.clone() { - RUNTIME.spawn(glib::clone!(@weak self as this => async move { - if let Err(err) = sender.send(msg).await { - this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); - } - })); - } - } - - pub fn start_session(&self) { - let role = self.settings.lock().unwrap().role; - if matches!(role, super::WebRTCSignallerRole::Consumer) { - let target_producer = self.producer_peer_id().unwrap(); - - self.send(p::IncomingMessage::StartSession(p::StartSessionMessage { - peer_id: target_producer.clone(), - })); - - gst::info!( - CAT, - imp: self, - "Started session with producer peer id {target_producer}", - ); - } - } - - fn handle_message( - &self, - msg: Result, - meta: &Option, - ) -> ControlFlow<()> { - match msg { - Ok(WsMessage::Text(msg)) => { - gst::trace!(CAT, imp: self, "Received message {}", msg); - - if let Ok(msg) = serde_json::from_str::(&msg) { - match msg { - p::OutgoingMessage::Welcome { peer_id } => { - self.set_status(meta, &peer_id); - self.start_session(); - } - p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { - meta, - roles, - peer_id, - }) => { - let meta = meta.and_then(|m| match m { - serde_json::Value::Object(v) => Some(serialize_json_object(&v)), - _ => { - gst::error!(CAT, imp: self, "Invalid json value: {m:?}"); - None - } - }); - - let peer_id = - peer_id.expect("Status changed should always contain a peer ID"); - let mut state = self.state.lock().unwrap(); - if roles.iter().any(|r| matches!(r, p::PeerRole::Producer)) { - if !state.producers.contains(&peer_id) { - state.producers.insert(peer_id.clone()); - drop(state); - - self.obj() - .emit_by_name::<()>("producer-added", &[&peer_id, &meta]); - } - } else if state.producers.remove(&peer_id) { - drop(state); - - self.obj() - .emit_by_name::<()>("producer-removed", &[&peer_id, &meta]); - } - } - p::OutgoingMessage::SessionStarted { - peer_id, - session_id, - } => { - self.obj() - .emit_by_name::<()>("session-started", &[&session_id, &peer_id]); - } - p::OutgoingMessage::StartSession { - session_id, - peer_id, - } => { - assert!(matches!( - self.obj().property::("role"), - super::WebRTCSignallerRole::Producer - )); - - self.obj() - .emit_by_name::<()>("session-requested", &[&session_id, &peer_id]); - } - p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) => { - gst::info!(CAT, imp: self, "Session {session_id} ended"); - - self.obj() - .emit_by_name::<()>("session-ended", &[&session_id]); - } - p::OutgoingMessage::Peer(p::PeerMessage { - session_id, - peer_message, - }) => match peer_message { - p::PeerMessageInner::Sdp(reply) => { - let (sdp, desc_type) = match reply { - p::SdpMessage::Answer { sdp } => { - (sdp, gst_webrtc::WebRTCSDPType::Answer) - } - p::SdpMessage::Offer { sdp } => { - (sdp, gst_webrtc::WebRTCSDPType::Offer) - } - }; - let sdp = match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) { - Ok(sdp) => sdp, - Err(err) => { - self.obj().emit_by_name::<()>( - "error", - &[&format!("Error parsing SDP: {sdp} {err:?}")], - ); - - return ControlFlow::Break(()); - } - }; - - let desc = - gst_webrtc::WebRTCSessionDescription::new(desc_type, sdp); - self.obj().emit_by_name::<()>( - "session-description", - &[&session_id, &desc], - ); - } - p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - } => { - let sdp_mid: Option = None; - self.obj().emit_by_name::<()>( - "handle-ice", - &[&session_id, &sdp_m_line_index, &sdp_mid, &candidate], - ); - } - }, - p::OutgoingMessage::Error { details } => { - self.obj().emit_by_name::<()>( - "error", - &[&format!("Error message from server: {details}")], - ); - } - _ => { - gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg); - } - } - } else { - gst::error!(CAT, imp: self, "Unknown message from server: {}", msg); - - self.obj().emit_by_name::<()>( - "error", - &[&format!("Unknown message from server: {}", msg)], - ); - } - } - Ok(WsMessage::Close(reason)) => { - gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason); - return ControlFlow::Break(()); - } - Ok(_) => (), - Err(err) => { - self.obj() - .emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); - return ControlFlow::Break(()); - } - } - ControlFlow::Continue(()) - } -} - -#[glib::object_subclass] -impl ObjectSubclass for Signaller { - const NAME: &'static str = "GstWebRTCSignaller"; - type Type = super::Signaller; - type ParentType = glib::Object; - type Interfaces = (Signallable,); -} - -impl ObjectImpl for Signaller { - fn properties() -> &'static [glib::ParamSpec] { - static PROPS: Lazy> = Lazy::new(|| { - vec![ - glib::ParamSpecString::builder("uri") - .flags(glib::ParamFlags::READWRITE) - .build(), - glib::ParamSpecString::builder("producer-peer-id") - .flags(glib::ParamFlags::READWRITE) - .build(), - glib::ParamSpecString::builder("cafile") - .flags(glib::ParamFlags::READWRITE) - .build(), - glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::Consumer) - .flags(glib::ParamFlags::READWRITE) - .build(), - ] - }); - - PROPS.as_ref() - } - - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - match pspec.name() { - "uri" => { - if let Err(e) = self.set_uri(value.get::<&str>().expect("type checked upstream")) { - gst::error!(CAT, "Couldn't set URI: {e:?}"); - } - } - "producer-peer-id" => { - let mut settings = self.settings.lock().unwrap(); - - if !matches!(settings.role, WebRTCSignallerRole::Consumer) { - gst::warning!( - CAT, - "Setting `producer-peer-id` doesn't make sense for {:?}", - settings.role - ); - } else { - settings.producer_peer_id = value - .get::>() - .expect("type checked upstream"); - } - } - "cafile" => { - self.settings.lock().unwrap().cafile = value - .get::>() - .expect("type checked upstream") - } - "role" => { - self.settings.lock().unwrap().role = value - .get::() - .expect("type checked upstream") - } - _ => unimplemented!(), - } - } - - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); - match pspec.name() { - "uri" => settings.uri.to_string().to_value(), - "producer-peer-id" => { - if !matches!(settings.role, WebRTCSignallerRole::Consumer) { - gst::warning!( - CAT, - "`producer-peer-id` doesn't make sense for {:?}", - settings.role - ); - } - - settings.producer_peer_id.to_value() - } - "cafile" => settings.cafile.to_value(), - "role" => settings.role.to_value(), - _ => unimplemented!(), - } - } -} - -impl SignallableImpl for Signaller { - fn start(&self) { - gst::info!(CAT, imp: self, "Starting"); - RUNTIME.spawn(glib::clone!(@weak self as this => async move { - if let Err(err) = this.connect().await { - this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); - } - })); - } - - fn stop(&self) { - gst::info!(CAT, imp: self, "Stopping now"); - - let mut state = self.state.lock().unwrap(); - let send_task_handle = state.send_task_handle.take(); - let receive_task_handle = state.receive_task_handle.take(); - if let Some(mut sender) = state.websocket_sender.take() { - RUNTIME.block_on(async move { - sender.close_channel(); - - if let Some(handle) = send_task_handle { - if let Err(err) = handle.await { - gst::warning!(CAT, imp: self, "Error while joining send task: {}", err); - } - } - - if let Some(handle) = receive_task_handle { - if let Err(err) = handle.await { - gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err); - } - } - }); - } - } - - fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { - gst::debug!(CAT, imp: self, "Sending SDP {sdp:#?}"); - - let role = self.settings.lock().unwrap().role; - let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer); - - let msg = p::IncomingMessage::Peer(p::PeerMessage { - session_id: session_id.to_owned(), - peer_message: p::PeerMessageInner::Sdp(if is_consumer { - p::SdpMessage::Answer { - sdp: sdp.sdp().as_text().unwrap(), - } - } else { - p::SdpMessage::Offer { - sdp: sdp.sdp().as_text().unwrap(), - } - }), - }); - - self.send(msg); - } - - fn add_ice( - &self, - session_id: &str, - candidate: &str, - sdp_m_line_index: Option, - _sdp_mid: Option, - ) { - gst::debug!( - CAT, - imp: self, - "Adding ice candidate {candidate:?} for {sdp_m_line_index:?} on session {session_id}" - ); - - let msg = p::IncomingMessage::Peer(p::PeerMessage { - session_id: session_id.to_string(), - peer_message: p::PeerMessageInner::Ice { - candidate: candidate.to_string(), - sdp_m_line_index: sdp_m_line_index.unwrap(), - }, - }); - - self.send(msg); - } - - fn end_session(&self, session_id: &str) { - gst::debug!(CAT, imp: self, "Signalling session done {}", session_id); - - let state = self.state.lock().unwrap(); - let session_id = session_id.to_string(); - if let Some(mut sender) = state.websocket_sender.clone() { - RUNTIME.spawn(glib::clone!(@weak self as this => async move { - if let Err(err) = sender - .send(p::IncomingMessage::EndSession(p::EndSessionMessage { - session_id, - })) - .await - { - this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); - } - })); - } - } -} - -impl GstObjectImpl for Signaller {} diff --git a/net/webrtc/src/webrtcsrc/signaller/mod.rs b/net/webrtc/src/webrtcsrc/signaller/mod.rs deleted file mode 100644 index ef337be3..00000000 --- a/net/webrtc/src/webrtcsrc/signaller/mod.rs +++ /dev/null @@ -1,46 +0,0 @@ -mod iface; -mod imp; -use gst::glib; - -use once_cell::sync::Lazy; -// Expose traits and objects from the module itself so it exactly looks like -// generated bindings -pub use imp::WebRTCSignallerRole; -pub mod prelude { - pub use {super::SignallableExt, super::SignallableImpl}; -} - -pub static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - "webrtcsrc-signaller", - gst::DebugColorFlags::empty(), - Some("WebRTC src signaller"), - ) -}); - -glib::wrapper! { - pub struct Signallable(ObjectInterface); -} - -glib::wrapper! { - pub struct Signaller(ObjectSubclass ) @implements Signallable; -} - -impl Default for Signaller { - fn default() -> Self { - glib::Object::builder().build() - } -} - -impl Signaller { - pub fn new(mode: WebRTCSignallerRole) -> Self { - glib::Object::builder().property("role", &mode).build() - } -} - -pub use iface::SignallableExt; -pub use iface::SignallableImpl; -pub use iface::SignallableImplExt; - -unsafe impl Send for Signallable {} -unsafe impl Sync for Signallable {}