From 9f07ec35e662a1d4c0a4fd833173828b701a08dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim-Philipp=20M=C3=BCller?= Date: Sat, 21 Oct 2023 16:55:03 +0100 Subject: [PATCH] rtp: Add MPEG-TS RTP depayloader Can handle different packet sizes, also see: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1310 Has clock-rate=90000 as spec prescribes, see: https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/issues/691 Part-of: --- docs/plugins/gst_plugins_cache.json | 42 ++++ net/rtp/src/lib.rs | 3 + net/rtp/src/mp2t/depay/imp.rs | 316 ++++++++++++++++++++++++++++ net/rtp/src/mp2t/depay/mod.rs | 28 +++ net/rtp/src/mp2t/mod.rs | 3 + 5 files changed, 392 insertions(+) create mode 100644 net/rtp/src/mp2t/depay/imp.rs create mode 100644 net/rtp/src/mp2t/depay/mod.rs create mode 100644 net/rtp/src/mp2t/mod.rs diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 13c92826..2d48a837 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6374,6 +6374,48 @@ }, "rank": "none" }, + "rtpmp2tdepay2": { + "author": "Tim-Philipp Müller ", + "description": "Depayload an MPEG Transport Stream from RTP packets (RFC 2250)", + "hierarchy": [ + "GstRtpMP2TDepay2", + "GstRtpBaseDepay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Depayloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "application/x-rtp:\n media: video\n clock-rate: 90000\n encoding-name: MP2T\napplication/x-rtp:\n media: video\n payload: 33\n clock-rate: 90000\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "video/mpegts:\n packetsize: { (int)188, (int)192, (int)204, (int)208 }\n systemstream: true\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "skip-first-bytes": { + "blurb": "Number of bytes to skip at the beginning of the payload", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + } + }, + "rank": "marginal" + }, "rtppcmadepay2": { "author": "Sebastian Dröge ", "description": "Depayload A-law from RTP packets (RFC 3551)", diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs index 3fa2ee5a..0b4d88dc 100644 --- a/net/rtp/src/lib.rs +++ b/net/rtp/src/lib.rs @@ -24,6 +24,7 @@ mod basedepay; mod basepay; mod av1; +mod mp2t; mod pcmau; #[cfg(test)] @@ -47,6 +48,8 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { av1::depay::register(plugin)?; av1::pay::register(plugin)?; + mp2t::depay::register(plugin)?; + pcmau::depay::register(plugin)?; pcmau::pay::register(plugin)?; diff --git a/net/rtp/src/mp2t/depay/imp.rs b/net/rtp/src/mp2t/depay/imp.rs new file mode 100644 index 00000000..a75a8578 --- /dev/null +++ b/net/rtp/src/mp2t/depay/imp.rs @@ -0,0 +1,316 @@ +// GStreamer RTP MPEG-TS Depayloader +// +// Copyright (C) 2023-2024 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpmp2tdepay2 + * @see_also: rtpmp2tpay2, rtpmp2tdepay, rtpmp2tpay, tsdemux, mpegtsmux + * + * Depayload an MPEG Transport Stream from RTP packets as per [RFC 2250][rfc-2250]. + * + * [rfc-2250]: https://www.rfc-editor.org/rfc/rfc2250.html + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 udpsrc address=127.0.0.1 port=5555 caps='application/x-rtp,media=video,clock-rate=90000,encoding-name=MP2T' ! rtpjitterbuffer latency=100 ! rtpmp2tdepay2 ! decodebin3 ! videoconvertscale ! autovideosink + * ]| This will depayload an incoming RTP MPEG-TS stream. You can use the #rtpmp2tpay2 or #rtpmp2tpay + * element to create such an RTP stream. + * + * Since: plugins-rs-0.13.0 + */ +use atomic_refcell::AtomicRefCell; + +use gst::{glib, prelude::*, subclass::prelude::*}; + +use once_cell::sync::Lazy; + +use std::num::NonZeroUsize; +use std::sync::Mutex; + +use crate::basedepay::RtpBaseDepay2Ext; + +const TS_PACKET_SYNC: u8 = 0x47; + +#[derive(Default)] +pub struct RtpMP2TDepay { + state: AtomicRefCell, + settings: Mutex, +} + +#[derive(Default)] +struct State { + packet_size: Option, + bytes_to_skip: usize, +} + +#[derive(Debug, Clone)] +struct Settings { + skip_first_bytes: u32, +} + +const DEFAULT_SKIP_FIRST_BYTES: u32 = 0; + +impl Default for Settings { + fn default() -> Self { + Settings { + skip_first_bytes: DEFAULT_SKIP_FIRST_BYTES, + } + } +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpmp2tdepay2", + gst::DebugColorFlags::empty(), + Some("RTP MPEG-TS Depayloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpMP2TDepay { + const NAME: &'static str = "GstRtpMP2TDepay2"; + type Type = super::RtpMP2TDepay; + type ParentType = crate::basedepay::RtpBaseDepay2; +} + +impl ObjectImpl for RtpMP2TDepay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecUInt::builder("skip-first-bytes") + .nick("Skip first bytes") + .blurb("Number of bytes to skip at the beginning of the payload") + .default_value(DEFAULT_SKIP_FIRST_BYTES) + .mutable_ready() + .build()] + }); + + PROPERTIES.as_ref() + } + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "skip-first-bytes" => { + let mut settings = self.settings.lock().unwrap(); + settings.skip_first_bytes = value.get().expect("type checked upstream"); + } + name => unimplemented!("Property '{name}'"), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "skip-first-bytes" => { + let settings = self.settings.lock().unwrap(); + settings.skip_first_bytes.to_value() + } + name => unimplemented!("Property '{name}'"), + } + } +} + +impl GstObjectImpl for RtpMP2TDepay {} + +impl ElementImpl for RtpMP2TDepay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP MPEG-TS Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload an MPEG Transport Stream from RTP packets (RFC 2250)", + "Tim-Philipp Müller ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + // Note: C depayloader accepts MP2T-ES as well but that was just for + // backward compatibility because the GStreamer 0.10 payloader used + // to (wrongly) produce that at some point a long time ago. + // Also spec (and common sense) say clock-rate should always be 90000 + // (C depayloader accepts any clock rate in caps), see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/issues/691 + gst::Structure::builder("application/x-rtp") + .field("media", "video") + .field("clock-rate", 90000i32) + .field("encoding-name", "MP2T") + .build(), + ) + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "video") + .field("payload", 33i32) + .field("clock-rate", 90000i32) + .build(), + ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/mpegts") + .field("packetsize", gst::List::new([188i32, 192, 204, 208])) + .field("systemstream", true) + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl crate::basedepay::RtpBaseDepay2Impl for RtpMP2TDepay { + fn start(&self) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().unwrap(); + // Copy skip bytes into state so we don't have to take the settings lock all the time + *self.state.borrow_mut() = State { + packet_size: None, + bytes_to_skip: settings.skip_first_bytes as usize, + }; + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + Ok(()) + } + + // Encapsulation of MPEG System and Transport Streams: + // https://www.rfc-editor.org/rfc/rfc2250.html#section-2 + // + fn handle_packet( + &self, + packet: &crate::basedepay::Packet, + ) -> Result { + let mut state = self.state.borrow_mut(); + let bytes_to_skip = state.bytes_to_skip; + + let payload = packet.payload(); + + if payload.len() < 188 + bytes_to_skip { + gst::warning!(CAT, imp: self, + "Payload too small: {} bytes, but need at least {} bytes", + payload.len(), 188 + bytes_to_skip + ); + + self.obj().drop_packet(packet); + + return Ok(gst::FlowSuccess::Ok); + } + + let (_, payload) = payload.split_at(bytes_to_skip); + + if state.packet_size.is_none() { + state.packet_size = self.detect_packet_size(payload); + + if let Some(packet_size) = state.packet_size { + let src_caps = gst::Caps::builder("video/mpegts") + .field("packetsize", packet_size.get() as i32) + .field("systemstream", true) + .build(); + + self.obj().set_src_caps(&src_caps); + } + } + + let Some(packet_size) = state.packet_size else { + gst::debug!(CAT, imp: self, "Could not determine packet size, dropping packet {packet:?}"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + }; + + let packet_size = packet_size.get(); + + // For MPEG2 Transport Streams the RTP payload will contain an integral + // number of MPEG transport packets. + let n_packets = payload.len() / packet_size; + + if payload.len() % packet_size != 0 { + gst::warning!(CAT, imp: self, + "Payload does not contain an integral number of MPEG-TS packets! ({} left over)", + payload.len() % packet_size); + } + + let output_size = n_packets * packet_size; + + gst::trace!(CAT, imp: self, "Packet with {n_packets} MPEG-TS packets of size {packet_size}"); + + let mut buffer = + packet.payload_subbuffer_from_offset_with_length(bytes_to_skip, output_size); + + // Marker flag indicates MPEG-TS timestamping discontinuity + if packet.marker_bit() { + let buffer_ref = buffer.get_mut().unwrap(); + buffer_ref.set_flags(gst::BufferFlags::RESYNC); + } + + gst::trace!(CAT, imp: self, "Finishing buffer {buffer:?}"); + + self.obj().queue_buffer(packet.into(), buffer) + } +} + +impl RtpMP2TDepay { + fn detect_packet_size(&self, payload: &[u8]) -> Option { + const PACKET_SIZES: [(usize, usize); 4] = [(188, 0), (192, 4), (204, 0), (208, 0)]; + + for (size, offset) in PACKET_SIZES { + gst::debug!(CAT, imp: self, "Trying MPEG-TS packet size of {size} bytes.."); + + // Try exact size match for the payload first + if payload.len() >= size + && payload.len() % size == 0 + && payload + .chunks_exact(size) + .all(|packet| packet[offset] == TS_PACKET_SYNC) + { + gst::info!(CAT, imp: self, "Detected MPEG-TS packet size of {size} bytes, {} packets", payload.len() / size); + return NonZeroUsize::new(size); + } + } + + gst::warning!(CAT, imp: self, "Could not detect MPEG-TS packet size using full payload"); + + // No match? Try if we find a size if we ignore any leftover bytes + for (size, offset) in PACKET_SIZES { + gst::debug!(CAT, imp: self, "Trying MPEG-TS packet size of {size} bytes with remainder.."); + + if payload.len() >= size + && payload.len() % size != 0 + && payload + .chunks_exact(size) + .all(|packet| packet[offset] == TS_PACKET_SYNC) + { + gst::info!(CAT, imp: self, "Detected MPEG-TS packet size of {size} bytes, {} packets, {} bytes leftover", + payload.len() / size, payload.len() % size); + return NonZeroUsize::new(size); + } + } + + gst::warning!(CAT, imp: self, "Could not detect MPEG-TS packet size"); + + None + } +} diff --git a/net/rtp/src/mp2t/depay/mod.rs b/net/rtp/src/mp2t/depay/mod.rs new file mode 100644 index 00000000..c9cdda6c --- /dev/null +++ b/net/rtp/src/mp2t/depay/mod.rs @@ -0,0 +1,28 @@ +// GStreamer RTP MPEG-TS Depayloader +// +// Copyright (C) 2023-2024 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpMP2TDepay(ObjectSubclass) + @extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpmp2tdepay2", + gst::Rank::MARGINAL, + RtpMP2TDepay::static_type(), + ) +} diff --git a/net/rtp/src/mp2t/mod.rs b/net/rtp/src/mp2t/mod.rs new file mode 100644 index 00000000..de9fb4b9 --- /dev/null +++ b/net/rtp/src/mp2t/mod.rs @@ -0,0 +1,3 @@ +// SPDX-License-Identifier: MPL-2.0 + +pub mod depay;