// SPDX-License-Identifier: MPL-2.0 use crate::signaller::{Signallable, SignallableImpl, WebRTCSignallerRole}; use crate::utils::{wait_async, WaitError}; use crate::RUNTIME; use anyhow::anyhow; use futures::executor::block_on; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::str::FromStr; use std::sync::{Arc, Mutex}; use tokio::sync::oneshot; use tokio::task::JoinHandle; use livekit_api::access_token::{AccessToken, VideoGrants}; use livekit_api::signal_client; use livekit_protocol as proto; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "webrtc-livekit-signaller", gst::DebugColorFlags::empty(), Some("WebRTC LiveKit signaller"), ) }); const DEFAULT_TRACK_PUBLISH_TIMEOUT: u32 = 10; #[derive(Clone)] struct Settings { wsurl: Option, api_key: Option, secret_key: Option, participant_name: Option, identity: Option, room_name: Option, auth_token: Option, role: WebRTCSignallerRole, producer_peer_id: Option, excluded_produder_peer_ids: Vec, timeout: u32, } impl Default for Settings { fn default() -> Self { Self { wsurl: Some("ws://127.0.0.1:7880".to_string()), api_key: None, secret_key: None, participant_name: Some("GStreamer".to_string()), identity: Some("gstreamer".to_string()), room_name: None, auth_token: None, role: WebRTCSignallerRole::default(), producer_peer_id: None, excluded_produder_peer_ids: vec![], timeout: DEFAULT_TRACK_PUBLISH_TIMEOUT, } } } #[derive(Default)] pub struct Signaller { settings: Mutex, connection: Mutex>, join_canceller: Mutex>, signal_task_canceller: Mutex>, } struct Channels { reliable_channel: gst_webrtc::WebRTCDataChannel, lossy_channel: gst_webrtc::WebRTCDataChannel, } struct Connection { signal_client: Arc, pending_tracks: HashMap>, signal_task: JoinHandle<()>, early_candidates: Option>, channels: Option, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct IceCandidateJson { pub sdp_mid: String, pub sdp_m_line_index: i32, pub candidate: String, } impl Signaller { fn raise_error(&self, msg: String) { self.obj() .emit_by_name::<()>("error", &[&format!("Error: {msg}")]); } fn role(&self) -> Option { self.settings.lock().map(|s| s.role).ok() } fn is_subscriber(&self) -> bool { matches!(self.role(), Some(WebRTCSignallerRole::Consumer)) } fn producer_peer_id(&self) -> Option { assert!(self.is_subscriber()); let settings = self.settings.lock().ok()?; settings.producer_peer_id.clone() } fn auto_subscribe(&self) -> bool { self.is_subscriber() && self.producer_peer_id().is_none() && self.excluded_producer_peer_ids_is_empty() } fn signal_target(&self) -> Option { match self.role()? { WebRTCSignallerRole::Consumer => Some(proto::SignalTarget::Subscriber), WebRTCSignallerRole::Producer => Some(proto::SignalTarget::Publisher), _ => None, } } fn excluded_producer_peer_ids_is_empty(&self) -> bool { assert!(self.is_subscriber()); self.settings .lock() .unwrap() .excluded_produder_peer_ids .is_empty() } fn is_peer_excluded(&self, peer_id: &str) -> bool { self.settings .lock() .unwrap() .excluded_produder_peer_ids .iter() .any(|id| id == peer_id) } fn signal_client(&self) -> Option> { let connection = self.connection.lock().unwrap(); Some(connection.as_ref()?.signal_client.clone()) } fn require_signal_client(&self) -> Arc { self.signal_client().unwrap() } async fn send_trickle_request(&self, candidate_init: &str) { let Some(signal_client) = self.signal_client() else { return; }; let Some(target) = self.signal_target() else { return; }; signal_client .send(proto::signal_request::Message::Trickle( proto::TrickleRequest { candidate_init: candidate_init.to_string(), target: target as i32, }, )) .await; } async fn send_delayed_ice_candidates(&self) { let Some(mut early_candidates) = self .connection .lock() .unwrap() .as_mut() .and_then(|c| c.early_candidates.take()) else { return; }; while let Some(candidate_str) = early_candidates.pop() { gst::debug!( CAT, imp: self, "Sending delayed ice candidate {candidate_str:?}" ); self.send_trickle_request(&candidate_str).await; } } async fn signal_task(&self, mut signal_events: signal_client::SignalEvents) { loop { match wait_async(&self.signal_task_canceller, signal_events.recv(), 0).await { Ok(Some(signal)) => match signal { signal_client::SignalEvent::Message(signal) => { self.on_signal_event(*signal).await; } signal_client::SignalEvent::Close(reason) => { gst::debug!(CAT, imp: self, "Close: {reason}"); self.raise_error("Server disconnected".to_string()); break; } }, Ok(None) => {} Err(err) => match err { WaitError::FutureAborted => { gst::debug!(CAT, imp: self, "Closing signal_task"); break; } WaitError::FutureError(err) => self.raise_error(err.to_string()), }, } } } async fn on_signal_event(&self, event: proto::signal_response::Message) { match event { proto::signal_response::Message::Answer(answer) => { gst::debug!(CAT, imp: self, "Received publisher answer: {:?}", answer); let sdp = match gst_sdp::SDPMessage::parse_buffer(answer.sdp.as_bytes()) { Ok(sdp) => sdp, Err(_) => { self.raise_error("Couldn't parse Answer SDP".to_string()); return; } }; let answer = gst_webrtc::WebRTCSessionDescription::new( gst_webrtc::WebRTCSDPType::Answer, sdp, ); self.obj() .emit_by_name::<()>("session-description", &[&"unique", &answer]); } proto::signal_response::Message::Offer(offer) => { if !self.is_subscriber() { gst::warning!(CAT, imp: self, "Ignoring subscriber offer in non-subscriber mode: {:?}", offer); return; } gst::debug!(CAT, imp: self, "Received subscriber offer: {:?}", offer); let sdp = match gst_sdp::SDPMessage::parse_buffer(offer.sdp.as_bytes()) { Ok(sdp) => sdp, Err(_) => { self.raise_error("Couldn't parse Offer SDP".to_string()); return; } }; let offer = gst_webrtc::WebRTCSessionDescription::new( gst_webrtc::WebRTCSDPType::Offer, sdp, ); self.obj() .emit_by_name::<()>("session-description", &[&"unique", &offer]); } proto::signal_response::Message::Trickle(trickle) => { gst::debug!(CAT, imp: self, "Received ice_candidate {:?}", trickle); let Some(target) = self.signal_target() else { return; }; if target == trickle.target() { if let Ok(json) = serde_json::from_str::(&trickle.candidate_init) { let mline = json.sdp_m_line_index as u32; self.obj().emit_by_name::<()>( "handle-ice", &[&"unique", &mline, &Some(json.sdp_mid), &json.candidate], ); } } } proto::signal_response::Message::ConnectionQuality(quality) => { gst::debug!(CAT, imp: self, "Connection quality: {:?}", quality); } proto::signal_response::Message::TrackPublished(publish_res) => { gst::debug!(CAT, imp: self, "Track published: {:?}", publish_res); if let Some(connection) = &mut *self.connection.lock().unwrap() { if let Some(tx) = connection.pending_tracks.remove(&publish_res.cid) { let _ = tx.send(publish_res.track.unwrap()); } } } proto::signal_response::Message::Update(update) => { if !self.is_subscriber() { gst::trace!(CAT, imp: self, "Ignoring update in non-subscriber mode: {:?}", update); return; } gst::debug!(CAT, imp: self, "Update: {:?}", update); for participant in update.participants { self.on_participant(&participant, true) } } proto::signal_response::Message::Leave(leave) => { gst::debug!(CAT, imp: self, "Leave: {:?}", leave); } _ => {} } } fn send_sdp_answer(&self, _session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) { let weak_imp = self.downgrade(); let sessdesc = sessdesc.clone(); RUNTIME.spawn(async move { if let Some(imp) = weak_imp.upgrade() { let sdp = sessdesc.sdp(); gst::debug!(CAT, imp: imp, "Sending SDP {:?} now", &sdp); let signal_client = imp.require_signal_client(); signal_client .send(proto::signal_request::Message::Answer( proto::SessionDescription { r#type: "answer".to_string(), sdp: sdp.to_string(), }, )) .await; imp.send_delayed_ice_candidates().await; } }); } fn send_sdp_offer(&self, _session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) { let weak_imp = self.downgrade(); let sessdesc = sessdesc.clone(); RUNTIME.spawn(async move { if let Some(imp) = weak_imp.upgrade() { let sdp = sessdesc.sdp(); let signal_client = imp.require_signal_client(); let timeout = imp.settings.lock().unwrap().timeout; for media in sdp.medias() { if let Some(mediatype) = media.media() { let (mtype, msource) = if mediatype == "audio" { ( proto::TrackType::Audio, proto::TrackSource::Microphone as i32, ) } else if mediatype == "video" { (proto::TrackType::Video, proto::TrackSource::Camera as i32) } else { continue; }; let mut disable_red = true; if mtype == proto::TrackType::Audio { for format in media.formats() { if let Ok(pt) = format.parse::() { if let Some(caps) = media.caps_from_media(pt) { let s = caps.structure(0).unwrap(); let encoding_name = s.get::<&str>("encoding-name").unwrap(); if encoding_name == "RED" { disable_red = false; } } } } } // Our SDP should always have a mid let mid = media.attribute_val("mid").unwrap().to_string(); let mut trackid = ""; for attr in media.attributes() { if attr.key() == "ssrc" { if let Some(val) = attr.value() { let split: Vec<&str> = val.split_whitespace().collect(); if split.len() == 3 && split[1].starts_with("msid:") { trackid = split[2]; break; } } } } let layers = if mtype == proto::TrackType::Video { vec![proto::VideoLayer { quality: proto::VideoQuality::High as i32, ..Default::default() }] } else { vec![] }; let req = proto::AddTrackRequest { cid: trackid.to_string(), name: mid.clone(), r#type: mtype as i32, muted: false, source: msource, disable_dtx: true, disable_red, layers, ..Default::default() }; let (tx, rx) = oneshot::channel(); if let Some(connection) = &mut *imp.connection.lock().unwrap() { let pendings_tracks = &mut connection.pending_tracks; if pendings_tracks.contains_key(&req.cid) { panic!("track already published"); } pendings_tracks.insert(req.cid.clone(), tx); } let cid = req.cid.clone(); signal_client .send(proto::signal_request::Message::AddTrack(req)) .await; if let Err(err) = wait_async(&imp.join_canceller, rx, timeout).await { if let Some(connection) = &mut *imp.connection.lock().unwrap() { connection.pending_tracks.remove(&cid); } match err { WaitError::FutureAborted => { gst::warning!(CAT, imp: imp, "Future aborted") } WaitError::FutureError(err) => imp.raise_error(err.to_string()), }; } } } gst::debug!(CAT, imp: imp, "Sending SDP now"); signal_client .send(proto::signal_request::Message::Offer( proto::SessionDescription { r#type: "offer".to_string(), sdp: sessdesc.sdp().to_string(), }, )) .await; if let Some(imp) = weak_imp.upgrade() { imp.send_delayed_ice_candidates().await; } } }); } fn on_participant(&self, participant: &proto::ParticipantInfo, new_connection: bool) { gst::debug!(CAT, imp: self, "{:?}", participant); if !participant.is_publisher { return; } let peer_sid = &participant.sid; let peer_identity = &participant.identity; match self.producer_peer_id() { Some(id) if id == *peer_sid => { gst::debug!(CAT, imp: self, "matching peer sid {id:?}"); } Some(id) if id == *peer_identity => { gst::debug!(CAT, imp: self, "matching peer identity {id:?}"); } None => { if self.is_peer_excluded(peer_sid) || self.is_peer_excluded(peer_identity) { gst::debug!(CAT, imp: self, "ignoring excluded peer {participant:?}"); return; } gst::debug!(CAT, imp: self, "catch-all mode, matching {participant:?}"); } _ => return, } let meta = Some(&participant.metadata) .filter(|meta| !meta.is_empty()) .and_then(|meta| gst::Structure::from_str(meta).ok()); match participant.state { x if x == proto::participant_info::State::Active as i32 => { let track_sids = participant .tracks .iter() .filter(|t| !t.muted) .map(|t| t.sid.clone()) .collect::>(); let update = proto::UpdateSubscription { track_sids: track_sids.clone(), subscribe: true, participant_tracks: vec![proto::ParticipantTracks { participant_sid: participant.sid.clone(), track_sids: track_sids.clone(), }], }; let update = proto::signal_request::Message::Subscription(update); let weak_imp = self.downgrade(); let peer_sid = peer_sid.clone(); RUNTIME.spawn(async move { let imp = match weak_imp.upgrade() { Some(imp) => imp, None => return, }; let signal_client = imp.require_signal_client(); signal_client.send(update).await; imp.obj() .emit_by_name::<()>("producer-added", &[&peer_sid, &meta, &new_connection]); }); } _ => { self.obj() .emit_by_name::<()>("producer-removed", &[&peer_sid, &meta]); } } } async fn close_signal_client(signal_client: &signal_client::SignalClient) { signal_client .send(proto::signal_request::Message::Leave(proto::LeaveRequest { can_reconnect: false, reason: proto::DisconnectReason::ClientInitiated as i32, ..Default::default() })) .await; signal_client.close().await; } } impl SignallableImpl for Signaller { fn start(&self) { gst::debug!(CAT, imp: self, "Connecting"); let wsurl = if let Some(wsurl) = &self.settings.lock().unwrap().wsurl { wsurl.clone() } else { self.raise_error("WebSocket URL must be set".to_string()); return; }; let auth_token = { let settings = self.settings.lock().unwrap(); let role = settings.role; if let Some(auth_token) = &settings.auth_token { auth_token.clone() } else if let ( Some(api_key), Some(secret_key), Some(identity), Some(participant_name), Some(room_name), ) = ( &settings.api_key, &settings.secret_key, &settings.identity, &settings.participant_name, &settings.room_name, ) { let grants = VideoGrants { room_join: true, can_subscribe: role == WebRTCSignallerRole::Consumer, room: room_name.clone(), ..Default::default() }; let access_token = AccessToken::with_api_key(api_key, secret_key) .with_name(participant_name) .with_identity(identity) .with_grants(grants); match access_token.to_jwt() { Ok(token) => token, Err(err) => { self.raise_error(format!( "{:?}", anyhow!("Could not create auth token {err}") )); return; } } } else { self.raise_error("Either auth-token or (api-key and secret-key and identity and room-name) must be set".to_string()); return; } }; gst::debug!(CAT, imp: self, "We have an authentication token"); let weak_imp = self.downgrade(); RUNTIME.spawn(async move { let imp = if let Some(imp) = weak_imp.upgrade() { imp } else { return; }; let options = signal_client::SignalOptions { auto_subscribe: imp.auto_subscribe(), ..Default::default() }; gst::debug!(CAT, imp: imp, "Connecting to {}", wsurl); let res = signal_client::SignalClient::connect(&wsurl, &auth_token, options).await; let (signal_client, join_response, signal_events) = match res { Err(err) => { imp.obj() .emit_by_name::<()>("error", &[&format!("{:?}", anyhow!("Error: {err}"))]); return; } Ok(ok) => ok, }; let signal_client = Arc::new(signal_client); gst::debug!( CAT, imp: imp, "Connected with JoinResponse: {:?}", join_response ); let weak_imp = imp.downgrade(); let signal_task = RUNTIME.spawn(async move { if let Some(imp) = weak_imp.upgrade() { imp.signal_task(signal_events).await; } }); if imp.is_subscriber() { imp.obj() .emit_by_name::<()>("session-started", &[&"unique", &"unique"]); for participant in &join_response.other_participants { imp.on_participant(participant, false) } } let weak_imp = imp.downgrade(); imp.obj().connect_closure( "webrtcbin-ready", false, glib::closure!(|_signaller: &super::LiveKitSignaller, _consumer_identifier: &str, webrtcbin: &gst::Element| { gst::info!(CAT, "Adding data channels"); let reliable_channel = webrtcbin.emit_by_name::( "create-data-channel", &[ &"_reliable", &gst::Structure::builder("config") .field("ordered", true) .build(), ], ); let lossy_channel = webrtcbin.emit_by_name::( "create-data-channel", &[ &"_lossy", &gst::Structure::builder("config") .field("ordered", true) .field("max-retransmits", 0) .build(), ], ); if let Some(imp) = weak_imp.upgrade() { let mut connection = imp.connection.lock().unwrap(); if let Some(connection) = connection.as_mut() { connection.channels = Some(Channels { reliable_channel, lossy_channel, }); } } }), ); let connection = Connection { signal_client, signal_task, pending_tracks: Default::default(), early_candidates: Some(Vec::new()), channels: None, }; if let Ok(mut sc) = imp.connection.lock() { *sc = Some(connection); } imp.obj().emit_by_name::<()>( "session-requested", &[ &"unique", &"unique", &None::, ], ); }); } fn send_sdp(&self, session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) { gst::debug!(CAT, imp: self, "Created SDP {:?}", sessdesc.sdp()); match sessdesc.type_() { gst_webrtc::WebRTCSDPType::Offer => { self.send_sdp_offer(session_id, sessdesc); } gst_webrtc::WebRTCSDPType::Answer => { self.send_sdp_answer(session_id, sessdesc); } _ => { gst::debug!(CAT, imp: self, "Ignoring SDP {:?}", sessdesc.sdp()); } } } fn add_ice( &self, _session_id: &str, candidate: &str, sdp_m_line_index: u32, sdp_mid: Option, ) { let candidate_str = serde_json::to_string(&IceCandidateJson { sdp_mid: sdp_mid.unwrap_or("".to_string()), sdp_m_line_index: sdp_m_line_index as i32, candidate: candidate.to_string(), }) .unwrap(); if let Some(connection) = &mut *self.connection.lock().unwrap() { if let Some(early_candidates) = connection.early_candidates.as_mut() { gst::debug!(CAT, imp: self, "Delaying ice candidate {candidate_str:?}"); early_candidates.push(candidate_str); return; } }; gst::debug!(CAT, imp: self, "Sending ice candidate {candidate_str:?}"); let imp = self.downgrade(); RUNTIME.spawn(async move { if let Some(imp) = imp.upgrade() { imp.send_trickle_request(&candidate_str).await; }; }); } fn stop(&self) { if let Some(canceller) = &*self.join_canceller.lock().unwrap() { canceller.abort(); } if let Some(canceller) = &*self.signal_task_canceller.lock().unwrap() { canceller.abort(); } if let Some(connection) = self.connection.lock().unwrap().take() { block_on(connection.signal_task).unwrap(); block_on(Self::close_signal_client(&connection.signal_client)); } } fn end_session(&self, session_id: &str) { assert_eq!(session_id, "unique"); } } #[glib::object_subclass] impl ObjectSubclass for Signaller { const NAME: &'static str = "GstLiveKitWebRTCSinkSignaller"; type Type = super::LiveKitSignaller; type ParentType = glib::Object; type Interfaces = (Signallable,); } impl ObjectImpl for Signaller { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ glib::ParamSpecString::builder("ws-url") .nick("WebSocket URL") .blurb("The URL of the websocket of the LiveKit server") .mutable_ready() .build(), glib::ParamSpecString::builder("api-key") .nick("API key") .blurb("API key (combined into auth-token)") .mutable_ready() .build(), glib::ParamSpecString::builder("secret-key") .nick("Secret Key") .blurb("Secret key (combined into auth-token)") .mutable_ready() .build(), glib::ParamSpecString::builder("participant-name") .nick("Participant name") .blurb("Human readable name of the participant (combined into auth-token)") .mutable_ready() .build(), glib::ParamSpecString::builder("identity") .nick("Participant Identity") .blurb("Identity of the participant (combined into auth-token)") .mutable_ready() .build(), glib::ParamSpecString::builder("auth-token") .nick("Authorization Token") .blurb("Authentication token to use (contains api_key/secret/name/identity)") .mutable_ready() .build(), glib::ParamSpecString::builder("room-name") .nick("Room Name") .blurb("Name of the room to join (mandatory)") .mutable_ready() .build(), glib::ParamSpecUInt::builder("timeout") .nick("Timeout") .blurb("Value in seconds to timeout join requests.") .maximum(3600) .minimum(1) .default_value(DEFAULT_TRACK_PUBLISH_TIMEOUT) .build(), glib::ParamSpecObject::builder::("reliable-channel") .nick("Reliable Channel") .blurb("Reliable Data Channel object.") .flags(glib::ParamFlags::READABLE) .build(), glib::ParamSpecObject::builder::("lossy-channel") .nick("Lossy Channel") .blurb("Lossy Data Channel object.") .flags(glib::ParamFlags::READABLE) .build(), glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::default()) .nick("Sigaller Role") .blurb("Whether this signaller acts as either a Consumer or Producer. Listener is not currently supported.") .flags(glib::ParamFlags::READWRITE) .build(), glib::ParamSpecString::builder("producer-peer-id") .nick("Producer Peer ID") .blurb("When in Consumer Role, the signaller will subscribe to this peer's tracks.") .flags(glib::ParamFlags::READWRITE) .build(), gst::ParamSpecArray::builder("excluded-producer-peer-ids") .nick("Excluded Producer Peer IDs") .blurb("When in Consumer Role, the signaller will not subscribe to these peers' tracks.") .flags(glib::ParamFlags::READWRITE) .element_spec(&glib::ParamSpecString::builder("producer-peer-id").build()) .build(), ] }); PROPERTIES.as_ref() } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "ws-url" => { settings.wsurl = value.get().unwrap(); } "api-key" => { settings.api_key = value.get().unwrap(); } "secret-key" => { settings.secret_key = value.get().unwrap(); } "participant-name" => { settings.participant_name = value.get().unwrap(); } "identity" => { settings.identity = value.get().unwrap(); } "room-name" => { settings.room_name = value.get().unwrap(); } "auth-token" => { settings.auth_token = value.get().unwrap(); } "timeout" => { settings.timeout = value.get().unwrap(); } "role" => settings.role = value.get().unwrap(), "producer-peer-id" => settings.producer_peer_id = value.get().unwrap(), "excluded-producer-peer-ids" => { settings.excluded_produder_peer_ids = value .get::() .expect("type checked upstream") .as_slice() .iter() .filter_map(|id| id.get::<&str>().ok()) .map(|id| id.to_string()) .collect::>() } _ => unimplemented!(), } } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { "ws-url" => settings.wsurl.to_value(), "api-key" => settings.api_key.to_value(), "secret-key" => settings.secret_key.to_value(), "participant-name" => settings.participant_name.to_value(), "identity" => settings.identity.to_value(), "room-name" => settings.room_name.to_value(), "auth-token" => settings.auth_token.to_value(), "timeout" => settings.timeout.to_value(), channel @ ("reliable-channel" | "lossy-channel") => { let channel = if let Some(connection) = &*self.connection.lock().unwrap() { if let Some(channels) = &connection.channels { if channel == "reliable-channel" { Some(channels.reliable_channel.clone()) } else { Some(channels.lossy_channel.clone()) } } else { None } } else { None }; channel.to_value() } "role" => settings.role.to_value(), "producer-peer-id" => settings.producer_peer_id.to_value(), "excluded-producer-peer-ids" => { gst::Array::new(&settings.excluded_produder_peer_ids).to_value() } _ => unimplemented!(), } } }