fmp4mux: Implement a better strategy for splitting fragments and deciding which buffers go into which fragment

It is now guaranteed that each fragment is at most fragment-duration
long unless the one and only GOP of the fragment is longer than that.
The first (non-EOS) stream determines the duration of each fragment and
all other streams are drained to at most the fragment end timestamp
determined this way.

In addition the next fragment's target time is now at the end of the
previous fragment plus fragment-duration instead of using
  first-fragment + N*fragment-duration
regardless of where fragments were split before.

That is, fmp4mux now uses the same strategy as used by splitmuxsink and
as is required e.g. by HLS with regards to the target duration.
This commit is contained in:
Sebastian Dröge 2022-10-01 20:52:18 +03:00 committed by Sebastian Dröge
parent f66aafb039
commit 36ce8bd4f7
2 changed files with 703 additions and 134 deletions

View file

@ -91,12 +91,14 @@ impl Default for Settings {
}
}
#[derive(Debug)]
struct GopBuffer {
buffer: gst::Buffer,
pts: gst::ClockTime,
dts: Option<gst::ClockTime>,
}
#[derive(Debug)]
struct Gop {
// Running times
start_pts: gst::ClockTime,
@ -139,8 +141,6 @@ struct Stream {
// going backwards when draining a fragment.
// UNIX epoch.
current_utc_time: gst::ClockTime,
last_force_keyunit_time: Option<gst::ClockTime>,
}
#[derive(Default)]
@ -163,6 +163,8 @@ struct State {
// Start PTS of the current fragment
fragment_start_pts: Option<gst::ClockTime>,
// Additional timeout delay in case GOPs are bigger than the fragment duration
timeout_delay: gst::ClockTime,
// In ONVIF mode the UTC time corresponding to the beginning of the stream
// UNIX epoch.
@ -604,92 +606,29 @@ impl FMP4Mux {
Ok(())
}
fn create_initial_force_keyunit_event(
&self,
_element: &super::FMP4Mux,
stream: &mut Stream,
settings: &Settings,
earliest_pts: gst::ClockTime,
) -> Result<Option<gst::Event>, gst::FlowError> {
assert!(stream.last_force_keyunit_time.is_none());
// If we never sent a force-keyunit event then send one now.
let fku_running_time = earliest_pts + settings.fragment_duration;
gst::debug!(
CAT,
obj: &stream.sinkpad,
"Sending first force-keyunit event for running time {}",
fku_running_time
);
stream.last_force_keyunit_time = Some(fku_running_time);
return Ok(Some(
gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(fku_running_time)
.all_headers(true)
.build(),
));
}
fn create_force_keyunit_event(
&self,
_element: &super::FMP4Mux,
stream: &mut Stream,
settings: &Settings,
segment: &gst::FormattedSegment<gst::ClockTime>,
pts: gst::ClockTime,
) -> Result<Option<gst::Event>, gst::FlowError> {
// If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP
// is known and send it then.
//
// Otherwise if the current PTS is a fragment duration in the future, send the next one
// now.
let last_force_keyunit_time = match stream.last_force_keyunit_time {
None => return Ok(None),
Some(last_force_keyunit_time) => last_force_keyunit_time,
};
let pts = segment.to_running_time(pts);
if pts.opt_lt(last_force_keyunit_time).unwrap_or(true) {
return Ok(None);
}
let fku_running_time = last_force_keyunit_time + settings.fragment_duration;
gst::debug!(
CAT,
obj: &stream.sinkpad,
"Sending force-keyunit event for running time {}",
fku_running_time
);
stream.last_force_keyunit_time = Some(fku_running_time);
Ok(Some(
gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(fku_running_time)
.all_headers(true)
.build(),
))
}
#[allow(clippy::type_complexity)]
fn drain_buffers(
&self,
_element: &super::FMP4Mux,
element: &super::FMP4Mux,
state: &mut State,
settings: &Settings,
timeout: bool,
at_eos: bool,
) -> Result<
(
// Drained streams
Vec<(
gst::Caps,
Option<super::FragmentTimingInfo>,
VecDeque<Buffer>,
)>,
// Minimum earliest PTS position of all streams
Option<gst::ClockTime>,
// Minimum earliest PTS of all streams
Option<gst::ClockTime>,
// Minimum start DTS position of all streams (if any stream has DTS)
Option<gst::ClockTime>,
// End PTS of this drained fragment, i.e. start PTS of the next fragment
Option<gst::ClockTime>,
),
gst::FlowError,
@ -699,7 +638,24 @@ impl FMP4Mux {
let mut min_earliest_pts_position = None;
let mut min_earliest_pts = None;
let mut min_start_dts_position = None;
let mut max_end_pts = None;
let mut fragment_end_pts = None;
// The first stream decides how much can be dequeued, if anything at all.
//
// All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued
// but on timeout in live pipelines it might happen that the first stream does not have a
// complete GOP queued. In that case nothing is dequeued for any of the streams and the
// timeout is advanced by 1s until at least one complete GOP can be dequeued.
//
// If the first stream is already EOS then the next stream that is not EOS yet will be
// taken in its place.
let fragment_start_pts = state.fragment_start_pts.unwrap();
gst::info!(
CAT,
obj: element,
"Starting to drain at {}",
fragment_start_pts
);
for (idx, stream) in state.streams.iter_mut().enumerate() {
assert!(
@ -709,29 +665,74 @@ impl FMP4Mux {
|| stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
);
// At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is
// equal to the fragment duration then drain out all complete GOPs, otherwise all
// except for the newest complete GOP.
let gops = if at_eos || stream.sinkpad.is_eos() {
stream.queued_gops.drain(..).rev().collect::<Vec<_>>()
} else {
let mut gops = vec![];
// Drain all complete GOPs until at most one fragment duration was dequeued for the
// first stream, or until the dequeued duration of the first stream.
let mut gops = Vec::with_capacity(stream.queued_gops.len());
let dequeue_end_pts =
fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
gst::trace!(
CAT,
obj: &stream.sinkpad,
"Draining up to end PTS {} / duration {}",
dequeue_end_pts,
dequeue_end_pts - fragment_start_pts
);
let fragment_start_pts = state.fragment_start_pts.unwrap();
while let Some(gop) = stream.queued_gops.pop_back() {
assert!(timeout || gop.final_end_pts);
let end_pts = gop.end_pts;
gops.push(gop);
if end_pts.saturating_sub(fragment_start_pts) >= settings.fragment_duration {
break;
}
while let Some(gop) = stream.queued_gops.back() {
// If this GOP is not complete then we can't pop it yet.
//
// If there was no complete GOP at all yet then it might be bigger than the
// fragment duration. In this case we might not be able to handle the latency
// requirements in a live pipeline.
if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
break;
}
gops
};
// If this GOP starts after the fragment end then don't dequeue it yet unless this is
// the first stream and no GOPs were dequeued at all yet. This would mean that the
// GOP is bigger than the fragment duration.
if gop.end_pts > dequeue_end_pts && (fragment_end_pts.is_some() || !gops.is_empty())
{
break;
}
gops.push(stream.queued_gops.pop_back().unwrap());
}
stream.fragment_filled = false;
// If we don't have a next fragment start PTS then this is the first stream as above.
if fragment_end_pts.is_none() {
if let Some(last_gop) = gops.last() {
// Dequeued something so let's take the end PTS of the last GOP
fragment_end_pts = Some(last_gop.end_pts);
gst::info!(
CAT,
obj: &stream.sinkpad,
"Draining up to PTS {} for this fragment",
last_gop.end_pts,
);
} else {
// If nothing was dequeued for the first stream then this is OK if we're at
// EOS: we just consider the next stream as first stream then.
if at_eos || stream.sinkpad.is_eos() {
// This is handled below generally if nothing was dequeued
} else {
// Otherwise this can only really happen on timeout in live pipelines.
assert!(timeout);
gst::warning!(
CAT,
obj: &stream.sinkpad,
"Don't have a complete GOP for the first stream on timeout in a live pipeline",
);
// In this case we advance the timeout by 1s and hope that things are
// better then.
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
}
}
if gops.is_empty() {
gst::info!(
CAT,
@ -743,6 +744,8 @@ impl FMP4Mux {
continue;
}
assert!(fragment_end_pts.is_some());
let first_gop = gops.first().unwrap();
let last_gop = gops.last().unwrap();
let earliest_pts = first_gop.earliest_pts;
@ -769,9 +772,6 @@ impl FMP4Mux {
min_start_dts_position = Some(start_dts_position);
}
}
if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
max_end_pts = Some(end_pts);
}
gst::info!(
CAT,
@ -892,7 +892,7 @@ impl FMP4Mux {
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
max_end_pts,
fragment_end_pts,
))
}
@ -1229,6 +1229,7 @@ impl FMP4Mux {
settings: &Settings,
timeout: bool,
at_eos: bool,
upstream_events: &mut Vec<(gst_base::AggregatorPad, gst::Event)>,
) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
if at_eos {
gst::info!(CAT, obj: element, "Draining at EOS");
@ -1248,7 +1249,7 @@ impl FMP4Mux {
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
max_end_pts,
fragment_end_pts,
) = self.drain_buffers(element, state, settings, timeout, at_eos)?;
// For ONVIF, replace all timestamps with timestamps based on UTC times.
@ -1279,13 +1280,13 @@ impl FMP4Mux {
let mut buffer_list = None;
if interleaved_buffers.is_empty() {
assert!(timeout || at_eos);
assert!(at_eos);
} else {
// If there are actual buffers to output then create headers as needed and create a
// bufferlist for all buffers that have to be output.
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
let min_earliest_pts = min_earliest_pts.unwrap();
let max_end_pts = max_end_pts.unwrap();
let fragment_end_pts = fragment_end_pts.unwrap();
let mut fmp4_header = None;
if !state.sent_headers {
@ -1334,7 +1335,7 @@ impl FMP4Mux {
let buffer = fmp4_fragment_header.get_mut().unwrap();
buffer.set_pts(min_earliest_pts_position);
buffer.set_dts(min_start_dts_position);
buffer.set_duration(max_end_pts.checked_sub(min_earliest_pts));
buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts));
// Fragment header is HEADER
buffer.set_flags(gst::BufferFlags::HEADER);
@ -1386,22 +1387,37 @@ impl FMP4Mux {
offset: moof_offset,
});
}
state.end_pts = Some(max_end_pts);
state.end_pts = Some(fragment_end_pts);
state.end_utc_time = max_end_utc_time;
// Update for the start PTS of the next fragment
state.fragment_start_pts = state.fragment_start_pts.map(|start| {
let new_fragment_start = start + settings.fragment_duration;
gst::info!(
CAT,
obj: element,
"Starting new fragment at {}",
fragment_end_pts,
);
state.fragment_start_pts = Some(fragment_end_pts);
gst::info!(
CAT,
obj: element,
"Starting new fragment at {}",
new_fragment_start
);
gst::debug!(
CAT,
obj: element,
"Sending force-keyunit events for running time {}",
fragment_end_pts + settings.fragment_duration,
);
new_fragment_start
});
let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(fragment_end_pts + settings.fragment_duration)
.all_headers(true)
.build();
for stream in &state.streams {
upstream_events.push((stream.sinkpad.clone(), fku.clone()));
}
// Reset timeout delay now that we've output an actual fragment
state.timeout_delay = gst::ClockTime::ZERO;
}
if settings.write_mfra && at_eos {
@ -1491,7 +1507,6 @@ impl FMP4Mux {
dts_offset: None,
current_position: gst::ClockTime::ZERO,
current_utc_time: gst::ClockTime::ZERO,
last_force_keyunit_time: None,
});
}
@ -1801,7 +1816,7 @@ impl ElementImpl for FMP4Mux {
impl AggregatorImpl for FMP4Mux {
fn next_time(&self, _aggregator: &Self::Type) -> Option<gst::ClockTime> {
let state = self.state.lock().unwrap();
state.fragment_start_pts
state.fragment_start_pts.opt_add(state.timeout_delay)
}
fn sink_query(
@ -1938,7 +1953,6 @@ impl AggregatorImpl for FMP4Mux {
stream.dts_offset = None;
stream.current_position = gst::ClockTime::ZERO;
stream.current_utc_time = gst::ClockTime::ZERO;
stream.last_force_keyunit_time = None;
stream.fragment_filled = false;
}
@ -2030,21 +2044,9 @@ impl AggregatorImpl for FMP4Mux {
}
};
let pts = buffer.pts();
// Queue up the buffer and update GOP tracking state
self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
// If we have a PTS with this buffer, check if a new force-keyunit event for the next
// fragment start has to be created
if let Some(pts) = pts {
if let Some(event) = self
.create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)?
{
upstream_events.push((stream.sinkpad.clone(), event));
}
}
// Check if this stream is filled enough now.
if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
stream
@ -2093,15 +2095,20 @@ impl AggregatorImpl for FMP4Mux {
state.earliest_pts = Some(earliest_pts);
state.fragment_start_pts = Some(earliest_pts);
gst::debug!(
CAT,
obj: aggregator,
"Sending first force-keyunit event for running time {}",
earliest_pts + settings.fragment_duration,
);
let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(earliest_pts + settings.fragment_duration)
.all_headers(true)
.build();
for stream in &mut state.streams {
if let Some(event) = self.create_initial_force_keyunit_event(
aggregator,
stream,
&settings,
earliest_pts,
)? {
upstream_events.push((stream.sinkpad.clone(), event));
}
upstream_events.push((stream.sinkpad.clone(), fku.clone()));
// Check if this stream is filled enough now.
if let Some(queued_end_pts) = stream
@ -2127,7 +2134,31 @@ impl AggregatorImpl for FMP4Mux {
}
// If enough GOPs were queued, drain and create the output fragment
self.drain(aggregator, &mut state, &settings, timeout, all_eos)?
match self.drain(
aggregator,
&mut state,
&settings,
timeout,
all_eos,
&mut upstream_events,
) {
Ok(res) => res,
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
gst::element_warning!(
aggregator,
gst::StreamError::Format,
["Longer GOPs than fragment duration"]
);
state.timeout_delay += gst::ClockTime::from_seconds(1);
drop(state);
for (sinkpad, event) in upstream_events {
sinkpad.push_event(event);
}
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
Err(err) => return Err(err),
}
};
for (sinkpad, event) in upstream_events {

View file

@ -48,7 +48,7 @@ fn test_buffer_flags_single_stream(cmaf: bool) {
gst::ClockTime::from_seconds(60 * 60 * 1000)
};
// Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
// Push 7 buffers of 1s each, 1st and 6 buffer without DELTA_UNIT flag
for i in 0..7 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
@ -881,3 +881,541 @@ fn test_gap_events() {
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
#[test]
fn test_single_stream_short_gops() {
init();
let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
// 5s fragment duration
h.element()
.unwrap()
.set_property("fragment-duration", gst::ClockTime::from_seconds(5));
h.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h.play();
let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
// Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
for i in 0..8 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(gst::ClockTime::from_seconds(i));
buffer.set_dts(gst::ClockTime::from_seconds(i));
buffer.set_duration(gst::ClockTime::SECOND);
if i != 0 && i != 3 && i != 6 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
if i == 2 || i == 7 {
let ev = loop {
let ev = h.pull_upstream_event().unwrap();
if ev.type_() != gst::EventType::Reconfigure
&& ev.type_() != gst::EventType::Latency
{
break ev;
}
};
let fku_time = if i == 2 {
gst::ClockTime::from_seconds(5)
} else {
gst::ClockTime::from_seconds(8)
};
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(fku_time),
all_headers: true,
count: 0
}
);
}
}
let header = h.pull().unwrap();
assert_eq!(
header.flags(),
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
);
assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(
fragment_header.pts(),
Some(gst::ClockTime::ZERO + output_offset)
);
assert_eq!(
fragment_header.dts(),
Some(gst::ClockTime::ZERO + output_offset)
);
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(3))
);
for i in 0..3 {
let buffer = h.pull().unwrap();
if i == 2 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(
buffer.pts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(
buffer.dts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
h.push_event(gst::event::Eos::new());
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(
fragment_header.pts(),
Some(gst::ClockTime::from_seconds(3) + output_offset)
);
assert_eq!(
fragment_header.dts(),
Some(gst::ClockTime::from_seconds(3) + output_offset)
);
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(5))
);
for i in 3..8 {
let buffer = h.pull().unwrap();
if i == 7 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(
buffer.pts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(
buffer.dts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Caps);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
#[test]
fn test_single_stream_long_gops() {
init();
let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
// 5s fragment duration
h.element()
.unwrap()
.set_property("fragment-duration", gst::ClockTime::from_seconds(5));
h.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h.play();
let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
// Push 10 buffers of 1s each, 1st and 7th buffer without DELTA_UNIT flag
for i in 0..10 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(gst::ClockTime::from_seconds(i));
buffer.set_dts(gst::ClockTime::from_seconds(i));
buffer.set_duration(gst::ClockTime::SECOND);
if i != 0 && i != 6 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
if i == 2 || i == 7 {
let ev = loop {
let ev = h.pull_upstream_event().unwrap();
if ev.type_() != gst::EventType::Reconfigure
&& ev.type_() != gst::EventType::Latency
{
break ev;
}
};
let fku_time = if i == 2 {
gst::ClockTime::from_seconds(5)
} else {
gst::ClockTime::from_seconds(11)
};
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(fku_time),
all_headers: true,
count: 0
}
);
}
}
let header = h.pull().unwrap();
assert_eq!(
header.flags(),
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
);
assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(
fragment_header.pts(),
Some(gst::ClockTime::ZERO + output_offset)
);
assert_eq!(
fragment_header.dts(),
Some(gst::ClockTime::ZERO + output_offset)
);
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(6))
);
for i in 0..6 {
let buffer = h.pull().unwrap();
if i == 5 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(
buffer.pts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(
buffer.dts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
h.push_event(gst::event::Eos::new());
let fragment_header = h.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(
fragment_header.pts(),
Some(gst::ClockTime::from_seconds(6) + output_offset)
);
assert_eq!(
fragment_header.dts(),
Some(gst::ClockTime::from_seconds(6) + output_offset)
);
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(4))
);
for i in 6..10 {
let buffer = h.pull().unwrap();
if i == 9 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(
buffer.pts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(
buffer.dts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Caps);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
#[test]
fn test_buffer_multi_stream_short_gops() {
init();
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
// 5s fragment duration
h1.element()
.unwrap()
.set_property("fragment-duration", gst::ClockTime::from_seconds(5));
h1.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h1.play();
h2.set_src_caps(
gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("channels", 1i32)
.field("rate", 44100i32)
.field("stream-format", "raw")
.field("base-profile", "lc")
.field("profile", "lc")
.field("level", "2")
.field(
"codec_data",
gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
)
.build(),
);
h2.play();
let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
// Push 8 buffers of 1s each, 1st, 4th and 7th buffer without DELTA_UNIT flag
for i in 0..8 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(gst::ClockTime::from_seconds(i));
buffer.set_dts(gst::ClockTime::from_seconds(i));
buffer.set_duration(gst::ClockTime::SECOND);
if i != 0 && i != 3 && i != 6 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(gst::ClockTime::from_seconds(i));
buffer.set_dts(gst::ClockTime::from_seconds(i));
buffer.set_duration(gst::ClockTime::SECOND);
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
if i == 2 || i == 7 {
let ev = loop {
let ev = h1.pull_upstream_event().unwrap();
if ev.type_() != gst::EventType::Reconfigure
&& ev.type_() != gst::EventType::Latency
{
break ev;
}
};
let fku_time = if i == 2 {
gst::ClockTime::from_seconds(5)
} else {
gst::ClockTime::from_seconds(8)
};
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(fku_time),
all_headers: true,
count: 0
}
);
let ev = loop {
let ev = h2.pull_upstream_event().unwrap();
if ev.type_() != gst::EventType::Reconfigure
&& ev.type_() != gst::EventType::Latency
{
break ev;
}
};
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(fku_time),
all_headers: true,
count: 0
}
);
}
}
let header = h1.pull().unwrap();
assert_eq!(
header.flags(),
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
);
assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
let fragment_header = h1.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(
fragment_header.pts(),
Some(gst::ClockTime::ZERO + output_offset)
);
assert_eq!(
fragment_header.dts(),
Some(gst::ClockTime::ZERO + output_offset)
);
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(3))
);
for i in 0..3 {
for j in 0..2 {
let buffer = h1.pull().unwrap();
if i == 2 && j == 1 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(
buffer.pts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
if j == 0 {
assert_eq!(
buffer.dts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
} else {
assert!(buffer.dts().is_none());
}
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
}
h1.push_event(gst::event::Eos::new());
h2.push_event(gst::event::Eos::new());
let fragment_header = h1.pull().unwrap();
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
assert_eq!(
fragment_header.pts(),
Some(gst::ClockTime::from_seconds(3) + output_offset)
);
assert_eq!(
fragment_header.dts(),
Some(gst::ClockTime::from_seconds(3) + output_offset)
);
assert_eq!(
fragment_header.duration(),
Some(gst::ClockTime::from_seconds(5))
);
for i in 3..8 {
for j in 0..2 {
let buffer = h1.pull().unwrap();
if i == 7 && j == 1 {
assert_eq!(
buffer.flags(),
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
);
} else {
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
}
assert_eq!(
buffer.pts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
if j == 0 {
assert_eq!(
buffer.dts(),
Some(gst::ClockTime::from_seconds(i) + output_offset)
);
} else {
assert!(buffer.dts().is_none());
}
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
}
}
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Caps);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}