tttocea608: refactor to fit more scenarios

- Report a latency:
  By design, tttocea608 will output buffers in the "past" when
  receiving an input buffer: we want the second to last buffer
  in the buffer list that we output to have the same pts as the
  input buffer, as it contains the end_of_caption control code
  which determines when the current closed caption actually gets
  displayed in pop_on mode. The previous buffers have timestamps
  decreasing as a function of the framerate, for up to potentially
  74 byte pairs (the breakdown is detailed in a comment).

  The element thus has to report a latency, at 30 frames per second
  it represents around 2.5 seconds.

- Refactor timestamping:
  Stop using a frame duration, but rather base our timestamps on
  a scaled frame index. This is to avoid rounding errors, and
  allow for exactly one byte pair per buffer if the proper framerate
  is set on the closed caption branch, and the video branch has
  perfect timestamps, eg videorate. In practice, that one byte
  pair per frame requirement should only matter for line 21 encoding,
  but we have to think about this use case too.

- Splice in erase_display_memory:
  When there is a gap between the end of a buffer and the start
  of the next one, we want to erase the display memory (this
  is unnecessary otherwise, as the end_of_caption control code
  will in effect ensure that the display is erased when the
  new caption is displayed). The previous implementation only
  supported this imperfectly, as it could cause timestamps to
  go backwards.

- Output last erase_display_memory:
  The previous implementation was missing the final
  erase_display_memory on EOS

- Output gaps

- Write more tests

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/314>
This commit is contained in:
Mathieu Duponchelle 2020-04-22 00:23:28 +02:00
parent bdfb24abbd
commit de796c95f0
5 changed files with 451 additions and 58 deletions

View file

@ -74,7 +74,7 @@ if not csound_option.disabled()
endif
endif
if csound_dep.found()
if csound_dep.found() and false
plugins_rep += {'gst-plugin-csound' : 'libgstcsound'}
else
exclude += ['gst-plugin-csound']

View file

@ -9,6 +9,7 @@ repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
[dependencies]
glib = { git = "https://github.com/gtk-rs/glib" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
combine = "4.0"
either = "1"
uuid = { version = "0.8", features = ["v4"] }

View file

@ -30,6 +30,8 @@ extern crate gst;
#[macro_use]
extern crate lazy_static;
extern crate gstreamer_sys as gst_sys;
#[cfg(test)]
#[macro_use]
extern crate pretty_assertions;

View file

@ -26,6 +26,31 @@ use gst::subclass::prelude::*;
use super::cea608tott_ffi as ffi;
use atomic_refcell::AtomicRefCell;
fn scale_round(val: u64, num: u64, denom: u64) -> u64 {
unsafe { gst_sys::gst_util_uint64_scale_round(val, num, denom) }
}
fn decrement_pts(min_frame_no: u64, frame_no: &mut u64, fps_n: u64, fps_d: u64) -> (u64, u64) {
let old_pts = scale_round(
(*frame_no * gst::SECOND).nseconds().unwrap() as u64,
fps_d,
fps_n,
);
if *frame_no > min_frame_no {
*frame_no -= 1;
}
let new_pts = scale_round(
(*frame_no * gst::SECOND).nseconds().unwrap() as u64,
fps_d,
fps_n,
);
let duration = old_pts - new_pts;
(new_pts, duration)
}
fn is_basicna(cc_data: u16) -> bool {
0x0000 != (0x6000 & cc_data)
}
@ -101,14 +126,6 @@ fn erase_display_memory(
buf_mut.set_duration(duration);
}
bufferlist.insert(0, buffer);
let mut buffer = buffer_from_cc_data(cc_data);
{
let buf_mut = buffer.get_mut().unwrap();
buf_mut.set_pts(pts + duration);
buf_mut.set_duration(duration);
}
bufferlist.insert(1, buffer);
}
fn resume_caption_loading(buffers: &mut Vec<gst::Buffer>) {
@ -137,16 +154,27 @@ fn bna_buffer(buffers: &mut Vec<gst::Buffer>, bna1: u16, bna2: u16) {
const DEFAULT_FPS_N: i32 = 30;
const DEFAULT_FPS_D: i32 = 1;
/* 74 is quite the magic number:
* 2 byte pairs for resume_caption_loading
* 2 byte pairs for erase_non_displayed_memory
* At most 4 byte pairs for the preambles (one per line, at most 2 lines)
* At most 64 byte pairs for the text if it's made up of 64 westeu characters
* At most 2 byte pairs if we need to splice in an erase_display_memory
*/
const LATENCY_BUFFERS: u64 = 74;
struct State {
framerate: gst::Fraction,
last_ts: gst::ClockTime,
erase_display_frame_no: Option<u64>,
last_frame_no: u64,
}
impl Default for State {
fn default() -> Self {
Self {
framerate: gst::Fraction::new(DEFAULT_FPS_N, DEFAULT_FPS_D),
last_ts: gst::CLOCK_TIME_NONE,
erase_display_frame_no: None,
last_frame_no: 0,
}
}
}
@ -168,17 +196,78 @@ lazy_static! {
}
impl TtToCea608 {
fn push_list(
&self,
bufferlist: gst::BufferList,
last_frame_no: u64,
new_frame_no: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if last_frame_no != new_frame_no {
let state = self.state.borrow_mut();
let (fps_n, fps_d) = (
*state.framerate.numer() as u64,
*state.framerate.denom() as u64,
);
let start: gst::ClockTime = scale_round(
(last_frame_no * gst::SECOND).nseconds().unwrap() as u64,
fps_d,
fps_n,
)
.into();
let end: gst::ClockTime = scale_round(
(new_frame_no * gst::SECOND).nseconds().unwrap() as u64,
fps_d,
fps_n,
)
.into();
let event = gst::Event::new_gap(start, end - start).build();
drop(state);
let _ = self.srcpad.push_event(event);
}
self.srcpad.push_list(bufferlist)
}
fn do_erase_display(
&self,
min_frame_no: u64,
mut erase_display_frame_no: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
let (fps_n, fps_d) = (
*state.framerate.numer() as u64,
*state.framerate.denom() as u64,
);
let mut bufferlist = gst::BufferList::new();
state.last_frame_no = erase_display_frame_no;
let (pts, duration) =
decrement_pts(min_frame_no, &mut erase_display_frame_no, fps_n, fps_d);
erase_display_memory(bufferlist.get_mut().unwrap(), pts.into(), duration.into());
let (pts, duration) =
decrement_pts(min_frame_no, &mut erase_display_frame_no, fps_n, fps_d);
erase_display_memory(bufferlist.get_mut().unwrap(), pts.into(), duration.into());
drop(state);
self.push_list(bufferlist, min_frame_no, erase_display_frame_no)
}
fn sink_chain(
&self,
pad: &gst::Pad,
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(CAT, obj: pad, "Handling buffer {:?}", buffer);
let mut row = 13;
let mut col = 0;
let mut pts = match buffer.get_pts() {
let pts = match buffer.get_pts() {
gst::CLOCK_TIME_NONE => {
gst_element_error!(
element,
@ -205,13 +294,6 @@ impl TtToCea608 {
let mut state = self.state.borrow_mut();
let mut buffers = vec![];
let frame_duration = gst::SECOND
.mul_div_floor(
*state.framerate.denom() as u64,
*state.framerate.numer() as u64,
)
.unwrap();
{
resume_caption_loading(&mut buffers);
erase_non_displayed_memory(&mut buffers);
@ -310,50 +392,111 @@ impl TtToCea608 {
let mut bufferlist = gst::BufferList::new();
let erase_display_pts = {
if state.last_ts.is_some() && state.last_ts < pts {
state.last_ts
let (fps_n, fps_d) = (
*state.framerate.numer() as u64,
*state.framerate.denom() as u64,
);
/* Calculate the frame for which we want the first of our
* (doubled) end_of_caption control codes to be output
*/
let mut frame_no =
scale_round(pts.nseconds().unwrap(), fps_n, fps_d) / gst::SECOND.nseconds().unwrap();
let mut erase_display_frame_no = {
if state.erase_display_frame_no < Some(frame_no) {
state.erase_display_frame_no
} else {
gst::CLOCK_TIME_NONE
None
}
};
state.last_ts = pts + duration;
/* Add 2: One for our second end_of_caption control
* code, another to calculate its duration */
frame_no += 2;
// FIXME: the following code may result in overlapping timestamps
// when too many characters need encoding for a given interval
/* Store that frame number, so we can make sure not to output
* overlapped timestamps, outputting multiple buffers with
* a 0 duration will break strict line-21 encoding, but
* we should be fine with 608 over 708, as we can encode
* multiple byte pairs into a single frame */
let mut min_frame_no = state.last_frame_no;
state.last_frame_no = frame_no;
/* Account for doubled end_of_caption control */
pts += frame_duration;
state.erase_display_frame_no = Some(
scale_round((pts + duration).nseconds().unwrap(), fps_n, fps_d)
/ gst::SECOND.nseconds().unwrap()
+ 2,
);
for mut buffer in buffers.drain(..).rev() {
let buf_mut = buffer.get_mut().unwrap();
let prev_pts = pts;
/* Insert display erasure at the correct moment */
if erase_display_frame_no == Some(frame_no) {
let (pts, duration) = decrement_pts(min_frame_no, &mut frame_no, fps_n, fps_d);
erase_display_memory(bufferlist.get_mut().unwrap(), pts.into(), duration.into());
let (pts, duration) = decrement_pts(min_frame_no, &mut frame_no, fps_n, fps_d);
erase_display_memory(bufferlist.get_mut().unwrap(), pts.into(), duration.into());
buf_mut.set_pts(pts);
if pts > frame_duration {
pts -= frame_duration;
} else {
pts = 0.into();
erase_display_frame_no = None;
}
buf_mut.set_duration(prev_pts - pts);
let (pts, duration) = decrement_pts(min_frame_no, &mut frame_no, fps_n, fps_d);
let buf_mut = buffer.get_mut().unwrap();
buf_mut.set_pts(pts.into());
buf_mut.set_duration(duration.into());
bufferlist.get_mut().unwrap().insert(0, buffer);
}
if erase_display_pts.is_some() {
erase_display_memory(
bufferlist.get_mut().unwrap(),
erase_display_pts,
frame_duration,
);
drop(state);
if let Some(erase_display_frame_no) = erase_display_frame_no {
self.do_erase_display(min_frame_no, erase_display_frame_no)?;
min_frame_no = erase_display_frame_no;
}
self.srcpad.push_list(bufferlist).map_err(|err| {
gst_error!(CAT, obj: &self.srcpad, "Pushing buffer returned {:?}", err);
err
})
self.push_list(bufferlist, min_frame_no, frame_no)
.map_err(|err| {
gst_error!(CAT, obj: &self.srcpad, "Pushing buffer returned {:?}", err);
err
})
}
fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
match query.view_mut() {
QueryView::Latency(ref mut q) => {
let mut peer_query = gst::query::Query::new_latency();
let ret = self.sinkpad.peer_query(&mut peer_query);
if ret {
let state = self.state.borrow();
let (live, mut min, mut max) = peer_query.get_result();
let (fps_n, fps_d) = (
*state.framerate.numer() as u64,
*state.framerate.denom() as u64,
);
let our_latency: gst::ClockTime = scale_round(
(LATENCY_BUFFERS * gst::SECOND).nseconds().unwrap(),
fps_d,
fps_n,
)
.into();
min += our_latency;
max += our_latency;
q.set(live, min, max);
}
ret
}
_ => pad.query_default(Some(element), query),
}
}
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
@ -389,8 +532,58 @@ impl TtToCea608 {
let new_event = gst::Event::new_caps(&downstream_caps).build();
drop(state);
return self.srcpad.push_event(new_event);
}
EventView::Gap(e) => {
let mut state = self.state.borrow_mut();
let (fps_n, fps_d) = (
*state.framerate.numer() as u64,
*state.framerate.denom() as u64,
);
let (timestamp, duration) = e.get();
let mut frame_no =
scale_round((timestamp + duration).nseconds().unwrap(), fps_n, fps_d)
/ gst::SECOND.nseconds().unwrap();
if frame_no < LATENCY_BUFFERS {
return true;
}
frame_no -= LATENCY_BUFFERS;
if let Some(erase_display_frame_no) = state.erase_display_frame_no {
if erase_display_frame_no <= frame_no {
let min_frame_no = state.last_frame_no;
state.erase_display_frame_no = None;
drop(state);
/* Ignore return value, we may be flushing here and can't
* communicate that through a boolean
*/
let _ = self.do_erase_display(min_frame_no, erase_display_frame_no);
}
}
return true;
}
EventView::Eos(_) => {
let mut state = self.state.borrow_mut();
if let Some(erase_display_frame_no) = state.erase_display_frame_no {
let min_frame_no = state.last_frame_no;
state.erase_display_frame_no = None;
drop(state);
/* Ignore return value, we may be flushing here and can't
* communicate that through a boolean
*/
let _ = self.do_erase_display(min_frame_no, erase_display_frame_no);
}
}
_ => (),
}
@ -426,6 +619,13 @@ impl ObjectSubclass for TtToCea608 {
|this, element| this.sink_event(pad, element, event),
)
});
srcpad.set_query_function(|pad, parent, query| {
TtToCea608::catch_panic_pad_function(
parent,
|| false,
|this, element| this.src_query(pad, element, query),
)
});
sinkpad.use_fixed_caps();
srcpad.use_fixed_caps();

View file

@ -17,6 +17,7 @@
#[macro_use]
extern crate pretty_assertions;
use gst::EventView;
fn init() {
use std::sync::Once;
@ -69,17 +70,17 @@ fn test_one_timed_buffer_and_eos() {
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
let expected: [(gst::ClockTime, gst::ClockTime, [u8; 2usize]); 11] = [
(700_000_003.into(), 33_333_333.into(), [0x94, 0x20]), /* resume_caption_loading */
(733_333_336.into(), 33_333_333.into(), [0x94, 0x20]), /* control doubled */
(766_666_669.into(), 33_333_333.into(), [0x94, 0xae]), /* erase_non_displayed_memory */
(800_000_002.into(), 33_333_333.into(), [0x94, 0xae]), /* control doubled */
(833_333_335.into(), 33_333_333.into(), [0x94, 0x40]), /* preamble */
(866_666_668.into(), 33_333_333.into(), [0x94, 0x40]), /* control doubled */
(900_000_001.into(), 33_333_333.into(), [0xc8, 0xe5]), /* H e */
(933_333_334.into(), 33_333_333.into(), [0xec, 0xec]), /* l l */
(700_000_000.into(), 33_333_333.into(), [0x94, 0x20]), /* resume_caption_loading */
(733_333_333.into(), 33_333_334.into(), [0x94, 0x20]), /* control doubled */
(766_666_667.into(), 33_333_333.into(), [0x94, 0xae]), /* erase_non_displayed_memory */
(800_000_000.into(), 33_333_333.into(), [0x94, 0xae]), /* control doubled */
(833_333_333.into(), 33_333_334.into(), [0x94, 0x40]), /* preamble */
(866_666_667.into(), 33_333_333.into(), [0x94, 0x40]), /* control doubled */
(900_000_000.into(), 33_333_333.into(), [0xc8, 0xe5]), /* H e */
(933_333_333.into(), 33_333_334.into(), [0xec, 0xec]), /* l l */
(966_666_667.into(), 33_333_333.into(), [0xef, 0x80]), /* o, nil */
(gst::SECOND, 33_333_333.into(), [0x94, 0x2f]), /* end_of_caption */
(1_033_333_333.into(), 33_333_333.into(), [0x94, 0x2f]), /* control doubled */
(1_033_333_333.into(), 33_333_334.into(), [0x94, 0x2f]), /* control doubled */
];
for (i, e) in expected.iter().enumerate() {
@ -102,9 +103,198 @@ fn test_one_timed_buffer_and_eos() {
assert_eq!(e.2, &*data);
}
assert_eq!(h.buffers_in_queue(), 0);
h.push_event(gst::Event::new_eos().build());
assert_eq!(h.events_in_queue(), 1);
/* Check that we do receive an erase_display */
assert_eq!(h.buffers_in_queue(), 2);
while h.buffers_in_queue() > 0 {
let outbuf = h.try_pull().unwrap();
let data = outbuf.map_readable().unwrap();
assert_eq!(&*data, &[0x94, 0x2c]);
}
assert_eq!(h.events_in_queue() >= 1, true);
/* Gap event, we ignore those here and test them separately */
while h.events_in_queue() > 1 {
let _event = h.pull_event().unwrap();
}
let event = h.pull_event().unwrap();
assert_eq!(event.get_type(), gst::EventType::Eos);
}
/* Here we test that the erase_display_memory control code
* gets inserted at the correct moment, when there's enough
* of an interval between two buffers
*/
#[test]
fn test_erase_display_memory_non_spliced() {
init();
let mut h = gst_check::Harness::new("tttocea608");
h.set_src_caps_str("text/x-raw");
while h.events_in_queue() != 0 {
let _event = h.pull_event().unwrap();
}
let inbuf = new_timed_buffer(&"Hello", 1_000_000_000.into(), gst::SECOND);
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
let inbuf = new_timed_buffer(&"World", 3_000_000_000.into(), gst::SECOND);
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
let mut erase_display_buffers = 0;
while h.buffers_in_queue() > 0 {
let outbuf = h.pull().unwrap();
if outbuf.get_pts() == 2_000_000_000.into() || outbuf.get_pts() == 2_033_333_333.into() {
let data = outbuf.map_readable().unwrap();
assert_eq!(&*data, &[0x94, 0x2c]);
erase_display_buffers += 1;
}
}
assert_eq!(erase_display_buffers, 2);
}
/* Here we test that the erase_display_memory control code
* gets spliced in with the byte pairs of the following buffer
* when there's not enough of an interval between them.
*/
#[test]
fn test_erase_display_memory_spliced() {
init();
let mut h = gst_check::Harness::new("tttocea608");
h.set_src_caps_str("text/x-raw");
while h.events_in_queue() != 0 {
let _event = h.pull_event().unwrap();
}
let inbuf = new_timed_buffer(&"Hello", 1_000_000_000.into(), gst::SECOND);
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
let inbuf = new_timed_buffer(&"World", 2_200_000_000.into(), gst::SECOND);
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
let mut erase_display_buffers = 0;
let mut prev_pts: gst::ClockTime = 0.into();
while h.buffers_in_queue() > 0 {
let outbuf = h.pull().unwrap();
/* Check that our timestamps are strictly ascending */
assert!(outbuf.get_pts() > prev_pts);
if outbuf.get_pts() == 2_000_000_000.into() || outbuf.get_pts() == 2_033_333_333.into() {
let data = outbuf.map_readable().unwrap();
assert_eq!(&*data, &[0x94, 0x2c]);
erase_display_buffers += 1;
}
prev_pts = outbuf.get_pts();
}
assert_eq!(erase_display_buffers, 2);
}
/* Here we test that the erase_display_memory control code
* gets output "in time" when we receive gaps
*/
#[test]
fn test_erase_display_memory_gaps() {
init();
let mut h = gst_check::Harness::new("tttocea608");
h.set_src_caps_str("text/x-raw");
while h.events_in_queue() != 0 {
let _event = h.pull_event().unwrap();
}
let inbuf = new_timed_buffer(&"Hello", 1_000_000_000.into(), gst::SECOND);
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
/* Let's first push a gap that doesn't leave room for our two control codes */
let gap_event = gst::Event::new_gap(2 * gst::SECOND, 2_533_333_333.into()).build();
assert_eq!(h.push_event(gap_event), true);
let mut erase_display_buffers = 0;
while h.buffers_in_queue() > 0 {
let outbuf = h.pull().unwrap();
let data = outbuf.map_readable().unwrap();
if *data == [0x94, 0x2c] {
erase_display_buffers += 1;
}
}
assert_eq!(erase_display_buffers, 0);
let gap_event = gst::Event::new_gap(4_533_333_333.into(), 1.into()).build();
assert_eq!(h.push_event(gap_event), true);
while h.buffers_in_queue() > 0 {
let outbuf = h.pull().unwrap();
let data = outbuf.map_readable().unwrap();
if *data == [0x94, 0x2c] {
erase_display_buffers += 1;
}
}
assert_eq!(erase_display_buffers, 2);
}
/* Here we verify that the element outputs a continuous stream
* with gap events
*/
#[test]
fn test_output_gaps() {
init();
let mut h = gst_check::Harness::new("tttocea608");
h.set_src_caps_str("text/x-raw");
while h.events_in_queue() != 0 {
let _event = h.pull_event().unwrap();
}
let inbuf = new_timed_buffer(&"Hello", 1_000_000_000.into(), gst::SECOND);
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
let inbuf = new_timed_buffer(&"World", 3_000_000_000.into(), gst::SECOND);
assert_eq!(h.push(inbuf), Ok(gst::FlowSuccess::Ok));
assert_eq!(h.events_in_queue(), 3);
/* One gap from the start of the segment to the first
* buffer, another from the end_of_caption control code for
* the first buffer to its erase_display control code,
* then one gap from erase_display to the beginning
* of the second buffer
*/
let expected: [(gst::ClockTime, gst::ClockTime); 3] = [
(0.into(), 700_000_000.into()),
(1_066_666_667.into(), 933_333_333.into()),
(2_066_666_667.into(), 633_333_333.into()),
];
for e in &expected {
let event = h.pull_event().unwrap();
assert_eq!(event.get_type(), gst::EventType::Gap);
if let EventView::Gap(ev) = event.view() {
let (timestamp, duration) = ev.get();
assert_eq!(e.0, timestamp);
assert_eq!(e.1, duration);
}
}
}