diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 2d48a837..755ba1c2 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6416,6 +6416,32 @@ }, "rank": "marginal" }, + "rtpmp2tpay2": { + "author": "Tim-Philipp Müller ", + "description": "Payload an MPEG Transport Stream into RTP packets (RFC 2250)", + "hierarchy": [ + "GstRtpMP2TPay2", + "GstRtpBasePay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "video/mpegts:\n packetsize: { (int)188, (int)192, (int)204, (int)208 }\n systemstream: true\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: video\n clock-rate: 90000\n encoding-name: MP2T\napplication/x-rtp:\n media: video\n payload: 33\n clock-rate: 90000\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" + }, "rtppcmadepay2": { "author": "Sebastian Dröge ", "description": "Depayload A-law from RTP packets (RFC 3551)", diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs index 0b4d88dc..7d263ae3 100644 --- a/net/rtp/src/lib.rs +++ b/net/rtp/src/lib.rs @@ -49,6 +49,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { av1::pay::register(plugin)?; mp2t::depay::register(plugin)?; + mp2t::pay::register(plugin)?; pcmau::depay::register(plugin)?; pcmau::pay::register(plugin)?; diff --git a/net/rtp/src/mp2t/mod.rs b/net/rtp/src/mp2t/mod.rs index de9fb4b9..7ec136ac 100644 --- a/net/rtp/src/mp2t/mod.rs +++ b/net/rtp/src/mp2t/mod.rs @@ -1,3 +1,4 @@ // SPDX-License-Identifier: MPL-2.0 pub mod depay; +pub mod pay; diff --git a/net/rtp/src/mp2t/pay/imp.rs b/net/rtp/src/mp2t/pay/imp.rs new file mode 100644 index 00000000..3624bc90 --- /dev/null +++ b/net/rtp/src/mp2t/pay/imp.rs @@ -0,0 +1,388 @@ +// GStreamer RTP MPEG-TS Payloader +// +// Copyright (C) 2023-2024 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpmp2tpay2 + * @see_also: rtpmp2tdepay2, rtpmp2tdepay, rtpmp2tpay, tsdemux, mpegtsmux + * + * Payload an MPEG Transport Stream into RTP packets as per [RFC 2250][rfc-2250]. + * + * [rfc-2250]: https://www.rfc-editor.org/rfc/rfc2250.html + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 videotestsrc ! video/x-raw,width=1280,height=720,format=I420 ! timeoverlay font-desc=Sans,22 ! x264enc tune=zerolatency ! mpegtsmux alignment=7 ! rtpmp2tpay2 ! udpsink host=127.0.0.1 port=5555 + * ]| This will create and payload an MPEG-TS stream with a test pattern and send it out via UDP. + * + * Since: plugins-rs-0.13.0 + */ +use atomic_refcell::AtomicRefCell; + +use gst::{glib, prelude::*, subclass::prelude::*}; + +use once_cell::sync::Lazy; + +use std::num::NonZeroUsize; + +use crate::basepay::{PacketToBufferRelation, RtpBasePay2Ext}; + +const RTP_MP2T_DEFAULT_PT: u32 = 33; + +const RTP_MP2T_DEFAULT_PACKET_SIZE: usize = 188; + +#[derive(Default)] +pub struct RtpMP2TPay { + state: AtomicRefCell, +} + +#[derive(Default)] +struct PendingData { + data: Vec, + id: Option, +} + +impl PendingData { + fn clear(&mut self) { + self.data.clear(); + self.id = None; + } + + fn add(&mut self, data: &[u8], id: u64) { + self.id = self.id.or(Some(id)); + self.data.extend_from_slice(data); + } + + fn len(&self) -> usize { + self.data.len() + } + + fn is_empty(&self) -> bool { + self.data.is_empty() + } + + fn id(&self) -> u64 { + self.id.unwrap() + } +} + +struct State { + packet_size: Option, + discont_pending: bool, + + // Leftover data for the next packet + pending_data: PendingData, +} + +impl Default for State { + fn default() -> Self { + State { + packet_size: NonZeroUsize::new(RTP_MP2T_DEFAULT_PACKET_SIZE), + discont_pending: true, + pending_data: Default::default(), + } + } +} + +impl State { + fn want_marker_bit(&mut self) -> bool { + // Clear discont_pending and return current value + std::mem::replace(&mut self.discont_pending, false) + } +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpmp2tpay2", + gst::DebugColorFlags::empty(), + Some("RTP MPEG-TS Payloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpMP2TPay { + const NAME: &'static str = "GstRtpMP2TPay2"; + type Type = super::RtpMP2TPay; + type ParentType = crate::basepay::RtpBasePay2; +} + +impl ObjectImpl for RtpMP2TPay { + fn constructed(&self) { + self.parent_constructed(); + + self.obj().set_property("pt", RTP_MP2T_DEFAULT_PT); + } +} + +impl GstObjectImpl for RtpMP2TPay {} + +impl ElementImpl for RtpMP2TPay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP MPEG-TS Payloader", + "Codec/Payloader/Network/RTP", + "Payload an MPEG Transport Stream into RTP packets (RFC 2250)", + "Tim-Philipp Müller ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "video") + .field("clock-rate", 90000i32) + .field("encoding-name", "MP2T") + .build(), + ) + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "video") + .field("payload", 33i32) + .field("clock-rate", 90000i32) + .build(), + ) + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("video/mpegts") + .field("packetsize", gst::List::new([188i32, 192, 204, 208])) + .field("systemstream", true) + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl crate::basepay::RtpBasePay2Impl for RtpMP2TPay { + const ALLOWED_META_TAGS: &'static [&'static str] = &[]; + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + // Template caps have the field, so we know it exists and is one of the valid sizes + let packet_size = s.get::("packetsize").unwrap() as usize; + assert!(packet_size > 0); + + // Make sure configured MTU is large enough + let max_payload_size = self.obj().max_payload_size() as usize; + + if packet_size > max_payload_size { + gst::element_imp_error!( + self, + gst::LibraryError::Settings, + ("Configured MTU is too small"), + ["Payloader MTU {max_payload_size} must be able to fit at least one MPEG-TS packet of size {packet_size}"] + ); + return false; + } + + let src_caps = gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("encoding-name", "MP2T") + .field("clock-rate", 90000i32) + .build(); + + self.obj().set_src_caps(&src_caps); + + let mut state = self.state.borrow_mut(); + state.packet_size = NonZeroUsize::new(packet_size); + + true + } + + fn drain(&self) -> Result { + let mut state = self.state.borrow_mut(); + self.send_pending_data(&mut state) + } + + fn flush(&self) { + let mut state = self.state.borrow_mut(); + + state.pending_data.clear(); + state.discont_pending = true; + } + + // Encapsulation of MPEG System and Transport Streams: + // https://www.rfc-editor.org/rfc/rfc2250.html#section-2 + // + fn handle_buffer( + &self, + buffer: &gst::Buffer, + id: u64, + ) -> Result { + let mut state = self.state.borrow_mut(); + + let packet_size = match state.packet_size { + Some(s) => s.get(), + None => return Err(gst::FlowError::NotNegotiated), + }; + + // https://www.rfc-editor.org/rfc/rfc2250.html#section-2.1 + // + // Set marker flag whenever the timestamp is discontinuous. + if buffer.flags().contains(gst::BufferFlags::DISCONT) { + gst::debug!(CAT, imp: self, "discont, pushing out pending packets"); + self.send_pending_data(&mut state)?; + self.obj().finish_pending_packets()?; + state.discont_pending = true; + } + + let map = buffer.map_readable().map_err(|_| { + gst::error!(CAT, imp: self, "Can't map buffer readable"); + gst::FlowError::Error + })?; + + if map.size() % packet_size != 0 { + gst::element_imp_error!( + self, + gst::StreamError::Format, + ("MPEG-TS input is not properly framed"), + [ + "MPEG-TS packet size {packet_size} but buffer is {} bytes", + map.len() + ] + ); + return Err(gst::FlowError::Error); + } + + let max_payload_size = self.obj().max_payload_size() as usize; + + // Target size as clean multiple of the TS packet size + let target_payload_size = max_payload_size - (max_payload_size % packet_size); + + let mut data = map.as_slice(); + + // New data and pending data still isn't enough to fill a whole RTP payload? + if state.pending_data.len() + data.len() + packet_size <= max_payload_size { + state.pending_data.add(data, id); + return Ok(gst::FlowSuccess::Ok); + } + + // If we have pending data, the first packet will be a combo of the pending data and new data + if !state.pending_data.is_empty() { + let pending_id = state.pending_data.id(); + + let n_bytes_from_new_data_in_first_packet = + target_payload_size - state.pending_data.len(); + + gst::log!(CAT, imp: self, + "Using {} bytes ({} packets) of old data and {} bytes ({} packets) from new buffer", + state.pending_data.len(), + state.pending_data.len() / packet_size, + n_bytes_from_new_data_in_first_packet, + n_bytes_from_new_data_in_first_packet / packet_size, + ); + + let marker = state.want_marker_bit(); + + self.obj().queue_packet( + PacketToBufferRelation::Ids(pending_id..=id), + rtp_types::RtpPacketBuilder::new() + .payload(state.pending_data.data.as_slice()) + .payload(&data[0..n_bytes_from_new_data_in_first_packet]) + .marker_bit(marker), + )?; + + state.pending_data.clear(); + + // Trim off the bytes at the start that were sent out with the old pending data already + data = &data[n_bytes_from_new_data_in_first_packet..]; + } + + // Send out as many fully-filled RTP packets as we can + let iter = data.chunks_exact(target_payload_size); + + let remainder = iter.remainder(); + + gst::log!(CAT, imp: self, + "Sending {} bytes ({} packets) in {} RTP packets with max payload size {}, {} bytes ({} packets) remaining for next time", + data.len() - remainder.len(), + (data.len() - remainder.len()) / packet_size, data.len() / target_payload_size, + self.obj().max_payload_size(), remainder.len(), + remainder.len() / packet_size, + ); + + for packet_payload in iter { + let marker = state.want_marker_bit(); + + self.obj().queue_packet( + id.into(), + rtp_types::RtpPacketBuilder::new() + .payload(packet_payload) + .marker_bit(marker), + )?; + } + + // .. and stash any leftovers for sending with the next buffer + if !remainder.is_empty() { + state.pending_data.add(remainder, id); + } + + Ok(gst::FlowSuccess::Ok) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + Ok(()) + } +} + +impl RtpMP2TPay { + fn send_pending_data(&self, state: &mut State) -> Result { + if state.pending_data.is_empty() { + gst::log!(CAT, imp: self, "No pending data, nothing to do"); + return Ok(gst::FlowSuccess::Ok); + } + + gst::log!(CAT, imp: self, "Sending {} bytes of old data", state.pending_data.len()); + + let pending_id = state.pending_data.id(); + + let marker = state.want_marker_bit(); + + let res = self.obj().queue_packet( + pending_id.into(), + rtp_types::RtpPacketBuilder::new() + .payload(state.pending_data.data.as_slice()) + .marker_bit(marker), + ); + + state.pending_data.clear(); + + res + } +} diff --git a/net/rtp/src/mp2t/pay/mod.rs b/net/rtp/src/mp2t/pay/mod.rs new file mode 100644 index 00000000..80eeadd8 --- /dev/null +++ b/net/rtp/src/mp2t/pay/mod.rs @@ -0,0 +1,28 @@ +// GStreamer RTP MPEG-TS Payloader +// +// Copyright (C) 2023-2024 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpMP2TPay(ObjectSubclass) + @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpmp2tpay2", + gst::Rank::MARGINAL, + RtpMP2TPay::static_type(), + ) +}