webrtc: extract a BaseWebRTCSink

For documentation purposes, AwsKvsWebRTCSink should not inherit from
another element; both it and WebRTCSink now derive from a new
BaseWebRTCSink base class instead.

+ Mark base class as plugin API and update plugin cache

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1178>
Author: Mathieu Duponchelle, 2023-04-13 17:02:18 +02:00 (committed by GStreamer Marge Bot)
Parent: 367b98bfcb
Commit: f1fd8d84c3
4 changed files with 254 additions and 188 deletions
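
With this refactor, concrete sink elements subclass the new BaseWebRTCSink instead of the webrtcsink element itself. The sketch below illustrates the resulting pattern, modeled on the WebRTCSink and AwsKvsWebRTCSink code in the diff; MyWebRTCSink, MySignaller and the metadata strings are placeholders for illustration and are not part of this commit, and the code assumes it lives in imp.rs next to BaseWebRTCSink.

```rust
// Hypothetical additional sink built on the new base class. MyWebRTCSink,
// MySignaller and the metadata strings are placeholder names; this assumes
// the module layout of this commit (code in imp.rs, where BaseWebRTCSink,
// BaseWebRTCSinkImpl and set_signaller() are defined).
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;

#[derive(Default)]
pub struct MyWebRTCSink {}

impl ObjectImpl for MyWebRTCSink {
    fn constructed(&self) {
        let element = self.obj();
        // Reach the shared BaseWebRTCSink implementation and install a custom
        // signaller, mirroring AwsKvsWebRTCSink::constructed() in this commit.
        let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp();
        let _ = ws.set_signaller(MySignaller::default().upcast());
    }
}

impl GstObjectImpl for MyWebRTCSink {}

impl ElementImpl for MyWebRTCSink {
    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
            gst::subclass::ElementMetadata::new(
                "MyWebRTCSink",
                "Sink/Network/WebRTC",
                "WebRTC sink with my signaller",
                "Example Author <author@example.com>",
            )
        });
        Some(&*ELEMENT_METADATA)
    }
}

impl BinImpl for MyWebRTCSink {}
impl BaseWebRTCSinkImpl for MyWebRTCSink {}

#[glib::object_subclass]
impl ObjectSubclass for MyWebRTCSink {
    const NAME: &'static str = "GstMyWebRTCSink";
    type Type = super::MyWebRTCSink;
    type ParentType = super::BaseWebRTCSink;
}
```

A matching glib::wrapper! in mod.rs declaring @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, plus a gst::Element::register() call, would complete such an element, just as the WebRTCSink and AwsKvsWebRTCSink wrappers in this commit do.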


@@ -6065,10 +6065,10 @@
 "elements": {
 "awskvswebrtcsink": {
 "author": "Mathieu Duponchelle <mathieu@centricular.com>",
-"description": "WebRTC sink",
+"description": "WebRTC sink with kinesis video streams signaller",
 "hierarchy": [
 "GstAwsKvsWebRTCSink",
-"GstWebRTCSink",
+"GstBaseWebRTCSink",
 "GstBin",
 "GstElement",
 "GstObject",
@@ -6096,9 +6096,10 @@
 },
 "webrtcsink": {
 "author": "Mathieu Duponchelle <mathieu@centricular.com>",
-"description": "WebRTC sink",
+"description": "WebRTC sink with custom protocol signaller",
 "hierarchy": [
 "GstWebRTCSink",
+"GstBaseWebRTCSink",
 "GstBin",
 "GstElement",
 "GstObject",
@@ -6110,7 +6111,6 @@
 "GstNavigation"
 ],
 "klass": "Sink/Network/WebRTC",
-"long-name": "WebRTCSink",
 "pad-templates": {
 "audio_%%u": {
 "caps": "audio/x-raw:\n",
@@ -6123,6 +6123,117 @@
 "presence": "request"
 }
 },
+"rank": "none"
+},
+"webrtcsrc": {
+"author": "Thibault Saunier <tsaunier@igalia.com>",
+"description": "WebRTC src",
+"hierarchy": [
+"GstWebRTCSrc",
+"GstBin",
+"GstElement",
+"GstObject",
+"GInitiallyUnowned",
+"GObject"
+],
+"interfaces": [
+"GstChildProxy",
+"GstURIHandler"
+],
+"klass": "Source/Network/WebRTC",
+"long-name": "WebRTCSrc",
+"pad-templates": {
+"audio_%%u": {
+"caps": "audio/x-raw(ANY):\naudio/x-opus:\napplication/x-rtp:\n",
+"direction": "src",
+"presence": "sometimes",
+"type": "GstWebRTCSrcPad"
+},
+"video_%%u": {
+"caps": "video/x-raw(ANY):\napplication/x-rtp:\n",
+"direction": "src",
+"presence": "sometimes",
+"type": "GstWebRTCSrcPad"
+}
+},
+"properties": {
+"audio-codecs": {
+"blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS]",
+"conditionally-available": false,
+"construct": false,
+"construct-only": false,
+"controllable": false,
+"mutable": "ready",
+"readable": true,
+"type": "GstValueArray",
+"writable": true
+},
+"meta": {
+"blurb": "Free form metadata about the consumer",
+"conditionally-available": false,
+"construct": false,
+"construct-only": false,
+"controllable": false,
+"mutable": "ready",
+"readable": true,
+"type": "GstStructure",
+"writable": true
+},
+"signaller": {
+"blurb": "The Signallable object to use to handle WebRTC Signalling",
+"conditionally-available": false,
+"construct": false,
+"construct-only": false,
+"controllable": false,
+"mutable": "ready",
+"readable": true,
+"type": "GstRSWebRTCSignallableIface",
+"writable": true
+},
+"stun-server": {
+"blurb": "NULL",
+"conditionally-available": false,
+"construct": false,
+"construct-only": false,
+"controllable": false,
+"default": "stun://stun.l.google.com:19302",
+"mutable": "null",
+"readable": true,
+"type": "gchararray",
+"writable": true
+},
+"video-codecs": {
+"blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265]",
+"conditionally-available": false,
+"construct": false,
+"construct-only": false,
+"controllable": false,
+"mutable": "ready",
+"readable": true,
+"type": "GstValueArray",
+"writable": true
+}
+},
+"rank": "primary"
+}
+},
+"filename": "gstrswebrtc",
+"license": "MPL-2.0",
+"other-types": {
+"GstBaseWebRTCSink": {
+"hierarchy": [
+"GstBaseWebRTCSink",
+"GstBin",
+"GstElement",
+"GstObject",
+"GInitiallyUnowned",
+"GObject"
+],
+"interfaces": [
+"GstChildProxy",
+"GstNavigation"
+],
+"kind": "object",
 "properties": {
 "audio-caps": {
 "blurb": "Governs what audio codecs will be proposed",
@@ -6308,7 +6419,6 @@
 "writable": true
 }
 },
-"rank": "none",
 "signals": {
 "consumer-added": {
 "args": [
@@ -6364,101 +6474,6 @@
 }
 }
 },
-"webrtcsrc": {
-"author": "Thibault Saunier <tsaunier@igalia.com>",
-"description": "WebRTC src",
-"hierarchy": [
-"GstWebRTCSrc",
-"GstBin",
-"GstElement",
-"GstObject",
-"GInitiallyUnowned",
-"GObject"
-],
-"interfaces": [
-"GstChildProxy",
-"GstURIHandler"
-],
-"klass": "Source/Network/WebRTC",
-"long-name": "WebRTCSrc",
-"pad-templates": {
-"audio_%%u": {
-"caps": "audio/x-raw(ANY):\naudio/x-opus:\napplication/x-rtp:\n",
-"direction": "src",
-"presence": "sometimes",
-"type": "GstWebRTCSrcPad"
-},
-"video_%%u": {
-"caps": "video/x-raw(ANY):\napplication/x-rtp:\n",
-"direction": "src",
-"presence": "sometimes",
-"type": "GstWebRTCSrcPad"
-}
-},
-"properties": {
-"audio-codecs": {
-"blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS]",
-"conditionally-available": false,
-"construct": false,
-"construct-only": false,
-"controllable": false,
-"mutable": "ready",
-"readable": true,
-"type": "GstValueArray",
-"writable": true
-},
-"meta": {
-"blurb": "Free form metadata about the consumer",
-"conditionally-available": false,
-"construct": false,
-"construct-only": false,
-"controllable": false,
-"mutable": "ready",
-"readable": true,
-"type": "GstStructure",
-"writable": true
-},
-"signaller": {
-"blurb": "The Signallable object to use to handle WebRTC Signalling",
-"conditionally-available": false,
-"construct": false,
-"construct-only": false,
-"controllable": false,
-"mutable": "ready",
-"readable": true,
-"type": "GstRSWebRTCSignallableIface",
-"writable": true
-},
-"stun-server": {
-"blurb": "NULL",
-"conditionally-available": false,
-"construct": false,
-"construct-only": false,
-"controllable": false,
-"default": "stun://stun.l.google.com:19302",
-"mutable": "null",
-"readable": true,
-"type": "gchararray",
-"writable": true
-},
-"video-codecs": {
-"blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265]",
-"conditionally-available": false,
-"construct": false,
-"construct-only": false,
-"controllable": false,
-"mutable": "ready",
-"readable": true,
-"type": "GstValueArray",
-"writable": true
-}
-},
-"rank": "primary"
-}
-},
-"filename": "gstrswebrtc",
-"license": "MPL-2.0",
-"other-types": {
 "GstRSWebRTCSignallableIface": {
 "hierarchy": [
 "GstRSWebRTCSignallableIface",


@@ -110,7 +110,7 @@ impl CongestionController {
 fn update_delay(
 &mut self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 twcc_stats: &gst::StructureRef,
 rtt: f64,
 ) -> CongestionControlOp {
@@ -291,7 +291,7 @@ impl CongestionController {
 pub fn loss_control(
 &mut self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 stats: &gst::StructureRef,
 encoders: &mut Vec<VideoEncoder>,
 ) {
@@ -316,7 +316,7 @@ impl CongestionController {
 pub fn delay_control(
 &mut self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 stats: &gst::StructureRef,
 encoders: &mut Vec<VideoEncoder>,
 ) {
@@ -328,7 +328,7 @@ impl CongestionController {
 fn apply_control_op(
 &mut self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 encoders: &mut Vec<VideoEncoder>,
 control_op: CongestionControlOp,
 controller_type: ControllerType,


@@ -220,7 +220,7 @@ struct State {
 signaller_signals: Option<SignallerSignals>,
 }
-fn create_navigation_event(sink: &super::WebRTCSink, msg: &str) {
+fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) {
 let event: Result<NavigationEvent, _> = serde_json::from_str(msg);
 if let Ok(event) = event {
@@ -281,7 +281,7 @@ struct NavigationEventHandler((glib::SignalHandlerId, WebRTCDataChannel));
 /// Our instance structure
 #[derive(Default)]
-pub struct WebRTCSink {
+pub struct BaseWebRTCSink {
 state: Mutex<State>,
 settings: Mutex<Settings>,
 }
@@ -486,7 +486,7 @@ fn configure_encoder(enc: &gst::Element, start_bitrate: u32) {
 /// the codec discovery code, and this gets the job done.
 #[allow(clippy::too_many_arguments)] // This needs some more refactoring and it will happen soon
 fn setup_encoding(
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 pipeline: &gst::Pipeline,
 src: &gst::Element,
 input_caps: &gst::Caps,
@@ -681,7 +681,7 @@ impl VideoEncoder {
 (width + 1) & !1
 }
-pub(crate) fn set_bitrate(&mut self, element: &super::WebRTCSink, bitrate: i32) {
+pub(crate) fn set_bitrate(&mut self, element: &super::BaseWebRTCSink, bitrate: i32) {
 match self.factory_name.as_str() {
 "vp8enc" | "vp9enc" => self.element.set_property("target-bitrate", bitrate),
 "x264enc" | "nvh264enc" | "vaapih264enc" | "vaapivp8enc" => self
@@ -795,7 +795,7 @@ impl State {
 }
 }
-fn should_start_signaller(&mut self, element: &super::WebRTCSink) -> bool {
+fn should_start_signaller(&mut self, element: &super::BaseWebRTCSink) -> bool {
 self.signaller_state == SignallerState::Stopped
 && element.current_state() >= gst::State::Paused
 && self.codec_discovery_done
@@ -854,7 +854,7 @@ impl Session {
 /// to a given WebRTCPad
 fn connect_input_stream(
 &mut self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 producer: &StreamProducer,
 webrtc_pad: &WebRTCPad,
 codecs: &BTreeMap<i32, Codec>,
@@ -1045,7 +1045,7 @@ impl Drop for PipelineWrapper {
 impl InputStream {
 /// Called when transitioning state up to Paused
-fn prepare(&mut self, element: &super::WebRTCSink) -> Result<(), Error> {
+fn prepare(&mut self, element: &super::BaseWebRTCSink) -> Result<(), Error> {
 let clocksync = make_element("clocksync", None)?;
 let appsink = make_element("appsink", None)?
 .downcast::<gst_app::AppSink>()
@@ -1072,7 +1072,7 @@ impl InputStream {
 }
 /// Called when transitioning state back down to Ready
-fn unprepare(&mut self, element: &super::WebRTCSink) {
+fn unprepare(&mut self, element: &super::BaseWebRTCSink) {
 self.sink_pad.set_target(None::<&gst::Pad>).unwrap();
 if let Some(clocksync) = self.clocksync.take() {
@@ -1089,7 +1089,7 @@ impl InputStream {
 }
 impl NavigationEventHandler {
-fn new(element: &super::WebRTCSink, webrtcbin: &gst::Element) -> Self {
+fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self {
 gst::info!(CAT, "Creating navigation data channel");
 let channel = webrtcbin.emit_by_name::<WebRTCDataChannel>(
 "create-data-channel",
@@ -1117,8 +1117,11 @@ impl NavigationEventHandler {
 }
 }
-impl WebRTCSink {
+impl BaseWebRTCSink {
-fn generate_ssrc(element: &super::WebRTCSink, webrtc_pads: &HashMap<u32, WebRTCPad>) -> u32 {
+fn generate_ssrc(
+element: &super::BaseWebRTCSink,
+webrtc_pads: &HashMap<u32, WebRTCPad>,
+) -> u32 {
 loop {
 let ret = fastrand::u32(..);
@@ -1130,12 +1133,12 @@ impl WebRTCSink {
 }
 fn request_inactive_webrtcbin_pad(
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 webrtcbin: &gst::Element,
 webrtc_pads: &mut HashMap<u32, WebRTCPad>,
 is_video: bool,
 ) {
-let ssrc = WebRTCSink::generate_ssrc(element, webrtc_pads);
+let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads);
 let media_idx = webrtc_pads.len() as i32;
 let pad = webrtcbin
@@ -1169,7 +1172,7 @@ impl WebRTCSink {
 }
 async fn request_webrtcbin_pad(
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 webrtcbin: &gst::Element,
 stream: &InputStream,
 media: Option<&gst_sdp::SDPMediaRef>,
@@ -1177,7 +1180,7 @@ impl WebRTCSink {
 webrtc_pads: &mut HashMap<u32, WebRTCPad>,
 codecs: &mut BTreeMap<i32, Codec>,
 ) {
-let ssrc = WebRTCSink::generate_ssrc(element, webrtc_pads);
+let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads);
 let media_idx = webrtc_pads.len() as i32;
 let mut payloader_caps = match media {
@@ -1192,7 +1195,7 @@ impl WebRTCSink {
 gst::Rank::Marginal,
 );
-let codec = WebRTCSink::select_codec(
+let codec = BaseWebRTCSink::select_codec(
 element,
 &encoders,
 &payloaders,
@@ -1224,7 +1227,7 @@ impl WebRTCSink {
 };
 if payloader_caps.is_empty() {
-WebRTCSink::request_inactive_webrtcbin_pad(
+BaseWebRTCSink::request_inactive_webrtcbin_pad(
 element,
 webrtcbin,
 webrtc_pads,
@@ -1336,7 +1339,7 @@ impl WebRTCSink {
 /// Prepare for accepting consumers, by setting
 /// up StreamProducers for each of our sink pads
-fn prepare(&self, element: &super::WebRTCSink) -> Result<(), Error> {
+fn prepare(&self, element: &super::BaseWebRTCSink) -> Result<(), Error> {
 gst::debug!(CAT, obj: element, "preparing");
 self.state
@@ -1351,7 +1354,7 @@ impl WebRTCSink {
 /// Unprepare by stopping consumers, then the signaller object.
 /// Might abort codec discovery
-fn unprepare(&self, element: &super::WebRTCSink) -> Result<(), Error> {
+fn unprepare(&self, element: &super::BaseWebRTCSink) -> Result<(), Error> {
 gst::info!(CAT, obj: element, "unpreparing");
 let settings = self.settings.lock().unwrap();
@@ -1503,14 +1506,14 @@ impl WebRTCSink {
 }
 /// Called by the signaller when it wants to shut down gracefully
-fn shutdown(&self, element: &super::WebRTCSink) {
+fn shutdown(&self, element: &super::BaseWebRTCSink) {
 gst::info!(CAT, "Shutting down");
 let _ = element.post_message(gst::message::Eos::builder().src(element).build());
 }
 fn on_offer_created(
 &self,
-_element: &super::WebRTCSink,
+_element: &super::BaseWebRTCSink,
 offer: gst_webrtc::WebRTCSessionDescription,
 session_id: &str,
 ) {
@@ -1531,7 +1534,7 @@ impl WebRTCSink {
 fn on_answer_created(
 &self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 answer: gst_webrtc::WebRTCSessionDescription,
 session_id: &str,
 ) {
@@ -1567,7 +1570,7 @@ impl WebRTCSink {
 }
 }
-fn on_remote_description_offer_set(&self, element: &super::WebRTCSink, session_id: String) {
+fn on_remote_description_offer_set(&self, element: &super::BaseWebRTCSink, session_id: String) {
 let state = self.state.lock().unwrap();
 if let Some(session) = state.sessions.get(&session_id) {
@@ -1663,7 +1666,7 @@ impl WebRTCSink {
 }
 async fn select_codec(
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 encoders: &gst::glib::List<gst::ElementFactory>,
 payloaders: &gst::glib::List<gst::ElementFactory>,
 media: &gst_sdp::SDPMediaRef,
@@ -1713,7 +1716,7 @@ impl WebRTCSink {
 let encoding_name = s.get::<String>("encoding-name").unwrap();
 if let Some(codec) =
-WebRTCSink::build_codec(&encoding_name, payload, encoders, payloaders)
+BaseWebRTCSink::build_codec(&encoding_name, payload, encoders, payloaders)
 {
 for (user_caps, codecs_and_caps) in ordered_codecs_and_caps.iter_mut() {
 if codec.caps.is_subset(user_caps) {
@@ -1751,7 +1754,7 @@ impl WebRTCSink {
 .iter()
 .flat_map(|(_, codecs_and_caps)| codecs_and_caps)
 .map(|(codec, caps)| async move {
-WebRTCSink::run_discovery_pipeline(element, codec, in_caps, caps, twcc_idx)
+BaseWebRTCSink::run_discovery_pipeline(element, codec, in_caps, caps, twcc_idx)
 .await
 .map(|s| {
 let mut codec = codec.clone();
@@ -1772,7 +1775,7 @@ impl WebRTCSink {
 fn negotiate(
 &self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 session_id: &str,
 offer: Option<&gst_webrtc::WebRTCSessionDescription>,
 ) {
@@ -1866,7 +1869,7 @@ impl WebRTCSink {
 fn on_ice_candidate(
 &self,
-_element: &super::WebRTCSink,
+_element: &super::BaseWebRTCSink,
 session_id: String,
 sdp_m_line_index: u32,
 candidate: String,
@@ -2267,7 +2270,7 @@ impl WebRTCSink {
 media_is_video == stream_is_video
 }) {
 let stream = streams.remove(idx);
-WebRTCSink::request_webrtcbin_pad(
+BaseWebRTCSink::request_webrtcbin_pad(
 &element,
 &webrtcbin,
 &stream,
@@ -2278,7 +2281,7 @@ impl WebRTCSink {
 )
 .await;
 } else {
-WebRTCSink::request_inactive_webrtcbin_pad(
+BaseWebRTCSink::request_inactive_webrtcbin_pad(
 &element,
 &webrtcbin,
 &mut webrtc_pads,
@@ -2288,7 +2291,7 @@ impl WebRTCSink {
 }
 } else {
 for stream in streams {
-WebRTCSink::request_webrtcbin_pad(
+BaseWebRTCSink::request_webrtcbin_pad(
 &element,
 &webrtcbin,
 &stream,
@@ -2359,7 +2362,7 @@ impl WebRTCSink {
 /// Called by the signaller to remove a consumer
 fn remove_session(
 &self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 session_id: &str,
 signal: bool,
 ) -> Result<(), WebRTCSinkError> {
@@ -2387,7 +2390,7 @@ impl WebRTCSink {
 fn process_loss_stats(
 &self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 session_id: &str,
 stats: &gst::Structure,
 ) {
@@ -2402,7 +2405,7 @@ impl WebRTCSink {
 fn process_stats(
 &self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 webrtcbin: gst::Element,
 session_id: &str,
 ) {
@@ -2425,7 +2428,12 @@ impl WebRTCSink {
 webrtcbin.emit_by_name::<()>("get-stats", &[&None::<gst::Pad>, &promise]);
 }
-fn set_rtptrxsend(&self, element: &super::WebRTCSink, peer_id: &str, rtprtxsend: gst::Element) {
+fn set_rtptrxsend(
+&self,
+element: &super::BaseWebRTCSink,
+peer_id: &str,
+rtprtxsend: gst::Element,
+) {
 let mut state = element.imp().state.lock().unwrap();
 if let Some(session) = state.sessions.get_mut(peer_id) {
@@ -2433,7 +2441,7 @@ impl WebRTCSink {
 }
 }
-fn set_bitrate(&self, element: &super::WebRTCSink, peer_id: &str, bitrate: u32) {
+fn set_bitrate(&self, element: &super::BaseWebRTCSink, peer_id: &str, bitrate: u32) {
 let settings = element.imp().settings.lock().unwrap();
 let mut state = element.imp().state.lock().unwrap();
@@ -2467,7 +2475,7 @@ impl WebRTCSink {
 }
 }
-fn on_remote_description_set(&self, element: &super::WebRTCSink, session_id: String) {
+fn on_remote_description_set(&self, element: &super::BaseWebRTCSink, session_id: String) {
 let mut state = self.state.lock().unwrap();
 let mut remove = false;
 let codecs = state.codecs.clone();
@@ -2587,7 +2595,7 @@ impl WebRTCSink {
 fn handle_sdp_answer(
 &self,
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 session_id: &str,
 desc: &gst_webrtc::WebRTCSessionDescription,
 ) {
@@ -2681,7 +2689,7 @@ impl WebRTCSink {
 }
 async fn run_discovery_pipeline(
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 codec: &Codec,
 caps: &gst::Caps,
 output_caps: &gst::Caps,
@@ -2802,7 +2810,7 @@ impl WebRTCSink {
 }
 async fn lookup_caps(
-element: &super::WebRTCSink,
+element: &super::BaseWebRTCSink,
 name: String,
 in_caps: gst::Caps,
 output_caps: gst::Caps,
@@ -2823,7 +2831,7 @@ impl WebRTCSink {
 .iter()
 .filter(|(_, codec)| codec.is_video() == is_video)
 .map(|(_, codec)| {
-WebRTCSink::run_discovery_pipeline(
+BaseWebRTCSink::run_discovery_pipeline(
 element,
 codec,
 &sink_caps,
@@ -2854,7 +2862,7 @@ impl WebRTCSink {
 (name, payloader_caps)
 }
-async fn lookup_streams_caps(&self, element: &super::WebRTCSink) -> Result<(), Error> {
+async fn lookup_streams_caps(&self, element: &super::BaseWebRTCSink) -> Result<(), Error> {
 let codecs = self.lookup_codecs();
 gst::debug!(CAT, obj: element, "Looked up codecs {codecs:?}");
@@ -2866,7 +2874,7 @@ impl WebRTCSink {
 .streams
 .iter()
 .map(|(name, stream)| {
-WebRTCSink::lookup_caps(
+BaseWebRTCSink::lookup_caps(
 element,
 name.to_owned(),
 stream.in_caps.as_ref().unwrap().to_owned(),
@@ -2907,7 +2915,12 @@ impl WebRTCSink {
 )
 }
-fn sink_event(&self, pad: &gst::Pad, element: &super::WebRTCSink, event: gst::Event) -> bool {
+fn sink_event(
+&self,
+pad: &gst::Pad,
+element: &super::BaseWebRTCSink,
+event: gst::Event,
+) -> bool {
 use gst::EventView;
 match event.view() {
@@ -2995,22 +3008,22 @@ impl WebRTCSink {
 }
 #[glib::object_subclass]
-impl ObjectSubclass for WebRTCSink {
+impl ObjectSubclass for BaseWebRTCSink {
-const NAME: &'static str = "GstWebRTCSink";
+const NAME: &'static str = "GstBaseWebRTCSink";
-type Type = super::WebRTCSink;
+type Type = super::BaseWebRTCSink;
 type ParentType = gst::Bin;
 type Interfaces = (gst::ChildProxy, gst_video::Navigation);
 }
-unsafe impl<T: WebRTCSinkImpl> IsSubclassable<T> for super::WebRTCSink {
+unsafe impl<T: BaseWebRTCSinkImpl> IsSubclassable<T> for super::BaseWebRTCSink {
 fn class_init(class: &mut glib::Class<Self>) {
 Self::parent_class_init::<T>(class);
 }
 }
-pub(crate) trait WebRTCSinkImpl: BinImpl {}
+pub(crate) trait BaseWebRTCSinkImpl: BinImpl {}
-impl ObjectImpl for WebRTCSink {
+impl ObjectImpl for BaseWebRTCSink {
 fn properties() -> &'static [glib::ParamSpec] {
 static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
 vec![
@@ -3247,7 +3260,7 @@ impl ObjectImpl for WebRTCSink {
 static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
 vec![
 /**
-* RsWebRTCSink::consumer-added:
+* RsBaseWebRTCSink::consumer-added:
 * @consumer_id: Identifier of the consumer added
 * @webrtcbin: The new webrtcbin
 *
@@ -3258,7 +3271,7 @@ impl ObjectImpl for WebRTCSink {
 .param_types([String::static_type(), gst::Element::static_type()])
 .build(),
 /**
-* RsWebRTCSink::consumer_removed:
+* RsBaseWebRTCSink::consumer_removed:
 * @consumer_id: Identifier of the consumer that was removed
 * @webrtcbin: The webrtcbin connected to the newly removed consumer
 *
@@ -3269,14 +3282,14 @@ impl ObjectImpl for WebRTCSink {
 .param_types([String::static_type(), gst::Element::static_type()])
 .build(),
 /**
-* RsWebRTCSink::get_sessions:
+* RsBaseWebRTCSink::get_sessions:
 *
 * List all sessions (by ID).
 */
 glib::subclass::Signal::builder("get-sessions")
 .action()
 .class_handler(|_, args| {
-let element = args[0].get::<super::WebRTCSink>().expect("signal arg");
+let element = args[0].get::<super::BaseWebRTCSink>().expect("signal arg");
 let this = element.imp();
 let res = Some(
@@ -3294,7 +3307,7 @@ impl ObjectImpl for WebRTCSink {
 .return_type::<Vec<String>>()
 .build(),
 /**
-* RsWebRTCSink::encoder-setup:
+* RsBaseWebRTCSink::encoder-setup:
 * @consumer_id: Identifier of the consumer
 * @pad_name: The name of the corresponding input pad
 * @encoder: The constructed encoder
@@ -3313,7 +3326,7 @@ impl ObjectImpl for WebRTCSink {
 .return_type::<bool>()
 .accumulator(|_hint, _ret, value| !value.get::<bool>().unwrap())
 .class_handler(|_, args| {
-let element = args[0].get::<super::WebRTCSink>().expect("signal arg");
+let element = args[0].get::<super::BaseWebRTCSink>().expect("signal arg");
 let enc = args[3].get::<gst::Element>().unwrap();
 gst::debug!(
@@ -3349,22 +3362,9 @@ impl ObjectImpl for WebRTCSink {
 }
 }
-impl GstObjectImpl for WebRTCSink {}
+impl GstObjectImpl for BaseWebRTCSink {}
-impl ElementImpl for WebRTCSink {
-fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
-static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
-gst::subclass::ElementMetadata::new(
-"WebRTCSink",
-"Sink/Network/WebRTC",
-"WebRTC sink",
-"Mathieu Duponchelle <mathieu@centricular.com>",
-)
-});
-Some(&*ELEMENT_METADATA)
-}
+impl ElementImpl for BaseWebRTCSink {
 fn pad_templates() -> &'static [gst::PadTemplate] {
 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
 let caps = gst::Caps::builder_full()
@@ -3436,7 +3436,7 @@ impl ElementImpl for WebRTCSink {
 let sink_pad = gst::GhostPad::builder_with_template(templ, Some(name.as_str()))
 .event_function(|pad, parent, event| {
-WebRTCSink::catch_panic_pad_function(
+BaseWebRTCSink::catch_panic_pad_function(
 parent,
 || false,
 |this| this.sink_event(pad.upcast_ref(), &this.obj(), event),
@@ -3514,9 +3514,9 @@ impl ElementImpl for WebRTCSink {
 }
 }
-impl BinImpl for WebRTCSink {}
+impl BinImpl for BaseWebRTCSink {}
-impl ChildProxyImpl for WebRTCSink {
+impl ChildProxyImpl for BaseWebRTCSink {
 fn child_by_index(&self, _index: u32) -> Option<glib::Object> {
 None
 }
@@ -3533,7 +3533,7 @@ impl ChildProxyImpl for WebRTCSink {
 }
 }
-impl NavigationImpl for WebRTCSink {
+impl NavigationImpl for BaseWebRTCSink {
 fn send_event(&self, event_def: gst::Structure) {
 let mut state = self.state.lock().unwrap();
 let event = gst::event::Navigation::new(event_def);
@@ -3550,13 +3550,46 @@ impl NavigationImpl for WebRTCSink {
 }
 }
+#[derive(Default)]
+pub struct WebRTCSink {}
+impl ObjectImpl for WebRTCSink {}
+impl GstObjectImpl for WebRTCSink {}
+impl ElementImpl for WebRTCSink {
+fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+gst::subclass::ElementMetadata::new(
+"WebRTCSink",
+"Sink/Network/WebRTC",
+"WebRTC sink with custom protocol signaller",
+"Mathieu Duponchelle <mathieu@centricular.com>",
+)
+});
+Some(&*ELEMENT_METADATA)
+}
+}
+impl BinImpl for WebRTCSink {}
+impl BaseWebRTCSinkImpl for WebRTCSink {}
+#[glib::object_subclass]
+impl ObjectSubclass for WebRTCSink {
+const NAME: &'static str = "GstWebRTCSink";
+type Type = super::WebRTCSink;
+type ParentType = super::BaseWebRTCSink;
+}
 #[derive(Default)]
 pub struct AwsKvsWebRTCSink {}
 impl ObjectImpl for AwsKvsWebRTCSink {
 fn constructed(&self) {
 let element = self.obj();
-let ws = element.upcast_ref::<super::WebRTCSink>().imp();
+let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp();
 let _ = ws.set_signaller(AwsKvsSignaller::default().upcast());
 }
@@ -3564,15 +3597,28 @@ impl ObjectImpl for AwsKvsWebRTCSink {
 impl GstObjectImpl for AwsKvsWebRTCSink {}
-impl ElementImpl for AwsKvsWebRTCSink {}
+impl ElementImpl for AwsKvsWebRTCSink {
+fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+gst::subclass::ElementMetadata::new(
+"AwsKvsWebRTCSink",
+"Sink/Network/WebRTC",
+"WebRTC sink with kinesis video streams signaller",
+"Mathieu Duponchelle <mathieu@centricular.com>",
+)
+});
+Some(&*ELEMENT_METADATA)
+}
+}
 impl BinImpl for AwsKvsWebRTCSink {}
-impl WebRTCSinkImpl for AwsKvsWebRTCSink {}
+impl BaseWebRTCSinkImpl for AwsKvsWebRTCSink {}
 #[glib::object_subclass]
 impl ObjectSubclass for AwsKvsWebRTCSink {
 const NAME: &'static str = "GstAwsKvsWebRTCSink";
 type Type = super::AwsKvsWebRTCSink;
-type ParentType = super::WebRTCSink;
+type ParentType = super::BaseWebRTCSink;
 }


@@ -16,11 +16,15 @@ mod homegrown_cc;
 mod imp;
 glib::wrapper! {
-pub struct WebRTCSink(ObjectSubclass<imp::WebRTCSink>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
+pub struct BaseWebRTCSink(ObjectSubclass<imp::BaseWebRTCSink>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
 }
 glib::wrapper! {
-pub struct AwsKvsWebRTCSink(ObjectSubclass<imp::AwsKvsWebRTCSink>) @extends WebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
+pub struct WebRTCSink(ObjectSubclass<imp::WebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
+}
+glib::wrapper! {
+pub struct AwsKvsWebRTCSink(ObjectSubclass<imp::AwsKvsWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
 }
 #[derive(thiserror::Error, Debug)]
@@ -43,15 +47,15 @@ pub enum WebRTCSinkError {
 },
 }
-impl Default for WebRTCSink {
+impl Default for BaseWebRTCSink {
 fn default() -> Self {
 glib::Object::new()
 }
 }
-impl WebRTCSink {
+impl BaseWebRTCSink {
 pub fn with_signaller(signaller: Signallable) -> Self {
-let ret: WebRTCSink = glib::Object::new();
+let ret: BaseWebRTCSink = glib::Object::new();
 let ws = ret.imp();
 ws.set_signaller(signaller).unwrap();
@@ -83,6 +87,7 @@ enum WebRTCSinkMitigationMode {
 }
 pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+BaseWebRTCSink::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
 WebRTCSinkCongestionControl::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
 gst::Element::register(
 Some(plugin),