diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index e827f9b0..93cc4823 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5235,6 +5235,34 @@ }, "rank": "none" }, + "cea708mux": { + "author": "Matthew Waters ", + "description": "Combines multiple CEA-708 streams", + "hierarchy": [ + "GstCea708Mux", + "GstAggregator", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Muxer", + "pad-templates": { + "sink_%%u": { + "caps": "closedcaption/x-cea-708:\n format: cc_data\n framerate: { (fraction)60/1, (fraction)60000/1001, (fraction)50/1, (fraction)30/1, (fraction)30000/1001, (fraction)25/1, (fraction)24/1, (fraction)24000/1001 }\n", + "direction": "sink", + "presence": "request", + "type": "GstCea708MuxSinkPad" + }, + "src": { + "caps": "closedcaption/x-cea-708:\n format: cc_data\n framerate: { (fraction)60/1, (fraction)60000/1001, (fraction)50/1, (fraction)30/1, (fraction)30000/1001, (fraction)25/1, (fraction)24/1, (fraction)24000/1001 }\n", + "direction": "src", + "presence": "always", + "type": "GstAggregatorPad" + } + }, + "rank": "none" + }, "jsontovtt": { "author": "Jan Schmidt ", "description": "Converts JSON to WebVTT", @@ -5807,6 +5835,17 @@ "filename": "gstrsclosedcaption", "license": "MPL", "other-types": { + "GstCea708MuxSinkPad": { + "hierarchy": [ + "GstCea708MuxSinkPad", + "GstAggregatorPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object" + }, "GstTranscriberBinCaptionSource": { "kind": "enum", "values": [ diff --git a/video/closedcaption/Cargo.toml b/video/closedcaption/Cargo.toml index 14c484a7..6d2d5d9b 100644 --- a/video/closedcaption/Cargo.toml +++ b/video/closedcaption/Cargo.toml @@ -23,7 +23,7 @@ serde_json = { version = "1.0", features = ["raw_value"] } cea708-types = "0.3" once_cell.workspace = true gst = { workspace = true, features = ["v1_16"]} -gst-base = { workspace = true, features = ["v1_16"]} +gst-base = { workspace = true, features = ["v1_18"]} gst-video = { workspace = true, features = ["v1_16"]} winnow = "0.6" diff --git a/video/closedcaption/src/cea708mux/imp.rs b/video/closedcaption/src/cea708mux/imp.rs new file mode 100644 index 00000000..2dc9f405 --- /dev/null +++ b/video/closedcaption/src/cea708mux/imp.rs @@ -0,0 +1,613 @@ +// Copyright (C) 2023 Matthew Waters +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::collections::{HashMap, VecDeque}; +use std::sync::Mutex; + +use cea708_types::{CCDataParser, Service}; +use cea708_types::{CCDataWriter, DTVCCPacket, Framerate}; +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; + +use once_cell::sync::Lazy; + +#[derive(Default, Copy, Clone, PartialEq, Eq)] +enum CeaFormat { + S334_1a, + Cea608Field0, + Cea608Field1, + CcData, + #[default] + Cdp, +} + +impl CeaFormat { + fn from_caps(caps: &gst::CapsRef) -> Result { + let structure = caps.structure(0).expect("Caps has no structure"); + match structure.name().as_str() { + "closedcaption/x-cea-608" => match structure.get::<&str>("format") { + Ok("raw") => { + if structure.has_field("field") { + match structure.get::("field") { + Ok(0) => Ok(CeaFormat::Cea608Field0), + Ok(1) => Ok(CeaFormat::Cea608Field1), + _ => Err(gst::loggable_error!( + CAT, + "unknown \'field\' value in caps, {caps:?}" + )), + } + } else { + Ok(CeaFormat::Cea608Field0) + } + } + Ok("s334-1a") => Ok(CeaFormat::S334_1a), + v => Err(gst::loggable_error!( + CAT, + "unknown or missing \'format\' value {v:?} in caps, {caps:?}" + )), + }, + "closedcaption/x-cea-708" => match structure.get::<&str>("format") { + Ok("cdp") => Ok(CeaFormat::Cdp), + Ok("cc_data") => Ok(CeaFormat::CcData), + v => Err(gst::loggable_error!( + CAT, + "unknown or missing \'format\' value {v:?} in caps, {caps:?}" + )), + }, + name => Err(gst::loggable_error!( + CAT, + "Unknown caps name: {name} in caps" + )), + } + } +} + +fn fps_from_caps(caps: &gst::CapsRef) -> Result { + let structure = caps.structure(0).expect("Caps has no structure"); + let framerate = structure + .get::("framerate") + .map_err(|_| gst::loggable_error!(CAT, "Caps do not contain framerate"))?; + Ok(Framerate::new( + framerate.numer() as u32, + framerate.denom() as u32, + )) +} + +#[derive(Default)] +struct State { + out_format: CeaFormat, + fps: Option, + dtvcc_seq_no: u8, + writer: CCDataWriter, + n_frames: u64, +} + +pub struct Cea708Mux { + srcpad: gst_base::AggregatorPad, + + state: Mutex, +} + +pub(crate) static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "cea708mux", + gst::DebugColorFlags::empty(), + Some("CEA-708 Mux Element"), + ) +}); + +impl AggregatorImpl for Cea708Mux { + fn aggregate(&self, timeout: bool) -> Result { + let mut state = self.state.lock().unwrap(); + let fps = state.fps.unwrap(); + let src_segment = self + .obj() + .src_pad() + .segment() + .downcast::() + .expect("Non-TIME segment"); + + let start_running_time = + if src_segment.position().is_none() || src_segment.position() < src_segment.start() { + src_segment.start().unwrap() + } else { + src_segment.position().unwrap() + }; + + let duration = 1_000_000_000 + .mul_div_round(fps.denom() as u64, fps.numer() as u64) + .unwrap() + .nseconds(); + let end_running_time = start_running_time + duration; + let mut need_data = false; + gst::debug!(CAT, imp: self, "Aggregating for start time {} end {} timeout {}", + start_running_time.display(), + end_running_time.display(), + timeout); + + let sinkpads = self.obj().sink_pads(); + + // phase 1, ensure all pads have the relevant data (or a timeout) + for pad in sinkpads.iter().map(|pad| { + pad.downcast_ref::() + .expect("Not a Cea708MuxSinkPad?!") + }) { + let mut pad_state = pad.imp().pad_state.lock().unwrap(); + pad_state.pending_buffer = None; + // any data we currently have stored + let have_pending = pad_state + .pending_services + .values() + .any(|codes| !codes.is_empty()); + if pad.is_eos() { + continue; + } + let buffer = if let Some(buffer) = pad.peek_buffer() { + buffer + } else { + need_data = true; + continue; + }; + + let Ok(segment) = pad.segment().downcast::() else { + drop(pad_state); + drop(state); + self.post_error_message(gst::error_msg!( + gst::CoreError::Clock, + ["Incoming segment not in TIME format"] + )); + return Err(gst::FlowError::Error); + }; + let Some(buffer_start_ts) = segment.to_running_time(buffer.pts()) else { + drop(pad_state); + drop(state); + self.post_error_message(gst::error_msg!( + gst::CoreError::Clock, + ["Incoming buffer does not contain valid PTS"] + )); + return Err(gst::FlowError::Error); + }; + if buffer_start_ts > end_running_time { + // buffer is not for this output time, skip + continue; + } + let duration = buffer.duration().unwrap_or(gst::ClockTime::ZERO); + let buffer_end_ts = buffer_start_ts + duration; + // allow a 1 second grace period before dropping data + if start_running_time.saturating_sub(buffer_end_ts) > gst::ClockTime::from_seconds(1) { + gst::warning!(CAT, obj: pad, + "Dropping buffer because start_running_time {} is more than 1s later than buffer_end_ts {}", + start_running_time.display(), + buffer_end_ts.display()); + pad.drop_buffer(); + if !have_pending { + need_data = true; + } + continue; + } + + let Ok(mapped) = buffer.map_readable() else { + drop(pad_state); + drop(state); + self.post_error_message(gst::error_msg!( + gst::CoreError::Clock, + ["Failed to map input buffer"] + )); + return Err(gst::FlowError::Error); + }; + + gst::debug!(CAT, obj: pad, "Parsing input buffer {buffer:?}"); + let in_format = pad_state.format; + match in_format { + CeaFormat::CcData => { + // gst's cc_data does not contain the 2 byte header contained in the CEA-708 + // specification + let mut cc_data = vec![0; 2]; + // reserved | process_cc_data | length + cc_data[0] = 0x80 | 0x40 | ((mapped.len() / 3) & 0x1f) as u8; + cc_data[1] = 0xFF; + cc_data.extend(mapped.iter()); + pad_state.ccp_parser.push(&cc_data).unwrap(); + } + _ => unreachable!(), + } + pad_state.pending_buffer = Some(buffer.clone()); + pad.drop_buffer(); + } + + if need_data && !timeout { + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + + self.obj() + .selected_samples(start_running_time, None, duration, None); + + // phase 2: write stored data into output packet + let mut services = HashMap::new(); + + for pad in sinkpads.iter().map(|pad| { + pad.downcast_ref::() + .expect("Not a Cea708MuxSinkPad?!") + }) { + let mut pad_state = pad.imp().pad_state.lock().unwrap(); + pad_state.pending_buffer = None; + let in_format = pad_state.format; + #[allow(clippy::single_match)] + match in_format { + CeaFormat::CcData => { + while let Some(packet) = pad_state.ccp_parser.pop_packet() { + for service in packet.services() { + let service_no = service.number(); + if service.number() == 0 { + // skip null service + continue; + } + let new_service = services + .entry(service_no) + .or_insert_with_key(|&n| Service::new(n)); + + let mut overflowed = false; + if let Some(pending_codes) = + pad_state.pending_services.get_mut(&service.number()) + { + while let Some(code) = pending_codes.pop_front() { + match new_service.push_code(&code) { + Ok(_) => (), + Err(cea708_types::WriterError::WouldOverflow(_)) => { + overflowed = true; + pending_codes.push_front(code); + break; + } + Err(cea708_types::WriterError::ReadOnly) => unreachable!(), + } + } + } + + for code in service.codes() { + gst::trace!(CAT, obj: pad, "Handling service {} code {code:?}", service.number()); + if overflowed { + pad_state + .pending_services + .entry(service.number()) + .or_default() + .push_back(code.clone()); + } else { + match new_service.push_code(code) { + Ok(_) => (), + Err(cea708_types::WriterError::WouldOverflow(_)) => { + overflowed = true; + pad_state + .pending_services + .entry(service.number()) + .or_default() + .push_back(code.clone()); + } + Err(cea708_types::WriterError::ReadOnly) => unreachable!(), + } + } + } + } + } + } + _ => (), + } + } + + let mut packet = DTVCCPacket::new(state.dtvcc_seq_no & 0x3); + state.dtvcc_seq_no = state.dtvcc_seq_no.wrapping_add(1); + + for (_service_no, service) in services.into_iter() { + // FIXME: handle needing to split services + gst::trace!(CAT, imp: self, "Adding service {} to packet", service.number()); + packet.push_service(service).unwrap(); + } + + let mut data = vec![]; + state.writer.push_packet(packet); + let _ = state.writer.write(fps, &mut data); + state.n_frames += 1; + drop(state); + + // remove 2 byte header that our cc_data format does not use + let ret = if data.len() > 2 { + let data = data.split_off(2); + gst::trace!(CAT, "generated data {data:x?}"); + let mut buf = gst::Buffer::from_mut_slice(data); + { + let buf = buf.get_mut().unwrap(); + buf.set_pts(Some(start_running_time)); + if start_running_time < end_running_time { + buf.set_duration(Some(end_running_time - start_running_time)); + } + } + + self.finish_buffer(buf) + } else { + self.srcpad.push_event( + gst::event::Gap::builder(start_running_time) + .duration(duration) + .build(), + ); + Ok(gst::FlowSuccess::Ok) + }; + + self.obj().set_position(end_running_time); + + ret + } + + fn peek_next_sample(&self, pad: &gst_base::AggregatorPad) -> Option { + let cea_pad = pad + .downcast_ref::() + .expect("Not a Cea708MuxSinkPad?!"); + let pad_state = cea_pad.imp().pad_state.lock().unwrap(); + pad_state + .pending_buffer + .as_ref() + .zip(cea_pad.current_caps()) + .map(|(buffer, caps)| { + gst::Sample::builder() + .buffer(buffer) + .segment(&cea_pad.segment()) + .caps(&caps) + .build() + }) + } + + fn next_time(&self) -> Option { + self.obj().simple_get_next_time() + } + + fn flush(&self) -> Result { + let mut state = self.state.lock().unwrap(); + let format = state.out_format; + let fps = state.fps; + *state = State::default(); + state.out_format = format; + state.fps = fps; + state.n_frames = 0; + + self.obj() + .src_pad() + .segment() + .set_position(None::); + + Ok(gst::FlowSuccess::Ok) + } + + fn negotiated_src_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> { + let mut state = self.state.lock().unwrap(); + state.out_format = CeaFormat::from_caps(caps.as_ref())?; + state.fps = Some(fps_from_caps(caps.as_ref())?); + Ok(()) + } + + fn sink_event(&self, pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { + let mux_pad = pad + .downcast_ref::() + .expect("Not a Cea708MuxSinkPad"); + use gst::EventView; + + gst::log!(CAT, obj: pad, "Handling event {:?}", event); + #[allow(clippy::single_match)] + match event.view() { + EventView::Caps(event) => { + let mut state = mux_pad.imp().pad_state.lock().unwrap(); + state.format = match CeaFormat::from_caps(event.caps()) { + Ok(format) => format, + Err(err) => { + err.log_with_imp(self); + return false; + } + }; + } + _ => (), + } + + self.parent_sink_event(pad, event) + } + + fn clip( + &self, + aggregator_pad: &gst_base::AggregatorPad, + buffer: gst::Buffer, + ) -> Option { + let Some(pts) = buffer.pts() else { + return Some(buffer); + }; + let segment = aggregator_pad.segment(); + segment + .downcast_ref::() + .map(|segment| segment.clip(pts, pts)) + .map(|_| buffer) + } +} + +impl ElementImpl for Cea708Mux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "CEA-708 Mux", + "Muxer", + "Combines multiple CEA-708 streams", + "Matthew Waters ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let framerates = gst::List::new([ + gst::Fraction::new(60, 1), + gst::Fraction::new(60000, 1001), + gst::Fraction::new(50, 1), + gst::Fraction::new(30, 1), + gst::Fraction::new(30000, 1001), + gst::Fraction::new(25, 1), + gst::Fraction::new(24, 1), + gst::Fraction::new(24000, 1001), + ]); + + let src_pad_template = gst::PadTemplate::builder( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &[ + // TODO: handle CDP and s334-1a output + /*gst::Structure::builder("closedcaption/x-cea-708") + .field("format", "cdp") + .field("framerate", framerates) + .build(),*/ + gst::Structure::builder("closedcaption/x-cea-708") + .field("format", "cc_data") + .field("framerate", framerates.clone()) + .build(), + /*gst::Structure::builder("closedcaption/x-cea-608") + .field("format", "s334-1a") + .field("framerate", framerates) + .build()*/ + ] + .into_iter() + .collect::(), + ) + .gtype(gst_base::AggregatorPad::static_type()) + .build() + .unwrap(); + + let sink_pad_template = gst::PadTemplate::builder( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &[ + // TODO: handle 608-only or cdp input + /* + gst::Structure::builder("closedcaption/x-cea-608") + .field("format", "s334-1a") + .build(), + gst::Structure::builder("closedcaption/x-cea-608") + .field("format", "raw") + .field("field", gst::List::new([0, 1])) + .build(),*/ + gst::Structure::builder("closedcaption/x-cea-708") + .field("format", "cc_data") + .field("framerate", framerates) + .build(), + /*gst::Structure::builder("closedcaption/x-cea-708") + .field("framerate", framerates) + .field("format", "cdp") + .build(),*/ + ] + .into_iter() + .collect::(), + ) + .gtype(super::Cea708MuxSinkPad::static_type()) + .build() + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + #[allow(clippy::single_match)] + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp: self, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused => { + let mut state = self.state.lock().unwrap(); + *state = State::default(); + } + _ => (), + } + + let ret = self.parent_change_state(transition)?; + + match transition { + gst::StateChange::PausedToReady => { + let mut state = self.state.lock().unwrap(); + *state = State::default(); + } + _ => (), + } + + Ok(ret) + } +} + +impl GstObjectImpl for Cea708Mux {} + +impl ObjectImpl for Cea708Mux {} + +#[glib::object_subclass] +impl ObjectSubclass for Cea708Mux { + const NAME: &'static str = "GstCea708Mux"; + type Type = super::Cea708Mux; + type ParentType = gst_base::Aggregator; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_from_template(&templ) + .build() + .downcast::() + .expect("Not a GstAggregatorPad?!"); + + Self { + srcpad, + state: Mutex::new(State::default()), + } + } +} + +#[derive(Default)] +struct PadState { + format: CeaFormat, + ccp_parser: CCDataParser, + pending_services: HashMap>, + pending_buffer: Option, +} + +#[derive(Default)] +pub struct Cea708MuxSinkPad { + pad_state: Mutex, +} + +impl Cea708MuxSinkPad {} + +impl AggregatorPadImpl for Cea708MuxSinkPad { + fn flush( + &self, + _aggregator: &gst_base::Aggregator, + ) -> Result { + let mut state = self.pad_state.lock().unwrap(); + state.ccp_parser.flush(); + Ok(gst::FlowSuccess::Ok) + } +} + +impl PadImpl for Cea708MuxSinkPad {} + +impl GstObjectImpl for Cea708MuxSinkPad {} + +impl ObjectImpl for Cea708MuxSinkPad {} + +#[glib::object_subclass] +impl ObjectSubclass for Cea708MuxSinkPad { + const NAME: &'static str = "GstCea708MuxSinkPad"; + type Type = super::Cea708MuxSinkPad; + type ParentType = gst_base::AggregatorPad; +} diff --git a/video/closedcaption/src/cea708mux/mod.rs b/video/closedcaption/src/cea708mux/mod.rs new file mode 100644 index 00000000..317ff5d1 --- /dev/null +++ b/video/closedcaption/src/cea708mux/mod.rs @@ -0,0 +1,32 @@ +// Copyright (C) 2023 Matthew Waters +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct Cea708Mux(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object; +} + +glib::wrapper! { + pub struct Cea708MuxSinkPad(ObjectSubclass) @extends gst_base::AggregatorPad, gst::Pad, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + Cea708MuxSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + + gst::Element::register( + Some(plugin), + "cea708mux", + gst::Rank::NONE, + Cea708Mux::static_type(), + ) +} diff --git a/video/closedcaption/src/lib.rs b/video/closedcaption/src/lib.rs index 22622cc4..6ce3ab5e 100644 --- a/video/closedcaption/src/lib.rs +++ b/video/closedcaption/src/lib.rs @@ -30,6 +30,7 @@ mod cea608tocea708; mod cea608tojson; mod cea608tott; mod cea608utils; +mod cea708mux; mod cea708utils; mod jsontovtt; mod line_reader; @@ -60,6 +61,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { jsontovtt::register(plugin)?; transcriberbin::register(plugin)?; cea608tocea708::register(plugin)?; + cea708mux::register(plugin)?; tttocea708::register(plugin)?; Ok(()) } diff --git a/video/closedcaption/tests/cea708mux.rs b/video/closedcaption/tests/cea708mux.rs new file mode 100644 index 00000000..43dd9d8c --- /dev/null +++ b/video/closedcaption/tests/cea708mux.rs @@ -0,0 +1,132 @@ +// Copyright (C) 2023 Matthew Waters +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::u8; + +use gst::prelude::*; +use pretty_assertions::assert_eq; + +use cea708_types::tables::*; +use cea708_types::*; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstrsclosedcaption::plugin_register_static().unwrap(); + }); +} + +fn gen_cc_data(seq: u8, service: u8, codes: &[Code]) -> gst::Buffer { + assert!(seq < 4); + assert!(service < 64); + + let fps = Framerate::new(30, 1); + let mut writer = CCDataWriter::default(); + let mut packet = DTVCCPacket::new(seq); + let mut service = Service::new(service); + for c in codes { + service.push_code(c).unwrap(); + } + packet.push_service(service).unwrap(); + writer.push_packet(packet); + let mut data = vec![]; + writer.write(fps, &mut data).unwrap(); + let data = data.split_off(2); + let mut buf = gst::Buffer::from_mut_slice(data); + { + let buf = buf.get_mut().unwrap(); + buf.set_pts(0.nseconds()); + } + buf +} + +fn cc_data_to_cea708_types(cc_data: &[u8]) -> Vec { + let mut ret = vec![0; 2]; + ret[0] = 0x80 | 0x40 | ((cc_data.len() / 3) & 0x1f) as u8; + ret[1] = 0xFF; + ret.extend(cc_data); + ret +} + +#[test] +fn test_cea708mux_single_buffer_cc_data() { + init(); + + let mut h = gst_check::Harness::with_padnames("cea708mux", Some("sink_0"), Some("src")); + h.set_src_caps_str("closedcaption/x-cea-708,format=cc_data,framerate=60/1"); + + let buf = gen_cc_data(0, 1, &[Code::LatinCapitalA]); + h.push(buf).unwrap(); + + let mut parser = CCDataParser::new(); + let out = h.pull().unwrap(); + let readable = out.map_readable().unwrap(); + let cc_data = cc_data_to_cea708_types(&readable); + parser.push(&cc_data).unwrap(); + let parsed_packet = parser.pop_packet().unwrap(); + assert_eq!(parsed_packet.sequence_no(), 0); + let services = parsed_packet.services(); + assert_eq!(services.len(), 1); + assert_eq!(services[0].number(), 1); + let codes = services[0].codes(); + assert_eq!(codes.len(), 1); + assert_eq!(codes[0], Code::LatinCapitalA); +} + +#[test] +fn test_cea708mux_2pads_cc_data() { + init(); + + let mut h = gst_check::Harness::with_padnames("cea708mux", None, Some("src")); + let mut sink_0 = gst_check::Harness::with_element(&h.element().unwrap(), Some("sink_0"), None); + sink_0.set_src_caps_str("closedcaption/x-cea-708,format=cc_data,framerate=60/1"); + let mut sink_1 = gst_check::Harness::with_element(&h.element().unwrap(), Some("sink_1"), None); + sink_1.set_src_caps_str("closedcaption/x-cea-708,format=cc_data,framerate=60/1"); + + let buf = gen_cc_data(0, 1, &[Code::LatinLowerA]); + sink_0.push(buf).unwrap(); + + let buf = gen_cc_data(0, 2, &[Code::LatinCapitalA]); + sink_1.push(buf).unwrap(); + + let mut parser = CCDataParser::new(); + let out = h.pull().unwrap(); + let readable = out.map_readable().unwrap(); + let mut cc_data = vec![0; 2]; + cc_data[0] = 0x80 | 0x40 | ((readable.len() / 3) & 0x1f) as u8; + cc_data[1] = 0xFF; + cc_data.extend(readable.iter()); + parser.push(&cc_data).unwrap(); + let parsed_packet = parser.pop_packet().unwrap(); + assert_eq!(parsed_packet.sequence_no(), 0); + let services = parsed_packet.services(); + assert_eq!(services.len(), 2); + // TODO: deterministic service ordering? + if services[0].number() == 1 { + let codes = services[0].codes(); + assert_eq!(codes.len(), 1); + assert_eq!(codes[0], Code::LatinLowerA); + assert_eq!(services[1].number(), 2); + let codes = services[1].codes(); + assert_eq!(codes.len(), 1); + assert_eq!(codes[0], Code::LatinCapitalA); + } else if services[0].number() == 2 { + let codes = services[0].codes(); + assert_eq!(codes.len(), 1); + assert_eq!(codes[0], Code::LatinCapitalA); + assert_eq!(services[1].number(), 1); + let codes = services[1].codes(); + assert_eq!(codes.len(), 1); + assert_eq!(codes[0], Code::LatinLowerA); + } else { + unreachable!(); + } +}