togglerecord: Clip raw audio/video buffers to the segment/recording boundaries

And extend tests to actually check for this to work.
This commit is contained in:
Sebastian Dröge 2019-07-11 00:45:02 +03:00
parent 20c02c4b38
commit f7fd1e3f99
2 changed files with 414 additions and 106 deletions

View file

@ -187,6 +187,12 @@ trait HandleData: Sized {
}
fn get_duration(&self, state: &StreamState) -> gst::ClockTime;
fn is_keyframe(&self) -> bool;
fn can_clip(&self, state: &StreamState) -> bool;
fn clip(
self,
state: &StreamState,
segment: &gst::FormattedSegment<gst::ClockTime>,
) -> Option<Self>;
}
impl HandleData for (gst::ClockTime, gst::ClockTime) {
@ -205,6 +211,26 @@ impl HandleData for (gst::ClockTime, gst::ClockTime) {
fn is_keyframe(&self) -> bool {
true
}
fn can_clip(&self, _state: &StreamState) -> bool {
true
}
fn clip(
self,
_state: &StreamState,
segment: &gst::FormattedSegment<gst::ClockTime>,
) -> Option<Self> {
let stop = if self.1.is_some() {
self.0 + self.1
} else {
self.0
};
segment
.clip(self.0, stop)
.map(|(start, stop)| (start, stop - start))
}
}
impl HandleData for gst::Buffer {
@ -239,11 +265,9 @@ impl HandleData for gst::Buffer {
let size = self.get_size() as u64;
let num_samples = size / audio_info.bpf() as u64;
let duration = gst::SECOND
gst::SECOND
.mul_div_floor(num_samples, audio_info.rate() as u64)
.unwrap_or(gst::CLOCK_TIME_NONE);
duration
.unwrap_or(gst::CLOCK_TIME_NONE)
} else {
gst::CLOCK_TIME_NONE
}
@ -252,6 +276,71 @@ impl HandleData for gst::Buffer {
fn is_keyframe(&self) -> bool {
!gst::BufferRef::get_flags(self).contains(gst::BufferFlags::DELTA_UNIT)
}
fn can_clip(&self, state: &StreamState) -> bool {
// Only do actual clipping for raw audio/video
if let Some(ref audio_info) = state.audio_info {
if audio_info.format() == gst_audio::AudioFormat::Unknown
|| audio_info.format() == gst_audio::AudioFormat::Encoded
|| audio_info.rate() == 0
|| audio_info.bpf() == 0
{
return false;
}
} else if let Some(ref video_info) = state.video_info {
if video_info.format() == gst_video::VideoFormat::Unknown
|| video_info.format() == gst_video::VideoFormat::Encoded
|| self.get_dts_or_pts() != self.get_pts()
{
return false;
}
} else {
return false;
}
true
}
fn clip(
mut self,
state: &StreamState,
segment: &gst::FormattedSegment<gst::ClockTime>,
) -> Option<Self> {
// Only do actual clipping for raw audio/video
if !self.can_clip(state) {
return Some(self);
}
let pts = HandleData::get_pts(&self);
let duration = HandleData::get_duration(&self, state);
let stop = if duration.is_some() {
pts + duration
} else {
pts
};
if let Some(ref audio_info) = state.audio_info {
gst_audio::audio_buffer_clip(
self,
segment.upcast_ref(),
audio_info.rate(),
audio_info.bpf(),
)
} else if let Some(_) = state.video_info {
segment.clip(pts, stop).map(move |(start, stop)| {
{
let buffer = self.make_mut();
buffer.set_pts(start);
buffer.set_dts(start);
buffer.set_duration(stop - start);
}
self
})
} else {
unreachable!();
}
}
}
struct ToggleRecord {
@ -348,6 +437,15 @@ impl ToggleRecord {
dts_or_pts
};
let data = match data.clip(&state, &state.in_segment) {
None => {
gst_log!(self.cat, obj: pad, "Dropping raw data outside segment");
return Ok(HandleResult::Drop);
}
Some(data) => data,
};
// This will only do anything for non-raw data
dts_or_pts = cmp::max(state.in_segment.get_start(), dts_or_pts);
dts_or_pts_end = cmp::max(state.in_segment.get_start(), dts_or_pts_end);
if state.in_segment.get_stop().is_some() {
@ -355,9 +453,9 @@ impl ToggleRecord {
dts_or_pts_end = cmp::min(state.in_segment.get_stop(), dts_or_pts_end);
}
let mut current_running_time = state.in_segment.to_running_time(dts_or_pts);
current_running_time = cmp::max(current_running_time, state.current_running_time);
state.current_running_time = current_running_time;
let current_running_time = state.in_segment.to_running_time(dts_or_pts);
let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
// Wake up everybody, we advanced a bit
// Important: They will only be able to advance once we're done with this
@ -365,8 +463,6 @@ impl ToggleRecord {
// get the wrong state
self.main_stream_cond.notify_all();
let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
gst_log!(
self.cat,
obj: pad,
@ -420,19 +516,18 @@ impl ToggleRecord {
return Ok(HandleResult::Pass(data));
}
// Remember the time when we stopped: now!
// Remember the time when we stopped: now, i.e. right before the current buffer!
rec_state.last_recording_stop = current_running_time;
gst_debug!(self.cat, obj: pad, "Stopping at {}", current_running_time);
// Then unlock and wait for all other streams to reach
// it or go EOS instead.
// Then unlock and wait for all other streams to reach it or go EOS instead.
drop(rec_state);
while !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time)
&& s.current_running_time >= current_running_time_end)
}) {
gst_log!(self.cat, obj: pad, "Waiting for other streams to stop");
self.main_stream_cond.wait(&mut state);
@ -511,7 +606,7 @@ impl ToggleRecord {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time)
&& s.current_running_time >= current_running_time_end)
}) {
gst_log!(self.cat, obj: pad, "Waiting for other streams to start");
self.main_stream_cond.wait(&mut state);
@ -586,32 +681,26 @@ impl ToggleRecord {
pts
};
pts = cmp::max(state.in_segment.get_start(), pts);
if state.in_segment.get_stop().is_some() && pts >= state.in_segment.get_stop() {
state.current_running_time = state
.in_segment
.to_running_time(state.in_segment.get_stop());
state.eos = true;
gst_debug!(
self.cat,
obj: pad,
"After segment end {} >= {}, EOS",
pts,
state.in_segment.get_stop()
);
let data = match data.clip(&state, &state.in_segment) {
None => {
gst_log!(self.cat, obj: pad, "Dropping raw data outside segment");
return Ok(HandleResult::Drop);
}
Some(data) => data,
};
return Ok(HandleResult::Eos);
}
// This will only do anything for non-raw data
pts = cmp::max(state.in_segment.get_start(), pts);
pts_end = cmp::max(state.in_segment.get_start(), pts_end);
if state.in_segment.get_stop().is_some() {
pts = cmp::min(state.in_segment.get_stop(), pts);
pts_end = cmp::min(state.in_segment.get_stop(), pts_end);
}
let mut current_running_time = state.in_segment.to_running_time(pts);
current_running_time = cmp::max(current_running_time, state.current_running_time);
state.current_running_time = current_running_time;
let current_running_time = state.in_segment.to_running_time(pts);
let current_running_time_end = state.in_segment.to_running_time(pts_end);
state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
gst_log!(
self.cat,
obj: pad,
@ -631,7 +720,7 @@ impl ToggleRecord {
self.main_stream_cond.notify_all();
while (main_state.current_running_time == gst::CLOCK_TIME_NONE
|| main_state.current_running_time < current_running_time)
|| main_state.current_running_time < current_running_time_end)
&& !main_state.eos
&& !stream.state.lock().flushing
{
@ -645,7 +734,10 @@ impl ToggleRecord {
self.main_stream_cond.wait(&mut main_state);
}
if stream.state.lock().flushing {
state = stream.state.lock();
if state.flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
return Ok(HandleResult::Flushing);
}
@ -655,36 +747,126 @@ impl ToggleRecord {
// If the main stream is EOS, we are also EOS unless we are
// before the final last recording stop running time
if main_state.eos {
// If we have no start or stop position (we never recorded), or are after the current
// stop position that we're EOS now
// If we're before the start position (we were starting before EOS),
// drop the buffer
if rec_state.last_recording_stop.is_none()
|| rec_state.last_recording_start.is_none()
|| current_running_time_end > rec_state.last_recording_stop
{
// If we have no start or stop position (we never recorded) then we're EOS too now
if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() {
gst_debug!(
self.cat,
obj: pad,
"Main stream EOS and we're EOS ({} > {})",
current_running_time_end,
rec_state.last_recording_stop
"Main stream EOS and recording never started",
);
return Ok(HandleResult::Eos);
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_start
&& current_running_time_end > rec_state.last_recording_start
{
// Otherwise if we're before the recording start but the end of the buffer is after
// the start and we can clip, clip the buffer and pass it onwards.
gst_debug!(
self.cat,
obj: pad,
"Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})",
current_running_time,
rec_state.last_recording_start,
current_running_time_end
);
let mut clip_start = state
.in_segment
.position_from_running_time(rec_state.last_recording_start);
if clip_start.is_none() {
clip_start = state.in_segment.get_start();
}
let mut clip_stop = state
.in_segment
.position_from_running_time(rec_state.last_recording_stop);
if clip_stop.is_none() {
clip_stop = state.in_segment.get_stop();
}
let mut segment = state.in_segment.clone();
segment.set_start(clip_start);
segment.set_stop(clip_stop);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
return Ok(HandleResult::Pass(data));
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
return Ok(HandleResult::Drop);
}
} else if current_running_time < rec_state.last_recording_start {
// Otherwise if the buffer starts before the recording start, drop it. This
// means that we either can't clip, or that the end is also before the
// recording start
gst_debug!(
self.cat,
obj: pad,
"Main stream EOS and we're not EOS yet (before recording start, {} <= {})",
"Main stream EOS and we're not EOS yet (before recording start, {} < {})",
current_running_time,
rec_state.last_recording_start
);
return Ok(HandleResult::Drop);
} else {
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_stop
&& current_running_time_end > rec_state.last_recording_stop
{
// Similarly if the end is after the recording stop but the start is before and we
// can clip, clip the buffer and pass it through.
gst_debug!(
self.cat,
obj: pad,
"Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})",
current_running_time,
rec_state.last_recording_stop,
current_running_time_end
);
let mut clip_start = state
.in_segment
.position_from_running_time(rec_state.last_recording_start);
if clip_start.is_none() {
clip_start = state.in_segment.get_start();
}
let mut clip_stop = state
.in_segment
.position_from_running_time(rec_state.last_recording_stop);
if clip_stop.is_none() {
clip_stop = state.in_segment.get_stop();
}
let mut segment = state.in_segment.clone();
segment.set_start(clip_start);
segment.set_stop(clip_stop);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
return Ok(HandleResult::Pass(data));
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
return Ok(HandleResult::Eos);
}
} else if current_running_time_end > rec_state.last_recording_stop {
// Otherwise if the end of the buffer is after the recording stop, we're EOS
// now. This means that we either couldn't clip or that the start is also after
// the recording stop
gst_debug!(
self.cat,
obj: pad,
"Main stream EOS and we're not EOS yet (before recording end, {} <= {} < {})",
"Main stream EOS and we're EOS too (after recording end, {} > {})",
current_running_time_end,
rec_state.last_recording_stop
);
return Ok(HandleResult::Eos);
} else {
// In all other cases the buffer is fully between recording start and end and
// can be passed through as is
assert!(current_running_time >= rec_state.last_recording_start);
assert!(current_running_time_end <= rec_state.last_recording_stop);
gst_debug!(
self.cat,
obj: pad,
"Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})",
rec_state.last_recording_start,
current_running_time,
rec_state.last_recording_stop
@ -693,7 +875,8 @@ impl ToggleRecord {
}
}
assert!(main_state.current_running_time >= current_running_time);
// The end of our buffer is before the end of the previous buffer of the main stream
assert!(main_state.current_running_time >= current_running_time_end);
match rec_state.recording_state {
RecordingState::Recording => {
@ -722,6 +905,36 @@ impl ToggleRecord {
rec_state.last_recording_stop
);
Ok(HandleResult::Pass(data))
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_stop
&& current_running_time_end > rec_state.last_recording_stop
{
gst_log!(
self.cat,
obj: pad,
"Passing buffer (stopping: {} < {} < {})",
current_running_time,
rec_state.last_recording_stop,
current_running_time_end,
);
let mut clip_stop = state
.in_segment
.position_from_running_time(rec_state.last_recording_stop);
if clip_stop.is_none() {
clip_stop = state.in_segment.get_stop();
}
let mut segment = state.in_segment.clone();
segment.set_stop(clip_stop);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
Ok(HandleResult::Pass(data))
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
Ok(HandleResult::Drop)
}
} else {
gst_log!(
self.cat,
@ -756,6 +969,36 @@ impl ToggleRecord {
rec_state.last_recording_start
);
Ok(HandleResult::Pass(data))
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_start
&& current_running_time_end > rec_state.last_recording_start
{
gst_log!(
self.cat,
obj: pad,
"Passing buffer (starting: {} < {} < {})",
current_running_time,
rec_state.last_recording_start,
current_running_time_end,
);
let mut clip_start = state
.in_segment
.position_from_running_time(rec_state.last_recording_start);
if clip_start.is_none() {
clip_start = state.in_segment.get_start();
}
let mut segment = state.in_segment.clone();
segment.set_start(clip_start);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
Ok(HandleResult::Pass(data))
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
Ok(HandleResult::Drop)
}
} else {
gst_log!(
self.cat,
@ -776,19 +1019,14 @@ impl ToggleRecord {
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let stream = self
.pads
.lock()
.get(pad)
.map(|stream| stream.clone())
.ok_or_else(|| {
gst_element_error!(
element,
gst::CoreError::Pad,
["Unknown pad {:?}", pad.get_name()]
);
gst::FlowError::Error
})?;
let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| {
gst_element_error!(
element,
gst::CoreError::Pad,
["Unknown pad {:?}", pad.get_name()]
);
gst::FlowError::Error
})?;
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
@ -930,10 +1168,12 @@ impl ToggleRecord {
let s = caps.get_structure(0).unwrap();
if s.get_name().starts_with("audio/") {
state.audio_info = gst_audio::AudioInfo::from_caps(caps);
gst_log!(self.cat, obj: pad, "Got audio caps {:?}", state.audio_info);
state.video_info = None;
} else if s.get_name().starts_with("video/") {
state.audio_info = None;
state.video_info = gst_video::VideoInfo::from_caps(caps);
gst_log!(self.cat, obj: pad, "Got video caps {:?}", state.video_info);
} else {
state.audio_info = None;
state.video_info = None;

View file

@ -61,7 +61,9 @@ fn setup_sender_receiver(
fakesink.set_property("async", &false).unwrap();
pipeline.add(&fakesink).unwrap();
let (srcpad, sinkpad) = if pad == "src" {
let main_stream = pad == "src";
let (srcpad, sinkpad) = if main_stream {
(
togglerecord.get_static_pad("src").unwrap(),
togglerecord.get_static_pad("sink").unwrap(),
@ -112,6 +114,23 @@ fn setup_sender_receiver(
while let Ok(send_data) = receiver_input.recv() {
if first {
assert!(sinkpad.send_event(gst::Event::new_stream_start("test").build()));
let caps = if main_stream {
gst::Caps::builder("video/x-raw")
.field("format", &"ARGB")
.field("width", &320i32)
.field("height", &240i32)
.field("framerate", &gst::Fraction::new(50, 1))
.build()
} else {
gst::Caps::builder("audio/x-raw")
.field("format", &"U8")
.field("layout", &"interleaved")
.field("rate", &8000i32)
.field("channels", &1i32)
.build()
};
assert!(sinkpad.send_event(gst::Event::new_caps(&caps).build()));
let segment = gst::FormattedSegment::<gst::ClockTime>::new();
assert!(sinkpad.send_event(gst::Event::new_segment(&segment).build()));
@ -124,18 +143,24 @@ fn setup_sender_receiver(
first = false;
}
let buffer = if main_stream {
gst::Buffer::with_size(320 * 240 * 4).unwrap()
} else {
gst::Buffer::with_size(160).unwrap()
};
match send_data {
SendData::Eos => {
break;
}
SendData::Buffers(n) => {
for _ in 0..n {
let mut buffer = gst::Buffer::new();
buffer
.get_mut()
.unwrap()
.set_pts(offset + i * 20 * gst::MSECOND);
buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND);
let mut buffer = buffer.clone();
{
let buffer = buffer.make_mut();
buffer.set_pts(offset + i * 20 * gst::MSECOND);
buffer.set_duration(20 * gst::MSECOND);
}
let _ = sinkpad.chain(buffer);
i += 1;
}
@ -181,13 +206,17 @@ fn recv_buffers(
receiver_output: &mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
segment: &mut gst::FormattedSegment<gst::ClockTime>,
wait_buffers: usize,
) -> Vec<(gst::ClockTime, gst::ClockTime)> {
) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> {
let mut res = Vec::new();
let mut n_buffers = 0;
while let Ok(val) = receiver_output.recv() {
match val {
Left(buffer) => {
res.push((segment.to_running_time(buffer.get_pts()), buffer.get_pts()));
res.push((
segment.to_running_time(buffer.get_pts()),
buffer.get_pts(),
buffer.get_duration(),
));
n_buffers += 1;
if wait_buffers > 0 && n_buffers == wait_buffers {
return res;
@ -198,9 +227,9 @@ fn recv_buffers(
match event.view() {
EventView::Gap(ref e) => {
let (ts, _) = e.get();
let (ts, duration) = e.get();
res.push((segment.to_running_time(ts), ts));
res.push((segment.to_running_time(ts), ts, duration));
n_buffers += 1;
if wait_buffers > 0 && n_buffers == wait_buffers {
return res;
@ -263,10 +292,11 @@ fn test_one_stream_open() {
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts)) in buffers.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
@ -295,10 +325,11 @@ fn test_one_stream_gaps_open() {
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts)) in buffers.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
@ -328,10 +359,11 @@ fn test_one_stream_close_open() {
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts)) in buffers.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
@ -362,10 +394,11 @@ fn test_one_stream_open_close() {
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts)) in buffers.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
@ -399,7 +432,7 @@ fn test_one_stream_open_close_open() {
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 20);
for (index, &(running_time, pts)) in buffers.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -409,6 +442,7 @@ fn test_one_stream_open_close_open() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
@ -444,20 +478,22 @@ fn test_two_stream_open() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -485,7 +521,7 @@ fn test_two_stream_open_shift() {
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
@ -495,22 +531,28 @@ fn test_two_stream_open_shift() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
// Second to last buffer should be clipped from second stream, last should be dropped
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
if index == 9 {
assert_eq!(duration, 15 * gst::MSECOND);
} else {
assert_eq!(duration, 20 * gst::MSECOND);
}
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(buffers_2.len(), 10);
thread_1.join().unwrap();
thread_2.join().unwrap();
@ -536,7 +578,7 @@ fn test_two_stream_open_shift_main() {
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
sender_input_2.send(SendData::Buffers(12)).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
@ -547,22 +589,35 @@ fn test_two_stream_open_shift_main() {
// PTS 5 maps to running time 0 now
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// First and last buffer should be dropped from second stream
// First and second last buffer should be clipped from second stream,
// last buffer should be dropped
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, 15 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(pts, 20 * gst::MSECOND + index * 20 * gst::MSECOND);
if index == 0 {
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(duration, 15 * gst::MSECOND);
} else if index == 10 {
assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 5 * gst::MSECOND);
} else {
assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(buffers_2.len(), 11);
thread_1.join().unwrap();
thread_2.join().unwrap();
@ -614,20 +669,22 @@ fn test_two_stream_open_close() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -681,20 +738,22 @@ fn test_two_stream_close_open() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -761,7 +820,7 @@ fn test_two_stream_open_close_open() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -771,13 +830,14 @@ fn test_two_stream_open_close_open() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -787,6 +847,7 @@ fn test_two_stream_open_close_open() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
@ -859,7 +920,7 @@ fn test_two_stream_open_close_open_gaps() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -869,13 +930,14 @@ fn test_two_stream_open_close_open_gaps() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -885,6 +947,7 @@ fn test_two_stream_open_close_open_gaps() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
@ -958,20 +1021,22 @@ fn test_two_stream_close_open_close_delta() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -1052,7 +1117,7 @@ fn test_three_stream_open_close_open() {
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts)) in buffers_1.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -1062,13 +1127,14 @@ fn test_three_stream_open_close_open() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts)) in buffers_2.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -1078,12 +1144,13 @@ fn test_three_stream_open_close_open() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts)) in buffers_3.iter().enumerate() {
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
@ -1093,6 +1160,7 @@ fn test_three_stream_open_close_open() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_3.len(), 20);