mccparse: Refactor huge line-handling function into smaller separate functions

* The Debug category was moved to a lazy_static!.
This allowed for a couple of methods, to be implemented
directly against the State struct since the debug category
was their only dependency from MccParse.

* Log the Caps/Format change
This commit is contained in:
Jordan Petridis 2019-01-21 21:45:00 +02:00 committed by Jordan Petridis
parent ccc3652a1a
commit 45ebb4c629
No known key found for this signature in database
GPG key ID: 902CC06D159744F5
3 changed files with 230 additions and 184 deletions

View file

@ -11,6 +11,7 @@ combine = "3.6"
either = "1"
uuid = { version = "0.7", features = ["v4"] }
chrono = "0.4"
lazy_static = "1.2"
[dependencies.gst]
git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs"

View file

@ -26,6 +26,8 @@
extern crate glib;
#[macro_use]
extern crate gst;
#[macro_use]
extern crate lazy_static;
#[cfg(test)]
#[macro_use]

View file

@ -22,12 +22,22 @@ use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_video;
use gst_video::{self, ValidVideoTimeCode};
use std::sync::Mutex;
use std::sync::{Mutex, MutexGuard};
use crate::line_reader::LineReader;
use crate::mcc_parser::{MccLine, MccParser};
use crate::mcc_parser::{MccLine, MccParser, TimeCode};
lazy_static! {
static ref CAT: gst::DebugCategory = {
gst::DebugCategory::new(
"mccparse",
gst::DebugColorFlags::empty(),
"Mcc Parser Element",
)
};
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Format {
@ -87,10 +97,155 @@ impl State {
.map(Option::Some)
.map_err(|err| (line, err))
}
fn handle_timecode(
&mut self,
element: &gst::Element,
tc: TimeCode,
framerate: gst::Fraction,
drop_frame: bool,
) -> Result<ValidVideoTimeCode, gst::FlowError> {
let timecode = gst_video::VideoTimeCode::new(
framerate,
None,
if drop_frame {
gst_video::VideoTimeCodeFlags::DROP_FRAME
} else {
gst_video::VideoTimeCodeFlags::empty()
},
tc.hours,
tc.minutes,
tc.seconds,
tc.frames,
0,
);
match timecode.try_into() {
Ok(timecode) => Ok(timecode),
Err(timecode) => {
let last_timecode =
self.last_timecode
.as_ref()
.map(Clone::clone)
.ok_or_else(|| {
gst_element_error!(
element,
gst::StreamError::Decode,
["Invalid first timecode {:?}", timecode]
);
gst::FlowError::Error
})?;
gst_warning!(
CAT,
obj: element,
"Invalid timecode {:?}, using previous {:?}",
timecode,
last_timecode
);
Ok(last_timecode)
}
}
}
/// Calculate a timestamp from the timecode and make sure to
/// not produce timestamps jumping backwards
fn update_timestamp(
&mut self,
element: &gst::Element,
timecode: &gst_video::ValidVideoTimeCode,
) {
let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam());
if self.start_position.is_none() {
self.start_position = nsecs;
}
let nsecs = if nsecs < self.start_position {
gst_fixme!(
CAT,
obj: element,
"New position {} < start position {}",
nsecs,
self.start_position
);
self.start_position
} else {
nsecs - self.start_position
};
if nsecs >= self.last_position {
self.last_position = nsecs;
} else {
gst_fixme!(
CAT,
obj: element,
"New position {} < last position {}",
nsecs,
self.last_position
);
}
}
fn add_buffer_metadata(
&mut self,
element: &gst::Element,
buffer: &mut gst::buffer::Buffer,
timecode: &gst_video::ValidVideoTimeCode,
framerate: &gst::Fraction,
) {
let buffer = buffer.get_mut().unwrap();
gst_video::VideoTimeCodeMeta::add(buffer, &timecode);
self.update_timestamp(element, &timecode);
buffer.set_pts(self.last_position);
buffer.set_duration(
gst::SECOND
.mul_div_ceil(*framerate.denom() as u64, *framerate.numer() as u64)
.unwrap_or(gst::CLOCK_TIME_NONE),
);
}
fn create_events(
&mut self,
element: &gst::Element,
format: Format,
framerate: &gst::Fraction,
) -> Vec<gst::Event> {
let mut events = Vec::new();
if self.format != Some(format) {
self.format = Some(format);
let caps = match format {
Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708")
.field("format", &"cdp")
.field("framerate", framerate)
.build(),
Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608")
.field("format", &"s334-1a")
.field("framerate", framerate)
.build(),
};
events.push(gst::Event::new_caps(&caps).build());
gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps);
}
if self.need_segment {
let segment = gst::FormattedSegment::<gst::format::Time>::new();
events.push(gst::Event::new_segment(&segment).build());
self.need_segment = false;
}
events.extend(self.pending_events.drain(..));
events
}
}
struct MccParse {
cat: gst::DebugCategory,
srcpad: gst::Pad,
sinkpad: gst::Pad,
state: Mutex<State>,
@ -178,7 +333,7 @@ impl MccParse {
match line {
Ok(Some(MccLine::Caption(tc, data))) => {
gst_trace!(
self.cat,
CAT,
obj: element,
"Got caption buffer with timecode {:?} and size {}",
tc,
@ -187,7 +342,7 @@ impl MccParse {
if data.len() < 3 {
gst_debug!(
self.cat,
CAT,
obj: element,
"Too small caption packet: {}",
data.len(),
@ -199,13 +354,7 @@ impl MccParse {
(0x61, 0x01) => Format::Cea708Cdp,
(0x61, 0x02) => Format::Cea608,
(did, sdid) => {
gst_debug!(
self.cat,
obj: element,
"Unknown DID {:x} SDID {:x}",
did,
sdid
);
gst_debug!(CAT, obj: element, "Unknown DID {:x} SDID {:x}", did, sdid);
continue;
}
};
@ -213,7 +362,7 @@ impl MccParse {
let len = data[2];
if data.len() < 3 + len as usize {
gst_debug!(
self.cat,
CAT,
obj: element,
"Too small caption packet: {} < {}",
data.len(),
@ -222,158 +371,11 @@ impl MccParse {
continue;
}
let (framerate, drop_frame) = match state.timecode_rate {
Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false),
Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true),
None => {
gst_element_error!(
element,
gst::StreamError::Decode,
["Got caption before time code rate"]
);
break Err(gst::FlowError::Error);
}
};
let mut events = Vec::new();
if state.format != Some(format) {
state.format = Some(format);
let caps = match format {
Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708")
.field("format", &"cdp")
.field("framerate", &framerate)
.build(),
Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608")
.field("format", &"s334-1a")
.field("framerate", &framerate)
.build(),
};
events.push(gst::Event::new_caps(&caps).build());
}
if state.need_segment {
let segment = gst::FormattedSegment::<gst::format::Time>::new();
events.push(gst::Event::new_segment(&segment).build());
state.need_segment = false;
}
events.extend(state.pending_events.drain(..));
let timecode = gst_video::VideoTimeCode::new(
framerate,
None,
if drop_frame {
gst_video::VideoTimeCodeFlags::DROP_FRAME
} else {
gst_video::VideoTimeCodeFlags::empty()
},
tc.hours,
tc.minutes,
tc.seconds,
tc.frames,
0,
);
let timecode = match timecode.try_into() {
Ok(timecode) => timecode,
Err(timecode) => {
let last_timecode =
state.last_timecode.as_ref().map(Clone::clone).ok_or_else(
|| {
gst_element_error!(
element,
gst::StreamError::Decode,
["Invalid first timecode {:?}", timecode]
);
gst::FlowError::Error
},
)?;
gst_warning!(
self.cat,
obj: element,
"Invalid timecode {:?}, using previous {:?}",
timecode,
last_timecode
);
last_timecode
}
};
let mut buffer = gst::Buffer::from_mut_slice(OffsetVec {
vec: data,
offset: 3,
len: len as usize,
});
{
let buffer = buffer.get_mut().unwrap();
gst_video::VideoTimeCodeMeta::add(buffer, &timecode);
// Calculate a timestamp from the timecode and make sure to
// not produce timestamps jumping backwards
let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam());
if state.start_position.is_none() {
state.start_position = nsecs;
}
let nsecs = if nsecs < state.start_position {
gst_fixme!(
self.cat,
obj: element,
"New position {} < start position {}",
nsecs,
state.start_position
);
state.start_position
} else {
nsecs - state.start_position
};
if nsecs >= state.last_position {
state.last_position = nsecs;
} else {
gst_fixme!(
self.cat,
obj: element,
"New position {} < last position {}",
nsecs,
state.last_position
);
}
buffer.set_pts(state.last_position);
buffer.set_duration(
gst::SECOND
.mul_div_ceil(*framerate.denom() as u64, *framerate.numer() as u64)
.unwrap_or(gst::CLOCK_TIME_NONE),
);
}
// Drop our state mutex while we push out buffers or events
drop(state);
for event in events {
gst_debug!(self.cat, obj: element, "Pushing event {:?}", event);
self.srcpad.push_event(event);
}
self.srcpad.push(buffer).map_err(|err| {
gst_error!(self.cat, obj: element, "Pushing buffer returned {:?}", err);
err
})?;
state = self.state.lock().unwrap();
state = self.handle_line(element, tc, data, format, state)?;
}
Ok(Some(MccLine::TimeCodeRate(rate, df))) => {
gst_debug!(
self.cat,
CAT,
obj: element,
"Got timecode rate {} (drop frame {})",
rate,
@ -382,7 +384,7 @@ impl MccParse {
state.timecode_rate = Some((rate, df));
}
Ok(Some(line)) => {
gst_debug!(self.cat, obj: element, "Got line '{:?}'", line);
gst_debug!(CAT, obj: element, "Got line '{:?}'", line);
}
Err((line, err)) => {
gst_element_error!(
@ -398,13 +400,63 @@ impl MccParse {
}
}
fn handle_line(
&self,
element: &gst::Element,
tc: TimeCode,
data: Vec<u8>,
format: Format,
mut state: MutexGuard<State>,
) -> Result<MutexGuard<State>, gst::FlowError> {
let (framerate, drop_frame) = match state.timecode_rate {
Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false),
Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true),
None => {
gst_element_error!(
element,
gst::StreamError::Decode,
["Got caption before time code rate"]
);
return Err(gst::FlowError::Error);
}
};
let events = state.create_events(element, format, &framerate);
let timecode = state.handle_timecode(element, tc, framerate, drop_frame)?;
let len = data[2] as usize;
let mut buffer = gst::Buffer::from_mut_slice(OffsetVec {
vec: data,
offset: 3,
len,
});
state.add_buffer_metadata(element, &mut buffer, &timecode, &framerate);
// Drop our state mutex while we push out buffers or events
drop(state);
for event in events {
gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
self.srcpad.push_event(event);
}
self.srcpad.push(buffer).map_err(|err| {
gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err);
err
})?;
Ok(self.state.lock().unwrap())
}
fn sink_chain(
&self,
pad: &gst::Pad,
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
self.handle_buffer(element, Some(buffer))
}
@ -412,17 +464,17 @@ impl MccParse {
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Caps(_) => {
// We send a proper caps event from the chain function later
gst_log!(self.cat, obj: pad, "Dropping caps event");
gst_log!(CAT, obj: pad, "Dropping caps event");
true
}
EventView::Segment(_) => {
// We send a gst::Format::Time segment event later when needed
gst_log!(self.cat, obj: pad, "Dropping segment event");
gst_log!(CAT, obj: pad, "Dropping segment event");
true
}
EventView::FlushStop(_) => {
@ -435,9 +487,9 @@ impl MccParse {
pad.event_default(element, event)
}
EventView::Eos(_) => {
gst_log!(self.cat, obj: pad, "Draining");
gst_log!(CAT, obj: pad, "Draining");
if let Err(err) = self.handle_buffer(element, None) {
gst_error!(self.cat, obj: pad, "Failed to drain parser: {:?}", err);
gst_error!(CAT, obj: pad, "Failed to drain parser: {:?}", err);
}
pad.event_default(element, event)
}
@ -446,11 +498,7 @@ impl MccParse {
&& !self.srcpad.has_current_caps()
&& event.get_type() > gst::EventType::Caps
{
gst_log!(
self.cat,
obj: pad,
"Deferring sticky event until we have caps"
);
gst_log!(CAT, obj: pad, "Deferring sticky event until we have caps");
let mut state = self.state.lock().unwrap();
state.pending_events.push(event);
true
@ -464,10 +512,10 @@ impl MccParse {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Seek(_) => {
gst_log!(self.cat, obj: pad, "Dropping seek event");
gst_log!(CAT, obj: pad, "Dropping seek event");
false
}
_ => pad.event_default(element, event),
@ -477,7 +525,7 @@ impl MccParse {
fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
match query.view_mut() {
QueryView::Seeking(mut q) => {
@ -522,11 +570,6 @@ impl ObjectSubclass for MccParse {
MccParse::set_pad_functions(&sinkpad, &srcpad);
Self {
cat: gst::DebugCategory::new(
"mccparse",
gst::DebugColorFlags::empty(),
"Mcc Parser Element",
),
srcpad,
sinkpad,
state: Mutex::new(State::default()),
@ -594,7 +637,7 @@ impl ElementImpl for MccParse {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => {