net/webrtc/janusvr: add new source element

This commit is contained in:
Eva Pace 2023-10-27 12:45:05 -03:00
parent fc1c017fc6
commit b946fb24aa
4 changed files with 422 additions and 30 deletions

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::{Signallable, SignallableImpl};
use crate::signaller::{Signallable, SignallableImpl, WebRTCSignallerRole};
use crate::RUNTIME;
use anyhow::{anyhow, Error};
@ -86,6 +86,31 @@ struct RoomRequestMsg {
body: RoomRequestBody,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct SubscribeStream {
feed: u64,
mid: String,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct SubscribeRequestBody {
request: String,
ptype: String,
room: u64,
streams: Vec<SubscribeStream>,
use_msid: bool,
private_id: Option<u64>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct SubscribeRequestMsg {
janus: String,
transaction: String,
session_id: u64,
handle_id: u64,
body: SubscribeRequestBody,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct PublishBody {
request: String,
@ -96,6 +121,7 @@ struct PublishBody {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct Jsep {
sdp: String,
#[serde(skip_serializing_if = "Option::is_none")]
trickle: Option<bool>,
r#type: String,
}
@ -128,6 +154,22 @@ struct TrickleMsg {
candidate: Candidate,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct RequestStartBody {
request: String,
room_id: u64,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct RequestStartMsg {
janus: String,
transaction: String,
session_id: u64,
handle_id: u64,
jsep: Jsep,
body: RequestStartBody,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
enum OutgoingMessage {
@ -135,8 +177,10 @@ enum OutgoingMessage {
CreateSession(CreateSessionMsg),
AttachPlugin(AttachPluginMsg),
RoomRequest(RoomRequestMsg),
SubscribeRequest(SubscribeRequestMsg),
Publish(PublishMsg),
Trickle(TrickleMsg),
RequestStart(RequestStartMsg),
}
#[derive(Serialize, Deserialize, Debug)]
@ -145,18 +189,39 @@ struct InnerError {
reason: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct Stream {
mid: String,
}
#[derive(Serialize, Deserialize, Debug)]
struct RoomJoined {
room: Option<u64>,
private_id: Option<u64>,
publishers: Option<Vec<Publisher>>,
}
#[derive(Serialize, Deserialize, Debug)]
struct Publisher {
id: u64,
streams: Vec<Stream>,
}
#[derive(Serialize, Deserialize, Debug)]
struct RoomEvent {
room: Option<u64>,
publishers: Option<Vec<Publisher>>,
private_id: Option<u64>,
started: Option<String>,
error_code: Option<i32>,
error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
struct Attached {
streams: Option<Vec<ConsumerStream>>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "videoroom")]
enum VideoRoomData {
@ -164,6 +229,8 @@ enum VideoRoomData {
Joined(RoomJoined),
#[serde(rename = "event")]
Event(RoomEvent),
#[serde(rename = "attached")]
Attached(Attached),
}
#[derive(Serialize, Deserialize, Debug)]
@ -173,6 +240,11 @@ enum PluginData {
VideoRoom { data: VideoRoomData },
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct ConsumerStream {
feed_id: Option<u64>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct DataHolder {
id: u64,
@ -217,27 +289,34 @@ struct State {
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
recv_task_handle: Option<task::JoinHandle<()>>,
session_id: Option<u64>,
handle_id: Option<u64>,
producer_handle: Option<u64>,
consumer_handle: Option<u64>,
private_id: Option<u64>,
transaction_id: Option<String>,
offer_sdp: Option<String>,
}
#[derive(Clone)]
struct Settings {
janus_endpoint: String,
uri: String,
producer_peer_id: String,
room_id: Option<String>,
feed_id: u32,
display_name: Option<String>,
secret_key: Option<String>,
role: WebRTCSignallerRole,
}
impl Default for Settings {
fn default() -> Self {
Self {
janus_endpoint: "ws://127.0.0.1:8188".to_string(),
uri: "ws://127.0.0.1:8188".to_string(),
producer_peer_id: "".to_string(),
room_id: None,
feed_id: feed_id(),
display_name: None,
secret_key: None,
role: WebRTCSignallerRole::default(),
}
}
}
@ -246,11 +325,13 @@ impl Default for Settings {
#[properties(wrapper_type = super::JanusVRSignaller)]
pub struct Signaller {
state: Mutex<State>,
#[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")]
#[property(name="uri", get, set, type = String, member = uri, blurb = "The Janus server endpoint to communicate with")]
#[property(name="producer-peer-id", get, set, type = String, member = producer_peer_id, blurb = "producer peer id to pass to webrtcsrc base")]
#[property(name="room-id", get, set, type = String, member = room_id, blurb = "The Janus Room ID that will be joined to")]
#[property(name="feed-id", get, set, type = u32, member = feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")]
#[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")]
#[property(name="secret-key", get, set, type = String, member = secret_key, blurb = "The secret API key to communicate with Janus server")]
#[property(name="role", get, set, type = WebRTCSignallerRole, member = role, blurb = "The role the signaller will take: consumer, producer or listener", builder(WebRTCSignallerRole::default()))]
settings: Mutex<Settings>,
}
@ -264,7 +345,7 @@ impl Signaller {
let settings = self.settings.lock().unwrap().clone();
use tungstenite::client::IntoClientRequest;
let mut request = settings
.janus_endpoint
.uri
.parse::<Uri>()?
.into_client_request()?;
request.headers_mut().append(
@ -303,12 +384,17 @@ impl Signaller {
if let Some(ref this) = this {
let (transaction, session_id, apisecret) = {
let state = this.state.lock().unwrap();
let settings = this.settings.lock().unwrap();
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
settings.secret_key.clone(),
)
let session_id = if let Some(s) = state.session_id {
s
} else {
// session_id is set to None when the plugin is dying
break
};
(state.transaction_id.clone().unwrap(),
session_id,
settings.secret_key.clone())
};
let msg = OutgoingMessage::KeepAlive(KeepAliveMsg {
janus: "keepalive".to_string(),
@ -398,6 +484,29 @@ impl Signaller {
JsonReply::WebRTCUp => {
gst::trace!(CAT, imp: self, "WebRTC streaming is working!");
}
JsonReply::Success(_) | JsonReply::Event(_) => {
let role = {
let settings = self.settings.lock().unwrap();
settings.role.clone()
};
match role {
WebRTCSignallerRole::Producer => self.handle_reply_producer(reply),
WebRTCSignallerRole::Consumer => self.handle_reply_consumer(reply),
WebRTCSignallerRole::Listener => { /*nothing yet*/ }
}
}
JsonReply::Error(error) => {
self.raise_error(format!("code: {}, reason: {}", error.code, error.reason))
}
// ignore for now
JsonReply::Ack | JsonReply::Media => {}
}
}
fn handle_reply_producer(&self, reply: JsonReply) {
match reply {
JsonReply::Success(success) => {
if let Some(data) = success.data {
if success.session_id.is_none() {
@ -406,7 +515,8 @@ impl Signaller {
self.attach_plugin();
} else {
gst::trace!(CAT, imp: self, "Attached to Janus Video Room plugin successfully, handle: {}", data.id);
self.set_handle_id(data.id);
self.set_producer_handle(data.id);
self.join_room();
}
}
@ -430,21 +540,110 @@ impl Signaller {
return;
}
// publish stream & handle answer
if let Some(jsep) = event.jsep {
if jsep.r#type == "answer" {
gst::trace!(CAT, imp: self, "Session requested successfully");
self.handle_answer(jsep.sdp);
self.handle_answer(&jsep.sdp);
}
}
}
VideoRoomData::Attached(_) => {
/* shouldn't happen, it's in the consumer flow */
}
}
}
}
JsonReply::Ack | JsonReply::WebRTCUp | JsonReply::Media | JsonReply::Error(_) => {
unreachable!("handled in fn handle_reply()")
}
}
}
fn handle_reply_consumer(&self, reply: JsonReply) {
match reply {
JsonReply::Success(success) => {
if let Some(data) = success.data {
if success.session_id.is_none() {
gst::trace!(CAT, imp: self, "Janus session {} was created successfully", data.id);
self.set_session_id(data.id);
self.attach_plugin();
} else {
gst::trace!(CAT, imp: self, "Attached to Janus Video Room plugin successfully, handle: {}", data.id);
if self.has_producer_handle() {
self.set_consumer_handle(data.id);
self.join_room();
self.obj()
.emit_by_name::<()>("session-started", &[&"unique", &"unique"]);
return;
} else {
self.set_producer_handle(data.id);
self.attach_plugin();
return;
}
}
}
}
JsonReply::Event(event) => {
if let Some(PluginData::VideoRoom { data: plugindata }) = event.plugindata {
match plugindata {
VideoRoomData::Joined(joined) => {
if joined.private_id.is_some() {
self.set_private_id(joined.private_id);
}
if joined.publishers.as_ref().map_or(false, |a| !a.is_empty()) {
let publishers = joined.publishers.unwrap();
let publisher_id = publishers[0].id;
self.subscribe_feed(publishers[0].streams.clone(), publisher_id);
}
}
VideoRoomData::Event(room_event) => {
if room_event.error_code.is_some() && room_event.error.is_some() {
self.raise_error(format!(
"code: {}, reason: {}",
room_event.error_code.unwrap(),
room_event.error.unwrap(),
));
return;
}
if room_event.private_id.is_some() {
self.set_private_id(room_event.private_id);
}
if room_event
.publishers
.as_ref()
.map_or(false, |a| !a.is_empty())
{
let publishers = room_event.publishers.unwrap();
let publisher_id = publishers[0].id;
self.subscribe_feed(publishers[0].streams.clone(), publisher_id);
return;
}
if room_event.started == Some("ok".to_string()) {
gst::trace!(CAT, imp: self, "Handled answer properly");
}
}
VideoRoomData::Attached(attached) => {
if let Some(ref jsep) = event.jsep {
if jsep.r#type == "offer" {
if let Some(streams) = attached.streams {
self.set_producer_peer_id(streams[0].feed_id.unwrap());
}
gst::trace!(CAT, imp: self, "Offer received!");
self.set_offer_sdp(&jsep.sdp);
self.handle_offer();
}
}
}
}
}
}
JsonReply::Error(error) => {
self.raise_error(format!("code: {}, reason: {}", error.code, error.reason))
JsonReply::Ack | JsonReply::WebRTCUp | JsonReply::Media | JsonReply::Error(_) => {
unreachable!("handled in fn handle_reply()")
}
// ignore for now
JsonReply::Ack | JsonReply::Media => {}
}
}
@ -493,8 +692,28 @@ impl Signaller {
self.state.lock().unwrap().session_id = Some(session_id);
}
fn set_handle_id(&self, handle_id: u64) {
self.state.lock().unwrap().handle_id = Some(handle_id);
fn set_producer_handle(&self, handle_id: u64) {
self.state.lock().unwrap().producer_handle = Some(handle_id);
}
fn has_producer_handle(&self) -> bool {
self.state.lock().unwrap().producer_handle.is_some()
}
fn set_consumer_handle(&self, handle_id: u64) {
self.state.lock().unwrap().consumer_handle = Some(handle_id);
}
fn set_private_id(&self, private_id: Option<u64>) {
self.state.lock().unwrap().private_id = private_id;
}
fn set_offer_sdp(&self, sdp: &str) {
self.state.lock().unwrap().offer_sdp = Some(sdp.to_string());
}
fn set_producer_peer_id(&self, producer_peer_id: u64) {
self.settings.lock().unwrap().producer_peer_id = producer_peer_id.to_string();
}
fn attach_plugin(&self) {
@ -530,7 +749,7 @@ impl Signaller {
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
state.producer_handle.unwrap(),
settings.room_id.as_ref().unwrap().parse().unwrap(),
settings.feed_id,
settings.display_name.clone(),
@ -566,7 +785,7 @@ impl Signaller {
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
state.producer_handle.unwrap(),
settings.room_id.as_ref().unwrap().parse().unwrap(),
settings.feed_id,
settings.display_name.clone(),
@ -589,6 +808,46 @@ impl Signaller {
}));
}
fn subscribe_feed(&self, streams: Vec<Stream>, publisher_id: u64) {
let (transaction, session_id, handle_id, room, private_id) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if settings.room_id.is_none() {
self.raise_error("Janus Room ID must be set".to_string());
return;
}
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.consumer_handle.unwrap(),
settings.room_id.as_ref().unwrap().parse().unwrap(),
state.private_id,
)
};
self.send(OutgoingMessage::SubscribeRequest(SubscribeRequestMsg {
janus: "message".to_string(),
transaction,
body: SubscribeRequestBody {
request: "join".to_string(),
ptype: "subscriber".to_string(),
room,
streams: streams
.iter()
.map(|stream| SubscribeStream {
feed: publisher_id,
mid: stream.mid.clone(),
})
.collect(),
use_msid: false,
private_id,
},
session_id,
handle_id,
}));
}
fn publish(&self, offer: &gst_webrtc::WebRTCSessionDescription) {
let (transaction, session_id, handle_id, apisecret) = {
let state = self.state.lock().unwrap();
@ -602,7 +861,7 @@ impl Signaller {
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
state.producer_handle.unwrap(),
settings.secret_key.clone(),
)
};
@ -639,7 +898,7 @@ impl Signaller {
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
state.producer_handle.unwrap(),
settings.secret_key.clone(),
)
};
@ -667,7 +926,7 @@ impl Signaller {
);
}
fn handle_answer(&self, sdp: String) {
fn handle_answer(&self, sdp: &str) {
match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new(
@ -682,6 +941,58 @@ impl Signaller {
}
}
}
fn handle_offer(&self) {
let sdp = {
let state = self.state.lock().unwrap();
state.offer_sdp.clone().unwrap()
};
match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
Ok(offer_sdp) => {
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
offer_sdp,
);
self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &offer]);
}
Err(err) => {
self.raise_error(format!("Could not parse answer SDP: {err}"));
}
}
}
fn request_start(&self, sdp: &gst_webrtc::WebRTCSessionDescription) {
let (transaction, session_id, handle_id, room_id) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.consumer_handle.unwrap(),
settings.room_id.as_ref().unwrap().parse().unwrap(),
)
};
let sdp = sdp.sdp().as_text().unwrap();
self.send(OutgoingMessage::RequestStart(RequestStartMsg {
janus: "message".to_string(),
transaction,
session_id,
handle_id,
jsep: Jsep {
r#type: "answer".to_string(),
trickle: None,
sdp,
},
body: RequestStartBody {
request: "start".to_string(),
room_id,
},
}));
}
}
impl SignallableImpl for Signaller {
@ -700,9 +1011,22 @@ impl SignallableImpl for Signaller {
}
fn send_sdp(&self, _session_id: &str, offer: &gst_webrtc::WebRTCSessionDescription) {
gst::info!(CAT, imp: self, "sending SDP offer to peer: {:?}", offer.sdp().as_text());
let role = {
let settings = self.settings.lock().unwrap();
settings.role.clone()
};
self.publish(offer);
match role {
WebRTCSignallerRole::Producer => {
gst::info!(CAT, imp: self, "sending SDP offer to peer: {:?}", offer.sdp().as_text());
self.publish(offer)
}
WebRTCSignallerRole::Consumer => {
gst::info!(CAT, imp: self, "sending SDP answer to peer: {:?}", offer.sdp().as_text());
self.request_start(offer)
}
WebRTCSignallerRole::Listener => { /*nothing yet*/ }
}
}
fn add_ice(
@ -740,7 +1064,8 @@ impl SignallableImpl for Signaller {
}
state.session_id = None;
state.handle_id = None;
state.producer_handle = None;
state.consumer_handle = None;
state.transaction_id = None;
}

View file

@ -4447,7 +4447,14 @@ impl ObjectImpl for JanusVRWebRTCSink {
let element = self.obj();
let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp();
let _ = ws.set_signaller(JanusVRSignaller::default().upcast());
let signaller = JanusVRSignaller::default();
signaller.set_role(WebRTCSignallerRole::Producer);
let _ = ws.set_signaller(signaller.upcast());
let obj = &*self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SINK);
}
}

View file

@ -2,7 +2,8 @@
use gst::prelude::*;
use crate::signaller::{prelude::*, Signallable, Signaller};
use crate::janusvr_signaller::JanusVRSignaller;
use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS};
use crate::webrtcsrc::WebRTCSrcPad;
use crate::whip_signaller::WhipServerSignaller;
@ -1286,3 +1287,51 @@ impl ObjectSubclass for WhipServerSrc {
type Type = super::WhipServerSrc;
type ParentType = super::BaseWebRTCSrc;
}
#[derive(Default)]
pub struct JanusVRWebRTCSrc {}
impl ObjectImpl for JanusVRWebRTCSrc {
fn constructed(&self) {
self.parent_constructed();
let element = self.obj();
let ws = element.upcast_ref::<super::BaseWebRTCSrc>().imp();
let signaller = JanusVRSignaller::default();
signaller.set_role(WebRTCSignallerRole::Consumer);
let _ = ws.set_signaller(signaller.upcast());
let obj = &*self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
impl GstObjectImpl for JanusVRWebRTCSrc {}
impl BinImpl for JanusVRWebRTCSrc {}
impl ElementImpl for JanusVRWebRTCSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"JanusVRWebRTCSrc",
"Source/Network/WebRTC",
"WebRTC source element using Janus Video Room as the signaller",
"Eva Pace <epace@igalia.com>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BaseWebRTCSrcImpl for JanusVRWebRTCSrc {}
#[glib::object_subclass]
impl ObjectSubclass for JanusVRWebRTCSrc {
const NAME: &'static str = "GstJanusVRWebRTCSrc";
type Type = super::JanusVRWebRTCSrc;
type ParentType = super::BaseWebRTCSrc;
}

View file

@ -53,6 +53,10 @@ glib::wrapper! {
pub struct WhipServerSrc(ObjectSubclass<imp::WhipServerSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct JanusVRWebRTCSrc(ObjectSubclass<imp::JanusVRWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
@ -76,5 +80,12 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
WhipServerSrc::static_type(),
)?;
gst::Element::register(
plugin,
"janusvrwebrtcsrc",
gst::Rank::PRIMARY,
JanusVRWebRTCSrc::static_type(),
)?;
Ok(())
}