From 5a5ca76d9d750c58dd801a58d751849044dd2c01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Thu, 16 Mar 2023 12:17:38 +0100 Subject: [PATCH] net/aws/transcriber: desambiguify SrcPad output items queue Part-of: --- net/aws/src/transcriber/imp.rs | 99 +++++++++++++++++++++++----------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/net/aws/src/transcriber/imp.rs b/net/aws/src/transcriber/imp.rs index 8501936b..0c99c760 100644 --- a/net/aws/src/transcriber/imp.rs +++ b/net/aws/src/transcriber/imp.rs @@ -115,6 +115,33 @@ impl Default for Settings { } } +#[derive(Clone, Debug, Default)] +struct OutputItem { + pts: gst::ClockTime, + duration: gst::ClockTime, + content: String, +} + +impl From<&TranscriptItem> for OutputItem { + fn from(item: &TranscriptItem) -> Self { + OutputItem { + pts: item.pts, + duration: item.duration, + content: item.content.clone(), + } + } +} + +impl From for OutputItem { + fn from(item: TranslatedItem) -> Self { + OutputItem { + pts: item.pts, + duration: item.duration, + content: item.content, + } + } +} + struct State { buffer_tx: Option>, transcriber_loop_handle: Option>>, @@ -856,7 +883,7 @@ struct TranslationPadTask { translate_latency: gst::ClockTime, translate_lookahead: gst::ClockTime, send_events: bool, - translated_items: VecDeque, + output_items: VecDeque, our_latency: gst::ClockTime, seqnum: gst::Seqnum, send_eos: bool, @@ -882,7 +909,7 @@ impl TranslationPadTask { translate_latency: DEFAULT_TRANSLATE_LATENCY, translate_lookahead: DEFAULT_TRANSLATE_LOOKAHEAD, send_events: true, - translated_items: VecDeque::new(), + output_items: VecDeque::new(), our_latency: DEFAULT_TRANSCRIBE_LATENCY, seqnum: gst::Seqnum::next(), send_eos: false, @@ -941,9 +968,7 @@ impl TranslationPadTask { use broadcast::error::RecvError; match items_res { Ok(Items(transcript_items)) => { - for transcript_item in transcript_items.iter() { - self.translated_items.push_back(transcript_item.into()); - } + self.output_items.extend(transcript_items.iter().map(Into::into)); } Ok(Eos) => { gst::debug!(CAT, imp: self.pad, "Got eos"); @@ -999,7 +1024,7 @@ impl TranslationPadTask { return Err(gst::error_msg!(gst::StreamError::Failed, ["{ERR}"])); }; - self.translated_items.extend(translated_items); + self.output_items.extend(translated_items.into_iter().map(Into::into)); self.pending_translations = self.pending_translations.saturating_sub(1); return Ok(()); @@ -1029,8 +1054,36 @@ impl TranslationPadTask { }; for items in transcript_items.iter() { - if let Some(ready_items) = self.translate_queue.push(items) { - self.send_for_translation(ready_items).await?; + if let Some(items_to_translate) = self.translate_queue.push(items) { + self.send_for_translation(items_to_translate).await?; + } + } + + Ok(()) + } + + async fn dequeue_for_translation( + &mut self, + start_time: gst::ClockTime, + now: gst::ClockTime, + ) -> Result<(), gst::ErrorMessage> { + if !self.translate_queue.is_empty() { + // Latency budget for an item to be pushed to stream on time + // Margin: + // - 2 * GRANULARITY: to make sure we don't push items up to GRANULARITY late. + // - 1 * GRANULARITY: extra margin to account for additional overheads. + let latency = self.our_latency.saturating_sub(3 * GRANULARITY); + + // Estimated time of arrival for an item sent to translation now. + // (in transcript item ts base) + let translation_eta = now + self.translate_latency - start_time; + + if let Some(items_to_translate) = + self.translate_queue + .dequeue(latency, translation_eta, self.translate_lookahead) + { + gst::debug!(CAT, imp: self.pad, "Forcing to translation: {items_to_translate:?}"); + self.send_for_translation(items_to_translate).await?; } } @@ -1058,30 +1111,12 @@ impl TranslationPadTask { discont_pending = pad_state.discont_pending; } - if self.needs_translate && !self.translate_queue.is_empty() { - // Latency budget for an item to be pushed to stream on time - // Margin: - // - 2 * GRANULARITY: to make sure we don't push items up to GRANULARITY late. - // - 1 * GRANULARITY: extra margin to account for additional overheads. - let latency = self.our_latency.saturating_sub(3 * GRANULARITY); - - // Estimated time of arrival for an item sent to translation now. - // (in transcript item ts base) - let translation_eta = now + self.translate_latency - start_time; - - if let Some(ready_items) = - self.translate_queue - .dequeue(latency, translation_eta, self.translate_lookahead) - { - gst::debug!(CAT, imp: self.pad, "Forcing to translation: {ready_items:?}"); - if self.send_for_translation(ready_items).await.is_err() { - return false; - } - } + if self.needs_translate && self.dequeue_for_translation(start_time, now).await.is_err() { + return false; } /* First, check our pending buffers */ - while let Some(item) = self.translated_items.front() { + while let Some(item) = self.output_items.front() { // Note: items pts start from 0 + lateness gst::trace!( CAT, @@ -1097,11 +1132,11 @@ impl TranslationPadTask { // - 1 * GRANULARITY: extra margin to account for additional overheads. if item.pts + self.our_latency.saturating_sub(3 * GRANULARITY) < now - start_time { /* Safe unwrap, we know we have an item */ - let TranslatedItem { + let OutputItem { pts: item_pts, mut duration, content, - } = self.translated_items.pop_front().unwrap(); + } = self.output_items.pop_front().unwrap(); let mut pts = start_time + item_pts; @@ -1173,7 +1208,7 @@ impl TranslationPadTask { if self.send_eos && self.pending_translations == 0 - && self.translated_items.is_empty() + && self.output_items.is_empty() && self.translate_queue.is_empty() { /* We're EOS, we can pause and exit early */