From 80b58f3b45d2c3adee5684888937a3aa30e30cd7 Mon Sep 17 00:00:00 2001 From: Eva Pace Date: Mon, 16 Oct 2023 19:16:52 +0300 Subject: [PATCH] net/webrtc/janusvr: add JanusVRWebRTCSink plugin/signaller The JanusVRWebRTCSink is a new plugin that integrates with the Video Room plugin of the Janus Gateway, which simplifies WebRTC communication. Part-of: --- Cargo.lock | 2 + docs/plugins/gst_plugins_cache.json | 31 + net/webrtc/Cargo.toml | 2 + net/webrtc/src/janusvr_signaller/imp.rs | 726 ++++++++++++++++++++++++ net/webrtc/src/janusvr_signaller/mod.rs | 19 + net/webrtc/src/lib.rs | 1 + net/webrtc/src/signaller/imp.rs | 4 +- net/webrtc/src/webrtcsink/imp.rs | 41 ++ net/webrtc/src/webrtcsink/mod.rs | 59 ++ 9 files changed, 882 insertions(+), 3 deletions(-) create mode 100644 net/webrtc/src/janusvr_signaller/imp.rs create mode 100644 net/webrtc/src/janusvr_signaller/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 275de9b4..62345751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2816,10 +2816,12 @@ dependencies = [ "gstreamer-video", "gstreamer-webrtc", "http 0.2.11", + "http 1.0.0", "human_bytes", "livekit-api", "livekit-protocol", "parse_link_header", + "rand", "regex", "reqwest", "serde", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index cf55efd7..01f07bdd 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6609,6 +6609,37 @@ }, "rank": "none" }, + "janusvrwebrtcsink": { + "author": "Eva Pace ", + "description": "WebRTC sink with Janus Video Room signaller", + "hierarchy": [ + "GstJanusVRWebRTCSink", + "GstBaseWebRTCSink", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy", + "GstNavigation" + ], + "klass": "Sink/Network/WebRTC", + "pad-templates": { + "audio_%%u": { + "caps": "audio/x-raw:\naudio/x-opus:\n", + "direction": "sink", + "presence": "request" + }, + "video_%%u": { + "caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n\nvideo/x-raw(memory:D3D11Memory):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n", + "direction": "sink", + "presence": "request" + } + }, + "rank": "none" + }, "livekitwebrtcsink": { "author": "Olivier CrĂȘte ", "description": "WebRTC sink with LiveKit signaller", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index a4a466a3..ff8ea15c 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -56,6 +56,8 @@ livekit-api = { version = "0.3", default-features = false, features = ["signal-c warp = "0.3" crossbeam-channel = "0.5" +rand = "0.8" +http_1 = { version = "1.0", package = "http" } [dev-dependencies] tracing = { version = "0.1", features = ["log"] } diff --git a/net/webrtc/src/janusvr_signaller/imp.rs b/net/webrtc/src/janusvr_signaller/imp.rs new file mode 100644 index 00000000..52c9a2c0 --- /dev/null +++ b/net/webrtc/src/janusvr_signaller/imp.rs @@ -0,0 +1,726 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::{Signallable, SignallableImpl}; +use crate::RUNTIME; + +use anyhow::{anyhow, Error}; +use async_tungstenite::tungstenite; +use futures::channel::mpsc; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use gst::glib; +use gst::glib::once_cell::sync::Lazy; +use gst::glib::Properties; +use gst::prelude::*; +use gst::subclass::prelude::*; +use http_1::Uri; +use rand::prelude::*; +use serde::{Deserialize, Serialize}; +use std::ops::ControlFlow; +use std::sync::Mutex; +use std::time::Duration; +use tokio::{task, time::timeout}; +use tungstenite::Message as WsMessage; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "webrtc-janusvr-signaller", + gst::DebugColorFlags::empty(), + Some("WebRTC Janus Video Room signaller"), + ) +}); + +fn transaction_id() -> String { + thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .map(char::from) + .take(30) + .collect() +} + +fn feed_id() -> u32 { + thread_rng().gen() +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct KeepAliveMsg { + janus: String, + transaction: String, + session_id: u64, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct CreateSessionMsg { + janus: String, + transaction: String, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct AttachPluginMsg { + janus: String, + transaction: String, + plugin: String, + session_id: u64, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct RoomRequestBody { + request: String, + ptype: String, + room: u64, + id: u32, + #[serde(skip_serializing_if = "Option::is_none")] + display: Option, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct RoomRequestMsg { + janus: String, + transaction: String, + session_id: u64, + handle_id: u64, + body: RoomRequestBody, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct PublishBody { + request: String, + audio: bool, + video: bool, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct Jsep { + sdp: String, + trickle: Option, + r#type: String, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct PublishMsg { + janus: String, + transaction: String, + session_id: u64, + handle_id: u64, + body: PublishBody, + jsep: Jsep, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct Candidate { + candidate: String, + #[serde(rename = "sdpMLineIndex")] + sdp_m_line_index: u32, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct TrickleMsg { + janus: String, + transaction: String, + session_id: u64, + handle_id: u64, + candidate: Candidate, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +enum OutgoingMessage { + KeepAlive(KeepAliveMsg), + CreateSession(CreateSessionMsg), + AttachPlugin(AttachPluginMsg), + RoomRequest(RoomRequestMsg), + Publish(PublishMsg), + Trickle(TrickleMsg), +} + +#[derive(Serialize, Deserialize, Debug)] +struct InnerError { + code: i32, + reason: String, +} + +#[derive(Serialize, Deserialize, Debug)] +struct RoomJoined { + room: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct RoomEvent { + room: Option, + error_code: Option, + error: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "videoroom")] +enum VideoRoomData { + #[serde(rename = "joined")] + Joined(RoomJoined), + #[serde(rename = "event")] + Event(RoomEvent), +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "plugin")] +enum PluginData { + #[serde(rename = "janus.plugin.videoroom")] + VideoRoom { data: VideoRoomData }, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct DataHolder { + id: u64, +} + +#[derive(Serialize, Deserialize, Debug)] +struct SuccessMsg { + transaction: Option, + session_id: Option, + data: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct EventMsg { + transaction: Option, + session_id: Option, + plugindata: Option, + jsep: Option, +} + +// IncomingMessage +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "janus")] +enum JsonReply { + #[serde(rename = "ack")] + Ack, + #[serde(rename = "success")] + Success(SuccessMsg), + #[serde(rename = "event")] + Event(EventMsg), + #[serde(rename = "webrtcup")] + WebRTCUp, + #[serde(rename = "media")] + Media, + #[serde(rename = "error")] + Error(InnerError), +} + +#[derive(Default)] +struct State { + ws_sender: Option>, + send_task_handle: Option>>, + recv_task_handle: Option>, + session_id: Option, + handle_id: Option, + transaction_id: Option, +} + +#[derive(Clone)] +struct Settings { + janus_endpoint: String, + room_id: Option, + feed_id: u32, + display_name: Option, +} + +impl Default for Settings { + fn default() -> Self { + Self { + janus_endpoint: "ws://127.0.0.1:8188".to_string(), + room_id: None, + feed_id: feed_id(), + display_name: None, + } + } +} + +#[derive(Default, Properties)] +#[properties(wrapper_type = super::JanusVRSignaller)] +pub struct Signaller { + state: Mutex, + #[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")] + #[property(name="room-id", get, set, type = String, member = room_id, blurb = "The Janus Room ID that will be joined to")] + #[property(name="feed-id", get, set, type = u32, member = feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")] + #[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")] + settings: Mutex, +} + +impl Signaller { + fn raise_error(&self, msg: String) { + self.obj() + .emit_by_name::<()>("error", &[&format!("Error: {msg}")]); + } + + async fn connect(&self) -> Result<(), Error> { + let settings = self.settings.lock().unwrap().clone(); + use tungstenite::client::IntoClientRequest; + let mut request = settings + .janus_endpoint + .parse::()? + .into_client_request()?; + request.headers_mut().append( + "Sec-WebSocket-Protocol", + http_1::HeaderValue::from_static("janus-protocol"), + ); + + let (ws, _) = timeout( + // FIXME: Make the timeout configurable + Duration::from_secs(20), + async_tungstenite::tokio::connect_async(request), + ) + .await??; + + // Channel for asynchronously sending out websocket message + let (mut ws_sink, mut ws_stream) = ws.split(); + + // 1000 is completely arbitrary, we simply don't want infinite piling + // up of messages as with unbounded + let (ws_sender, mut ws_receiver) = mpsc::channel::(1000); + let send_task_handle = + RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + loop { + tokio::select! { + opt = ws_receiver.next() => match opt { + Some(msg) => { + gst::log!(CAT, "Sending websocket message {:?}", msg); + ws_sink + .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) + .await?; + }, + None => break, + }, + _ = tokio::time::sleep(Duration::from_secs(10)) => { + if let Some(ref this) = this { + let (transaction, session_id) = { + let state = this.state.lock().unwrap(); + (state.transaction_id.clone().unwrap(), + state.session_id.unwrap()) + }; + let msg = OutgoingMessage::KeepAlive(KeepAliveMsg { + janus: "keepalive".to_string(), + transaction, + session_id, + }); + ws_sink + .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) + .await?; + } + } + } + } + + let msg = "Done sending"; + this.map_or_else(|| gst::info!(CAT, "{msg}"), + |this| gst::info!(CAT, imp: this, "{msg}") + ); + + ws_sink.send(WsMessage::Close(None)).await?; + ws_sink.close().await?; + + Ok::<(), Error>(()) + })); + + let recv_task_handle = + RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { + if let Some(ref this) = this { + if let ControlFlow::Break(_) = this.handle_msg(msg) { + break; + } + } else { + break; + } + } + + let msg = "Stopped websocket receiving"; + this.map_or_else(|| gst::info!(CAT, "{msg}"), + |this| gst::info!(CAT, imp: this, "{msg}") + ); + })); + + let mut state = self.state.lock().unwrap(); + state.ws_sender = Some(ws_sender); + state.send_task_handle = Some(send_task_handle); + state.recv_task_handle = Some(recv_task_handle); + + Ok(()) + } + + fn handle_msg( + &self, + msg: Result, + ) -> ControlFlow<()> { + match msg { + Ok(WsMessage::Text(msg)) => { + gst::trace!(CAT, imp: self, "Received message {}", msg); + if let Ok(reply) = serde_json::from_str::(&msg) { + self.handle_reply(reply); + } else { + gst::error!(CAT, imp: self, "Unknown message from server: {}", msg); + } + } + Ok(WsMessage::Close(reason)) => { + gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason); + return ControlFlow::Break(()); + } + Ok(_) => (), + Err(err) => { + self.raise_error(err.to_string()); + return ControlFlow::Break(()); + } + } + ControlFlow::Continue(()) + } + + fn handle_reply(&self, reply: JsonReply) { + match reply { + JsonReply::WebRTCUp => { + gst::trace!(CAT, imp: self, "WebRTC streaming is working!"); + } + JsonReply::Success(success) => { + if let Some(data) = success.data { + if success.session_id.is_none() { + gst::trace!(CAT, imp: self, "Janus session {} was created successfully", data.id); + self.set_session_id(data.id); + self.attach_plugin(); + } else { + gst::trace!(CAT, imp: self, "Attached to Janus Video Room plugin successfully, handle: {}", data.id); + self.set_handle_id(data.id); + self.join_room(); + } + } + } + JsonReply::Event(event) => { + if let Some(PluginData::VideoRoom { data: plugindata }) = event.plugindata { + match plugindata { + VideoRoomData::Joined(joined) => { + if let Some(room) = joined.room { + gst::trace!(CAT, imp: self, "Joined room {} successfully", room); + self.session_requested(); + } + } + VideoRoomData::Event(room_event) => { + if room_event.error_code.is_some() && room_event.error.is_some() { + self.raise_error(format!( + "code: {}, reason: {}", + room_event.error_code.unwrap(), + room_event.error.unwrap(), + )); + return; + } + + if let Some(jsep) = event.jsep { + if jsep.r#type == "answer" { + gst::trace!(CAT, imp: self, "Session requested successfully"); + self.handle_answer(jsep.sdp); + } + } + } + } + } + } + JsonReply::Error(error) => { + self.raise_error(format!("code: {}, reason: {}", error.code, error.reason)) + } + // ignore for now + JsonReply::Ack | JsonReply::Media => {} + } + } + + fn send(&self, msg: OutgoingMessage) { + let state = self.state.lock().unwrap(); + if let Some(mut sender) = state.ws_sender.clone() { + RUNTIME.spawn(glib::clone!(@weak self as this => async move { + if let Err(err) = sender.send(msg).await { + this.raise_error(err.to_string()); + } + })); + } + } + + // Only used at the end when cleaning up the resources. + // So that `SignallableImpl::stop` waits the last message + // to be sent properly. + fn send_blocking(&self, msg: OutgoingMessage) { + let state = self.state.lock().unwrap(); + if let Some(mut sender) = state.ws_sender.clone() { + RUNTIME.block_on(glib::clone!(@weak self as this => async move { + if let Err(err) = sender.send(msg).await { + this.raise_error(err.to_string()); + } + })); + } + } + + fn set_transaction_id(&self, transaction: String) { + self.state.lock().unwrap().transaction_id = Some(transaction); + } + + fn create_session(&self) { + let transaction = transaction_id(); + self.set_transaction_id(transaction.clone()); + self.send(OutgoingMessage::CreateSession(CreateSessionMsg { + janus: "create".to_string(), + transaction, + })); + } + + fn set_session_id(&self, session_id: u64) { + self.state.lock().unwrap().session_id = Some(session_id); + } + + fn set_handle_id(&self, handle_id: u64) { + self.state.lock().unwrap().handle_id = Some(handle_id); + } + + fn attach_plugin(&self) { + let (transaction, session_id) = { + let state = self.state.lock().unwrap(); + + ( + state.transaction_id.clone().unwrap(), + state.session_id.unwrap(), + ) + }; + self.send(OutgoingMessage::AttachPlugin(AttachPluginMsg { + janus: "attach".to_string(), + transaction, + plugin: "janus.plugin.videoroom".to_string(), + session_id, + })); + } + + fn join_room(&self) { + let (transaction, session_id, handle_id, room, feed_id, display) = { + let state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if settings.room_id.is_none() { + self.raise_error("Janus Room ID must be set".to_string()); + return; + } + + ( + state.transaction_id.clone().unwrap(), + state.session_id.unwrap(), + state.handle_id.unwrap(), + settings.room_id.as_ref().unwrap().parse().unwrap(), + settings.feed_id, + settings.display_name.clone(), + ) + }; + self.send(OutgoingMessage::RoomRequest(RoomRequestMsg { + janus: "message".to_string(), + transaction, + session_id, + handle_id, + body: RoomRequestBody { + request: "join".to_string(), + ptype: "publisher".to_string(), + room, + id: feed_id, + display, + }, + })); + } + + fn leave_room(&self) { + let (transaction, session_id, handle_id, room, feed_id, display) = { + let state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if settings.room_id.is_none() { + self.raise_error("Janus Room ID must be set".to_string()); + return; + } + + ( + state.transaction_id.clone().unwrap(), + state.session_id.unwrap(), + state.handle_id.unwrap(), + settings.room_id.as_ref().unwrap().parse().unwrap(), + settings.feed_id, + settings.display_name.clone(), + ) + }; + self.send_blocking(OutgoingMessage::RoomRequest(RoomRequestMsg { + janus: "message".to_string(), + transaction, + session_id, + handle_id, + body: RoomRequestBody { + request: "leave".to_string(), + ptype: "publisher".to_string(), + room, + id: feed_id, + display, + }, + })); + } + + fn publish(&self, offer: &gst_webrtc::WebRTCSessionDescription) { + let (transaction, session_id, handle_id) = { + let state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if settings.room_id.is_none() { + self.raise_error("Janus Room ID must be set".to_string()); + return; + } + + ( + state.transaction_id.clone().unwrap(), + state.session_id.unwrap(), + state.handle_id.unwrap(), + ) + }; + let sdp_data = offer.sdp().as_text().unwrap(); + self.send(OutgoingMessage::Publish(PublishMsg { + janus: "message".to_string(), + transaction, + session_id, + handle_id, + body: PublishBody { + request: "publish".to_string(), + audio: true, + video: true, + }, + jsep: Jsep { + sdp: sdp_data, + trickle: Some(true), + r#type: "offer".to_string(), + }, + })); + } + + fn trickle(&self, candidate: &str, sdp_m_line_index: u32) { + let (transaction, session_id, handle_id) = { + let state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if settings.room_id.is_none() { + self.raise_error("Janus Room ID must be set".to_string()); + return; + } + + ( + state.transaction_id.clone().unwrap(), + state.session_id.unwrap(), + state.handle_id.unwrap(), + ) + }; + self.send(OutgoingMessage::Trickle(TrickleMsg { + janus: "trickle".to_string(), + transaction, + session_id, + handle_id, + candidate: Candidate { + candidate: candidate.to_string(), + sdp_m_line_index, + }, + })); + } + + fn session_requested(&self) { + self.obj().emit_by_name::<()>( + "session-requested", + &[ + &"unique", + &"unique", + &None::, + ], + ); + } + + fn handle_answer(&self, sdp: String) { + match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) { + Ok(ans_sdp) => { + let answer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + ans_sdp, + ); + self.obj() + .emit_by_name::<()>("session-description", &[&"unique", &answer]); + } + Err(err) => { + self.raise_error(format!("Could not parse answer SDP: {err}")); + } + } + } +} + +impl SignallableImpl for Signaller { + fn start(&self) { + let this = self.obj().clone(); + let imp = self.downgrade(); + RUNTIME.spawn(async move { + if let Some(imp) = imp.upgrade() { + if let Err(err) = imp.connect().await { + this.emit_by_name::<()>("error", &[&format!("{:?}", anyhow!(err))]); + } else { + imp.create_session(); + } + } + }); + } + + fn send_sdp(&self, _session_id: &str, offer: &gst_webrtc::WebRTCSessionDescription) { + gst::info!(CAT, imp: self, "sending SDP offer to peer: {:?}", offer.sdp().as_text()); + + self.publish(offer); + } + + fn add_ice( + &self, + _session_id: &str, + candidate: &str, + sdp_m_line_index: u32, + _sdp_mid: Option, + ) { + self.trickle(candidate, sdp_m_line_index); + } + + fn stop(&self) { + gst::info!(CAT, imp: self, "Stopping now"); + let mut state = self.state.lock().unwrap(); + + let send_task_handle = state.send_task_handle.take(); + let recv_task_handle = state.recv_task_handle.take(); + + if let Some(mut sender) = state.ws_sender.take() { + RUNTIME.block_on(async move { + sender.close_channel(); + + if let Some(handle) = send_task_handle { + if let Err(err) = handle.await { + gst::warning!(CAT, imp: self, "Error while joining send task: {}", err); + } + } + + if let Some(handle) = recv_task_handle { + // if awaited instead, it hangs the plugin + handle.abort(); + } + }); + } + + state.session_id = None; + state.handle_id = None; + state.transaction_id = None; + } + + fn end_session(&self, _session_id: &str) { + self.leave_room(); + } +} + +#[glib::object_subclass] +impl ObjectSubclass for Signaller { + const NAME: &'static str = "GstJanusVRWebRTCSignaller"; + type Type = super::JanusVRSignaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +#[glib::derived_properties] +impl ObjectImpl for Signaller {} diff --git a/net/webrtc/src/janusvr_signaller/mod.rs b/net/webrtc/src/janusvr_signaller/mod.rs new file mode 100644 index 00000000..79094cc5 --- /dev/null +++ b/net/webrtc/src/janusvr_signaller/mod.rs @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::signaller::Signallable; +use gst::glib; + +mod imp; + +glib::wrapper! { + pub struct JanusVRSignaller(ObjectSubclass) @implements Signallable; +} + +unsafe impl Send for JanusVRSignaller {} +unsafe impl Sync for JanusVRSignaller {} + +impl Default for JanusVRSignaller { + fn default() -> Self { + glib::Object::new() + } +} diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs index f09950f1..847a6bba 100644 --- a/net/webrtc/src/lib.rs +++ b/net/webrtc/src/lib.rs @@ -15,6 +15,7 @@ use gst::glib::once_cell::sync::Lazy; use tokio::runtime; mod aws_kvs_signaller; +mod janusvr_signaller; mod livekit_signaller; pub mod signaller; pub mod utils; diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 3dda6793..9299c0b1 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -547,9 +547,7 @@ impl SignallableImpl for Signaller { } if let Some(handle) = receive_task_handle { - if let Err(err) = handle.await { - gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err); - } + handle.abort(); } }); } diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 5d2b5352..ff628c49 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -22,6 +22,7 @@ use std::sync::{mpsc, Arc, Condvar, Mutex}; use super::homegrown_cc::CongestionController; use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode}; use crate::aws_kvs_signaller::AwsKvsSignaller; +use crate::janusvr_signaller::JanusVRSignaller; use crate::livekit_signaller::LiveKitSignaller; use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; use crate::whip_signaller::WhipClientSignaller; @@ -4414,3 +4415,43 @@ impl ObjectSubclass for LiveKitWebRTCSink { type Type = super::LiveKitWebRTCSink; type ParentType = super::BaseWebRTCSink; } + +#[derive(Default)] +pub struct JanusVRWebRTCSink {} + +impl ObjectImpl for JanusVRWebRTCSink { + fn constructed(&self) { + let element = self.obj(); + let ws = element.upcast_ref::().imp(); + + let _ = ws.set_signaller(JanusVRSignaller::default().upcast()); + } +} + +impl GstObjectImpl for JanusVRWebRTCSink {} + +impl ElementImpl for JanusVRWebRTCSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "JanusVRWebRTCSink", + "Sink/Network/WebRTC", + "WebRTC sink with Janus Video Room signaller", + "Eva Pace ", + ) + }); + + Some(&*ELEMENT_METADATA) + } +} + +impl BinImpl for JanusVRWebRTCSink {} + +impl BaseWebRTCSinkImpl for JanusVRWebRTCSink {} + +#[glib::object_subclass] +impl ObjectSubclass for JanusVRWebRTCSink { + const NAME: &'static str = "GstJanusVRWebRTCSink"; + type Type = super::JanusVRWebRTCSink; + type ParentType = super::BaseWebRTCSink; +} diff --git a/net/webrtc/src/webrtcsink/mod.rs b/net/webrtc/src/webrtcsink/mod.rs index 5397def0..413d0220 100644 --- a/net/webrtc/src/webrtcsink/mod.rs +++ b/net/webrtc/src/webrtcsink/mod.rs @@ -60,6 +60,10 @@ glib::wrapper! { pub struct LiveKitWebRTCSink(ObjectSubclass) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; } +glib::wrapper! { + pub struct JanusVRWebRTCSink(ObjectSubclass) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; +} + #[derive(thiserror::Error, Debug)] pub enum WebRTCSinkError { #[error("no session with id")] @@ -146,6 +150,61 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Rank::NONE, LiveKitWebRTCSink::static_type(), )?; + /** + * element-janusvrwebrtcsink: + * + * The `JanusVRWebRTCSink` is a plugin that integrates with the [Video Room plugin](https://janus.conf.meetecho.com/docs/videoroom) of the [Janus Gateway](https://github.com/meetecho/janus-gateway). It basically streams whatever data you pipe to it (video, audio) into WebRTC using Janus as the signaller. + * + * ## How to use it + * + * You'll need to have: + * + * - A Janus server endpoint; + * - Any WebRTC browser application that uses Janus as the signaller, eg: the `html` folder of [janus-gateway repository](https://github.com/meetecho/janus-gateway). + * + * You can pipe the video like this (if you don't happen to run Janus locally, you can set the endpoint + * like this: `signaller::janus-endpoint=ws://127.0.0.1:8188`): + * + * ```bash + * $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234 + * ``` + * + * And for audio (yes you can do both at the same time, you just need to pipe it properly). + * + * ```bash + * $ gst-launch-1.0 audiotestsrc ! janusvrwebrtcsink signaller::room-id=1234 + * ``` + * + * And you can set the display name via `signaller::display-name`, eg: + * + * ```bash + * $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234 signaller::display-name=ana + * ``` + * + * You should see the GStreamer `videotestsrc`/`audiotestsrc` output in your browser now! + * + * If for some reason you can't run Janus locally, you can use their open [demo webpage](https://janus.conf.meetecho.com/demos/videoroom.html), and point to its WebSocket server: + * + * ```bash + * $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234 signaller::janus-endpoint=wss://janus.conf.meetecho.com/ws + * ``` + * + * ## Reference links + * + * - [Janus REST/WebSockets docs](https://janus.conf.meetecho.com/docs/rest.html) + * - [Example implementation in GStreamer](https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/269ab858813e670d521cc4b6a71cc0ec4a6e70ed/subprojects/gst-examples/webrtc/janus/rust/src/janus.rs) + * + * ## Notes + * + * - This plugin supports both the legacy Video Room plugin as well as the `multistream` one; + * - If you see a warning in the logs related to `rtpgccbwe`, you're probably missing the `gst-plugin-rtp` in your system. + */ + gst::Element::register( + Some(plugin), + "janusvrwebrtcsink", + gst::Rank::NONE, + JanusVRWebRTCSink::static_type(), + )?; Ok(()) }