From 8d73b5008af0a3855d5655c4a14dcfd275dfbd37 Mon Sep 17 00:00:00 2001 From: Vivienne Watermeier Date: Wed, 11 May 2022 15:16:36 +0200 Subject: [PATCH] Add RTP de/payloader elements for AV1 Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/881 --- Cargo.toml | 2 + docs/plugins/gst_plugins_cache.json | 66 +++ meson.build | 3 +- net/rtpav1/Cargo.toml | 45 ++ net/rtpav1/build.rs | 12 + net/rtpav1/src/common/aggr_header.rs | 99 ++++ net/rtpav1/src/common/error.rs | 98 ++++ net/rtpav1/src/common/integers.rs | 102 ++++ net/rtpav1/src/common/mod.rs | 23 + net/rtpav1/src/common/obu.rs | 418 ++++++++++++++ net/rtpav1/src/depay/imp.rs | 585 +++++++++++++++++++ net/rtpav1/src/depay/mod.rs | 35 ++ net/rtpav1/src/lib.rs | 32 ++ net/rtpav1/src/pay/imp.rs | 818 +++++++++++++++++++++++++++ net/rtpav1/src/pay/mod.rs | 35 ++ net/rtpav1/tests/rtpav1.rs | 224 ++++++++ 16 files changed, 2596 insertions(+), 1 deletion(-) create mode 100644 net/rtpav1/Cargo.toml create mode 100644 net/rtpav1/build.rs create mode 100644 net/rtpav1/src/common/aggr_header.rs create mode 100644 net/rtpav1/src/common/error.rs create mode 100644 net/rtpav1/src/common/integers.rs create mode 100644 net/rtpav1/src/common/mod.rs create mode 100644 net/rtpav1/src/common/obu.rs create mode 100644 net/rtpav1/src/depay/imp.rs create mode 100644 net/rtpav1/src/depay/mod.rs create mode 100644 net/rtpav1/src/lib.rs create mode 100644 net/rtpav1/src/pay/imp.rs create mode 100644 net/rtpav1/src/pay/mod.rs create mode 100644 net/rtpav1/tests/rtpav1.rs diff --git a/Cargo.toml b/Cargo.toml index 309e1d8e..da655f63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "net/hlssink3", "net/onvif", "net/reqwest", + "net/rtpav1", "net/aws", "net/webrtc-http", "utils/fallbackswitch", @@ -53,6 +54,7 @@ default-members = [ "net/onvif", "net/raptorq", "net/reqwest", + "net/rtpav1", "net/aws", "net/webrtc-http", "utils/fallbackswitch", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index e9109a08..f38a4b16 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5004,6 +5004,72 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "rtpav1": { + "description": "AV1 RTP (De)payloader Plugins", + "elements": { + "rtpav1depay": { + "author": "Vivienne Watermeier ", + "description": "Depayload AV1 from RTP packets", + "hierarchy": [ + "GstRtpAv1Depay", + "GstRTPBaseDepayload", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Depayloader/Network/RTP", + "long-name": "RTP AV1 Depayloader", + "pad-templates": { + "sink": { + "caps": "application/x-rtp:\n media: video\n payload: [ 96, 127 ]\n clock-rate: 90000\n encoding-name: AV1\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: tu\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" + }, + "rtpav1pay": { + "author": "Vivienne Watermeier ", + "description": "Payload AV1 as RTP packets", + "hierarchy": [ + "GstRtpAv1Pay", + "GstRTPBasePayload", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "long-name": "RTP AV1 payloader", + "pad-templates": { + "sink": { + "caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: obu\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: video\n payload: [ 96, 127 ]\n clock-rate: 90000\n encoding-name: AV1\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" + } + }, + "filename": "gstrtpav1", + "license": "MPL", + "other-types": {}, + "package": "gst-plugin-rtpav1", + "source": "gst-plugin-rtpav1", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "sodium": { "description": "libsodium-based file encryption and decryption", "elements": { diff --git a/meson.build b/meson.build index 0e985479..4f6487e9 100644 --- a/meson.build +++ b/meson.build @@ -67,7 +67,8 @@ plugins = { 'gst-plugin-textahead': 'libgsttextahead', 'gst-plugin-onvif': 'libgstrsonvif', 'gst-plugin-tracers': 'libgstrstracers', - 'gst-plugin-webrtchttp': 'libgstwebrtchttp' + 'gst-plugin-webrtchttp': 'libgstwebrtchttp', + 'gst-plugin-rtpav1': 'libgstrtpav1', } extra_env = {} diff --git a/net/rtpav1/Cargo.toml b/net/rtpav1/Cargo.toml new file mode 100644 index 00000000..4ed720cc --- /dev/null +++ b/net/rtpav1/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "gst-plugin-rtpav1" +version = "0.1.0" +authors = ["Vivienne Watermeier "] +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +license = "MPL-2.0" +edition = "2021" +description = "AV1 RTP (De)payloader Plugins" +rust-version = "1.63" + +[dependencies] +bitstream-io = "1.3" +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"]} +once_cell = "1.0" + +[dev-dependencies] +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } + +[build-dependencies] +gst-plugin-version-helper = { git = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" } + +[lib] +name = "gstrtpav1" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[features] +static = [] +capi = [] +doc = [] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/net/rtpav1/build.rs b/net/rtpav1/build.rs new file mode 100644 index 00000000..abb3ea50 --- /dev/null +++ b/net/rtpav1/build.rs @@ -0,0 +1,12 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +fn main() { + gst_plugin_version_helper::info() +} diff --git a/net/rtpav1/src/common/aggr_header.rs b/net/rtpav1/src/common/aggr_header.rs new file mode 100644 index 00000000..16db80fa --- /dev/null +++ b/net/rtpav1/src/common/aggr_header.rs @@ -0,0 +1,99 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#[derive(Default, Debug, PartialEq, Eq, Clone, Copy)] +pub struct AggregationHeader { + pub leading_fragment: bool, + pub trailing_fragment: bool, + pub obu_count: Option, + pub start_of_seq: bool, +} + +impl From for AggregationHeader { + fn from(byte: u8) -> Self { + Self { + leading_fragment: byte & (1 << 7) != 0, + trailing_fragment: byte & (1 << 6) != 0, + obu_count: match (byte >> 4) & 0b11 { + 0 => None, + n => Some(n), + }, + start_of_seq: byte & (1 << 3) != 0, + } + } +} + +impl From<&[u8; 1]> for AggregationHeader { + fn from(slice: &[u8; 1]) -> Self { + AggregationHeader::from(slice[0]) + } +} + +impl From for u8 { + fn from(aggr: AggregationHeader) -> Self { + let mut byte = 0; + + byte |= (aggr.leading_fragment as u8) << 7; + byte |= (aggr.trailing_fragment as u8) << 6; + byte |= (aggr.start_of_seq as u8) << 3; + + if let Some(n) = aggr.obu_count { + assert!(n < 0b100, "OBU count out of range"); + byte |= n << 4; + } + + byte + } +} + +#[cfg(test)] +mod tests { + use crate::common::*; + + const HEADERS: [(u8, AggregationHeader); 3] = [ + ( + 0b01011000, + AggregationHeader { + leading_fragment: false, + trailing_fragment: true, + obu_count: Some(1), + start_of_seq: true, + }, + ), + ( + 0b11110000, + AggregationHeader { + leading_fragment: true, + trailing_fragment: true, + obu_count: Some(3), + start_of_seq: false, + }, + ), + ( + 0b10000000, + AggregationHeader { + leading_fragment: true, + trailing_fragment: false, + obu_count: None, + start_of_seq: false, + }, + ), + ]; + + #[test] + fn test_aggr_header() { + for (idx, (byte, header)) in HEADERS.into_iter().enumerate() { + println!("running test {}...", idx); + + assert_eq!(byte, header.into()); + assert_eq!(header, byte.into()); + assert_eq!(header, (&[byte; 1]).into()); + } + } +} diff --git a/net/rtpav1/src/common/error.rs b/net/rtpav1/src/common/error.rs new file mode 100644 index 00000000..37a92180 --- /dev/null +++ b/net/rtpav1/src/common/error.rs @@ -0,0 +1,98 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +macro_rules! err_flow { + ($element:ident, read, $msg:literal) => { + |err| { + gst::element_error!($element, ResourceError::Read, [$msg, err]); + FlowError::Error + } + }; + ($element:ident, write, $msg:literal) => { + |err| { + gst::element_error!($element, ResourceError::Write, [$msg, err]); + FlowError::Error + } + }; + + ($element:ident, buf_read) => { + err_flow!($element, read, "Failed to read buffer: {}") + }; + ($element:ident, aggr_header_write) => { + err_flow!( + $element, + write, + "Failed to write aggregation header to the payload: {}" + ) + }; + ($element:ident, leb_write) => { + err_flow!( + $element, + write, + "Failed to write leb128 size field to the payload: {}" + ) + }; + ($element:ident, obu_write) => { + err_flow!( + $element, + write, + "Failed to write OBU bytes to the payload: {}" + ) + }; + ($element:ident, outbuf_alloc) => { + err_flow!($element, write, "Failed to allocate output buffer: {}") + }; +} + +macro_rules! err_opt { + ($element:ident, read, $msg:literal) => { + |err| { + gst::element_error!($element, ResourceError::Read, [$msg, err]); + Option::<()>::None + } + }; + ($element:ident, write, $msg:literal) => { + |err| { + gst::element_error!($element, ResourceError::Write, [$msg, err]); + Option::<()>::None + } + }; + + ($element:ident, buf_alloc) => { + err_opt!($element, write, "Failed to allocate new buffer: {}") + }; + + ($element:ident, payload_buf) => { + err_opt!($element, read, "Failed to get RTP payload buffer: {}") + }; + ($element:ident, payload_map) => { + err_opt!($element, read, "Failed to map payload as readable: {}") + }; + ($element:ident, buf_take) => { + err_opt!($element, read, "Failed to take buffer from adapter: {}") + }; + ($element:ident, aggr_header_read) => { + err_opt!($element, read, "Failed to read aggregation header: {}") + }; + ($element:ident, leb_read) => { + err_opt!($element, read, "Failed to read leb128 size field: {}") + }; + ($element:ident, leb_write) => { + err_opt!($element, read, "Failed to write leb128 size field: {}") + }; + ($element:ident, obu_read) => { + err_opt!($element, read, "Failed to read OBU header: {}") + }; + ($element:ident, buf_read) => { + err_opt!($element, read, "Failed to read RTP buffer: {}") + }; +} + +pub(crate) use err_flow; +pub(crate) use err_opt; diff --git a/net/rtpav1/src/common/integers.rs b/net/rtpav1/src/common/integers.rs new file mode 100644 index 00000000..e084e9d1 --- /dev/null +++ b/net/rtpav1/src/common/integers.rs @@ -0,0 +1,102 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#![allow(non_camel_case_types)] + +use bitstream_io::{BitRead, BitReader, BitWrite, BitWriter, Endianness}; +use std::io::{self, Read, Seek, Write}; + +pub fn parse_leb128(reader: &mut BitReader) -> io::Result +where + R: Read + Seek, + E: Endianness, +{ + let mut value = 0; + + for i in 0..8 { + let byte = reader.read::(8)?; + value |= (byte & 0x7f) << (i * 7); + if byte & 0x80 == 0 { + break; + } + } + + reader.byte_align(); + Ok(value) +} + +pub fn write_leb128(writer: &mut BitWriter, mut value: u32) -> io::Result<()> +where + W: Write + Seek, + E: Endianness, +{ + loop { + writer.write_bit(value > 0x7f)?; + writer.write(7, value & 0x7f)?; + value >>= 7; + if value == 0 { + writer.byte_align()?; + return Ok(()); + } + } +} + +pub fn leb128_size(mut value: u32) -> u8 { + let mut bytes = 1; + + loop { + value >>= 7; + if value == 0 { + return bytes; + } + bytes += 1; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bitstream_io::{BigEndian, BitReader, BitWrite, BitWriter}; + use std::io::Cursor; + + #[test] + fn test_leb128() { + const TEST_CASES: [(u32, &[u8]); 8] = [ + (0, &[0x00]), + (1, &[0x01]), + (2, &[0x02]), + (3, &[0x03]), + (123, &[0x7b]), + (2468, &[0xa4, 0x13]), + (987654, &[0x86, 0xa4, 0x3c]), + (u32::MAX, &[0xff, 0xff, 0xff, 0xff, 0x0f]), + ]; + + for (value, encoding) in TEST_CASES { + println!("testing: value={}", value); + + let mut reader = BitReader::endian(Cursor::new(&encoding), BigEndian); + assert_eq!(value, parse_leb128(&mut reader).unwrap()); + assert_eq!( + encoding.len() as u64 * 8, + reader.position_in_bits().unwrap() + ); + + let mut writer = BitWriter::endian(Cursor::new(Vec::new()), BigEndian); + write_leb128(&mut writer, value).unwrap(); + writer.byte_align().unwrap(); + + let mut data = writer.into_writer(); + data.set_position(0); + + let mut reader = BitReader::endian(data, BigEndian); + assert_eq!(value, parse_leb128(&mut reader).unwrap()); + } + } +} diff --git a/net/rtpav1/src/common/mod.rs b/net/rtpav1/src/common/mod.rs new file mode 100644 index 00000000..53b30e95 --- /dev/null +++ b/net/rtpav1/src/common/mod.rs @@ -0,0 +1,23 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use bitstream_io::BigEndian; + +pub const CLOCK_RATE: u32 = 90000; +pub const ENDIANNESS: BigEndian = BigEndian; + +mod aggr_header; +mod error; +mod integers; +mod obu; + +pub use aggr_header::*; +pub(crate) use error::*; +pub use integers::*; +pub use obu::*; diff --git a/net/rtpav1/src/common/obu.rs b/net/rtpav1/src/common/obu.rs new file mode 100644 index 00000000..dedaba07 --- /dev/null +++ b/net/rtpav1/src/common/obu.rs @@ -0,0 +1,418 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use crate::common::{leb128_size, parse_leb128}; +use bitstream_io::{BitRead, BitReader, Endianness}; +use std::io::{self, Read, Seek}; + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub struct UnsizedObu { + pub obu_type: ObuType, + pub has_extension: bool, + pub temporal_id: u8, + pub spatial_id: u8, + pub header_len: u32, + /// indicates that only part of this OBU has been processed so far + pub is_fragment: bool, +} + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub struct SizedObu { + pub obu_type: ObuType, + pub has_extension: bool, + /// If the OBU header is followed by a leb128 size field. + pub has_size_field: bool, + pub temporal_id: u8, + pub spatial_id: u8, + /// size of the OBU payload in bytes. + /// This may refer to different sizes in different contexts, not always + /// to the entire OBU payload as it is in the AV1 bitstream. + pub size: u32, + /// the number of bytes the leb128 size field will take up + /// when written with write_leb128(). + /// This does not imply `has_size_field`, and does not necessarily match with + /// the length of the internal size field if present. + pub leb_size: u32, + pub header_len: u32, + /// indicates that only part of this OBU has been processed so far + pub is_fragment: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ObuType { + Reserved, + SequenceHeader, + TemporalDelimiter, + FrameHeader, + TileGroup, + Metadata, + Frame, + RedundantFrameHeader, + TileList, + Padding, +} + +impl Default for ObuType { + fn default() -> Self { + Self::Reserved + } +} + +impl UnsizedObu { + pub fn parse(reader: &mut BitReader) -> io::Result + where + R: Read + Seek, + E: Endianness, + { + // check the forbidden bit + if reader.read_bit()? { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "forbidden bit in OBU header is set", + )); + } + + let obu_type = reader.read::(4)?.into(); + let has_extension = reader.read_bit()?; + + // make sure there is no size field + if reader.read_bit()? { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "did not expect size field", + )); + } + + // ignore the reserved bit + let _ = reader.read_bit()?; + + let (temporal_id, spatial_id) = if has_extension { + (reader.read::(3)?, reader.read::(2)?) + } else { + (0, 0) + }; + + reader.byte_align(); + + Ok(Self { + obu_type, + has_extension, + temporal_id, + spatial_id, + header_len: has_extension as u32 + 1, + is_fragment: false, + }) + } + + /// Convert to a `SizedObu` without internal size field and the given sizes. + pub fn as_sized(&self, size: u32, leb_size: u32) -> SizedObu { + SizedObu { + obu_type: self.obu_type, + has_extension: self.has_extension, + has_size_field: false, + temporal_id: self.temporal_id, + spatial_id: self.spatial_id, + size, + leb_size, + header_len: self.header_len, + is_fragment: self.is_fragment, + } + } +} + +impl SizedObu { + /// Parse an OBU header and size field. If the OBU is not expected to contain + /// a size field, but the size is known from external information, + /// parse as an `UnsizedObu` and use `to_sized`. + pub fn parse(reader: &mut BitReader) -> io::Result + where + R: Read + Seek, + E: Endianness, + { + // check the forbidden bit + if reader.read_bit()? { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "forbidden bit in OBU header is set", + )); + } + + let obu_type = reader.read::(4)?.into(); + let has_extension = reader.read_bit()?; + + // require a size field + if !reader.read_bit()? { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "expected a size field", + )); + } + + // ignore the reserved bit + let _ = reader.read_bit()?; + + let (temporal_id, spatial_id) = if has_extension { + (reader.read::(3)?, reader.read::(2)?) + } else { + (0, 0) + }; + + reader.byte_align(); + + let size = parse_leb128(reader)?; + let leb_size = leb128_size(size) as u32; + + Ok(Self { + obu_type, + has_extension, + has_size_field: true, + temporal_id, + spatial_id, + size, + leb_size, + header_len: has_extension as u32 + 1, + is_fragment: false, + }) + } + + /// The amount of bytes this OBU will take up, including the space needed for + /// its leb128 size field. + pub fn full_size(&self) -> u32 { + self.size + self.leb_size + self.header_len + } + + /// The amount of bytes this OBU will take up without a leb128 size field. + pub fn partial_size(&self) -> u32 { + self.size + self.header_len + } +} + +impl From for ObuType { + fn from(n: u8) -> Self { + assert!(n < 16); + + match n { + 1 => Self::SequenceHeader, + 2 => Self::TemporalDelimiter, + 3 => Self::FrameHeader, + 4 => Self::TileGroup, + 5 => Self::Metadata, + 6 => Self::Frame, + 7 => Self::RedundantFrameHeader, + 8 => Self::TileList, + 15 => Self::Padding, + _ => Self::Reserved, + } + } +} + +impl From for u8 { + fn from(ty: ObuType) -> Self { + match ty { + ObuType::Reserved => 0, + ObuType::SequenceHeader => 1, + ObuType::TemporalDelimiter => 2, + ObuType::FrameHeader => 3, + ObuType::TileGroup => 4, + ObuType::Metadata => 5, + ObuType::Frame => 6, + ObuType::RedundantFrameHeader => 7, + ObuType::TileList => 8, + ObuType::Padding => 15, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bitstream_io::{BigEndian, BitRead, BitReader}; + use once_cell::sync::Lazy; + use std::io::Cursor; + + #[allow(clippy::type_complexity)] + static OBUS: Lazy, u64, UnsizedObu, Vec)>> = Lazy::new(|| { + vec![ + ( + SizedObu { + obu_type: ObuType::TemporalDelimiter, + has_extension: false, + has_size_field: true, + temporal_id: 0, + spatial_id: 0, + size: 0, + leb_size: 1, + header_len: 1, + is_fragment: false, + }, + vec![0b0001_0010, 0b0000_0000], + 2, + UnsizedObu { + obu_type: ObuType::TemporalDelimiter, + has_extension: false, + temporal_id: 0, + spatial_id: 0, + header_len: 1, + is_fragment: false, + }, + vec![0b0001_0000], + ), + ( + SizedObu { + obu_type: ObuType::Padding, + has_extension: false, + has_size_field: true, + temporal_id: 0, + spatial_id: 0, + size: 10, + leb_size: 1, + header_len: 1, + is_fragment: false, + }, + vec![0b0111_1010, 0b0000_1010, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + 2, + UnsizedObu { + obu_type: ObuType::Padding, + has_extension: false, + temporal_id: 0, + spatial_id: 0, + header_len: 1, + is_fragment: false, + }, + vec![0b0111_1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + ), + ( + SizedObu { + obu_type: ObuType::Frame, + has_extension: true, + has_size_field: true, + temporal_id: 4, + spatial_id: 3, + size: 5, + leb_size: 1, + header_len: 2, + is_fragment: false, + }, + vec![0b0011_0110, 0b1001_1000, 0b0000_0101, 1, 2, 3, 4, 5], + 3, + UnsizedObu { + obu_type: ObuType::Frame, + has_extension: true, + temporal_id: 4, + spatial_id: 3, + header_len: 2, + is_fragment: false, + }, + vec![0b0011_0100, 0b1001_1000, 1, 2, 3, 4, 5], + ), + ] + }); + + #[test] + fn test_parse() { + for (idx, (sized_obu, sized_bytes, expected_position, unsized_obu, unsized_bytes)) in + (*OBUS).iter().enumerate() + { + println!("running test {}...", idx); + { + println!(" parsing sized..."); + let mut reader = BitReader::endian(Cursor::new(&sized_bytes), BigEndian); + + assert_eq!(SizedObu::parse(&mut reader).unwrap(), *sized_obu); + assert!(reader.byte_aligned()); + assert_eq!(reader.into_reader().position(), *expected_position); + }; + { + println!(" parsing unsized..."); + let mut reader = BitReader::endian(Cursor::new(&unsized_bytes), BigEndian); + + assert_eq!(UnsizedObu::parse(&mut reader).unwrap(), *unsized_obu); + assert!(reader.byte_aligned()); + assert_eq!( + reader.into_reader().position(), + unsized_obu.header_len as u64 + ); + } + } + } + + #[test] + fn test_conversion() { + for (idx, (sized_obu, _, _, unsized_obu, _)) in (*OBUS).iter().enumerate() { + println!("running test {}...", idx); + assert_eq!( + unsized_obu.as_sized(sized_obu.size, sized_obu.leb_size), + SizedObu { + has_size_field: false, + ..*sized_obu + }, + ); + } + } + + #[test] + fn test_parse_rtp_obu() { + let obus = [ + ( + SizedObu { + obu_type: ObuType::TemporalDelimiter, + has_extension: false, + has_size_field: false, + temporal_id: 0, + spatial_id: 0, + size: 0, + leb_size: 1, + header_len: 1, + is_fragment: false, + }, + vec![0b0001_0000], + ), + ( + SizedObu { + obu_type: ObuType::Padding, + has_extension: false, + has_size_field: false, + temporal_id: 0, + spatial_id: 0, + size: 10, + leb_size: 1, + header_len: 1, + is_fragment: false, + }, + vec![0b0111_1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + ), + ( + SizedObu { + obu_type: ObuType::Frame, + has_extension: true, + has_size_field: false, + temporal_id: 4, + spatial_id: 3, + size: 5, + leb_size: 1, + header_len: 2, + is_fragment: false, + }, + vec![0b0011_0100, 0b1001_1000, 1, 2, 3, 4, 5], + ), + ]; + + for (idx, (sized_obu, rtp_bytes)) in obus.into_iter().enumerate() { + println!("running test {}...", idx); + + let mut reader = BitReader::endian(Cursor::new(&rtp_bytes), BigEndian); + + let unsized_obu = UnsizedObu::parse(&mut reader).unwrap(); + assert_eq!( + unsized_obu.as_sized(sized_obu.size, sized_obu.leb_size), + sized_obu + ); + } + } +} diff --git a/net/rtpav1/src/depay/imp.rs b/net/rtpav1/src/depay/imp.rs new file mode 100644 index 00000000..d0c7f228 --- /dev/null +++ b/net/rtpav1/src/depay/imp.rs @@ -0,0 +1,585 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::{ + glib, + subclass::{prelude::*, ElementMetadata}, + Buffer, BufferFlags, Caps, DebugCategory, DebugColorFlags, IntRange, Memory, PadDirection, + PadPresence, PadTemplate, ResourceError, StateChange, StateChangeError, StateChangeSuccess, +}; +use gst_base::UniqueAdapter; +use gst_rtp::{ + rtp_buffer::{RTPBuffer, Readable}, + subclass::prelude::*, +}; +use std::{ + cmp::Ordering, + io::{Cursor, Read, Seek, SeekFrom}, + sync::{Mutex, MutexGuard}, +}; + +use bitstream_io::{BitReader, BitWriter}; +use once_cell::sync::Lazy; + +use crate::common::{ + err_opt, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu, + UnsizedObu, CLOCK_RATE, ENDIANNESS, +}; + +// TODO: handle internal size fields in RTP OBUs + +#[derive(Debug, Default)] +struct State { + /// used to store outgoing OBUs until the TU is complete + adapter: UniqueAdapter, + + last_timestamp: Option, + /// if true, the last packet of a temporal unit has been received + marked_packet: bool, + /// holds data for a fragment + obu_fragment: Option<(UnsizedObu, Vec)>, +} + +#[derive(Debug, Default)] +pub struct RTPAv1Depay { + state: Mutex, +} + +static CAT: Lazy = Lazy::new(|| { + DebugCategory::new( + "rtpav1depay", + DebugColorFlags::empty(), + Some("RTP AV1 Depayloader"), + ) +}); + +static TEMPORAL_DELIMITER: Lazy = Lazy::new(|| Memory::from_slice(&[0b0001_0010, 0])); + +impl RTPAv1Depay { + fn reset<'s>( + &'s self, + element: &::Type, + state: &mut MutexGuard<'s, State>, + ) { + gst::debug!(CAT, obj: element, "resetting state"); + + **state = State::default() + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RTPAv1Depay { + const NAME: &'static str = "GstRtpAv1Depay"; + type Type = super::RTPAv1Depay; + type ParentType = gst_rtp::RTPBaseDepayload; +} + +impl ObjectImpl for RTPAv1Depay {} + +impl GstObjectImpl for RTPAv1Depay {} + +impl ElementImpl for RTPAv1Depay { + fn metadata() -> Option<&'static ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP AV1 Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload AV1 from RTP packets", + "Vivienne Watermeier ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = PadTemplate::new( + "sink", + PadDirection::Sink, + PadPresence::Always, + &Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", IntRange::new(96, 127)) + .field("clock-rate", CLOCK_RATE as i32) + .field("encoding-name", "AV1") + .build(), + ) + .unwrap(); + + let src_pad_template = PadTemplate::new( + "src", + PadDirection::Src, + PadPresence::Always, + &Caps::builder("video/x-av1") + .field("parsed", true) + .field("stream-format", "obu-stream") + .field("alignment", "tu") + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + element: &Self::Type, + transition: StateChange, + ) -> Result { + gst::debug!(CAT, obj: element, "changing state: {}", transition); + + if matches!(transition, StateChange::ReadyToPaused) { + let mut state = self.state.lock().unwrap(); + self.reset(element, &mut state); + } + + let ret = self.parent_change_state(element, transition); + + if matches!(transition, StateChange::PausedToReady) { + let mut state = self.state.lock().unwrap(); + self.reset(element, &mut state); + } + + ret + } +} + +impl RTPBaseDepayloadImpl for RTPAv1Depay { + fn process_rtp_packet( + &self, + element: &Self::Type, + rtp: &RTPBuffer, + ) -> Option { + gst::log!( + CAT, + obj: element, + "processing RTP packet with payload type {} and size {}", + rtp.payload_type(), + rtp.buffer().size(), + ); + + let payload = rtp + .payload_buffer() + .map_err(err_opt!(element, payload_buf)) + .ok()?; + let payload_map = payload + .map_readable() + .map_err(err_opt!(element, payload_map)) + .ok()?; + + let mut state = self.state.lock().unwrap(); + + if rtp.buffer().flags().contains(BufferFlags::DISCONT) { + gst::debug!(CAT, obj: element, "buffer discontinuity"); + self.reset(element, &mut state); + } + + // number of bytes that can be used in the next outgoing buffer + let mut bytes_ready = 0; + let mut reader = Cursor::new(payload_map.as_ref()); + let mut ready_obus = Buffer::new(); + + let aggr_header = { + let mut byte = [0; 1]; + reader + .read_exact(&mut byte) + .map_err(err_opt!(element, aggr_header_read)) + .ok()?; + AggregationHeader::from(&byte) + }; + + // handle new temporal units + if state.marked_packet || state.last_timestamp != Some(rtp.timestamp()) { + if state.last_timestamp.is_some() && state.obu_fragment.is_some() { + gst::error!( + CAT, + obj: element, + concat!( + "invalid packet: packet is part of a new TU but ", + "the previous TU still has an incomplete OBU", + "marked_packet: {}, last_timestamp: {:?}" + ), + state.marked_packet, + state.last_timestamp + ); + self.reset(element, &mut state); + return None; + } + + // all the currently stored bytes can be packed into the next outgoing buffer + bytes_ready = state.adapter.available(); + + // the next temporal unit starts with a temporal delimiter OBU + ready_obus + .get_mut() + .unwrap() + .insert_memory(None, TEMPORAL_DELIMITER.clone()); + state.marked_packet = false; + } + state.marked_packet = rtp.is_marker(); + state.last_timestamp = Some(rtp.timestamp()); + + // parse and prepare the received OBUs + let mut idx = 0; + + // handle leading OBU fragment + if let Some((obu, ref mut bytes)) = &mut state.obu_fragment { + if !aggr_header.leading_fragment { + gst::error!( + CAT, + obj: element, + "invalid packet: ignores unclosed OBU fragment" + ); + return None; + } + + let (element_size, is_last_obu) = + find_element_info(element, rtp, &mut reader, &aggr_header, idx)?; + + let bytes_end = bytes.len(); + bytes.resize(bytes_end + element_size as usize, 0); + reader + .read_exact(&mut bytes[bytes_end..]) + .map_err(err_opt!(element, buf_read)) + .ok()?; + + // if this OBU is complete, it can be appended to the adapter + if !(is_last_obu && aggr_header.trailing_fragment) { + let full_obu = { + let size = bytes.len() as u32 - obu.header_len; + let leb_size = leb128_size(size) as u32; + obu.as_sized(size, leb_size) + }; + + let buffer = translate_obu(element, &mut Cursor::new(bytes.as_slice()), &full_obu)?; + + state.adapter.push(buffer); + state.obu_fragment = None; + } + } + + // handle other OBUs, including trailing fragments + while reader.position() < rtp.payload_size() as u64 { + let (element_size, is_last_obu) = + find_element_info(element, rtp, &mut reader, &aggr_header, idx)?; + + let header_pos = reader.position(); + let mut bitreader = BitReader::endian(&mut reader, ENDIANNESS); + let obu = UnsizedObu::parse(&mut bitreader) + .map_err(err_opt!(element, obu_read)) + .ok()?; + + reader + .seek(SeekFrom::Start(header_pos)) + .map_err(err_opt!(element, buf_read)) + .ok()?; + + // ignore these OBU types + if matches!(obu.obu_type, ObuType::TemporalDelimiter | ObuType::TileList) { + reader + .seek(SeekFrom::Current(element_size as i64)) + .map_err(err_opt!(element, buf_read)) + .ok()?; + } + // trailing OBU fragments are stored in the state + if is_last_obu && aggr_header.trailing_fragment { + let bytes_left = rtp.payload_size() - (reader.position() as u32); + let mut bytes = vec![0; bytes_left as usize]; + reader + .read_exact(bytes.as_mut_slice()) + .map_err(err_opt!(element, buf_read)) + .ok()?; + + state.obu_fragment = Some((obu, bytes)); + } + // full OBUs elements are translated and appended to the adapter + else { + let full_obu = { + let size = element_size - obu.header_len; + let leb_size = leb128_size(size) as u32; + obu.as_sized(size, leb_size) + }; + + ready_obus.append(translate_obu(element, &mut reader, &full_obu)?); + } + + idx += 1; + } + + state.adapter.push(ready_obus); + + if state.marked_packet { + if state.obu_fragment.is_some() { + gst::error!( + CAT, + obj: element, + concat!( + "invalid packet: has marker bit set, but ", + "last OBU is not yet complete" + ) + ); + self.reset(element, &mut state); + return None; + } + + bytes_ready = state.adapter.available(); + } + + // now push all the complete temporal units + if bytes_ready > 0 { + gst::log!( + CAT, + obj: element, + "creating buffer containing {} bytes of data...", + bytes_ready + ); + Some( + state + .adapter + .take_buffer(bytes_ready) + .map_err(err_opt!(element, buf_take)) + .ok()?, + ) + } else { + None + } + } +} + +/// Find out the next OBU element's size, and if it is the last OBU in the packet. +/// The reader is expected to be at the first byte of the element, +/// or its preceding size field if present, +/// and will be at the first byte past the element's size field afterwards. +fn find_element_info( + element: &::Type, + rtp: &RTPBuffer, + reader: &mut Cursor<&[u8]>, + aggr_header: &AggregationHeader, + index: u32, +) -> Option<(u32, bool)> { + let element_size: u32; + let is_last_obu: bool; + + if let Some(count) = aggr_header.obu_count { + is_last_obu = index + 1 == count as u32; + element_size = if is_last_obu { + rtp.payload_size() - (reader.position() as u32) + } else { + let mut bitreader = BitReader::endian(reader, ENDIANNESS); + parse_leb128(&mut bitreader) + .map_err(err_opt!(element, leb_read)) + .ok()? as u32 + } + } else { + element_size = parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS)) + .map_err(err_opt!(element, leb_read)) + .ok()? as u32; + is_last_obu = match rtp + .payload_size() + .cmp(&(reader.position() as u32 + element_size)) + { + Ordering::Greater => false, + Ordering::Equal => true, + Ordering::Less => { + gst::error!( + CAT, + obj: element, + "invalid packet: size field gives impossibly large OBU size" + ); + return None; + } + }; + } + + Some((element_size, is_last_obu)) +} + +/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format +fn translate_obu( + element: &::Type, + reader: &mut Cursor<&[u8]>, + obu: &SizedObu, +) -> Option { + let mut bytes = Buffer::with_size(obu.full_size() as usize) + .map_err(err_opt!(element, buf_alloc)) + .ok()? + .into_mapped_buffer_writable() + .unwrap(); + + // write OBU header + reader + .read_exact(&mut bytes[..obu.header_len as usize]) + .map_err(err_opt!(element, buf_read)) + .ok()?; + + // set `has_size_field` + bytes[0] |= 1 << 1; + + // skip internal size field if present + if obu.has_size_field { + parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS)) + .map_err(err_opt!(element, leb_read)) + .ok()?; + } + + // write size field + write_leb128( + &mut BitWriter::endian( + Cursor::new(&mut bytes[obu.header_len as usize..]), + ENDIANNESS, + ), + obu.size, + ) + .map_err(err_opt!(element, leb_write)) + .ok()?; + + // write OBU payload + reader + .read_exact(&mut bytes[(obu.header_len + obu.leb_size) as usize..]) + .map_err(err_opt!(element, buf_read)) + .ok()?; + + Some(bytes.into_buffer()) +} + +#[cfg(test)] +#[rustfmt::skip] +mod tests { + use super::*; + use std::io::Cursor; + use gst::buffer::Buffer; + use gst_rtp::rtp_buffer::RTPBufferExt; + + #[test] + fn test_translate_obu() { + gst::init().unwrap(); + + let test_data = [ + ( + SizedObu { + obu_type: ObuType::TemporalDelimiter, + has_extension: false, + has_size_field: false, + temporal_id: 0, + spatial_id: 0, + size: 0, + leb_size: 1, + header_len: 1, + is_fragment: false, + }, + vec![0b0001_0000], + vec![0b0001_0010, 0], + ), ( + SizedObu { + obu_type: ObuType::Frame, + has_extension: true, + has_size_field: false, + temporal_id: 3, + spatial_id: 2, + size: 5, + leb_size: 1, + header_len: 2, + is_fragment: false, + }, + vec![0b0011_0100, 0b0111_0000, 1, 2, 3, 4, 5], + vec![0b0011_0110, 0b0111_0000, 0b0000_0101, 1, 2, 3, 4, 5], + ), ( + SizedObu { + obu_type: ObuType::Frame, + has_extension: true, + has_size_field: true, + temporal_id: 3, + spatial_id: 2, + size: 5, + leb_size: 1, + header_len: 2, + is_fragment: false, + }, + vec![0b0011_0100, 0b0111_0000, 0b0000_0101, 1, 2, 3, 4, 5], + vec![0b0011_0110, 0b0111_0000, 0b0000_0101, 1, 2, 3, 4, 5], + ) + ]; + + let element = ::Type::new(); + for (idx, (obu, rtp_bytes, out_bytes)) in test_data.into_iter().enumerate() { + println!("running test {}...", idx); + let mut reader = Cursor::new(rtp_bytes.as_slice()); + + let actual = translate_obu(&element, &mut reader, &obu); + assert_eq!(reader.position(), rtp_bytes.len() as u64); + assert!(actual.is_some()); + + let actual = actual + .unwrap() + .into_mapped_buffer_readable() + .unwrap(); + assert_eq!(actual.as_slice(), out_bytes.as_slice()); + } + } + + #[test] + #[allow(clippy::type_complexity)] + fn test_find_element_info() { + gst::init().unwrap(); + + let test_data: [(Vec<(u32, bool)>, u32, Vec, AggregationHeader); 4] = [ + ( + vec![(1, false)], // expected results + 100, // RTP payload size + vec![0b0000_0001, 0b0001_0000], + AggregationHeader { obu_count: None, ..AggregationHeader::default() }, + ), ( + vec![(5, true)], + 5, + vec![0b0111_1000, 0, 0, 0, 0], + AggregationHeader { obu_count: Some(1), ..AggregationHeader::default() }, + ), ( + vec![(7, true)], + 8, + vec![0b0000_0111, 0b0011_0110, 0b0010_1000, 0b0000_1010, 1, 2, 3, 4], + AggregationHeader { obu_count: None, ..AggregationHeader::default() }, + ), ( + vec![(6, false), (4, true)], + 11, + vec![0b0000_0110, 0b0111_1000, 1, 2, 3, 4, 5, 0b0011_0000, 1, 2, 3], + AggregationHeader { obu_count: Some(2), ..AggregationHeader::default() }, + ) + ]; + + let element = ::Type::new(); + for (idx, ( + info, + payload_size, + rtp_bytes, + aggr_header, + )) in test_data.into_iter().enumerate() { + println!("running test {}...", idx); + let buffer = Buffer::new_rtp_with_sizes(payload_size, 0, 0).unwrap(); + let rtp = RTPBuffer::from_buffer_readable(&buffer).unwrap(); + let mut reader = Cursor::new(rtp_bytes.as_slice()); + + let mut element_size = 0; + for (obu_idx, expected) in info.into_iter().enumerate() { + if element_size != 0 { + reader.seek(SeekFrom::Current(element_size as i64)).unwrap(); + } + + println!("testing element {} with reader position {}...", obu_idx, reader.position()); + + let actual = find_element_info(&element, &rtp, &mut reader, &aggr_header, obu_idx as u32); + assert_eq!(actual, Some(expected)); + element_size = actual.unwrap().0; + } + } + } +} diff --git a/net/rtpav1/src/depay/mod.rs b/net/rtpav1/src/depay/mod.rs new file mode 100644 index 00000000..2313ef30 --- /dev/null +++ b/net/rtpav1/src/depay/mod.rs @@ -0,0 +1,35 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +#![allow(clippy::new_without_default)] + +use glib::Object; +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RTPAv1Depay(ObjectSubclass) + @extends gst_rtp::RTPBaseDepayload, gst::Element, gst::Object; +} + +impl RTPAv1Depay { + pub fn new() -> Self { + Object::new(&[]).expect("Failed to create AV1 depayloader") + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpav1depay", + gst::Rank::Marginal, + RTPAv1Depay::static_type(), + ) +} diff --git a/net/rtpav1/src/lib.rs b/net/rtpav1/src/lib.rs new file mode 100644 index 00000000..793abe41 --- /dev/null +++ b/net/rtpav1/src/lib.rs @@ -0,0 +1,32 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; + +mod common; +pub mod depay; +pub mod pay; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + depay::register(plugin)?; + pay::register(plugin)?; + Ok(()) +} + +gst::plugin_define!( + rtpav1, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/net/rtpav1/src/pay/imp.rs b/net/rtpav1/src/pay/imp.rs new file mode 100644 index 00000000..c8ceb721 --- /dev/null +++ b/net/rtpav1/src/pay/imp.rs @@ -0,0 +1,818 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::{ + glib, + subclass::{prelude::*, ElementMetadata}, + Buffer, BufferFlags, Caps, ClockTime, DebugCategory, DebugColorFlags, Event, EventType, + FlowError, FlowSuccess, IntRange, LoggableError, PadDirection, PadPresence, PadTemplate, + ResourceError, StateChange, StateChangeError, StateChangeSuccess, +}; +use gst_rtp::{prelude::*, rtp_buffer::RTPBuffer, subclass::prelude::*, RTPBasePayload}; +use std::{ + io::{Cursor, Read, Seek, SeekFrom, Write}, + sync::{Mutex, MutexGuard}, +}; + +use bitstream_io::{BitReader, BitWriter}; +use once_cell::sync::Lazy; + +use crate::common::{ + err_flow, leb128_size, write_leb128, ObuType, SizedObu, CLOCK_RATE, ENDIANNESS, +}; + +static CAT: Lazy = Lazy::new(|| { + DebugCategory::new( + "rtpav1pay", + DebugColorFlags::empty(), + Some("RTP AV1 Payloader"), + ) +}); + +// TODO: properly handle `max_ptime` and `min_ptime` + +/// Information about the OBUs intended to be grouped into one packet +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +struct PacketOBUData { + obu_count: usize, + payload_size: u32, + last_obu_fragment_size: Option, + omit_last_size_field: bool, + ends_temporal_unit: bool, +} + +/// Temporary information held between invocations of `consider_new_packet()` +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +struct TempPacketData { + payload_limit: u32, + required_ids: Option<(u8, u8)>, + /// bytes used for an OBUs size field will only be added to the total + /// once its known for sure it will be placed in the packet + pending_bytes: u32, + packet: PacketOBUData, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +struct ObuData { + info: SizedObu, + bytes: Vec, + dts: Option, + pts: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +struct State { + /// Holds header information and raw bytes for all received OBUs, + /// as well as DTS and PTS + //obus: Vec<(SizedObu, Vec, Option, Option)>, + obus: Vec, + + /// Indicates that the first element in the Buffer is an OBU fragment, + /// left over from the previous RTP packet + open_obu_fragment: bool, + + /// Indicates the next constructed packet will be the first in its sequence + /// (Corresponds to `N` field in the aggregation header) + first_packet_in_seq: bool, + + temp_packet_data: Option, +} + +#[derive(Debug, Default)] +pub struct RTPAv1Pay { + state: Mutex, +} + +impl Default for State { + fn default() -> Self { + Self { + obus: Vec::new(), + open_obu_fragment: false, + first_packet_in_seq: true, + temp_packet_data: None, + } + } +} + +impl RTPAv1Pay { + fn reset<'s>( + &'s self, + element: &::Type, + state: &mut MutexGuard<'s, State>, + ) { + gst::debug!(CAT, obj: element, "resetting state"); + + state.obus.clear(); + } + + /// Parses new OBUs, stores them in the state, + /// and constructs and sends new RTP packets when appropriate. + fn handle_new_obus<'s>( + &'s self, + element: &::Type, + state: &mut MutexGuard<'s, State>, + data: &[u8], + dts: Option, + pts: Option, + ) -> Result { + let mut reader = Cursor::new(data); + + while reader.position() < data.len() as u64 { + let obu_start = reader.position(); + let obu = SizedObu::parse(&mut BitReader::endian(&mut reader, ENDIANNESS)) + .map_err(err_flow!(element, buf_read))?; + + // tile lists and temporal delimiters should not be transmitted, + // see section 5 of the RTP AV1 spec + match obu.obu_type { + // completely ignore tile lists + ObuType::TileList => { + gst::log!(CAT, obj: element, "ignoring tile list OBU"); + reader + .seek(SeekFrom::Current( + (obu.header_len + obu.leb_size + obu.size) as i64, + )) + .map_err(err_flow!(element, buf_read))?; + } + + // keep these OBUs around for now so we know where temporal units end + ObuType::TemporalDelimiter => { + if obu.size != 0 { + gst::element_error!( + element, + ResourceError::Read, + ["temporal delimiter OBUs should have empty payload"] + ); + return Err(FlowError::Error); + } + state.obus.push(ObuData { + info: obu, + bytes: Vec::new(), + dts, + pts, + }); + } + + _ => { + let bytes_total = (obu.header_len + obu.size) as usize; + let mut bytes = vec![0; bytes_total]; + + // read header + reader + .seek(SeekFrom::Start(obu_start)) + .map_err(err_flow!(element, buf_read))?; + reader + .read_exact(&mut bytes[0..(obu.header_len as usize)]) + .map_err(err_flow!(element, buf_read))?; + + // skip size field + bytes[0] &= !2_u8; // set `has_size_field` to 0 + reader + .seek(SeekFrom::Current(obu.leb_size as i64)) + .map_err(err_flow!(element, buf_read))?; + + // read OBU bytes + reader + .read_exact(&mut bytes[(obu.header_len as usize)..bytes_total]) + .map_err(err_flow!(element, buf_read))?; + + state.obus.push(ObuData { + info: obu, + bytes, + dts, + pts, + }); + } + } + } + + while let Some(packet_data) = self.consider_new_packet(element, state, false) { + self.push_new_packet(element, state, packet_data)?; + } + + Ok(FlowSuccess::Ok) + } + + /// Look at the size the currently stored OBUs would require, + /// as well as their temportal IDs to decide if it is time to construct a + /// new packet, and what OBUs to include in it. + /// + /// If `true` is passed for `force`, packets of any size will be accepted, + /// which is used in flushing the last OBUs after receiving an EOS for example. + fn consider_new_packet<'s>( + &'s self, + element: &::Type, + state: &mut MutexGuard<'s, State>, + force: bool, + ) -> Option { + gst::trace!( + CAT, + obj: element, + "{} new packet, currently storing {} OBUs", + if force { "forcing" } else { "considering" }, + state.obus.len() + ); + + let mut data = state.temp_packet_data.take().unwrap_or_else(|| { + TempPacketData { + payload_limit: RTPBuffer::calc_payload_len(element.mtu(), 0, 0), + packet: PacketOBUData { + payload_size: 1, // 1 byte is used for the aggregation header + omit_last_size_field: true, + ..PacketOBUData::default() + }, + ..TempPacketData::default() + } + }); + let mut packet = data.packet; + + // figure out how many OBUs we can fit into this packet + while packet.obu_count < state.obus.len() { + // for OBUs with extension headers, spatial and temporal IDs must be equal + // to all other such OBUs in the packet + let matching_obu_ids = |obu: &SizedObu, data: &mut TempPacketData| -> bool { + if let Some((sid, tid)) = data.required_ids { + sid == obu.spatial_id && tid == obu.temporal_id + } else { + data.required_ids = Some((obu.spatial_id, obu.temporal_id)); + true + } + }; + + let current = state.obus[packet.obu_count].info; + + // should this packet be finished here? + if current.obu_type == ObuType::TemporalDelimiter { + // remove the temporal delimiter, it is not supposed to be transmitted + gst::log!(CAT, obj: element, "ignoring temporal delimiter OBU"); + state.obus.remove(packet.obu_count); + + if packet.obu_count > 0 { + packet.ends_temporal_unit = true; + if packet.obu_count > 3 { + packet.payload_size += data.pending_bytes; + packet.omit_last_size_field = false; + } + return Some(packet); + } else { + continue; + } + } else if packet.payload_size >= data.payload_limit + || (packet.obu_count > 0 && current.obu_type == ObuType::SequenceHeader) + || !matching_obu_ids(&state.obus[packet.obu_count].info, &mut data) + { + if packet.obu_count > 3 { + packet.payload_size += data.pending_bytes; + packet.omit_last_size_field = false; + } + return Some(packet); + } + + // would the full OBU fit? + if packet.payload_size + data.pending_bytes + current.full_size() <= data.payload_limit + { + packet.obu_count += 1; + packet.payload_size += current.partial_size() + data.pending_bytes; + data.pending_bytes = current.leb_size; + } + // would it fit without the size field? + else if packet.obu_count < 3 + && packet.payload_size + data.pending_bytes + current.partial_size() + <= data.payload_limit + { + packet.obu_count += 1; + packet.payload_size += current.partial_size() + data.pending_bytes; + + return Some(packet); + } + // otherwise consider putting an OBU fragment + else { + let leb_size = if packet.obu_count < 3 { + 0 + } else { + // assume the biggest possible OBU fragment, + // so if anything the size field will be smaller than expected + leb128_size(data.payload_limit - packet.payload_size) as u32 + }; + + // is there even enough space to bother? + if packet.payload_size + data.pending_bytes + leb_size + current.header_len + < data.payload_limit + { + packet.obu_count += 1; + packet.last_obu_fragment_size = Some( + data.payload_limit - packet.payload_size - data.pending_bytes - leb_size, + ); + packet.payload_size = data.payload_limit; + packet.omit_last_size_field = leb_size == 0; + } else if packet.obu_count > 3 { + packet.payload_size += data.pending_bytes; + } + + return Some(packet); + } + } + + if force && packet.obu_count > 0 { + if packet.obu_count > 3 { + packet.payload_size += data.pending_bytes; + packet.omit_last_size_field = false; + } + Some(packet) + } else { + // if we ran out of OBUs with space in the packet to spare, wait a bit longer + data.packet = packet; + state.temp_packet_data = Some(data); + None + } + } + + /// Given the information returned by consider_new_packet(), construct and push + /// new RTP packet, filled with those OBUs. + fn push_new_packet<'s>( + &'s self, + element: &::Type, + state: &mut MutexGuard<'s, State>, + packet: PacketOBUData, + ) -> Result { + gst::log!( + CAT, + obj: element, + "constructing new RTP packet with {} OBUs", + packet.obu_count + ); + + // prepare the outgoing buffer + let mut outbuf = Buffer::new_rtp_with_sizes(packet.payload_size, 0, 0).map_err(|err| { + gst::element_error!( + element, + ResourceError::Write, + ["Failed to allocate output buffer: {}", err] + ); + + FlowError::Error + })?; + + { + // this block enforces that outbuf_mut is dropped before pushing outbuf + let outbuf_mut = outbuf + .get_mut() + .expect("Failed to get mutable reference to outbuf"); + outbuf_mut.set_dts(state.obus[0].dts); + outbuf_mut.set_pts(state.obus[0].pts); + + let mut rtp = + RTPBuffer::from_buffer_writable(outbuf_mut).expect("Failed to create RTPBuffer"); + rtp.set_marker(packet.ends_temporal_unit); + + let payload = rtp + .payload_mut() + .expect("Failed to get mutable reference to RTP payload"); + let mut writer = Cursor::new(payload); + + { + // construct aggregation header + let w = if packet.omit_last_size_field && packet.obu_count < 4 { + packet.obu_count + } else { + 0 + }; + + let aggr_header: [u8; 1] = [ + (state.open_obu_fragment as u8) << 7 | // Z + ((packet.last_obu_fragment_size != None) as u8) << 6 | // Y + (w as u8) << 4 | // W + (state.first_packet_in_seq as u8) << 3 // N + ; 1]; + + writer + .write(&aggr_header) + .map_err(err_flow!(element, aggr_header_write))?; + + state.first_packet_in_seq = false; + } + + // append OBUs to the buffer + for _ in 1..packet.obu_count { + let obu = &state.obus[0]; + + write_leb128( + &mut BitWriter::endian(&mut writer, ENDIANNESS), + obu.info.size + obu.info.header_len, + ) + .map_err(err_flow!(element, leb_write))?; + writer + .write(&obu.bytes) + .map_err(err_flow!(element, obu_write))?; + + state.obus.remove(0); + } + state.open_obu_fragment = false; + + { + // do the last OBU separately + // in this instance `obu_size` includes the header length + let obu_size = if let Some(size) = packet.last_obu_fragment_size { + state.open_obu_fragment = true; + size + } else { + state.obus[0].bytes.len() as u32 + }; + + if !packet.omit_last_size_field { + write_leb128(&mut BitWriter::endian(&mut writer, ENDIANNESS), obu_size) + .map_err(err_flow!(element, leb_write))?; + } + + // if this OBU is not a fragment, handle it as usual + if packet.last_obu_fragment_size == None { + writer + .write(&state.obus[0].bytes) + .map_err(err_flow!(element, obu_write))?; + state.obus.remove(0); + } + // otherwise write only a slice, and update the element + // to only contain the unwritten bytes + else { + writer + .write(&state.obus[0].bytes[0..obu_size as usize]) + .map_err(err_flow!(element, obu_write))?; + + let new_size = state.obus[0].bytes.len() as u32 - obu_size; + state.obus[0] = ObuData { + info: SizedObu { + size: new_size, + header_len: 0, + leb_size: leb128_size(new_size) as u32, + is_fragment: true, + ..state.obus[0].info + }, + bytes: Vec::from( + &state.obus[0].bytes[obu_size as usize..state.obus[0].bytes.len()], + ), + ..state.obus[0] + }; + } + } + } + + gst::log!( + CAT, + obj: element, + "pushing RTP packet of size {}", + outbuf.size() + ); + element.push(outbuf) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RTPAv1Pay { + const NAME: &'static str = "GstRtpAv1Pay"; + type Type = super::RTPAv1Pay; + type ParentType = RTPBasePayload; +} + +impl ObjectImpl for RTPAv1Pay {} + +impl GstObjectImpl for RTPAv1Pay {} + +impl ElementImpl for RTPAv1Pay { + fn metadata() -> Option<&'static ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + ElementMetadata::new( + "RTP AV1 payloader", + "Codec/Payloader/Network/RTP", + "Payload AV1 as RTP packets", + "Vivienne Watermeier ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = PadTemplate::new( + "sink", + PadDirection::Sink, + PadPresence::Always, + &Caps::builder("video/x-av1") + .field("parsed", true) + .field("stream-format", "obu-stream") + .field("alignment", "obu") + .build(), + ) + .unwrap(); + + let src_pad_template = PadTemplate::new( + "src", + PadDirection::Src, + PadPresence::Always, + &Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", IntRange::new(96, 127)) + .field("clock-rate", CLOCK_RATE as i32) + .field("encoding-name", "AV1") + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + element: &Self::Type, + transition: StateChange, + ) -> Result { + gst::debug!(CAT, obj: element, "changing state: {}", transition); + + if matches!(transition, StateChange::ReadyToPaused) { + let mut state = self.state.lock().unwrap(); + self.reset(element, &mut state); + } + + let ret = self.parent_change_state(element, transition); + + if matches!(transition, StateChange::PausedToReady) { + let mut state = self.state.lock().unwrap(); + self.reset(element, &mut state); + } + + ret + } +} + +impl RTPBasePayloadImpl for RTPAv1Pay { + fn set_caps(&self, element: &Self::Type, _caps: &Caps) -> Result<(), LoggableError> { + element.set_options("video", true, "AV1", CLOCK_RATE); + + gst::debug!(CAT, obj: element, "setting caps"); + + Ok(()) + } + + fn handle_buffer( + &self, + element: &Self::Type, + buffer: Buffer, + ) -> Result { + gst::trace!( + CAT, + obj: element, + "received buffer of size {}", + buffer.size() + ); + + let mut state = self.state.lock().unwrap(); + + if buffer.flags().contains(BufferFlags::DISCONT) { + gst::debug!(CAT, obj: element, "buffer discontinuity"); + self.reset(element, &mut state); + } + + let dts = buffer.dts(); + let pts = buffer.pts(); + + let buffer = buffer.into_mapped_buffer_readable().map_err(|_| { + gst::element_error!( + element, + ResourceError::Read, + ["Failed to map buffer readable"] + ); + + FlowError::Error + })?; + + self.handle_new_obus(element, &mut state, buffer.as_slice(), dts, pts) + } + + fn sink_event(&self, element: &Self::Type, event: Event) -> bool { + gst::log!(CAT, obj: element, "sink event: {}", event.type_()); + + if matches!(event.type_(), EventType::Eos) { + let mut state = self.state.lock().unwrap(); + + // flush all remaining OBUs + while let Some(packet_data) = self.consider_new_packet(element, &mut state, true) { + if self + .push_new_packet(element, &mut state, packet_data) + .is_err() + { + break; + } + } + } + + self.parent_sink_event(element, event) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::common::*; + + #[test] + fn test_consider_new_packet() { + gst::init().unwrap(); + + let base_obu = SizedObu { + has_extension: false, + has_size_field: true, + leb_size: 1, + header_len: 1, + is_fragment: false, + ..SizedObu::default() + }; + + let input_data = [ + ( + false, // force argument + State { + // payloader state + obus: vec![ + ObuData { + info: SizedObu { + obu_type: ObuType::Padding, + size: 3, + ..base_obu + }, + bytes: vec![1, 2, 3], + ..ObuData::default() + }, + ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 4, + ..base_obu + }, + bytes: vec![1, 2, 3, 4], + ..ObuData::default() + }, + ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 5, + ..base_obu + }, + bytes: vec![1, 2, 3, 4, 5], + ..ObuData::default() + }, + ObuData { + // last two OBUs should not be counted + info: SizedObu { + obu_type: ObuType::TemporalDelimiter, + size: 0, + ..base_obu + }, + ..ObuData::default() + }, + ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 10, + ..base_obu + }, + bytes: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + ..ObuData::default() + }, + ], + ..State::default() + }, + ), + ( + true, + State { + obus: vec![ + ObuData { + info: SizedObu { + obu_type: ObuType::TemporalDelimiter, + size: 0, + ..base_obu + }, + ..ObuData::default() + }, + ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 7, + ..base_obu + }, + bytes: vec![1, 2, 3, 4, 5, 6, 7], + ..ObuData::default() + }, + ObuData { + info: SizedObu { + obu_type: ObuType::Padding, + size: 6, + ..base_obu + }, + bytes: vec![1, 2, 3, 4, 5, 6], + ..ObuData::default() + }, + ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 9, + ..base_obu + }, + bytes: vec![1, 2, 3, 4, 5, 6, 7, 8, 9], + ..ObuData::default() + }, + ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 3, + ..base_obu + }, + bytes: vec![1, 2, 3], + ..ObuData::default() + }, + ], + ..State::default() + }, + ), + ( + false, + State { + obus: vec![ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 4, + ..base_obu + }, + bytes: vec![1, 2, 3, 4], + ..ObuData::default() + }], + ..State::default() + }, + ), + ]; + + let results = [ + ( + Some(PacketOBUData { + obu_count: 3, + payload_size: 18, + last_obu_fragment_size: None, + omit_last_size_field: true, + ends_temporal_unit: true, + }), + State { + obus: vec![ + input_data[0].1.obus[0].clone(), + input_data[0].1.obus[1].clone(), + input_data[0].1.obus[2].clone(), + input_data[0].1.obus[4].clone(), + ], + ..input_data[0].1 + }, + ), + ( + Some(PacketOBUData { + obu_count: 4, + payload_size: 34, + last_obu_fragment_size: None, + omit_last_size_field: false, + ends_temporal_unit: false, + }), + State { + obus: input_data[1].1.obus[1..].to_owned(), + ..input_data[1].1 + }, + ), + (None, input_data[2].1.clone()), + ]; + + let element = ::Type::new(); + let pay = element.imp(); + for idx in 0..input_data.len() { + println!("running test {}...", idx); + + let mut state = pay.state.lock().unwrap(); + *state = input_data[idx].1.clone(); + + assert_eq!( + pay.consider_new_packet(&element, &mut state, input_data[idx].0), + results[idx].0, + ); + assert_eq!(state.obus, results[idx].1.obus); + assert_eq!(state.open_obu_fragment, results[idx].1.open_obu_fragment); + assert_eq!( + state.first_packet_in_seq, + results[idx].1.first_packet_in_seq + ); + } + } +} diff --git a/net/rtpav1/src/pay/mod.rs b/net/rtpav1/src/pay/mod.rs new file mode 100644 index 00000000..a6517e7e --- /dev/null +++ b/net/rtpav1/src/pay/mod.rs @@ -0,0 +1,35 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +#![allow(clippy::new_without_default)] + +use glib::Object; +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RTPAv1Pay(ObjectSubclass) + @extends gst_rtp::RTPBasePayload, gst::Element, gst::Object; +} + +impl RTPAv1Pay { + pub fn new() -> Self { + Object::new(&[]).expect("Failed to create AV1 payloader") + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpav1pay", + gst::Rank::Marginal, + RTPAv1Pay::static_type(), + ) +} diff --git a/net/rtpav1/tests/rtpav1.rs b/net/rtpav1/tests/rtpav1.rs new file mode 100644 index 00000000..93462386 --- /dev/null +++ b/net/rtpav1/tests/rtpav1.rs @@ -0,0 +1,224 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::{event::Eos, prelude::*, Buffer, Caps, ClockTime}; +use gst_check::Harness; +use gst_rtp::{rtp_buffer::RTPBufferExt, RTPBuffer}; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstrtpav1::plugin_register_static().expect("rtpav1 test"); + }); +} + +#[test] +#[rustfmt::skip] +fn test_depayloader() { + let test_packets: [(Vec, bool, u32); 4] = [ + ( // simple packet, complete TU + vec![ // RTP payload + 0b0001_1000, + 0b0011_0000, 1, 2, 3, 4, 5, 6, + ], + true, // marker bit + 100_000, // timestamp + ), ( // 2 OBUs, last is fragmented + vec![ + 0b0110_0000, + 0b0000_0110, 0b0111_1000, 1, 2, 3, 4, 5, + 0b0011_0000, 1, 2, 3, + ], + false, + 190_000, + ), ( // continuation of the last OBU + vec![ + 0b1100_0000, + 0b0000_0100, 4, 5, 6, 7, + ], + false, + 190_000, + ), ( // finishing the OBU fragment + vec![ + 0b1001_0000, + 8, 9, 10, + ], + true, + 190_000, + ) + ]; + + let expected: [Vec; 2] = [ + vec![ + 0b0001_0010, 0, + 0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6, + ], + vec![ + 0b0001_0010, 0, + 0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5, + 0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + ], + ]; + + init(); + + let mut h = Harness::new("rtpav1depay"); + h.play(); + + let caps = Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", 96) + .field("clock-rate", 90000) + .field("encoding-name", "AV1") + .build(); + h.set_src_caps(caps); + + for (idx, (bytes, marker, timestamp)) in test_packets.iter().enumerate() { + let mut buf = Buffer::new_rtp_with_sizes(bytes.len() as u32, 0, 0).unwrap(); + { + let buf_mut = buf.get_mut().unwrap(); + let mut rtp_mut = RTPBuffer::from_buffer_writable(buf_mut).unwrap(); + rtp_mut.set_marker(*marker); + rtp_mut.set_timestamp(*timestamp); + rtp_mut.set_payload_type(96); + rtp_mut.set_seq(idx as u16); + rtp_mut.payload_mut().unwrap().copy_from_slice(bytes); + } + + h.push(buf).unwrap(); + } + h.push_event(Eos::new()); + + for (idx, ex) in expected.iter().enumerate() { + println!("checking buffer {}...", idx); + + let buffer = h.pull().unwrap(); + let actual = buffer.into_mapped_buffer_readable().unwrap(); + assert_eq!(actual.as_slice(), ex.as_slice()); + } +} + +#[test] +#[rustfmt::skip] +fn test_payloader() { + let test_buffers: [(u64, Vec); 3] = [ + ( + 0, + vec![ // this should result in exactly 25 bytes for the RTP payload + 0b0001_0010, 0, + 0b0011_0010, 0b0000_1100, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, + 0b0011_0010, 0b0000_1001, 1, 2, 3, 4, 5, 6, 7, 8, 9, + ], + ), ( + 0, + vec![ // these all have to go in separate packets since their IDs mismatch + 0b0111_1010, 0b0000_0100, 1, 2, 3, 4, + 0b0011_0110, 0b0010_1000, 0b0000_0101, 1, 2, 3, 4, 5, + 0b0011_0110, 0b0100_1000, 0b0000_0001, 1, + ], + ), ( + 1_000_000_000, + vec![ + 0b0001_0010, 0, + 0b0011_0010, 0b0000_0100, 1, 2, 3, 4, + ] + ) + ]; + + let expected = [ + ( + false, // marker bit + 0, // relative RTP timestamp + vec![ // payload bytes + 0b0010_1000, + 0b0000_1101, 0b0011_0000, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, + 0b0011_0000, 1, 2, 3, 4, 5, 6, 7, 8, 9, + ], + ), ( + false, + 0, + vec![ + 0b0001_0000, + 0b0111_1000, 1, 2, 3, 4, + ] + ), ( + false, + 0, + vec![ + 0b0001_0000, + 0b0011_0100, 0b0010_1000, 1, 2, 3, 4, 5, + ] + ), ( + true, + 0, + vec![ + 0b0001_0000, + 0b0011_0100, 0b0100_1000, 1, + ] + ), ( + false, + 90_000, + vec![ + 0b0001_0000, + 0b0011_0000, 1, 2, 3, 4, + ] + ) + ]; + + init(); + + let mut h = Harness::new("rtpav1pay"); + { + let pay = h.element().unwrap(); + pay.set_property( + "mtu", + RTPBuffer::calc_packet_len(25, 0, 0) + ); + } + h.play(); + + let caps = Caps::builder("video/x-av1") + .field("parsed", true) + .field("stream-format", "obu-stream") + .field("alignment", "obu") + .build(); + h.set_src_caps(caps); + + for (pts, bytes) in &test_buffers { + let mut buffer = Buffer::with_size(bytes.len()) + .unwrap() + .into_mapped_buffer_writable() + .unwrap(); + buffer.copy_from_slice(bytes); + + let mut buffer = buffer.into_buffer(); + buffer.get_mut().unwrap().set_pts(ClockTime::try_from(*pts).unwrap()); + + h.push(buffer).unwrap(); + } + h.push_event(Eos::new()); + + let mut base_ts = None; + for (idx, (marker, ts_offset, payload)) in expected.iter().enumerate() { + println!("checking packet {}...", idx); + + let buffer = h.pull().unwrap(); + let packet = RTPBuffer::from_buffer_readable(&buffer).unwrap(); + if base_ts.is_none() { + base_ts = Some(packet.timestamp()); + } + + assert_eq!(packet.payload().unwrap(), payload.as_slice()); + assert_eq!(packet.is_marker(), *marker); + assert_eq!(packet.timestamp(), base_ts.unwrap() + ts_offset); + } +}