Add new fmp4 plugin with muxers for ISO fragmented MP4, DASH and CMAF containers

This commit is contained in:
Sebastian Dröge 2021-10-18 09:42:42 +03:00
parent e81047b8a2
commit e3fbf2078d
11 changed files with 3872 additions and 0 deletions

View file

@ -7,6 +7,7 @@ members = [
"audio/claxon",
"audio/csound",
"audio/lewton",
"generic/fmp4",
"generic/file",
"generic/sodium",
"generic/threadshare",

47
generic/fmp4/Cargo.toml Normal file
View file

@ -0,0 +1,47 @@
[package]
name = "gst-plugin-fmp4"
version = "0.8.0"
authors = ["Sebastian Dröge <sebastian@centricular.com>"]
license = "MPL-2.0"
description = "Fragmented MP4 Plugin"
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
edition = "2021"
rust-version = "1.56"
[dependencies]
anyhow = "1"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] }
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] }
once_cell = "1.0"
[lib]
name = "gstfmp4"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[dev-dependencies]
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }
[features]
default = ["v1_18"]
static = []
capi = []
v1_18 = ["gst-video/v1_18"]
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-audio-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0"

1
generic/fmp4/LICENSE Symbolic link
View file

@ -0,0 +1 @@
../../LICENSE-MPL-2.0

3
generic/fmp4/build.rs Normal file
View file

@ -0,0 +1,3 @@
fn main() {
gst_plugin_version_helper::info()
}

View file

@ -0,0 +1,266 @@
// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
// This creates a VoD DASH manifest based on the output of `cmafmux`. The media header
// ("initialization segment") is written into a separate file as the segments, and each segment is
// its own file too.
//
// All segments that are created are exactly 10s, expect for the last one which is only 3.333s.
use gst::prelude::*;
use std::fmt::Write;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use anyhow::Error;
struct Segment {
start_time: gst::ClockTime,
duration: gst::ClockTime,
}
struct State {
start_time: Option<gst::ClockTime>,
end_time: Option<gst::ClockTime>,
segments: Vec<Segment>,
path: PathBuf,
}
fn main() -> Result<(), Error> {
gst::init()?;
gstfmp4::plugin_register_static()?;
let state = Arc::new(Mutex::new(State {
start_time: None,
end_time: None,
segments: Vec::new(),
path: PathBuf::from("dash_stream"),
}));
let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=10000000000 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();
let sink = pipeline
.by_name("sink")
.unwrap()
.dynamic_cast::<gst_app::AppSink>()
.unwrap();
sink.set_buffer_list(true);
let state_clone = state.clone();
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let mut state = state.lock().unwrap();
// The muxer only outputs non-empty buffer lists
let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
assert!(!buffer_list.is_empty());
let mut first = buffer_list.get(0).unwrap();
// Each list contains a full segment, i.e. does not start with a DELTA_UNIT
assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
// If the buffer has the DISCONT and HEADER flag set then it contains the media
// header, i.e. the `ftyp`, `moov` and other media boxes.
//
// This might be the initial header or the updated header at the end of the stream.
if first.flags().contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) {
let mut path = state.path.clone();
std::fs::create_dir_all(&path).expect("failed to create directory");
path.push("init.cmfi");
println!("writing header to {}", path.display());
let map = first.map_readable().unwrap();
std::fs::write(path, &map).expect("failed to write header");
drop(map);
// Remove the header from the buffer list
buffer_list.make_mut().remove(0, 1);
// If the list is now empty then it only contained the media header and nothing
// else.
if buffer_list.is_empty() {
return Ok(gst::FlowSuccess::Ok);
}
// Otherwise get the next buffer and continue working with that.
first = buffer_list.get(0).unwrap();
}
// If the buffer only has the HEADER flag set then this is a segment header that is
// followed by one or more actual media buffers.
assert!(first.flags().contains(gst::BufferFlags::HEADER));
let segment = sample.segment().expect("no segment")
.downcast_ref::<gst::ClockTime>().expect("no time segment");
// Initialize the start time with the first PTS we observed. This will be used
// later for calculating the duration of the whole media for the DASH manifest.
//
// The PTS of the segment header is equivalent to the earliest PTS of the whole
// segment.
let pts = segment.to_running_time(first.pts().unwrap()).expect("can't get running time");
if state.start_time.is_none() {
state.start_time = Some(pts);
}
// The metadata of the first media buffer is duplicated to the segment header.
// Based on this we can know the timecode of the first frame in this segment.
let meta = first.meta::<gst_video::VideoTimeCodeMeta>().expect("no timecode meta");
let mut path = state.path.clone();
path.push(format!("segment_{}.cmfv", state.segments.len() + 1));
println!("writing segment with timecode {} to {}", meta.tc(), path.display());
// Calculate the end time at this point. The duration of the segment header is set
// to the whole duration of this segment.
let duration = first.duration().unwrap();
let end_time = first.pts().unwrap() + first.duration().unwrap();
state.end_time = Some(segment.to_running_time(end_time).expect("can't get running time"));
let mut file = std::fs::File::create(path).expect("failed to open fragment");
for buffer in &*buffer_list {
use std::io::prelude::*;
let map = buffer.map_readable().unwrap();
file.write_all(&map).expect("failed to write fragment");
}
state.segments.push(Segment {
start_time: pts,
duration,
});
Ok(gst::FlowSuccess::Ok)
})
.eos(move |_sink| {
let state = state_clone.lock().unwrap();
// Now write the manifest
let mut path = state.path.clone();
path.push("manifest.mpd");
println!("writing manifest to {}", path.display());
let duration = state.end_time.opt_checked_sub(state.start_time).ok().flatten().unwrap().mseconds() as f64 / 1000.0;
// Write the whole segment timeline out here, compressing multiple segments with
// the same duration to a repeated segment.
let mut segment_timeline = String::new();
let mut write_segment = |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| {
if repeat > 0 {
writeln!(
&mut segment_timeline,
" <S t=\"{time}\" d=\"{duration}\" r=\"{repeat}\" />",
time = start.mseconds(),
duration = duration.mseconds(),
repeat = repeat
).unwrap();
} else {
writeln!(
&mut segment_timeline,
" <S t=\"{time}\" d=\"{duration}\" />",
time = start.mseconds(),
duration = duration.mseconds()
).unwrap();
}
};
let mut start = None;
let mut num_segments = 0;
let mut last_duration = None;
for segment in &state.segments {
if start.is_none() {
start = Some(segment.start_time);
}
if last_duration.is_none() {
last_duration = Some(segment.duration);
}
// If the duration of this segment is different from the previous one then we
// have to write out the segment now.
if last_duration != Some(segment.duration) {
write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
start = Some(segment.start_time);
last_duration = Some(segment.duration);
num_segments = 1;
} else {
num_segments += 1;
}
}
// Write the last segment if any
if num_segments > 0 {
write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
}
let manifest = format!(r###"<?xml version="1.0" encoding="UTF-8"?>
<MPD
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:mpeg:dash:schema:mpd:2011"
xsi:schemaLocation="urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd"
type="static"
mediaPresentationDuration="PT{duration:.3}S"
profiles="urn:mpeg:dash:profile:isoff-on-demand:2011">
<Period>
<AdaptationSet mimeType="video/mp4" codecs="avc1.4d0228" frameRate="30/1" segmentAlignment="true" startWithSAP="1">
<Representation id="A" bandwidth="2048000" with="1280" height="720">
<SegmentTemplate timescale="1000" initialization="init.cmfi" media="segment_$Number$.cmfv">
<SegmentTimeline>
{segment_timeline} </SegmentTimeline>
</SegmentTemplate>
</Representation>
</AdaptationSet>
</Period>
</MPD>
"###,
duration = duration, segment_timeline = segment_timeline);
std::fs::write(path, &manifest).expect("failed to write manifest");
})
.build(),
);
pipeline.set_state(gst::State::Playing)?;
let bus = pipeline
.bus()
.expect("Pipeline without bus. Shouldn't happen!");
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("EOS");
break;
}
MessageView::Error(err) => {
pipeline.set_state(gst::State::Null)?;
eprintln!(
"Got error from {}: {} ({})",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into()),
err.error().to_string(),
err.debug().unwrap_or_else(|| "".into()),
);
break;
}
_ => (),
}
}
pipeline.set_state(gst::State::Null)?;
Ok(())
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,118 @@
// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
mod boxes;
mod imp;
glib::wrapper! {
pub(crate) struct FMP4Mux(ObjectSubclass<imp::FMP4Mux>) @extends gst::Element, gst::Object;
}
unsafe impl Send for FMP4Mux {}
unsafe impl Sync for FMP4Mux {}
glib::wrapper! {
pub(crate) struct ISOFMP4Mux(ObjectSubclass<imp::ISOFMP4Mux>) @extends FMP4Mux, gst::Element, gst::Object;
}
unsafe impl Send for ISOFMP4Mux {}
unsafe impl Sync for ISOFMP4Mux {}
glib::wrapper! {
pub(crate) struct CMAFMux(ObjectSubclass<imp::CMAFMux>) @extends FMP4Mux, gst::Element, gst::Object;
}
unsafe impl Send for CMAFMux {}
unsafe impl Sync for CMAFMux {}
glib::wrapper! {
pub(crate) struct DASHMP4Mux(ObjectSubclass<imp::DASHMP4Mux>) @extends FMP4Mux, gst::Element, gst::Object;
}
unsafe impl Send for DASHMP4Mux {}
unsafe impl Sync for DASHMP4Mux {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"isofmp4mux",
gst::Rank::Primary,
ISOFMP4Mux::static_type(),
)?;
gst::Element::register(
Some(plugin),
"cmafmux",
gst::Rank::Primary,
CMAFMux::static_type(),
)?;
gst::Element::register(
Some(plugin),
"dashmp4mux",
gst::Rank::Primary,
DASHMP4Mux::static_type(),
)?;
Ok(())
}
#[derive(Debug)]
pub(crate) struct Buffer {
buffer: gst::Buffer,
// Running times
pts: gst::ClockTime,
dts: Option<gst::ClockTime>,
}
#[derive(Debug)]
pub(crate) struct HeaderConfiguration<'a> {
variant: Variant,
update: bool,
caps: &'a gst::Caps,
write_mehd: bool,
duration: Option<gst::ClockTime>,
}
#[derive(Debug)]
pub(crate) struct FragmentHeaderConfiguration<'a> {
variant: Variant,
sequence_number: u32,
caps: &'a gst::Caps,
buffers: &'a [Buffer],
earliest_pts: gst::ClockTime,
start_dts: Option<gst::ClockTime>,
end_pts: gst::ClockTime,
end_dts: Option<gst::ClockTime>,
dts_offset: Option<gst::ClockTime>,
}
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Variant {
ISO,
CMAF,
DASH,
}
#[derive(Debug)]
pub(crate) struct FragmentOffset {
time: gst::ClockTime,
offset: u64,
}
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, glib::GEnum)]
#[repr(i32)]
#[genum(type_name = "GstFMP4MuxHeaderUpdateMode")]
pub(crate) enum HeaderUpdateMode {
None,
Rewrite,
Update,
}

28
generic/fmp4/src/lib.rs Normal file
View file

@ -0,0 +1,28 @@
// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
mod fmp4mux;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
fmp4mux::register(plugin)
}
gst::plugin_define!(
fmp4,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
// FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known)
"MPL",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);

139
generic/fmp4/tests/tests.rs Normal file
View file

@ -0,0 +1,139 @@
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
//
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstfmp4::plugin_register_static().unwrap();
});
}
#[test]
fn test_buffer_flags() {
init();
// 5s fragment duration
let mut h = gst_check::Harness::new_parse("isofmp4mux fragment-duration=5000000000");
h.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h.play();
// Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
for i in 0..7 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(gst::ClockTime::from_seconds(i));
buffer.set_dts(gst::ClockTime::from_seconds(i));
buffer.set_duration(gst::ClockTime::SECOND);
if i != 0 && i != 5 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
if i == 2 {
let ev = loop {
let ev = h.pull_upstream_event().unwrap();
if ev.type_() != gst::EventType::Reconfigure
&& ev.type_() != gst::EventType::Latency
{
break ev;
}
};
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(gst::ClockTime::from_seconds(5)),
all_headers: true,
count: 0
}
);
}
}
let header = h.pull().unwrap();
assert_eq!(
header.flags(),
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
);
assert_eq!(header.pts(), Some(gst::ClockTime::ZERO));
assert_eq!(header.dts(), Some(gst::ClockTime::ZERO));
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(fragment_header.pts(), Some(gst::ClockTime::ZERO));
assert_eq!(fragment_header.dts(), Some(gst::ClockTime::ZERO));
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(5))
);
for i in 0..5 {
let buffer = h.pull().unwrap();
if i == 4 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i)));
assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i)));
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
h.push_event(gst::event::Eos::new());
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(fragment_header.pts(), Some(gst::ClockTime::from_seconds(5)));
assert_eq!(fragment_header.dts(), Some(gst::ClockTime::from_seconds(5)));
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(2))
);
for i in 5..7 {
let buffer = h.pull().unwrap();
if i == 6 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i)));
assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i)));
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Caps);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}

View file

@ -47,6 +47,7 @@ plugins = {
'gst-plugin-rspng': 'libgstrspng',
'gst-plugin-rusoto': 'libgstrusoto',
'gst-plugin-textwrap': 'libgstrstextwrap',
'gst-plugin-fmp4': 'libgstfmp4',
'gst-plugin-threadshare': 'libgstthreadshare',
'gst-plugin-togglerecord': 'libgsttogglerecord',
'gst-plugin-hsv': 'libgsthsv',