From 80dd01404fbcb6146aef13ebae485eace05fcb38 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Mon, 15 Apr 2024 18:45:44 +0200 Subject: [PATCH] fmp4mux: Support AV1 packaging in the fragmented mp4 plugin --- mux/fmp4/Cargo.toml | 3 +- mux/fmp4/src/fmp4mux/boxes.rs | 6 + mux/fmp4/src/fmp4mux/imp.rs | 67 +++++++++ mux/fmp4/src/fmp4mux/mod.rs | 4 + mux/fmp4/src/fmp4mux/obu.rs | 251 ++++++++++++++++++++++++++++++++++ 5 files changed, 330 insertions(+), 1 deletion(-) create mode 100644 mux/fmp4/src/fmp4mux/obu.rs diff --git a/mux/fmp4/Cargo.toml b/mux/fmp4/Cargo.toml index e7e2b102..0ae3dff7 100644 --- a/mux/fmp4/Cargo.toml +++ b/mux/fmp4/Cargo.toml @@ -14,8 +14,9 @@ gst = { workspace = true, features = ["v1_18"] } gst-base = { workspace = true, features = ["v1_18"] } gst-audio = { workspace = true, features = ["v1_18"] } gst-video = { workspace = true, features = ["v1_18"] } -gst-pbutils = { workspace = true, features = ["v1_18"] } +gst-pbutils = { workspace = true, features = ["v1_20"] } once_cell.workspace = true +bitstream-io = "2.1" [lib] name = "gstfmp4" diff --git a/mux/fmp4/src/fmp4mux/boxes.rs b/mux/fmp4/src/fmp4mux/boxes.rs index e2fdeae7..fa870c33 100644 --- a/mux/fmp4/src/fmp4mux/boxes.rs +++ b/mux/fmp4/src/fmp4mux/boxes.rs @@ -161,6 +161,9 @@ fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'stat "audio/mpeg" => { compatible_brands.push(b"caac"); } + "video/x-av1" => { + compatible_brands.push(b"cmf2"); + } "video/x-h265" => { let width = s.get::("width").ok(); let height = s.get::("height").ok(); @@ -1144,6 +1147,9 @@ fn write_visual_sample_entry( v.extend_from_slice(&codec_data); } + if let Some(extra_data) = &stream.extra_header_data { + v.extend_from_slice(extra_data.as_slice()); + } Ok(()) })?; } diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 501a482e..f773cb3b 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -12,10 +12,13 @@ use gst::subclass::prelude::*; use gst_base::prelude::*; use gst_base::subclass::prelude::*; +use bitstream_io::{BigEndian, BitReader, BitWriter}; use std::collections::VecDeque; +use std::io::{Cursor, Read, Seek, SeekFrom}; use std::mem; use std::sync::Mutex; +use crate::fmp4mux::obu::{parse_leb128, write_leb128, ObuType, SizedObu}; use once_cell::sync::Lazy; use super::boxes; @@ -224,6 +227,8 @@ struct Stream { /// Mapping between running time and UTC time in ONVIF mode. running_time_utc_time_mapping: Option<(gst::Signed, gst::ClockTime)>, + + extra_header_data: Option>, } #[derive(Default)] @@ -798,6 +803,62 @@ impl FMP4Mux { stream.dts_offset.display(), ); + let s = stream.caps.structure(0).unwrap(); + if s.name().as_str() == "video/x-av1" { + let buf_map = buffer.map_readable().map_err(|_| { + gst::error!(CAT, obj: stream.sinkpad, "Failed to map buffer"); + gst::FlowError::Error + })?; + let data = buf_map.as_slice(); + let mut cursor = Cursor::new(data); + + while cursor.position() < data.len() as u64 { + let obu_start = cursor.position(); + if let Ok(obu) = SizedObu::parse(&mut BitReader::endian(&mut cursor, BigEndian)) + { + gst::debug!(CAT, obj: stream.sinkpad, "Parsed OBU: {:?}", obu); + if obu.obu_type == ObuType::SequenceHeader { + let mut bytes = vec![0; obu.full_size() as usize]; + + // set reader to the beginning of the OBU + cursor + .seek(SeekFrom::Start(obu_start)) + .map_err(|_| gst::FlowError::Error)?; + + // write OBU header + cursor + .read_exact(&mut bytes[..obu.header_len as usize]) + .map_err(|_| gst::FlowError::Error)?; + + bytes[0] |= 1 << 1; // set `has_size_field` + parse_leb128(&mut BitReader::endian(&mut cursor, BigEndian)) // skip internal size field + .map_err(|_| gst::FlowError::Error)?; + + // write size field + write_leb128( + &mut BitWriter::endian( + Cursor::new(&mut bytes[obu.header_len as usize..]), + BigEndian, + ), + obu.size, + ) + .map_err(|_| gst::FlowError::Error)?; + + // write OBU payload + cursor + .read_exact(&mut bytes[(obu.header_len + obu.leb_size) as usize..]) + .map_err(|_| gst::FlowError::Error)?; + + // copy header to extra_header_data + stream.extra_header_data = Some(bytes); + break; + } + } else { + break; + } + } + } + let gop = Gop { start_pts: pts, start_dts: dts, @@ -2630,6 +2691,7 @@ impl FMP4Mux { dts_offset: None, current_position: gst::ClockTime::ZERO, running_time_utc_time_mapping: None, + extra_header_data: None, }); } @@ -2697,6 +2759,7 @@ impl FMP4Mux { trak_timescale: s.sinkpad.imp().settings.lock().unwrap().trak_timescale, delta_frames: s.delta_frames, caps: s.caps.clone(), + extra_header_data: s.extra_header_data.clone(), }) .collect::>(); @@ -3556,6 +3619,10 @@ impl ElementImpl for CMAFMux { .field("width", gst::IntRange::new(1, u16::MAX as i32)) .field("height", gst::IntRange::new(1, u16::MAX as i32)) .build(), + gst::Structure::builder("video/x-av1") + .field("stream-format", "obu-stream") + .field("alignment", "tu") + .build(), gst::Structure::builder("video/x-h265") .field("stream-format", gst::List::new(["hvc1", "hev1"])) .field("alignment", "au") diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs index bb50c289..e7d890ac 100644 --- a/mux/fmp4/src/fmp4mux/mod.rs +++ b/mux/fmp4/src/fmp4mux/mod.rs @@ -11,6 +11,7 @@ use gst::prelude::*; mod boxes; mod imp; +mod obu; glib::wrapper! { pub(crate) struct FMP4MuxPad(ObjectSubclass) @extends gst_base::AggregatorPad, gst::Pad, gst::Object; @@ -101,6 +102,9 @@ pub(crate) struct HeaderStream { /// Pre-defined trak timescale if not 0. trak_timescale: u32, + + // More data to be included in the fragmented stream header + extra_header_data: Option>, } #[derive(Debug)] diff --git a/mux/fmp4/src/fmp4mux/obu.rs b/mux/fmp4/src/fmp4mux/obu.rs new file mode 100644 index 00000000..852e72a2 --- /dev/null +++ b/mux/fmp4/src/fmp4mux/obu.rs @@ -0,0 +1,251 @@ +// +// Copyright (C) 2022 Vivienne Watermeier +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +#![allow(non_camel_case_types)] + +use bitstream_io::{BitRead, BitReader, BitWrite, BitWriter, Endianness}; +use std::io::{self, Read, Seek, Write}; + +pub fn parse_leb128(reader: &mut BitReader) -> io::Result<(u32, u32)> +where + R: Read + Seek, + E: Endianness, +{ + let mut value = 0; + let mut num_bytes = 0; + + for i in 0..8 { + let byte = reader.read::(8)?; + value |= (byte & 0x7f) << (i * 7); + num_bytes += 1; + if byte & 0x80 == 0 { + break; + } + } + + reader.byte_align(); + Ok((value, num_bytes)) +} + +pub fn write_leb128(writer: &mut BitWriter, mut value: u32) -> io::Result<()> +where + W: Write + Seek, + E: Endianness, +{ + loop { + writer.write_bit(value > 0x7f)?; + writer.write(7, value & 0x7f)?; + value >>= 7; + if value == 0 { + writer.byte_align()?; + return Ok(()); + } + } +} + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub struct SizedObu { + pub obu_type: ObuType, + pub has_extension: bool, + /// If the OBU header is followed by a leb128 size field. + pub has_size_field: bool, + pub temporal_id: u8, + pub spatial_id: u8, + /// size of the OBU payload in bytes. + /// This may refer to different sizes in different contexts, not always + /// to the entire OBU payload as it is in the AV1 bitstream. + pub size: u32, + /// the number of bytes the leb128 size field will take up + /// when written with write_leb128(). + /// This does not imply `has_size_field`, and does not necessarily match with + /// the length of the internal size field if present. + pub leb_size: u32, + pub header_len: u32, + /// indicates that only part of this OBU has been processed so far + pub is_fragment: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ObuType { + Reserved, + SequenceHeader, + TemporalDelimiter, + FrameHeader, + TileGroup, + Metadata, + Frame, + RedundantFrameHeader, + TileList, + Padding, +} + +impl Default for ObuType { + fn default() -> Self { + Self::Reserved + } +} + +impl SizedObu { + /// Parse an OBU header and size field. If the OBU is not expected to contain + /// a size field, but the size is known from external information, + /// parse as an `UnsizedObu` and use `to_sized`. + pub fn parse(reader: &mut BitReader) -> io::Result + where + R: Read + Seek, + E: Endianness, + { + // check the forbidden bit + if reader.read_bit()? { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "forbidden bit in OBU header is set", + )); + } + + let obu_type = reader.read::(4)?.into(); + let has_extension = reader.read_bit()?; + + // require a size field + if !reader.read_bit()? { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "expected a size field", + )); + } + + // ignore the reserved bit + let _ = reader.read_bit()?; + + let (temporal_id, spatial_id) = if has_extension { + (reader.read::(3)?, reader.read::(2)?) + } else { + (0, 0) + }; + + reader.byte_align(); + + let (size, leb_size) = parse_leb128(reader)?; + + Ok(Self { + obu_type, + has_extension, + has_size_field: true, + temporal_id, + spatial_id, + size, + leb_size, + header_len: has_extension as u32 + 1, + is_fragment: false, + }) + } + + /// The amount of bytes this OBU will take up, including the space needed for + /// its leb128 size field. + pub fn full_size(&self) -> u32 { + self.size + self.leb_size + self.header_len + } +} + +impl From for ObuType { + fn from(n: u8) -> Self { + assert!(n < 16); + + match n { + 1 => Self::SequenceHeader, + 2 => Self::TemporalDelimiter, + 3 => Self::FrameHeader, + 4 => Self::TileGroup, + 5 => Self::Metadata, + 6 => Self::Frame, + 7 => Self::RedundantFrameHeader, + 8 => Self::TileList, + 15 => Self::Padding, + _ => Self::Reserved, + } + } +} + +impl From for u8 { + fn from(ty: ObuType) -> Self { + match ty { + ObuType::Reserved => 0, + ObuType::SequenceHeader => 1, + ObuType::TemporalDelimiter => 2, + ObuType::FrameHeader => 3, + ObuType::TileGroup => 4, + ObuType::Metadata => 5, + ObuType::Frame => 6, + ObuType::RedundantFrameHeader => 7, + ObuType::TileList => 8, + ObuType::Padding => 15, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bitstream_io::{BigEndian, BitRead, BitReader}; + use std::io::Cursor; + + #[test] + fn test_parse_rtp_obu() { + let obus = [ + ( + SizedObu { + obu_type: ObuType::TemporalDelimiter, + has_extension: false, + has_size_field: false, + temporal_id: 0, + spatial_id: 0, + size: 0, + leb_size: 1, + header_len: 1, + is_fragment: false, + }, + vec![0b0001_0000], + ), + ( + SizedObu { + obu_type: ObuType::Padding, + has_extension: false, + has_size_field: false, + temporal_id: 0, + spatial_id: 0, + size: 10, + leb_size: 1, + header_len: 1, + is_fragment: false, + }, + vec![0b0111_1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + ), + ( + SizedObu { + obu_type: ObuType::Frame, + has_extension: true, + has_size_field: false, + temporal_id: 4, + spatial_id: 3, + size: 5, + leb_size: 1, + header_len: 2, + is_fragment: false, + }, + vec![0b0011_0100, 0b1001_1000, 1, 2, 3, 4, 5], + ), + ]; + + for (idx, (sized_obu, rtp_bytes)) in obus.into_iter().enumerate() { + println!("running test {idx}..."); + + let mut reader = BitReader::endian(Cursor::new(&rtp_bytes), BigEndian); + + SizedObu::parse(&mut reader).unwrap(); + } + } +}