diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index f38a4b16..fcc39517 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -4516,6 +4516,20 @@ "readable": true, "type": "guint64", "writable": true + }, + "max-lateness": { + "blurb": "Drop metadata that delayed by more than this", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "200000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint64", + "writable": true } }, "rank": "none" diff --git a/net/onvif/src/onvifmetadataparse/imp.rs b/net/onvif/src/onvifmetadataparse/imp.rs index 0f0b9538..2e2e43bc 100644 --- a/net/onvif/src/onvifmetadataparse/imp.rs +++ b/net/onvif/src/onvifmetadataparse/imp.rs @@ -12,10 +12,45 @@ use gst::subclass::prelude::*; use once_cell::sync::Lazy; -use minidom::Element; - use std::collections::BTreeMap; -use std::sync::Mutex; +use std::sync::{Condvar, Mutex}; + +fn utc_time_to_pts( + segment: &gst::FormattedSegment, + utc_time_running_time_mapping: (gst::ClockTime, gst::Signed), + utc_time: gst::ClockTime, +) -> Option { + let running_time = match utc_time_to_running_time(utc_time_running_time_mapping, utc_time)? { + gst::Signed::Positive(running_time) => running_time, + _ => return None, + }; + segment.position_from_running_time(running_time) +} + +fn utc_time_to_running_time( + utc_time_running_time_mapping: (gst::ClockTime, gst::Signed), + utc_time: gst::ClockTime, +) -> Option> { + if utc_time < utc_time_running_time_mapping.0 { + let diff = utc_time_running_time_mapping.0 - utc_time; + utc_time_running_time_mapping.1.checked_sub_unsigned(diff) + } else { + let diff = utc_time - utc_time_running_time_mapping.0; + utc_time_running_time_mapping.1.checked_add_unsigned(diff) + } +} + +fn running_time_to_utc_time( + utc_time_running_time_mapping: (gst::ClockTime, gst::Signed), + running_time: gst::Signed, +) -> Option { + let diff = running_time.checked_sub(utc_time_running_time_mapping.1)?; + + match diff { + gst::Signed::Positive(diff) => utc_time_running_time_mapping.0.checked_add(diff), + gst::Signed::Negative(diff) => utc_time_running_time_mapping.0.checked_sub(diff), + } +} static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -25,21 +60,97 @@ static CAT: Lazy = Lazy::new(|| { ) }); -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] struct Settings { latency: Option, + max_lateness: Option, } -#[derive(Default, Debug)] +impl Default for Settings { + fn default() -> Self { + Settings { + latency: None, + max_lateness: Some(gst::ClockTime::from_mseconds(200)), + } + } +} + +#[derive(Debug)] +struct Frame { + video_analytics: minidom::Element, + other_elements: Vec, + events: Vec, +} + +impl Default for Frame { + fn default() -> Self { + Frame { + video_analytics: minidom::Element::bare( + "VideoAnalytics", + "http://www.onvif.org/ver10/schema", + ), + other_elements: Vec::new(), + events: Vec::new(), + } + } +} + +#[derive(Debug, Clone)] +enum TimedBufferOrEvent { + Buffer(gst::Signed, gst::Buffer), + Event(gst::Signed, gst::Event), +} + +#[derive(Debug, Clone)] +enum BufferOrEvent { + Buffer(gst::Buffer), + Event(gst::Event), +} + +#[derive(Debug)] struct State { - // Initially queued buffers until we have a UTC time / PTS mapping - pre_queued_buffers: Vec, - // Mapping of UTC time to PTS - utc_time_pts_mapping: Option<(gst::ClockTime, gst::ClockTime)>, - // UTC time -> (VideoAnalytics XML with Frames, other XML nodes) - queued_frames: BTreeMap)>, - // Configured latency + /// Initially queued buffers and serialized events until we have a UTC time / running time mapping. + pre_queued_buffers: Vec, + /// Mapping of UTC time to running time. + utc_time_running_time_mapping: Option<(gst::ClockTime, gst::Signed)>, + /// UTC time -> XML data and serialized events. + queued_frames: BTreeMap, + + /// Currently configured input segment. + in_segment: gst::FormattedSegment, + /// Currently configured output segment. + out_segment: gst::FormattedSegment, + + /// Upstream latency and live'ness. + upstream_latency: Option<(bool, gst::ClockTime)>, + /// Configured latency on this element. configured_latency: gst::ClockTime, + /// Last flow return of the source pad. + last_flow_ret: Result, + /// Clock wait of the source pad task. + /// Otherwise the source pad task waits on the condition variable. + clock_wait: Option, +} + +impl Default for State { + fn default() -> Self { + let mut segment = gst::FormattedSegment::default(); + segment.set_position(gst::ClockTime::NONE); + + State { + pre_queued_buffers: Vec::new(), + utc_time_running_time_mapping: None, + queued_frames: BTreeMap::new(), + + in_segment: segment.clone(), + out_segment: segment.clone(), + + upstream_latency: None, + configured_latency: gst::ClockTime::ZERO, + last_flow_ret: Err(gst::FlowError::Flushing), + clock_wait: None, + } + } } pub struct OnvifMetadataParse { @@ -47,18 +158,19 @@ pub struct OnvifMetadataParse { sinkpad: gst::Pad, settings: Mutex, state: Mutex, + cond: Condvar, } impl OnvifMetadataParse { fn sink_chain( &self, - _pad: &gst::Pad, + pad: &gst::Pad, element: &super::OnvifMetadataParse, buffer: gst::Buffer, ) -> Result { gst::log!( CAT, - obj: element, + obj: pad, "Handling buffer {:?} with UTC time {}", buffer, crate::lookup_reference_timestamp(&buffer).display() @@ -69,46 +181,100 @@ impl OnvifMetadataParse { let pts = match buffer.pts() { Some(pts) => pts, None => { - gst::error!(CAT, obj: element, "Need buffers with PTS"); + gst::error!(CAT, obj: pad, "Need buffers with PTS"); return Err(gst::FlowError::Error); } }; - // First we need to get an UTC/PTS mapping. We wait up to the latency + + let running_time = state.in_segment.to_running_time_full(pts).unwrap(); + + if state + .in_segment + .position() + .map_or(true, |position| position < pts) + { + gst::trace!(CAT, obj: element, "Input position updated to {}", pts); + state.in_segment.set_position(pts); + } + + // First we need to get an UTC/running time mapping. We wait up to the latency // for that and otherwise error out. - if state.utc_time_pts_mapping.is_none() { + if state.utc_time_running_time_mapping.is_none() { let utc_time = crate::lookup_reference_timestamp(&buffer); if let Some(utc_time) = utc_time { - let initial_pts = state + let initial_running_time = state .pre_queued_buffers - .first() - .map(|b| b.pts().unwrap()) - .unwrap_or(pts); - let diff = pts.saturating_sub(initial_pts); - let initial_utc_time = match utc_time.checked_sub(diff) { - Some(initial_utc_time) => initial_utc_time, + .iter() + .find_map(|o| { + if let TimedBufferOrEvent::Buffer(running_time, _) = o { + Some(*running_time) + } else { + None + } + }) + .unwrap_or(running_time); + + let diff = match running_time.checked_sub(initial_running_time) { + Some(diff) => diff, None => { - gst::error!(CAT, obj: element, "Can't calculate initial UTC time"); + gst::error!( + CAT, + obj: pad, + "Too big running time difference between initial running time {:?} and current running time {:?}", + initial_running_time, + running_time, + ); + return Err(gst::FlowError::Error); + } + }; + + let initial_utc_time = match gst::Signed::Positive(utc_time).checked_sub(diff) { + Some(gst::Signed::Positive(initial_utc_time)) => initial_utc_time, + Some(gst::Signed::Negative(initial_utc_time)) => { + // XXX: Optionally just skip the first buffers over it + gst::error!( + CAT, + obj: pad, + "Initial UTC time is negative: -{}", + initial_utc_time + ); + return Err(gst::FlowError::Error); + } + None => { + // XXX: Optionally just skip the first buffers over it + gst::error!(CAT, obj: pad, "Can't calculate initial UTC time"); return Err(gst::FlowError::Error); } }; gst::info!( CAT, - obj: element, - "Calculated initial UTC/PTS mapping: {}/{}", + obj: pad, + "Calculated initial UTC/running time mapping: {}/{:?}", initial_utc_time, - initial_pts + initial_running_time ); - state.utc_time_pts_mapping = Some((initial_utc_time, initial_pts)); + state.utc_time_running_time_mapping = + Some((initial_utc_time, initial_running_time)); } else { - state.pre_queued_buffers.push(buffer); + state + .pre_queued_buffers + .push(TimedBufferOrEvent::Buffer(running_time, buffer)); - if let Some(front_pts) = state.pre_queued_buffers.first().map(|b| b.pts().unwrap()) - { - if pts.saturating_sub(front_pts) >= state.configured_latency { + if let Some(front_running_time) = state.pre_queued_buffers.iter().find_map(|o| { + if let TimedBufferOrEvent::Buffer(running_time, _) = o { + Some(*running_time) + } else { + None + } + }) { + if running_time.saturating_sub(front_running_time) + >= gst::Signed::Positive(state.configured_latency) + { + // XXX: Optionally discard, error or just output anyway gst::error!( CAT, - obj: element, + obj: pad, "Received no UTC time in the first {}", state.configured_latency ); @@ -120,15 +286,13 @@ impl OnvifMetadataParse { } } - self.queue(element, &mut state, buffer, pts)?; - let buffers = self.drain(element, &mut state, Some(pts))?; + assert!(state.utc_time_running_time_mapping.is_some()); + self.queue(element, &mut state, buffer, running_time)?; + let res = self.wake_up_output(element, state); - if let Some(buffers) = buffers { - drop(state); - self.srcpad.push_list(buffers) - } else { - Ok(gst::FlowSuccess::Ok) - } + gst::trace!(CAT, obj: pad, "Returning {:?}", res); + + res } fn queue( @@ -136,18 +300,41 @@ impl OnvifMetadataParse { element: &super::OnvifMetadataParse, state: &mut State, buffer: gst::Buffer, - pts: gst::ClockTime, + running_time: gst::Signed, ) -> Result<(), gst::FlowError> { let State { ref mut pre_queued_buffers, ref mut queued_frames, - ref utc_time_pts_mapping, + ref utc_time_running_time_mapping, .. } = &mut *state; - let utc_time_pts_mapping = utc_time_pts_mapping.unwrap(); + let utc_time_running_time_mapping = utc_time_running_time_mapping.unwrap(); + + for buffer_or_event in + pre_queued_buffers + .drain(..) + .chain(std::iter::once(TimedBufferOrEvent::Buffer( + running_time, + buffer, + ))) + { + let (running_time, buffer) = match buffer_or_event { + TimedBufferOrEvent::Event(running_time, event) => { + let current_utc_time = + running_time_to_utc_time(utc_time_running_time_mapping, running_time) + .unwrap_or(gst::ClockTime::ZERO); + + let frame = queued_frames + .entry(current_utc_time) + .or_insert_with(Frame::default); + frame.events.push(event); + + continue; + } + TimedBufferOrEvent::Buffer(running_time, buffer) => (running_time, buffer), + }; - for buffer in pre_queued_buffers.drain(..).chain(std::iter::once(buffer)) { let root = crate::xml_from_buffer(&buffer).map_err(|err| { element.post_error_message(err); @@ -171,42 +358,29 @@ impl OnvifMetadataParse { dt_unix_ns ); - let (xml, _) = queued_frames.entry(dt_unix_ns).or_insert_with(|| { - ( - Element::bare("VideoAnalytics", "http://www.onvif.org/ver10/schema"), - Vec::new(), - ) - }); + let frame = queued_frames + .entry(dt_unix_ns) + .or_insert_with(Frame::default); - xml.append_child(el.clone()); + frame.video_analytics.append_child(el.clone()); } - let pts_diff = pts.saturating_sub(utc_time_pts_mapping.1); - let utc_time = utc_time_pts_mapping.0 + pts_diff; + let utc_time = running_time_to_utc_time(utc_time_running_time_mapping, running_time) + .unwrap_or(gst::ClockTime::ZERO); for child in root.children() { - let (_, xmls) = queued_frames.entry(utc_time).or_insert_with(|| { - ( - Element::bare("VideoAnalytics", "http://www.onvif.org/ver10/schema"), - Vec::new(), - ) - }); + let frame = queued_frames.entry(utc_time).or_insert_with(Frame::default); if child.is("VideoAnalytics", "http://www.onvif.org/ver10/schema") { - let mut element = - Element::bare("VideoAnalytics", "http://www.onvif.org/ver10/schema"); - for subchild in child.children() { if subchild.is("Frame", "http://www.onvif.org/ver10/schema") { continue; } - element.append_child(subchild.clone()); + frame.video_analytics.append_child(subchild.clone()); } - - xmls.push(element); } else { - xmls.push(child.clone()); + frame.other_elements.push(child.clone()); } } } @@ -214,72 +388,385 @@ impl OnvifMetadataParse { Ok(()) } + fn wake_up_output<'a>( + &'a self, + element: &super::OnvifMetadataParse, + mut state: std::sync::MutexGuard<'a, State>, + ) -> Result { + if state.upstream_latency.is_none() { + drop(state); + + gst::debug!(CAT, obj: element, "Have no upstream latency yet, querying"); + let mut q = gst::query::Latency::new(); + let res = self.sinkpad.peer_query(&mut q); + + state = self.state.lock().unwrap(); + + if res { + let (live, min, max) = q.result(); + + gst::debug!( + CAT, + obj: element, + "Latency query response: live {} min {} max {}", + live, + min, + max.display() + ); + + state.upstream_latency = Some((live, min)); + } else { + gst::warning!( + CAT, + obj: element, + "Can't query upstream latency -- assuming non-live upstream for now" + ); + } + } + + // Consider waking up the source element thread + if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { + gst::trace!(CAT, obj: element, "Scheduling immediate wakeup at EOS",); + + if let Some(clock_wait) = state.clock_wait.take() { + clock_wait.unschedule(); + } + self.cond.notify_all(); + } else if self.reschedule_clock_wait(element, &mut state) { + self.cond.notify_all(); + } else { + // Not live or have no clock + + // Wake up if between now and the earliest frame's running time more than the + // configured latency has passed. + let queued_time = self.calculate_queued_time(element, &state); + + if queued_time.map_or(false, |queued_time| queued_time >= state.configured_latency) { + gst::trace!( + CAT, + obj: element, + "Scheduling immediate wakeup -- queued time {}", + queued_time.display() + ); + + if let Some(clock_wait) = state.clock_wait.take() { + clock_wait.unschedule(); + } + self.cond.notify_all(); + } + } + + state.last_flow_ret + } + + fn calculate_queued_time( + &self, + element: &super::OnvifMetadataParse, + state: &State, + ) -> Option { + let earliest_utc_time = match state.queued_frames.iter().next() { + Some((&earliest_utc_time, _earliest_frame)) => earliest_utc_time, + None => return None, + }; + + let earliest_running_time = utc_time_to_running_time( + state.utc_time_running_time_mapping.unwrap(), + earliest_utc_time, + ); + + let current_running_time = state + .in_segment + .to_running_time_full(state.in_segment.position()); + + let queued_time = Option::zip(current_running_time, earliest_running_time) + .and_then(|(current_running_time, earliest_running_time)| { + current_running_time.checked_sub(earliest_running_time) + }) + .and_then(|queued_time| queued_time.positive_or(()).ok()) + .unwrap_or(gst::ClockTime::ZERO); + + gst::trace!( + CAT, + obj: element, + "Currently queued {}", + queued_time.display() + ); + + Some(queued_time) + } + + fn reschedule_clock_wait( + &self, + element: &super::OnvifMetadataParse, + state: &mut State, + ) -> bool { + let earliest_utc_time = match state.queued_frames.iter().next() { + Some((&earliest_utc_time, _earliest_frame)) => earliest_utc_time, + None => return false, + }; + + let min_latency = match state.upstream_latency { + Some((true, min_latency)) => min_latency, + _ => return false, + }; + + let earliest_running_time = utc_time_to_running_time( + state.utc_time_running_time_mapping.unwrap(), + earliest_utc_time, + ); + + let (clock, base_time) = match (element.clock(), element.base_time()) { + (Some(clock), Some(base_time)) => (clock, base_time), + _ => { + gst::warning!( + CAT, + obj: element, + "Upstream is live but have no clock -- assuming non-live for now" + ); + return false; + } + }; + + // Update clock wait to the clock time of the earliest metadata to output plus + // the configured latency + if let Some(earliest_clock_time) = earliest_running_time + .and_then(|earliest_running_time| { + earliest_running_time + .checked_add_unsigned(base_time + min_latency + state.configured_latency) + }) + .and_then(|earliest_clock_time| earliest_clock_time.positive_or(()).ok()) + { + if state + .clock_wait + .as_ref() + .map_or(true, |clock_wait| clock_wait.time() != earliest_clock_time) + { + if let Some(clock_wait) = state.clock_wait.take() { + clock_wait.unschedule(); + } + gst::trace!( + CAT, + obj: element, + "Scheduling timer for {} / running time {}, now {}", + earliest_clock_time, + earliest_running_time + .unwrap() + .positive_or(()) + .unwrap_or(gst::ClockTime::ZERO) + .display(), + clock.time().display(), + ); + state.clock_wait = Some(clock.new_single_shot_id(earliest_clock_time)); + } + } else { + // Wake up immediately if the metadata is before the segment + if let Some(clock_wait) = state.clock_wait.take() { + clock_wait.unschedule(); + } + gst::trace!(CAT, obj: element, "Scheduling immediate wakeup"); + } + + true + } + fn drain( &self, element: &super::OnvifMetadataParse, state: &mut State, - pts: Option, - ) -> Result, gst::FlowError> { + drain_utc_time: Option, + ) -> Result, gst::FlowError> { let State { ref mut queued_frames, - utc_time_pts_mapping, - configured_latency, + ref mut out_segment, + utc_time_running_time_mapping, .. } = &mut *state; - let utc_time_pts_mapping = match utc_time_pts_mapping { - Some(utc_time_pts_mapping) => utc_time_pts_mapping, - None => return Ok(None), + let utc_time_running_time_mapping = match utc_time_running_time_mapping { + Some(utc_time_running_time_mapping) => *utc_time_running_time_mapping, + None => return Ok(vec![]), }; - let utc_time_to_pts = |utc_time: gst::ClockTime| { - if utc_time < utc_time_pts_mapping.0 { - let diff = utc_time_pts_mapping.0 - utc_time; - utc_time_pts_mapping.1.checked_sub(diff) - } else { - let diff = utc_time - utc_time_pts_mapping.0; - Some(utc_time_pts_mapping.1 + diff) - } - }; + gst::log!( + CAT, + obj: element, + "Draining up to UTC time {} / running time {} from current position {} / running time {}", + drain_utc_time.display(), + drain_utc_time + .and_then(|drain_utc_time| utc_time_to_running_time( + utc_time_running_time_mapping, + drain_utc_time + )) + .and_then(|running_time| running_time.positive_or(()).ok()) + .display(), + out_segment.position().display(), + out_segment + .to_running_time(out_segment.position()) + .display(), + ); - let mut buffers = Vec::new(); + let mut data = Vec::new(); while !queued_frames.is_empty() { let utc_time = *queued_frames.iter().next().unwrap().0; - let frame_pts = match utc_time_to_pts(utc_time) { - Some(frame_pts) => frame_pts, - None => { - gst::warning!(CAT, obj: element, "UTC time {} outside segment", utc_time); - gst::ClockTime::ZERO - } - }; - // Not at EOS and not above the latency yet - if pts.map_or(false, |pts| { - pts.saturating_sub(frame_pts) < *configured_latency - }) { + // Check if this frame should still be drained + if drain_utc_time.map_or(false, |drain_utc_time| drain_utc_time < utc_time) { break; } - let (frames, xmls) = queued_frames.remove(&utc_time).unwrap(); + // FIXME: Use pop_first() once stabilized + let mut frame = queued_frames.remove(&utc_time).unwrap(); + + let had_events = !frame.events.is_empty(); + for event in frame.events.drain(..) { + match event.view() { + gst::EventView::Segment(ev) => { + let mut segment = ev + .segment() + .downcast_ref::() + .unwrap() + .clone(); + let current_position = out_segment + .position() + .and_then(|position| out_segment.to_running_time(position)) + .and_then(|running_time| { + segment.position_from_running_time(running_time) + }); + segment.set_position(current_position); + + gst::debug!( + CAT, + obj: element, + "Configuring output segment {:?}", + segment + ); + + *out_segment = segment; + + data.push(BufferOrEvent::Event(event)); + } + gst::EventView::Caps(ev) => { + data.push(BufferOrEvent::Event( + gst::event::Caps::builder(&self.srcpad.pad_template_caps()) + .seqnum(ev.seqnum()) + .build(), + )); + } + gst::EventView::Gap(ev) => { + let (current_position, _duration) = ev.get(); + + if out_segment + .position() + .map_or(true, |position| position < current_position) + { + gst::trace!( + CAT, + obj: element, + "Output position updated to {}", + current_position + ); + out_segment.set_position(current_position); + } + + data.push(BufferOrEvent::Event(event)); + } + _ => { + data.push(BufferOrEvent::Event(event)); + } + } + } + + let mut frame_pts = + match utc_time_to_pts(out_segment, utc_time_running_time_mapping, utc_time) { + Some(frame_pts) => frame_pts, + None => { + gst::warning!(CAT, obj: element, "UTC time {} outside segment", utc_time); + gst::ClockTime::ZERO + } + }; + + if frame.video_analytics.children().next().is_none() && frame.other_elements.is_empty() + { + // Generate a gap event if there's no actual data for this time + if !had_events { + data.push(BufferOrEvent::Event( + gst::event::Gap::builder(frame_pts).build(), + )); + } + + continue; + } + + if let Some(position) = out_segment.position() { + let settings = self.settings.lock().unwrap().clone(); + let diff = position.saturating_sub(frame_pts); + if settings + .max_lateness + .map_or(false, |max_lateness| diff > max_lateness) + { + gst::warning!( + CAT, + obj: element, + "Dropping frame with UTC time {} / PTS {} that is too late by {} at current position {}", + utc_time, + frame_pts, + diff, + position, + ); + continue; + } else if diff > gst::ClockTime::ZERO { + gst::warning!( + CAT, + obj: element, + "Frame in the past by {} with UTC time {} / PTS {} at current position {}", + diff, + utc_time, + frame_pts, + position, + ); + + frame_pts = position; + } + } + + if out_segment + .position() + .map_or(true, |position| position < frame_pts) + { + gst::trace!( + CAT, + obj: element, + "Output position updated to {}", + frame_pts + ); + out_segment.set_position(frame_pts); + } + + let Frame { + video_analytics, + other_elements, + .. + } = frame; gst::trace!( CAT, obj: element, - "Dequeueing frame with UTC time {} / PTS {}", + "Producing frame with UTC time {} / PTS {}", utc_time, frame_pts ); - let mut xml = Element::builder("MetadataStream", "http://www.onvif.org/ver10/schema") - .prefix(Some("tt".into()), "http://www.onvif.org/ver10/schema") - .unwrap() - .build(); + let mut xml = + minidom::Element::builder("MetadataStream", "http://www.onvif.org/ver10/schema") + .prefix(Some("tt".into()), "http://www.onvif.org/ver10/schema") + .unwrap() + .build(); - if frames.children().next().is_some() { - xml.append_child(frames); + if video_analytics.children().next().is_some() { + xml.append_child(video_analytics); } - for child in xmls { + for child in other_elements { xml.append_child(child); } @@ -300,20 +787,22 @@ impl OnvifMetadataParse { gst::ClockTime::NONE, ); - buffers.push(buffer); + data.push(BufferOrEvent::Buffer(buffer)); } - buffers.sort_by_key(|b| b.pts()); + gst::trace!( + CAT, + obj: element, + "Position after draining {} / running time {} -- queued now {} / {} items", + out_segment.position().display(), + out_segment + .to_running_time(out_segment.position()) + .display(), + self.calculate_queued_time(element, state).display(), + state.queued_frames.len(), + ); - if !buffers.is_empty() { - let mut buffer_list = gst::BufferList::new_sized(buffers.len()); - let buffer_list_ref = buffer_list.get_mut().unwrap(); - buffer_list_ref.extend(buffers); - - Ok(Some(buffer_list)) - } else { - Ok(None) - } + Ok(data) } fn sink_event( @@ -325,60 +814,182 @@ impl OnvifMetadataParse { gst::log!(CAT, obj: pad, "Handling event {:?}", event); match event.view() { - gst::EventView::Segment(_) | gst::EventView::Eos(_) => { + gst::EventView::FlushStart(_) => { let mut state = self.state.lock().unwrap(); - let buffers = self.drain(element, &mut state, None).ok().flatten(); - state.pre_queued_buffers.clear(); - state.utc_time_pts_mapping = None; - state.queued_frames.clear(); - drop(state); - - if let Some(buffers) = buffers { - if let Err(err) = self.srcpad.push_list(buffers) { - gst::error!(CAT, obj: element, "Failed to drain frames: {}", err); - } + state.last_flow_ret = Err(gst::FlowError::Flushing); + if let Some(ref clock_wait) = state.clock_wait.take() { + clock_wait.unschedule(); } + drop(state); + self.cond.notify_all(); pad.event_default(Some(element), event) } gst::EventView::FlushStop(_) => { + let _ = self.srcpad.stop_task(); let mut state = self.state.lock().unwrap(); state.pre_queued_buffers.clear(); state.queued_frames.clear(); - state.utc_time_pts_mapping = None; + state.utc_time_running_time_mapping = None; + state.in_segment.reset(); + state.in_segment.set_position(gst::ClockTime::NONE); + state.out_segment.reset(); + state.out_segment.set_position(gst::ClockTime::NONE); + state.last_flow_ret = Ok(gst::FlowSuccess::Ok); drop(state); - pad.event_default(Some(element), event) + let mut res = pad.event_default(Some(element), event); + if res { + res = Self::src_start_task(element, &self.srcpad).is_ok(); + } + res } - gst::EventView::Caps(ev) => { - let settings = self.settings.lock().unwrap().clone(); - + ev if event.is_serialized() => { let mut state = self.state.lock().unwrap(); - let previous_latency = state.configured_latency; - let latency = if let Some(latency) = settings.latency { - latency - } else { - let caps = ev.caps(); - let s = caps.structure(0).unwrap(); - let parsed = Some(true) == s.get("parsed").ok(); - if parsed { - gst::ClockTime::ZERO - } else { - gst::ClockTime::from_seconds(6) + match ev { + gst::EventView::Segment(ev) => { + match ev.segment().downcast_ref::().cloned() { + Some(mut segment) => { + let current_position = state + .in_segment + .position() + .and_then(|position| state.in_segment.to_running_time(position)) + .and_then(|running_time| { + segment.position_from_running_time(running_time) + }); + segment.set_position(current_position); + + gst::debug!( + CAT, + obj: pad, + "Configuring input segment {:?}", + segment + ); + state.in_segment = segment; + } + None => { + gst::error!(CAT, obj: pad, "Non-TIME segment"); + return false; + } + } } - }; - state.configured_latency = latency; - drop(state); + gst::EventView::Caps(ev) => { + let settings = self.settings.lock().unwrap().clone(); - gst::debug!(CAT, obj: element, "Configuring latency of {}", latency); - if previous_latency != latency { - let _ = - element.post_message(gst::message::Latency::builder().src(element).build()); + let previous_latency = state.configured_latency; + let latency = if let Some(latency) = settings.latency { + latency + } else { + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + let parsed = Some(true) == s.get("parsed").ok(); + + if parsed { + gst::ClockTime::ZERO + } else { + gst::ClockTime::from_seconds(6) + } + }; + state.configured_latency = latency; + drop(state); + + gst::debug!(CAT, obj: pad, "Configuring latency of {}", latency); + if previous_latency != latency { + let _ = element.post_message( + gst::message::Latency::builder().src(element).build(), + ); + } + + state = self.state.lock().unwrap(); + } + gst::EventView::Gap(ev) => { + let (mut current_position, duration) = ev.get(); + if let Some(duration) = duration { + current_position += duration; + } + + if state + .in_segment + .position() + .map_or(true, |position| position < current_position) + { + gst::trace!( + CAT, + obj: element, + "Input position updated to {}", + current_position + ); + state.in_segment.set_position(current_position); + } + } + _ => (), } - let caps = self.srcpad.pad_template_caps(); - self.srcpad - .push_event(gst::event::Caps::builder(&caps).build()) + let State { + utc_time_running_time_mapping, + ref in_segment, + ref mut queued_frames, + ref mut pre_queued_buffers, + .. + } = &mut *state; + + if let Some(utc_time_running_time_mapping) = utc_time_running_time_mapping { + let current_running_time = in_segment + .to_running_time_full(in_segment.position()) + .unwrap_or(gst::Signed::Negative(gst::ClockTime::from_nseconds( + u64::MAX, + ))); + let current_utc_time = running_time_to_utc_time( + *utc_time_running_time_mapping, + current_running_time, + ) + .unwrap_or(gst::ClockTime::ZERO); + + gst::trace!( + CAT, + obj: element, + "Queueing event with UTC time {} / running time {}", + current_utc_time, + current_running_time.positive_or(()).ok().display(), + ); + + let frame = queued_frames + .entry(current_utc_time) + .or_insert_with(Frame::default); + frame.events.push(event); + + self.wake_up_output(element, state).is_ok() + } else { + if let gst::EventView::Eos(_) = ev { + gst::error!( + CAT, + obj: element, + "Got EOS event before creating UTC/running time mapping" + ); + gst::element_error!( + element, + gst::StreamError::Failed, + ["Got EOS event before creating UTC/running time mapping"] + ); + return pad.event_default(Some(element), event); + } + + let current_running_time = in_segment + .to_running_time_full(in_segment.position()) + .unwrap_or(gst::Signed::Negative(gst::ClockTime::from_nseconds( + u64::MAX, + ))); + + gst::trace!( + CAT, + obj: element, + "Pre-queueing event with running time {}", + current_running_time.positive_or(()).ok().display() + ); + + pre_queued_buffers.push(TimedBufferOrEvent::Event(current_running_time, event)); + true + } } _ => pad.event_default(Some(element), event), } @@ -412,6 +1023,10 @@ impl OnvifMetadataParse { true } + gst::QueryViewMut::Allocation(_) => { + gst::fixme!(CAT, obj: pad, "Dropping allocation query"); + false + } _ => pad.query_default(Some(element), query), } } @@ -425,13 +1040,34 @@ impl OnvifMetadataParse { gst::log!(CAT, obj: pad, "Handling event {:?}", event); match event.view() { + gst::EventView::FlushStart(_) => { + let mut state = self.state.lock().unwrap(); + state.last_flow_ret = Err(gst::FlowError::Flushing); + if let Some(ref clock_wait) = state.clock_wait.take() { + clock_wait.unschedule(); + } + drop(state); + self.cond.notify_all(); + + pad.event_default(Some(element), event) + } gst::EventView::FlushStop(_) => { + let _ = self.srcpad.stop_task(); let mut state = self.state.lock().unwrap(); state.pre_queued_buffers.clear(); state.queued_frames.clear(); - state.utc_time_pts_mapping = None; + state.utc_time_running_time_mapping = None; + state.in_segment.reset(); + state.in_segment.set_position(gst::ClockTime::NONE); + state.out_segment.reset(); + state.out_segment.set_position(gst::ClockTime::NONE); + state.last_flow_ret = Ok(gst::FlowSuccess::Ok); drop(state); - pad.event_default(Some(element), event) + let mut res = pad.event_default(Some(element), event); + if res { + res = Self::src_start_task(element, &self.srcpad).is_ok(); + } + res } _ => pad.event_default(Some(element), event), } @@ -467,13 +1103,13 @@ impl OnvifMetadataParse { } gst::QueryViewMut::Latency(q) => { let mut upstream_query = gst::query::Latency::new(); - let ret = self.sinkpad.peer_query(&mut upstream_query); if ret { let (live, mut min, mut max) = upstream_query.result(); - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + state.upstream_latency = Some((live, min)); min += state.configured_latency; max = max.map(|max| max + state.configured_latency); @@ -487,6 +1123,8 @@ impl OnvifMetadataParse { min, max.display() ); + + let _ = self.wake_up_output(element, state); } ret @@ -494,6 +1132,245 @@ impl OnvifMetadataParse { _ => pad.query_default(Some(element), query), } } + + fn src_start_task( + element: &super::OnvifMetadataParse, + pad: &gst::Pad, + ) -> Result<(), gst::LoggableError> { + let element = element.clone(); + pad.start_task(move || { + let self_ = element.imp(); + if let Err(err) = self_.src_loop(&element) { + match err { + gst::FlowError::Flushing => { + gst::debug!(CAT, obj: &element, "Pausing after flow {:?}", err); + } + gst::FlowError::Eos => { + let _ = self_.srcpad.push_event(gst::event::Eos::builder().build()); + + gst::debug!(CAT, obj: &element, "Pausing after flow {:?}", err); + } + _ => { + let _ = self_.srcpad.push_event(gst::event::Eos::builder().build()); + + gst::error!(CAT, obj: &element, "Pausing after flow {:?}", err); + + gst::element_error!( + &element, + gst::StreamError::Failed, + ["Streaming stopped, reason: {:?}", err] + ); + } + } + + let _ = self_.srcpad.pause_task(); + } + }) + .map_err(|err| gst::loggable_error!(CAT, "Failed to start pad task: {}", err)) + } + + fn src_activatemode( + pad: &gst::Pad, + mode: gst::PadMode, + activate: bool, + ) -> Result<(), gst::LoggableError> { + if mode == gst::PadMode::Pull || activate && mode == gst::PadMode::None { + return Err(gst::loggable_error!(CAT, "Invalid activation mode")); + } + + if activate { + let element = pad + .parent() + .map(|p| p.downcast::<::Type>().unwrap()) + .ok_or_else(|| { + gst::loggable_error!(CAT, "Failed to start pad task: pad has no parent") + })?; + + let self_ = element.imp(); + let mut state = self_.state.lock().unwrap(); + state.last_flow_ret = Ok(gst::FlowSuccess::Ok); + drop(state); + + Self::src_start_task(&element, pad)?; + } else { + let element = pad + .parent() + .map(|p| p.downcast::().unwrap()) + .ok_or_else(|| { + gst::loggable_error!(CAT, "Failed to stop pad task: pad has no parent") + })?; + + let self_ = element.imp(); + let mut state = self_.state.lock().unwrap(); + state.last_flow_ret = Err(gst::FlowError::Flushing); + if let Some(ref clock_wait) = state.clock_wait.take() { + clock_wait.unschedule(); + } + drop(state); + self_.cond.notify_all(); + + pad.stop_task() + .map_err(|err| gst::loggable_error!(CAT, "Failed to stop pad task: {}", err))?; + } + + Ok(()) + } + + fn src_loop(&self, element: &super::OnvifMetadataParse) -> Result<(), gst::FlowError> { + let mut state = self.state.lock().unwrap(); + + let mut clock_wait_time = None; + + loop { + // If flushing or any other error then just return here + state.last_flow_ret?; + + if !self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { + if let Some(clock_wait) = state.clock_wait.clone() { + gst::trace!( + CAT, + obj: element, + "Waiting on timer with time {}, now {}", + clock_wait.time(), + clock_wait.clock().and_then(|clock| clock.time()).display(), + ); + clock_wait_time = Some(clock_wait.time()); + + drop(state); + let res = clock_wait.wait(); + state = self.state.lock().unwrap(); + + match res { + (Ok(_), jitter) => { + gst::trace!(CAT, obj: element, "Woke up after waiting for {}", jitter); + } + (Err(err), jitter) => { + gst::trace!( + CAT, + obj: element, + "Woke up with error {:?} and jitter {}", + err, + jitter + ); + + // If unscheduled wait again or return immediately above if flushing + if err == gst::ClockError::Unscheduled { + continue; + } + } + } + } else { + gst::debug!(CAT, obj: element, "Waiting on cond"); + state = self.cond.wait(state).unwrap(); + + if state.clock_wait.is_some() { + gst::trace!(CAT, obj: element, "Got timer now, waiting again"); + continue; + } + gst::trace!(CAT, obj: element, "Woke up and checking for data to drain"); + } + } + + break; + } + + // If flushing or any other error then just return here + state.last_flow_ret?; + + let res = loop { + // Calculate running time until which to drain now + let mut drain_running_time = None; + if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { + // Drain completely + gst::debug!(CAT, obj: element, "Sink pad is EOS, draining"); + } else if let Some((true, min_latency)) = state.upstream_latency { + if let Some((now, base_time)) = Option::zip( + element.clock().and_then(|clock| clock.time()), + element.base_time(), + ) { + gst::trace!( + CAT, + obj: element, + "Clock time now {}, timer at {}", + now, + clock_wait_time.display() + ); + + let now = + (now - base_time).saturating_sub(min_latency + state.configured_latency); + let now = if let Some(clock_wait_time) = clock_wait_time { + let clock_wait_time = (clock_wait_time - base_time) + .saturating_sub(min_latency + state.configured_latency); + std::cmp::max(now, clock_wait_time) + } else { + now + }; + + drain_running_time = Some(gst::Signed::Positive(now)); + } + } else { + let current_running_time = state + .in_segment + .to_running_time_full(state.in_segment.position()); + drain_running_time = current_running_time.and_then(|current_running_time| { + current_running_time.checked_sub_unsigned(state.configured_latency) + }); + } + + // And drain up to that running time now + let data = if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { + self.drain(element, &mut state, None)? + } else if let Some((drain_running_time, utc_time_running_time_mapping)) = + Option::zip(drain_running_time, state.utc_time_running_time_mapping) + { + if let Some(drain_utc_time) = + running_time_to_utc_time(utc_time_running_time_mapping, drain_running_time) + { + self.drain(element, &mut state, Some(drain_utc_time))? + } else { + vec![] + } + } else { + vec![] + }; + + if data.is_empty() { + break state.last_flow_ret.map(|_| ()); + } + + drop(state); + + let mut res = Ok(()); + + gst::trace!(CAT, obj: element, "Pushing {} items downstream", data.len()); + for data in data { + match data { + BufferOrEvent::Event(event) => { + gst::trace!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + } + BufferOrEvent::Buffer(buffer) => { + gst::trace!(CAT, obj: element, "Pushing buffer {:?}", buffer); + if let Err(err) = self.srcpad.push(buffer) { + res = Err(err); + break; + } + } + } + } + gst::trace!(CAT, obj: element, "Pushing returned {:?}", res); + + state = self.state.lock().unwrap(); + + // If flushing or any other error then just return here + state.last_flow_ret?; + state.last_flow_ret = res.map(|_| gst::FlowSuccess::Ok); + + self.reschedule_clock_wait(element, &mut state); + }; + + res + } } #[glib::object_subclass] @@ -545,6 +1422,9 @@ impl ObjectSubclass for OnvifMetadataParse { |parse, element| parse.src_query(pad, element, query), ) }) + .activatemode_function(|pad, _parent, mode, activate| { + Self::src_activatemode(pad, mode, activate) + }) .flags(gst::PadFlags::PROXY_ALLOCATION) .flags(gst::PadFlags::FIXED_CAPS) .build(); @@ -554,6 +1434,7 @@ impl ObjectSubclass for OnvifMetadataParse { sinkpad, settings: Mutex::default(), state: Mutex::default(), + cond: Condvar::new(), } } } @@ -561,20 +1442,33 @@ impl ObjectSubclass for OnvifMetadataParse { impl ObjectImpl for OnvifMetadataParse { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { - vec![glib::ParamSpecUInt64::builder("latency") - .nick("Latency") - .blurb( - "Maximum latency to introduce for reordering metadata \ - (max=auto: 6s if unparsed input, 0s if parsed input)", - ) - .default_value( - Settings::default() - .latency - .map(|l| l.nseconds()) - .unwrap_or(u64::MAX), - ) - .mutable_ready() - .build()] + vec![ + glib::ParamSpecUInt64::builder("latency") + .nick("Latency") + .blurb( + "Maximum latency to introduce for reordering metadata \ + (max=auto: 6s if unparsed input, 0s if parsed input)", + ) + .default_value( + Settings::default() + .latency + .map(|l| l.nseconds()) + .unwrap_or(u64::MAX), + ) + .mutable_ready() + .build(), + glib::ParamSpecUInt64::builder("max-lateness") + .nick("Maximum Lateness") + .blurb("Drop metadata that delayed by more than this") + .default_value( + Settings::default() + .max_lateness + .map(|l| l.nseconds()) + .unwrap_or(u64::MAX), + ) + .mutable_ready() + .build(), + ] }); PROPERTIES.as_ref() @@ -593,6 +1487,10 @@ impl ObjectImpl for OnvifMetadataParse { let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); } + "max-lateness" => { + self.settings.lock().unwrap().max_lateness = + value.get().expect("type checked upstream"); + } _ => unimplemented!(), }; } @@ -600,6 +1498,7 @@ impl ObjectImpl for OnvifMetadataParse { fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "latency" => self.settings.lock().unwrap().latency.to_value(), + "max-lateness" => self.settings.lock().unwrap().max_lateness.to_value(), _ => unimplemented!(), } } @@ -663,18 +1562,14 @@ impl ElementImpl for OnvifMetadataParse { ) -> Result { gst::trace!(CAT, obj: element, "Changing state {:?}", transition); - if transition == gst::StateChange::PausedToReady { + if matches!( + transition, + gst::StateChange::PausedToReady | gst::StateChange::ReadyToPaused + ) { let mut state = self.state.lock().unwrap(); *state = State::default(); } - let ret = self.parent_change_state(element, transition)?; - - if transition == gst::StateChange::ReadyToPaused { - let mut state = self.state.lock().unwrap(); - *state = State::default(); - } - - Ok(ret) + self.parent_change_state(element, transition) } }