onvifmetadataparse: Schedule EOS events after the last currently queued up frame

Otherwise EOS might be sent before the last frame's data, or even at a
much earlier frame due to reordering.
This commit is contained in:
Sebastian Dröge 2022-09-27 14:38:35 +03:00 committed by Sebastian Dröge
parent f0b2df49dc
commit d6ab55c263

View file

@ -627,6 +627,8 @@ impl OnvifMetadataParse {
let mut frame = queued_frames.remove(&utc_time).unwrap();
let had_events = !frame.events.is_empty();
let mut eos_events = vec![];
for event in frame.events.drain(..) {
match event.view() {
gst::EventView::Segment(ev) => {
@ -679,6 +681,10 @@ impl OnvifMetadataParse {
data.push(BufferOrEvent::Event(event));
}
gst::EventView::Eos(_) => {
// Forward EOS events *after* any frame data
eos_events.push(event);
}
_ => {
data.push(BufferOrEvent::Event(event));
}
@ -703,6 +709,10 @@ impl OnvifMetadataParse {
));
}
for event in eos_events {
data.push(BufferOrEvent::Event(event));
}
continue;
}
@ -722,6 +732,11 @@ impl OnvifMetadataParse {
diff,
position,
);
for event in eos_events {
data.push(BufferOrEvent::Event(event));
}
continue;
} else if diff > gst::ClockTime::ZERO {
gst::warning!(
@ -781,6 +796,9 @@ impl OnvifMetadataParse {
let mut vec = Vec::new();
if let Err(err) = xml.write_to_decl(&mut vec) {
gst::error!(CAT, obj: element, "Can't serialize XML element: {}", err);
for event in eos_events {
data.push(BufferOrEvent::Event(event));
}
continue;
}
@ -796,6 +814,10 @@ impl OnvifMetadataParse {
);
data.push(BufferOrEvent::Buffer(buffer));
for event in eos_events {
data.push(BufferOrEvent::Event(event));
}
}
gst::trace!(
@ -947,31 +969,49 @@ impl OnvifMetadataParse {
} = &mut *state;
if let Some(utc_time_running_time_mapping) = utc_time_running_time_mapping {
let current_running_time = in_segment
.to_running_time_full(in_segment.position())
.unwrap_or(gst::ClockTime::MIN_SIGNED);
let current_utc_time = running_time_to_utc_time(
*utc_time_running_time_mapping,
current_running_time,
)
.unwrap_or(gst::ClockTime::ZERO);
let frame = if matches!(ev, gst::EventView::Eos(_)) && !queued_frames.is_empty()
{
// FIXME: Use last_entry() once stabilized
let (&eos_utc_time, frame) = queued_frames.iter_mut().last().unwrap();
gst::trace!(
CAT,
obj: element,
"Queueing event with UTC time {} / running time {}",
current_utc_time,
current_running_time.display(),
);
gst::trace!(
CAT,
obj: element,
"Queueing EOS event with UTC time {} / running time {}",
eos_utc_time,
utc_time_to_running_time(*utc_time_running_time_mapping, eos_utc_time)
.display(),
);
frame
} else {
let current_running_time = in_segment
.to_running_time_full(in_segment.position())
.unwrap_or(gst::ClockTime::MIN_SIGNED);
let current_utc_time = running_time_to_utc_time(
*utc_time_running_time_mapping,
current_running_time,
)
.unwrap_or(gst::ClockTime::ZERO);
gst::trace!(
CAT,
obj: element,
"Queueing event with UTC time {} / running time {}",
current_utc_time,
current_running_time.display(),
);
queued_frames
.entry(current_utc_time)
.or_insert_with(Frame::default)
};
let frame = queued_frames
.entry(current_utc_time)
.or_insert_with(Frame::default);
frame.events.push(event);
self.wake_up_output(element, state).is_ok()
} else {
if let gst::EventView::Eos(_) = ev {
if matches!(ev, gst::EventView::Eos(_)) {
gst::error!(
CAT,
obj: element,