diff --git a/Cargo.lock b/Cargo.lock index 247cbe17..1b97ae8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2714,6 +2714,7 @@ dependencies = [ name = "gst-plugin-rtp" version = "0.13.0-alpha.1" dependencies = [ + "anyhow", "atomic_refcell", "bitstream-io", "gst-plugin-version-helper", @@ -2721,6 +2722,7 @@ dependencies = [ "gstreamer-app", "gstreamer-check", "gstreamer-rtp", + "gstreamer-video", "once_cell", "rand", "rtp-types", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 3d4a22b6..b0d90b53 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6944,6 +6944,258 @@ } }, "rank": "marginal" + }, + "rtpvp8depay2": { + "author": "Sebastian Dröge ", + "description": "Depayload VP8 from RTP packets", + "hierarchy": [ + "GstRtpVp8Depay2", + "GstRtpBaseDepay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Depayloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "application/x-rtp:\n media: video\n clock-rate: 90000\n encoding-name: { (string)VP8, (string)VP8-DRAFT-IETF-01 }\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "video/x-vp8:\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "request-keyframe": { + "blurb": "Request new keyframe when packet loss is detected", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "wait-for-keyframe": { + "blurb": "Wait for the next keyframe after packet loss", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + } + }, + "rank": "marginal" + }, + "rtpvp8pay2": { + "author": "Sebastian Dröge ", + "description": "Payload VP8 as RTP packets", + "hierarchy": [ + "GstRtpVp8Pay2", + "GstRtpBasePay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "video/x-vp8:\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: video\n payload: [ 96, 127 ]\n clock-rate: 90000\n encoding-name: { (string)VP8, (string)VP8-DRAFT-IETF-01 }\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "fragmentation-mode": { + "blurb": "Fragmentation Mode", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "none (0)", + "mutable": "ready", + "readable": true, + "type": "GstRtpVp8Pay2FragmentationMode", + "writable": true + }, + "picture-id": { + "blurb": "Current Picture ID", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "32767", + "min": "-1", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": false + }, + "picture-id-mode": { + "blurb": "The picture ID mode for payloading", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "none (0)", + "mutable": "ready", + "readable": true, + "type": "GstRtpVp8Pay2PictureIdMode", + "writable": true + }, + "picture-id-offset": { + "blurb": "Offset to add to the initial picture-id (-1 = random)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "32767", + "min": "-1", + "mutable": "ready", + "readable": true, + "type": "gint", + "writable": true + } + }, + "rank": "marginal" + }, + "rtpvp9depay2": { + "author": "Sebastian Dröge ", + "description": "Depayload VP9 from RTP packets", + "hierarchy": [ + "GstRtpVp9Depay2", + "GstRtpBaseDepay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Depayloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "application/x-rtp:\n media: video\n clock-rate: 90000\n encoding-name: { (string)VP9, (string)VP9-DRAFT-IETF-01 }\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "video/x-vp9:\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "request-keyframe": { + "blurb": "Request new keyframe when packet loss is detected", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "wait-for-keyframe": { + "blurb": "Wait for the next keyframe after packet loss", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + } + }, + "rank": "marginal" + }, + "rtpvp9pay2": { + "author": "Sebastian Dröge ", + "description": "Payload VP9 as RTP packets", + "hierarchy": [ + "GstRtpVp9Pay2", + "GstRtpBasePay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "video/x-vp9:\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: video\n payload: [ 96, 127 ]\n clock-rate: 90000\n encoding-name: { (string)VP9, (string)VP9-DRAFT-IETF-01 }\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "picture-id": { + "blurb": "Current Picture ID", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "32767", + "min": "-1", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": false + }, + "picture-id-mode": { + "blurb": "The picture ID mode for payloading", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "none (0)", + "mutable": "ready", + "readable": true, + "type": "GstRtpVp9Pay2PictureIdMode", + "writable": true + }, + "picture-id-offset": { + "blurb": "Offset to add to the initial picture-id (-1 = random)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "32767", + "min": "-1", + "mutable": "ready", + "readable": true, + "type": "gint", + "writable": true + } + }, + "rank": "marginal" } }, "filename": "gstrsrtp", @@ -7374,6 +7626,66 @@ "GObject" ], "kind": "object" + }, + "GstRtpVp8Pay2FragmentationMode": { + "kind": "enum", + "values": [ + { + "desc": "Fit as much into each packet as possible", + "name": "none", + "value": "0" + }, + { + "desc": "Make sure that every partition starts at the start of a packet", + "name": "partition-start", + "value": "1" + }, + { + "desc": "Create a new packet for every partition", + "name": "every-partition", + "value": "2" + } + ] + }, + "GstRtpVp8Pay2PictureIdMode": { + "kind": "enum", + "values": [ + { + "desc": "No Picture ID", + "name": "none", + "value": "0" + }, + { + "desc": "7-bit PictureID", + "name": "7-bit", + "value": "1" + }, + { + "desc": "15-bit Picture ID", + "name": "15-bit", + "value": "2" + } + ] + }, + "GstRtpVp9Pay2PictureIdMode": { + "kind": "enum", + "values": [ + { + "desc": "No Picture ID", + "name": "none", + "value": "0" + }, + { + "desc": "7-bit PictureID", + "name": "7-bit", + "value": "1" + }, + { + "desc": "15-bit Picture ID", + "name": "15-bit", + "value": "2" + } + ] } }, "package": "gst-plugin-rtp", diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index c60a1ffb..893c8ddb 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -9,10 +9,12 @@ description = "GStreamer Rust RTP Plugin" rust-version.workspace = true [dependencies] +anyhow = "1" atomic_refcell = "0.1" bitstream-io = "2.1" gst = { workspace = true, features = ["v1_20"] } gst-rtp = { workspace = true, features = ["v1_20"] } +gst-video = { workspace = true, features = ["v1_20"] } once_cell.workspace = true rand = { version = "0.8", default-features = false, features = ["std", "std_rng" ] } rtp-types = { version = "0.1" } diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs index 7d263ae3..f68dcb41 100644 --- a/net/rtp/src/lib.rs +++ b/net/rtp/src/lib.rs @@ -26,6 +26,8 @@ mod basepay; mod av1; mod mp2t; mod pcmau; +mod vp8; +mod vp9; #[cfg(test)] mod tests; @@ -54,6 +56,12 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { pcmau::depay::register(plugin)?; pcmau::pay::register(plugin)?; + vp8::depay::register(plugin)?; + vp8::pay::register(plugin)?; + + vp9::depay::register(plugin)?; + vp9::pay::register(plugin)?; + Ok(()) } diff --git a/net/rtp/src/vp8/bool_decoder.rs b/net/rtp/src/vp8/bool_decoder.rs new file mode 100644 index 00000000..65e96bf0 --- /dev/null +++ b/net/rtp/src/vp8/bool_decoder.rs @@ -0,0 +1,173 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use bitstream_io::{BigEndian, BitQueue, BitRead, Endianness}; +use std::io; + +/// Implementation of the bool decoder from RFC 6386: +/// https://datatracker.ietf.org/doc/html/rfc6386#section-7.3 +/// +/// See RFC for details. +pub struct BoolDecoder { + reader: R, + eof: bool, + range: u32, + value: u32, + bit_count: u8, +} + +impl BoolDecoder { + #[inline] + pub fn new(mut reader: R) -> Result { + let mut input1 = [0u8; 1]; + let mut input2 = [0u8; 1]; + + reader.read_exact(&mut input1)?; + + let bit_count = if let Err(err) = reader.read_exact(&mut input2) { + if err.kind() == io::ErrorKind::UnexpectedEof { + // no second byte so in a state as if 8 bits were shifted out already + 8 + } else { + return Err(err); + } + } else { + // have not yet shifted out any bits + 0 + }; + + let value = ((input1[0] as u32) << 8) | (input2[0] as u32); + + Ok(BoolDecoder { + reader, + eof: false, + range: 255, // initial range is full + value, + bit_count, + }) + } + + #[inline(always)] + fn next_bool(&mut self, prob: u16) -> Result { + assert!(prob <= 256); + + // range and split are identical to the corresponding values + // used by the encoder when this bool was written + + let split = 1 + (((self.range - 1) * prob as u32) >> 8); + let split_ = split << 8; + + let ret = if self.value >= split_ { + self.range -= split; // reduce range + self.value -= split_; // subtract off left endpoint of interval + true + } else { + self.range = split; // reduce range, no change in left endpoint + false + }; + + // shift out irrelevant bits + while self.range < 128 { + self.value <<= 1; + self.range <<= 1; + + self.bit_count += 1; + // shift in new bits 8 at a time + if self.bit_count == 8 { + if !self.eof { + let mut input = [0u8; 1]; + if let Err(err) = self.reader.read_exact(&mut input) { + if err.kind() == io::ErrorKind::UnexpectedEof { + self.eof = true; + } else { + return Err(err); + } + } else { + self.bit_count = 0; + self.value |= input[0] as u32; + } + } else if self.bit_count == 16 { + return Err(io::Error::from(io::ErrorKind::UnexpectedEof)); + } + } + } + + Ok(ret) + } + + #[inline] + pub fn next_bit(&mut self) -> Result { + self.next_bool(128) + } +} + +impl BitRead for BoolDecoder { + fn read_bit(&mut self) -> std::io::Result { + self.next_bit() + } + + fn read(&mut self, mut bits: u32) -> std::io::Result + where + U: bitstream_io::Numeric, + { + if bits > U::BITS_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "excessive bits for type read", + )); + } + + let mut queue = BitQueue::::new(); + while bits > 0 { + queue.push(1, U::from_u8(self.next_bit()? as u8)); + bits -= 1; + } + + Ok(queue.value()) + } + + fn read_signed(&mut self, bits: u32) -> std::io::Result + where + S: bitstream_io::SignedNumeric, + { + BigEndian::read_signed(self, bits) + } + + fn read_to(&mut self) -> std::io::Result + where + V: bitstream_io::Primitive, + { + BigEndian::read_primitive(self) + } + + fn read_as_to(&mut self) -> std::io::Result + where + F: bitstream_io::Endianness, + V: bitstream_io::Primitive, + { + F::read_primitive(self) + } + + fn skip(&mut self, mut bits: u32) -> std::io::Result<()> { + while bits > 0 { + self.next_bit()?; + bits -= 1; + } + + Ok(()) + } + + fn byte_aligned(&self) -> bool { + false + } + + fn byte_align(&mut self) { + unimplemented!() + } +} diff --git a/net/rtp/src/vp8/depay/imp.rs b/net/rtp/src/vp8/depay/imp.rs new file mode 100644 index 00000000..7e07db29 --- /dev/null +++ b/net/rtp/src/vp8/depay/imp.rs @@ -0,0 +1,402 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpvp8depay2 + * @see_also: rtpvp8pay2, vp8enc, vp8dec + * + * Depayload a VP8 video stream from RTP packets as per [RFC 7741][rfc-7741]. + * + * [rfc-7741]: https://www.rfc-editor.org/rfc/rfc7741#section-4 + * + * ## Example pipeline + * + * ```shell + * gst-launch-1.0 udpsrc address=127.0.0.1 port=5555 caps='application/x-rtp,media=video,clock-rate=90000,encoding-name=VP8' ! rtpjitterbuffer latency=100 ! rtpvp8depay2 ! decodebin3 ! videoconvertscale ! autovideosink + * ``` + * + * This will depayload and decode an incoming RTP VP8 video stream. You can use the #rtpvp8pay2 + * and #vp8enc elements to create such an RTP stream. + * + * Since: plugins-rs-0.13.0 + */ +use std::{io::Cursor, mem, sync::Mutex}; + +use atomic_refcell::AtomicRefCell; +use bitstream_io::{BigEndian, ByteRead as _, ByteReader}; + +use gst::{glib, prelude::*, subclass::prelude::*}; + +use once_cell::sync::Lazy; + +use crate::basedepay::{PacketToBufferRelation, RtpBaseDepay2Ext}; +use crate::vp8::frame_header::UncompressedFrameHeader; +use crate::vp8::payload_descriptor::{PayloadDescriptor, PictureId}; + +#[derive(Clone, Default)] +struct Settings { + request_keyframe: bool, + wait_for_keyframe: bool, +} + +struct State { + /// Last extended RTP timestamp. + last_timestamp: Option, + + /// Last picture ID, if any. + /// + /// This is the picture ID from the last frame and is reset + /// to `None` also if a picture doesn't have any ID. + last_picture_id: Option, + + /// Payload descriptor of the first packet of the current frame. + /// + /// This is reset whenever the current frame is pushed downstream. + current_frame_payload_descriptor: Option, + + /// Last keyframe frame header + last_keyframe_frame_header: Option, + + /// Currently queued data for the current frame. + pending_frame_ext_seqnum: u64, + pending_frame_is_keyframe: bool, + pending_frame: Vec, + + /// Set to `true` if the next outgoing buffer should have the `DISCONT` flag set. + needs_discont: bool, +} + +impl Default for State { + fn default() -> Self { + State { + last_timestamp: None, + last_picture_id: None, + current_frame_payload_descriptor: None, + last_keyframe_frame_header: None, + pending_frame_ext_seqnum: 0, + pending_frame: Vec::default(), + pending_frame_is_keyframe: false, + needs_discont: true, + } + } +} + +#[derive(Default)] +pub struct RtpVp8Depay { + state: AtomicRefCell, + settings: Mutex, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpvp8depay2", + gst::DebugColorFlags::empty(), + Some("RTP VP8 Depayloader"), + ) +}); + +impl RtpVp8Depay { + fn reset(&self, state: &mut State) { + gst::debug!(CAT, imp: self, "resetting state"); + + *state = State::default() + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RtpVp8Depay { + const NAME: &'static str = "GstRtpVp8Depay2"; + type Type = super::RtpVp8Depay; + type ParentType = crate::basedepay::RtpBaseDepay2; +} + +impl ObjectImpl for RtpVp8Depay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoolean::builder("request-keyframe") + .nick("Request Keyframe") + .blurb("Request new keyframe when packet loss is detected") + .default_value(Settings::default().request_keyframe) + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("wait-for-keyframe") + .nick("Wait For Keyframe") + .blurb("Wait for the next keyframe after packet loss") + .default_value(Settings::default().wait_for_keyframe) + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "request-keyframe" => { + self.settings.lock().unwrap().request_keyframe = value.get().unwrap(); + } + "wait-for-keyframe" => { + self.settings.lock().unwrap().wait_for_keyframe = value.get().unwrap(); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "request-keyframe" => self.settings.lock().unwrap().request_keyframe.to_value(), + "wait-for-keyframe" => self.settings.lock().unwrap().wait_for_keyframe.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for RtpVp8Depay {} + +impl ElementImpl for RtpVp8Depay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP VP8 Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload VP8 from RTP packets", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("clock-rate", 90_000i32) + .field( + "encoding-name", + gst::List::new(["VP8", "VP8-DRAFT-IETF-01"]), + ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/x-vp8").build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl crate::basedepay::RtpBaseDepay2Impl for RtpVp8Depay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["video"]; + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.borrow_mut(); + self.reset(&mut state); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.borrow_mut(); + self.reset(&mut state); + + Ok(()) + } + + fn flush(&self) { + let mut state = self.state.borrow_mut(); + self.reset(&mut state); + } + + // TODO: Might want to send lost events (and possibly ignore the ones from upstream) if there + // are discontinuities (either in the seqnum or otherwise detected). This is especially useful + // in case of ULPFEC as that breaks seqnum-based discontinuity detecetion. + // + // rtpvp8depay does this but it feels like the whole approach needs some redesign. + fn handle_packet( + &self, + packet: &crate::basedepay::Packet, + ) -> Result { + let settings = self.settings.lock().unwrap().clone(); + + gst::trace!(CAT, imp: self, "Handling RTP packet {packet:?}"); + let mut state = self.state.borrow_mut(); + + let payload = packet.payload(); + let mut cursor = Cursor::new(payload); + + let mut r = ByteReader::endian(&mut cursor, BigEndian); + let payload_descriptor = match r.parse::() { + Ok(payload_descriptor) => payload_descriptor, + Err(err) => { + gst::warning!(CAT, imp: self, "Invalid VP8 RTP packet: {err}"); + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + }; + + let payload_start_index = cursor.position() as usize; + gst::trace!(CAT, imp: self, "VP8 RTP payload descriptor size: {}", payload_start_index); + gst::trace!(CAT, imp: self, "Received VP8 RTP payload descriptor: {payload_descriptor:?}"); + + // This is the start of a frame if it is the start of a partition and the partition index + // is 0. + let is_start_of_frame = + payload_descriptor.start_of_partition && payload_descriptor.partition_index == 0; + + // If this is not the start of a picture then we have to wait for one + if state.current_frame_payload_descriptor.is_none() && !is_start_of_frame { + if state.last_timestamp.is_some() { + gst::warning!(CAT, imp: self, "Waiting for start of picture"); + } else { + gst::trace!(CAT, imp: self, "Waiting for start of picture"); + } + self.obj().drop_packet(packet); + self.reset(&mut state); + return Ok(gst::FlowSuccess::Ok); + } + + // Update state tracking + if is_start_of_frame { + let mut r = ByteReader::endian(&mut cursor, BigEndian); + // We assume that the 10 bytes of frame header are in the first packet + let frame_header = match r.parse::() { + Ok(frame_header) => frame_header, + Err(err) => { + gst::warning!(CAT, imp: self, "Failed to read frame header: {err}"); + self.obj().drop_packet(packet); + self.reset(&mut state); + return Ok(gst::FlowSuccess::Ok); + } + }; + + // If necessary wait for a key frame if we never saw one so far and/or request one + // from upstream. + if !frame_header.is_keyframe && state.last_keyframe_frame_header.is_none() { + if settings.request_keyframe { + gst::debug!(CAT, imp: self, "Requesting keyframe from upstream"); + let event = gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(); + let _ = self.obj().sink_pad().push_event(event); + } + + if settings.wait_for_keyframe { + gst::trace!(CAT, imp: self, "Waiting for keyframe"); + // TODO: Could potentially drain here? + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + } + + assert!(state.pending_frame.is_empty()); + state.pending_frame_ext_seqnum = packet.ext_seqnum(); + state.pending_frame_is_keyframe = frame_header.is_keyframe; + state.current_frame_payload_descriptor = Some(payload_descriptor.clone()); + state.last_timestamp = Some(packet.ext_timestamp()); + if let Some(picture_id) = payload_descriptor.picture_id { + state.last_picture_id = Some(picture_id); + } else { + state.last_picture_id = None; + } + + if frame_header.is_keyframe { + // Update caps with profile and resolution now that we know it + if state + .last_keyframe_frame_header + .as_ref() + .map_or(true, |last_frame_header| { + last_frame_header.profile != frame_header.profile + || last_frame_header.resolution != frame_header.resolution + }) + { + let resolution = frame_header.resolution.unwrap(); + + let caps = gst::Caps::builder("video/x-vp8") + .field("profile", format!("{}", frame_header.profile)) + .field("width", resolution.0 as i32) + .field("height", resolution.1 as i32) + .build(); + + self.obj().set_src_caps(&caps); + } + state.last_keyframe_frame_header = Some(frame_header); + } + } + + state + .pending_frame + .extend_from_slice(&payload[payload_start_index..]); + + // The marker bit is set for the last packet of a frame. + if !packet.marker_bit() { + return Ok(gst::FlowSuccess::Ok); + } + + let mut buffer = gst::Buffer::from_mut_slice(mem::take(&mut state.pending_frame)); + { + let buffer = buffer.get_mut().unwrap(); + + if !state.pending_frame_is_keyframe { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + gst::trace!(CAT, imp: self, "Finishing delta-frame"); + } else { + gst::trace!(CAT, imp: self, "Finishing keyframe"); + } + + if state.needs_discont { + gst::trace!(CAT, imp: self, "Setting DISCONT"); + buffer.set_flags(gst::BufferFlags::DISCONT); + state.needs_discont = false; + } + + // Set MARKER flag on the output so that the parser knows that this buffer ends a full + // frame and potentially can operate a bit faster. + buffer.set_flags(gst::BufferFlags::MARKER); + + // TODO: Could add VP8 custom meta about scalability here + } + + state.current_frame_payload_descriptor = None; + state.pending_frame_is_keyframe = false; + + // Set fallback caps if the first complete frame we have is not a keyframe. For keyframes, + // caps with profile and resolution would've been set above already. + // + // If a keyframe is received in the future then the caps are updated above. + if !self.obj().src_pad().has_current_caps() { + self.obj() + .set_src_caps(&self.obj().src_pad().pad_template_caps()); + } + + self.obj().queue_buffer( + PacketToBufferRelation::Seqnums(state.pending_frame_ext_seqnum..=packet.ext_seqnum()), + buffer, + )?; + + Ok(gst::FlowSuccess::Ok) + } +} diff --git a/net/rtp/src/vp8/depay/mod.rs b/net/rtp/src/vp8/depay/mod.rs new file mode 100644 index 00000000..6215880f --- /dev/null +++ b/net/rtp/src/vp8/depay/mod.rs @@ -0,0 +1,27 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpVp8Depay(ObjectSubclass) + @extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpvp8depay2", + gst::Rank::MARGINAL, + RtpVp8Depay::static_type(), + ) +} diff --git a/net/rtp/src/vp8/frame_header.rs b/net/rtp/src/vp8/frame_header.rs new file mode 100644 index 00000000..d63b1b74 --- /dev/null +++ b/net/rtp/src/vp8/frame_header.rs @@ -0,0 +1,422 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::io::{self, Cursor}; + +use anyhow::{bail, Context as _}; +use bitstream_io::{ + BigEndian, BitRead, ByteRead as _, ByteReader, FromBitStream, FromBitStreamWith, + FromByteStream, LittleEndian, +}; +use smallvec::SmallVec; + +use super::bool_decoder::BoolDecoder; + +#[derive(Debug)] +pub struct UncompressedFrameHeader { + pub first_partition_size: u32, + pub is_keyframe: bool, + pub show_frame: bool, + pub profile: u8, + /// Horizontal and vertical scale only set for keyframes + pub scale: Option<(u8, u8)>, + /// Width and height only set for keyframes + pub resolution: Option<(u16, u16)>, + // More fields follow that we don't parse +} + +impl FromByteStream for UncompressedFrameHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let b = r.read::().context("frame_header")?; + + let is_keyframe = (b & 0b0000_0001) == 0; + let show_frame = (b & 0b0001_0000) != 0; + let profile = (b >> 1) & 0b0011; + let size0 = (b & 0b1110_0000) >> 5; + + let size1 = r.read::().context("size1")?; + let size2 = r.read::().context("size2")?; + + let first_partition_size = + ((size2 as u32) << (16 - 5)) | ((size1 as u32) << (8 - 5)) | (size0 as u32); + + let (scale, resolution) = if is_keyframe { + let sync_code_1 = r.read::().context("sync_code_1")?; + let sync_code_2 = r.read::().context("sync_code_2")?; + let sync_code_3 = r.read::().context("sync_code_3")?; + + if [0x9d, 0x01, 0x2a] != [sync_code_1, sync_code_2, sync_code_3] { + bail!("Invalid sync code"); + } + + let w = r.read_as::().context("width")?; + let h = r.read_as::().context("height")?; + + ( + Some(((w >> 14) as u8, (h >> 14) as u8)), + Some((w & 0b0011_1111_1111_1111, h & 0b0011_1111_1111_1111)), + ) + } else { + (None, None) + }; + + Ok(UncompressedFrameHeader { + is_keyframe, + show_frame, + profile, + scale, + resolution, + first_partition_size, + }) + } +} + +#[derive(Debug)] +pub struct FrameHeader { + pub color_space: Option, + pub clamping_type: Option, + pub update_segmentation: Option, + pub filter_type: u8, + pub loop_filter_level: u8, + pub sharpness_level: u8, + pub mb_lf_adjustments: Option, + pub nbr_of_dct_partitions: u8, + // More fields following +} + +/// Helper trait to read an unsigned value and its sign bit. +trait BitReadExt: BitRead { + /// Read an `i8` from 1-7 bits absolute value and a sign bit. + // TODO: Could make this generic over `SignedNumeric` but that requires implementing a + // `as_negative()` that works with absolute value + sign instead of two's complement. + fn read_with_sign(&mut self, bits: u32) -> Result { + assert!(bits > 0 && bits <= 7); + let value = self.read::(bits)?; + let sign = self.read_bit()?; + + if sign { + Ok(-(value as i8)) + } else { + Ok(value as i8) + } + } +} + +impl BitReadExt for T {} + +impl<'a> FromBitStreamWith<'a> for FrameHeader { + type Error = anyhow::Error; + /// Keyframe? + type Context = bool; + + fn from_reader( + r: &mut R, + keyframe: &Self::Context, + ) -> Result + where + Self: Sized, + { + // Technically this uses arithmetic coding / range coding but the probability of every bit + // that is read here is 50:50 so it's equivalent to no encoding at all. + let (color_space, clamping_type) = if *keyframe { + ( + Some(r.read::(1).context("color_space")?), + Some(r.read::(1).context("clamping_type")?), + ) + } else { + (None, None) + }; + + let segmentation_enabled = r.read_bit().context("segmentation_enabled")?; + let update_segmentation = if segmentation_enabled { + Some( + r.parse::() + .context("update_segmentation")?, + ) + } else { + None + }; + + let filter_type = r.read::(1).context("filter_type")?; + let loop_filter_level = r.read::(6).context("loop_filter_level")?; + let sharpness_level = r.read::(3).context("sharpness_level")?; + + let loop_filter_adj_enable = r.read_bit().context("loop_filter_adj_enable")?; + + let mb_lf_adjustments = if loop_filter_adj_enable { + Some(r.parse::().context("mb_lf_adjustments")?) + } else { + None + }; + + let nbr_of_dct_partitions = 1 << r.read::(2).context("nbr_of_dct_partitions")?; + + Ok(FrameHeader { + color_space, + clamping_type, + update_segmentation, + filter_type, + loop_filter_level, + sharpness_level, + mb_lf_adjustments, + nbr_of_dct_partitions, + }) + } +} + +#[derive(Debug)] +pub struct UpdateSegmentation { + pub segment_feature_data: Option, + pub mb_segmentation_map: Option, +} + +impl FromBitStream for UpdateSegmentation { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let update_mb_segmentation_map = r.read_bit().context("update_mb_segmentation_map")?; + let update_segment_feature_data = r.read_bit().context("update_segment_feature_data")?; + + let segment_feature_data = if update_segment_feature_data { + Some( + r.parse::() + .context("segment_feature_data")?, + ) + } else { + None + }; + + let mb_segmentation_map = if update_mb_segmentation_map { + Some( + r.parse::() + .context("mb_segmentation_map")?, + ) + } else { + None + }; + + Ok(UpdateSegmentation { + segment_feature_data, + mb_segmentation_map, + }) + } +} + +#[derive(Debug)] +pub struct SegmentFeatureData { + pub segment_feature_mode: u8, + pub quantizer_update: SmallVec<[Option; 4]>, + pub loop_filter_update: SmallVec<[Option; 4]>, +} + +impl FromBitStream for SegmentFeatureData { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let segment_feature_mode = r.read::(1).context("segment_feature_mode")?; + + let mut quantizer_update = SmallVec::new(); + let mut loop_filter_update = SmallVec::new(); + for _ in 0..4 { + let quantizer_update_flag = r.read_bit().context("quantizer_update_flag")?; + + if quantizer_update_flag { + quantizer_update.push(Some(r.read_with_sign(7).context("quantizer_update")?)); + } else { + quantizer_update.push(None); + } + } + for _ in 0..4 { + let loop_filter_update_flags = r.read_bit().context("loop_filter_update_flags")?; + + if loop_filter_update_flags { + loop_filter_update.push(Some(r.read_with_sign(6).context("lf_update")?)); + } else { + loop_filter_update.push(None); + } + } + + Ok(SegmentFeatureData { + segment_feature_mode, + quantizer_update, + loop_filter_update, + }) + } +} + +#[derive(Debug)] +pub struct MbSegmentationMap { + pub segment_probs: SmallVec<[Option; 3]>, +} + +impl FromBitStream for MbSegmentationMap { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let mut segment_probs = SmallVec::new(); + for _ in 0..3 { + let segment_prob_update = r.read_bit().context("segment_prob_update")?; + + if segment_prob_update { + let segment_prob = r.read::(8).context("segment_prob")?; + + segment_probs.push(Some(segment_prob)); + } else { + segment_probs.push(None); + } + } + + Ok(MbSegmentationMap { segment_probs }) + } +} + +#[derive(Debug)] +pub struct MbLfAdjustments { + pub ref_frame_delta_update: SmallVec<[Option; 4]>, + pub mb_mode_delta_update: SmallVec<[Option; 4]>, +} + +impl FromBitStream for MbLfAdjustments { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + // loop_filter_adj_enable already read by caller! + + let mode_ref_lf_delta_update = r.read_bit().context("mode_ref_lf_delta_update")?; + + let mut ref_frame_delta_update = SmallVec::new(); + let mut mb_mode_delta_update = SmallVec::new(); + if mode_ref_lf_delta_update { + for _ in 0..4 { + let ref_frame_delta_update_flag = + r.read_bit().context("ref_frame_delta_update_flag")?; + + if ref_frame_delta_update_flag { + ref_frame_delta_update + .push(Some(r.read_with_sign(6).context("delta_magnitude")?)); + } else { + ref_frame_delta_update.push(None); + } + } + for _ in 0..4 { + let mb_mode_delta_update_flag = + r.read_bit().context("mb_mode_delta_update_flag")?; + + if mb_mode_delta_update_flag { + mb_mode_delta_update + .push(Some(r.read_with_sign(6).context("delta_magnitude")?)); + } else { + mb_mode_delta_update.push(None); + } + } + } + + Ok(MbLfAdjustments { + ref_frame_delta_update, + mb_mode_delta_update, + }) + } +} + +#[derive(Debug)] +pub struct FrameInfo { + pub uncompressed_frame_header: UncompressedFrameHeader, + pub frame_header: FrameHeader, + pub partition_offsets: SmallVec<[u32; 10]>, +} + +impl FrameInfo { + pub fn parse(data: impl AsRef<[u8]>) -> Result { + let data = data.as_ref(); + let mut cursor = Cursor::new(data); + + let mut r = ByteReader::endian(&mut cursor, BigEndian); + + let uncompressed_frame_header = r + .parse::() + .context("uncompressed_frame_header")?; + + let offset = cursor.position(); + if data.len() < offset as usize + uncompressed_frame_header.first_partition_size as usize { + bail!("not enough data"); + } + + let mut r = BoolDecoder::new(&mut cursor).context("bool_decoder")?; + let frame_header = r + .parse_with::(&uncompressed_frame_header.is_keyframe) + .context("frame_header")?; + + cursor.set_position(offset); + let mut r = ByteReader::endian(&mut cursor, BigEndian); + + // Read partition sizes and calculate offsets from the start of the data + let mut partition_offsets = SmallVec::<[u32; 10]>::new(); + + // The partition sizes are stored right after the first partition as 24 bit little endian + // integers. The last partition size is not given but until the end of the data. + + // We consider the uncompressed header and partition sizes as part of the first partition + partition_offsets.push(0); + + // Skip to the partition sizes + r.skip(uncompressed_frame_header.first_partition_size) + .context("first_partition")?; + + // Offset of the second partition! + let mut current_offset = uncompressed_frame_header.first_partition_size + + offset as u32 + + 3 * (frame_header.nbr_of_dct_partitions as u32 - 1); + + for _ in 1..frame_header.nbr_of_dct_partitions { + let size0 = r.read::().context("size0")?; + let size1 = r.read::().context("size1")?; + let size2 = r.read::().context("size2")?; + + let current_partition_size = + ((size2 as u32) << 16) | ((size1 as u32) << 8) | (size0 as u32); + + partition_offsets.push(current_offset); + + current_offset += current_partition_size; + } + + partition_offsets.push(current_offset); + + // Check if the partition offsets are actually valid. If they go outside the frame then + // assume we just don't know anything about partitions and there's only a single one. + if current_offset >= data.len() as u32 { + partition_offsets.clear(); + partition_offsets.push(0); + } + partition_offsets.push(data.len() as u32); + + Ok(FrameInfo { + uncompressed_frame_header, + frame_header, + partition_offsets, + }) + } +} diff --git a/net/rtp/src/vp8/mod.rs b/net/rtp/src/vp8/mod.rs new file mode 100644 index 00000000..823bc03c --- /dev/null +++ b/net/rtp/src/vp8/mod.rs @@ -0,0 +1,17 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +mod bool_decoder; +pub mod depay; +mod frame_header; +pub mod pay; +mod payload_descriptor; + +#[cfg(test)] +mod tests; diff --git a/net/rtp/src/vp8/pay/imp.rs b/net/rtp/src/vp8/pay/imp.rs new file mode 100644 index 00000000..aa5554a5 --- /dev/null +++ b/net/rtp/src/vp8/pay/imp.rs @@ -0,0 +1,550 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpvp8pay2 + * @see_also: rtpvp8depay2, vp8dec, vp8enc + * + * Payload a VP8 video stream into RTP packets as per [RFC 7741][rfc-7741]. + * + * [rfc-7741]: https://www.rfc-editor.org/rfc/rfc7741#section-4 + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 videotestsrc ! video/x-raw,width=1280,height=720,format=I420 ! timeoverlay font-desc=Sans,22 ! vp8enc ! rtpvp8pay2 ! udpsink host=127.0.0.1 port=5004 + * ]| This will create and payload a VP8 video stream with a test pattern and + * send it out via UDP to localhost port 5004. + * + * Since: plugins-rs-0.13.0 + */ +use atomic_refcell::AtomicRefCell; +use gst::{glib, prelude::*, subclass::prelude::*}; +use smallvec::SmallVec; +use std::{cmp, sync::Mutex}; + +use bitstream_io::{BigEndian, ByteWrite as _, ByteWriter}; +use once_cell::sync::Lazy; + +use crate::{ + basepay::{RtpBasePay2Ext, RtpBasePay2ImplExt}, + vp8::{ + frame_header::FrameInfo, + payload_descriptor::{LayerId, PayloadDescriptor, PictureId}, + }, +}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpvp8pay2", + gst::DebugColorFlags::empty(), + Some("RTP VP8 Payloader"), + ) +}); + +#[derive(Clone, Default)] +struct Settings { + picture_id_mode: super::PictureIdMode, + picture_id_offset: Option, + fragmentation_mode: super::FragmentationMode, +} + +#[derive(Default)] +struct State { + /// Only set if a VP8 custom meta was ever received for this stream. Incremented whenever a + /// frame with layer-id=0 or no meta is received. + temporal_layer_zero_index: Option, +} + +#[derive(Default)] +pub struct RtpVp8Pay { + settings: Mutex, + state: AtomicRefCell, + /// Current picture ID. + /// + /// Reset to `None` in `Null` / `Ready` state and initialized to the offset when going to + /// `Paused`. + picture_id: Mutex>, +} + +#[glib::object_subclass] +impl ObjectSubclass for RtpVp8Pay { + const NAME: &'static str = "GstRtpVp8Pay2"; + type Type = super::RtpVp8Pay; + type ParentType = crate::basepay::RtpBasePay2; +} + +impl ObjectImpl for RtpVp8Pay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecEnum::builder::("picture-id-mode") + .nick("Picture ID Mode") + .blurb("The picture ID mode for payloading") + .default_value(Settings::default().picture_id_mode) + .mutable_ready() + .build(), + glib::ParamSpecInt::builder("picture-id-offset") + .nick("Picture ID Offset") + .blurb("Offset to add to the initial picture-id (-1 = random)") + .default_value( + Settings::default() + .picture_id_offset + .map(i32::from) + .unwrap_or(-1), + ) + .minimum(-1) + .maximum(0x7fff) + .mutable_ready() + .build(), + glib::ParamSpecInt::builder("picture-id") + .nick("Picture ID") + .blurb("Current Picture ID") + .default_value(-1) + .minimum(-1) + .maximum(0x7fff) + .read_only() + .build(), + glib::ParamSpecEnum::builder::("fragmentation-mode") + .nick("Fragmentation Mode") + .blurb("Fragmentation Mode") + .default_value(Settings::default().fragmentation_mode) + .mutable_ready() + .build(), + ] + }); + + &PROPERTIES + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "picture-id-mode" => { + self.settings.lock().unwrap().picture_id_mode = value.get().unwrap(); + } + "picture-id-offset" => { + let v = value.get::().unwrap(); + self.settings.lock().unwrap().picture_id_offset = + (v != -1).then_some((v & 0x7fff) as u16); + } + "fragmentation-mode" => { + self.settings.lock().unwrap().fragmentation_mode = value.get().unwrap(); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "picture-id-mode" => self.settings.lock().unwrap().picture_id_mode.to_value(), + "picture-id-offset" => self + .settings + .lock() + .unwrap() + .picture_id_offset + .map(i32::from) + .unwrap_or(-1) + .to_value(), + "picture-id" => { + let picture_id = self.picture_id.lock().unwrap(); + picture_id + .map(u16::from) + .map(i32::from) + .unwrap_or(-1) + .to_value() + } + "fragmentation-mode" => self.settings.lock().unwrap().fragmentation_mode.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for RtpVp8Pay {} + +impl ElementImpl for RtpVp8Pay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP VP8 payloader", + "Codec/Payloader/Network/RTP", + "Payload VP8 as RTP packets", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("video/x-vp8").build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", gst::IntRange::new(96, 127)) + .field("clock-rate", 90_000i32) + .field( + "encoding-name", + gst::List::new(["VP8", "VP8-DRAFT-IETF-01"]), + ) + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl crate::basepay::RtpBasePay2Impl for RtpVp8Pay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["video"]; + + fn start(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + let settings = self.settings.lock().unwrap().clone(); + + let picture_id_offset = settings.picture_id_offset.unwrap_or_else(|| { + use rand::Rng as _; + + let mut rng = rand::thread_rng(); + rng.gen::() + }); + + let picture_id = PictureId::new(settings.picture_id_mode, picture_id_offset); + *self.picture_id.lock().unwrap() = picture_id; + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + *self.picture_id.lock().unwrap() = None; + + Ok(()) + } + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + gst::debug!(CAT, imp: self, "received caps {caps:?}"); + + let caps_builder = gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("clock-rate", 90_000i32) + .field( + "encoding-name", + gst::List::new(["VP8", "VP8-DRAFT-IETF-01"]), + ); + + self.obj().set_src_caps(&caps_builder.build()); + + true + } + + fn negotiate(&self, mut src_caps: gst::Caps) { + // Fixate the encoding-name with preference to "VP8" + + src_caps.truncate(); + { + let src_caps = src_caps.get_mut().unwrap(); + let s = src_caps.structure_mut(0).unwrap(); + s.fixate_field_str("encoding-name", "VP8"); + } + + self.parent_negotiate(src_caps); + } + + fn handle_buffer( + &self, + buffer: &gst::Buffer, + id: u64, + ) -> Result { + let mut state = self.state.borrow_mut(); + + let settings = self.settings.lock().unwrap(); + let max_payload_size = self.obj().max_payload_size(); + + gst::trace!(CAT, imp: self, "received buffer of size {}", buffer.size()); + + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!( + self, + gst::ResourceError::Read, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + let picture_id = *self.picture_id.lock().unwrap(); + + // If this flag is not set by upstream then we don't set the corresponding value in the + // payload descriptor. That's not a problem because it only means that the frame can't be + // dropped safely, which is what has to be assumed anyway if there's no other information. + let non_reference_frame = buffer.flags().contains(gst::BufferFlags::DROPPABLE); + + let meta = VP8Meta::from_buffer(buffer); + + // Initialize temporal layer zero index the first time we receive a meta with temporal + // scaling enabled. + if meta.as_ref().map(|meta| meta.layer_id.is_some()) == Some(true) + && state.temporal_layer_zero_index.is_none() + { + gst::trace!(CAT, imp: self, "Detected stream with temporal scalability"); + state.temporal_layer_zero_index = Some(0); + } + + // Can't work with partition indices if temporal scalability is enabled + let partition_offsets = if state.temporal_layer_zero_index.is_none() { + match FrameInfo::parse(&map) { + Ok(frame_info) => { + gst::trace!(CAT, imp: self, "Parsed frame info {frame_info:?}"); + + Some(frame_info.partition_offsets) + } + Err(err) => { + gst::error!(CAT, imp: self, "Failed parsing frame info: {err}"); + None + } + } + } else { + None + }; + + let mut first = true; + let mut current_offset = 0; + let mut data = map.as_slice(); + while !data.is_empty() { + let mut payload_descriptor = PayloadDescriptor { + picture_id, + non_reference_frame, + start_of_partition: first, + partition_index: 0, // filled later + temporal_layer_zero_index: state.temporal_layer_zero_index, + temporal_layer_id: if state.temporal_layer_zero_index.is_some() { + let (temporal_layer, layer_sync) = meta + .as_ref() + .and_then(|meta| meta.layer_id) + .unwrap_or((0, false)); + + Some(LayerId { + id: temporal_layer as u8, + sync: layer_sync, + }) + } else { + None + }, + key_index: None, + }; + + let payload_descriptor_size = payload_descriptor.size().map_err(|err| { + gst::error!(CAT, imp: self, "Failed to write payload descriptor: {err:?}"); + gst::FlowError::Error + })?; + let overhead = payload_descriptor_size; + let payload_size = (max_payload_size as usize) + .checked_sub(overhead + 1) + .ok_or_else(|| { + gst::error!(CAT, imp: self, "Too small MTU configured for stream"); + gst::element_imp_error!( + self, + gst::LibraryError::Settings, + ["Too small MTU configured for stream"] + ); + gst::FlowError::Error + })? + + 1; + + let mut payload_size = cmp::min(payload_size, data.len()); + + if let Some(ref partition_offsets) = partition_offsets { + let (start_partition_index, start_partition_start, start_partition_end) = + find_partition_for_offset(partition_offsets, current_offset); + + // FIXME: Partition indices go from 0 to 8 inclusive, but there are only 3 bits + // available. The first two partitions are considered partition index 0. + // + // If there are 8 DCT token partitions then there are 9 partitions overall because + // there's always the first partition with motion vectors etc. + if start_partition_index <= 1 { + payload_descriptor.partition_index = 0; + } else { + payload_descriptor.partition_index = (start_partition_index - 1) as u8 & 0b111; + + // For the first partition this is set above when creating the payload + // descriptor. + if start_partition_start == current_offset { + payload_descriptor.start_of_partition = true; + } + } + + let (end_partition_index, end_partition_start, end_partition_end) = + find_partition_for_offset( + partition_offsets, + // -1 so we have the last byte that is still part of the payload + current_offset + payload_size as u32 - 1, + ); + + // Check if the payload size has to be reduced to fit the selected fragmentation mode. + match settings.fragmentation_mode { + crate::vp8::pay::FragmentationMode::None => (), + crate::vp8::pay::FragmentationMode::PartitionStart => { + // If start and end partition are different then set the payload size in + // such a way that it ends just before the end partition, i.e. the next + // packet would start with that partition. + // + // If the end partition index is partition 1 then don't do anything: as + // explained above we consider partition 0 and 1 as one. + if start_partition_index != end_partition_index + && end_partition_index != 1 + && end_partition_end > (current_offset + payload_size as u32) + { + payload_size = (end_partition_start - current_offset) as usize; + } + } + crate::vp8::pay::FragmentationMode::EveryPartition => { + // If the end offset is after the end of the current partition then reduce + // it to the end of the current partition. + // + // If the end partition index is partition 1 then consider the end of that + // one instead: as explained above we consider partition 0 and 1 as one. + // + // If the end partition is partition 0 then there's nothing to be done. + // Start and end of this packet are inside partition 0. + if end_partition_index > 1 + && current_offset + payload_size as u32 > start_partition_end + { + payload_size = (start_partition_end - current_offset) as usize; + } else if end_partition_index == 1 + && current_offset + payload_size as u32 > end_partition_end + { + payload_size = (end_partition_end - current_offset) as usize; + } + } + } + } + + gst::trace!( + CAT, + imp: self, + "Writing packet with payload descriptor {payload_descriptor:?} and payload size {payload_size} at offset {current_offset}", + ); + + assert!(payload_size > 0); + + let mut payload_descriptor_buffer = + SmallVec::<[u8; 256]>::with_capacity(payload_descriptor_size); + let mut w = ByteWriter::endian(&mut payload_descriptor_buffer, BigEndian); + w.build::(&payload_descriptor) + .map_err(|err| { + gst::error!(CAT, imp: self, "Failed to write payload descriptor: {err:?}"); + gst::FlowError::Error + })?; + assert_eq!(payload_descriptor_buffer.len(), payload_descriptor_size); + + self.obj().queue_packet( + id.into(), + rtp_types::RtpPacketBuilder::new() + .marker_bit(data.len() == payload_size) + .payload(payload_descriptor_buffer.as_slice()) + .payload(&data[..payload_size]), + )?; + + data = &data[payload_size..]; + current_offset += payload_size as u32; + first = false; + } + + // If this temporal layer zero then increment the temporal layer zero index. + // + // FIXME: This is only correct for prediction structures where higher layers always refers + // to the previous base layer frame. + if meta.map_or(true, |meta| matches!(meta.layer_id, Some((0, _)))) { + if let Some(ref mut temporal_layer_zero_index) = state.temporal_layer_zero_index { + *temporal_layer_zero_index = temporal_layer_zero_index.wrapping_add(1); + gst::trace!(CAT, imp: self, "Updated temporal layer zero index to {temporal_layer_zero_index}"); + } + } + + let next_picture_id = picture_id.map(PictureId::increment); + *self.picture_id.lock().unwrap() = next_picture_id; + + Ok(gst::FlowSuccess::Ok) + } + + fn transform_meta( + &self, + in_buf: &gst::BufferRef, + meta: &gst::MetaRef, + out_buf: &mut gst::BufferRef, + ) { + // Drop VP8 custom meta, handle all other metas normally. + if meta + .try_as_custom_meta() + .map_or(false, |meta| meta.has_name("GstVP8Meta")) + { + return; + } + + self.parent_transform_meta(in_buf, meta, out_buf) + } +} + +struct VP8Meta { + layer_id: Option<(u32, bool)>, +} + +impl VP8Meta { + fn from_buffer(buffer: &gst::BufferRef) -> Option { + let meta = gst::meta::CustomMeta::from_buffer(buffer, "GstVP8Meta").ok()?; + + let s = meta.structure(); + + let layer_id = if s.get::("use-temporal-scaling") == Ok(true) { + let layer_id = s.get::("layer-id").ok()?; + let layer_sync = s.get::("layer-sync").ok()?; + + Some((layer_id, layer_sync)) + } else { + None + }; + + Some(VP8Meta { layer_id }) + } +} + +/// Returns the partition for a given offset, the start offset of that partition and its end +/// offset. +fn find_partition_for_offset(partition_offsets: &[u32], offset: u32) -> (usize, u32, u32) { + // There are always at least two items: 0 and the whole frame length. + + assert!(!partition_offsets.is_empty()); + + for (idx, offsets) in partition_offsets.windows(2).enumerate() { + let [start, end] = [offsets[0], offsets[1]]; + + if (start..end).contains(&offset) { + return (idx, start, end); + } + } + + // Can't get here because the last offset is always the length of the whole frame. + unreachable!(); +} diff --git a/net/rtp/src/vp8/pay/mod.rs b/net/rtp/src/vp8/pay/mod.rs new file mode 100644 index 00000000..24fedd99 --- /dev/null +++ b/net/rtp/src/vp8/pay/mod.rs @@ -0,0 +1,65 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpVp8Pay(ObjectSubclass) + @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + PictureIdMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + FragmentationMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + + gst::Element::register( + Some(plugin), + "rtpvp8pay2", + gst::Rank::MARGINAL, + RtpVp8Pay::static_type(), + ) +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, glib::Enum, Default)] +#[enum_type(name = "GstRtpVp8Pay2PictureIdMode")] +#[repr(i32)] +pub enum PictureIdMode { + #[default] + #[enum_value(name = "No Picture ID", nick = "none")] + None, + #[enum_value(name = "7-bit PictureID", nick = "7-bit")] + SevenBit, + #[enum_value(name = "15-bit Picture ID", nick = "15-bit")] + FifteenBit, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, glib::Enum, Default)] +#[enum_type(name = "GstRtpVp8Pay2FragmentationMode")] +#[repr(i32)] +pub enum FragmentationMode { + #[default] + #[enum_value(name = "Fit as much into each packet as possible", nick = "none")] + None, + #[enum_value( + name = "Make sure that every partition starts at the start of a packet", + nick = "partition-start" + )] + PartitionStart, + #[enum_value( + name = "Create a new packet for every partition", + nick = "every-partition" + )] + EveryPartition, +} diff --git a/net/rtp/src/vp8/payload_descriptor.rs b/net/rtp/src/vp8/payload_descriptor.rs new file mode 100644 index 00000000..56b1823b --- /dev/null +++ b/net/rtp/src/vp8/payload_descriptor.rs @@ -0,0 +1,303 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::io; + +use anyhow::{bail, Context as _}; +use bitstream_io::{BigEndian, ByteWrite as _, ByteWriter, FromByteStream, ToByteStream}; + +const X_BIT: u8 = 0b1000_0000; +const N_BIT: u8 = 0b0010_0000; +const S_BIT: u8 = 0b0001_0000; +const I_BIT: u8 = 0b1000_0000; +const L_BIT: u8 = 0b0100_0000; +const T_BIT: u8 = 0b0010_0000; +const K_BIT: u8 = 0b0001_0000; +const M_BIT: u8 = 0b1000_0000; + +#[derive(Debug, Clone)] +pub struct PayloadDescriptor { + pub non_reference_frame: bool, + pub start_of_partition: bool, + pub partition_index: u8, + + pub picture_id: Option, + pub temporal_layer_zero_index: Option, + pub temporal_layer_id: Option, + pub key_index: Option, +} + +#[derive(Debug, Clone)] +pub struct LayerId { + pub id: u8, + pub sync: bool, +} + +impl PayloadDescriptor { + pub fn size(&self) -> Result { + #[derive(Default)] + struct Counter(usize); + + impl io::Write for Counter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0 += buf.len(); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + let mut counter = Counter::default(); + let mut w = ByteWriter::endian(&mut counter, BigEndian); + w.build::(self)?; + + Ok(counter.0) + } +} + +impl FromByteStream for PayloadDescriptor { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let flags = r.read::().context("flags")?; + + let non_reference_frame = (flags & N_BIT) != 0; + let start_of_partition = (flags & S_BIT) != 0; + let partition_index = flags & 0b0000_0111; + + let ext_flags = if (flags & X_BIT) != 0 { + r.read::().context("ext_flags")? + } else { + 0 + }; + + let picture_id = if (ext_flags & I_BIT) != 0 { + Some(r.parse::().context("picture_id")?) + } else { + None + }; + + let temporal_layer_zero_index = if (ext_flags & L_BIT) != 0 { + Some(r.read::().context("temporal_layer_zero_index")?) + } else { + None + }; + + let (temporal_layer_id, key_index) = if (ext_flags & T_BIT) != 0 || (ext_flags & K_BIT) != 0 + { + let b = r.read::().context("tid_y_keyidx")?; + + let temporal_layer_id = if (ext_flags & T_BIT) != 0 { + Some(LayerId { + id: b >> 6, + sync: (b & 0b0010_0000) != 0, + }) + } else { + None + }; + + let key_index = if (ext_flags & K_BIT) != 0 { + Some(b & 0b0001_1111) + } else { + None + }; + + (temporal_layer_id, key_index) + } else { + (None, None) + }; + + Ok(PayloadDescriptor { + non_reference_frame, + start_of_partition, + partition_index, + picture_id, + temporal_layer_zero_index, + temporal_layer_id, + key_index, + }) + } +} + +impl ToByteStream for PayloadDescriptor { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + if self.partition_index > 0b111 { + bail!("Too large partition index"); + } + + let flags = if self.non_reference_frame { N_BIT } else { 0 } + | if self.start_of_partition { S_BIT } else { 0 } + | if self.picture_id.is_some() + || self.temporal_layer_id.is_some() + || self.temporal_layer_zero_index.is_some() + || self.key_index.is_some() + { + X_BIT + } else { + 0 + } + | self.partition_index; + + w.write::(flags).context("flags")?; + + if (flags & X_BIT) != 0 { + let ext_flags = if self.picture_id.is_some() { I_BIT } else { 0 } + | if self.temporal_layer_zero_index.is_some() { + L_BIT + } else { + 0 + } + | if self.temporal_layer_id.is_some() { + T_BIT + } else { + 0 + } + | if self.key_index.is_some() { K_BIT } else { 0 }; + w.write::(ext_flags).context("ext_flags")?; + } + + if let Some(picture_id) = self.picture_id { + w.build::(&picture_id).context("picture_id")?; + } + + if let Some(temporal_layer_zero_index) = self.temporal_layer_zero_index { + w.write::(temporal_layer_zero_index) + .context("temporal_layer_zero_index")?; + } + + if self.temporal_layer_id.is_some() || self.key_index.is_some() { + let mut b = 0; + + if let Some(LayerId { id, sync }) = self.temporal_layer_id { + if id > 0b11 { + bail!("Too high temporal layer id"); + } + + b |= id << 6; + b |= if sync { 0b0010_0000 } else { 0 }; + } + + if let Some(key_index) = self.key_index { + if key_index > 0b0001_1111 { + bail!("Too high key index"); + } + b |= key_index; + } + + w.write::(b).context("tid_y_keyidx")?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone, Copy)] +pub enum PictureId { + SevenBit(u8), + FifteenBit(u16), +} + +impl PictureId { + pub fn new(mode: super::pay::PictureIdMode, v: u16) -> Option { + match mode { + super::pay::PictureIdMode::None => None, + super::pay::PictureIdMode::SevenBit => Some(PictureId::SevenBit((v & 0x7f) as u8)), + super::pay::PictureIdMode::FifteenBit => Some(PictureId::FifteenBit(v & 0x7fff)), + } + } + + pub fn increment(self) -> Self { + match self { + PictureId::SevenBit(v) => PictureId::SevenBit((v + 1) & 0x7f), + PictureId::FifteenBit(v) => PictureId::FifteenBit((v + 1) & 0x7fff), + } + } + + pub fn update_mode(self, mode: super::pay::PictureIdMode) -> Self { + match (self, mode) { + (_, super::pay::PictureIdMode::None) => self, + (PictureId::SevenBit(_), super::pay::PictureIdMode::SevenBit) => self, + (PictureId::FifteenBit(_), super::pay::PictureIdMode::FifteenBit) => self, + (PictureId::SevenBit(v), super::pay::PictureIdMode::FifteenBit) => { + PictureId::FifteenBit(v as u16) + } + (PictureId::FifteenBit(v), super::pay::PictureIdMode::SevenBit) => { + PictureId::SevenBit((v & 0x7f) as u8) + } + } + } +} + +impl From for u16 { + fn from(value: PictureId) -> Self { + match value { + PictureId::SevenBit(v) => v as u16, + PictureId::FifteenBit(v) => v, + } + } +} + +impl PartialEq for PictureId { + fn eq(&self, other: &Self) -> bool { + match (*self, *other) { + (PictureId::SevenBit(s), PictureId::SevenBit(o)) => s == o, + (PictureId::SevenBit(s), PictureId::FifteenBit(o)) => s == (o & 0x7f) as u8, + (PictureId::FifteenBit(s), PictureId::SevenBit(o)) => (s & 0x7f) as u8 == o, + (PictureId::FifteenBit(s), PictureId::FifteenBit(o)) => s == o, + } + } +} + +impl FromByteStream for PictureId { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let pid = r.read::().context("picture_id")?; + if (pid & M_BIT) != 0 { + let ext_pid = r.read::().context("extended_pid")?; + Ok(PictureId::FifteenBit( + (((pid & !M_BIT) as u16) << 8) | ext_pid as u16, + )) + } else { + Ok(PictureId::SevenBit(pid)) + } + } +} + +impl ToByteStream for PictureId { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + match self { + PictureId::SevenBit(v) => w.write::(*v).context("picture_id"), + PictureId::FifteenBit(v) => { + w.write::(M_BIT | (v >> 8) as u8) + .context("picture_id")?; + w.write::((v & 0b1111_1111) as u8) + .context("extended_pid") + } + } + } +} diff --git a/net/rtp/src/vp8/tests.rs b/net/rtp/src/vp8/tests.rs new file mode 100644 index 00000000..94e481f4 --- /dev/null +++ b/net/rtp/src/vp8/tests.rs @@ -0,0 +1,492 @@ +// +// Copyright (C) 2024 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source}; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + crate::plugin_register_static().expect("rtpvp8 test"); + }); +} + +#[test] +fn test_vp8() { + init(); + + // Generates encoded frames of sizes 1915 (key), 110, 103, 100, 100 + let src = "videotestsrc num-buffers=5 pattern=smpte100 ! video/x-raw,format=I420,width=1280,height=720,framerate=25/1 ! vp8enc target-bitrate=4000000"; + let pay = "rtpvp8pay2 picture-id-mode=7-bit"; + let depay = "rtpvp8depay2"; + + let expected_pay = vec![ + vec![ + // First frame is split into two packets + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(1400) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(545) + .build(), + ], + // Second and following frames + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(3_600) + .marker_bit(true) + .size(125) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(7_200) + .marker_bit(true) + .size(118) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(10_800) + .marker_bit(true) + .size(115) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(14_400) + .marker_bit(true) + .size(115) + .build()], + ]; + + let expected_depay = vec![ + // One buffer per frame + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(1915) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(110) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(103) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .size(100) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .size(100) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + ]; + + run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay); +} + +#[test] +fn test_vp8_small_mtu() { + init(); + + // Generates encoded frames of sizes 1915 (key), 110, 103, 100, 100 + let src = "videotestsrc num-buffers=5 pattern=smpte100 ! video/x-raw,format=I420,width=1280,height=720,framerate=25/1 ! vp8enc target-bitrate=4000000"; + let pay = "rtpvp8pay2 mtu=800 picture-id-mode=15-bit"; + let depay = "rtpvp8depay2"; + + let expected_pay = vec![ + vec![ + // First frame is split into three packets + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(800) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(800) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(363) + .build(), + ], + // Second and following frames + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(3_600) + .marker_bit(true) + .size(126) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(7_200) + .marker_bit(true) + .size(119) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(10_800) + .marker_bit(true) + .size(116) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(14_400) + .marker_bit(true) + .size(116) + .build()], + ]; + + let expected_depay = vec![ + // One buffer per frame + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(1915) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(110) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(103) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .size(100) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .size(100) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + ]; + + run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay); +} + +#[test] +fn test_vp8_partitions() { + init(); + + // Generates encoded frames of sizes 1927 (key), 122, 115, 112, 112 + let src = "videotestsrc num-buffers=5 pattern=smpte100 ! video/x-raw,format=I420,width=1280,height=720,framerate=25/1 ! vp8enc token-partitions=4 target-bitrate=4000000"; + let pay = "rtpvp8pay2 mtu=800 fragmentation-mode=every-partition picture-id-mode=15-bit"; + let depay = "rtpvp8depay2"; + + let expected_pay = vec![ + vec![ + // First frame is split into seven packets (3 first partition, 4 for the other partitions) + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(800) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(800) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(190) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(187) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(28) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(17) + .build(), + ], + // Second and following frames are all split into 5 packets, one per partition + vec![ + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(3_600) + .marker_bit(false) + .size(129) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(3_600) + .marker_bit(false) + .size(22) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(3_600) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(3_600) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(3_600) + .marker_bit(true) + .size(17) + .build(), + ], + vec![ + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(7_200) + .marker_bit(false) + .size(125) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(7_200) + .marker_bit(false) + .size(19) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(7_200) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(7_200) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(7_200) + .marker_bit(true) + .size(17) + .build(), + ], + vec![ + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(10_800) + .marker_bit(false) + .size(124) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(10_800) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(10_800) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(10_800) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(10_800) + .marker_bit(true) + .size(17) + .build(), + ], + vec![ + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(14_400) + .marker_bit(false) + .size(124) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(14_400) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(14_400) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(14_400) + .marker_bit(false) + .size(17) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(14_400) + .marker_bit(true) + .size(17) + .build(), + ], + ]; + + let expected_depay = vec![ + // One buffer per frame + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(1927) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(122) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(115) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .size(112) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .size(112) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + ]; + + run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay); +} diff --git a/net/rtp/src/vp9/depay/imp.rs b/net/rtp/src/vp9/depay/imp.rs new file mode 100644 index 00000000..b1a13bdd --- /dev/null +++ b/net/rtp/src/vp9/depay/imp.rs @@ -0,0 +1,569 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpvp9depay2 + * @see_also: rtpvp9pay2, vp9enc, vp9dec + * + * Depayload a VP9 video stream from RTP packets as per [draft-ietf-payload-vp9][draft-ietf-payload-vp9]. + * + * [draft-ietf-payload-vp9]:https://datatracker.ietf.org/doc/html/draft-ietf-payload-vp9-16#section-4 + * + * ## Example pipeline + * + * ```shell + * gst-launch-1.0 udpsrc address=127.0.0.1 port=5555 caps='application/x-rtp,media=video,clock-rate=90000,encoding-name=VP9' ! rtpjitterbuffer latency=100 ! rtpvp9depay2 ! decodebin3 ! videoconvertscale ! autovideosink + * ``` + * + * This will depayload and decode an incoming RTP VP9 video stream. You can use the #rtpvp9pay2 + * and #vp9enc elements to create such an RTP stream. + * + * Since: plugins-rs-0.13.0 + */ +use std::{io::Cursor, mem, sync::Mutex}; + +use atomic_refcell::AtomicRefCell; +use bitstream_io::{BigEndian, BitRead as _, BitReader, ByteRead as _, ByteReader}; + +use gst::{glib, prelude::*, subclass::prelude::*}; + +use once_cell::sync::Lazy; + +use crate::basedepay::{PacketToBufferRelation, RtpBaseDepay2Ext}; +use crate::vp9::frame_header::FrameHeader; +use crate::vp9::payload_descriptor::{PayloadDescriptor, PictureId}; + +#[derive(Clone, Default)] +struct Settings { + request_keyframe: bool, + wait_for_keyframe: bool, +} + +struct State { + /// Last extended RTP timestamp. + last_timestamp: Option, + + /// Last picture ID, if any. + /// + /// This is the picture ID from the last picture and is reset + /// to `None` also if a picture doesn't have any ID. + last_picture_id: Option, + + /// Payload descriptor of the first packet of the last key picture. + /// + /// If this is not set then we did not see a keyframe yet. + last_key_picture_payload_descriptor: Option, + + /// Frame header of the last keyframe. + /// + /// For scalable streams this is set to the last frame of the picture. + last_keyframe_frame_header: Option, + + /// Frame header of the current keyframe, if any. + /// + /// For scalable streams this is set to the last frame of the picture. + /// + /// This is only set if the current picture is a key picture and is reset whenever a picture is + /// pushed downstream. + current_keyframe_frame_header: Option, + + /// Payload descriptor of the first packet of the current picture. + /// + /// This is reset whenever the current picture is pushed downstream. + current_picture_payload_descriptor: Option, + + /// Currently queued data for the current picture. + pending_picture_ext_seqnum: u64, + pending_picture: Vec, + + /// Set to `true` if the next outgoing buffer should have the `DISCONT` flag set. + needs_discont: bool, +} + +impl Default for State { + fn default() -> Self { + State { + last_timestamp: None, + last_picture_id: None, + last_key_picture_payload_descriptor: None, + last_keyframe_frame_header: None, + current_keyframe_frame_header: None, + current_picture_payload_descriptor: None, + pending_picture_ext_seqnum: 0, + pending_picture: Vec::default(), + needs_discont: true, + } + } +} + +#[derive(Default)] +pub struct RtpVp9Depay { + state: AtomicRefCell, + settings: Mutex, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpvp9depay2", + gst::DebugColorFlags::empty(), + Some("RTP VP9 Depayloader"), + ) +}); + +impl RtpVp9Depay { + fn reset(&self, state: &mut State) { + gst::debug!(CAT, imp: self, "resetting state"); + + *state = State::default() + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RtpVp9Depay { + const NAME: &'static str = "GstRtpVp9Depay2"; + type Type = super::RtpVp9Depay; + type ParentType = crate::basedepay::RtpBaseDepay2; +} + +impl ObjectImpl for RtpVp9Depay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoolean::builder("request-keyframe") + .nick("Request Keyframe") + .blurb("Request new keyframe when packet loss is detected") + .default_value(Settings::default().request_keyframe) + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("wait-for-keyframe") + .nick("Wait For Keyframe") + .blurb("Wait for the next keyframe after packet loss") + .default_value(Settings::default().wait_for_keyframe) + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "request-keyframe" => { + self.settings.lock().unwrap().request_keyframe = value.get().unwrap(); + } + "wait-for-keyframe" => { + self.settings.lock().unwrap().wait_for_keyframe = value.get().unwrap(); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "request-keyframe" => self.settings.lock().unwrap().request_keyframe.to_value(), + "wait-for-keyframe" => self.settings.lock().unwrap().wait_for_keyframe.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for RtpVp9Depay {} + +impl ElementImpl for RtpVp9Depay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP VP9 Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload VP9 from RTP packets", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("clock-rate", 90_000i32) + .field( + "encoding-name", + gst::List::new(["VP9", "VP9-DRAFT-IETF-01"]), + ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/x-vp9").build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl crate::basedepay::RtpBaseDepay2Impl for RtpVp9Depay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["video"]; + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.borrow_mut(); + self.reset(&mut state); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.borrow_mut(); + self.reset(&mut state); + + Ok(()) + } + + fn drain(&self) -> Result { + // TODO: Could forward all complete layers here if any are queued up + Ok(gst::FlowSuccess::Ok) + } + + fn flush(&self) { + let mut state = self.state.borrow_mut(); + self.reset(&mut state); + } + + // TODO: Might want to send lost events (and possibly ignore the ones from upstream) if there + // are discontinuities (either in the seqnum or otherwise detected). This is especially useful + // in case of ULPFEC as that breaks seqnum-based discontinuity detecetion. + // + // rtpvp9depay does this but it feels like the whole approach needs some redesign. + fn handle_packet( + &self, + packet: &crate::basedepay::Packet, + ) -> Result { + let settings = self.settings.lock().unwrap().clone(); + + gst::trace!(CAT, imp: self, "Handling RTP packet {packet:?}"); + let mut state = self.state.borrow_mut(); + + let payload = packet.payload(); + let mut cursor = Cursor::new(payload); + + let mut r = ByteReader::endian(&mut cursor, BigEndian); + let payload_descriptor = match r.parse::() { + Ok(payload_descriptor) => payload_descriptor, + Err(err) => { + gst::warning!(CAT, imp: self, "Invalid VP9 RTP packet: {err}"); + // TODO: Could potentially drain here? + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + }; + + let payload_start_index = cursor.position() as usize; + + gst::trace!(CAT, imp: self, "VP9 RTP payload descriptor size: {}", payload_start_index); + gst::trace!(CAT, imp: self, "Received VP9 RTP payload descriptor: {payload_descriptor:?}"); + + // This is the start of a picture if this is the start of the frame and either there is no + // layer information or this is the first spatial layer. + let is_start_of_picture = payload_descriptor.start_of_frame + && payload_descriptor + .layer_index + .as_ref() + .map_or(true, |layer_index| layer_index.spatial_layer_id == 0); + + // Additionally, this is a key picture if it is not an inter predicted picture. + let is_key_picture = + !payload_descriptor.inter_picture_predicted_frame && is_start_of_picture; + + // If the timestamp or picture ID is changing we assume that a new picture is starting. + // Any previously queued picture data needs to be drained now. + if is_start_of_picture + || state.last_timestamp != Some(packet.ext_timestamp()) + || state.last_picture_id.map_or(false, |picture_id| { + Some(picture_id) != payload_descriptor.picture_id + }) + { + // Missed the marker packet for the last picture + if state.current_picture_payload_descriptor.is_some() { + gst::warning!(CAT, imp: self, "Packet is part of a new picture but didn't receive last packet of previous picture"); + // TODO: Could potentially drain here? + self.reset(&mut state); + } + // Else cleanly starting a new picture here + } + + // Validate payload descriptor + if let Some(ref last_keyframe_payloader_descriptor) = + state.last_key_picture_payload_descriptor + { + // Section 4.2, I flag + // + // > If the V bit was set in the stream's most recent start of a keyframe (i.e. the SS + // > field was present) and the F bit is set to 0 (i.e. non-flexible scalability mode is + // > in use), then this bit MUST be set on every packet. + // + // This check is extended here to not just check for presence of the SS field but check + // that there are multiple spatial layers. If there is only one then we treat it as if + // the field wasn't set. + if last_keyframe_payloader_descriptor + .scalability_structure + .as_ref() + .map_or(false, |scalability_structure| { + scalability_structure.num_spatial_layers > 1 + }) + && !payload_descriptor.flexible_mode + && payload_descriptor.picture_id.is_none() + { + gst::warning!( + CAT, + imp: self, + "Scalability structure present and non-flexible scalability mode used but no picture ID present", + ); + // TODO: Could potentially drain here? + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + // In other words, picture IDs are only optional if non-flexible scalability mode is + // used and there was no scalability structure in the keyframe. + + // Section 4.2, F flag + // + // > The value of this F bit MUST only change on the first packet of a key picture. A + // > key picture is a picture whose base spatial layer frame is a key frame, and which + // > thus completely resets the encoder state. This packet will have its P bit equal to + // > zero, SID or L bit (described below) equal to zero, and B bit (described below) + // > equal to 1. + if !is_key_picture + && last_keyframe_payloader_descriptor.flexible_mode + != payload_descriptor.flexible_mode + { + gst::warning!(CAT, imp: self, "Flexible scalability mode can only change on key pictures"); + + // TODO: Could potentially drain here? + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + } + + // Section 4.2, P flag + // + // > When P is set to zero, the TID field (described below) MUST also be set to 0 (if + // > present). + if !payload_descriptor.inter_picture_predicted_frame + && payload_descriptor + .layer_index + .as_ref() + .map_or(false, |layer_index| layer_index.temporal_layer_id != 0) + { + gst::warning!(CAT, imp: self, "Temporal layer ID of non-inter-predicted frame must be 0"); + // TODO: Could potentially drain here? + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + // Section 4.2, F flag + // + // > This MUST only be set to 1 if the I bit is also set to one; if the I bit is set to + // > zero, then this MUST also be set to zero and ignored by receivers. + if payload_descriptor.flexible_mode && payload_descriptor.picture_id.is_none() { + gst::warning!(CAT, imp: self, "Flexible scalability mode but no picture ID present"); + // TODO: Could potentially drain here? + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + // If this is not the start of a picture then we have to wait for one + if state.current_picture_payload_descriptor.is_none() && !is_start_of_picture { + if state.last_timestamp.is_some() { + gst::warning!(CAT, imp: self, "Waiting for start of picture"); + } else { + gst::trace!(CAT, imp: self, "Waiting for start of picture"); + } + // TODO: Could potentially drain here? + self.obj().drop_packet(packet); + self.reset(&mut state); + return Ok(gst::FlowSuccess::Ok); + } + + // If necessary wait for a key picture if we never saw one so far and/or request one + // from upstream. + if is_start_of_picture + && !is_key_picture + && state.last_key_picture_payload_descriptor.is_none() + { + if settings.request_keyframe { + gst::debug!(CAT, imp: self, "Requesting keyframe from upstream"); + let event = gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(); + let _ = self.obj().sink_pad().push_event(event); + } + + if settings.wait_for_keyframe { + gst::trace!(CAT, imp: self, "Waiting for keyframe"); + // TODO: Could potentially drain here? + self.reset(&mut state); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + } + + // Update state tracking + if is_start_of_picture { + assert!(state.pending_picture.is_empty()); + state.pending_picture_ext_seqnum = packet.ext_seqnum(); + state.current_picture_payload_descriptor = Some(payload_descriptor.clone()); + state.last_timestamp = Some(packet.ext_timestamp()); + if let Some(picture_id) = payload_descriptor.picture_id { + state.last_picture_id = Some(picture_id); + } else { + state.last_picture_id = None; + } + + if is_key_picture { + state.last_key_picture_payload_descriptor = Some(payload_descriptor.clone()); + } + } + + // If this is the start of a frame in a key picture then parse the frame header. We always + // keep the last one around as that should theoretically be the one with the highest + // resolution and profile. + if payload_descriptor.start_of_frame + && state.current_picture_payload_descriptor.as_ref().map_or( + false, + |current_picture_payload_descriptor| { + !current_picture_payload_descriptor.inter_picture_predicted_frame + }, + ) + { + let mut r = BitReader::endian(&mut cursor, BigEndian); + // We assume that the beginning of the frame header fits into the first packet + match r.parse::() { + Ok(frame_header) => { + gst::trace!(CAT, imp: self, "Parsed frame header: {frame_header:?}"); + state.current_keyframe_frame_header = Some(frame_header); + } + Err(err) => { + // Don't consider this a fatal error + gst::warning!(CAT, imp: self, "Failed to read frame header: {err}"); + } + }; + } + + state + .pending_picture + .extend_from_slice(&payload[payload_start_index..]); + + // The marker bit is set for the last packet of a picture. + if !packet.marker_bit() { + return Ok(gst::FlowSuccess::Ok); + } + + let current_picture_payload_descriptor = + state.current_picture_payload_descriptor.take().unwrap(); + + if let Some(current_keyframe_frame_header) = state.current_keyframe_frame_header.take() { + // TODO: Could also add more information to the caps + if current_keyframe_frame_header.keyframe_info.is_some() + && state.last_keyframe_frame_header.as_ref().map_or( + true, + |last_keyframe_frame_header| { + last_keyframe_frame_header.profile != current_keyframe_frame_header.profile + || last_keyframe_frame_header + .keyframe_info + .as_ref() + .map(|keyframe_info| keyframe_info.render_size()) + != current_keyframe_frame_header + .keyframe_info + .as_ref() + .map(|keyframe_info| keyframe_info.render_size()) + }, + ) + { + let render_size = current_keyframe_frame_header + .keyframe_info + .as_ref() + .map(|keyframe_info| keyframe_info.render_size()) + .unwrap(); + + let caps = gst::Caps::builder("video/x-vp9") + .field( + "profile", + format!("{}", current_keyframe_frame_header.profile), + ) + .field("width", render_size.0 as i32) + .field("height", render_size.1 as i32) + .build(); + + self.obj().set_src_caps(&caps); + } + state.last_keyframe_frame_header = Some(current_keyframe_frame_header); + } + + let mut buffer = gst::Buffer::from_mut_slice(mem::take(&mut state.pending_picture)); + { + let buffer = buffer.get_mut().unwrap(); + + if current_picture_payload_descriptor.inter_picture_predicted_frame { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + gst::trace!(CAT, imp: self, "Finishing delta-frame"); + } else { + gst::trace!(CAT, imp: self, "Finishing keyframe"); + } + + if state.needs_discont { + gst::trace!(CAT, imp: self, "Setting DISCONT"); + buffer.set_flags(gst::BufferFlags::DISCONT); + state.needs_discont = false; + } + + // Set MARKER flag on the output so that the parser knows that this buffer ends a full + // picture and potentially can operate a bit faster. + buffer.set_flags(gst::BufferFlags::MARKER); + } + + state.current_picture_payload_descriptor = None; + state.current_keyframe_frame_header = None; + + // Set fallback caps if the first complete frame we have is not a keyframe. For keyframes, + // caps with profile and resolution would've been set above already. + // + // If a keyframe is received in the future then the caps are updated above. + if !self.obj().src_pad().has_current_caps() { + self.obj() + .set_src_caps(&self.obj().src_pad().pad_template_caps()); + } + + self.obj().queue_buffer( + PacketToBufferRelation::Seqnums(state.pending_picture_ext_seqnum..=packet.ext_seqnum()), + buffer, + )?; + + Ok(gst::FlowSuccess::Ok) + } +} diff --git a/net/rtp/src/vp9/depay/mod.rs b/net/rtp/src/vp9/depay/mod.rs new file mode 100644 index 00000000..28f859f7 --- /dev/null +++ b/net/rtp/src/vp9/depay/mod.rs @@ -0,0 +1,27 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpVp9Depay(ObjectSubclass) + @extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpvp9depay2", + gst::Rank::MARGINAL, + RtpVp9Depay::static_type(), + ) +} diff --git a/net/rtp/src/vp9/frame_header.rs b/net/rtp/src/vp9/frame_header.rs new file mode 100644 index 00000000..ce80932a --- /dev/null +++ b/net/rtp/src/vp9/frame_header.rs @@ -0,0 +1,230 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use anyhow::{bail, Context as _}; +use bitstream_io::{FromBitStream, FromBitStreamWith}; + +#[derive(Debug)] +pub struct FrameHeader { + pub profile: u8, + pub show_existing_frame: bool, + // All below only set for !show_existing_frame + pub is_keyframe: Option, + pub show_frame: Option, + pub error_resilient_mode: Option, + pub keyframe_info: Option, + // More fields follow that we don't parse + // TODO: intra-only frames can also have a frame size +} + +impl FromBitStream for FrameHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let marker = r.read::(2).context("frame_marker")?; + if marker != 2 { + bail!("Wrong frame marker"); + } + + let profile_low_bit = r.read::(1).context("profile_low_bit")?; + let profile_high_bit = r.read::(1).context("profile_high_bit")?; + + let profile = (profile_high_bit << 1) | profile_low_bit; + if profile == 3 { + r.skip(1).context("reserved")?; + } + + let show_existing_frame = r.read_bit().context("show_existing_frame")?; + if show_existing_frame { + return Ok(FrameHeader { + profile, + show_existing_frame, + is_keyframe: None, + show_frame: None, + error_resilient_mode: None, + keyframe_info: None, + }); + } + + let is_keyframe = !r.read_bit().context("frame_type")?; + let show_frame = r.read_bit().context("show_frame")?; + let error_resilient_mode = r.read_bit().context("error_resilient_mode")?; + + if !is_keyframe { + return Ok(FrameHeader { + profile, + show_existing_frame, + is_keyframe: Some(is_keyframe), + show_frame: Some(show_frame), + error_resilient_mode: Some(error_resilient_mode), + keyframe_info: None, + }); + } + + let keyframe_info = r + .parse_with::(&profile) + .context("keyframe_info")?; + + Ok(FrameHeader { + profile, + show_existing_frame, + is_keyframe: Some(is_keyframe), + show_frame: Some(show_frame), + error_resilient_mode: Some(error_resilient_mode), + keyframe_info: Some(keyframe_info), + }) + } +} + +#[derive(Debug)] +pub struct KeyframeInfo { + // sync code + // color config + pub color_config: ColorConfig, + // frame size + pub frame_size: (u32, u32), + // render size + pub render_size: Option<(u32, u32)>, +} + +impl<'a> FromBitStreamWith<'a> for KeyframeInfo { + type Error = anyhow::Error; + + /// Profile + type Context = u8; + + fn from_reader( + r: &mut R, + profile: &Self::Context, + ) -> Result + where + Self: Sized, + { + let sync_code_1 = r.read_to::().context("sync_code_1")?; + let sync_code_2 = r.read_to::().context("sync_code_2")?; + let sync_code_3 = r.read_to::().context("sync_code_3")?; + + if [0x49, 0x83, 0x42] != [sync_code_1, sync_code_2, sync_code_3] { + bail!("Invalid sync code"); + } + + let color_config = r + .parse_with::(profile) + .context("color_config")?; + + let frame_width_minus_1 = r.read_to::().context("frame_width_minus_1")?; + let frame_height_minus_1 = r.read_to::().context("frame_height_minus_1")?; + + let frame_size = ( + frame_width_minus_1 as u32 + 1, + frame_height_minus_1 as u32 + 1, + ); + + let render_and_frame_size_different = + r.read::(1).context("render_and_frame_size_different")? == 1; + + let render_size = if render_and_frame_size_different { + let render_width_minus_1 = r.read_to::().context("render_width_minus_1")?; + let render_height_minus_1 = r.read_to::().context("render_height_minus_1")?; + Some(( + render_width_minus_1 as u32 + 1, + render_height_minus_1 as u32 + 1, + )) + } else { + None + }; + + Ok(KeyframeInfo { + color_config, + frame_size, + render_size, + }) + } +} + +impl KeyframeInfo { + pub fn render_size(&self) -> (u32, u32) { + if let Some((width, height)) = self.render_size { + (width, height) + } else { + self.frame_size + } + } +} + +#[derive(Debug)] +pub struct ColorConfig { + pub bit_depth: u8, + pub color_space: u8, + pub color_range: u8, + pub sub_sampling_x: u8, + pub sub_sampling_y: u8, +} + +impl<'a> FromBitStreamWith<'a> for ColorConfig { + type Error = anyhow::Error; + + /// Profile + type Context = u8; + + fn from_reader( + r: &mut R, + profile: &Self::Context, + ) -> Result + where + Self: Sized, + { + const CS_RGB: u8 = 7; + + let bit_depth = if *profile >= 2 { + let ten_or_twelve_bit = r.read_bit().context("ten_or_twelve_bit")?; + if ten_or_twelve_bit { + 12 + } else { + 10 + } + } else { + 8 + }; + + let color_space = r.read::(3).context("color_space")?; + let (color_range, sub_sampling_x, sub_sampling_y) = if color_space != CS_RGB { + let color_range = r.read::(1).context("color_range")?; + + let (sub_sampling_x, sub_sampling_y) = if *profile == 1 || *profile == 3 { + let sub_sampling_x = r.read::(1).context("sub_sampling_x")?; + let sub_sampling_y = r.read::(1).context("sub_sampling_y")?; + r.skip(1).context("reserved_zero")?; + + (sub_sampling_x, sub_sampling_y) + } else { + (1, 1) + }; + + (color_range, sub_sampling_x, sub_sampling_y) + } else { + if *profile == 1 || *profile == 3 { + r.skip(1).context("reserved_zero")?; + } + + (1, 0, 0) + }; + + Ok(ColorConfig { + bit_depth, + color_space, + color_range, + sub_sampling_x, + sub_sampling_y, + }) + } +} diff --git a/net/rtp/src/vp9/mod.rs b/net/rtp/src/vp9/mod.rs new file mode 100644 index 00000000..bd65c8e1 --- /dev/null +++ b/net/rtp/src/vp9/mod.rs @@ -0,0 +1,16 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +pub mod depay; +mod frame_header; +pub mod pay; +mod payload_descriptor; + +#[cfg(test)] +mod tests; diff --git a/net/rtp/src/vp9/pay/imp.rs b/net/rtp/src/vp9/pay/imp.rs new file mode 100644 index 00000000..f5ff3f81 --- /dev/null +++ b/net/rtp/src/vp9/pay/imp.rs @@ -0,0 +1,368 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpvp9pay2 + * @see_also: rtpvp9depay2, vp9dec, vp9enc + * + * Payload a VP9 video stream into RTP packets as per [draft-ietf-payload-vp9][draft-ietf-payload-vp9]. + * + * [draft-ietf-payload-vp9]:https://datatracker.ietf.org/doc/html/draft-ietf-payload-vp9-16#section-4 + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 videotestsrc ! video/x-raw,width=1280,height=720,format=I420 ! timeoverlay font-desc=Sans,22 ! vp9enc ! rtpvp9pay2 ! udpsink host=127.0.0.1 port=5004 + * ]| This will create and payload a VP9 video stream with a test pattern and + * send it out via UDP to localhost port 5004. + * + * Since: plugins-rs-0.13.0 + */ +use gst::{glib, prelude::*, subclass::prelude::*}; +use smallvec::SmallVec; +use std::{cmp, sync::Mutex}; + +use bitstream_io::{BigEndian, BitRead as _, BitReader, ByteWrite as _, ByteWriter}; +use once_cell::sync::Lazy; + +use crate::{ + basepay::{RtpBasePay2Ext, RtpBasePay2ImplExt}, + vp9::{ + frame_header::FrameHeader, + payload_descriptor::{PayloadDescriptor, PictureId}, + }, +}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpvp9pay2", + gst::DebugColorFlags::empty(), + Some("RTP VP9 Payloader"), + ) +}); + +#[derive(Clone, Default)] +struct Settings { + picture_id_mode: super::PictureIdMode, + picture_id_offset: Option, +} + +#[derive(Default)] +pub struct RtpVp9Pay { + settings: Mutex, + /// Current picture ID. + /// + /// Reset to `None` in `Null` / `Ready` state and initialized to the offset when going to + /// `Paused`. + picture_id: Mutex>, +} + +#[glib::object_subclass] +impl ObjectSubclass for RtpVp9Pay { + const NAME: &'static str = "GstRtpVp9Pay2"; + type Type = super::RtpVp9Pay; + type ParentType = crate::basepay::RtpBasePay2; +} + +impl ObjectImpl for RtpVp9Pay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecEnum::builder::("picture-id-mode") + .nick("Picture ID Mode") + .blurb("The picture ID mode for payloading") + .default_value(Settings::default().picture_id_mode) + .mutable_ready() + .build(), + glib::ParamSpecInt::builder("picture-id-offset") + .nick("Picture ID Offset") + .blurb("Offset to add to the initial picture-id (-1 = random)") + .default_value( + Settings::default() + .picture_id_offset + .map(i32::from) + .unwrap_or(-1), + ) + .minimum(-1) + .maximum(0x7fff) + .mutable_ready() + .build(), + glib::ParamSpecInt::builder("picture-id") + .nick("Picture ID") + .blurb("Current Picture ID") + .default_value(-1) + .minimum(-1) + .maximum(0x7fff) + .read_only() + .build(), + ] + }); + + &PROPERTIES + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "picture-id-mode" => { + self.settings.lock().unwrap().picture_id_mode = value.get().unwrap(); + } + "picture-id-offset" => { + let v = value.get::().unwrap(); + self.settings.lock().unwrap().picture_id_offset = + (v != -1).then_some((v & 0x7fff) as u16); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "picture-id-mode" => self.settings.lock().unwrap().picture_id_mode.to_value(), + "picture-id-offset" => self + .settings + .lock() + .unwrap() + .picture_id_offset + .map(i32::from) + .unwrap_or(-1) + .to_value(), + "picture-id" => { + let picture_id = self.picture_id.lock().unwrap(); + picture_id + .map(u16::from) + .map(i32::from) + .unwrap_or(-1) + .to_value() + } + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for RtpVp9Pay {} + +impl ElementImpl for RtpVp9Pay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP VP9 payloader", + "Codec/Payloader/Network/RTP", + "Payload VP9 as RTP packets", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("video/x-vp9").build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", gst::IntRange::new(96, 127)) + .field("clock-rate", 90_000i32) + .field( + "encoding-name", + gst::List::new(["VP9", "VP9-DRAFT-IETF-01"]), + ) + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl crate::basepay::RtpBasePay2Impl for RtpVp9Pay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["video"]; + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().unwrap().clone(); + + let picture_id_offset = settings.picture_id_offset.unwrap_or_else(|| { + use rand::Rng as _; + + let mut rng = rand::thread_rng(); + rng.gen::() + }); + + let picture_id = PictureId::new(settings.picture_id_mode, picture_id_offset); + *self.picture_id.lock().unwrap() = picture_id; + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.picture_id.lock().unwrap() = None; + + Ok(()) + } + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + gst::debug!(CAT, imp: self, "received caps {caps:?}"); + + let caps_builder = gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("clock-rate", 90_000i32) + .field( + "encoding-name", + gst::List::new(["VP9", "VP9-DRAFT-IETF-01"]), + ); + + self.obj().set_src_caps(&caps_builder.build()); + + true + } + + fn negotiate(&self, mut src_caps: gst::Caps) { + // Fixate the encoding-name with preference to "VP9" + + src_caps.truncate(); + { + let src_caps = src_caps.get_mut().unwrap(); + let s = src_caps.structure_mut(0).unwrap(); + s.fixate_field_str("encoding-name", "VP9"); + } + + self.parent_negotiate(src_caps); + } + + fn handle_buffer( + &self, + buffer: &gst::Buffer, + id: u64, + ) -> Result { + let max_payload_size = self.obj().max_payload_size(); + + gst::trace!(CAT, imp: self, "received buffer of size {}", buffer.size()); + + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!( + self, + gst::ResourceError::Read, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + // TODO: We assume 1 spatial and 1 temporal layer. Scalable VP9 streams are not really + // supported by GStreamer so far and require further design work. + // FIXME: We also assume that each buffer contains a single VP9 frame. The VP9 caps are + // misdesigned unfortunately and there's no enforced alignment so this could theoretically + // also contain a whole superframe. A receiver is likely not going to fail on this. + + let picture_id = *self.picture_id.lock().unwrap(); + + // For now we're only getting the keyframe information from the frame header. We could also + // get information like the frame size from here but it's optional in the RTP payload + // descriptor and only required for scalable streams. + // + // We parse the frame header for the keyframe information because upstream is not + // necessarily providing correctly parsed information. This is mostly for compatibility + // with `rtpvp9pay`. + let mut r = BitReader::endian(map.as_slice(), BigEndian); + let key_frame = match r.parse::() { + Ok(frame_header) => { + gst::trace!(CAT, imp: self, "Parsed frame header: {frame_header:?}"); + // show_existing_frame assumes that there is an existing frame to show so this is + // clearly not a keyframe + frame_header.is_keyframe.unwrap_or(false) + } + Err(err) => { + gst::trace!(CAT, imp: self, "Failed parsing frame header: {err:?}"); + !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) + } + }; + + let mut first = true; + let mut data = map.as_slice(); + while !data.is_empty() { + let mut payload_descriptor = PayloadDescriptor { + picture_id, + layer_index: None, + inter_picture_predicted_frame: !key_frame, + flexible_mode: false, + reference_indices: Default::default(), + start_of_frame: first, + end_of_frame: false, // reset later + scalability_structure: None, + not_reference_frame_for_upper_layers: true, + }; + + let payload_descriptor_size = payload_descriptor.size().map_err(|err| { + gst::error!(CAT, imp: self, "Failed to write payload descriptor: {err:?}"); + gst::FlowError::Error + })?; + let overhead = payload_descriptor_size; + let payload_size = (max_payload_size as usize) + .checked_sub(overhead + 1) + .ok_or_else(|| { + gst::error!(CAT, imp: self, "Too small MTU configured for stream"); + gst::element_imp_error!( + self, + gst::LibraryError::Settings, + ["Too small MTU configured for stream"] + ); + gst::FlowError::Error + })? + + 1; + let payload_size = cmp::min(payload_size, data.len()); + + payload_descriptor.end_of_frame = data.len() == payload_size; + + gst::trace!( + CAT, + imp: self, + "Writing packet with payload descriptor {payload_descriptor:?} and payload size {payload_size}", + ); + + let mut payload_descriptor_buffer = + SmallVec::<[u8; 256]>::with_capacity(payload_descriptor_size); + let mut w = ByteWriter::endian(&mut payload_descriptor_buffer, BigEndian); + w.build::(&payload_descriptor) + .map_err(|err| { + gst::error!(CAT, imp: self, "Failed to write payload descriptor: {err:?}"); + gst::FlowError::Error + })?; + assert_eq!(payload_descriptor_buffer.len(), payload_descriptor_size); + + self.obj().queue_packet( + id.into(), + rtp_types::RtpPacketBuilder::new() + .marker_bit(data.len() == payload_size) + .payload(payload_descriptor_buffer.as_slice()) + .payload(&data[..payload_size]), + )?; + + data = &data[payload_size..]; + first = false; + } + + let next_picture_id = picture_id.map(PictureId::increment); + *self.picture_id.lock().unwrap() = next_picture_id; + + Ok(gst::FlowSuccess::Ok) + } +} diff --git a/net/rtp/src/vp9/pay/mod.rs b/net/rtp/src/vp9/pay/mod.rs new file mode 100644 index 00000000..dd874175 --- /dev/null +++ b/net/rtp/src/vp9/pay/mod.rs @@ -0,0 +1,45 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpVp9Pay(ObjectSubclass) + @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + PictureIdMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + + gst::Element::register( + Some(plugin), + "rtpvp9pay2", + gst::Rank::MARGINAL, + RtpVp9Pay::static_type(), + ) +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, glib::Enum, Default)] +#[enum_type(name = "GstRtpVp9Pay2PictureIdMode")] +#[repr(i32)] +pub enum PictureIdMode { + #[default] + #[enum_value(name = "No Picture ID", nick = "none")] + None, + #[enum_value(name = "7-bit PictureID", nick = "7-bit")] + SevenBit, + #[enum_value(name = "15-bit Picture ID", nick = "15-bit")] + FifteenBit, +} diff --git a/net/rtp/src/vp9/payload_descriptor.rs b/net/rtp/src/vp9/payload_descriptor.rs new file mode 100644 index 00000000..81b944b9 --- /dev/null +++ b/net/rtp/src/vp9/payload_descriptor.rs @@ -0,0 +1,560 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::io; + +use anyhow::{bail, Context as _}; +use bitstream_io::{ + BigEndian, ByteWrite as _, ByteWriter, FromByteStream, FromByteStreamWith, ToByteStream, + ToByteStreamWith, +}; +use smallvec::SmallVec; + +const I_BIT: u8 = 0b1000_0000; +const P_BIT: u8 = 0b0100_0000; +const L_BIT: u8 = 0b0010_0000; +const F_BIT: u8 = 0b0001_0000; +const B_BIT: u8 = 0b0000_1000; +const E_BIT: u8 = 0b0000_0100; +const V_BIT: u8 = 0b0000_0010; +const Z_BIT: u8 = 0b0000_0001; +const N_BIT: u8 = 0b0000_0001; +const M_BIT: u8 = 0b1000_0000; + +#[derive(Debug, Clone)] +pub struct PayloadDescriptor { + pub picture_id: Option, + pub layer_index: Option, + pub inter_picture_predicted_frame: bool, + pub flexible_mode: bool, + pub reference_indices: SmallVec<[u8; 3]>, + pub start_of_frame: bool, + pub end_of_frame: bool, + pub scalability_structure: Option, + pub not_reference_frame_for_upper_layers: bool, +} + +impl PayloadDescriptor { + pub fn size(&self) -> Result { + #[derive(Default)] + struct Counter(usize); + + impl io::Write for Counter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0 += buf.len(); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + let mut counter = Counter::default(); + let mut w = ByteWriter::endian(&mut counter, BigEndian); + w.build::(self)?; + + Ok(counter.0) + } +} + +impl FromByteStream for PayloadDescriptor { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let flags = r.read::().context("flags")?; + + let picture_id = if (flags & I_BIT) != 0 { + Some(r.parse::().context("picture_id")?) + } else { + None + }; + + let inter_picture_predicted_frame = (flags & P_BIT) != 0; + + let flexible_mode = (flags & F_BIT) != 0; + + let layer_index = if (flags & L_BIT) != 0 { + Some( + r.parse_with::(&flexible_mode) + .context("layer_index")?, + ) + } else { + None + }; + + let mut reference_indices = SmallVec::default(); + if inter_picture_predicted_frame && flexible_mode { + for n in 0..3 { + let p_diff = r.read::().context("p_diff")?; + if n == 3 && (p_diff & N_BIT) != 0 { + bail!("More than 3 reference indices"); + } + reference_indices.push(p_diff >> 1); + if (p_diff & N_BIT) == 0 { + break; + } + } + } + + let start_of_frame = (flags & B_BIT) != 0; + let end_of_frame = (flags & E_BIT) != 0; + + let not_reference_frame_for_upper_layers = (flags & Z_BIT) != 0; + + let scalability_structure = if (flags & V_BIT) != 0 { + Some( + r.parse::() + .context("scalability_structure")?, + ) + } else { + None + }; + + Ok(PayloadDescriptor { + picture_id, + layer_index, + inter_picture_predicted_frame, + flexible_mode, + reference_indices, + start_of_frame, + end_of_frame, + scalability_structure, + not_reference_frame_for_upper_layers, + }) + } +} + +impl ToByteStream for PayloadDescriptor { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + if self.reference_indices.len() > 3 { + bail!("Too many reference indices"); + } + if self.inter_picture_predicted_frame + && self.flexible_mode + && self.reference_indices.is_empty() + { + bail!("Reference indices required"); + } + + let flags = if self.picture_id.is_some() { I_BIT } else { 0 } + | if self.inter_picture_predicted_frame { + P_BIT + } else { + 0 + } + | if self.flexible_mode { F_BIT } else { 0 } + | if self.layer_index.is_some() { L_BIT } else { 0 } + | if self.start_of_frame { B_BIT } else { 0 } + | if self.end_of_frame { E_BIT } else { 0 } + | if self.not_reference_frame_for_upper_layers { + Z_BIT + } else { + 0 + } + | if self.scalability_structure.is_some() { + V_BIT + } else { + 0 + }; + + w.write::(flags).context("flags")?; + + if let Some(picture_id) = self.picture_id { + w.build::(&picture_id).context("picture_id")?; + } + + if let Some(ref layer_index) = self.layer_index { + w.build_with::(layer_index, &self.flexible_mode) + .context("layer_index")?; + } + + for (i, reference_index) in self.reference_indices.iter().enumerate() { + if *reference_index > 0b0111_1111 { + bail!("Too high reference index"); + } + + let b = if i == self.reference_indices.len() - 1 { + N_BIT + } else { + 0 + } | *reference_index; + + w.write::(b).context("reference_index")?; + } + + if let Some(ref scalability_structure) = self.scalability_structure { + w.build::(scalability_structure) + .context("scalability_structure")?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone, Copy)] +pub enum PictureId { + SevenBit(u8), + FifteenBit(u16), +} + +impl PictureId { + pub fn new(mode: super::pay::PictureIdMode, v: u16) -> Option { + match mode { + super::pay::PictureIdMode::None => None, + super::pay::PictureIdMode::SevenBit => Some(PictureId::SevenBit((v & 0x7f) as u8)), + super::pay::PictureIdMode::FifteenBit => Some(PictureId::FifteenBit(v & 0x7fff)), + } + } + + pub fn increment(self) -> Self { + match self { + PictureId::SevenBit(v) => PictureId::SevenBit((v + 1) & 0x7f), + PictureId::FifteenBit(v) => PictureId::FifteenBit((v + 1) & 0x7fff), + } + } + + pub fn update_mode(self, mode: super::pay::PictureIdMode) -> Self { + match (self, mode) { + (_, super::pay::PictureIdMode::None) => self, + (PictureId::SevenBit(_), super::pay::PictureIdMode::SevenBit) => self, + (PictureId::FifteenBit(_), super::pay::PictureIdMode::FifteenBit) => self, + (PictureId::SevenBit(v), super::pay::PictureIdMode::FifteenBit) => { + PictureId::FifteenBit(v as u16) + } + (PictureId::FifteenBit(v), super::pay::PictureIdMode::SevenBit) => { + PictureId::SevenBit((v & 0x7f) as u8) + } + } + } +} + +impl From for u16 { + fn from(value: PictureId) -> Self { + match value { + PictureId::SevenBit(v) => v as u16, + PictureId::FifteenBit(v) => v, + } + } +} + +impl PartialEq for PictureId { + fn eq(&self, other: &Self) -> bool { + match (*self, *other) { + (PictureId::SevenBit(s), PictureId::SevenBit(o)) => s == o, + (PictureId::SevenBit(s), PictureId::FifteenBit(o)) => s == (o & 0x7f) as u8, + (PictureId::FifteenBit(s), PictureId::SevenBit(o)) => (s & 0x7f) as u8 == o, + (PictureId::FifteenBit(s), PictureId::FifteenBit(o)) => s == o, + } + } +} + +impl FromByteStream for PictureId { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let pid = r.read::().context("picture_id")?; + if (pid & M_BIT) != 0 { + let ext_pid = r.read::().context("extended_pid")?; + Ok(PictureId::FifteenBit( + (((pid & !M_BIT) as u16) << 8) | ext_pid as u16, + )) + } else { + Ok(PictureId::SevenBit(pid)) + } + } +} + +impl ToByteStream for PictureId { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + match self { + PictureId::SevenBit(v) => w.write::(*v).context("picture_id"), + PictureId::FifteenBit(v) => { + w.write::(M_BIT | (v >> 8) as u8) + .context("picture_id")?; + w.write::((v & 0b1111_1111) as u8) + .context("extended_pid") + } + } + } +} + +#[derive(Debug, Clone)] +pub struct LayerIndex { + pub temporal_layer_id: u8, + pub switching_point: bool, + pub spatial_layer_id: u8, + pub inter_layer_dependency_used: bool, + pub temporal_layer_zero_index: Option, +} + +impl<'a> FromByteStreamWith<'a> for LayerIndex { + type Error = anyhow::Error; + /// Flexible mode? + type Context = bool; + + fn from_reader( + r: &mut R, + flexible_mode: &Self::Context, + ) -> Result + where + Self: Sized, + { + let layer_index = r.read::().context("layer_index")?; + + let temporal_layer_id = layer_index >> 5; + let switching_point = (layer_index >> 4) & 0b0001 != 0; + let spatial_layer_id = (layer_index >> 1) & 0b0111; + let inter_layer_dependency_used = layer_index & 0b0001 != 0; + + let temporal_layer_zero_index = if !flexible_mode { + Some(r.read::().context("temporal_layer_zero_index")?) + } else { + None + }; + + Ok(LayerIndex { + temporal_layer_id, + switching_point, + spatial_layer_id, + inter_layer_dependency_used, + temporal_layer_zero_index, + }) + } +} + +impl<'a> ToByteStreamWith<'a> for LayerIndex { + type Error = anyhow::Error; + + /// Flexible mode? + type Context = bool; + fn to_writer( + &self, + w: &mut W, + flexible_mode: &Self::Context, + ) -> Result<(), Self::Error> + where + Self: Sized, + { + { + if self.temporal_layer_id > 0b111 { + bail!("Too high temporal layer id"); + } + if self.spatial_layer_id > 0b111 { + bail!("Too high spatial layer id"); + } + + if *flexible_mode && self.temporal_layer_zero_index.is_some() { + bail!("temporal_layer_zero_index can't be provided in flexible mode"); + } else if !*flexible_mode && self.temporal_layer_zero_index.is_none() { + bail!("temporal_layer_zero_index must be provided in non-flexible mode"); + } + + let b = (self.temporal_layer_id << 5) + | (u8::from(self.switching_point) << 4) + | (self.spatial_layer_id << 1) + | u8::from(self.inter_layer_dependency_used); + w.write::(b).context("layer_index")?; + + if let Some(temporal_layer_zero_index) = self.temporal_layer_zero_index { + w.write::(temporal_layer_zero_index) + .context("temporal_layer_zero_index")?; + } + + Ok(()) + } + } +} + +#[derive(Debug, Clone)] +pub struct ScalabilityStructure { + pub num_spatial_layers: u8, + pub spatial_layer_frame_resolutions: SmallVec<[(u16, u16); 8]>, + pub picture_description: SmallVec<[PictureDescription; 16]>, +} + +impl FromByteStream for ScalabilityStructure { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + const Y_FLAG: u8 = 0b0001_0000; + const G_FLAG: u8 = 0b0000_1000; + + let b = r.read::().context("scalability_structure")?; + + let num_spatial_layers = (b >> 5) + 1; + + let mut spatial_layer_frame_resolutions = SmallVec::default(); + if (b & Y_FLAG) != 0 { + for _ in 0..num_spatial_layers { + let width = r.read_as::().context("width")?; + let height = r.read_as::().context("height")?; + + spatial_layer_frame_resolutions.push((width, height)); + } + } + + let num_pictures_in_group = if (b & G_FLAG) != 0 { + Some(r.read::().context("num_pictures_in_group")?) + } else { + None + }; + + let mut picture_description = SmallVec::new(); + if let Some(num_pictures_in_group) = num_pictures_in_group { + picture_description.reserve(num_pictures_in_group as usize); + for _ in 0..num_pictures_in_group { + picture_description.push( + r.parse::() + .context("picture_description")?, + ); + } + } + + Ok(ScalabilityStructure { + num_spatial_layers, + spatial_layer_frame_resolutions, + picture_description, + }) + } +} + +impl ToByteStream for ScalabilityStructure { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + const Y_FLAG: u8 = 0b0001_0000; + const G_FLAG: u8 = 0b0000_1000; + + if self.num_spatial_layers == 0 { + bail!("Zero spatial layers not allowed"); + } + if self.num_spatial_layers - 1 > 0b111 { + bail!("Too many spatial layers"); + } + if self.picture_description.len() > 255 { + bail!("Too many picture descriptions"); + } + + let b = ((self.num_spatial_layers - 1) << 5) + | if !self.spatial_layer_frame_resolutions.is_empty() { + Y_FLAG + } else { + 0 + } + | if !self.picture_description.is_empty() { + G_FLAG + } else { + 0 + }; + w.write::(b).context("scalability_structure")?; + + for (width, height) in &self.spatial_layer_frame_resolutions { + w.write::(*width).context("width")?; + w.write::(*height).context("height")?; + } + + if !self.picture_description.is_empty() { + w.write::(self.picture_description.len() as u8) + .context("num_pictures_in_group")?; + } + + for picture_description in &self.picture_description { + w.build::(picture_description) + .context("picture_description")?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct PictureDescription { + pub temporal_layer_id: u8, + pub switching_point: bool, + pub reference_indices: SmallVec<[u8; 3]>, +} + +impl FromByteStream for PictureDescription { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let b = r.read::().context("picture_description")?; + + let temporal_layer_id = b >> 5; + let switching_point = ((b >> 4) & 0b0001) != 0; + + let num_reference_indices = (b >> 2) & 0b0011; + let mut reference_indices = SmallVec::default(); + for _ in 0..num_reference_indices { + reference_indices.push(r.read::().context("reference_indices")?); + } + + Ok(PictureDescription { + temporal_layer_id, + switching_point, + reference_indices, + }) + } +} + +impl ToByteStream for PictureDescription { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + if self.temporal_layer_id > 0b111 { + bail!("Too high temporal layer id"); + } + + if self.reference_indices.len() > 3 { + bail!("Too many reference indices"); + } + + let b = (self.temporal_layer_id << 5) + | (u8::from(self.switching_point) << 4) + | ((self.reference_indices.len() as u8) << 2); + + w.write::(b).context("picture_description")?; + + for reference_index in &self.reference_indices { + w.write::(*reference_index) + .context("reference_indices")?; + } + + Ok(()) + } +} diff --git a/net/rtp/src/vp9/tests.rs b/net/rtp/src/vp9/tests.rs new file mode 100644 index 00000000..c2616bf8 --- /dev/null +++ b/net/rtp/src/vp9/tests.rs @@ -0,0 +1,220 @@ +// +// Copyright (C) 2024 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source}; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + crate::plugin_register_static().expect("rtpvp9 test"); + }); +} + +#[test] +fn test_vp9() { + init(); + + // Generates encoded frames of sizes 1342 (key), 96, 41, 55, 41 + let src = "videotestsrc num-buffers=5 pattern=gradient ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1 ! vp9enc target-bitrate=4000000"; + let pay = "rtpvp9pay2 mtu=1200 picture-id-mode=7-bit"; + let depay = "rtpvp9depay2"; + + let expected_pay = vec![ + vec![ + // First frame is split into two packets + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(1200) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(170) + .build(), + ], + // Second and following frames + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(3_600) + .marker_bit(true) + .size(110) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(7_200) + .marker_bit(true) + .size(55) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(10_800) + .marker_bit(true) + .size(69) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(14_400) + .marker_bit(true) + .size(55) + .build()], + ]; + + let expected_depay = vec![ + // One buffer per frame + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(1342) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(96) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(41) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .size(55) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .size(41) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + ]; + + run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay); +} + +#[test] +fn test_vp9_small_mtu() { + init(); + + // Generates encoded frames of sizes 1342 (key), 96, 41, 55, 41 + let src = "videotestsrc num-buffers=5 pattern=gradient ! video/x-raw,format=I420,width=1920,height=1080,framerate=25/1 ! vp9enc target-bitrate=4000000"; + let pay = "rtpvp9pay2 mtu=500 picture-id-mode=15-bit"; + let depay = "rtpvp9depay2"; + + let expected_pay = vec![ + vec![ + // First frame is split into three packets + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(500) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(0) + .marker_bit(false) + .size(500) + .build(), + ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(387) + .build(), + ], + // Second and following frames + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(3_600) + .marker_bit(true) + .size(111) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(7_200) + .marker_bit(true) + .size(56) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(10_800) + .marker_bit(true) + .size(70) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .flags(gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(14_400) + .marker_bit(true) + .size(56) + .build()], + ]; + + let expected_depay = vec![ + // One buffer per frame + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(1342) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(96) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(41) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(120)) + .size(55) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(160)) + .size(41) + .flags(gst::BufferFlags::MARKER | gst::BufferFlags::DELTA_UNIT) + .build()], + ]; + + run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay); +}