From d985fb58ab4bdd22664a63ef9b8193d92965b42f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 20 Dec 2023 20:00:09 +0200 Subject: [PATCH] rtp: Add JPEG RTP payloader/depayloader --- net/rtp/src/jpeg/depay/imp.rs | 455 ++++++++++++++++++ net/rtp/src/jpeg/depay/mod.rs | 27 ++ net/rtp/src/jpeg/header.rs | 838 ++++++++++++++++++++++++++++++++++ net/rtp/src/jpeg/mod.rs | 12 + net/rtp/src/jpeg/pay/imp.rs | 346 ++++++++++++++ net/rtp/src/jpeg/pay/mod.rs | 27 ++ net/rtp/src/lib.rs | 4 + 7 files changed, 1709 insertions(+) create mode 100644 net/rtp/src/jpeg/depay/imp.rs create mode 100644 net/rtp/src/jpeg/depay/mod.rs create mode 100644 net/rtp/src/jpeg/header.rs create mode 100644 net/rtp/src/jpeg/mod.rs create mode 100644 net/rtp/src/jpeg/pay/imp.rs create mode 100644 net/rtp/src/jpeg/pay/mod.rs diff --git a/net/rtp/src/jpeg/depay/imp.rs b/net/rtp/src/jpeg/depay/imp.rs new file mode 100644 index 00000000..02dc1833 --- /dev/null +++ b/net/rtp/src/jpeg/depay/imp.rs @@ -0,0 +1,455 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::{collections::BTreeMap, io, mem}; + +use atomic_refcell::AtomicRefCell; +use bitstream_io::{BigEndian, ByteRead, ByteReader, ByteWrite as _, ByteWriter}; +/** + * SECTION:element-rtpjpegdepay2 + * @see_also: rtpjpegpay2, jpegdec, jpegenc + * + * Extracts a JPEG video stream from RTP packets as per [RFC 2435][rfc-2435]. + * + * [rfc-2435]: https://www.rfc-editor.org/rfc/rfc2435.html + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 udpsrc caps='application/x-rtp, media=video, clock-rate=90000' ! rtpjitterbuffer latency=50 ! rtpjpegdepay2 ! jpegdec ! videoconvert ! autovideosink + * ]| This will depayload an incoming RTP JPEG video stream. 
You can use the #jpegenc and + * #rtpjpegpay2 elements to create such an RTP stream. + * + * Since: plugins-rs-0.13.0 + */ +use gst::{glib, prelude::*, subclass::prelude::*}; + +use once_cell::sync::Lazy; + +use crate::{ + basedepay::{RtpBaseDepay2Ext, RtpBaseDepay2Impl}, + jpeg::header::{ + make_quant_tables, JpegHeader, MainHeader, QuantizationTableHeader, RestartHeader, + }, +}; + +struct PendingFrame { + /// RTP main header from the first fragment. + main_header: MainHeader, + /// Pending JPEG data. + /// + /// Already contains the JPEG headers. + data: Vec, + /// Expected next fragment offset. + expected_fragment_offset: u32, + /// Start extended seqnum. + start_ext_seqnum: u64, +} + +#[derive(Default)] +struct State { + /// Resolution from `x-dimensions` attribute + sdp_dimensions: Option<(u16, u16)>, + /// Framerate from `a-framerate` / `x-framerate` + sdp_framerate: Option, + /// Last configured width/height + dimensions: Option<(i32, i32)>, + /// Last configured framerate + framerate: Option, + + /// Currently pending frame, if any. + pending_frame: Option, + + /// Cache quantization tables. 
+ quant_tables: BTreeMap, +} + +#[derive(Default)] +pub struct RtpJpegDepay { + state: AtomicRefCell, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpjpegdepay2", + gst::DebugColorFlags::empty(), + Some("RTP JPEG Depayloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpJpegDepay { + const NAME: &'static str = "GstRtpJpegDepay2"; + type Type = super::RtpJpegDepay; + type ParentType = crate::basedepay::RtpBaseDepay2; +} + +impl ObjectImpl for RtpJpegDepay {} + +impl GstObjectImpl for RtpJpegDepay {} + +impl ElementImpl for RtpJpegDepay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP JPEG Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload a JPEG Video stream from RTP packets (RFC 2435)", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "video") + .field("payload", 26i32) + .field("clock-rate", 90_000i32) + .build(), + ) + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "video") + .field("encoding-name", "JPEG") + .field("clock-rate", 90_000i32) + .build(), + ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("image/jpeg").build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} +impl RtpBaseDepay2Impl for RtpJpegDepay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["video"]; + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = 
self.state.borrow_mut(); + *state = State::default(); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.borrow_mut(); + *state = State::default(); + + Ok(()) + } + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + let mut state = self.state.borrow_mut(); + state.sdp_framerate = None; + state.sdp_dimensions = None; + + if let Ok(dimensions_str) = s.get::<&str>("x-dimensions") { + let dimensions = dimensions_str.split_once(',').and_then(|(width, height)| { + Some(( + width.trim().parse::().ok()?, + height.trim().parse::().ok()?, + )) + }); + + if let Some((width, height)) = dimensions { + gst::debug!(CAT, imp: self, "Parsed SDP dimensions {width}x{height}"); + state.sdp_dimensions = dimensions; + } else { + gst::warning!(CAT, imp: self, "Failed to parse 'x-dimensions' attribute: {dimensions_str}"); + } + } + + if let Some(framerate_str) = s + .get::<&str>("x-framerate") + .ok() + .or_else(|| s.get::<&str>("a-framerate").ok()) + { + // Theoretically only `.` is allowed as decimal point but thanks to C formatting + // functions being locale dependent, a lot of code out there puts a comma. 
+ let framerate_str = framerate_str.replace(',', "."); + if let Some(framerate) = framerate_str + .parse::() + .ok() + .and_then(gst::Fraction::approximate_f64) + { + gst::debug!(CAT, imp: self, "Parsed SDP framerate {framerate}"); + state.sdp_framerate = Some(framerate); + } else { + gst::warning!(CAT, imp: self, "Failed to parse 'a-framerate' attribute: {framerate_str}"); + } + } + + true + } + + fn drain(&self) -> Result { + let mut state = self.state.borrow_mut(); + // FIXME: Could theoretically forward / handle incomplete frames + // with complete restart intervals + state.pending_frame = None; + + Ok(gst::FlowSuccess::Ok) + } + + fn handle_packet( + &self, + packet: &crate::basedepay::Packet, + ) -> Result { + let mut state = self.state.borrow_mut(); + + let payload = packet.payload(); + let mut cursor = io::Cursor::new(payload); + let mut r = ByteReader::endian(&mut cursor, BigEndian); + + // TODO: Currently only types 0, 1, 64, 65 (4:2:0 / 4:2:2 YUV) and progressive frames + // (subtype 0) are supported. 
+ let main_header = match r.parse::() { + Ok(main_header) => main_header, + Err(err) => { + gst::warning!(CAT, imp: self, "Failed to parse main header: {err}"); + state.pending_frame = None; + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + }; + + gst::trace!(CAT, imp: self, "Parsed main header {main_header:?}"); + + if state.pending_frame.is_none() && main_header.fragment_offset > 0 { + gst::trace!(CAT, imp: self, "Waiting for start of frame"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + let restart_header = if main_header.type_ >= 64 { + match r.parse::() { + Ok(restart_header) => Some(restart_header), + Err(err) => { + gst::warning!(CAT, imp: self, "Failed to parse restart header: {err}"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + } + } else { + None + }; + + // Handle initial setup for a frame + if main_header.fragment_offset == 0 { + if state.pending_frame.is_some() { + gst::warning!(CAT, imp: self, "Dropping incomplete pending frame"); + state.pending_frame = None; + } + + // Retrieve quantization tables, either from the packet itself or frame cached/static + // quantization tables depending on the Q value. 
+ let quant = if main_header.q >= 128 { + match r.parse_with::(&main_header) { + Ok(quant_table_header) + if quant_table_header.luma_len != 0 + && quant_table_header.chroma_len != 0 => + { + // Dynamic quantization tables are not cached + if main_header.q != 255 { + state + .quant_tables + .insert(main_header.q, quant_table_header.clone()); + } + + quant_table_header + } + Ok(_) => match state.quant_tables.get(&main_header.q) { + Some(quant) => quant.clone(), + None => { + gst::warning!(CAT, imp: self, "Have no quantization table for Q {} yet", main_header.q); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + }, + Err(err) => { + gst::warning!(CAT, imp: self, "Failed to parse quantization table header: {err}"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + } + } else { + let quant = state.quant_tables.entry(main_header.q).or_insert_with(|| { + let (luma_quant, chroma_quant) = make_quant_tables(main_header.q); + QuantizationTableHeader { + luma_quant, + luma_len: 64, + chroma_quant, + chroma_len: 64, + } + }); + quant.clone() + }; + + // Negotiate with downstream, falling back to the SDP `x-dimensions` (width, height) when the RTP header carries 0 + let width = if main_header.width != 0 { + main_header.width as i32 + } else if let Some((width, _)) = state.sdp_dimensions { + width as i32 + } else { + gst::warning!(CAT, imp: self, "Can't determine valid width for frame"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + }; + + let height = if main_header.height != 0 { + main_header.height as i32 + } else if let Some((_, height)) = state.sdp_dimensions { + height as i32 + } else { + gst::warning!(CAT, imp: self, "Can't determine valid height for frame"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + }; + + if !self.obj().src_pad().has_current_caps() + || state.dimensions != Some((width, height)) + || state.framerate != state.sdp_framerate + { + let mut caps_builder = gst::Caps::builder("image/jpeg") + .field("parsed", true) + .field("width", width) + 
.field("height", height) + .field("sof-marker", 0i32) + .field("colorspace", "sYUV") + .field( + "sampling", + if main_header.type_ & 0x3f == 0 { + "YCbCr-4:2:2" + } else { + "YCbCr-4:2:0" + }, + ); + if let Some(framerate) = state.sdp_framerate { + caps_builder = caps_builder.field("framerate", framerate); + } + + let caps = caps_builder.build(); + gst::debug!(CAT, imp: self, "Setting caps {caps:?}"); + self.obj().set_src_caps(&caps); + state.dimensions = Some((width, height)); + state.framerate = state.sdp_framerate; + } + + let mut data = Vec::new(); + + // Prepend the JPEG headers before the actual JPEG data that comes from the packet + // payload. + let jpeg_header = match JpegHeader::new( + &main_header, + restart_header.as_ref(), + quant, + width as u16, + height as u16, + ) { + Ok(jpeg_header) => jpeg_header, + Err(err) => { + gst::warning!(CAT, imp: self, "Can't create JPEG header for frame: {err}"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + }; + let mut w = ByteWriter::endian(&mut data, BigEndian); + + if let Err(err) = w.build::(&jpeg_header) { + gst::warning!(CAT, imp: self, "Failed to write JPEG header for frame: {err}"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + state.pending_frame = Some(PendingFrame { + main_header: main_header.clone(), + data, + expected_fragment_offset: 0, + start_ext_seqnum: packet.ext_seqnum(), + }); + } + + let pending_frame = state.pending_frame.as_mut().expect("no pending frame"); + if pending_frame.expected_fragment_offset != main_header.fragment_offset { + gst::warning!(CAT, imp: self, "Expected fragment offset {} but got {}", + pending_frame.expected_fragment_offset, + main_header.fragment_offset, + ); + state.pending_frame = None; + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + if pending_frame.main_header.type_specific != main_header.type_specific + || pending_frame.main_header.type_ != main_header.type_ + || 
pending_frame.main_header.q != main_header.q + || pending_frame.main_header.width != main_header.width + || pending_frame.main_header.height != main_header.height + { + gst::warning!( + CAT, + imp: self, + "Main header changed in incompatible ways from {:?} to {:?} during a frame", + pending_frame.main_header, + main_header, + ); + + state.pending_frame = None; + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + let jpeg_payload_offset = cursor.position() as usize; + let jpeg_payload = &payload[jpeg_payload_offset..]; + pending_frame.data.extend_from_slice(jpeg_payload); + pending_frame.expected_fragment_offset += jpeg_payload.len() as u32; + + // Wait for marker before outputting anything + if !packet.marker_bit() { + return Ok(gst::FlowSuccess::Ok); + } + + let mut pending_frame = state.pending_frame.take().expect("no pending frame"); + + // Add EOI marker if there is none + if !pending_frame.data.ends_with(&[0xff, 0xd9]) { + pending_frame.data.extend_from_slice(&[0xff, 0xd9]); + } + + let buffer = gst::Buffer::from_mut_slice(mem::take(&mut pending_frame.data)); + self.obj().queue_buffer( + crate::basedepay::PacketToBufferRelation::Seqnums( + pending_frame.start_ext_seqnum..=packet.ext_seqnum(), + ), + buffer, + ) + } +} diff --git a/net/rtp/src/jpeg/depay/mod.rs b/net/rtp/src/jpeg/depay/mod.rs new file mode 100644 index 00000000..365cf8cc --- /dev/null +++ b/net/rtp/src/jpeg/depay/mod.rs @@ -0,0 +1,27 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! 
{ + pub struct RtpJpegDepay(ObjectSubclass) + @extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpjpegdepay2", + gst::Rank::MARGINAL, + RtpJpegDepay::static_type(), + ) +} diff --git a/net/rtp/src/jpeg/header.rs b/net/rtp/src/jpeg/header.rs new file mode 100644 index 00000000..f7839580 --- /dev/null +++ b/net/rtp/src/jpeg/header.rs @@ -0,0 +1,838 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::io; + +use anyhow::{bail, Context as _}; +use bitstream_io::{ + BigEndian, ByteWrite as _, ByteWriter, FromByteStream, FromByteStreamWith, ToByteStream, + ToByteStreamWith, +}; + +#[derive(Default)] +struct Counter(usize); + +impl io::Write for Counter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0 += buf.len(); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct MainHeader { + pub type_specific: u8, + pub fragment_offset: u32, + pub type_: u8, + pub q: u8, + pub width: u16, + pub height: u16, +} + +impl MainHeader { + pub fn size(&self) -> Result { + let mut counter = Counter::default(); + let mut w = ByteWriter::endian(&mut counter, BigEndian); + w.build::(self)?; + + Ok(counter.0) + } +} + +impl FromByteStream for MainHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let type_specific = r.read::().context("type_specific")?; + let fragment_offset = u32::from_be_bytes([ + 0, + r.read::().context("fragment_offset")?, + r.read::().context("fragment_offset")?, + r.read::().context("fragment_offset")?, + ]); + let type_ = r.read::().context("type")?; + let q = 
r.read::().context("q")?; + let width = r.read::().context("width")?; + let height = r.read::().context("height")?; + let width = u16::from(width) * 8; + let height = u16::from(height) * 8; + + if ![0, 1, 64, 65].contains(&type_) { + bail!("Unsupported RTP JPEG type {type_}"); + } + + if type_specific != 0 { + bail!("Interlaced JPEG not supported"); + } + + Ok(MainHeader { + type_specific, + fragment_offset, + type_, + q, + width, + height, + }) + } +} + +impl ToByteStream for MainHeader { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + w.write::(self.type_specific).context("type_specific")?; + + if self.fragment_offset > 0x00ff_ffff { + bail!("Too big frame"); + } + + let fragment_offset = self.fragment_offset.to_be_bytes(); + w.write_bytes(&fragment_offset[1..]) + .context("fragment_offset")?; + + w.write::(self.type_).context("type_")?; + w.write::(self.q).context("q")?; + if self.height > 2040 || self.width > 2040 { + w.write::(0).context("width_height")?; + } else { + w.write::((self.width / 8) as u8).context("width")?; + w.write::((self.height / 8) as u8).context("height")?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RestartHeader { + pub restart_interval: u16, + pub first: bool, + pub last: bool, + pub restart_count: u16, +} + +impl FromByteStream for RestartHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let restart_interval = r.read_as::().context("restart_interval")?; + let restart_count = r.read_as::().context("restart_count")?; + + let first = (restart_count & 0b1000_0000_0000_0000) != 0; + let last = (restart_count & 0b0100_0000_0000_0000) != 0; + + let restart_count = restart_count & 0b0011_1111_1111_1111; + + Ok(RestartHeader { + restart_interval, + first, + last, + restart_count, + }) + } +} + +impl<'a> ToByteStreamWith<'a> for RestartHeader { + type Error = anyhow::Error; + + type Context = 
MainHeader; + + fn to_writer( + &self, + w: &mut W, + main_header: &Self::Context, + ) -> Result<(), Self::Error> + where + Self: Sized, + { + // Nothing to write here + if main_header.type_ < 64 { + return Ok(()); + } + + w.write::(self.restart_interval) + .context("restart_interval")?; + + if self.restart_count > 0b0011_1111_1111_1111 { + bail!("Too high restart count"); + } + + w.write::( + self.restart_count + | if self.first { 0b1000_0000_0000_0000 } else { 0 } + | if self.last { 0b0100_0000_0000_0000 } else { 0 }, + ) + .context("restart_count")?; + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct QuantizationTableHeader { + pub luma_quant: [u8; 128], + pub luma_len: u8, + pub chroma_quant: [u8; 128], + pub chroma_len: u8, +} + +impl QuantizationTableHeader { + pub fn size(&self, main_header: &MainHeader) -> Result { + let mut counter = Counter::default(); + let mut w = ByteWriter::endian(&mut counter, BigEndian); + w.build_with::(self, main_header)?; + + Ok(counter.0) + } +} + +impl Default for QuantizationTableHeader { + fn default() -> Self { + Self { + luma_quant: [0u8; 128], + luma_len: 0, + chroma_quant: [0u8; 128], + chroma_len: 0, + } + } +} + +impl<'a> FromByteStreamWith<'a> for QuantizationTableHeader { + type Error = anyhow::Error; + type Context = MainHeader; + + fn from_reader( + r: &mut R, + main_header: &Self::Context, + ) -> Result + where + Self: Sized, + { + assert!(main_header.q >= 128); + assert!(main_header.fragment_offset == 0); + assert!([0, 1, 64, 65].contains(&main_header.type_)); + + let _mbz = r.read::().context("mbz")?; + let precision = r.read::().context("precision")?; + let length = r.read_as::().context("length")?; + + if length == 0 && main_header.q == 255 { + bail!("Dynamic quantization tables can't be empty"); + } + + let mut luma_quant = [0u8; 128]; + let mut luma_len = 0; + let mut chroma_quant = [0u8; 128]; + let mut chroma_len = 0; + if length != 0 { + // All currently supported types have two tables + luma_len = 
if precision & 1 != 0 { 128 } else { 64 }; + chroma_len = if precision & 2 != 0 { 128 } else { 64 }; + + if length != (luma_len + chroma_len) as u16 { + bail!("Unsupported quantization table length {length}"); + } + + r.read_bytes(&mut luma_quant[..luma_len]) + .context("luma_quant")?; + r.read_bytes(&mut chroma_quant[..chroma_len]) + .context("chroma_quant")?; + } + + Ok(QuantizationTableHeader { + luma_quant, + luma_len: luma_len as u8, + chroma_quant, + chroma_len: chroma_len as u8, + }) + } +} + +impl<'a> ToByteStreamWith<'a> for QuantizationTableHeader { + type Error = anyhow::Error; + + type Context = MainHeader; + + fn to_writer( + &self, + w: &mut W, + main_header: &Self::Context, + ) -> Result<(), Self::Error> + where + Self: Sized, + { + // Nothing to write here + if main_header.q < 128 { + return Ok(()); + } + + assert!(main_header.fragment_offset == 0); + assert!([0, 1, 64, 65].contains(&main_header.type_)); + + let (precision, length) = match (self.luma_len, self.chroma_len) { + (64, 64) => (0, 64 + 64), + (128, 64) => (1, 128 + 64), + (64, 128) => (2, 128 + 64), + (128, 128) => (3, 128 + 128), + _ => { + bail!("Unsupported quantization table lengths"); + } + }; + + w.write::(0).context("mbz")?; + w.write::(precision).context("precision")?; + + // TODO: Could theoretically only write the tables every few frames + // if the same table was written before + w.write::(length).context("length")?; + + w.write_bytes(&self.luma_quant[..self.luma_len as usize]) + .context("luma_quant")?; + w.write_bytes(&self.chroma_quant[..self.chroma_len as usize]) + .context("chroma_quant")?; + + Ok(()) + } +} + +// From Appendix A + +const ZIG_ZAG: [u8; 64] = [ + 0, 1, 8, 16, 9, 2, 3, 10, 17, 24, 32, 25, 18, 11, 4, 5, 12, 19, 26, 33, 40, 48, 41, 34, 27, 20, + 13, 6, 7, 14, 21, 28, 35, 42, 49, 56, 57, 50, 43, 36, 29, 22, 15, 23, 30, 37, 44, 51, 58, 59, + 52, 45, 38, 31, 39, 46, 53, 60, 61, 54, 47, 55, 62, 63, +]; + +// Table K.1 from JPEG spec. 
+static JPEG_LUMA_QUANT: [u8; 64] = [ + 16, 11, 10, 16, 24, 40, 51, 61, 12, 12, 14, 19, 26, 58, 60, 55, 14, 13, 16, 24, 40, 57, 69, 56, + 14, 17, 22, 29, 51, 87, 80, 62, 18, 22, 37, 56, 68, 109, 103, 77, 24, 35, 55, 64, 81, 104, 113, + 92, 49, 64, 78, 87, 103, 121, 120, 101, 72, 92, 95, 98, 112, 100, 103, 99, +]; + +// Table K.2 from JPEG spec. +static JPEG_CHROMA_QUANT: [u8; 64] = [ + 17, 18, 24, 47, 99, 99, 99, 99, 18, 21, 26, 66, 99, 99, 99, 99, 24, 26, 56, 99, 99, 99, 99, 99, + 47, 66, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, + 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, +]; + +pub fn detect_static_quant_table( + luma_quant: &[u8], + chroma_quant: &[u8], + previous_q: Option, +) -> Option { + if luma_quant.len() != 64 || chroma_quant.len() != 64 { + return None; + } + + // TODO: Could estimate a lower bound for q here based on luma_quant[0] + // but probably doesn't really matter in the bigger picture. + + // Short-cut in case quantization tables don't change + if let Some(previous_q) = previous_q { + if Iterator::zip( + Iterator::zip(luma_quant.iter().copied(), chroma_quant.iter().copied()), + make_quant_tables_iter(previous_q), + ) + .all(|(a, b)| a == b) + { + return Some(previous_q); + } + } + + (1..=99).find(|&q| { + Iterator::zip( + Iterator::zip(luma_quant.iter().copied(), chroma_quant.iter().copied()), + make_quant_tables_iter(q), + ) + .all(|(a, b)| a == b) + }) +} + +pub fn make_quant_tables_iter(q: u8) -> impl Iterator { + let factor = u8::clamp(q, 1, 99); + let q = if q < 50 { + 5000 / factor as u32 + } else { + 200 - factor as u32 * 2 + }; + + ZIG_ZAG + .iter() + .copied() + .map(|idx| { + let idx = idx as usize; + (JPEG_LUMA_QUANT[idx], JPEG_CHROMA_QUANT[idx]) + }) + .map(move |(lq, cq)| { + let lq = (lq as u32 * q + 50) / 100; + let cq = (cq as u32 * q + 50) / 100; + + // Limit the quantizers to 1 <= q <= 255 + (u32::clamp(lq, 1, 255) as u8, u32::clamp(cq, 1, 255) as u8) + }) +} + 
+pub fn make_quant_tables(q: u8) -> ([u8; 128], [u8; 128]) { + let mut luma_quant = [0u8; 128]; + let mut chroma_quant = [0u8; 128]; + + for ((lq_out, cq_out), (lq, cq)) in Iterator::zip( + Iterator::zip(luma_quant.iter_mut(), chroma_quant.iter_mut()), + make_quant_tables_iter(q), + ) { + *lq_out = lq; + *cq_out = cq; + } + + (luma_quant, chroma_quant) +} + +// Appendix B + +static LUM_DC_CODELENS: [u8; 16] = [0, 1, 5, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0]; + +static LUM_DC_SYMBOLS: [u8; 12] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; + +static LUM_AC_CODELENS: [u8; 16] = [0, 2, 1, 3, 3, 2, 4, 3, 5, 5, 4, 4, 0, 0, 1, 0x7d]; + +static LUM_AC_SYMBOLS: [u8; 162] = [ + 0x01, 0x02, 0x03, 0x00, 0x04, 0x11, 0x05, 0x12, 0x21, 0x31, 0x41, 0x06, 0x13, 0x51, 0x61, 0x07, + 0x22, 0x71, 0x14, 0x32, 0x81, 0x91, 0xa1, 0x08, 0x23, 0x42, 0xb1, 0xc1, 0x15, 0x52, 0xd1, 0xf0, + 0x24, 0x33, 0x62, 0x72, 0x82, 0x09, 0x0a, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x25, 0x26, 0x27, 0x28, + 0x29, 0x2a, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3a, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, + 0x4a, 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x5a, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, + 0x6a, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, 0x7a, 0x83, 0x84, 0x85, 0x86, 0x87, 0x88, 0x89, + 0x8a, 0x92, 0x93, 0x94, 0x95, 0x96, 0x97, 0x98, 0x99, 0x9a, 0xa2, 0xa3, 0xa4, 0xa5, 0xa6, 0xa7, + 0xa8, 0xa9, 0xaa, 0xb2, 0xb3, 0xb4, 0xb5, 0xb6, 0xb7, 0xb8, 0xb9, 0xba, 0xc2, 0xc3, 0xc4, 0xc5, + 0xc6, 0xc7, 0xc8, 0xc9, 0xca, 0xd2, 0xd3, 0xd4, 0xd5, 0xd6, 0xd7, 0xd8, 0xd9, 0xda, 0xe1, 0xe2, + 0xe3, 0xe4, 0xe5, 0xe6, 0xe7, 0xe8, 0xe9, 0xea, 0xf1, 0xf2, 0xf3, 0xf4, 0xf5, 0xf6, 0xf7, 0xf8, + 0xf9, 0xfa, +]; + +static CHM_DC_CODELENS: [u8; 16] = [0, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0]; + +static CHM_DC_SYMBOLS: [u8; 12] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; + +static CHM_AC_CODELENS: [u8; 16] = [0, 2, 1, 2, 4, 4, 3, 4, 7, 5, 4, 4, 0, 1, 2, 0x77]; + +static CHM_AC_SYMBOLS: [u8; 162] = [ + 0x00, 0x01, 0x02, 0x03, 0x11, 
0x04, 0x05, 0x21, 0x31, 0x06, 0x12, 0x41, 0x51, 0x07, 0x61, 0x71, + 0x13, 0x22, 0x32, 0x81, 0x08, 0x14, 0x42, 0x91, 0xa1, 0xb1, 0xc1, 0x09, 0x23, 0x33, 0x52, 0xf0, + 0x15, 0x62, 0x72, 0xd1, 0x0a, 0x16, 0x24, 0x34, 0xe1, 0x25, 0xf1, 0x17, 0x18, 0x19, 0x1a, 0x26, + 0x27, 0x28, 0x29, 0x2a, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3a, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, + 0x49, 0x4a, 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x5a, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, + 0x69, 0x6a, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, 0x7a, 0x82, 0x83, 0x84, 0x85, 0x86, 0x87, + 0x88, 0x89, 0x8a, 0x92, 0x93, 0x94, 0x95, 0x96, 0x97, 0x98, 0x99, 0x9a, 0xa2, 0xa3, 0xa4, 0xa5, + 0xa6, 0xa7, 0xa8, 0xa9, 0xaa, 0xb2, 0xb3, 0xb4, 0xb5, 0xb6, 0xb7, 0xb8, 0xb9, 0xba, 0xc2, 0xc3, + 0xc4, 0xc5, 0xc6, 0xc7, 0xc8, 0xc9, 0xca, 0xd2, 0xd3, 0xd4, 0xd5, 0xd6, 0xd7, 0xd8, 0xd9, 0xda, + 0xe2, 0xe3, 0xe4, 0xe5, 0xe6, 0xe7, 0xe8, 0xe9, 0xea, 0xf2, 0xf3, 0xf4, 0xf5, 0xf6, 0xf7, 0xf8, + 0xf9, 0xfa, +]; + +#[derive(Debug, Clone)] +pub struct JpegHeader { + pub type_: u8, + pub width: u16, + pub height: u16, + pub quant: QuantizationTableHeader, + pub dri: u16, +} + +impl JpegHeader { + pub fn new( + main_header: &MainHeader, + restart_header: Option<&RestartHeader>, + quant: QuantizationTableHeader, + width: u16, + height: u16, + ) -> Result { + if ![0, 1, 64, 65].contains(&main_header.type_) { + bail!("Unsupported type {}", main_header.type_); + } + + let dri = if main_header.type_ >= 64 { + if let Some(restart_header) = restart_header { + restart_header.restart_interval + } else { + bail!("Require restart header"); + } + } else { + 0 + }; + + Ok(JpegHeader { + type_: main_header.type_, + width, + height, + quant, + dri, + }) + } +} + +impl ToByteStream for JpegHeader { + type Error = anyhow::Error; + + fn to_writer(&self, w: &mut W) -> Result<(), Self::Error> + where + Self: Sized, + { + // Generate a frame and scan headers that can be prepended to the + // RTP/JPEG data payload to produce a JPEG compressed image 
in + // interchange format (except for possible trailing garbage and + // absence of an EOI marker to terminate the scan). + + w.write_bytes(&[0xff, 0xd8]).context("SOI")?; + + for (i, quant) in [ + &self.quant.luma_quant[..self.quant.luma_len as usize], + &self.quant.chroma_quant[..self.quant.chroma_len as usize], + ] + .into_iter() + .enumerate() + { + assert!([64, 128].contains(&quant.len())); + w.write_bytes(&[0xff, 0xdb]).context("DQT")?; + w.write::(quant.len() as u16 + 3).context("size")?; + w.write::(i as u8).context("table_no")?; + w.write_bytes(quant).context("quant")?; + } + + if self.dri != 0 { + w.write_bytes(&[0xff, 0xdd]).context("DRI")?; + w.write::(4).context("size")?; + w.write::(self.dri).context("dri")?; + } + + w.write_bytes(&[0xff, 0xc0]).context("SOF")?; + w.write::(17).context("size")?; + w.write::(8).context("precision")?; + + w.write::(self.height).context("height")?; + w.write::(self.width).context("width")?; + w.write::(3).context("comps")?; + w.write::(0).context("comp 0")?; + w.write::(if self.type_ & 0x3f == 0 { 0x21 } else { 0x22 }) + .context("samp")?; + w.write::(0).context("quant table")?; + + w.write::(1).context("comp 1")?; + w.write::(0x11).context("samp")?; + w.write::(1).context("quant table")?; + + w.write::(2).context("comp 2")?; + w.write::(0x11).context("samp")?; + w.write::(1).context("quant table")?; + + for (codelens, symbols, table, class) in [ + (LUM_DC_CODELENS, LUM_DC_SYMBOLS.as_slice(), 0, 0), + (LUM_AC_CODELENS, LUM_AC_SYMBOLS.as_slice(), 0, 1), + (CHM_DC_CODELENS, CHM_DC_SYMBOLS.as_slice(), 1, 0), + (CHM_AC_CODELENS, CHM_AC_SYMBOLS.as_slice(), 1, 1), + ] { + w.write_bytes(&[0xff, 0xc4]).context("DHT")?; + w.write::(3 + codelens.len() as u16 + symbols.len() as u16) + .context("size")?; + w.write::((class << 4) | table).context("class_table")?; + w.write_bytes(&codelens).context("codelens")?; + w.write_bytes(symbols).context("symbols")?; + } + + w.write_bytes(&[0xff, 0xda]).context("SOS")?; + 
w.write::(12).context("size")?; + w.write::(3).context("comps")?; + w.write::(0).context("comp")?; + w.write::(0).context("huffman table")?; + w.write::(1).context("comp")?; + w.write::(0x11).context("huffman table")?; + w.write::(2).context("comp")?; + w.write::(0x11).context("huffman table")?; + w.write::(0).context("first DCT coeff")?; + w.write::(63).context("last DCT coeff")?; + w.write::(0).context("successive approx.")?; + + Ok(()) + } +} + +impl FromByteStream for JpegHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> Result + where + Self: Sized, + { + let mut width = None; + let mut height = None; + let mut dri = None; + + #[derive(Default, Clone, Copy)] + struct Component { + id: u8, + samp: u8, + quant_table: u8, + huffman_table: Option, + } + let mut components = [Component::default(); 3]; + + #[derive(Clone, Copy)] + struct QuantTable { + id: u8, + len: u8, + table: [u8; 128], + } + impl Default for QuantTable { + fn default() -> Self { + Self { + id: 0, + len: 0, + table: [0u8; 128], + } + } + } + let mut quant_table_idx = 0; + let mut quant_tables = [QuantTable::default(); 2]; + + // Parse the different markers in the JPEG headers here to extract the few information + // we're actually interested in + 'marker_loop: loop { + let marker = { + let mut start_code = false; + + loop { + match r.read::()? 
{ + v @ 0xc0..=0xfe if start_code => { + break 0xff00 | (v as u16); + } + 0xff => { + start_code = true; + } + _ => { + start_code = false; + } + } + } + }; + + match marker { + // SOI + 0xff_d8 => (), + // EOI + 0xff_d9 => { + bail!("EOI marker before SOS marker, empty image"); + } + // SOF0 + 0xff_c0 => { + let len = r.read::().context("len")?; + if len != 17 { + bail!("Invalid SOF length {len}"); + } + let precision = r.read::().context("precision")?; + if precision != 8 { + bail!("Unsupported precision {precision}"); + } + let h = r.read::().context("height")?; + let w = r.read::().context("width")?; + if width.is_some() || height.is_some() { + bail!("Multiple SOF"); + } + width = Some(w); + height = Some(h); + + let comps = r.read::().context("comps")?; + if comps != 3 { + bail!("Unsupported number of components {comps}"); + } + + for component in &mut components { + let comp = r.read::().context("comp")?; + let samp = r.read::().context("samp")?; + let quant_table = r.read::().context("quant table")?; + + *component = Component { + id: comp, + samp, + quant_table, + huffman_table: None, + }; + } + + components.sort_by(|a, b| a.id.cmp(&b.id)); + } + // DQT: one table per segment, at most `quant_tables.len()` segments + 0xff_db => { + let len = r.read::().context("len")?; + if len != 3 + 64 && len != 3 + 128 { + bail!("Unsupported quantization table size {}", len.saturating_sub(3)); + } + let table_no = r.read::().context("table_no")?; + + if quant_table_idx >= quant_tables.len() { + bail!("Too many quantization tables"); + } + + let len = len - 3; + quant_tables[quant_table_idx].id = table_no; + quant_tables[quant_table_idx].len = len as u8; + r.read_bytes( + &mut quant_tables[quant_table_idx].table[..len as usize] + ).context("quant")?; + quant_table_idx += 1; + } + // DRI + 0xff_dd => { + let len = r.read::().context("len")?; + if len != 4 { + bail!("Invalid DRI length {len}"); + } + if dri.is_some() { + bail!("Multiple DRI"); + } + dri = Some(r.read::().context("dri")?); + } + // SOF1, SOF2, SOF3, SOF9, SOF10, SOF11 + 
(0xff_c1..=0xff_c3) | (0xff_c9..=0xff_cb) + // DHT, DAC, COM, DNL + | 0xff_c4 | 0xff_cc | 0xff_fe | 0xff_dc | + // APP0-APP15 + (0xff_e0..=0xff_ef) => { + let len = r.read::().context("len")?; + if len < 2 { + bail!("Invalid length"); + } + r.skip(len as u32 - 2).context("skip")?; + } + // RST0-RST7 + (0xff_d0..=0xff_d7) => { + // two bytes fixed size, just the marker id itself + } + // SOS + 0xff_da => { + let len = r.read::().context("len")?; + if len != 12 { + bail!("Unsupported SOS length"); + } + + let comps = r.read::().context("comps")?; + if comps != 3 { + bail!("Unsupported number of components {comps}"); + } + + for _ in 0..3 { + let comp = r.read::().context("comp")?; + let Some(comp) = components.iter_mut().find(|c| c.id == comp) else { + bail!("Unhandled component {comp}"); + }; + let huffman_table = r.read::().context("huffman_table")?; + comp.huffman_table = Some(huffman_table); + // TODO: Could check this together with parsing DHT + } + + let first_dct = r.read::().context("first DCT coeff")?; + if first_dct != 0 { + bail!("Unsupported first DCT {first_dct}"); + } + let last_dct = r.read::().context("last DCT coeff")?; + if last_dct != 63 { + bail!("Unsupported last DCT {last_dct}"); + } + let successive_approx = r.read::().context("successive approx.")?; + if successive_approx!= 0 { + bail!("Unsupported successive approx. 
{successive_approx}"); + } + + break 'marker_loop; + } + _ => (), + } + } + + let width = width.unwrap(); + let height = height.unwrap(); + let dri = dri.unwrap_or(0); + + // Check if the headers are compatible with the subset of JPEG covered by the RFC + if components[0].samp != 0x21 && components[0].samp != 0x22 { + bail!( + "Unsupported component sampling {} for component 0", + components[0].samp + ); + } + if components[1].samp != 0x11 { + bail!( + "Unsupported component sampling {} for component 1", + components[1].samp + ); + } + if components[2].samp != 0x11 { + bail!( + "Unsupported component sampling {} for component 2", + components[2].samp + ); + } + let type_ = match components[0].samp { + 0x21 => 0, + 0x22 => 1, + _ => unreachable!(), + }; + + if components[1].quant_table != components[2].quant_table { + bail!("Components 1/2 have different quantization tables"); + } + if components[0].quant_table == components[1].quant_table { + bail!("Component 0 has same quantization table as component 1"); + } + + if quant_table_idx != 2 { + bail!("Wrong number of quantization tables"); + } + + let Some(luma_quant) = quant_tables + .iter() + .find(|t| t.id == components[0].quant_table) + else { + bail!("Can't find luma quantization table"); + }; + + let Some(chroma_quant) = quant_tables + .iter() + .find(|t| t.id == components[1].quant_table) + else { + bail!("Can't find chroma quantization table"); + }; + + Ok(JpegHeader { + type_, + width, + height, + quant: QuantizationTableHeader { + luma_quant: luma_quant.table, + luma_len: luma_quant.len, + chroma_quant: chroma_quant.table, + chroma_len: chroma_quant.len, + }, + dri, + }) + } +} diff --git a/net/rtp/src/jpeg/mod.rs b/net/rtp/src/jpeg/mod.rs new file mode 100644 index 00000000..eac26687 --- /dev/null +++ b/net/rtp/src/jpeg/mod.rs @@ -0,0 +1,12 @@ +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. 
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+pub mod depay;
+mod header;
+pub mod pay;
diff --git a/net/rtp/src/jpeg/pay/imp.rs b/net/rtp/src/jpeg/pay/imp.rs
new file mode 100644
index 00000000..6d34cfef
--- /dev/null
+++ b/net/rtp/src/jpeg/pay/imp.rs
@@ -0,0 +1,346 @@
+//
+// Copyright (C) 2023 Sebastian Dröge
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use atomic_refcell::AtomicRefCell;
+/**
+ * SECTION:element-rtpjpegpay2
+ * @see_also: rtpjpegdepay2, jpegdec, jpegenc
+ *
+ * Payload a JPEG video stream into RTP packets as per [RFC 2435][rfc-2435].
+ *
+ * [rfc-2435]: https://www.rfc-editor.org/rfc/rfc2435.html
+ *
+ * ## Example pipeline
+ *
+ * |[
+ * gst-launch-1.0 videotestsrc ! video/x-raw,width=1280,height=720,format=I420 ! timeoverlay font-desc=Sans,22 ! jpegenc ! jpegparse ! rtpjpegpay2 ! udpsink host=127.0.0.1 port=5004
+ * ]| This will create and payload a JPEG video stream with a test pattern and
+ * send it out via UDP to localhost port 5004.
+ *
+ * Since: plugins-rs-0.13.0
+ */
+use gst::{glib, prelude::*, subclass::prelude::*};
+use smallvec::SmallVec;
+use std::{cmp, io};
+
+use bitstream_io::{BigEndian, ByteRead as _, ByteReader, ByteWrite as _, ByteWriter};
+use once_cell::sync::Lazy;
+
+use crate::{
+    basepay::RtpBasePay2Ext,
+    jpeg::header::{detect_static_quant_table, JpegHeader, MainHeader, QuantizationTableHeader},
+};
+
+/// Debug category used by all log statements of the payloader.
+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+    gst::DebugCategory::new(
+        "rtpjpegpay2",
+        gst::DebugColorFlags::empty(),
+        Some("RTP JPEG Payloader"),
+    )
+});
+
+/// Streaming state, reset on `start()` / `stop()`.
+#[derive(Default)]
+struct State {
+    /// Width to put into the RTP main header, `Some(0)` if signalled via the caps instead.
+    ///
+    /// `None` until sink caps were configured.
+    width: Option<u16>,
+    /// Height to put into the RTP main header, `Some(0)` if signalled via the caps instead.
+    ///
+    /// `None` until sink caps were configured.
+    height: Option<u16>,
+    /// Q value detected for the previous frame, passed as a hint to the detection of the
+    /// next frame.
+    previous_q: Option<u8>,
+}
+
+/// RTP JPEG payloader element implementation.
+#[derive(Default)]
+pub struct RtpJpegPay {
+    state: AtomicRefCell<State>,
+}
+
+#[glib::object_subclass]
+impl ObjectSubclass for RtpJpegPay {
+    const NAME: &'static str = "GstRtpJpegPay2";
+    type Type = super::RtpJpegPay;
+    type ParentType = crate::basepay::RtpBasePay2;
+}
+
+impl ObjectImpl for RtpJpegPay {
+    fn constructed(&self) {
+        self.parent_constructed();
+
+        // Default static payload type
+        self.obj().set_property("pt", 26u32);
+    }
+}
+
+impl GstObjectImpl for RtpJpegPay {}
+
+impl ElementImpl for RtpJpegPay {
+    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
+        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
+            gst::subclass::ElementMetadata::new(
+                "RTP JPEG payloader",
+                "Codec/Payloader/Network/RTP",
+                "Payload a JPEG Video stream to RTP packets (RFC 2435)",
+                // NOTE(review): the trailing space suggests the author's e-mail address was
+                // stripped from this string during extraction; confirm against upstream.
+                "Sebastian Dröge ",
+            )
+        });
+
+        Some(&*ELEMENT_METADATA)
+    }
+
+    fn pad_templates() -> &'static [gst::PadTemplate] {
+        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
+            let sink_pad_template = gst::PadTemplate::new(
+                "sink",
+                gst::PadDirection::Sink,
+                gst::PadPresence::Always,
+                &gst::Caps::builder("image/jpeg")
+                    .field("parsed", true)
+                    .field("width", gst::IntRange::new(1i32, u16::MAX as i32))
+                    .field("height", gst::IntRange::new(1i32, u16::MAX as i32))
+                    .field("sof-marker", 0i32)
+                    .field("colorspace", "sYUV")
+                    .field("sampling", gst::List::new(["YCbCr-4:2:0", "YCbCr-4:2:2"]))
+                    .build(),
+            )
+            .unwrap();
+
+            let src_pad_template = gst::PadTemplate::new(
+                "src",
+                gst::PadDirection::Src,
+                gst::PadPresence::Always,
+                &gst::Caps::builder_full()
+                    .structure(
+                        gst::Structure::builder("application/x-rtp")
+                            .field("media", "video")
+                            .field("payload", 26i32)
+                            .field("clock-rate", 90_000i32)
+                            .build(),
+                    )
+                    .structure(
+                        gst::Structure::builder("application/x-rtp")
+                            .field("media", "video")
+                            .field("encoding-name", "JPEG")
+                            .field("clock-rate", 90_000i32)
+                            .build(),
+                    )
+                    .build(),
+            )
+            .unwrap();
+
+            vec![src_pad_template, sink_pad_template]
+        });
+
+        PAD_TEMPLATES.as_ref()
+    }
+}
+
+impl crate::basepay::RtpBasePay2Impl for RtpJpegPay {
+    const ALLOWED_META_TAGS: &'static [&'static str] = &["video"];
+
+    fn start(&self) -> Result<(), gst::ErrorMessage> {
+        let mut state = self.state.borrow_mut();
+        *state = State::default();
+
+        Ok(())
+    }
+
+    fn stop(&self) -> Result<(), gst::ErrorMessage> {
+        let mut state = self.state.borrow_mut();
+        *state = State::default();
+
+        Ok(())
+    }
+
+    /// Configures the source caps from the sink caps and stores the dimensions that go
+    /// into the RTP main header of every packet.
+    fn set_sink_caps(&self, caps: &gst::Caps) -> bool {
+        gst::debug!(CAT, imp: self, "received caps {caps:?}");
+
+        let s = caps.structure(0).unwrap();
+
+        let mut caps_builder = gst::Caps::builder("application/x-rtp")
+            .field("media", "video")
+            .field("clock-rate", 90_000i32);
+
+        if let Some(framerate) = s
+            .get::<gst::Fraction>("framerate")
+            .ok()
+            .filter(|fps| *fps > gst::Fraction::new(0, 1))
+        {
+            caps_builder = caps_builder.field(
+                "a-framerate",
+                (framerate.numer() as f64 / framerate.denom() as f64).to_string(),
+            );
+        }
+
+        // The sink pad template restricts both fields to 1..=u16::MAX.
+        let width = s.get::<i32>("width").unwrap() as u16;
+        let height = s.get::<i32>("height").unwrap() as u16;
+
+        // If the resolution doesn't fit into the RTP payload header then pass it via the SDP and
+        // set it to 0 inside the RTP payload header
+        let oversized = width > 2040 || height > 2040;
+        if oversized {
+            caps_builder = caps_builder.field("x-dimensions", format!("{width},{height}"));
+        }
+
+        self.obj().set_src_caps(&caps_builder.build());
+
+        let mut state = self.state.borrow_mut();
+        if oversized {
+            state.width = Some(0);
+            state.height = Some(0);
+        } else {
+            state.width = Some(width);
+            state.height = Some(height);
+        }
+
+        true
+    }
+
+    /// Parses the JPEG headers of `buffer` and queues its image data as one or more RTP
+    /// packets.
+    ///
+    /// Every packet starts with the main header. The first packet additionally carries the
+    /// quantization tables if no static Q value was detected. The marker bit is set on the
+    /// packet that carries the last bytes of the frame.
+    fn handle_buffer(
+        &self,
+        buffer: &gst::Buffer,
+        id: u64,
+    ) -> Result<gst::FlowSuccess, gst::FlowError> {
+        let mut state = self.state.borrow_mut();
+
+        let max_payload_size = self.obj().max_payload_size();
+
+        gst::trace!(CAT, imp: self, "received buffer of size {}", buffer.size());
+
+        let map = buffer.map_readable().map_err(|_| {
+            gst::element_imp_error!(
+                self,
+                gst::ResourceError::Read,
+                ["Failed to map buffer readable"]
+            );
+
+            gst::FlowError::Error
+        })?;
+
+        // Set together with the caps. Report a negotiation problem instead of panicking if a
+        // buffer arrives before any caps were configured.
+        let (Some(width), Some(height)) = (state.width, state.height) else {
+            gst::error!(CAT, imp: self, "Received buffer before caps");
+            return Err(gst::FlowError::NotNegotiated);
+        };
+
+        let mut cursor = io::Cursor::new(&map);
+        let mut r = ByteReader::endian(&mut cursor, BigEndian);
+        let jpeg_header = match r.parse::<JpegHeader>() {
+            Ok(header) => header,
+            Err(err) => {
+                gst::error!(CAT, imp: self, "Failed parsing JPEG header: {err}");
+                return Err(gst::FlowError::Error);
+            }
+        };
+        // The parser stops right after the SOS header, everything from there on is payload.
+        let data_offset = cursor.position() as usize;
+        gst::trace!(CAT, imp: self, "Parsed JPEG header {jpeg_header:?}, data starts at offset {data_offset}");
+
+        // Try detecting static quantization headers
+        let luma_quant = &jpeg_header.quant.luma_quant[..jpeg_header.quant.luma_len as usize];
+        let chroma_quant = &jpeg_header.quant.chroma_quant[..jpeg_header.quant.chroma_len as usize];
+        let q = if let Some(q) =
+            detect_static_quant_table(luma_quant, chroma_quant, state.previous_q)
+        {
+            state.previous_q = Some(q);
+            q
+        } else {
+            state.previous_q = None;
+            255
+        };
+
+        gst::trace!(CAT, imp: self, "Using Q {q}");
+
+        let mut data = &map[data_offset..];
+        let mut fragment_offset = 0;
+        while !data.is_empty() {
+            let main_header = MainHeader {
+                type_specific: 0,
+                fragment_offset,
+                type_: jpeg_header.type_,
+                q,
+                width,
+                height,
+            };
+            let main_header_size = main_header.size().map_err(|err| {
+                gst::error!(CAT, imp: self, "Failed to write main header: {err:?}");
+                gst::FlowError::Error
+            })?;
+
+            // TODO: can handle restart headers better, for now we just don't bother
+
+            // The tables are only sent in-band in the first fragment and only for Q >= 128.
+            let quant_table_header = if fragment_offset == 0 && q >= 128 {
+                Some(jpeg_header.quant.clone())
+            } else {
+                None
+            };
+            let quant_table_header_size = quant_table_header
+                .as_ref()
+                .map(|q| q.size(&main_header))
+                .unwrap_or(Ok(0))
+                .map_err(|err| {
+                    gst::error!(CAT, imp: self, "Failed to write quantization table header: {err:?}");
+                    gst::FlowError::Error
+                })?;
+
+            let overhead = main_header_size + quant_table_header_size;
+            // Subtracting `overhead + 1` and adding 1 back makes sure that at least one byte
+            // of image data fits next to the headers.
+            let payload_size = (max_payload_size as usize)
+                .checked_sub(overhead + 1)
+                .ok_or_else(|| {
+                    gst::error!(CAT, imp: self, "Too small MTU configured for stream");
+                    gst::element_imp_error!(
+                        self,
+                        gst::LibraryError::Settings,
+                        ["Too small MTU configured for stream"]
+                    );
+                    gst::FlowError::Error
+                })?
+                + 1;
+            let payload_size = cmp::min(payload_size, data.len());
+
+            gst::trace!(
+                CAT,
+                imp: self,
+                "Writing packet with main header {main_header:?}, quantization table header {quant_table_header:?} and payload size {payload_size}",
+            );
+
+            // 8 bytes main header, 4 bytes quantization table header and up to 2x 128 bytes
+            // quantization table.
+            let mut headers_buffer = SmallVec::<[u8; 8 + 4 + 256]>::with_capacity(
+                main_header_size + quant_table_header_size,
+            );
+
+            let mut w = ByteWriter::endian(&mut headers_buffer, BigEndian);
+            w.build::<MainHeader>(&main_header).map_err(|err| {
+                gst::error!(CAT, imp: self, "Failed to write main header: {err:?}");
+                gst::FlowError::Error
+            })?;
+            if let Some(quant_table_header) = quant_table_header {
+                w.build_with::<QuantizationTableHeader>(&quant_table_header, &main_header)
+                    .map_err(|err| {
+                        gst::error!(CAT, imp: self, "Failed to write quantization table header: {err:?}");
+                        gst::FlowError::Error
+                    })?;
+            }
+            assert_eq!(
+                headers_buffer.len(),
+                main_header_size + quant_table_header_size,
+            );
+
+            self.obj().queue_packet(
+                id.into(),
+                rtp_types::RtpPacketBuilder::new()
+                    // Last fragment of the frame
+                    .marker_bit(data.len() == payload_size)
+                    .payload(headers_buffer.as_slice())
+                    .payload(&data[..payload_size]),
+            )?;
+
+            fragment_offset += payload_size as u32;
+            data = &data[payload_size..];
+        }
+
+        Ok(gst::FlowSuccess::Ok)
+    }
+}
diff --git a/net/rtp/src/jpeg/pay/mod.rs b/net/rtp/src/jpeg/pay/mod.rs
new file mode 100644
index 00000000..c69f8e1c
--- /dev/null
+++ b/net/rtp/src/jpeg/pay/mod.rs
@@ -0,0 +1,27 @@
+//
+// Copyright (C) 2023 Sebastian Dröge
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+
+pub mod imp;
+
+glib::wrapper! {
+    pub struct RtpJpegPay(ObjectSubclass<imp::RtpJpegPay>)
+        @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object;
+}
+
+/// Registers the `rtpjpegpay2` element with the given plugin.
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    gst::Element::register(
+        Some(plugin),
+        "rtpjpegpay2",
+        gst::Rank::MARGINAL,
+        RtpJpegPay::static_type(),
+    )
+}
diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs
index f68dcb41..8d3f2b42 100644
--- a/net/rtp/src/lib.rs
+++ b/net/rtp/src/lib.rs
@@ -24,6 +24,7 @@ mod basedepay;
 mod basepay;

 mod av1;
+mod jpeg;
 mod mp2t;
 mod pcmau;
 mod vp8;
@@ -50,6 +51,9 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
     av1::depay::register(plugin)?;
     av1::pay::register(plugin)?;

+    jpeg::depay::register(plugin)?;
+    jpeg::pay::register(plugin)?;
+
     mp2t::depay::register(plugin)?;
     mp2t::pay::register(plugin)?;