net: migrate to new ClockTime design

This commit is contained in:
François Laignel 2021-05-26 15:22:01 +02:00
parent 8f81cb8812
commit 2c4c35deba
3 changed files with 123 additions and 88 deletions

View file

@ -1116,18 +1116,18 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
};
let start = segment.start().expect("No start position given");
let stop = segment.stop();
let start = *segment.start().expect("No start position given");
let stop = segment.stop().map(|stop| *stop);
gst_debug!(CAT, obj: src, "Seeking to {}-{:?}", start, stop);
if position == start && old_stop == stop.0 {
if position == start && old_stop == stop {
gst_debug!(CAT, obj: src, "No change to current request");
return true;
}
*state = State::Stopped;
match self.do_request(src, uri, start, stop.0) {
match self.do_request(src, uri, start, stop) {
Ok(s) => {
*state = s;
true

View file

@ -372,7 +372,7 @@ fn test_basic_request() {
if cursor.position() == 0 {
assert_eq!(
h.src.query_duration::<gst::format::Bytes>(),
Some(gst::format::Bytes::from(expected_output.len() as u64))
Some(gst::format::Bytes(expected_output.len() as u64))
);
}
@ -430,7 +430,7 @@ fn test_basic_request_inverted_defaults() {
if cursor.position() == 0 {
assert_eq!(
h.src.query_duration::<gst::format::Bytes>(),
Some(gst::format::Bytes::from(expected_output.len() as u64))
Some(gst::format::Bytes(expected_output.len() as u64))
);
}
@ -508,7 +508,7 @@ fn test_extra_headers() {
if cursor.position() == 0 {
assert_eq!(
h.src.query_duration::<gst::format::Bytes>(),
Some(gst::format::Bytes::from(expected_output.len() as u64))
Some(gst::format::Bytes(expected_output.len() as u64))
);
}
@ -568,7 +568,7 @@ fn test_cookies_property() {
if cursor.position() == 0 {
assert_eq!(
h.src.query_duration::<gst::format::Bytes>(),
Some(gst::format::Bytes::from(expected_output.len() as u64))
Some(gst::format::Bytes(expected_output.len() as u64))
);
}
@ -628,7 +628,7 @@ fn test_iradio_mode() {
if cursor.position() == 0 {
assert_eq!(
h.src.query_duration::<gst::format::Bytes>(),
Some(gst::format::Bytes::from(expected_output.len() as u64))
Some(gst::format::Bytes(expected_output.len() as u64))
);
}
@ -706,7 +706,7 @@ fn test_audio_l16() {
if cursor.position() == 0 {
assert_eq!(
h.src.query_duration::<gst::format::Bytes>(),
Some(gst::format::Bytes::from(expected_output.len() as u64))
Some(gst::format::Bytes(expected_output.len() as u64))
);
}
@ -780,7 +780,7 @@ fn test_authorization() {
if cursor.position() == 0 {
assert_eq!(
h.src.query_duration::<gst::format::Bytes>(),
Some(gst::format::Bytes::from(expected_output.len() as u64))
Some(gst::format::Bytes(expected_output.len() as u64))
);
}
@ -930,13 +930,13 @@ fn test_seek_after_ready() {
assert_eq!(current_state, gst::State::Ready);
h.run(|src| {
src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes::from(123))
src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes(123))
.unwrap();
src.set_state(gst::State::Playing).unwrap();
});
let segment = h.wait_for_segment(false);
assert_eq!(segment.start(), gst::format::Bytes::from(123));
assert_eq!(segment.start(), Some(gst::format::Bytes(123)));
let mut expected_output = vec![0; 8192 - 123];
for (i, d) in expected_output.iter_mut().enumerate() {
@ -1009,12 +1009,12 @@ fn test_seek_after_buffer_received() {
//seek to a position after a buffer is Received
h.run(|src| {
src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes::from(123))
src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes(123))
.unwrap();
});
let segment = h.wait_for_segment(true);
assert_eq!(segment.start(), gst::format::Bytes::from(123));
assert_eq!(segment.start(), Some(gst::format::Bytes(123)));
let mut expected_output = vec![0; 8192 - 123];
for (i, d) in expected_output.iter_mut().enumerate() {
@ -1091,16 +1091,16 @@ fn test_seek_with_stop_position() {
1.0,
gst::SeekFlags::FLUSH,
gst::SeekType::Set,
gst::format::Bytes::from(123),
gst::format::Bytes(123),
gst::SeekType::Set,
gst::format::Bytes::from(131),
gst::format::Bytes(131),
)
.unwrap();
});
let segment = h.wait_for_segment(true);
assert_eq!(segment.start(), gst::format::Bytes::from(123));
assert_eq!(segment.stop(), gst::format::Bytes::from(131));
assert_eq!(segment.start(), Some(gst::format::Bytes(123)));
assert_eq!(segment.stop(), Some(gst::format::Bytes(131)));
let mut expected_output = vec![0; 8];
for (i, d) in expected_output.iter_mut().enumerate() {

View file

@ -39,7 +39,6 @@ use tokio::runtime;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Mutex;
use std::time::Duration;
use atomic_refcell::AtomicRefCell;
@ -110,13 +109,13 @@ static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap()
});
const DEFAULT_LATENCY_MS: u32 = 8000;
const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_seconds(8);
const DEFAULT_USE_PARTIAL_RESULTS: bool = true;
const GRANULARITY_MS: u32 = 100;
const GRANULARITY: gst::ClockTime = gst::ClockTime::from_mseconds(100);
#[derive(Debug, Clone)]
struct Settings {
latency_ms: u32,
latency: gst::ClockTime,
language_code: Option<String>,
use_partial_results: bool,
vocabulary: Option<String>,
@ -125,7 +124,7 @@ struct Settings {
impl Default for Settings {
fn default() -> Self {
Self {
latency_ms: DEFAULT_LATENCY_MS,
latency: DEFAULT_LATENCY,
language_code: Some("en-US".to_string()),
use_partial_results: DEFAULT_USE_PARTIAL_RESULTS,
vocabulary: None,
@ -144,7 +143,7 @@ struct State {
buffers: VecDeque<gst::Buffer>,
send_eos: bool,
discont: bool,
last_partial_end_time: gst::ClockTime,
last_partial_end_time: Option<gst::ClockTime>,
partial_alternative: Option<TranscriptAlternative>,
}
@ -161,7 +160,7 @@ impl Default for State {
buffers: VecDeque::new(),
send_eos: false,
discont: true,
last_partial_end_time: gst::CLOCK_TIME_NONE,
last_partial_end_time: None,
partial_alternative: None,
}
}
@ -207,10 +206,8 @@ impl Transcriber {
let (latency, now, mut last_position, send_eos, seqnum) = {
let mut state = self.state.lock().unwrap();
// Multiply GRANULARITY by 2 in order to not send buffers that
// are less than GRANULARITY milliseconds away too late
let latency: gst::ClockTime = (self.settings.lock().unwrap().latency_ms as u64
- (2 * GRANULARITY_MS) as u64)
* gst::MSECOND;
// are less than GRANULARITY away too late
let latency = self.settings.lock().unwrap().latency - 2 * GRANULARITY;
let now = element.current_running_time();
if let Some(alternative) = state.partial_alternative.take() {
@ -220,7 +217,10 @@ impl Transcriber {
let send_eos = state.send_eos && state.buffers.is_empty();
while let Some(buf) = state.buffers.front() {
if now - buf.pts() > latency {
if now
.zip(buf.pts())
.map_or(false, |(now, pts)| now - pts > latency)
{
/* Safe unwrap, we know we have an item */
let buf = state.buffers.pop_front().unwrap();
items.push(buf);
@ -248,16 +248,31 @@ impl Transcriber {
}
for mut buf in items.drain(..) {
if buf.pts() > last_position {
let gap_event = gst::event::Gap::builder(last_position, buf.pts() - last_position)
let delta = buf
.pts()
.zip(last_position)
.map(|(pts, last_pos)| pts.checked_sub(last_pos));
if let Some(delta) = delta {
let last_pos = last_position.expect("defined since delta could be computed");
let gap_event = gst::event::Gap::builder(last_pos)
.duration(delta)
.seqnum(seqnum)
.build();
gst_debug!(CAT, "Pushing gap: {} -> {}", last_position, buf.pts());
gst_debug!(
CAT,
"Pushing gap: {} -> {}",
last_pos,
buf.pts().display()
);
if !self.srcpad.push_event(gap_event) {
return false;
}
}
last_position = buf.pts() + buf.duration();
last_position = buf
.pts()
.zip(buf.duration())
.map(|(pts, duration)| pts + duration);
{
let buf = buf.get_mut().unwrap();
buf.set_pts(buf.pts());
@ -265,8 +280,11 @@ impl Transcriber {
gst_debug!(
CAT,
"Pushing buffer: {} -> {}",
buf.pts(),
buf.pts() + buf.duration()
buf.pts().display(),
buf.pts()
.zip(buf.duration())
.map(|(pts, duration)| pts + duration)
.display(),
);
if self.srcpad.push(buf).is_err() {
return false;
@ -275,19 +293,19 @@ impl Transcriber {
/* next, push a gap if we're lagging behind the target position */
if now - last_position > latency {
let duration = now - last_position - latency;
let gap_event = gst::event::Gap::builder(last_position, duration)
let duration = now
.zip(last_position)
.and_then(|(now, last_position)| now.checked_sub(last_position))
.and_then(|delta| delta.checked_sub(latency));
if let Some(duration) = duration {
let last_pos = last_position.expect("defined since duration could be computed");
let gap_event = gst::event::Gap::builder(last_pos)
.duration(duration)
.seqnum(seqnum)
.build();
gst_debug!(
CAT,
"Pushing gap: {} -> {}",
last_position,
last_position + duration
);
last_position += duration;
let next_position = last_pos + duration;
gst_debug!(CAT, "Pushing gap: {} -> {}", last_pos, next_position,);
last_position = Some(next_position);
if !self.srcpad.push_event(gap_event) {
return false;
}
@ -309,22 +327,27 @@ impl Transcriber {
alternative: &TranscriptAlternative,
partial: bool,
latency: gst::ClockTime,
now: gst::ClockTime,
now: impl Into<Option<gst::ClockTime>> + Copy,
) {
for item in &alternative.items {
let mut start_time: gst::ClockTime =
((item.start_time as f64 * 1_000_000_000.0) as u64).into();
let mut end_time: gst::ClockTime =
((item.end_time as f64 * 1_000_000_000.0) as u64).into();
let mut start_time =
gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64);
let mut end_time =
gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64);
if start_time <= state.last_partial_end_time {
if state
.last_partial_end_time
.map_or(false, |last_partial_end_time| {
start_time <= last_partial_end_time
})
{
/* Already sent (hopefully) */
continue;
} else if !partial || start_time + latency < now {
} else if !partial || now.into().map_or(false, |now| start_time + latency < now) {
/* Should be sent now */
gst_debug!(CAT, obj: element, "Item is ready: {}", item.content);
let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes());
state.last_partial_end_time = end_time;
state.last_partial_end_time = Some(end_time);
{
let buf = buf.get_mut().unwrap();
@ -334,15 +357,23 @@ impl Transcriber {
state.discont = false;
}
if start_time < state.out_segment.position() {
if state
.out_segment
.position()
.map_or(false, |pos| start_time < pos)
{
let pos = state
.out_segment
.position()
.expect("position checked above");
gst_debug!(
CAT,
obj: element,
"Adjusting item timing({:?} < {:?})",
"Adjusting item timing({} < {})",
start_time,
state.out_segment.position()
pos,
);
start_time = state.out_segment.position();
start_time = pos;
if end_time < start_time {
end_time = start_time;
}
@ -436,24 +467,29 @@ impl Transcriber {
alternative.transcript
);
let mut start_time: gst::ClockTime =
((result.start_time as f64 * 1_000_000_000.0) as u64).into();
let end_time: gst::ClockTime =
((result.end_time as f64 * 1_000_000_000.0) as u64).into();
let mut start_time = gst::ClockTime::from_nseconds(
(result.start_time as f64 * 1_000_000_000.0) as u64,
);
let end_time = gst::ClockTime::from_nseconds(
(result.end_time as f64 * 1_000_000_000.0) as u64,
);
let mut state = self.state.lock().unwrap();
let position = state.out_segment.position();
if end_time < position {
if position.map_or(false, |position| end_time < position) {
let pos = position.expect("position checked above");
gst_warning!(CAT, obj: element,
"Received transcript is too late by {:?}, dropping, consider increasing the latency",
position - start_time);
"Received transcript is too late by {}, dropping, consider increasing the latency",
pos - start_time);
} else {
if start_time < position {
if let Some(delta) =
position.and_then(|pos| pos.checked_sub(start_time))
{
gst_warning!(CAT, obj: element,
"Received transcript is too late by {:?}, clipping, consider increasing the latency",
position - start_time);
start_time = position;
"Received transcript is too late by {}, clipping, consider increasing the latency",
delta);
start_time = position.expect("position checked above");
}
let mut buf = gst::Buffer::from_mut_slice(
@ -489,8 +525,8 @@ impl Transcriber {
&mut state,
&alternative,
false,
0.into(),
0.into(),
gst::ClockTime::ZERO,
gst::ClockTime::ZERO,
);
state.partial_alternative = None;
}
@ -508,7 +544,7 @@ impl Transcriber {
/* Wrap in a timeout so we can push gaps regularly */
let future = async move {
match tokio::time::timeout(Duration::from_millis(GRANULARITY_MS.into()), future).await {
match tokio::time::timeout(GRANULARITY.into(), future).await {
Err(_) => {
if !self.dequeue(element) {
gst_info!(CAT, obj: element, "Failed to push gap event, pausing");
@ -611,9 +647,8 @@ impl Transcriber {
if ret {
let (_, min, _) = peer_query.result();
let our_latency: gst::ClockTime =
self.settings.lock().unwrap().latency_ms as u64 * gst::MSECOND;
q.set(true, our_latency + min, gst::CLOCK_TIME_NONE);
let our_latency = self.settings.lock().unwrap().latency;
q.set(true, our_latency + min, gst::ClockTime::NONE);
}
ret
}
@ -694,9 +729,7 @@ impl Transcriber {
let mut state = self.state.lock().unwrap();
state.out_segment.set_time(segment.time());
state
.out_segment
.set_position(gst::ClockTime::from_nseconds(0));
state.out_segment.set_position(gst::ClockTime::ZERO);
state.in_segment = segment;
state.seqnum = e.seqnum();
@ -738,14 +771,14 @@ impl Transcriber {
let running_time = state.in_segment.to_running_time(buffer.pts());
let now = element.current_running_time();
if now.is_some() && now < running_time {
delay = Some(running_time - now);
}
delay = running_time
.zip(now)
.and_then(|(running_time, now)| running_time.checked_sub(now));
}
}
if let Some(delay) = delay {
tokio::time::sleep(Duration::from_nanos(delay.nseconds().unwrap())).await;
tokio::time::sleep(delay.into()).await;
}
if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() {
@ -1032,9 +1065,9 @@ impl ObjectImpl for Transcriber {
"latency",
"Latency",
"Amount of milliseconds to allow AWS transcribe",
2 * GRANULARITY_MS,
2 * GRANULARITY.mseconds() as u32,
std::u32::MAX,
DEFAULT_LATENCY_MS,
DEFAULT_LATENCY.mseconds() as u32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
@ -1074,7 +1107,9 @@ impl ObjectImpl for Transcriber {
}
"latency" => {
let mut settings = self.settings.lock().unwrap();
settings.latency_ms = value.get().expect("type checked upstream");
settings.latency = gst::ClockTime::from_mseconds(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"use-partial-results" => {
let mut settings = self.settings.lock().unwrap();
@ -1096,7 +1131,7 @@ impl ObjectImpl for Transcriber {
}
"latency" => {
let settings = self.settings.lock().unwrap();
settings.latency_ms.to_value()
(settings.latency.mseconds() as u32).to_value()
}
"use-partial-results" => {
let settings = self.settings.lock().unwrap();