From e3fbf2078de6f996cb05600e70e24c9021bb0454 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 18 Oct 2021 09:42:42 +0300 Subject: [PATCH] Add new fmp4 plugin with muxers for ISO fragmented MP4, DASH and CMAF containers --- Cargo.toml | 1 + generic/fmp4/Cargo.toml | 47 + generic/fmp4/LICENSE | 1 + generic/fmp4/build.rs | 3 + generic/fmp4/examples/dash_vod.rs | 266 +++++ generic/fmp4/src/fmp4mux/boxes.rs | 1760 +++++++++++++++++++++++++++++ generic/fmp4/src/fmp4mux/imp.rs | 1508 ++++++++++++++++++++++++ generic/fmp4/src/fmp4mux/mod.rs | 118 ++ generic/fmp4/src/lib.rs | 28 + generic/fmp4/tests/tests.rs | 139 +++ meson.build | 1 + 11 files changed, 3872 insertions(+) create mode 100644 generic/fmp4/Cargo.toml create mode 120000 generic/fmp4/LICENSE create mode 100644 generic/fmp4/build.rs create mode 100644 generic/fmp4/examples/dash_vod.rs create mode 100644 generic/fmp4/src/fmp4mux/boxes.rs create mode 100644 generic/fmp4/src/fmp4mux/imp.rs create mode 100644 generic/fmp4/src/fmp4mux/mod.rs create mode 100644 generic/fmp4/src/lib.rs create mode 100644 generic/fmp4/tests/tests.rs diff --git a/Cargo.toml b/Cargo.toml index 2ff8e0d3..aeed29b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "audio/claxon", "audio/csound", "audio/lewton", + "generic/fmp4", "generic/file", "generic/sodium", "generic/threadshare", diff --git a/generic/fmp4/Cargo.toml b/generic/fmp4/Cargo.toml new file mode 100644 index 00000000..9304911f --- /dev/null +++ b/generic/fmp4/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "gst-plugin-fmp4" +version = "0.8.0" +authors = ["Sebastian Dröge "] +license = "MPL-2.0" +description = "Fragmented MP4 Plugin" +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +edition = "2021" +rust-version = "1.56" + +[dependencies] +anyhow = "1" +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } +gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } +gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } +once_cell = "1.0" + +[lib] +name = "gstfmp4" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[dev-dependencies] +gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +default = ["v1_18"] +static = [] +capi = [] +v1_18 = ["gst-video/v1_18"] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/generic/fmp4/LICENSE b/generic/fmp4/LICENSE new file mode 120000 index 00000000..eb5d24fe --- /dev/null +++ b/generic/fmp4/LICENSE @@ -0,0 +1 @@ +../../LICENSE-MPL-2.0 \ No newline at end of file diff --git a/generic/fmp4/build.rs b/generic/fmp4/build.rs new file mode 100644 index 00000000..cda12e57 --- /dev/null +++ b/generic/fmp4/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git 
a/generic/fmp4/examples/dash_vod.rs b/generic/fmp4/examples/dash_vod.rs new file mode 100644 index 00000000..763ca7d3 --- /dev/null +++ b/generic/fmp4/examples/dash_vod.rs @@ -0,0 +1,266 @@ +// Copyright (C) 2021 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +// This creates a VoD DASH manifest based on the output of `cmafmux`. The media header +// ("initialization segment") is written into a separate file as the segments, and each segment is +// its own file too. +// +// All segments that are created are exactly 10s, expect for the last one which is only 3.333s. + +use gst::prelude::*; + +use std::fmt::Write; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use anyhow::Error; + +struct Segment { + start_time: gst::ClockTime, + duration: gst::ClockTime, +} + +struct State { + start_time: Option, + end_time: Option, + segments: Vec, + path: PathBuf, +} + +fn main() -> Result<(), Error> { + gst::init()?; + + gstfmp4::plugin_register_static()?; + + let state = Arc::new(Mutex::new(State { + start_time: None, + end_time: None, + segments: Vec::new(), + path: PathBuf::from("dash_stream"), + })); + + let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=10000000000 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::().unwrap(); + + let sink = pipeline + .by_name("sink") + .unwrap() + .dynamic_cast::() + .unwrap(); + sink.set_buffer_list(true); + + let state_clone = state.clone(); + sink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let mut state = state.lock().unwrap(); + + // The muxer only outputs non-empty buffer lists + let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + assert!(!buffer_list.is_empty()); + + let mut first = buffer_list.get(0).unwrap(); + + // Each list contains a full segment, i.e. does not start with a DELTA_UNIT + assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); + + // If the buffer has the DISCONT and HEADER flag set then it contains the media + // header, i.e. the `ftyp`, `moov` and other media boxes. + // + // This might be the initial header or the updated header at the end of the stream. + if first.flags().contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) { + let mut path = state.path.clone(); + std::fs::create_dir_all(&path).expect("failed to create directory"); + path.push("init.cmfi"); + + println!("writing header to {}", path.display()); + let map = first.map_readable().unwrap(); + std::fs::write(path, &map).expect("failed to write header"); + drop(map); + + // Remove the header from the buffer list + buffer_list.make_mut().remove(0, 1); + + // If the list is now empty then it only contained the media header and nothing + // else. + if buffer_list.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + // Otherwise get the next buffer and continue working with that. + first = buffer_list.get(0).unwrap(); + } + + // If the buffer only has the HEADER flag set then this is a segment header that is + // followed by one or more actual media buffers. 
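+ // In this muxer's output the segment header buffer carries the `styp`/`moof` boxes
+ // (plus the start of the `mdat` box), and the buffers that follow in the list carry
+ // the actual sample data.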
+ assert!(first.flags().contains(gst::BufferFlags::HEADER)); + + let segment = sample.segment().expect("no segment") + .downcast_ref::().expect("no time segment"); + + // Initialize the start time with the first PTS we observed. This will be used + // later for calculating the duration of the whole media for the DASH manifest. + // + // The PTS of the segment header is equivalent to the earliest PTS of the whole + // segment. + let pts = segment.to_running_time(first.pts().unwrap()).expect("can't get running time"); + if state.start_time.is_none() { + state.start_time = Some(pts); + } + + // The metadata of the first media buffer is duplicated to the segment header. + // Based on this we can know the timecode of the first frame in this segment. + let meta = first.meta::().expect("no timecode meta"); + + let mut path = state.path.clone(); + path.push(format!("segment_{}.cmfv", state.segments.len() + 1)); + println!("writing segment with timecode {} to {}", meta.tc(), path.display()); + + // Calculate the end time at this point. The duration of the segment header is set + // to the whole duration of this segment. + let duration = first.duration().unwrap(); + let end_time = first.pts().unwrap() + first.duration().unwrap(); + state.end_time = Some(segment.to_running_time(end_time).expect("can't get running time")); + + let mut file = std::fs::File::create(path).expect("failed to open fragment"); + for buffer in &*buffer_list { + use std::io::prelude::*; + + let map = buffer.map_readable().unwrap(); + file.write_all(&map).expect("failed to write fragment"); + } + + state.segments.push(Segment { + start_time: pts, + duration, + }); + + Ok(gst::FlowSuccess::Ok) + }) + .eos(move |_sink| { + let state = state_clone.lock().unwrap(); + + // Now write the manifest + let mut path = state.path.clone(); + path.push("manifest.mpd"); + + println!("writing manifest to {}", path.display()); + + let duration = state.end_time.opt_checked_sub(state.start_time).ok().flatten().unwrap().mseconds() as f64 / 1000.0; + + // Write the whole segment timeline out here, compressing multiple segments with + // the same duration to a repeated segment. + let mut segment_timeline = String::new(); + let mut write_segment = |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| { + if repeat > 0 { + writeln!( + &mut segment_timeline, + " ", + time = start.mseconds(), + duration = duration.mseconds(), + repeat = repeat + ).unwrap(); + } else { + writeln!( + &mut segment_timeline, + " ", + time = start.mseconds(), + duration = duration.mseconds() + ).unwrap(); + } + }; + + let mut start = None; + let mut num_segments = 0; + let mut last_duration = None; + for segment in &state.segments { + if start.is_none() { + start = Some(segment.start_time); + } + if last_duration.is_none() { + last_duration = Some(segment.duration); + } + + // If the duration of this segment is different from the previous one then we + // have to write out the segment now. 
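+ // In a DASH SegmentTimeline the `r` attribute of an `<S>` entry counts *additional*
+ // repetitions after the first segment, which is why `write_segment` is called with
+ // `num_segments - 1` below. With the pipeline above (eight 10s segments followed by
+ // a final 3.333s segment) this loop emits a single repeated `<S>` entry with `r="7"`,
+ // and the last, shorter segment is written out separately after the loop.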
+ if last_duration != Some(segment.duration) { + write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); + start = Some(segment.start_time); + last_duration = Some(segment.duration); + num_segments = 1; + } else { + num_segments += 1; + } + } + + // Write the last segment if any + if num_segments > 0 { + write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); + } + + let manifest = format!(r###" + + + + + + +{segment_timeline} + + + + + +"###, + duration = duration, segment_timeline = segment_timeline); + + std::fs::write(path, &manifest).expect("failed to write manifest"); + }) + .build(), + ); + + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline + .bus() + .expect("Pipeline without bus. Shouldn't happen!"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error().to_string(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/generic/fmp4/src/fmp4mux/boxes.rs b/generic/fmp4/src/fmp4mux/boxes.rs new file mode 100644 index 00000000..7f9ae910 --- /dev/null +++ b/generic/fmp4/src/fmp4mux/boxes.rs @@ -0,0 +1,1760 @@ +// Copyright (C) 2021 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::*; + +use anyhow::{bail, Context, Error}; + +use super::Buffer; + +fn write_box) -> Result>( + vec: &mut Vec, + fourcc: impl std::borrow::Borrow<[u8; 4]>, + content_func: F, +) -> Result { + // Write zero size ... + let size_pos = vec.len(); + vec.extend([0u8; 4]); + vec.extend(fourcc.borrow()); + + let res = content_func(vec)?; + + // ... and update it here later. 
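+ // The size field of an ISO BMFF box is a 32-bit big-endian value covering the whole
+ // box, including the 8-byte size/fourcc header itself, which is why the length is
+ // measured from `size_pos` rather than from the start of the content. (The special
+ // values 0 = "extends to end of file" and 1 = "64-bit `largesize` follows" are not
+ // produced by this helper; only the `mdat` writing code later in this file falls
+ // back to a `largesize`.)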
+ let size: u32 = vec + .len() + .checked_sub(size_pos) + .expect("vector shrunk") + .try_into() + .context("too big box content")?; + vec[size_pos..][..4].copy_from_slice(&size.to_be_bytes()); + + Ok(res) +} + +const FULL_BOX_VERSION_0: u8 = 0; +const FULL_BOX_VERSION_1: u8 = 1; + +const FULL_BOX_FLAGS_NONE: u32 = 0; + +fn write_full_box) -> Result>( + vec: &mut Vec, + fourcc: impl std::borrow::Borrow<[u8; 4]>, + version: u8, + flags: u32, + content_func: F, +) -> Result { + write_box(vec, fourcc, move |vec| { + assert_eq!(flags >> 24, 0); + vec.extend(((u32::from(version) << 24) | flags).to_be_bytes()); + content_func(vec) + }) +} + +fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'static [u8; 4]>) { + let s = caps.structure(0).unwrap(); + match s.name() { + "video/x-h264" => { + let width = s.get::("width").ok(); + let height = s.get::("height").ok(); + let fps = s.get::("framerate").ok(); + let profile = s.get::<&str>("profile").ok(); + let level = s + .get::<&str>("level") + .ok() + .map(|l| l.split_once('.').unwrap_or((l, "0"))); + let colorimetry = s.get::<&str>("colorimetry").ok(); + + if let (Some(width), Some(height), Some(profile), Some(level), Some(fps)) = + (width, height, profile, level, fps) + { + if profile == "high" + || profile == "main" + || profile == "baseline" + || profile == "constrained-baseline" + { + if width <= 864 + && height <= 576 + && level <= ("3", "1") + && fps <= gst::Fraction::new(60, 1) + { + #[cfg(feature = "v1_18")] + { + if let Some(colorimetry) = colorimetry + .and_then(|c| c.parse::().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + | gst_video::VideoColorPrimaries::Bt470bg + | gst_video::VideoColorPrimaries::Smpte170m + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + | gst_video::VideoTransferFunction::Bt601 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + | gst_video::VideoColorMatrix::Bt601 + ) { + compatible_brands.push(b"cfsd"); + } + } else { + // Assume it's OK + compatible_brands.push(b"cfsd"); + } + } + #[cfg(not(feature = "v1_18"))] + { + // Assume it's OK + compatible_brands.push(b"cfsd"); + } + } else if width <= 1920 + && height <= 1080 + && level <= ("4", "0") + && fps <= gst::Fraction::new(60, 1) + { + #[cfg(feature = "v1_18")] + { + if let Some(colorimetry) = colorimetry + .and_then(|c| c.parse::().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"cfhd"); + } + } else { + // Assume it's OK + compatible_brands.push(b"cfhd"); + } + } + #[cfg(not(feature = "v1_18"))] + { + // Assume it's OK + compatible_brands.push(b"cfhd"); + } + } else if width <= 1920 + && height <= 1080 + && level <= ("4", "2") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"chdf"); + } + } else { + // Assume it's OK + compatible_brands.push(b"chdf"); + } + } + } + } + } + "audio/mpeg" => { + compatible_brands.push(b"caac"); + } + "video/x-h265" => { + 
let width = s.get::("width").ok(); + let height = s.get::("height").ok(); + let fps = s.get::("framerate").ok(); + let profile = s.get::<&str>("profile").ok(); + let tier = s.get::<&str>("tier").ok(); + let level = s + .get::<&str>("level") + .ok() + .map(|l| l.split_once('.').unwrap_or((l, "0"))); + let colorimetry = s.get::<&str>("colorimetry").ok(); + + if let (Some(width), Some(height), Some(profile), Some(tier), Some(level), Some(fps)) = + (width, height, profile, tier, level, fps) + { + if profile == "main" && tier == "main" { + if width <= 1920 + && height <= 1080 + && level <= ("4", "1") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"chhd"); + } + } else { + // Assume it's OK + compatible_brands.push(b"chhd"); + } + } else if width <= 3840 + && height <= 2160 + && level <= ("5", "0") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"cud8"); + } + } else { + // Assume it's OK + compatible_brands.push(b"cud8"); + } + } + } else if profile == "main-10" && tier == "main-10" { + if width <= 1920 + && height <= 1080 + && level <= ("4", "1") + && fps <= gst::Fraction::new(60, 1) + { + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + ) { + compatible_brands.push(b"chh1"); + } + } else { + // Assume it's OK + compatible_brands.push(b"chh1"); + } + } else if width <= 3840 + && height <= 2160 + && level <= ("5", "1") + && fps <= gst::Fraction::new(60, 1) + { + #[cfg(feature = "v1_18")] + if let Some(colorimetry) = + colorimetry.and_then(|c| c.parse::().ok()) + { + if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt709 + | gst_video::VideoColorPrimaries::Bt2020 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Bt709 + | gst_video::VideoTransferFunction::Bt202010 + | gst_video::VideoTransferFunction::Bt202012 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt709 + | gst_video::VideoColorMatrix::Bt2020 + ) { + compatible_brands.push(b"cud1"); + } else if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt2020 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::Smpte2084 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt2020 + ) { + compatible_brands.push(b"chd1"); + } else if matches!( + colorimetry.primaries(), + gst_video::VideoColorPrimaries::Bt2020 + ) && matches!( + colorimetry.transfer(), + gst_video::VideoTransferFunction::AribStdB67 + ) && matches!( + colorimetry.matrix(), + gst_video::VideoColorMatrix::Bt2020 + ) { + compatible_brands.push(b"clg1"); + } + } else { + // Assume it's OK 
+ compatible_brands.push(b"cud1"); + } + } + #[cfg(not(feature = "v1_18"))] + { + // Assume it's OK + compatible_brands.push(b"cud1"); + } + } + } + } + _ => (), + } +} + +fn brands_from_variant_and_caps( + variant: super::Variant, + caps: &gst::CapsRef, +) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) { + match variant { + super::Variant::ISO => (b"iso6", vec![b"iso6"]), + super::Variant::DASH => { + // FIXME: `dsms` / `dash` brands, `msix` + (b"msdh", vec![b"dums", b"msdh", b"iso6"]) + } + super::Variant::CMAF => { + let mut compatible_brands = vec![b"iso6", b"cmfc"]; + + cmaf_brands_from_caps(caps, &mut compatible_brands); + + (b"cmf2", compatible_brands) + } + } +} + +/// Creates `ftyp` and `moov` boxes +pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result { + let mut v = vec![]; + + let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.caps); + + write_box(&mut v, b"ftyp", |v| { + // major brand + v.extend(brand); + // minor version + v.extend(0u32.to_be_bytes()); + // compatible brands + v.extend(compatible_brands.into_iter().flatten()); + + Ok(()) + })?; + + write_box(&mut v, b"moov", |v| write_moov(v, &cfg))?; + + Ok(gst::Buffer::from_mut_slice(v)) +} + +fn write_moov(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + use gst::glib; + + let base = glib::DateTime::new_utc(1904, 1, 1, 0, 0, 0.0)?; + let now = glib::DateTime::new_now_utc()?; + let creation_time = + u64::try_from(now.difference(&base).as_seconds()).expect("time before 1904"); + + write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + write_mvhd(v, cfg, creation_time) + })?; + write_box(v, b"trak", |v| write_trak(v, cfg, creation_time))?; + write_box(v, b"mvex", |v| write_mvex(v, cfg))?; + + Ok(()) +} + +fn caps_to_timescale(caps: &gst::CapsRef) -> u32 { + let s = caps.structure(0).unwrap(); + + if let Ok(fps) = s.get::("framerate") { + if fps.numer() == 0 { + return 10_000; + } + + if fps.denom() != 1 && fps.denom() != 1001 { + if let Some(fps) = gst::ClockTime::from_nseconds(fps.denom() as u64) + .mul_div_round(1_000_000_000, fps.numer() as u64) + .and_then(gst_video::guess_framerate) + { + return (fps.numer() as u32) + .mul_div_round(100, fps.denom() as u32) + .unwrap_or(10_000); + } + } + + (fps.numer() as u32) + .mul_div_round(100, fps.denom() as u32) + .unwrap_or(10_000) + } else if let Ok(rate) = s.get::("rate") { + rate as u32 + } else { + 10_000 + } +} + +fn write_mvhd( + v: &mut Vec, + cfg: &super::HeaderConfiguration, + creation_time: u64, +) -> Result<(), Error> { + // Creation time + v.extend(creation_time.to_be_bytes()); + // Modification time + v.extend(creation_time.to_be_bytes()); + // Timescale + v.extend(caps_to_timescale(cfg.caps).to_be_bytes()); + // Duration + v.extend(0u64.to_be_bytes()); + + // Rate 1.0 + v.extend((1u32 << 16).to_be_bytes()); + // Volume 1.0 + v.extend((1u16 << 8).to_be_bytes()); + // Reserved + v.extend([0u8; 2 + 2 * 4]); + + // Matrix + v.extend( + std::array::IntoIter::new([ + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (16384u32 << 16).to_be_bytes(), + ]) + .flatten(), + ); + + // Pre defined + v.extend([0u8; 6 * 4]); + + // Next track id + v.extend(2u32.to_be_bytes()); + + Ok(()) +} + +const TKHD_FLAGS_TRACK_ENABLED: u32 = 0x1; +const TKHD_FLAGS_TRACK_IN_MOVIE: u32 = 0x2; +const TKHD_FLAGS_TRACK_IN_PREVIEW: u32 = 0x4; + +fn write_trak( + v: 
&mut Vec, + cfg: &super::HeaderConfiguration, + creation_time: u64, +) -> Result<(), Error> { + write_full_box( + v, + b"tkhd", + FULL_BOX_VERSION_1, + TKHD_FLAGS_TRACK_ENABLED | TKHD_FLAGS_TRACK_IN_MOVIE | TKHD_FLAGS_TRACK_IN_PREVIEW, + |v| write_tkhd(v, cfg, creation_time), + )?; + + // TODO: write edts if necessary: for audio tracks to remove initialization samples + // TODO: write edts optionally for negative DTS instead of offsetting the DTS + + write_box(v, b"mdia", |v| write_mdia(v, cfg, creation_time))?; + + Ok(()) +} + +fn write_tkhd( + v: &mut Vec, + cfg: &super::HeaderConfiguration, + creation_time: u64, +) -> Result<(), Error> { + // Creation time + v.extend(creation_time.to_be_bytes()); + // Modification time + v.extend(creation_time.to_be_bytes()); + // Track ID + v.extend(1u32.to_be_bytes()); + // Reserved + v.extend(0u32.to_be_bytes()); + // Duration + v.extend(0u64.to_be_bytes()); + + // Reserved + v.extend([0u8; 2 * 4]); + + // Layer + v.extend(0u16.to_be_bytes()); + // Alternate group + v.extend(0u16.to_be_bytes()); + + // Volume + let s = cfg.caps.structure(0).unwrap(); + match s.name() { + "audio/mpeg" => v.extend((1u16 << 8).to_be_bytes()), + _ => v.extend(0u16.to_be_bytes()), + } + + // Reserved + v.extend([0u8; 2]); + + // Matrix + v.extend( + std::array::IntoIter::new([ + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (1u32 << 16).to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + 0u32.to_be_bytes(), + (16384u32 << 16).to_be_bytes(), + ]) + .flatten(), + ); + + // Width/height + match s.name() { + "video/x-h264" | "video/x-h265" => { + let width = s.get::("width").context("video caps without width")? as u32; + let height = s + .get::("height") + .context("video caps without height")? 
as u32; + let par = s + .get::("pixel-aspect-ratio") + .unwrap_or_else(|_| gst::Fraction::new(1, 1)); + + let width = std::cmp::min( + width + .mul_div_round(par.numer() as u32, par.denom() as u32) + .unwrap_or(u16::MAX as u32), + u16::MAX as u32, + ); + let height = std::cmp::min(height, u16::MAX as u32); + + v.extend((width << 16).to_be_bytes()); + v.extend((height << 16).to_be_bytes()); + } + _ => v.extend([0u8; 2 * 4]), + } + + Ok(()) +} + +fn write_mdia( + v: &mut Vec, + cfg: &super::HeaderConfiguration, + creation_time: u64, +) -> Result<(), Error> { + write_full_box(v, b"mdhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + write_mdhd(v, cfg, creation_time) + })?; + write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_hdlr(v, cfg) + })?; + + // TODO: write elng if needed + + write_box(v, b"minf", |v| write_minf(v, cfg))?; + + Ok(()) +} + +fn language_code(lang: impl std::borrow::Borrow<[u8; 3]>) -> u16 { + let lang = lang.borrow(); + + // TODO: Need to relax this once we get the language code from tags + assert!(lang.iter().all(|c| (b'a'..b'z').contains(c))); + + (((lang[0] as u16 - 0x60) & 0x1F) << 10) + + (((lang[1] as u16 - 0x60) & 0x1F) << 5) + + ((lang[2] as u16 - 0x60) & 0x1F) +} + +fn write_mdhd( + v: &mut Vec, + cfg: &super::HeaderConfiguration, + creation_time: u64, +) -> Result<(), Error> { + // Creation time + v.extend(creation_time.to_be_bytes()); + // Modification time + v.extend(creation_time.to_be_bytes()); + // Timescale + v.extend(caps_to_timescale(cfg.caps).to_be_bytes()); + // Duration + v.extend(0u64.to_be_bytes()); + + // Language as ISO-639-2/T + // TODO: get actual language from the tags + v.extend(language_code(b"und").to_be_bytes()); + + // Pre-defined + v.extend([0u8; 2]); + + Ok(()) +} + +fn write_hdlr(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Pre-defined + v.extend([0u8; 4]); + + let s = cfg.caps.structure(0).unwrap(); + let (handler_type, name) = match s.name() { + "video/x-h264" | "video/x-h265" => (b"vide", b"VideoHandler\0"), + "audio/mpeg" => (b"soun", b"SoundHandler\0"), + _ => unreachable!(), + }; + + // Handler type + v.extend(handler_type); + + // Reserved + v.extend([0u8; 3 * 4]); + + // Name + v.extend(name); + + Ok(()) +} + +fn write_minf(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + let s = cfg.caps.structure(0).unwrap(); + + match s.name() { + "video/x-h264" | "video/x-h265" => { + // Flags are always 1 for unspecified reasons + write_full_box(v, b"vmhd", FULL_BOX_VERSION_0, 1, |v| write_vmhd(v, cfg))? 
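+ // (ISO/IEC 14496-12 defines `vmhd` as a full box with version 0 and flags set to 1,
+ // so the literal flags value above is actually spec-mandated.)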
+ } + "audio/mpeg" => write_full_box(v, b"smhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_smhd(v, cfg) + })?, + _ => unreachable!(), + } + + write_box(v, b"dinf", |v| write_dinf(v, cfg))?; + + write_box(v, b"stbl", |v| write_stbl(v, cfg))?; + + Ok(()) +} + +fn write_vmhd(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Graphics mode + v.extend([0u8; 2]); + + // opcolor + v.extend([0u8; 2 * 3]); + + Ok(()) +} + +fn write_smhd(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Balance + v.extend([0u8; 2]); + + // Reserved + v.extend([0u8; 2]); + + Ok(()) +} + +fn write_dinf(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + write_full_box(v, b"dref", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_dref(v, cfg) + })?; + + Ok(()) +} + +const DREF_FLAGS_MEDIA_IN_SAME_FILE: u32 = 0x1; + +fn write_dref(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(1u32.to_be_bytes()); + + write_full_box( + v, + b"url ", + FULL_BOX_VERSION_0, + DREF_FLAGS_MEDIA_IN_SAME_FILE, + |_v| Ok(()), + )?; + + Ok(()) +} + +fn write_stbl(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + write_full_box(v, b"stsd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stsd(v, cfg) + })?; + write_full_box(v, b"stts", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stts(v, cfg) + })?; + write_full_box(v, b"stsc", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stsc(v, cfg) + })?; + write_full_box(v, b"stsz", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stsz(v, cfg) + })?; + + write_full_box(v, b"stco", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stco(v, cfg) + })?; + + // For video write a sync sample box as indication that not all samples are sync samples + let s = cfg.caps.structure(0).unwrap(); + match s.name() { + "video/x-h264" | "video/x-h265" => { + write_full_box(v, b"stss", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_stss(v, cfg) + })? 
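+ // (If `stss` were omitted entirely, readers would have to assume that every sample
+ // is a sync sample; writing an empty `stss` avoids that assumption.)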
+ } + _ => (), + } + + Ok(()) +} + +fn write_stsd(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(1u32.to_be_bytes()); + + let s = cfg.caps.structure(0).unwrap(); + match s.name() { + "video/x-h264" | "video/x-h265" => write_visual_sample_entry(v, cfg)?, + "audio/mpeg" => write_audio_sample_entry(v, cfg)?, + _ => unreachable!(), + } + + Ok(()) +} + +fn write_sample_entry_box) -> Result>( + v: &mut Vec, + fourcc: impl std::borrow::Borrow<[u8; 4]>, + content_func: F, +) -> Result { + write_box(v, fourcc, move |v| { + // Reserved + v.extend([0u8; 6]); + + // Data reference index + v.extend(1u16.to_be_bytes()); + + content_func(v) + }) +} + +fn write_visual_sample_entry( + v: &mut Vec, + cfg: &super::HeaderConfiguration, +) -> Result<(), Error> { + let s = cfg.caps.structure(0).unwrap(); + let fourcc = match s.name() { + "video/x-h264" => { + let stream_format = s.get::<&str>("stream-format").context("no stream-format")?; + match stream_format { + "avc" => b"avc1", + "avc3" => b"avc3", + _ => unreachable!(), + } + } + "video/x-h265" => { + let stream_format = s.get::<&str>("stream-format").context("no stream-format")?; + match stream_format { + "hvc1" => b"hvc1", + "hev1" => b"hev1", + _ => unreachable!(), + } + } + _ => unreachable!(), + }; + + write_sample_entry_box(v, fourcc, move |v| { + // pre-defined + v.extend([0u8; 2]); + // Reserved + v.extend([0u8; 2]); + // pre-defined + v.extend([0u8; 3 * 4]); + + // Width + let width = + u16::try_from(s.get::("width").context("no width")?).context("too big width")?; + v.extend(width.to_be_bytes()); + + // Height + let height = u16::try_from(s.get::("height").context("no height")?) + .context("too big height")?; + v.extend(height.to_be_bytes()); + + // Horizontal resolution + v.extend(0x00480000u32.to_be_bytes()); + + // Vertical resolution + v.extend(0x00480000u32.to_be_bytes()); + + // Reserved + v.extend([0u8; 4]); + + // Frame count + v.extend(1u16.to_be_bytes()); + + // Compressor name + v.extend([0u8; 32]); + + // Depth + v.extend(0x0018u16.to_be_bytes()); + + // Pre-defined + v.extend((-1i16).to_be_bytes()); + + // Codec specific boxes + match s.name() { + "video/x-h264" => { + let codec_data = s + .get::<&gst::BufferRef>("codec_data") + .context("no codec_data")?; + let map = codec_data + .map_readable() + .context("codec_data not mappable")?; + write_box(v, b"avcC", move |v| { + v.extend_from_slice(&map); + Ok(()) + })?; + } + "video/x-h265" => { + let codec_data = s + .get::<&gst::BufferRef>("codec_data") + .context("no codec_data")?; + let map = codec_data + .map_readable() + .context("codec_data not mappable")?; + write_box(v, b"hvcC", move |v| { + v.extend_from_slice(&map); + Ok(()) + })?; + } + _ => unreachable!(), + } + + if let Ok(par) = s.get::("pixel-aspect-ratio") { + write_box(v, b"pasp", move |v| { + v.extend((par.numer() as u32).to_be_bytes()); + v.extend((par.denom() as u32).to_be_bytes()); + Ok(()) + })?; + } + + if let Some(colorimetry) = s + .get::<&str>("colorimetry") + .ok() + .and_then(|c| c.parse::().ok()) + { + write_box(v, b"colr", move |v| { + v.extend(b"nclx"); + let (primaries, transfer, matrix) = { + #[cfg(feature = "v1_18")] + { + ( + (colorimetry.primaries().to_iso() as u16), + (colorimetry.transfer().to_iso() as u16), + (colorimetry.matrix().to_iso() as u16), + ) + } + #[cfg(not(feature = "v1_18"))] + { + let primaries = match colorimetry.primaries() { + gst_video::VideoColorPrimaries::Bt709 => 1u16, + gst_video::VideoColorPrimaries::Bt470m => 4u16, + 
gst_video::VideoColorPrimaries::Bt470bg => 5u16, + gst_video::VideoColorPrimaries::Smpte170m => 6u16, + gst_video::VideoColorPrimaries::Smpte240m => 7u16, + gst_video::VideoColorPrimaries::Film => 8u16, + gst_video::VideoColorPrimaries::Bt2020 => 9u16, + _ => 2, + }; + let transfer = match colorimetry.transfer() { + gst_video::VideoTransferFunction::Bt709 => 1u16, + gst_video::VideoTransferFunction::Gamma22 => 4u16, + gst_video::VideoTransferFunction::Gamma28 => 5u16, + gst_video::VideoTransferFunction::Smpte240m => 7u16, + gst_video::VideoTransferFunction::Gamma10 => 8u16, + gst_video::VideoTransferFunction::Log100 => 9u16, + gst_video::VideoTransferFunction::Log316 => 10u16, + gst_video::VideoTransferFunction::Srgb => 13u16, + gst_video::VideoTransferFunction::Bt202012 => 15u16, + _ => 2, + }; + let matrix = match colorimetry.matrix() { + gst_video::VideoColorMatrix::Rgb => 0u16, + gst_video::VideoColorMatrix::Bt709 => 1u16, + gst_video::VideoColorMatrix::Fcc => 4u16, + gst_video::VideoColorMatrix::Bt601 => 6u16, + gst_video::VideoColorMatrix::Smpte240m => 7u16, + gst_video::VideoColorMatrix::Bt2020 => 9u16, + _ => 2, + }; + + (primaries, transfer, matrix) + } + }; + + let full_range = match colorimetry.range() { + gst_video::VideoColorRange::Range0_255 => 0x80u8, + gst_video::VideoColorRange::Range16_235 => 0x00u8, + _ => 0x00, + }; + + v.extend(primaries.to_be_bytes()); + v.extend(transfer.to_be_bytes()); + v.extend(matrix.to_be_bytes()); + v.push(full_range); + + Ok(()) + })?; + } + + #[cfg(feature = "v1_18")] + { + if let Ok(cll) = gst_video::VideoContentLightLevel::from_caps(cfg.caps) { + write_box(v, b"clli", move |v| { + v.extend((cll.max_content_light_level() as u16).to_be_bytes()); + v.extend((cll.max_frame_average_light_level() as u16).to_be_bytes()); + Ok(()) + })?; + } + + if let Ok(mastering) = gst_video::VideoMasteringDisplayInfo::from_caps(cfg.caps) { + write_box(v, b"mdcv", move |v| { + for primary in mastering.display_primaries() { + v.extend(primary.x.to_be_bytes()); + v.extend(primary.y.to_be_bytes()); + } + v.extend(mastering.white_point().x.to_be_bytes()); + v.extend(mastering.white_point().y.to_be_bytes()); + v.extend(mastering.max_display_mastering_luminance().to_be_bytes()); + v.extend(mastering.max_display_mastering_luminance().to_be_bytes()); + Ok(()) + })?; + } + } + + // TODO: write btrt bitrate box based on tags + + Ok(()) + })?; + + Ok(()) +} + +fn write_audio_sample_entry( + v: &mut Vec, + cfg: &super::HeaderConfiguration, +) -> Result<(), Error> { + let s = cfg.caps.structure(0).unwrap(); + let fourcc = match s.name() { + "audio/mpeg" => b"mp4a", + _ => unreachable!(), + }; + + write_sample_entry_box(v, fourcc, move |v| { + // Reserved + v.extend([0u8; 2 * 4]); + + // Channel count + let channels = u16::try_from(s.get::("channels").context("no channels")?) 
+ .context("too many channels")?; + v.extend(channels.to_be_bytes()); + + // Sample size + v.extend(16u16.to_be_bytes()); + + // Pre-defined + v.extend([0u8; 2]); + + // Reserved + v.extend([0u8; 2]); + + // Sample rate + let rate = u16::try_from(s.get::("rate").context("no rate")?).unwrap_or(0); + v.extend((u32::from(rate) << 16).to_be_bytes()); + + // Codec specific boxes + match s.name() { + "audio/mpeg" => { + let codec_data = s + .get::<&gst::BufferRef>("codec_data") + .context("no codec_data")?; + let map = codec_data + .map_readable() + .context("codec_data not mappable")?; + if map.len() < 2 { + bail!("too small codec_data"); + } + write_esds_aac(v, &map)?; + } + _ => unreachable!(), + } + + // If rate did not fit into 16 bits write a full `srat` box + if rate == 0 { + let rate = s.get::("rate").context("no rate")?; + // FIXME: This is defined as full box? + write_full_box( + v, + b"srat", + FULL_BOX_VERSION_0, + FULL_BOX_FLAGS_NONE, + move |v| { + v.extend((rate as u32).to_be_bytes()); + Ok(()) + }, + )?; + } + + // TODO: write btrt bitrate box based on tags + + // TODO: chnl box for channel ordering? probably not needed for AAC + + Ok(()) + })?; + + Ok(()) +} + +fn write_esds_aac(v: &mut Vec, codec_data: &[u8]) -> Result<(), Error> { + let calculate_len = |mut len| { + if len > 260144641 { + bail!("too big descriptor length"); + } + + if len == 0 { + return Ok(([0; 4], 1)); + } + + let mut idx = 0; + let mut lens = [0u8; 4]; + while len > 0 { + lens[idx] = ((if len > 0x7f { 0x80 } else { 0x00 }) | (len & 0x7f)) as u8; + idx += 1; + len >>= 7; + } + + Ok((lens, idx)) + }; + + write_full_box( + v, + b"esds", + FULL_BOX_VERSION_0, + FULL_BOX_FLAGS_NONE, + move |v| { + // Calculate all lengths bottom up + + // Decoder specific info + let decoder_specific_info_len = calculate_len(codec_data.len())?; + + // Decoder config + let decoder_config_len = + calculate_len(13 + 1 + decoder_specific_info_len.1 + codec_data.len())?; + + // SL config + let sl_config_len = calculate_len(1)?; + + // ES descriptor + let es_descriptor_len = calculate_len( + 3 + 1 + + decoder_config_len.1 + + 13 + + 1 + + decoder_specific_info_len.1 + + codec_data.len() + + 1 + + sl_config_len.1 + + 1, + )?; + + // ES descriptor tag + v.push(0x03); + + // Length + v.extend_from_slice(&es_descriptor_len.0[..(es_descriptor_len.1)]); + + // Track ID + v.extend(1u16.to_be_bytes()); + // Flags + v.push(0u8); + + // Decoder config descriptor + v.push(0x04); + + // Length + v.extend_from_slice(&decoder_config_len.0[..(decoder_config_len.1)]); + + // Object type ESDS_OBJECT_TYPE_MPEG4_P3 + v.push(0x40); + // Stream type ESDS_STREAM_TYPE_AUDIO + v.push((0x05 << 2) | 0x01); + + // Buffer size db? 
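+ // (`bufferSizeDB` is the 24-bit decoder input buffer size field of the MPEG-4
+ // Systems DecoderConfigDescriptor; it is written as 0 here since the value is not
+ // known.)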
+ v.extend([0u8; 3]); + + // Max bitrate + v.extend(0u32.to_be_bytes()); + + // Avg bitrate + v.extend(0u32.to_be_bytes()); + + // Decoder specific info + v.push(0x05); + + // Length + v.extend_from_slice(&decoder_specific_info_len.0[..(decoder_specific_info_len.1)]); + v.extend_from_slice(codec_data); + + // SL config descriptor + v.push(0x06); + + // Length: 1 (tag) + 1 (length) + 1 (predefined) + v.extend_from_slice(&sl_config_len.0[..(sl_config_len.1)]); + + // Predefined + v.push(0x02); + Ok(()) + }, + ) +} + +fn write_stts(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stsc(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stsz(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Sample size + v.extend(0u32.to_be_bytes()); + + // Sample count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stco(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_stss(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Entry count + v.extend(0u32.to_be_bytes()); + + Ok(()) +} + +fn write_mvex(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + if cfg.write_mehd { + if cfg.update && cfg.duration.is_some() { + write_full_box(v, b"mehd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + write_mehd(v, cfg) + })?; + } else { + write_box(v, b"free", |v| { + // version/flags of full box + v.extend(0u32.to_be_bytes()); + // mehd duration + v.extend(0u64.to_be_bytes()); + + Ok(()) + })?; + } + } + + write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_trex(v, cfg) + })?; + + Ok(()) +} + +fn write_mehd(v: &mut Vec, cfg: &super::HeaderConfiguration) -> Result<(), Error> { + let timescale = caps_to_timescale(cfg.caps); + + let duration = cfg + .duration + .expect("no duration") + .mul_div_ceil(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("too long duration")?; + + // Media duration in mvhd.timescale units + v.extend(duration.to_be_bytes()); + + Ok(()) +} + +fn write_trex(v: &mut Vec, _cfg: &super::HeaderConfiguration) -> Result<(), Error> { + // Track ID + v.extend(1u32.to_be_bytes()); + + // Default sample description index + v.extend(1u32.to_be_bytes()); + + // Default sample duration + v.extend(0u32.to_be_bytes()); + + // Default sample size + v.extend(0u32.to_be_bytes()); + + // Default sample flags + v.extend(0u32.to_be_bytes()); + + // Default sample duration/size/etc will be provided in the traf/trun if one can be determined + // for a whole fragment + + Ok(()) +} + +/// Creates `styp` and `moof` boxes and `mdat` header +pub(super) fn create_fmp4_fragment_header( + cfg: super::FragmentHeaderConfiguration, +) -> Result<(gst::Buffer, u64), Error> { + let mut v = vec![]; + + let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.caps); + + write_box(&mut v, b"styp", |v| { + // major brand + v.extend(brand); + // minor version + v.extend(0u32.to_be_bytes()); + // compatible brands + v.extend(compatible_brands.into_iter().flatten()); + + Ok(()) + })?; + + let styp_len = v.len(); + + let data_offset_offset = write_box(&mut v, b"moof", |v| write_moof(v, &cfg))?; + + let size = cfg + .buffers + .iter() + .map(|buffer| buffer.buffer.size() as u64) + .sum::(); + if let Ok(size) = 
u32::try_from(size + 8) { + v.extend(size.to_be_bytes()); + v.extend(b"mdat"); + } else { + v.extend(1u32.to_be_bytes()); + v.extend(b"mdat"); + v.extend((size + 16).to_be_bytes()); + } + + let data_offset = v.len() - styp_len; + v[data_offset_offset..][..4].copy_from_slice(&(data_offset as u32).to_be_bytes()); + + Ok((gst::Buffer::from_mut_slice(v), styp_len as u64)) +} + +#[allow(clippy::too_many_arguments)] +fn write_moof(v: &mut Vec, cfg: &super::FragmentHeaderConfiguration) -> Result { + write_full_box(v, b"mfhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + write_mfhd(v, cfg) + })?; + let data_offset_offset = write_box(v, b"traf", |v| write_traf(v, cfg))?; + + Ok(data_offset_offset) +} + +fn write_mfhd(v: &mut Vec, cfg: &super::FragmentHeaderConfiguration) -> Result<(), Error> { + v.extend(cfg.sequence_number.to_be_bytes()); + + Ok(()) +} + +#[allow(clippy::identity_op)] +fn sample_flags_from_buffer(buffer: &gst::BufferRef, intra_only: bool) -> u32 { + if intra_only { + (0b00u32 << (16 + 10)) | // leading: unknown + (0b10u32 << (16 + 8)) | // depends: no + (0b10u32 << (16 + 6)) | // depended: no + (0b00u32 << (16 + 4)) | // redundancy: unknown + (0b000u32 << (16 + 1)) | // padding: no + (0b0u32 << 16) | // non-sync-sample: no + (0u32) // degradation priority + } else { + let depends = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + 0b01u32 + } else { + 0b10u32 + }; + let depended = if buffer.flags().contains(gst::BufferFlags::DROPPABLE) { + 0b10u32 + } else { + 0b00u32 + }; + let non_sync_sample = if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + 0b1u32 + } else { + 0b0u32 + }; + + (0b00u32 << (16 + 10)) | // leading: unknown + (depends << (16 + 8)) | // depends + (depended << (16 + 6)) | // depended + (0b00u32 << (16 + 4)) | // redundancy: unknown + (0b000u32 << (16 + 1)) | // padding: no + (non_sync_sample << 16) | // non-sync-sample + (0u32) // degradation priority + } +} + +fn composition_time_offset_from_pts_dts( + pts: gst::ClockTime, + dts: Option, + timescale: u32, +) -> Result { + let (_, pts, dts) = timestamp_from_pts_dts(pts, dts, true, timescale)?; + let dts = dts.expect("no DTS"); + + let diff = if pts > dts { + i32::try_from((pts - dts) as i64).context("pts-dts diff too big")? + } else { + let diff = dts - pts; + i32::try_from(-(diff as i64)).context("pts-dts diff too big")? 
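+ // A negative composition time offset requires a version 1 `trun` box, where this
+ // field is signed; `write_traf` switches the box version accordingly.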
+ }; + + Ok(diff) +} + +fn timestamp_from_pts_dts( + pts: gst::ClockTime, + dts: Option, + check_dts: bool, + timescale: u32, +) -> Result<(u64, u64, Option), Error> { + let pts = pts + .nseconds() + .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("too big PTS")?; + + if check_dts { + let dts = dts.expect("no DTS"); + let dts = dts + .nseconds() + .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("too big DTS")?; + Ok((dts, pts, Some(dts))) + } else { + Ok((pts, pts, None)) + } +} + +const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08; +const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10; +const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20; +const DEFAULT_BASE_IS_MOOF: u32 = 0x2_00_00; + +const DATA_OFFSET_PRESENT: u32 = 0x0_01; +const FIRST_SAMPLE_FLAGS_PRESENT: u32 = 0x0_04; +const SAMPLE_DURATION_PRESENT: u32 = 0x1_00; +const SAMPLE_SIZE_PRESENT: u32 = 0x2_00; +const SAMPLE_FLAGS_PRESENT: u32 = 0x4_00; +const SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT: u32 = 0x8_00; + +#[allow(clippy::type_complexity)] +fn analyze_buffers( + cfg: &super::FragmentHeaderConfiguration, + check_dts: bool, + intra_only: bool, + timescale: u32, +) -> Result< + ( + // tf_flags + u32, + // tr_flags + u32, + // default size + Option, + // default duration + Option, + // default flags + Option, + // negative composition time offsets + bool, + ), + Error, +> { + let mut tf_flags = DEFAULT_BASE_IS_MOOF; + let mut tr_flags = DATA_OFFSET_PRESENT; + + let mut last_timestamp = None; + let mut duration = None; + let mut size = None; + let mut first_buffer_flags = None; + let mut flags = None; + + let mut negative_composition_time_offsets = false; + + for Buffer { buffer, pts, dts } in cfg.buffers { + if size.is_none() { + size = Some(buffer.size() as u32); + } + if Some(buffer.size() as u32) != size { + tr_flags |= SAMPLE_SIZE_PRESENT; + } + + { + let (current_timestamp, _pts, _dts) = + timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)?; + + if let Some(prev_timestamp) = last_timestamp { + let dur = u32::try_from(current_timestamp.saturating_sub(prev_timestamp)) + .context("too big sample duration")?; + last_timestamp = Some(current_timestamp); + + if duration.is_none() { + duration = Some(dur); + } + if Some(dur) != duration { + tr_flags |= SAMPLE_DURATION_PRESENT; + } + } else { + last_timestamp = Some(current_timestamp); + } + } + + let f = sample_flags_from_buffer(buffer, intra_only); + if first_buffer_flags.is_none() { + first_buffer_flags = Some(f); + } else if flags.is_none() { + if Some(f) != first_buffer_flags { + tr_flags |= FIRST_SAMPLE_FLAGS_PRESENT; + } + flags = Some(f); + } + + if flags.is_some() && Some(f) != flags { + tr_flags &= !FIRST_SAMPLE_FLAGS_PRESENT; + tr_flags |= SAMPLE_FLAGS_PRESENT; + } + + if check_dts { + let diff = composition_time_offset_from_pts_dts(*pts, *dts, timescale)?; + if diff != 0 { + tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT; + } + if diff < 0 { + negative_composition_time_offsets = true; + } + } + } + + // Check duration of the last buffer against end_pts / end_dts + { + let current_timestamp = if check_dts { + cfg.end_dts.expect("no end DTS") + } else { + cfg.end_pts + }; + let current_timestamp = current_timestamp + .nseconds() + .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("too big timestamp")?; + + if let Some(prev_timestamp) = last_timestamp { + let dur = u32::try_from(current_timestamp.saturating_sub(prev_timestamp)) + .context("too big sample duration")?; + + if 
duration.is_none() { + duration = Some(dur); + } + if Some(dur) != duration { + tr_flags |= SAMPLE_DURATION_PRESENT; + } + } + } + + if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 { + tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT; + } else { + size = None; + } + + if (tr_flags & SAMPLE_DURATION_PRESENT) == 0 { + tf_flags |= DEFAULT_SAMPLE_DURATION_PRESENT; + } else { + duration = None; + } + + if (tr_flags & SAMPLE_FLAGS_PRESENT) == 0 { + tf_flags |= DEFAULT_SAMPLE_FLAGS_PRESENT; + } else { + flags = None; + } + + Ok(( + tf_flags, + tr_flags, + size, + duration, + flags, + negative_composition_time_offsets, + )) +} + +fn write_traf(v: &mut Vec, cfg: &super::FragmentHeaderConfiguration) -> Result { + let s = cfg.caps.structure(0).unwrap(); + let timescale = caps_to_timescale(cfg.caps); + + let check_dts = matches!(s.name(), "video/x-h264" | "video/x-h265"); + let intra_only = matches!(s.name(), "audio/mpeg"); + + // Analyze all buffers to know what values can be put into the tfhd for all samples and what + // has to be stored for every single sample + let ( + tf_flags, + tr_flags, + default_size, + default_duration, + default_flags, + negative_composition_time_offsets, + ) = analyze_buffers(cfg, check_dts, intra_only, timescale)?; + + write_full_box(v, b"tfhd", FULL_BOX_VERSION_0, tf_flags, |v| { + write_tfhd(v, cfg, default_size, default_duration, default_flags) + })?; + write_full_box(v, b"tfdt", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + write_tfdt(v, cfg, timescale) + })?; + + let data_offset_offset = write_full_box( + v, + b"trun", + if negative_composition_time_offsets { + FULL_BOX_VERSION_1 + } else { + FULL_BOX_VERSION_0 + }, + tr_flags, + |v| write_trun(v, cfg, tr_flags, check_dts, intra_only, timescale), + )?; + + // TODO: saio, saiz, sbgp, sgpd, subs? 
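+ // (`saiz`/`saio` would carry per-sample auxiliary information sizes and offsets,
+ // e.g. for Common Encryption, `sbgp`/`sgpd` sample group descriptions and `subs`
+ // sub-sample information; none of these are needed for plain unencrypted fragments.)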
+ + Ok(data_offset_offset) +} + +fn write_tfhd( + v: &mut Vec, + _cfg: &super::FragmentHeaderConfiguration, + default_size: Option, + default_duration: Option, + default_flags: Option, +) -> Result<(), Error> { + // Track ID + v.extend(1u32.to_be_bytes()); + + // No base data offset, no sample description index + + if let Some(default_duration) = default_duration { + v.extend(default_duration.to_be_bytes()); + } + + if let Some(default_size) = default_size { + v.extend(default_size.to_be_bytes()); + } + + if let Some(default_flags) = default_flags { + v.extend(default_flags.to_be_bytes()); + } + + Ok(()) +} + +fn write_tfdt( + v: &mut Vec, + cfg: &super::FragmentHeaderConfiguration, + timescale: u32, +) -> Result<(), Error> { + let base_time = cfg + .start_dts + .unwrap_or(cfg.earliest_pts) + .mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("base time overflow")?; + + v.extend(base_time.to_be_bytes()); + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn write_trun( + v: &mut Vec, + cfg: &super::FragmentHeaderConfiguration, + tr_flags: u32, + check_dts: bool, + intra_only: bool, + timescale: u32, +) -> Result { + // Sample count + v.extend((cfg.buffers.len() as u32).to_be_bytes()); + + let data_offset_offset = v.len(); + // Data offset, will be rewritten later + v.extend(0i32.to_be_bytes()); + + if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 { + v.extend(sample_flags_from_buffer(&cfg.buffers[0].buffer, intra_only).to_be_bytes()); + } + + let last_timestamp = if check_dts { + cfg.end_dts.expect("no end DTS") + } else { + cfg.end_pts + }; + let last_timestamp = last_timestamp + .nseconds() + .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("too big timestamp")?; + + for (Buffer { buffer, pts, dts }, next_timestamp) in Iterator::zip( + cfg.buffers.iter(), + cfg.buffers + .iter() + .skip(1) + .map(|Buffer { pts, dts, .. 
}| { + timestamp_from_pts_dts(*pts, *dts, check_dts, timescale) + .map(|(current_timestamp, _pts, _dts)| current_timestamp) + }) + .chain(Some(Ok(last_timestamp))), + ) { + let next_timestamp = next_timestamp?; + + if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 { + // Sample duration + let (current_timestamp, _pts, _dts) = + timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)?; + let dur = u32::try_from(next_timestamp.saturating_sub(current_timestamp)) + .context("too big sample duration")?; + v.extend(dur.to_be_bytes()); + } + + if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 { + // Sample size + v.extend((buffer.size() as u32).to_be_bytes()); + } + + if (tr_flags & SAMPLE_FLAGS_PRESENT) != 0 { + assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0); + + // Sample flags + v.extend(sample_flags_from_buffer(buffer, intra_only).to_be_bytes()); + } + + if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 { + // Sample composition time offset + v.extend(composition_time_offset_from_pts_dts(*pts, *dts, timescale)?.to_be_bytes()); + } + } + + Ok(data_offset_offset) +} + +/// Creates `mfra` box +pub(crate) fn create_mfra( + caps: &gst::CapsRef, + fragment_offsets: &[super::FragmentOffset], +) -> Result { + let timescale = caps_to_timescale(caps); + + let mut v = vec![]; + + let offset = write_box(&mut v, b"mfra", |v| { + write_full_box(v, b"tfra", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { + // Track ID + v.extend(1u32.to_be_bytes()); + + // Reserved / length of traf/trun/sample + v.extend(0u32.to_be_bytes()); + + // Number of entries + v.extend( + u32::try_from(fragment_offsets.len()) + .context("too many fragments")? + .to_be_bytes(), + ); + + for super::FragmentOffset { time, offset } in fragment_offsets { + // Time + let time = time + .nseconds() + .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds()) + .context("time overflow")?; + v.extend(time.to_be_bytes()); + + // moof offset + v.extend(offset.to_be_bytes()); + + // traf/trun/sample number + v.extend_from_slice(&[1u8; 3][..]); + } + + Ok(()) + })?; + + let offset = write_full_box(v, b"mfro", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + let offset = v.len(); + // Parent size + v.extend(0u32.to_be_bytes()); + Ok(offset) + })?; + + Ok(offset) + })?; + + let len = u32::try_from(v.len() as u64).context("too big mfra")?; + v[offset..][..4].copy_from_slice(&len.to_be_bytes()); + + Ok(gst::Buffer::from_mut_slice(v)) +} diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs new file mode 100644 index 00000000..26e0df14 --- /dev/null +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -0,0 +1,1508 @@ +// Copyright (C) 2021 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . 
+// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_error, gst_info, gst_trace, gst_warning}; + +use std::collections::VecDeque; +use std::sync::Mutex; + +use once_cell::sync::Lazy; + +use super::boxes; +use super::Buffer; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "fmp4mux", + gst::DebugColorFlags::empty(), + Some("FMP4Mux Element"), + ) +}); + +const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10); +const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None; +const DEFAULT_WRITE_MFRA: bool = false; +const DEFAULT_WRITE_MEHD: bool = false; + +#[derive(Debug, Clone)] +struct Settings { + fragment_duration: gst::ClockTime, + header_update_mode: super::HeaderUpdateMode, + write_mfra: bool, + write_mehd: bool, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + fragment_duration: DEFAULT_FRAGMENT_DURATION, + header_update_mode: DEFAULT_HEADER_UPDATE_MODE, + write_mfra: DEFAULT_WRITE_MFRA, + write_mehd: DEFAULT_WRITE_MEHD, + } + } +} + +struct Gop { + // Running times + start_pts: gst::ClockTime, + start_dts: Option, + earliest_pts: gst::ClockTime, + // Once this is known to be the final earliest PTS/DTS + final_earliest_pts: bool, + // PTS plus duration of last buffer, or start of next GOP + end_pts: gst::ClockTime, + // DTS plus duration of last buffer, or start of next GOP + end_dts: Option, + + // Buffer positions + earliest_pts_position: gst::ClockTime, + start_dts_position: Option, + + // Buffer, PTS running time, DTS running time + buffers: Vec, +} + +#[derive(Default)] +struct State { + segment: Option>, + caps: Option, + intra_only: bool, + + // Created once we received caps and kept up to date with the caps, + // sent as part of the buffer list for the first fragment. 
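+ // (I.e. the `ftyp` + `moov` buffer from `boxes::create_fmp4_header`.)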
+ stream_header: Option, + + sequence_number: u32, + queued_gops: VecDeque, + // Duration of all GOPs except for the newest one that is still being filled + queued_duration: gst::ClockTime, + + // Difference between the first DTS and 0 in case of negative DTS + dts_offset: Option, + + last_force_keyunit_time: Option, + + // Fragment tracking for mfra + current_offset: u64, + fragment_offsets: Vec, + + // Start / end PTS of the whole stream + earliest_pts: Option, + end_pts: Option, +} + +pub(crate) struct FMP4Mux { + srcpad: gst::Pad, + sinkpad: gst::Pad, + state: Mutex, + settings: Mutex, +} + +impl FMP4Mux { + fn queue_input( + &self, + element: &super::FMP4Mux, + state: &mut State, + buffer: gst::Buffer, + ) -> Result<(), gst::FlowError> { + gst_trace!(CAT, obj: element, "Handling buffer {:?}", buffer); + + let segment = match state.segment { + Some(ref segment) => segment, + None => { + gst_error!(CAT, obj: element, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + if state.caps.is_none() { + gst_error!(CAT, obj: element, "Got buffer before caps"); + return Err(gst::FlowError::NotNegotiated); + } + + let intra_only = state.intra_only; + + if !intra_only && buffer.dts().is_none() { + gst_error!(CAT, obj: element, "Require DTS for video streams"); + return Err(gst::FlowError::Error); + } + + if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + gst_error!(CAT, obj: element, "Intra-only stream with delta units"); + return Err(gst::FlowError::Error); + } + + let pts = buffer.pts().ok_or_else(|| { + gst_error!(CAT, obj: element, "Require timestamped buffers"); + gst::FlowError::Error + })?; + let duration = buffer.duration(); + let end_pts = duration.opt_add(pts).unwrap_or(pts); + + let pts = match segment.to_running_time_full(pts) { + (_, None) => { + gst_error!(CAT, obj: element, "Couldn't convert PTS to running time"); + return Err(gst::FlowError::Error); + } + (pts_signum, _) if pts_signum < 0 => { + gst_error!(CAT, obj: element, "Negative PTSs are not supported"); + return Err(gst::FlowError::Error); + } + (_, Some(pts)) => pts, + }; + + let end_pts = match segment.to_running_time_full(end_pts) { + (_, None) => { + gst_error!( + CAT, + obj: element, + "Couldn't convert end PTS to running time" + ); + return Err(gst::FlowError::Error); + } + (pts_signum, _) if pts_signum < 0 => { + gst_error!(CAT, obj: element, "Negative PTSs are not supported"); + return Err(gst::FlowError::Error); + } + (_, Some(pts)) => pts, + }; + + let (dts, end_dts) = if intra_only { + (None, None) + } else { + // with the dts_offset by having negative composition time offsets in the `trun` box. 
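+ // DTS presence was already checked above for non-intra-only streams, so this cannot
+ // panic here.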
+ let dts = buffer.dts().expect("not DTS"); + let end_dts = duration.opt_add(dts).unwrap_or(dts); + + let dts = match segment.to_running_time_full(dts) { + (_, None) => { + gst_error!(CAT, obj: element, "Couldn't convert DTS to running time"); + return Err(gst::FlowError::Error); + } + (pts_signum, Some(dts)) if pts_signum < 0 => { + if state.dts_offset.is_none() { + state.dts_offset = Some(dts); + } + + let dts_offset = state.dts_offset.unwrap(); + if dts > dts_offset { + gst_warning!(CAT, obj: element, "DTS before first DTS"); + gst::ClockTime::ZERO + } else { + dts_offset - dts + } + } + (_, Some(dts)) => { + if let Some(dts_offset) = state.dts_offset { + dts + dts_offset + } else { + dts + } + } + }; + + let end_dts = match segment.to_running_time_full(end_dts) { + (_, None) => { + gst_error!( + CAT, + obj: element, + "Couldn't convert end DTS to running time" + ); + return Err(gst::FlowError::Error); + } + (pts_signum, Some(dts)) if pts_signum < 0 => { + if state.dts_offset.is_none() { + state.dts_offset = Some(dts); + } + + let dts_offset = state.dts_offset.unwrap(); + if dts > dts_offset { + gst_warning!(CAT, obj: element, "End DTS before first DTS"); + gst::ClockTime::ZERO + } else { + dts_offset - dts + } + } + (_, Some(dts)) => { + if let Some(dts_offset) = state.dts_offset { + dts + dts_offset + } else { + dts + } + } + }; + (Some(dts), Some(end_dts)) + }; + + if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + gst_debug!( + CAT, + obj: element, + "Starting new GOP at PTS {} DTS {}", + pts, + dts.display() + ); + + let gop = Gop { + start_pts: pts, + start_dts: dts, + start_dts_position: if intra_only { None } else { buffer.dts() }, + earliest_pts: pts, + earliest_pts_position: buffer.pts().expect("no PTS"), + final_earliest_pts: intra_only, + end_pts, + end_dts, + buffers: vec![Buffer { buffer, pts, dts }], + }; + state.queued_gops.push_front(gop); + + if let Some(prev_gop) = state.queued_gops.get_mut(1) { + gst_debug!( + CAT, + obj: element, + "Updating previous GOP starting at PTS {} to end PTS {} DTS {}", + prev_gop.earliest_pts, + pts, + dts.display(), + ); + prev_gop.end_pts = pts; + prev_gop.end_dts = dts; + + if !prev_gop.final_earliest_pts { + // Don't bother logging this for intra-only streams as it would be for every + // single buffer. 
+ if !intra_only { + gst_debug!( + CAT, + obj: element, + "Previous GOP has final earliest PTS at {}", + prev_gop.earliest_pts + ); + } + + prev_gop.final_earliest_pts = true; + + state.queued_duration = + prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; + gst_debug!( + CAT, + obj: element, + "Queued duration updated to {}", + state.queued_duration + ); + } else if intra_only { + state.queued_duration = + prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; + gst_debug!( + CAT, + obj: element, + "Queued duration updated to {}", + state.queued_duration + ); + } + } + } else if let Some(gop) = state.queued_gops.front_mut() { + assert!(!intra_only); + + // We require DTS for non-intra-only streams + let dts = dts.unwrap(); + let end_dts = end_dts.unwrap(); + let pts_position = buffer.pts().expect("no PTS"); + + gop.end_pts = std::cmp::max(gop.end_pts, end_pts); + gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts)); + gop.buffers.push(Buffer { + buffer, + pts, + dts: Some(dts), + }); + + if gop.earliest_pts > pts && !gop.final_earliest_pts { + gst_debug!( + CAT, + obj: element, + "Updating current GOP earliest PTS from {} to {}", + gop.earliest_pts, + pts + ); + gop.earliest_pts = pts; + gop.earliest_pts_position = pts_position; + + if let Some(prev_gop) = state.queued_gops.get_mut(1) { + gst_debug!( + CAT, + obj: element, + "Updating previous GOP starting PTS {} end time from {} to {}", + pts, + prev_gop.end_pts, + pts + ); + prev_gop.end_pts = pts; + } + } + + let gop = state.queued_gops.front_mut().unwrap(); + + // The earliest PTS is known when the current DTS is bigger or equal to the first + // PTS that was observed in this GOP. If there was another frame later that had a + // lower PTS then it wouldn't be possible to display it in time anymore, i.e. the + // stream would be invalid. + if gop.start_pts <= dts && !gop.final_earliest_pts { + gst_debug!( + CAT, + obj: element, + "GOP has final earliest PTS at {}", + gop.earliest_pts + ); + gop.final_earliest_pts = true; + + if let Some(prev_gop) = state.queued_gops.get_mut(1) { + state.queued_duration = + prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; + gst_debug!( + CAT, + obj: element, + "Queued duration updated to {}", + state.queued_duration + ); + } + } + } else { + gst_warning!( + CAT, + obj: element, + "Waiting for keyframe at the beginning of the stream" + ); + } + + Ok(()) + } + + fn create_force_keyunit_event( + &self, + element: &super::FMP4Mux, + state: &mut State, + settings: &Settings, + pts: gst::ClockTime, + ) -> Result, gst::FlowError> { + let segment = state.segment.as_ref().expect("no segment"); + + // If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP + // is known and send one now. + // + // Otherwise if the current PTS is a fragment duration in the future, send the next one + // now. 
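+        // The force-keyunit event asks the upstream encoder to produce a keyframe at the given
+        // running time, so that every fragment can start with a keyframe near the fragment boundary.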
+ let oldest_gop = state.queued_gops.back().unwrap(); + let earliest_pts = oldest_gop.earliest_pts; + let pts = segment.to_running_time(pts).expect("no running time"); + + if state.last_force_keyunit_time.is_none() && oldest_gop.final_earliest_pts { + let fku_running_time = earliest_pts + settings.fragment_duration; + gst_debug!( + CAT, + obj: element, + "Sending first force-keyunit event for running time {}", + fku_running_time + ); + state.last_force_keyunit_time = Some(fku_running_time); + + return Ok(Some( + gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_running_time) + .all_headers(true) + .build(), + )); + } else if state.last_force_keyunit_time.is_some() + && state.last_force_keyunit_time <= Some(pts) + { + let fku_running_time = + state.last_force_keyunit_time.unwrap() + settings.fragment_duration; + gst_debug!( + CAT, + obj: element, + "Sending force-keyunit event for running time {}", + fku_running_time + ); + state.last_force_keyunit_time = Some(fku_running_time); + + return Ok(Some( + gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_running_time) + .all_headers(true) + .build(), + )); + } + + Ok(None) + } + + fn drain( + &self, + element: &super::FMP4Mux, + state: &mut State, + settings: &Settings, + at_eos: bool, + ) -> Result, gst::FlowError> { + let class = element.class(); + + if state.queued_duration < settings.fragment_duration && !at_eos { + return Ok(None); + } + + assert!(at_eos || state.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)); + + // At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is + // equal to the fragment duration then drain out all complete GOPs, otherwise all + // except for the newest complete GOP. + let drain_gops = if at_eos { + gst_info!(CAT, obj: element, "Draining at EOS"); + state.queued_duration = gst::ClockTime::ZERO; + state + .queued_gops + .drain(..) 
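+                // At EOS all remaining GOPs are final: no further buffers can extend them.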
+ .map(|mut gop| { + gop.final_earliest_pts = true; + gop + }) + .collect::>() + } else if state.queued_duration == settings.fragment_duration + || state.queued_gops.len() == 2 + { + state.queued_duration = gst::ClockTime::ZERO; + state.queued_gops.drain(1..).collect::>() + } else { + let gops = state.queued_gops.drain(2..).collect::>(); + + let gop = state.queued_gops.front().unwrap(); + if gop.final_earliest_pts { + let prev_gop = state.queued_gops.get(1).unwrap(); + state.queued_duration = + prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; + } else { + state.queued_duration = gst::ClockTime::ZERO; + } + + gops + }; + + let mut buffer_list = None; + + if !drain_gops.is_empty() { + let earliest_pts = drain_gops.last().unwrap().earliest_pts; + let earliest_pts_position = drain_gops.last().unwrap().earliest_pts_position; + let start_dts = drain_gops.last().unwrap().start_dts; + let start_dts_position = drain_gops.last().unwrap().start_dts_position; + let end_pts = drain_gops[0].end_pts; + let end_dts = drain_gops[0].end_dts; + let dts_offset = state.dts_offset; + + gst_info!( + CAT, + obj: element, + "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", + end_pts - earliest_pts, + earliest_pts, + start_dts.display(), + dts_offset.display(), + ); + + let mut fmp4_header = None; + if state.sequence_number == 0 { + let mut buffer = state.stream_header.as_ref().unwrap().copy(); + { + let buffer = buffer.get_mut().unwrap(); + + buffer.set_pts(earliest_pts_position); + buffer.set_dts(start_dts_position); + + // Header is DISCONT|HEADER + buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); + } + + fmp4_header = Some(buffer); + + state.earliest_pts = Some(earliest_pts); + state.sequence_number = 1; + } + + let mut buffers = drain_gops + .into_iter() + .rev() + .map(|gop| gop.buffers) + .flatten() + .collect::>(); + + // TODO: Write prft boxes before moof + // TODO: Write sidx boxes before moof and rewrite once offsets are known + + let sequence_number = state.sequence_number; + state.sequence_number += 1; + let (mut fmp4_fragment_header, moof_offset) = + boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { + variant: class.as_ref().variant, + sequence_number, + caps: state.caps.as_ref().unwrap(), + buffers: &buffers, + earliest_pts, + start_dts, + end_pts, + end_dts, + dts_offset, + }) + .map_err(|err| { + gst_error!( + CAT, + obj: element, + "Failed to create FMP4 fragment header: {}", + err + ); + gst::FlowError::Error + })?; + + { + let buffer = fmp4_fragment_header.get_mut().unwrap(); + buffer.set_pts(earliest_pts_position); + buffer.set_dts(start_dts_position); + buffer.set_duration(end_pts.checked_sub(earliest_pts)); + + // Fragment header is HEADER + buffer.set_flags(gst::BufferFlags::HEADER); + + // Copy metas from the first actual buffer to the fragment header. This allows + // getting things like the reference timestamp meta or the timecode meta to identify + // the fragment. 
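+            // A failed meta copy is not fatal here, hence the ignored result.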
+ let _ = buffers[0] + .buffer + .copy_into(buffer, gst::BufferCopyFlags::META, 0, None); + } + + let moof_offset = state.current_offset + + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64 + + moof_offset; + + let buffers_len = buffers.len(); + for (idx, buffer) in buffers.iter_mut().enumerate() { + // Fix up buffer flags, all other buffers are DELTA_UNIT + let buffer_ref = buffer.buffer.make_mut(); + buffer_ref.unset_flags(gst::BufferFlags::all()); + buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT); + + // Set the marker flag for the last buffer of the segment + if idx == buffers_len - 1 { + buffer_ref.set_flags(gst::BufferFlags::MARKER); + } + } + + buffer_list = Some( + fmp4_header + .into_iter() + .chain(Some(fmp4_fragment_header)) + .chain(buffers.into_iter().map(|buffer| buffer.buffer)) + .inspect(|b| { + state.current_offset += b.size() as u64; + }) + .collect::(), + ); + + state.fragment_offsets.push(super::FragmentOffset { + time: earliest_pts, + offset: moof_offset, + }); + state.end_pts = Some(end_pts); + + gst_debug!( + CAT, + obj: element, + "Queued duration updated to {} after draining", + state.queued_duration + ); + } + + if settings.write_mfra && at_eos { + match boxes::create_mfra(state.caps.as_ref().unwrap(), &state.fragment_offsets) { + Ok(mut mfra) => { + { + let mfra = mfra.get_mut().unwrap(); + // mfra is HEADER|DELTA_UNIT like other boxes + mfra.set_flags(gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT); + } + + if buffer_list.is_none() { + buffer_list = Some(gst::BufferList::new_sized(1)); + } + buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); + } + Err(err) => { + gst_error!(CAT, obj: element, "Failed to create mfra box: {}", err); + } + } + } + + // TODO: Write edit list at EOS + // TODO: Rewrite bitrates at EOS + + Ok(buffer_list) + } + + fn update_header( + &self, + element: &super::FMP4Mux, + state: &mut State, + settings: &Settings, + at_eos: bool, + ) -> Result, gst::FlowError> { + let class = element.class(); + let variant = class.as_ref().variant; + + if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos { + return Ok(None); + } + + assert!(!at_eos || state.queued_gops.is_empty()); + + let duration = state + .end_pts + .opt_checked_sub(state.earliest_pts) + .ok() + .flatten(); + + let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration { + variant, + update: at_eos, + caps: state.caps.as_ref().unwrap(), + write_mehd: settings.write_mehd, + duration: if at_eos { duration } else { None }, + }) + .map_err(|err| { + gst_error!(CAT, obj: element, "Failed to create FMP4 header: {}", err); + gst::FlowError::Error + })?; + + { + let buffer = buffer.get_mut().unwrap(); + + // No timestamps + + // Header is DISCONT|HEADER + buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); + } + + // Remember stream header for later + state.stream_header = Some(buffer.clone()); + + let variant = match variant { + super::Variant::ISO | super::Variant::DASH => "iso-fragmented", + super::Variant::CMAF => "cmaf", + }; + let caps = gst::Caps::builder("video/quicktime") + .field("variant", variant) + .field("streamheader", gst::Array::new(&[&buffer])) + .build(); + + let mut list = gst::BufferList::new_sized(1); + { + let list = list.get_mut().unwrap(); + list.add(buffer); + } + + Ok(Some((list, caps))) + } + + fn sink_chain( + &self, + _pad: &gst::Pad, + element: &super::FMP4Mux, + buffer: gst::Buffer, + ) -> Result { + let settings = self.settings.lock().unwrap().clone(); + + let mut upstream_events 
= vec![]; + + let buffers = { + let mut state = self.state.lock().unwrap(); + + let pts = buffer.pts(); + + // Queue up the buffer and update GOP tracking state + self.queue_input(element, &mut state, buffer)?; + + // If we have a PTS with this buffer, check if a new force-keyunit event for the next + // fragment start has to be created + if let Some(pts) = pts { + if let Some(event) = + self.create_force_keyunit_event(element, &mut state, &settings, pts)? + { + upstream_events.push(event); + } + } + + // If enough GOPs were queued, drain and create the output fragment + self.drain(element, &mut state, &settings, false)? + }; + + for event in upstream_events { + self.sinkpad.push_event(event); + } + + if let Some(buffers) = buffers { + gst_trace!(CAT, obj: element, "Pushing buffer list {:?}", buffers); + self.srcpad.push_list(buffers)?; + } + + Ok(gst::FlowSuccess::Ok) + } + + fn sink_event(&self, pad: &gst::Pad, element: &super::FMP4Mux, mut event: gst::Event) -> bool { + use gst::EventView; + + gst_trace!(CAT, obj: pad, "Handling event {:?}", event); + + match event.view() { + EventView::Segment(ev) => { + let segment = match ev.segment().downcast_ref::() { + Some(segment) => { + gst_info!(CAT, obj: pad, "Received segment {:?}", segment); + segment.clone() + } + None => { + gst_warning!( + CAT, + obj: pad, + "Received non-TIME segment, replacing with default TIME segment" + ); + let segment = gst::FormattedSegment::new(); + event = gst::event::Segment::builder(&segment) + .seqnum(event.seqnum()) + .build(); + segment + } + }; + + self.state.lock().unwrap().segment = Some(segment); + + self.srcpad.push_event(event) + } + EventView::Caps(ev) => { + let caps = ev.caps_owned(); + + gst_info!(CAT, obj: pad, "Received caps {:?}", caps); + let caps = { + let settings = self.settings.lock().unwrap().clone(); + let mut state = self.state.lock().unwrap(); + + let s = caps.structure(0).unwrap(); + + match s.name() { + "video/x-h264" | "video/x-h265" => { + if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { + gst_error!(CAT, obj: pad, "Received caps without codec_data"); + return false; + } + } + "audio/mpeg" => { + if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { + gst_error!(CAT, obj: pad, "Received caps without codec_data"); + return false; + } + state.intra_only = true; + } + _ => unreachable!(), + } + + state.caps = Some(caps); + + let (_, caps) = match self.update_header(element, &mut state, &settings, false) + { + Ok(Some(res)) => res, + _ => { + return false; + } + }; + + caps + }; + + self.srcpad.push_event(gst::event::Caps::new(&caps)) + } + EventView::Tag(_ev) => { + // TODO: Maybe store for putting into the headers of the next fragment? + + pad.event_default(Some(element), event) + } + EventView::Gap(_ev) => { + // TODO: queue up and check if draining is needed now + // i.e. 
make the last sample much longer + true + } + EventView::Eos(_ev) => { + let settings = self.settings.lock().unwrap().clone(); + + let drained = self.drain(element, &mut self.state.lock().unwrap(), &settings, true); + let update_header = + drained.is_ok() && settings.header_update_mode != super::HeaderUpdateMode::None; + + match drained { + Ok(Some(buffers)) => { + gst_trace!(CAT, obj: element, "Pushing buffer list {:?}", buffers); + + if let Err(err) = self.srcpad.push_list(buffers) { + gst_error!( + CAT, + obj: element, + "Failed pushing EOS buffers downstream: {:?}", + err, + ); + } + } + Ok(None) => {} + Err(err) => { + gst_error!(CAT, obj: element, "Failed draining at EOS: {:?}", err); + } + } + + if update_header { + let updated_header = self.update_header( + element, + &mut self.state.lock().unwrap(), + &settings, + true, + ); + match updated_header { + Ok(Some((buffer_list, caps))) => { + match settings.header_update_mode { + super::HeaderUpdateMode::None => unreachable!(), + super::HeaderUpdateMode::Rewrite => { + let mut q = gst::query::Seeking::new(gst::Format::Bytes); + if self.srcpad.peer_query(&mut q) && q.result().0 { + // Seek to the beginning with a default bytes segment + self.srcpad.push_event(gst::event::Segment::new( + &gst::FormattedSegment::::new(), + )); + + self.srcpad.push_event(gst::event::Caps::new(&caps)); + if let Err(err) = self.srcpad.push_list(buffer_list) { + gst_error!( + CAT, + obj: element, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } else { + gst_error!(CAT, obj: element, "Can't rewrite header because downstream is not seekable"); + } + } + super::HeaderUpdateMode::Update => { + self.srcpad.push_event(gst::event::Caps::new(&caps)); + if let Err(err) = self.srcpad.push_list(buffer_list) { + gst_error!( + CAT, + obj: element, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } + } + } + Ok(None) => {} + Err(err) => { + gst_error!( + CAT, + obj: element, + "Failed to generate updated header: {:?}", + err + ); + } + } + } + + pad.event_default(Some(element), event) + } + EventView::FlushStop(_ev) => { + let mut state = self.state.lock().unwrap(); + + state.segment = None; + state.queued_gops.clear(); + state.queued_duration = gst::ClockTime::ZERO; + state.dts_offset = None; + state.last_force_keyunit_time = None; + state.current_offset = 0; + state.fragment_offsets.clear(); + + pad.event_default(Some(element), event) + } + _ => pad.event_default(Some(element), event), + } + } + + fn sink_query( + &self, + pad: &gst::Pad, + element: &super::FMP4Mux, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; + + gst_trace!(CAT, obj: pad, "Handling query {:?}", query); + + match query.view_mut() { + QueryView::Caps(mut q) => { + let state = self.state.lock().unwrap(); + + let allowed_caps = if let Some(ref caps) = state.caps { + // TODO: Maybe allow codec_data changes and similar? 
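+                    // For now, once fixed caps were received only those exact caps are reported
+                    // as acceptable.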
+ caps.clone() + } else { + pad.pad_template_caps() + }; + + if let Some(filter_caps) = q.filter() { + let res = filter_caps + .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First); + q.set_result(&res); + } else { + q.set_result(&allowed_caps); + } + + true + } + _ => pad.query_default(Some(element), query), + } + } + + fn src_event(&self, pad: &gst::Pad, element: &super::FMP4Mux, event: gst::Event) -> bool { + use gst::EventView; + + gst_trace!(CAT, obj: pad, "Handling event {:?}", event); + + match event.view() { + EventView::Seek(_ev) => false, + _ => pad.event_default(Some(element), event), + } + } + + fn src_query( + &self, + pad: &gst::Pad, + element: &super::FMP4Mux, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; + + gst_trace!(CAT, obj: pad, "Handling query {:?}", query); + + match query.view_mut() { + QueryView::Seeking(mut q) => { + // We can't really handle seeking, it would break everything + q.set(false, gst::ClockTime::ZERO.into(), gst::ClockTime::NONE); + true + } + QueryView::Latency(mut q) => { + if !self.sinkpad.peer_query(q.query_mut()) { + return false; + } + + let settings = self.settings.lock().unwrap(); + let (live, min, max) = q.result(); + gst_info!( + CAT, + obj: pad, + "Upstream latency: live {}, min {}, max {}", + live, + min, + max.display() + ); + let (min, max) = ( + min + settings.fragment_duration, + max.opt_add(settings.fragment_duration), + ); + gst_info!( + CAT, + obj: pad, + "Returning latency: live {}, min {}, max {}", + live, + min, + max.display() + ); + q.set(live, min, max); + + true + } + _ => pad.query_default(Some(element), query), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for FMP4Mux { + const NAME: &'static str = "GstFMP4Mux"; + type Type = super::FMP4Mux; + type ParentType = gst::Element; + type Class = Class; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .chain_function(|pad, parent, buffer| { + FMP4Mux::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |fmp4mux, element| fmp4mux.sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + FMP4Mux::catch_panic_pad_function( + parent, + || false, + |fmp4mux, element| fmp4mux.sink_event(pad, element, event), + ) + }) + .query_function(|pad, parent, query| { + FMP4Mux::catch_panic_pad_function( + parent, + || false, + |fmp4mux, element| fmp4mux.sink_query(pad, element, query), + ) + }) + .flags(gst::PadFlags::ACCEPT_INTERSECT) + .build(); + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .event_function(|pad, parent, event| { + FMP4Mux::catch_panic_pad_function( + parent, + || false, + |fmp4mux, element| fmp4mux.src_event(pad, element, event), + ) + }) + .query_function(|pad, parent, query| { + FMP4Mux::catch_panic_pad_function( + parent, + || false, + |fmp4mux, element| fmp4mux.src_query(pad, element, query), + ) + }) + .flags(gst::PadFlags::FIXED_CAPS | gst::PadFlags::ACCEPT_TEMPLATE) + .build(); + + Self { + srcpad, + sinkpad, + settings: Mutex::default(), + state: Mutex::default(), + } + } +} + +impl ObjectImpl for FMP4Mux { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + // TODO: Add chunk-duration property separate from fragment-size + glib::ParamSpec::new_uint64( + "fragment-duration", + "Fragment Duration", + "Duration for each FMP4 fragment", 
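+                    // Minimum, maximum and default value, in nanoseconds.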
+ 0, + u64::MAX, + DEFAULT_FRAGMENT_DURATION.nseconds(), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_enum( + "header-update-mode", + "Header update mode", + "Mode for updating the header at the end of the stream", + super::HeaderUpdateMode::static_type(), + DEFAULT_HEADER_UPDATE_MODE as i32, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_boolean( + "write-mfra", + "Write mfra box", + "Write fragment random access box at the end of the stream", + DEFAULT_WRITE_MFRA, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_boolean( + "write-mehd", + "Write mehd box", + "Write movie extends header box with the duration at the end of the stream (needs a header-update-mode enabled)", + DEFAULT_WRITE_MFRA, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + ] + }); + + &*PROPERTIES + } + + fn set_property( + &self, + _obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "fragment-duration" => { + let mut settings = self.settings.lock().unwrap(); + settings.fragment_duration = value.get().expect("type checked upstream"); + } + + "header-update-mode" => { + let mut settings = self.settings.lock().unwrap(); + settings.header_update_mode = value.get().expect("type checked upstream"); + } + + "write-mfra" => { + let mut settings = self.settings.lock().unwrap(); + settings.write_mfra = value.get().expect("type checked upstream"); + } + + "write-mehd" => { + let mut settings = self.settings.lock().unwrap(); + settings.write_mehd = value.get().expect("type checked upstream"); + } + + _ => unimplemented!(), + } + } + + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "fragment-duration" => { + let settings = self.settings.lock().unwrap(); + settings.fragment_duration.to_value() + } + + "header-update-mode" => { + let settings = self.settings.lock().unwrap(); + settings.header_update_mode.to_value() + } + + "write-mfra" => { + let settings = self.settings.lock().unwrap(); + settings.write_mfra.to_value() + } + + "write-mehd" => { + let settings = self.settings.lock().unwrap(); + settings.write_mehd.to_value() + } + + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl GstObjectImpl for FMP4Mux {} + +impl ElementImpl for FMP4Mux { + #[allow(clippy::single_match)] + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + let res = self.parent_change_state(element, transition)?; + + match transition { + gst::StateChange::PausedToReady => { + *self.state.lock().unwrap() = State::default(); + } + _ => (), + } + + Ok(res) + } +} + +#[repr(C)] +pub(crate) struct Class { + parent: gst::ffi::GstElementClass, + variant: super::Variant, +} + +unsafe impl ClassStruct for Class { + type Type = FMP4Mux; +} + +impl std::ops::Deref for Class { + type Target = glib::Class; + + fn deref(&self) -> &Self::Target { + unsafe { &*(&self.parent as *const _ as *const _) } + } +} + +unsafe impl IsSubclassable for super::FMP4Mux { + fn class_init(class: &mut glib::Class) { + Self::parent_class_init::(class); + + let class = class.as_mut(); + class.variant = T::VARIANT; + } +} + +pub(crate) trait 
FMP4MuxImpl: ElementImpl { + const VARIANT: super::Variant; +} + +#[derive(Default)] +pub(crate) struct ISOFMP4Mux; + +#[glib::object_subclass] +impl ObjectSubclass for ISOFMP4Mux { + const NAME: &'static str = "GstISOFMP4Mux"; + type Type = super::ISOFMP4Mux; + type ParentType = super::FMP4Mux; +} + +impl ObjectImpl for ISOFMP4Mux {} + +impl GstObjectImpl for ISOFMP4Mux {} + +impl ElementImpl for ISOFMP4Mux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ISOFMP4Mux", + "Codec/Muxer", + "ISO fragmented MP4 muxer", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "iso-fragmented") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(["avc", "avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(["hvc1", "hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::new(1, i32::MAX)) + .build(), + ] + .into_iter() + .collect::(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl FMP4MuxImpl for ISOFMP4Mux { + const VARIANT: super::Variant = super::Variant::ISO; +} + +#[derive(Default)] +pub(crate) struct CMAFMux; + +#[glib::object_subclass] +impl ObjectSubclass for CMAFMux { + const NAME: &'static str = "GstCMAFMux"; + type Type = super::CMAFMux; + type ParentType = super::FMP4Mux; +} + +impl ObjectImpl for CMAFMux {} + +impl GstObjectImpl for CMAFMux {} + +impl ElementImpl for CMAFMux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "CMAFMux", + "Codec/Muxer", + "CMAF fragmented MP4 muxer", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "cmaf") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(["avc", "avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(["hvc1", "hev1"])) + .field("alignment", "au") + 
.field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::new(1, i32::MAX)) + .build(), + ] + .into_iter() + .collect::(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl FMP4MuxImpl for CMAFMux { + const VARIANT: super::Variant = super::Variant::CMAF; +} + +#[derive(Default)] +pub(crate) struct DASHMP4Mux; + +#[glib::object_subclass] +impl ObjectSubclass for DASHMP4Mux { + const NAME: &'static str = "GstDASHMP4Mux"; + type Type = super::DASHMP4Mux; + type ParentType = super::FMP4Mux; +} + +impl ObjectImpl for DASHMP4Mux {} + +impl GstObjectImpl for DASHMP4Mux {} + +impl ElementImpl for DASHMP4Mux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "DASHMP4Mux", + "Codec/Muxer", + "DASH fragmented MP4 muxer", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "iso-fragmented") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &std::array::IntoIter::new([ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(&[&"avc", &"avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(&[&"hvc1", &"hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::::new(1, i32::MAX)) + .build(), + ]) + .collect::(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl FMP4MuxImpl for DASHMP4Mux { + const VARIANT: super::Variant = super::Variant::DASH; +} diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs new file mode 100644 index 00000000..914d7875 --- /dev/null +++ b/generic/fmp4/src/fmp4mux/mod.rs @@ -0,0 +1,118 @@ +// Copyright (C) 2021 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod boxes; +mod imp; + +glib::wrapper! { + pub(crate) struct FMP4Mux(ObjectSubclass) @extends gst::Element, gst::Object; +} + +unsafe impl Send for FMP4Mux {} +unsafe impl Sync for FMP4Mux {} + +glib::wrapper! 
{ + pub(crate) struct ISOFMP4Mux(ObjectSubclass) @extends FMP4Mux, gst::Element, gst::Object; +} + +unsafe impl Send for ISOFMP4Mux {} +unsafe impl Sync for ISOFMP4Mux {} + +glib::wrapper! { + pub(crate) struct CMAFMux(ObjectSubclass) @extends FMP4Mux, gst::Element, gst::Object; +} + +unsafe impl Send for CMAFMux {} +unsafe impl Sync for CMAFMux {} + +glib::wrapper! { + pub(crate) struct DASHMP4Mux(ObjectSubclass) @extends FMP4Mux, gst::Element, gst::Object; +} + +unsafe impl Send for DASHMP4Mux {} +unsafe impl Sync for DASHMP4Mux {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "isofmp4mux", + gst::Rank::Primary, + ISOFMP4Mux::static_type(), + )?; + gst::Element::register( + Some(plugin), + "cmafmux", + gst::Rank::Primary, + CMAFMux::static_type(), + )?; + gst::Element::register( + Some(plugin), + "dashmp4mux", + gst::Rank::Primary, + DASHMP4Mux::static_type(), + )?; + + Ok(()) +} + +#[derive(Debug)] +pub(crate) struct Buffer { + buffer: gst::Buffer, + // Running times + pts: gst::ClockTime, + dts: Option, +} + +#[derive(Debug)] +pub(crate) struct HeaderConfiguration<'a> { + variant: Variant, + update: bool, + caps: &'a gst::Caps, + write_mehd: bool, + duration: Option, +} + +#[derive(Debug)] +pub(crate) struct FragmentHeaderConfiguration<'a> { + variant: Variant, + sequence_number: u32, + caps: &'a gst::Caps, + buffers: &'a [Buffer], + earliest_pts: gst::ClockTime, + start_dts: Option, + end_pts: gst::ClockTime, + end_dts: Option, + dts_offset: Option, +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Variant { + ISO, + CMAF, + DASH, +} + +#[derive(Debug)] +pub(crate) struct FragmentOffset { + time: gst::ClockTime, + offset: u64, +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, glib::GEnum)] +#[repr(i32)] +#[genum(type_name = "GstFMP4MuxHeaderUpdateMode")] +pub(crate) enum HeaderUpdateMode { + None, + Rewrite, + Update, +} diff --git a/generic/fmp4/src/lib.rs b/generic/fmp4/src/lib.rs new file mode 100644 index 00000000..dfd77114 --- /dev/null +++ b/generic/fmp4/src/lib.rs @@ -0,0 +1,28 @@ +// Copyright (C) 2021 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; + +mod fmp4mux; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + fmp4mux::register(plugin) +} + +gst::plugin_define!( + fmp4, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known) + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs new file mode 100644 index 00000000..6c871ccc --- /dev/null +++ b/generic/fmp4/tests/tests.rs @@ -0,0 +1,139 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . 
+// +// SPDX-License-Identifier: MPL-2.0 +// + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstfmp4::plugin_register_static().unwrap(); + }); +} + +#[test] +fn test_buffer_flags() { + init(); + + // 5s fragment duration + let mut h = gst_check::Harness::new_parse("isofmp4mux fragment-duration=5000000000"); + h.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h.play(); + + // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag + for i in 0..7 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_seconds(i)); + buffer.set_dts(gst::ClockTime::from_seconds(i)); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 5 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 { + let ev = loop { + let ev = h.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(gst::ClockTime::from_seconds(5)), + all_headers: true, + count: 0 + } + ); + } + } + + let header = h.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO)); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!(fragment_header.pts(), Some(gst::ClockTime::ZERO)); + assert_eq!(fragment_header.dts(), Some(gst::ClockTime::ZERO)); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(5)) + ); + + for i in 0..5 { + let buffer = h.pull().unwrap(); + if i == 4 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i))); + assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i))); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + h.push_event(gst::event::Eos::new()); + + let fragment_header = h.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!(fragment_header.pts(), Some(gst::ClockTime::from_seconds(5))); + assert_eq!(fragment_header.dts(), Some(gst::ClockTime::from_seconds(5))); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(2)) + ); + + for i in 5..7 { + let buffer = h.pull().unwrap(); + if i == 6 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i))); + assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i))); + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), 
gst::EventType::StreamStart); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} diff --git a/meson.build b/meson.build index d93ee19a..d9761a5e 100644 --- a/meson.build +++ b/meson.build @@ -47,6 +47,7 @@ plugins = { 'gst-plugin-rspng': 'libgstrspng', 'gst-plugin-rusoto': 'libgstrusoto', 'gst-plugin-textwrap': 'libgstrstextwrap', + 'gst-plugin-fmp4': 'libgstfmp4', 'gst-plugin-threadshare': 'libgstthreadshare', 'gst-plugin-togglerecord': 'libgsttogglerecord', 'gst-plugin-hsv': 'libgsthsv',