diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index a13879a3..c606db33 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -2527,6 +2527,34 @@ } }, "rank": "marginal" + }, + "onvifmp4mux": { + "author": "Sebastian Dröge ", + "description": "ONVIF MP4 muxer", + "hierarchy": [ + "GstONVIFMP4Mux", + "GstRsMP4Mux", + "GstAggregator", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Muxer", + "pad-templates": { + "sink_%%u": { + "caps": "video/x-h264:\n stream-format: { (string)avc, (string)avc3 }\n alignment: au\n width: [ 1, 65535 ]\n height: [ 1, 65535 ]\nvideo/x-h265:\n stream-format: { (string)hvc1, (string)hev1 }\n alignment: au\n width: [ 1, 65535 ]\n height: [ 1, 65535 ]\nimage/jpeg:\n width: [ 1, 65535 ]\n height: [ 1, 65535 ]\naudio/mpeg:\n mpegversion: 4\n stream-format: raw\n channels: [ 1, 65535 ]\n rate: [ 1, 2147483647 ]\naudio/x-alaw:\n channels: [ 1, 2 ]\n rate: [ 1, 2147483647 ]\naudio/x-mulaw:\n channels: [ 1, 2 ]\n rate: [ 1, 2147483647 ]\naudio/x-adpcm:\n layout: g726\n channels: 1\n rate: 8000\n bitrate: { (int)16000, (int)24000, (int)32000, (int)40000 }\napplication/x-onvif-metadata:\n parsed: true\n", + "direction": "sink", + "presence": "request", + "type": "GstRsMP4MuxPad" + }, + "src": { + "caps": "video/quicktime:\n variant: iso\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" } }, "filename": "gstmp4", diff --git a/mux/mp4/src/mp4mux/boxes.rs b/mux/mp4/src/mp4mux/boxes.rs index 5f121466..a4bc5e16 100644 --- a/mux/mp4/src/mp4mux/boxes.rs +++ b/mux/mp4/src/mp4mux/boxes.rs @@ -60,7 +60,7 @@ pub(super) fn create_ftyp(variant: super::Variant) -> Result let mut v = vec![]; let (brand, compatible_brands) = match variant { - super::Variant::ISO => (b"isom", vec![b"mp41", b"mp42"]), + super::Variant::ISO | super::Variant::ONVIF => (b"iso4", vec![b"mp41", b"mp42", b"isom"]), }; write_box(&mut v, b"ftyp", |v| { @@ -102,15 +102,71 @@ pub(super) fn create_mdat_header(size: Option) -> Result Result { let mut v = vec![]; write_box(&mut v, b"moov", |v| write_moov(v, &header))?; + if header.variant == super::Variant::ONVIF { + write_full_box( + &mut v, + b"meta", + FULL_BOX_VERSION_0, + FULL_BOX_FLAGS_NONE, + |v| { + write_full_box(v, b"hdlr", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { + // Handler type + v.extend(b"null"); + + // Reserved + v.extend([0u8; 3 * 4]); + + // Name + v.extend(b"MetadataHandler"); + + Ok(()) + })?; + + write_box(v, b"cstb", |v| { + // entry count + v.extend(1u32.to_be_bytes()); + + // track id + v.extend(0u32.to_be_bytes()); + + // start UTC time in 100ns units since Jan 1 1601 + // This is the UTC time of the earliest stream, which has to be converted to + // the correct epoch and scale. + let start_utc_time = header + .streams + .iter() + .map(|s| s.earliest_pts) + .min() + .unwrap() + .nseconds() + / 100; + let start_utc_time = start_utc_time + UNIX_1601_OFFSET * 10_000_000; + v.extend(start_utc_time.to_be_bytes()); + + Ok(()) + }) + }, + )?; + } + Ok(gst::Buffer::from_mut_slice(v)) } +struct TrackReference { + reference_type: [u8; 4], + track_ids: Vec, +} + fn write_moov(v: &mut Vec, header: &super::Header) -> Result<(), Error> { use gst::glib; @@ -124,7 +180,27 @@ fn write_moov(v: &mut Vec, header: &super::Header) -> Result<(), Error> { })?; for (idx, stream) in header.streams.iter().enumerate() { write_box(v, b"trak", |v| { - write_trak(v, header, idx, stream, creation_time) + let mut references = Vec::new(); + + // Reference the video track for ONVIF metadata tracks + if header.variant == super::Variant::ONVIF + && stream.caps.structure(0).unwrap().name() == "application/x-onvif-metadata" + { + // Find the first video track + for (idx, other_stream) in header.streams.iter().enumerate() { + let s = other_stream.caps.structure(0).unwrap(); + + if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") { + references.push(TrackReference { + reference_type: *b"cdsc", + track_ids: vec![idx as u32 + 1], + }); + break; + } + } + } + + write_trak(v, header, idx, stream, creation_time, &references) })?; } @@ -240,6 +316,7 @@ fn write_trak( idx: usize, stream: &super::Stream, creation_time: u64, + references: &[TrackReference], ) -> Result<(), Error> { write_full_box( v, @@ -250,6 +327,9 @@ fn write_trak( )?; write_box(v, b"mdia", |v| write_mdia(v, header, stream, creation_time))?; + if !references.is_empty() { + write_box(v, b"tref", |v| write_tref(v, header, references))?; + } write_box(v, b"edts", |v| write_edts(v, header, stream))?; Ok(()) @@ -430,6 +510,7 @@ fn write_hdlr( "audio/mpeg" | "audio/x-opus" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { (b"soun", b"SoundHandler\0".as_slice()) } + "application/x-onvif-metadata" => (b"meta", b"MetadataHandler\0".as_slice()), _ => unreachable!(), }; @@ -462,6 +543,11 @@ fn write_minf( write_smhd(v, header) })? } + "application/x-onvif-metadata" => { + write_full_box(v, b"nmhd", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |_v| { + Ok(()) + })? + } _ => unreachable!(), } @@ -613,6 +699,7 @@ fn write_stsd( "audio/mpeg" | "audio/x-opus" | "audio/x-alaw" | "audio/x-mulaw" | "audio/x-adpcm" => { write_audio_sample_entry(v, header, stream)? } + "application/x-onvif-metadata" => write_xml_meta_data_sample_entry(v, header, stream)?, _ => unreachable!(), } @@ -1201,6 +1288,34 @@ fn write_dops(v: &mut Vec, caps: &gst::Caps) -> Result<(), Error> { }) } +fn write_xml_meta_data_sample_entry( + v: &mut Vec, + _header: &super::Header, + stream: &super::Stream, +) -> Result<(), Error> { + let s = stream.caps.structure(0).unwrap(); + let namespace = match s.name() { + "application/x-onvif-metadata" => b"http://www.onvif.org/ver10/schema", + _ => unreachable!(), + }; + + write_sample_entry_box(v, b"metx", move |v| { + // content_encoding, empty string + v.push(0); + + // namespace + v.extend_from_slice(namespace); + v.push(0); + + // schema_location, empty string list + v.push(0); + + Ok(()) + })?; + + Ok(()) +} + fn write_stts( v: &mut Vec, _header: &super::Header, @@ -1528,6 +1643,24 @@ fn write_stco( Ok(()) } +fn write_tref( + v: &mut Vec, + _header: &super::Header, + references: &[TrackReference], +) -> Result<(), Error> { + for reference in references { + write_box(v, reference.reference_type, |v| { + for track_id in &reference.track_ids { + v.extend(track_id.to_be_bytes()); + } + + Ok(()) + })?; + } + + Ok(()) +} + fn write_edts( v: &mut Vec, header: &super::Header, diff --git a/mux/mp4/src/mp4mux/imp.rs b/mux/mp4/src/mp4mux/imp.rs index b8ec1d57..a57de86c 100644 --- a/mux/mp4/src/mp4mux/imp.rs +++ b/mux/mp4/src/mp4mux/imp.rs @@ -12,12 +12,38 @@ use gst::subclass::prelude::*; use gst_base::prelude::*; use gst_base::subclass::prelude::*; +use std::collections::VecDeque; use std::sync::Mutex; use once_cell::sync::Lazy; use super::boxes; +/// Offset between NTP and UNIX epoch in seconds. +/// NTP = UNIX + NTP_UNIX_OFFSET. +const NTP_UNIX_OFFSET: u64 = 2_208_988_800; + +/// Reference timestamp meta caps for NTP timestamps. +static NTP_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); + +/// Reference timestamp meta caps for UNIX timestamps. +static UNIX_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build()); + +/// Returns the UTC time of the buffer in the UNIX epoch. +fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option { + buffer + .iter_meta::() + .find_map(|meta| { + if meta.reference().can_intersect(&UNIX_CAPS) { + Some(meta.timestamp()) + } else if meta.reference().can_intersect(&NTP_CAPS) { + meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds()) + } else { + None + } + }) +} + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "mp4mux", @@ -46,6 +72,7 @@ impl Default for Settings { } } +#[derive(Debug)] struct PendingBuffer { buffer: gst::Buffer, timestamp: gst::Signed, @@ -58,6 +85,9 @@ struct Stream { /// Sink pad for this stream. sinkpad: super::MP4MuxPad, + /// Pre-queue for ONVIF variant to timestamp all buffers with their UTC time. + pre_queue: VecDeque<(gst::FormattedSegment, gst::Buffer)>, + /// Currently configured caps for this stream. caps: gst::Caps, /// Whether this stream is intra-only and has frame reordering. @@ -84,6 +114,9 @@ struct Stream { earliest_pts: Option, /// Current end PTS. end_pts: Option, + + /// In ONVIF mode, the mapping between running time and UTC time (UNIX) + running_time_utc_time_mapping: Option<(gst::ClockTime, gst::ClockTime)>, } #[derive(Default)] @@ -111,6 +144,287 @@ pub(crate) struct MP4Mux { } impl MP4Mux { + /// Checks if a buffer is valid according to the stream configuration. + fn check_buffer( + buffer: &gst::BufferRef, + sinkpad: &super::MP4MuxPad, + delta_frames: super::DeltaFrames, + ) -> Result<(), gst::FlowError> { + if delta_frames.requires_dts() && buffer.dts().is_none() { + gst::error!(CAT, obj: sinkpad, "Require DTS for video streams"); + return Err(gst::FlowError::Error); + } + + if buffer.pts().is_none() { + gst::error!(CAT, obj: sinkpad, "Require timestamped buffers"); + return Err(gst::FlowError::Error); + } + + if delta_frames.intra_only() && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + gst::error!(CAT, obj: sinkpad, "Intra-only stream with delta units"); + return Err(gst::FlowError::Error); + } + + Ok(()) + } + + fn peek_buffer( + &self, + sinkpad: &super::MP4MuxPad, + delta_frames: super::DeltaFrames, + pre_queue: &mut VecDeque<(gst::FormattedSegment, gst::Buffer)>, + running_time_utc_time_mapping: &Option<(gst::ClockTime, gst::ClockTime)>, + ) -> Result, gst::Buffer)>, gst::FlowError> { + if let Some((segment, buffer)) = pre_queue.front() { + return Ok(Some((segment.clone(), buffer.clone()))); + } + + let mut buffer = match sinkpad.peek_buffer() { + None => return Ok(None), + Some(buffer) => buffer, + }; + + Self::check_buffer(&buffer, sinkpad, delta_frames)?; + + let mut segment = match sinkpad.segment().downcast::().ok() { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + // For ONVIF we need to re-timestamp the buffer with its UTC time. + // We can only possibly end up here after the running-time UTC mapping is known. + // + // After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to + // happen once. + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + let running_time_utc_time_mapping = running_time_utc_time_mapping.unwrap(); + + let pts_position = buffer.pts().unwrap(); + let dts_position = buffer.dts(); + + let pts = segment.to_running_time_full(pts_position).unwrap(); + + let dts = dts_position + .map(|dts_position| segment.to_running_time_full(dts_position).unwrap()); + + let utc_time = match get_utc_time_from_buffer(&buffer) { + None => { + // Calculate from the mapping + gst::Signed::Positive(running_time_utc_time_mapping.1) + .checked_sub_unsigned(running_time_utc_time_mapping.0) + .and_then(|res| res.checked_add(pts)) + .and_then(|res| res.positive()) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); + gst::FlowError::Error + })? + } + Some(utc_time) => utc_time, + }; + + gst::trace!( + CAT, + obj: sinkpad, + "Mapped PTS running time {pts} to UTC time {utc_time}" + ); + + { + let buffer = buffer.make_mut(); + buffer.set_pts(utc_time); + + if let Some(dts) = dts { + let dts_utc_time = gst::Signed::Positive(utc_time) + .checked_sub(pts) + .and_then(|res| res.checked_add(dts)) + .and_then(|res| res.positive()) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); + gst::FlowError::Error + })?; + gst::trace!( + CAT, + obj: sinkpad, + "Mapped DTS running time {dts} to UTC time {dts_utc_time}" + ); + buffer.set_dts(dts_utc_time); + } + } + + segment = gst::FormattedSegment::default(); + + // Drop current buffer as it is now queued + sinkpad.drop_buffer(); + pre_queue.push_back((segment.clone(), buffer.clone())); + } + + Ok(Some((segment, buffer))) + } + + fn pop_buffer( + &self, + sinkpad: &super::MP4MuxPad, + delta_frames: super::DeltaFrames, + pre_queue: &mut VecDeque<(gst::FormattedSegment, gst::Buffer)>, + running_time_utc_time_mapping: &mut Option<(gst::ClockTime, gst::ClockTime)>, + ) -> Result, gst::Buffer)>, gst::FlowError> { + // In ONVIF mode we need to get UTC times for each buffer and synchronize based on that. + // Queue up to 6s of data to get the first UTC time and then backdate. + if self.obj().class().as_ref().variant == super::Variant::ONVIF + && running_time_utc_time_mapping.is_none() + { + if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) { + // Existence of PTS/DTS checked below + let (last, first) = if delta_frames.requires_dts() { + ( + last.0.to_running_time_full(last.1.dts()).unwrap(), + first.0.to_running_time_full(first.1.dts()).unwrap(), + ) + } else { + ( + last.0.to_running_time_full(last.1.pts()).unwrap(), + first.0.to_running_time_full(first.1.pts()).unwrap(), + ) + }; + + if last.saturating_sub(first) + > gst::Signed::Positive(gst::ClockTime::from_seconds(6)) + { + gst::error!( + CAT, + obj: sinkpad, + "Got no UTC time in the first 6s of the stream" + ); + return Err(gst::FlowError::Error); + } + } + + let buffer = match sinkpad.pop_buffer() { + None => { + if sinkpad.is_eos() { + gst::error!(CAT, obj: sinkpad, "Got no UTC time before EOS"); + return Err(gst::FlowError::Error); + } else { + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + } + Some(buffer) => buffer, + }; + + Self::check_buffer(&buffer, sinkpad, delta_frames)?; + + let segment = match sinkpad.segment().downcast::().ok() { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + let utc_time = match get_utc_time_from_buffer(&buffer) { + Some(utc_time) => utc_time, + None => { + pre_queue.push_back((segment, buffer)); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + }; + + let running_time = segment.to_running_time_full(buffer.pts()).unwrap(); + gst::info!( + CAT, + obj: sinkpad, + "Got initial UTC time {utc_time} at PTS running time {running_time}", + ); + + let running_time = running_time.positive().unwrap_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative PTS running time"); + gst::ClockTime::ZERO + }); + + *running_time_utc_time_mapping = Some((running_time, utc_time)); + + // Push the buffer onto the pre-queue and re-timestamp it and all other buffers + // based on the mapping above. + pre_queue.push_back((segment, buffer)); + + for (segment, buffer) in pre_queue.iter_mut() { + let buffer = buffer.make_mut(); + + let pts = segment.to_running_time_full(buffer.pts().unwrap()).unwrap(); + let pts_utc_time = gst::Signed::Positive(utc_time) + .checked_sub_unsigned(running_time) + .and_then(|res| res.checked_add(pts)) + .and_then(|res| res.positive()) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); + gst::FlowError::Error + })?; + gst::trace!( + CAT, + obj: sinkpad, + "Mapped PTS running time {pts} to UTC time {pts_utc_time}" + ); + buffer.set_pts(pts_utc_time); + + if let Some(dts) = buffer.dts() { + let dts = segment.to_running_time_full(dts).unwrap(); + let dts_utc_time = gst::Signed::Positive(pts_utc_time) + .checked_sub(pts) + .and_then(|res| res.checked_add(dts)) + .and_then(|res| res.positive()) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); + gst::FlowError::Error + })?; + gst::trace!( + CAT, + obj: sinkpad, + "Mapped DTS running time {dts} to UTC time {dts_utc_time}" + ); + buffer.set_dts(dts_utc_time); + } + + *segment = gst::FormattedSegment::default(); + } + + // Fall through below and pop the first buffer finally + } + + if let Some((segment, buffer)) = pre_queue.pop_front() { + return Ok(Some((segment, buffer))); + } + + // If the mapping is set, then we would get the buffer always from the pre-queue: + // - either it was set before already, in which case the next buffer would've been peeked + // for calculating the duration to the previous buffer, and then put into the pre-queue + // - or this is the very first buffer and we just put it into the queue overselves above + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + if sinkpad.is_eos() { + return Ok(None); + } + unreachable!(); + } + + let buffer = match sinkpad.pop_buffer() { + None => return Ok(None), + Some(buffer) => buffer, + }; + + Self::check_buffer(&buffer, sinkpad, delta_frames)?; + + let segment = match sinkpad.segment().downcast::().ok() { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + Ok(Some((segment, buffer))) + } + /// Queue a buffer and calculate its duration. /// /// Returns `Ok(())` if a buffer with duration is known or if the stream is EOS and a buffer is @@ -137,8 +451,13 @@ impl MP4Mux { .. }) => { // Already have a pending buffer but no duration, so try to get that now - let buffer = match stream.sinkpad.peek_buffer() { - Some(buffer) => buffer, + let (segment, buffer) = match self.peek_buffer( + &stream.sinkpad, + stream.delta_frames, + &mut stream.pre_queue, + &stream.running_time_utc_time_mapping, + )? { + Some(res) => res, None => { if stream.sinkpad.is_eos() { let dur = buffer.duration().unwrap_or(gst::ClockTime::ZERO); @@ -164,23 +483,8 @@ impl MP4Mux { } }; - if stream.delta_frames.requires_dts() && buffer.dts().is_none() { - gst::error!(CAT, obj: stream.sinkpad, "Require DTS for video streams"); - return Err(gst::FlowError::Error); - } - - if stream.delta_frames.intra_only() - && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) - { - gst::error!(CAT, obj: stream.sinkpad, "Intra-only stream with delta units"); - return Err(gst::FlowError::Error); - } - - let pts_position = buffer.pts().ok_or_else(|| { - gst::error!(CAT, obj: stream.sinkpad, "Require timestamped buffers"); - gst::FlowError::Error - })?; - + // Was checked above + let pts_position = buffer.pts().unwrap(); let next_timestamp_position = if stream.delta_frames.requires_dts() { // Was checked above buffer.dts().unwrap() @@ -188,23 +492,9 @@ impl MP4Mux { pts_position }; - let segment = match stream.sinkpad.segment().downcast::().ok() { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; - - // If the stream has no valid running time, assume it's before everything else. - let next_timestamp = match segment.to_running_time_full(next_timestamp_position) - { - None => { - gst::error!(CAT, obj: stream.sinkpad, "Stream has no valid running time"); - return Err(gst::FlowError::Error); - } - Some(running_time) => running_time, - }; + let next_timestamp = segment + .to_running_time_full(next_timestamp_position) + .unwrap(); gst::trace!( CAT, @@ -243,8 +533,13 @@ impl MP4Mux { None => { // Have no buffer queued at all yet - let buffer = match stream.sinkpad.pop_buffer() { - Some(buffer) => buffer, + let (segment, buffer) = match self.pop_buffer( + &stream.sinkpad, + stream.delta_frames, + &mut stream.pre_queue, + &mut stream.running_time_utc_time_mapping, + )? { + Some(res) => res, None => { if stream.sinkpad.is_eos() { gst::trace!( @@ -261,59 +556,18 @@ impl MP4Mux { } }; - if stream.delta_frames.requires_dts() && buffer.dts().is_none() { - gst::error!(CAT, obj: stream.sinkpad, "Require DTS for video streams"); - return Err(gst::FlowError::Error); - } - - if stream.delta_frames.intra_only() - && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) - { - gst::error!(CAT, obj: stream.sinkpad, "Intra-only stream with delta units"); - return Err(gst::FlowError::Error); - } - - let pts_position = buffer.pts().ok_or_else(|| { - gst::error!(CAT, obj: stream.sinkpad, "Require timestamped buffers"); - gst::FlowError::Error - })?; + // Was checked above + let pts_position = buffer.pts().unwrap(); let dts_position = buffer.dts(); - let segment = match stream - .sinkpad - .segment() - .clone() - .downcast::() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; + let pts = segment.to_running_time_full(pts_position).unwrap() + .positive().unwrap_or_else(|| { + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS running time"); + gst::ClockTime::ZERO + }); - let pts = match segment.to_running_time_full(pts_position) { - None => { - gst::error!(CAT, obj: stream.sinkpad, "Stream has no valid PTS running time"); - return Err(gst::FlowError::Error); - } - Some(running_time) => running_time, - }.positive().unwrap_or_else(|| { - gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS running time"); - gst::ClockTime::ZERO - }); - - let dts = match dts_position { - None => None, - Some(dts_position) => match segment.to_running_time_full(dts_position) { - None => { - gst::error!(CAT, obj: stream.sinkpad, "Stream has no valid DTS running time"); - return Err(gst::FlowError::Error); - } - Some(running_time) => Some(running_time), - }, - }; + let dts = dts_position + .map(|dts_position| segment.to_running_time_full(dts_position).unwrap()); let timestamp = if stream.delta_frames.requires_dts() { // Was checked above @@ -407,8 +661,7 @@ impl MP4Mux { { gst::trace!(CAT, obj: stream.sinkpad, - "Continuing current chunk: single stream {}, or {} >= {} and {} >= {}", - single_stream, + "Continuing current chunk: single stream {single_stream}, or {} >= {} and {} >= {}", gst::format::Bytes::from_u64(stream.queued_chunk_bytes), settings.interleave_bytes.map(gst::format::Bytes::from_u64).display(), stream.queued_chunk_time, settings.interleave_time.display(), @@ -528,8 +781,10 @@ impl MP4Mux { // Now we can start handling buffers while let Some(idx) = self.find_earliest_stream(settings, state)? { let stream = &mut state.streams[idx]; - let buffer = stream.pending_buffer.take().unwrap(); + + gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {buffer:?} at offset {}", state.current_offset); + let duration = buffer.duration.unwrap(); let composition_time_offset = buffer.composition_time_offset; let mut buffer = buffer.buffer; @@ -582,7 +837,7 @@ impl MP4Mux { } }; - gst::info!(CAT, obj: pad, "Configuring caps {:?}", caps); + gst::info!(CAT, obj: pad, "Configuring caps {caps:?}"); let s = caps.structure(0).unwrap(); @@ -632,6 +887,7 @@ impl MP4Mux { state.streams.push(Stream { sinkpad: pad, + pre_queue: VecDeque::new(), caps, delta_frames, chunks: Vec::new(), @@ -641,6 +897,7 @@ impl MP4Mux { start_dts: None, earliest_pts: None, end_pts: None, + running_time_utc_time_mapping: None, }); } @@ -781,7 +1038,7 @@ impl ElementImpl for MP4Mux { gst::error!( CAT, imp: self, - "Can't request new pads after start was generated" + "Can't request new pads after stream was started" ); return None; } @@ -802,7 +1059,7 @@ impl AggregatorImpl for MP4Mux { ) -> bool { use gst::QueryViewMut; - gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query); + gst::trace!(CAT, obj: aggregator_pad, "Handling query {query:?}"); match query.view_mut() { QueryViewMut::Caps(q) => { @@ -831,7 +1088,7 @@ impl AggregatorImpl for MP4Mux { ) -> Result { use gst::EventView; - gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); + gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}"); match event.view() { EventView::Segment(ev) => { @@ -855,7 +1112,7 @@ impl AggregatorImpl for MP4Mux { fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { use gst::EventView; - gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); + gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}"); match event.view() { EventView::Tag(_ev) => { @@ -870,7 +1127,7 @@ impl AggregatorImpl for MP4Mux { fn src_query(&self, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; - gst::trace!(CAT, imp: self, "Handling query {:?}", query); + gst::trace!(CAT, imp: self, "Handling query {query:?}"); match query.view_mut() { QueryViewMut::Seeking(q) => { @@ -885,7 +1142,7 @@ impl AggregatorImpl for MP4Mux { fn src_event(&self, event: gst::Event) -> bool { use gst::EventView; - gst::trace!(CAT, imp: self, "Handling event {:?}", event); + gst::trace!(CAT, imp: self, "Handling event {event:?}"); match event.view() { EventView::Seek(_ev) => false, @@ -894,9 +1151,13 @@ impl AggregatorImpl for MP4Mux { } fn flush(&self) -> Result { + gst::info!(CAT, imp: self, "Flushing"); + let mut state = self.state.lock().unwrap(); for stream in &mut state.streams { stream.pending_buffer = None; + stream.pre_queue.clear(); + stream.running_time_utc_time_mapping = None; } drop(state); @@ -977,14 +1238,12 @@ impl AggregatorImpl for MP4Mux { // ... and then create the ftyp box plus mdat box header so we can start outputting // actual data - let buffers = buffers.get_mut().unwrap(); - let ftyp = boxes::create_ftyp(self.obj().class().as_ref().variant).map_err(|err| { gst::error!(CAT, imp: self, "Failed to create ftyp box: {err}"); gst::FlowError::Error })?; state.current_offset += ftyp.size() as u64; - buffers.add(ftyp); + buffers.get_mut().unwrap().add(ftyp); gst::info!( CAT, @@ -999,7 +1258,7 @@ impl AggregatorImpl for MP4Mux { })?; state.current_offset += mdat.size() as u64; state.mdat_size = 0; - buffers.add(mdat); + buffers.get_mut().unwrap().add(mdat); } let res = match self.drain_buffers(&settings, &mut state, buffers.get_mut().unwrap()) { @@ -1062,7 +1321,7 @@ impl AggregatorImpl for MP4Mux { if !buffers.is_empty() { if let Err(err) = self.obj().finish_buffer_list(buffers) { - gst::error!(CAT, imp: self, "Failed pushing buffer: {:?}", err); + gst::error!(CAT, imp: self, "Failed pushing buffers: {err:?}"); return Err(err); } } @@ -1091,8 +1350,7 @@ impl AggregatorImpl for MP4Mux { gst::error!( CAT, imp: self, - "Failed pushing updated mdat box header buffer downstream: {:?}", - err, + "Failed pushing updated mdat box header buffer downstream: {err:?}", ); } } @@ -1229,6 +1487,110 @@ impl MP4MuxImpl for ISOMP4Mux { const VARIANT: super::Variant = super::Variant::ISO; } +#[derive(Default)] +pub(crate) struct ONVIFMP4Mux; + +#[glib::object_subclass] +impl ObjectSubclass for ONVIFMP4Mux { + const NAME: &'static str = "GstONVIFMP4Mux"; + type Type = super::ONVIFMP4Mux; + type ParentType = super::MP4Mux; +} + +impl ObjectImpl for ONVIFMP4Mux {} + +impl GstObjectImpl for ONVIFMP4Mux {} + +impl ElementImpl for ONVIFMP4Mux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ONVIFMP4Mux", + "Codec/Muxer", + "ONVIF MP4 muxer", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("video/quicktime") + .field("variant", "iso") + .build(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::with_gtype( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(["avc", "avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(["hvc1", "hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("image/jpeg") + .field("width", gst::IntRange::::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::::new(1, i32::MAX)) + .build(), + gst::Structure::builder("audio/x-alaw") + .field("channels", gst::IntRange::::new(1, 2)) + .field("rate", gst::IntRange::::new(1, i32::MAX)) + .build(), + gst::Structure::builder("audio/x-mulaw") + .field("channels", gst::IntRange::::new(1, 2)) + .field("rate", gst::IntRange::::new(1, i32::MAX)) + .build(), + gst::Structure::builder("audio/x-adpcm") + .field("layout", "g726") + .field("channels", 1i32) + .field("rate", 8000i32) + .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000])) + .build(), + gst::Structure::builder("application/x-onvif-metadata") + .field("parsed", true) + .build(), + ] + .into_iter() + .collect::(), + super::MP4MuxPad::static_type(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl AggregatorImpl for ONVIFMP4Mux {} + +impl MP4MuxImpl for ONVIFMP4Mux { + const VARIANT: super::Variant = super::Variant::ONVIF; +} + #[derive(Default, Clone)] struct PadSettings { trak_timescale: u32, @@ -1291,9 +1653,13 @@ impl AggregatorPadImpl for MP4MuxPad { let mux = aggregator.downcast_ref::().unwrap(); let mut mux_state = mux.imp().state.lock().unwrap(); + gst::info!(CAT, imp: self, "Flushing"); + for stream in &mut mux_state.streams { if stream.sinkpad == *self.obj() { stream.pending_buffer = None; + stream.pre_queue.clear(); + stream.running_time_utc_time_mapping = None; break; } } diff --git a/mux/mp4/src/mp4mux/mod.rs b/mux/mp4/src/mp4mux/mod.rs index 1e6ad28b..7ee2f439 100644 --- a/mux/mp4/src/mp4mux/mod.rs +++ b/mux/mp4/src/mp4mux/mod.rs @@ -24,6 +24,10 @@ glib::wrapper! { pub(crate) struct ISOMP4Mux(ObjectSubclass) @extends MP4Mux, gst_base::Aggregator, gst::Element, gst::Object; } +glib::wrapper! { + pub(crate) struct ONVIFMP4Mux(ObjectSubclass) @extends MP4Mux, gst_base::Aggregator, gst::Element, gst::Object; +} + pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { #[cfg(feature = "doc")] { @@ -36,6 +40,12 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Rank::Marginal, ISOMP4Mux::static_type(), )?; + gst::Element::register( + Some(plugin), + "onvifmp4mux", + gst::Rank::Marginal, + ONVIFMP4Mux::static_type(), + )?; Ok(()) } @@ -131,4 +141,5 @@ pub(crate) struct Header { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum Variant { ISO, + ONVIF, }