diff --git a/Cargo.toml b/Cargo.toml index ea472bd4..bfca285d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "generic/sodium", "generic/threadshare", "net/hlssink3", + "net/onvif", "net/reqwest", "net/rusoto", "utils/fallbackswitch", @@ -45,6 +46,7 @@ default-members = [ "audio/lewton", "generic/file", "generic/threadshare", + "net/onvif", "net/reqwest", "net/rusoto", "utils/fallbackswitch", diff --git a/ci/utils.py b/ci/utils.py index 1dd5a79a..77753551 100644 --- a/ci/utils.py +++ b/ci/utils.py @@ -3,7 +3,7 @@ import os DIRS = ['audio', 'generic', 'net', 'text', 'utils', 'video'] # Plugins whose name is prefixed by 'rs' RS_PREFIXED = ['audiofx', 'closedcaption', - 'dav1d', 'file', 'json', 'regex', 'webp'] + 'dav1d', 'file', 'json', 'onvif', 'regex', 'webp'] OVERRIDE = {'wrap': 'rstextwrap', 'flavors': 'rsflv', 'ahead': 'textahead'} diff --git a/meson.build b/meson.build index a5a540a9..1cc88311 100644 --- a/meson.build +++ b/meson.build @@ -61,6 +61,7 @@ plugins = { 'gst-plugin-uriplaylistbin': 'libgsturiplaylistbin', 'gst-plugin-spotify': 'libgstspotify', 'gst-plugin-textahead': 'libgsttextahead', + 'gst-plugin-onvif': 'libgstrsonvif', } extra_env = {} diff --git a/net/onvif/Cargo.toml b/net/onvif/Cargo.toml new file mode 100644 index 00000000..68490421 --- /dev/null +++ b/net/onvif/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "gst-plugin-onvif" +version = "0.9.0" +authors = ["Mathieu Duponchelle "] +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +license = "MPL-2.0" +description = "Rust ONVIF Plugin" +edition = "2021" +rust-version = "1.57" + +[dependencies] +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +once_cell = "1.0" +xmlparser = "0.13" +minidom = "0.14" +chrono = "0.4" +cairo-rs = { git = "https://github.com/gtk-rs/gtk-rs-core", features=["use_glib"] } +pango = { git = "https://github.com/gtk-rs/gtk-rs-core" } +pangocairo = { git = "https://github.com/gtk-rs/gtk-rs-core" } + +[lib] +name = "gstrsonvif" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +static = [] +capi = [] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/net/onvif/LICENSE-MPL-2.0 b/net/onvif/LICENSE-MPL-2.0 new file mode 120000 index 00000000..eb5d24fe --- /dev/null +++ b/net/onvif/LICENSE-MPL-2.0 @@ -0,0 +1 @@ +../../LICENSE-MPL-2.0 \ No newline at end of file diff --git a/net/onvif/build.rs b/net/onvif/build.rs new file mode 100644 index 00000000..cda12e57 --- /dev/null +++ b/net/onvif/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/net/onvif/src/lib.rs b/net/onvif/src/lib.rs new file mode 100644 index 00000000..c62a3626 --- /dev/null +++ b/net/onvif/src/lib.rs @@ -0,0 +1,38 @@ +// Copyright (C) 2022 Mathieu Duponchelle +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +#![allow(clippy::non_send_fields_in_send_ty)] + +use gst::glib; + +mod onvifaggregator; +mod onvifdepay; +mod onvifoverlay; +mod onvifpay; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + onvifpay::register(plugin)?; + onvifdepay::register(plugin)?; + onvifaggregator::register(plugin)?; + onvifoverlay::register(plugin)?; + + gst::meta::CustomMeta::register("OnvifXMLFrameMeta", &[], |_, _, _, _| true); + + Ok(()) +} + +gst::plugin_define!( + rsonvif, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/net/onvif/src/onvifaggregator/imp.rs b/net/onvif/src/onvifaggregator/imp.rs new file mode 100644 index 00000000..26de81a3 --- /dev/null +++ b/net/onvif/src/onvifaggregator/imp.rs @@ -0,0 +1,541 @@ +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; +use gst_base::AGGREGATOR_FLOW_NEED_DATA; +use minidom::Element; +use once_cell::sync::Lazy; +use std::collections::BTreeSet; +use std::io::Cursor; +use std::sync::Mutex; + +// Offset in nanoseconds from midnight 01-01-1900 (prime epoch) to +// midnight 01-01-1970 (UNIX epoch) +const PRIME_EPOCH_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(2_208_988_800); + +// Incoming metadata is split up frame-wise, and stored in a FIFO. +#[derive(Eq, Clone)] +struct MetaFrame { + // From UtcTime attribute, in nanoseconds since prime epoch + timestamp: gst::ClockTime, + // The frame element, dumped to XML + buffer: gst::Buffer, +} + +impl Ord for MetaFrame { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.timestamp.cmp(&other.timestamp) + } +} + +impl PartialOrd for MetaFrame { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for MetaFrame { + fn eq(&self, other: &Self) -> bool { + self.timestamp == other.timestamp + } +} + +#[derive(Default)] +struct State { + // FIFO of MetaFrames + meta_frames: BTreeSet, +} + +pub struct OnvifAggregator { + // Input media stream, can be anything with a reference timestamp meta + media_sink_pad: gst_base::AggregatorPad, + // Input metadata stream, must be complete VideoAnalytics XML documents + // as output by onvifdepay + meta_sink_pad: gst_base::AggregatorPad, + state: Mutex, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "onvifaggregator", + gst::DebugColorFlags::empty(), + Some("ONVIF metadata / video aggregator"), + ) +}); + +static NTP_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); + +#[glib::object_subclass] +impl ObjectSubclass for OnvifAggregator { + const NAME: &'static str = "GstOnvifAggregator"; + type Type = super::OnvifAggregator; + type ParentType = gst_base::Aggregator; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("media").unwrap(); + let media_sink_pad = + gst::PadBuilder::::from_template(&templ, Some("media")) + .build(); + + let templ = klass.pad_template("meta").unwrap(); + let meta_sink_pad = + gst::PadBuilder::::from_template(&templ, Some("meta")).build(); + + Self { + media_sink_pad, + meta_sink_pad, + state: Mutex::default(), + } + } +} + +impl ObjectImpl for OnvifAggregator { + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.media_sink_pad).unwrap(); + obj.add_pad(&self.meta_sink_pad).unwrap(); + } +} + +impl GstObjectImpl for OnvifAggregator {} + +impl ElementImpl for OnvifAggregator { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ONVIF metadata aggregator", + "Aggregator", + "ONVIF metadata aggregator", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let media_caps = gst::Caps::new_any(); + let media_sink_pad_template = gst::PadTemplate::with_gtype( + "media", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &media_caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + + let meta_caps = gst::Caps::builder("application/x-onvif-metadata") + .field("encoding", "utf8") + .build(); + + let meta_sink_pad_template = gst::PadTemplate::with_gtype( + "meta", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &meta_caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &media_caps, + ) + .unwrap(); + + vec![ + media_sink_pad_template, + meta_sink_pad_template, + src_pad_template, + ] + }); + + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + element: &Self::Type, + _templ: &gst::PadTemplate, + _name: Option, + _caps: Option<&gst::Caps>, + ) -> Option { + gst::error!( + CAT, + obj: element, + "onvifaggregator doesn't expose request pads" + ); + + None + } + + fn release_pad(&self, element: &Self::Type, _pad: &gst::Pad) { + gst::error!( + CAT, + obj: element, + "onvifaggregator doesn't expose request pads" + ); + } +} + +impl OnvifAggregator { + // We simply consume all the incoming meta buffers and store them in a FIFO + // as they arrive + fn consume_meta( + &self, + state: &mut State, + element: &super::OnvifAggregator, + ) -> Result<(), gst::FlowError> { + while let Some(buffer) = self.meta_sink_pad.pop_buffer() { + let buffer = buffer.into_mapped_buffer_readable().map_err(|_| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| { + gst::element_error!( + element, + gst::StreamError::Format, + ["Failed to decode buffer as UTF-8: {}", err] + ); + + gst::FlowError::Error + })?; + + let root = utf8.parse::().map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to parse buffer as XML: {}", err] + ); + + gst::FlowError::Error + })?; + + if let Some(analytics) = + root.get_child("VideoAnalytics", "http://www.onvif.org/ver10/schema") + { + for el in analytics.children() { + // We are only interested in associating Frame metadata with video frames + if el.is("Frame", "http://www.onvif.org/ver10/schema") { + let timestamp = el.attr("UtcTime").ok_or_else(|| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Frame element has no UtcTime attribute"] + ); + + gst::FlowError::Error + })?; + + let dt = + chrono::DateTime::parse_from_rfc3339(timestamp).map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to parse UtcTime {}: {}", timestamp, err] + ); + + gst::FlowError::Error + })?; + + let prime_dt_ns = PRIME_EPOCH_OFFSET + + gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64); + + let mut writer = Cursor::new(Vec::new()); + el.write_to(&mut writer).map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Write, + ["Failed to write back frame as XML: {}", err] + ); + + gst::FlowError::Error + })?; + + gst::trace!(CAT, "Consuming metadata buffer {}", prime_dt_ns); + + state.meta_frames.insert(MetaFrame { + timestamp: prime_dt_ns, + buffer: gst::Buffer::from_slice(writer.into_inner()), + }); + } + } + } + } + + Ok(()) + } + + fn lookup_reference_timestamp(&self, buffer: gst::Buffer) -> Option { + for meta in buffer.iter_meta::() { + if meta.reference().is_subset(&NTP_CAPS) { + return Some(meta.timestamp()); + } + } + + None + } + + // Called after consuming metadata buffers, we peek the current media buffer + // and output it when: + // + // * it does not have a reference timestamp meta + // * we have timed out + // * we have consumed a metadata buffer for a future frame + fn consume_media( + &self, + state: &mut State, + element: &super::OnvifAggregator, + timeout: bool, + ) -> Result)>, gst::FlowError> { + if let Some(media_buffer) = self.media_sink_pad.peek_buffer() { + let duration = media_buffer.duration().ok_or_else(|| { + gst::error!(CAT, obj: element, "Require buffers with duration"); + gst::FlowError::Error + })?; + + if let Some(start) = self.lookup_reference_timestamp(media_buffer) { + let end = start + duration; + + if let Some(latest_frame) = state.meta_frames.iter().next_back() { + if latest_frame.timestamp > end || timeout { + gst::debug!( + CAT, + obj: element, + "Media buffer spanning {} -> {} is ready", + start, + end + ); + Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end)))) + } else { + gst::trace!( + CAT, + obj: element, + "Media buffer spanning {} -> {} isn't ready yet", + start, + end + ); + Ok(None) + } + } else { + gst::trace!( + CAT, + obj: element, + "Media buffer spanning {} -> {} isn't ready yet", + start, + end + ); + + Ok(None) + } + } else { + gst::debug!( + CAT, + obj: element, + "Consuming media buffer with no reference NTP timestamp" + ); + + Ok(Some(( + self.media_sink_pad.pop_buffer().unwrap(), + gst::ClockTime::NONE, + ))) + } + } else { + gst::trace!(CAT, obj: element, "No media buffer queued"); + + Ok(None) + } + } +} + +impl AggregatorImpl for OnvifAggregator { + fn aggregate( + &self, + element: &Self::Type, + timeout: bool, + ) -> Result { + let mut state = self.state.lock().unwrap(); + + self.consume_meta(&mut state, element)?; + + // When the current media buffer is ready, we attach all matching metadata buffers + // and push it out + if let Some((mut buffer, end)) = self.consume_media(&mut state, element, timeout)? { + let mut buflist = gst::BufferList::new(); + + if let Some(end) = end { + let mut split_at: Option = None; + let buflist_mut = buflist.get_mut().unwrap(); + + for frame in state.meta_frames.iter() { + if frame.timestamp > end { + gst::trace!( + CAT, + obj: element, + "keeping metadata buffer at {} for next media buffer", + frame.timestamp + ); + split_at = Some(frame.clone()); + break; + } else { + gst::debug!( + CAT, + obj: element, + "Attaching meta buffer {}", + frame.timestamp + ); + buflist_mut.add(frame.buffer.clone()); + } + } + + if let Some(split_at) = split_at { + state.meta_frames = state.meta_frames.split_off(&split_at); + } else { + state.meta_frames.clear(); + } + } + + drop(state); + + { + let buf = buffer.make_mut(); + let mut meta = gst::meta::CustomMeta::add(buf, "OnvifXMLFrameMeta").unwrap(); + + let s = meta.mut_structure(); + s.set("frames", buflist); + } + + element.set_position(buffer.pts().opt_add(buffer.duration())); + + self.finish_buffer(element, buffer) + } else { + Err(AGGREGATOR_FLOW_NEED_DATA) + } + } + + fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + match query.view_mut() { + QueryViewMut::Position(..) + | QueryViewMut::Duration(..) + | QueryViewMut::Uri(..) + | QueryViewMut::Caps(..) + | QueryViewMut::Allocation(..) => self.media_sink_pad.peer_query(query), + QueryViewMut::AcceptCaps(q) => { + let caps = q.caps_owned(); + let class = aggregator.class(); + let templ_caps = class.pad_template("media").unwrap().caps(); + + q.set_result(caps.is_subset(&templ_caps)); + + true + } + _ => self.parent_src_query(aggregator, query), + } + } + + fn sink_event( + &self, + aggregator: &Self::Type, + aggregator_pad: &gst_base::AggregatorPad, + event: gst::Event, + ) -> bool { + use gst::EventView; + + match event.view() { + EventView::Caps(e) => { + if aggregator_pad.upcast_ref::() == &self.media_sink_pad { + gst::info!(CAT, obj: aggregator, "Pushing caps {}", e.caps()); + aggregator.set_src_caps(&e.caps_owned()); + } + + true + } + EventView::Segment(e) => { + if aggregator_pad.upcast_ref::() == &self.media_sink_pad { + aggregator.update_segment(e.segment()); + } + self.parent_sink_event(aggregator, aggregator_pad, event) + } + _ => self.parent_sink_event(aggregator, aggregator_pad, event), + } + } + + fn sink_query( + &self, + aggregator: &Self::Type, + aggregator_pad: &gst_base::AggregatorPad, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryViewMut; + + match query.view_mut() { + QueryViewMut::Position(..) + | QueryViewMut::Duration(..) + | QueryViewMut::Uri(..) + | QueryViewMut::Allocation(..) => { + if aggregator_pad == &self.media_sink_pad { + let srcpad = aggregator.src_pad(); + srcpad.peer_query(query) + } else { + self.parent_sink_query(aggregator, aggregator_pad, query) + } + } + QueryViewMut::Caps(q) => { + if aggregator_pad == &self.media_sink_pad { + let srcpad = aggregator.src_pad(); + srcpad.peer_query(query) + } else { + let filter = q.filter_owned(); + let class = aggregator.class(); + let templ_caps = class.pad_template("meta").unwrap().caps(); + + if let Some(filter) = filter { + q.set_result( + &filter.intersect_with_mode(&templ_caps, gst::CapsIntersectMode::First), + ); + } else { + q.set_result(&templ_caps); + } + + true + } + } + QueryViewMut::AcceptCaps(q) => { + if aggregator_pad.upcast_ref::() == &self.media_sink_pad { + let srcpad = aggregator.src_pad(); + srcpad.peer_query(query); + } else { + let caps = q.caps_owned(); + let class = aggregator.class(); + let templ_caps = class.pad_template("meta").unwrap().caps(); + + q.set_result(caps.is_subset(&templ_caps)); + } + + true + } + _ => self.parent_src_query(aggregator, query), + } + } + + fn next_time(&self, aggregator: &Self::Type) -> Option { + aggregator.simple_get_next_time() + } + + fn negotiate(&self, _aggregator: &Self::Type) -> bool { + true + } +} diff --git a/net/onvif/src/onvifaggregator/mod.rs b/net/onvif/src/onvifaggregator/mod.rs new file mode 100644 index 00000000..50513f86 --- /dev/null +++ b/net/onvif/src/onvifaggregator/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct OnvifAggregator(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "onvifaggregator", + gst::Rank::Primary, + OnvifAggregator::static_type(), + ) +} diff --git a/net/onvif/src/onvifdepay/imp.rs b/net/onvif/src/onvifdepay/imp.rs new file mode 100644 index 00000000..6f36d3b0 --- /dev/null +++ b/net/onvif/src/onvifdepay/imp.rs @@ -0,0 +1,204 @@ +use gst::glib; +use gst::subclass::prelude::*; +use gst_rtp::subclass::prelude::*; +use once_cell::sync::Lazy; +use std::sync::Mutex; + +#[derive(Default)] +struct State { + // Aggregate payloads to form a complete XML document + adapter: gst_base::UniqueAdapter, +} + +#[derive(Default)] +pub struct OnvifDepay { + state: Mutex, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "onvifdepay", + gst::DebugColorFlags::empty(), + Some("ONVIF metadata depayloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for OnvifDepay { + const NAME: &'static str = "GstOnvifDepay"; + type Type = super::OnvifDepay; + type ParentType = gst_rtp::RTPBaseDepayload; +} + +impl ObjectImpl for OnvifDepay {} + +impl GstObjectImpl for OnvifDepay {} + +impl ElementImpl for OnvifDepay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ONVIF metadata RTP depayloader", + "Depayloader/Network/RTP", + "ONVIF metadata RTP depayloader", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_caps = gst::Caps::builder("application/x-rtp") + .field("media", "application") + .field("payload", gst::IntRange::new(96, 127)) + .field("clock-rate", 90000) + .field("encoding-name", "VND.ONVIF.METADATA") + .build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &sink_caps, + ) + .unwrap(); + + let src_caps = gst::Caps::builder("application/x-onvif-metadata") + .field("encoding", "utf8") + .build(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &src_caps, + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl RTPBaseDepayloadImpl for OnvifDepay { + fn process_rtp_packet( + &self, + element: &Self::Type, + rtp_buffer: &gst_rtp::RTPBuffer, + ) -> Option { + // Retrieve the payload subbuffer + let payload_buffer = match rtp_buffer.payload_buffer() { + Ok(buffer) => buffer, + Err(..) => { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to retrieve RTP buffer payload"] + ); + + return None; + } + }; + + let mut state = self.state.lock().unwrap(); + + if rtp_buffer + .buffer() + .flags() + .contains(gst::BufferFlags::DISCONT) + { + gst::debug!(CAT, obj: element, "processing discont RTP buffer"); + state.adapter.clear(); + } + + // Now store in the adapter + state.adapter.push(payload_buffer); + + if !rtp_buffer.is_marker() { + return None; + } + + // We have found the last chunk for this document, empty the adapter + let available = state.adapter.available(); + let buffer = match state.adapter.take_buffer(available) { + Ok(buffer) => buffer, + Err(err) => { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to empty adapter: {}", err] + ); + + return None; + } + }; + + // Sanity check the document + let map = buffer.map_readable().unwrap(); + + let utf8 = match std::str::from_utf8(map.as_ref()) { + Ok(s) => s, + Err(err) => { + gst::warning!( + CAT, + obj: element, + "Failed to decode payload as UTF-8: {}", + err + ); + + return None; + } + }; + + let forward = { + let mut forward = false; + + for token in xmlparser::Tokenizer::from(utf8) { + match token { + Ok(token) => match token { + xmlparser::Token::Comment { .. } => { + continue; + } + xmlparser::Token::Declaration { .. } => { + continue; + } + xmlparser::Token::ElementStart { local, .. } => { + if local.as_str() == "MetadataStream" { + forward = true; + } + break; + } + _ => { + forward = false; + break; + } + }, + Err(err) => { + gst::warning!(CAT, obj: element, "Invalid XML in payload: {}", err); + + return None; + } + } + } + + forward + }; + + // Incomplete, wait for the next document + if !forward { + gst::warning!( + CAT, + obj: element, + "document must start with tt:MetadataStream element", + ); + + return None; + } + + drop(map); + + Some(buffer) + } +} diff --git a/net/onvif/src/onvifdepay/mod.rs b/net/onvif/src/onvifdepay/mod.rs new file mode 100644 index 00000000..8c56bbbf --- /dev/null +++ b/net/onvif/src/onvifdepay/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct OnvifDepay(ObjectSubclass) @extends gst_rtp::RTPBaseDepayload, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtponvifdepay", + gst::Rank::Primary, + OnvifDepay::static_type(), + ) +} diff --git a/net/onvif/src/onvifoverlay/imp.rs b/net/onvif/src/onvifoverlay/imp.rs new file mode 100644 index 00000000..07a8c336 --- /dev/null +++ b/net/onvif/src/onvifoverlay/imp.rs @@ -0,0 +1,758 @@ +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_video::prelude::*; +use pango::prelude::*; + +use once_cell::sync::Lazy; + +use std::collections::HashSet; +use std::sync::Mutex; + +use minidom::Element; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "onvifoverlay", + gst::DebugColorFlags::empty(), + Some("ONVIF overlay element"), + ) +}); + +const DEFAULT_FONT_DESC: &str = "monospace 12"; + +// Shape description in cairo coordinates (0, 0) is top left +#[derive(Debug)] +struct Shape { + x: u32, + y: u32, + width: u32, + height: u32, + // Optional text rendered from top left of rectangle + tag: Option, +} + +#[derive(Default)] +struct State { + video_info: Option, + composition: Option, + layout: Option, + attach: bool, +} + +// SAFETY: Required because `pango::Layout` is not `Send` but the whole `State` needs to be. +// We ensure that no additional references to the layout are ever created, which makes it safe +// to send it to other threads as long as only a single thread uses it concurrently. +unsafe impl Send for State {} + +struct Settings { + font_desc: String, +} + +impl Default for Settings { + fn default() -> Self { + Self { + font_desc: String::from(DEFAULT_FONT_DESC), + } + } +} + +pub struct OnvifOverlay { + srcpad: gst::Pad, + sinkpad: gst::Pad, + state: Mutex, + settings: Mutex, +} + +impl OnvifOverlay { + fn negotiate(&self, element: &super::OnvifOverlay) -> Result { + let video_info = match self.state.lock().unwrap().video_info.as_ref() { + Some(video_info) => Ok(video_info.clone()), + None => { + gst::element_error!( + element, + gst::CoreError::Negotiation, + ["Element hasn't received valid video caps at negotiation time"] + ); + Err(gst::FlowError::NotNegotiated) + } + }?; + + let mut caps = video_info.to_caps().unwrap(); + let mut downstream_accepts_meta = false; + + let upstream_has_meta = caps + .features(0) + .map(|f| f.contains(&gst_video::CAPS_FEATURE_META_GST_VIDEO_OVERLAY_COMPOSITION)) + .unwrap_or(false); + + if !upstream_has_meta { + let mut caps_clone = caps.clone(); + let overlay_caps = caps_clone.make_mut(); + + if let Some(features) = overlay_caps.features_mut(0) { + features.add(&gst_video::CAPS_FEATURE_META_GST_VIDEO_OVERLAY_COMPOSITION); + let peercaps = self.srcpad.peer_query_caps(Some(&caps_clone)); + downstream_accepts_meta = !peercaps.is_empty(); + if downstream_accepts_meta { + caps = caps_clone; + } + } + } + + gst::debug!( + CAT, + obj: element, + "upstream has meta: {}, downstream accepts meta: {}", + upstream_has_meta, + downstream_accepts_meta + ); + + if upstream_has_meta || downstream_accepts_meta { + let mut query = gst::query::Allocation::new(&caps, false); + + if !self.srcpad.push_event(gst::event::Caps::new(&caps)) { + return Err(gst::FlowError::NotNegotiated); + } + + if !self.srcpad.peer_query(&mut query) + && self.srcpad.pad_flags().contains(gst::PadFlags::FLUSHING) + { + return Err(gst::FlowError::NotNegotiated); + } + + let attach = query + .find_allocation_meta::() + .is_some(); + + gst::debug!(CAT, obj: element, "attach meta: {}", attach); + + self.state.lock().unwrap().attach = attach; + + Ok(gst::FlowSuccess::Ok) + } else { + self.state.lock().unwrap().attach = false; + + if !self.srcpad.push_event(gst::event::Caps::new(&caps)) { + Err(gst::FlowError::NotNegotiated) + } else { + Ok(gst::FlowSuccess::Ok) + } + } + } + + fn render_shape_buffer( + &self, + state: &mut State, + width: u32, + height: u32, + tag: Option<&str>, + ) -> Option<(gst::Buffer, u32, u32)> { + let mut text_width = 0; + let mut text_height = 0; + + // If we have text to render, update the layout first in order to compute + // the final size + let layout = tag.and_then(|tag| { + state.layout.as_ref().unwrap().set_text(tag); + + state.layout.clone() + }); + + if let Some(ref layout) = layout { + let (_ink_rect, logical_rect) = layout.extents(); + + text_width = logical_rect.width() / pango::SCALE; + text_height = logical_rect.height() / pango::SCALE; + } + + let total_height = height.max(text_height as u32); + let total_width = width.max(text_width as u32); + + let mut buffer = gst::Buffer::with_size((total_width * total_height) as usize * 4).ok()?; + + gst_video::VideoMeta::add( + buffer.get_mut().unwrap(), + gst_video::VideoFrameFlags::empty(), + #[cfg(target_endian = "little")] + gst_video::VideoFormat::Bgra, + #[cfg(target_endian = "big")] + gst_video::VideoFormat::Argb, + total_width as u32, + total_height as u32, + ) + .ok()?; + + let buffer = buffer.into_mapped_buffer_writable().unwrap(); + + // Pass ownership of the buffer to the cairo surface but keep around + // a raw pointer so we can later retrieve it again when the surface + // is done + let buffer_ptr = unsafe { buffer.buffer().as_ptr() }; + let surface = cairo::ImageSurface::create_for_data( + buffer, + cairo::Format::ARgb32, + total_width as i32, + total_height as i32, + total_width as i32 * 4, + ) + .ok()?; + + let cr = cairo::Context::new(&surface).ok()?; + let line_width = 1.; + + // Clear background + cr.set_operator(cairo::Operator::Source); + cr.set_source_rgba(0.0, 0.0, 0.0, 0.0); + cr.paint().ok()?; + + cr.save().ok()?; + + // Now render the rectangle + cr.move_to(line_width, line_width); + cr.line_to(line_width, height as f64 - line_width); + cr.line_to(width as f64 - line_width, height as f64 - line_width); + cr.line_to(width as f64 - line_width, line_width); + cr.close_path(); + cr.set_source_rgba(1., 0., 0., 1.); + cr.set_line_width(line_width); + let _ = cr.stroke(); + + cr.restore().ok()?; + + // Finally render the text, if any + if let Some(layout) = layout { + cr.save().ok()?; + + cr.move_to(0., 0.); + cr.set_operator(cairo::Operator::Over); + cr.set_source_rgba(1.0, 1.0, 1.0, 1.0); + pangocairo::functions::layout_path(&cr, &layout); + cr.stroke().ok()?; + + cr.restore().ok()?; + cr.save().ok()?; + + cr.move_to(0., 0.); + cr.set_source_rgba(0.0, 0.0, 0.0, 1.0); + pangocairo::functions::show_layout(&cr, &layout); + + cr.restore().ok()?; + } + + drop(cr); + + // Safety: The surface still owns a mutable reference to the buffer but our reference + // to the surface here is the last one. After dropping the surface the buffer would be + // freed, so we keep an additional strong reference here before dropping the surface, + // which is then returned. As such it's guaranteed that nothing is using the buffer + // anymore mutably. + unsafe { + assert_eq!( + cairo::ffi::cairo_surface_get_reference_count(surface.to_raw_none()), + 1 + ); + let buffer = glib::translate::from_glib_none(buffer_ptr); + drop(surface); + Some((buffer, total_width, total_height)) + } + } + + // Update our overlay composition with a set of rectangles + fn overlay_shapes(&self, state: &mut State, element: &super::OnvifOverlay, shapes: Vec) { + if shapes.is_empty() { + state.composition = None; + return; + } + + if state.layout.is_none() { + let fontmap = match pangocairo::FontMap::new() { + Some(fontmap) => Ok(fontmap), + None => { + gst::element_error!( + element, + gst::LibraryError::Failed, + ["Failed to create pangocairo font map"] + ); + Err(gst::FlowError::Error) + } + } + .unwrap(); + let context = match fontmap.create_context() { + Some(context) => Ok(context), + None => { + gst::element_error!( + element, + gst::LibraryError::Failed, + ["Failed to create font map context"] + ); + Err(gst::FlowError::Error) + } + } + .unwrap(); + context.set_language(&pango::Language::from_string("en_US")); + context.set_base_dir(pango::Direction::Ltr); + let layout = pango::Layout::new(&context); + layout.set_alignment(pango::Alignment::Left); + let font_desc = + pango::FontDescription::from_string(&self.settings.lock().unwrap().font_desc); + layout.set_font_description(Some(&font_desc)); + + state.layout = Some(layout); + } + + let mut composition = gst_video::VideoOverlayComposition::default(); + let composition_mut = composition.get_mut().unwrap(); + for shape in &shapes { + // Sanity check: don't render 0-sized shapes + if shape.width == 0 || shape.height == 0 { + continue; + } + + gst::debug!( + CAT, + obj: element, + "Rendering shape with tag {:?} x {} y {} width {} height {}", + shape.tag, + shape.x, + shape.y, + shape.width, + shape.height + ); + + let (buffer, width, height) = match self.render_shape_buffer( + state, + shape.width, + shape.height, + shape.tag.as_deref(), + ) { + Some(ret) => ret, + None => { + gst::error!(CAT, obj: element, "Failed to render buffer"); + state.composition = None; + return; + } + }; + + let rect = gst_video::VideoOverlayRectangle::new_raw( + &buffer, + shape.x as i32, + shape.y as i32, + width, + height, + gst_video::VideoOverlayFormatFlags::PREMULTIPLIED_ALPHA, + ); + + composition_mut.add_rectangle(&rect); + } + + state.composition = Some(composition); + } + + fn sink_chain( + &self, + pad: &gst::Pad, + element: &super::OnvifOverlay, + mut buffer: gst::Buffer, + ) -> Result { + gst::trace!(CAT, obj: pad, "Handling buffer {:?}", buffer); + + if self.srcpad.check_reconfigure() { + if let Err(err) = self.negotiate(element) { + if self.srcpad.pad_flags().contains(gst::PadFlags::FLUSHING) { + self.srcpad.mark_reconfigure(); + return Ok(gst::FlowSuccess::Ok); + } else { + return Err(err); + } + } + } + + let mut state = self.state.lock().unwrap(); + + let video_info = state.video_info.as_ref().unwrap(); + let width = video_info.width() as i32; + let height = video_info.height() as i32; + + if let Ok(meta) = gst::meta::CustomMeta::from_buffer(&buffer, "OnvifXMLFrameMeta") { + let s = meta.structure(); + let mut shapes: Vec = Vec::new(); + + if let Ok(frames) = s.get::("frames") { + gst::log!(CAT, obj: element, "Overlaying {} frames", frames.len()); + + // Metadata for multiple frames may be attached to this frame, either because: + // + // * Multiple analytics modules are producing metadata + // * The metadata for two frames produced by the same module is attached + // to this frame, for instance because of resynchronization or other + // timing-related situations + // + // We want to display all detected objects for the first case, but only the + // latest version for the second case. As frames are sorted in increasing temporal + // order, we iterate them in reverse to start with the most recent, and deduplicate + // by object id. + + let mut object_ids = HashSet::new(); + + for buffer in frames.iter().rev() { + let buffer = buffer.map_readable().map_err(|_| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| { + gst::element_error!( + element, + gst::StreamError::Format, + ["Failed to decode buffer as UTF-8: {}", err] + ); + + gst::FlowError::Error + })?; + + let root = utf8.parse::().map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to parse buffer as XML: {}", err] + ); + + gst::FlowError::Error + })?; + + for object in root.children() { + if object.is("Object", "http://www.onvif.org/ver10/schema") { + gst::trace!(CAT, obj: element, "Handling object {:?}", object); + + let object_id = match object.attr("ObjectId") { + Some(id) => id.to_string(), + None => { + gst::warning!(CAT, obj: element, "XML Object with no ObjectId"); + continue; + } + }; + + if !object_ids.insert(object_id.clone()) { + gst::debug!(CAT, "Skipping older version of object {}", object_id); + continue; + } + + let appearance = match object + .get_child("Appearance", "http://www.onvif.org/ver10/schema") + { + Some(appearance) => appearance, + None => continue, + }; + + let shape = match appearance + .get_child("Shape", "http://www.onvif.org/ver10/schema") + { + Some(shape) => shape, + None => continue, + }; + + let tag = appearance + .get_child("Class", "http://www.onvif.org/ver10/schema") + .and_then(|class| { + class.get_child("Type", "http://www.onvif.org/ver10/schema") + }) + .map(|t| t.text()); + + let bbox = match shape + .get_child("BoundingBox", "http://www.onvif.org/ver10/schema") + { + Some(bbox) => bbox, + None => { + gst::warning!( + CAT, + obj: element, + "XML Shape with no BoundingBox" + ); + continue; + } + }; + + let left: f64 = match bbox.attr("left").and_then(|val| val.parse().ok()) + { + Some(val) => val, + None => { + gst::warning!( + CAT, + obj: element, + "BoundingBox with no left attribute" + ); + continue; + } + }; + + let right: f64 = + match bbox.attr("right").and_then(|val| val.parse().ok()) { + Some(val) => val, + None => { + gst::warning!( + CAT, + obj: element, + "BoundingBox with no right attribute" + ); + continue; + } + }; + + let top: f64 = match bbox.attr("top").and_then(|val| val.parse().ok()) { + Some(val) => val, + None => { + gst::warning!( + CAT, + obj: element, + "BoundingBox with no top attribute" + ); + continue; + } + }; + + let bottom: f64 = + match bbox.attr("bottom").and_then(|val| val.parse().ok()) { + Some(val) => val, + None => { + gst::warning!( + CAT, + obj: element, + "BoundingBox with no bottom attribute" + ); + continue; + } + }; + + let x1 = width / 2 + ((left * (width / 2) as f64) as i32); + let y1 = height / 2 - ((top * (height / 2) as f64) as i32); + let x2 = width / 2 + ((right * (width / 2) as f64) as i32); + let y2 = height / 2 - ((bottom * (height / 2) as f64) as i32); + + shapes.push(Shape { + x: x1 as u32, + y: y1 as u32, + width: (x2 - x1) as u32, + height: (y2 - y1) as u32, + tag, + }); + } + } + } + + if !frames.is_empty() { + self.overlay_shapes(&mut state, element, shapes); + } + } + } + + if let Some(composition) = &state.composition { + let buffer = buffer.make_mut(); + if state.attach { + gst_video::VideoOverlayCompositionMeta::add(buffer, composition); + } else { + let mut frame = gst_video::VideoFrameRef::from_buffer_ref_writable( + buffer, + state.video_info.as_ref().unwrap(), + ) + .unwrap(); + + if composition.blend(&mut frame).is_err() { + gst::error!(CAT, obj: pad, "Failed to blend composition"); + } + } + } + drop(state); + + self.srcpad.push(buffer) + } + + fn sink_event(&self, pad: &gst::Pad, element: &super::OnvifOverlay, event: gst::Event) -> bool { + use gst::EventView; + + gst::log!(CAT, obj: pad, "Handling event {:?}", event); + + match event.view() { + EventView::Caps(c) => { + let mut state = self.state.lock().unwrap(); + state.video_info = gst_video::VideoInfo::from_caps(c.caps()).ok(); + drop(state); + self.srcpad.check_reconfigure(); + match self.negotiate(element) { + Ok(_) => true, + Err(_) => { + self.srcpad.mark_reconfigure(); + true + } + } + } + EventView::FlushStop(..) => { + let mut state = self.state.lock().unwrap(); + state.composition = None; + pad.event_default(Some(element), event) + } + _ => pad.event_default(Some(element), event), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for OnvifOverlay { + const NAME: &'static str = "GstOnvifOverlay"; + type Type = super::OnvifOverlay; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .chain_function(|pad, parent, buffer| { + OnvifOverlay::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |overlay, element| overlay.sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + OnvifOverlay::catch_panic_pad_function( + parent, + || false, + |overlay, element| overlay.sink_event(pad, element, event), + ) + }) + .flags(gst::PadFlags::PROXY_CAPS) + .flags(gst::PadFlags::PROXY_ALLOCATION) + .build(); + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .flags(gst::PadFlags::PROXY_CAPS) + .flags(gst::PadFlags::PROXY_ALLOCATION) + .build(); + + Self { + srcpad, + sinkpad, + state: Mutex::new(State::default()), + settings: Mutex::new(Settings::default()), + } + } +} + +impl ObjectImpl for OnvifOverlay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecString::new( + "font-desc", + "Font Description", + "Pango font description of font to be used for rendering", + Some(DEFAULT_FONT_DESC), + glib::ParamFlags::READWRITE, + )] + }); + + PROPERTIES.as_ref() + } + + fn set_property( + &self, + _obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "font-desc" => { + self.settings.lock().unwrap().font_desc = value + .get::>() + .expect("type checked upstream") + .unwrap_or_else(|| DEFAULT_FONT_DESC.into()); + self.state.lock().unwrap().layout.take(); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "font-desc" => self.settings.lock().unwrap().font_desc.to_value(), + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl GstObjectImpl for OnvifOverlay {} + +impl ElementImpl for OnvifOverlay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ONVIF overlay", + "Video/Overlay", + "Renders ONVIF analytics meta over raw video frames", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst_video::VideoFormat::iter_raw() + .into_video_caps() + .unwrap() + .build(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => { + // Reset the whole state + let mut state = self.state.lock().unwrap(); + *state = State::default(); + } + _ => (), + } + + self.parent_change_state(element, transition) + } +} diff --git a/net/onvif/src/onvifoverlay/mod.rs b/net/onvif/src/onvifoverlay/mod.rs new file mode 100644 index 00000000..d37635e5 --- /dev/null +++ b/net/onvif/src/onvifoverlay/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct OnvifOverlay(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "onvifoverlay", + gst::Rank::Primary, + OnvifOverlay::static_type(), + ) +} diff --git a/net/onvif/src/onvifpay/imp.rs b/net/onvif/src/onvifpay/imp.rs new file mode 100644 index 00000000..59ac47a0 --- /dev/null +++ b/net/onvif/src/onvifpay/imp.rs @@ -0,0 +1,197 @@ +use gst::glib; +use gst::subclass::prelude::*; +use gst_rtp::prelude::*; +use gst_rtp::subclass::prelude::*; +use once_cell::sync::Lazy; + +#[derive(Default)] +pub struct OnvifPay {} + +#[glib::object_subclass] +impl ObjectSubclass for OnvifPay { + const NAME: &'static str = "GstOnvifPay"; + type Type = super::OnvifPay; + type ParentType = gst_rtp::RTPBasePayload; +} + +impl ObjectImpl for OnvifPay {} + +impl GstObjectImpl for OnvifPay {} + +impl ElementImpl for OnvifPay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ONVIF metadata RTP payloader", + "Payloader/Network/RTP", + "ONVIF metadata RTP payloader", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_caps = gst::Caps::builder("application/x-onvif-metadata") + .field("encoding", "utf8") + .build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &sink_caps, + ) + .unwrap(); + + let src_caps = gst::Caps::builder("application/x-rtp") + .field("media", "application") + .field("payload", gst::IntRange::new(96, 127)) + .field("clock-rate", 90000) + .field("encoding-name", "VND.ONVIF.METADATA") + .build(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &src_caps, + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl RTPBasePayloadImpl for OnvifPay { + fn handle_buffer( + &self, + element: &Self::Type, + buffer: gst::Buffer, + ) -> Result { + let pts = buffer.pts(); + let dts = buffer.dts(); + + // Input buffer must be readable + let buffer = buffer.into_mapped_buffer_readable().map_err(|_| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + // Input buffer must be valid UTF-8 + let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| { + gst::element_error!( + element, + gst::StreamError::Format, + ["Failed to decode buffer as UTF-8: {}", err] + ); + + gst::FlowError::Error + })?; + + // Input buffer must start with a tt:MetadataStream node + let process = { + let mut process = false; + + for token in xmlparser::Tokenizer::from(utf8) { + match token { + Ok(token) => match token { + xmlparser::Token::Comment { .. } => { + continue; + } + xmlparser::Token::Declaration { .. } => { + continue; + } + xmlparser::Token::ElementStart { local, .. } => { + if local.as_str() == "MetadataStream" { + process = true; + } + break; + } + _ => { + process = false; + break; + } + }, + Err(err) => { + gst::element_error!( + element, + gst::StreamError::Format, + ["Invalid XML: {}", err] + ); + + return Err(gst::FlowError::Error); + } + } + } + + process + }; + + if !process { + gst::element_error!( + element, + gst::StreamError::Format, + ["document must start with tt:MetadataStream element"] + ); + + return Err(gst::FlowError::Error); + } + + let mtu = element.mtu(); + let payload_size = gst_rtp::RTPBuffer::<()>::calc_payload_len(mtu, 0, 0) as usize; + + let mut chunks = utf8.as_bytes().chunks(payload_size).peekable(); + let mut buflist = gst::BufferList::new_sized((utf8.len() / payload_size) + 1); + + { + let buflist_mut = buflist.get_mut().unwrap(); + + while let Some(chunk) = chunks.next() { + let mut outbuf = gst::Buffer::new_rtp_with_sizes(chunk.len() as u32, 0, 0) + .map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Write, + ["Failed to allocate output buffer: {}", err] + ); + + gst::FlowError::Error + })?; + + { + let outbuf_mut = outbuf.get_mut().unwrap(); + outbuf_mut.set_pts(pts); + outbuf_mut.set_dts(dts); + + let mut outrtp = gst_rtp::RTPBuffer::from_buffer_writable(outbuf_mut).unwrap(); + let payload = outrtp.payload_mut().unwrap(); + payload.copy_from_slice(chunk); + + // Last chunk, set marker bit + if chunks.peek().is_none() { + outrtp.set_marker(true); + } + } + + buflist_mut.add(outbuf); + } + } + + element.push_list(buflist) + } + + fn set_caps(&self, element: &Self::Type, _caps: &gst::Caps) -> Result<(), gst::LoggableError> { + element.set_options("application", true, "VND.ONVIF.METADATA", 90000); + + Ok(()) + } +} diff --git a/net/onvif/src/onvifpay/mod.rs b/net/onvif/src/onvifpay/mod.rs new file mode 100644 index 00000000..18b4a8a3 --- /dev/null +++ b/net/onvif/src/onvifpay/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct OnvifPay(ObjectSubclass) @extends gst_rtp::RTPBasePayload, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtponvifpay", + gst::Rank::Primary, + OnvifPay::static_type(), + ) +}