net/aws/transcriber: rename prop transcript-lookahead & TranslationSrcPad

... as translate-lookahead and TranslateSrcPad.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1137>
This commit is contained in:
François Laignel 2023-03-15 14:59:50 +01:00
parent 3b3f0c1a29
commit d5d6a4daf9
4 changed files with 115 additions and 119 deletions

View file

@ -643,19 +643,19 @@
"caps": "text/x-raw:\n format: utf8\n", "caps": "text/x-raw:\n format: utf8\n",
"direction": "src", "direction": "src",
"presence": "always", "presence": "always",
"type": "GstTranslationSrcPad" "type": "GstTranslateSrcPad"
}, },
"src_%%u": { "src_%%u": {
"caps": "text/x-raw:\n format: utf8\n", "caps": "text/x-raw:\n format: utf8\n",
"direction": "src", "direction": "src",
"presence": "request", "presence": "request",
"type": "GstTranslationSrcPad" "type": "GstTranslateSrcPad"
}, },
"translation_src_%%u": { "translate_src_%%u": {
"caps": "text/x-raw:\n format: utf8\n", "caps": "text/x-raw:\n format: utf8\n",
"direction": "src", "direction": "src",
"presence": "request", "presence": "request",
"type": "GstTranslationSrcPad" "type": "GstTranslateSrcPad"
} }
}, },
"properties": { "properties": {
@ -773,20 +773,6 @@
"type": "guint", "type": "guint",
"writable": true "writable": true
}, },
"transcript-lookahead": {
"blurb": "Maximum duration in milliseconds of transcript to lookahead before sending to translation when no separator was encountered",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "5000",
"max": "-1",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
},
"translate-latency": { "translate-latency": {
"blurb": "Amount of milliseconds to allow AWS translate (ignored if the input and output languages are the same)", "blurb": "Amount of milliseconds to allow AWS translate (ignored if the input and output languages are the same)",
"conditionally-available": false, "conditionally-available": false,
@ -801,6 +787,20 @@
"type": "guint", "type": "guint",
"writable": true "writable": true
}, },
"translate-lookahead": {
"blurb": "Maximum duration in milliseconds of transcript to lookahead before sending to translation when no separator was encountered",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "5000",
"max": "-1",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
},
"vocabulary-filter-method": { "vocabulary-filter-method": {
"blurb": "Defines how filtered words will be edited, has no effect when vocabulary-filter-name isn't set", "blurb": "Defines how filtered words will be edited, has no effect when vocabulary-filter-name isn't set",
"conditionally-available": false, "conditionally-available": false,
@ -919,9 +919,9 @@
} }
] ]
}, },
"GstTranslationSrcPad": { "GstTranslateSrcPad": {
"hierarchy": [ "hierarchy": [
"GstTranslationSrcPad", "GstTranslateSrcPad",
"GstPad", "GstPad",
"GstObject", "GstObject",
"GInitiallyUnowned", "GInitiallyUnowned",

View file

@ -13,7 +13,7 @@
//! The element can optionally translate the resulting transcripts to one or //! The element can optionally translate the resulting transcripts to one or
//! multiple languages. //! multiple languages.
//! //!
//! This module contains the element implementation as well as the `TranslationSrcPad` //! This module contains the element implementation as well as the `TranslateSrcPad`
//! sublcass and its `TranslationPadTask`. //! sublcass and its `TranslationPadTask`.
//! //!
//! Web service specific code can be found in the `transcribe` and `translate` modules. //! Web service specific code can be found in the `transcribe` and `translate` modules.
@ -34,7 +34,7 @@ use std::sync::Mutex;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use super::transcribe::{TranscriberLoop, TranscriptEvent, TranscriptItem, TranscriptionSettings}; use super::transcribe::{TranscriberLoop, TranscriptEvent, TranscriptItem, TranscriptionSettings};
use super::translate::{TranslatedItem, TranslationLoop, TranslationQueue}; use super::translate::{TranslateLoop, TranslateQueue, TranslatedItem};
use super::{ use super::{
AwsTranscriberResultStability, AwsTranscriberVocabularyFilterMethod, AwsTranscriberResultStability, AwsTranscriberVocabularyFilterMethod,
TranslationTokenizationMethod, CAT, TranslationTokenizationMethod, CAT,
@ -59,8 +59,8 @@ pub const DEFAULT_TRANSCRIBE_LATENCY: gst::ClockTime = gst::ClockTime::from_seco
const TRANSLATE_LATENCY_PROPERTY: &str = "translate-latency"; const TRANSLATE_LATENCY_PROPERTY: &str = "translate-latency";
pub const DEFAULT_TRANSLATE_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(500); pub const DEFAULT_TRANSLATE_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(500);
const TRANSCRIPT_LOOKAHEAD_PROPERTY: &str = "transcript-lookahead"; const TRANSLATE_LOOKAHEAD_PROPERTY: &str = "translate-lookahead";
pub const DEFAULT_TRANSCRIPT_LOOKAHEAD: gst::ClockTime = gst::ClockTime::from_seconds(5); pub const DEFAULT_TRANSLATE_LOOKAHEAD: gst::ClockTime = gst::ClockTime::from_seconds(5);
const DEFAULT_LATENESS: gst::ClockTime = gst::ClockTime::ZERO; const DEFAULT_LATENESS: gst::ClockTime = gst::ClockTime::ZERO;
pub const DEFAULT_INPUT_LANG_CODE: &str = "en-US"; pub const DEFAULT_INPUT_LANG_CODE: &str = "en-US";
@ -82,7 +82,7 @@ const TRANSLATION_TOKENIZATION_PROPERTY: &str = "tokenization-method";
pub(super) struct Settings { pub(super) struct Settings {
transcribe_latency: gst::ClockTime, transcribe_latency: gst::ClockTime,
translate_latency: gst::ClockTime, translate_latency: gst::ClockTime,
transcript_lookahead: gst::ClockTime, translate_lookahead: gst::ClockTime,
lateness: gst::ClockTime, lateness: gst::ClockTime,
pub language_code: String, pub language_code: String,
pub vocabulary: Option<String>, pub vocabulary: Option<String>,
@ -100,7 +100,7 @@ impl Default for Settings {
Self { Self {
transcribe_latency: DEFAULT_TRANSCRIBE_LATENCY, transcribe_latency: DEFAULT_TRANSCRIBE_LATENCY,
translate_latency: DEFAULT_TRANSLATE_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY,
transcript_lookahead: DEFAULT_TRANSCRIPT_LOOKAHEAD, translate_lookahead: DEFAULT_TRANSLATE_LOOKAHEAD,
lateness: DEFAULT_LATENESS, lateness: DEFAULT_LATENESS,
language_code: DEFAULT_INPUT_LANG_CODE.to_string(), language_code: DEFAULT_INPUT_LANG_CODE.to_string(),
vocabulary: None, vocabulary: None,
@ -118,7 +118,7 @@ impl Default for Settings {
pub(super) struct State { pub(super) struct State {
buffer_tx: Option<mpsc::Sender<gst::Buffer>>, buffer_tx: Option<mpsc::Sender<gst::Buffer>>,
transcriber_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>, transcriber_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>,
srcpads: BTreeSet<super::TranslationSrcPad>, srcpads: BTreeSet<super::TranslateSrcPad>,
pad_serial: u32, pad_serial: u32,
pub seqnum: gst::Seqnum, pub seqnum: gst::Seqnum,
} }
@ -136,12 +136,12 @@ impl Default for State {
} }
pub struct Transcriber { pub struct Transcriber {
static_srcpad: super::TranslationSrcPad, static_srcpad: super::TranslateSrcPad,
sinkpad: gst::Pad, sinkpad: gst::Pad,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
pub(super) aws_config: Mutex<Option<aws_config::SdkConfig>>, pub(super) aws_config: Mutex<Option<aws_config::SdkConfig>>,
// sender to broadcast transcript items to the translation src pads. // sender to broadcast transcript items to the translate src pads.
transcript_event_tx: broadcast::Sender<TranscriptEvent>, transcript_event_tx: broadcast::Sender<TranscriptEvent>,
} }
@ -385,30 +385,30 @@ impl ObjectSubclass for Transcriber {
let templ = klass.pad_template("src").unwrap(); let templ = klass.pad_template("src").unwrap();
let static_srcpad = let static_srcpad =
gst::PadBuilder::<super::TranslationSrcPad>::from_template(&templ, Some("src")) gst::PadBuilder::<super::TranslateSrcPad>::from_template(&templ, Some("src"))
.activatemode_function(|pad, parent, mode, active| { .activatemode_function(|pad, parent, mode, active| {
Transcriber::catch_panic_pad_function( Transcriber::catch_panic_pad_function(
parent, parent,
|| { || {
Err(gst::loggable_error!( Err(gst::loggable_error!(
CAT, CAT,
"Panic activating TranslationSrcPad" "Panic activating TranslateSrcPad"
)) ))
}, },
|elem| TranslationSrcPad::activatemode(elem, pad, mode, active), |elem| TranslateSrcPad::activatemode(elem, pad, mode, active),
) )
}) })
.query_function(|pad, parent, query| { .query_function(|pad, parent, query| {
Transcriber::catch_panic_pad_function( Transcriber::catch_panic_pad_function(
parent, parent,
|| false, || false,
|elem| TranslationSrcPad::src_query(elem, pad, query), |elem| TranslateSrcPad::src_query(elem, pad, query),
) )
}) })
.flags(gst::PadFlags::FIXED_CAPS) .flags(gst::PadFlags::FIXED_CAPS)
.build(); .build();
// Setting the channel capacity so that a TranslationSrcPad that would lag // Setting the channel capacity so that a TranslateSrcPad that would lag
// behind for some reasons get a chance to catch-up without loosing items. // behind for some reasons get a chance to catch-up without loosing items.
// Receiver will be created by subscribing to sender later. // Receiver will be created by subscribing to sender later.
let (transcript_event_tx, _) = broadcast::channel(128); let (transcript_event_tx, _) = broadcast::channel(128);
@ -458,13 +458,13 @@ impl ObjectImpl for Transcriber {
.default_value(DEFAULT_TRANSLATE_LATENCY.mseconds() as u32) .default_value(DEFAULT_TRANSLATE_LATENCY.mseconds() as u32)
.mutable_ready() .mutable_ready()
.build(), .build(),
glib::ParamSpecUInt::builder(TRANSCRIPT_LOOKAHEAD_PROPERTY) glib::ParamSpecUInt::builder(TRANSLATE_LOOKAHEAD_PROPERTY)
.nick("Transcript lookahead") .nick("Translate lookahead")
.blurb(concat!( .blurb(concat!(
"Maximum duration in milliseconds of transcript to lookahead ", "Maximum duration in milliseconds of transcript to lookahead ",
"before sending to translation when no separator was encountered", "before sending to translation when no separator was encountered",
)) ))
.default_value(DEFAULT_TRANSCRIPT_LOOKAHEAD.mseconds() as u32) .default_value(DEFAULT_TRANSLATE_LOOKAHEAD.mseconds() as u32)
.mutable_ready() .mutable_ready()
.build(), .build(),
glib::ParamSpecUInt::builder("lateness") glib::ParamSpecUInt::builder("lateness")
@ -554,8 +554,8 @@ impl ObjectImpl for Transcriber {
self.settings.lock().unwrap().translate_latency = self.settings.lock().unwrap().translate_latency =
gst::ClockTime::from_mseconds(value.get::<u32>().unwrap().into()); gst::ClockTime::from_mseconds(value.get::<u32>().unwrap().into());
} }
TRANSCRIPT_LOOKAHEAD_PROPERTY => { TRANSLATE_LOOKAHEAD_PROPERTY => {
self.settings.lock().unwrap().transcript_lookahead = self.settings.lock().unwrap().translate_lookahead =
gst::ClockTime::from_mseconds(value.get::<u32>().unwrap().into()); gst::ClockTime::from_mseconds(value.get::<u32>().unwrap().into());
} }
"lateness" => { "lateness" => {
@ -621,13 +621,9 @@ impl ObjectImpl for Transcriber {
TRANSLATE_LATENCY_PROPERTY => { TRANSLATE_LATENCY_PROPERTY => {
(self.settings.lock().unwrap().translate_latency.mseconds() as u32).to_value() (self.settings.lock().unwrap().translate_latency.mseconds() as u32).to_value()
} }
TRANSCRIPT_LOOKAHEAD_PROPERTY => (self TRANSLATE_LOOKAHEAD_PROPERTY => {
.settings (self.settings.lock().unwrap().translate_lookahead.mseconds() as u32).to_value()
.lock() }
.unwrap()
.transcript_lookahead
.mseconds() as u32)
.to_value(),
"lateness" => { "lateness" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
(settings.lateness.mseconds() as u32).to_value() (settings.lateness.mseconds() as u32).to_value()
@ -695,15 +691,15 @@ impl ElementImpl for Transcriber {
gst::PadDirection::Src, gst::PadDirection::Src,
gst::PadPresence::Always, gst::PadPresence::Always,
&src_caps, &src_caps,
super::TranslationSrcPad::static_type(), super::TranslateSrcPad::static_type(),
) )
.unwrap(); .unwrap();
let req_src_pad_template = gst::PadTemplate::with_gtype( let req_src_pad_template = gst::PadTemplate::with_gtype(
"translation_src_%u", "translate_src_%u",
gst::PadDirection::Src, gst::PadDirection::Src,
gst::PadPresence::Request, gst::PadPresence::Request,
&src_caps, &src_caps,
super::TranslationSrcPad::static_type(), super::TranslateSrcPad::static_type(),
) )
.unwrap(); .unwrap();
@ -765,9 +761,9 @@ impl ElementImpl for Transcriber {
) -> Option<gst::Pad> { ) -> Option<gst::Pad> {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let pad = gst::PadBuilder::<super::TranslationSrcPad>::from_template( let pad = gst::PadBuilder::<super::TranslateSrcPad>::from_template(
templ, templ,
Some(format!("translation_src_{}", state.pad_serial).as_str()), Some(format!("translate_src_{}", state.pad_serial).as_str()),
) )
.activatemode_function(|pad, parent, mode, active| { .activatemode_function(|pad, parent, mode, active| {
Transcriber::catch_panic_pad_function( Transcriber::catch_panic_pad_function(
@ -775,17 +771,17 @@ impl ElementImpl for Transcriber {
|| { || {
Err(gst::loggable_error!( Err(gst::loggable_error!(
CAT, CAT,
"Panic activating TranslationSrcPad" "Panic activating TranslateSrcPad"
)) ))
}, },
|elem| TranslationSrcPad::activatemode(elem, pad, mode, active), |elem| TranslateSrcPad::activatemode(elem, pad, mode, active),
) )
}) })
.query_function(|pad, parent, query| { .query_function(|pad, parent, query| {
Transcriber::catch_panic_pad_function( Transcriber::catch_panic_pad_function(
parent, parent,
|| false, || false,
|elem| TranslationSrcPad::src_query(elem, pad, query), |elem| TranslateSrcPad::src_query(elem, pad, query),
) )
}) })
.flags(gst::PadFlags::FIXED_CAPS) .flags(gst::PadFlags::FIXED_CAPS)
@ -849,16 +845,16 @@ impl ChildProxyImpl for Transcriber {
} }
} }
struct TranslationPadTask { struct TranslationPadTask {
pad: glib::subclass::ObjectImplRef<TranslationSrcPad>, pad: glib::subclass::ObjectImplRef<TranslateSrcPad>,
elem: super::Transcriber, elem: super::Transcriber,
transcript_event_rx: broadcast::Receiver<TranscriptEvent>, transcript_event_rx: broadcast::Receiver<TranscriptEvent>,
needs_translate: bool, needs_translate: bool,
translation_queue: TranslationQueue, translate_queue: TranslateQueue,
translation_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>, translate_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>,
to_translation_tx: Option<mpsc::Sender<Vec<TranscriptItem>>>, to_translate_tx: Option<mpsc::Sender<Vec<TranscriptItem>>>,
from_translation_rx: Option<mpsc::Receiver<Vec<TranslatedItem>>>, from_translate_rx: Option<mpsc::Receiver<Vec<TranslatedItem>>>,
translate_latency: gst::ClockTime, translate_latency: gst::ClockTime,
transcript_lookahead: gst::ClockTime, translate_lookahead: gst::ClockTime,
send_events: bool, send_events: bool,
translated_items: VecDeque<TranslatedItem>, translated_items: VecDeque<TranslatedItem>,
our_latency: gst::ClockTime, our_latency: gst::ClockTime,
@ -870,7 +866,7 @@ struct TranslationPadTask {
impl TranslationPadTask { impl TranslationPadTask {
fn try_new( fn try_new(
pad: &TranslationSrcPad, pad: &TranslateSrcPad,
elem: super::Transcriber, elem: super::Transcriber,
transcript_event_rx: broadcast::Receiver<TranscriptEvent>, transcript_event_rx: broadcast::Receiver<TranscriptEvent>,
) -> Result<TranslationPadTask, gst::ErrorMessage> { ) -> Result<TranslationPadTask, gst::ErrorMessage> {
@ -879,12 +875,12 @@ impl TranslationPadTask {
elem, elem,
transcript_event_rx, transcript_event_rx,
needs_translate: false, needs_translate: false,
translation_queue: TranslationQueue::default(), translate_queue: TranslateQueue::default(),
translation_loop_handle: None, translate_loop_handle: None,
to_translation_tx: None, to_translate_tx: None,
from_translation_rx: None, from_translate_rx: None,
translate_latency: DEFAULT_TRANSLATE_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY,
transcript_lookahead: DEFAULT_TRANSCRIPT_LOOKAHEAD, translate_lookahead: DEFAULT_TRANSLATE_LOOKAHEAD,
send_events: true, send_events: true,
translated_items: VecDeque::new(), translated_items: VecDeque::new(),
our_latency: DEFAULT_TRANSCRIBE_LATENCY, our_latency: DEFAULT_TRANSCRIBE_LATENCY,
@ -903,8 +899,8 @@ impl TranslationPadTask {
impl Drop for TranslationPadTask { impl Drop for TranslationPadTask {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(translation_loop_handle) = self.translation_loop_handle.take() { if let Some(translate_loop_handle) = self.translate_loop_handle.take() {
translation_loop_handle.abort(); translate_loop_handle.abort();
} }
} }
} }
@ -969,11 +965,11 @@ impl TranslationPadTask {
async fn translate_iter(&mut self) -> Result<(), gst::ErrorMessage> { async fn translate_iter(&mut self) -> Result<(), gst::ErrorMessage> {
if self if self
.translation_loop_handle .translate_loop_handle
.as_ref() .as_ref()
.map_or(true, task::JoinHandle::is_finished) .map_or(true, task::JoinHandle::is_finished)
{ {
const ERR: &str = "Translation loop is not running"; const ERR: &str = "Translate loop is not running";
gst::error!(CAT, imp: self.pad, "{ERR}"); gst::error!(CAT, imp: self.pad, "{ERR}");
return Err(gst::error_msg!(gst::StreamError::Failed, ["{ERR}"])); return Err(gst::error_msg!(gst::StreamError::Failed, ["{ERR}"]));
} }
@ -983,8 +979,8 @@ impl TranslationPadTask {
let timeout = tokio::time::sleep(GRANULARITY.into()).fuse(); let timeout = tokio::time::sleep(GRANULARITY.into()).fuse();
futures::pin_mut!(timeout); futures::pin_mut!(timeout);
let from_translation_rx = self let from_translate_rx = self
.from_translation_rx .from_translate_rx
.as_mut() .as_mut()
.expect("from_translation chan must be available in translation mode"); .expect("from_translation chan must be available in translation mode");
@ -996,7 +992,7 @@ impl TranslationPadTask {
// before current latency budget is exhausted. // before current latency budget is exhausted.
futures::select_biased! { futures::select_biased! {
_ = timeout => return Ok(()), _ = timeout => return Ok(()),
translated_items = from_translation_rx.next() => { translated_items = from_translate_rx.next() => {
let Some(translated_items) = translated_items else { let Some(translated_items) = translated_items else {
const ERR: &str = "translation chan terminated"; const ERR: &str = "translation chan terminated";
gst::debug!(CAT, imp: self.pad, "{ERR}"); gst::debug!(CAT, imp: self.pad, "{ERR}");
@ -1033,7 +1029,7 @@ impl TranslationPadTask {
}; };
for items in transcript_items.iter() { for items in transcript_items.iter() {
if let Some(ready_items) = self.translation_queue.push(items) { if let Some(ready_items) = self.translate_queue.push(items) {
self.send_for_translation(ready_items).await?; self.send_for_translation(ready_items).await?;
} }
} }
@ -1062,7 +1058,7 @@ impl TranslationPadTask {
discont_pending = pad_state.discont_pending; discont_pending = pad_state.discont_pending;
} }
if self.needs_translate && !self.translation_queue.is_empty() { if self.needs_translate && !self.translate_queue.is_empty() {
// Maximum delay for an item to be pushed to stream on time // Maximum delay for an item to be pushed to stream on time
// Margin: // Margin:
// - 1 * GRANULARITY: the time it will take before we can check this again, // - 1 * GRANULARITY: the time it will take before we can check this again,
@ -1078,8 +1074,8 @@ impl TranslationPadTask {
let deadline = translation_eta.saturating_sub(max_delay); let deadline = translation_eta.saturating_sub(max_delay);
if let Some(ready_items) = self if let Some(ready_items) = self
.translation_queue .translate_queue
.dequeue(deadline, self.transcript_lookahead) .dequeue(deadline, self.translate_lookahead)
{ {
gst::debug!(CAT, imp: self.pad, "Forcing {} transcripts to translation", ready_items.len()); gst::debug!(CAT, imp: self.pad, "Forcing {} transcripts to translation", ready_items.len());
if self.send_for_translation(ready_items).await.is_err() { if self.send_for_translation(ready_items).await.is_err() {
@ -1184,7 +1180,7 @@ impl TranslationPadTask {
if self.send_eos if self.send_eos
&& self.pending_translations == 0 && self.pending_translations == 0
&& self.translated_items.is_empty() && self.translated_items.is_empty()
&& self.translation_queue.is_empty() && self.translate_queue.is_empty()
{ {
/* We're EOS, we can pause and exit early */ /* We're EOS, we can pause and exit early */
let _ = self.pad.obj().pause_task(); let _ = self.pad.obj().pause_task();
@ -1241,7 +1237,7 @@ impl TranslationPadTask {
transcript_items: Vec<TranscriptItem>, transcript_items: Vec<TranscriptItem>,
) -> Result<(), gst::ErrorMessage> { ) -> Result<(), gst::ErrorMessage> {
let res = self let res = self
.to_translation_tx .to_translate_tx
.as_mut() .as_mut()
.expect("to_translation chan must be available in translation mode") .expect("to_translation chan must be available in translation mode")
.send(transcript_items) .send(transcript_items)
@ -1317,7 +1313,7 @@ impl TranslationPadTask {
let pad_settings = self.pad.settings.lock().unwrap(); let pad_settings = self.pad.settings.lock().unwrap();
self.our_latency = TranslationSrcPad::our_latency(&elem_settings, &pad_settings); self.our_latency = TranslateSrcPad::our_latency(&elem_settings, &pad_settings);
if self.our_latency + elem_settings.lateness <= 2 * GRANULARITY { if self.our_latency + elem_settings.lateness <= 2 * GRANULARITY {
let err = format!( let err = format!(
"total latency + lateness must be greater than {}", "total latency + lateness must be greater than {}",
@ -1328,35 +1324,35 @@ impl TranslationPadTask {
} }
self.translate_latency = elem_settings.translate_latency; self.translate_latency = elem_settings.translate_latency;
self.transcript_lookahead = elem_settings.transcript_lookahead; self.translate_lookahead = elem_settings.translate_lookahead;
self.needs_translate = TranslationSrcPad::needs_translation( self.needs_translate = TranslateSrcPad::needs_translation(
&elem_settings.language_code, &elem_settings.language_code,
pad_settings.language_code.as_deref(), pad_settings.language_code.as_deref(),
); );
if self.needs_translate { if self.needs_translate {
let (to_translation_tx, to_translation_rx) = mpsc::channel(64); let (to_translate_tx, to_translate_rx) = mpsc::channel(64);
let (from_translation_tx, from_translation_rx) = mpsc::channel(64); let (from_translate_tx, from_translate_rx) = mpsc::channel(64);
translation_loop = Some(TranslationLoop::new( translation_loop = Some(TranslateLoop::new(
elem_imp, elem_imp,
&self.pad, &self.pad,
&elem_settings.language_code, &elem_settings.language_code,
pad_settings.language_code.as_deref().unwrap(), pad_settings.language_code.as_deref().unwrap(),
pad_settings.tokenization_method, pad_settings.tokenization_method,
to_translation_rx, to_translate_rx,
from_translation_tx, from_translate_tx,
)); ));
self.to_translation_tx = Some(to_translation_tx); self.to_translate_tx = Some(to_translate_tx);
self.from_translation_rx = Some(from_translation_rx); self.from_translate_rx = Some(from_translate_rx);
} }
} }
if let Some(translation_loop) = translation_loop { if let Some(translation_loop) = translation_loop {
translation_loop.check_language().await?; translation_loop.check_language().await?;
self.translation_loop_handle = Some(RUNTIME.spawn(translation_loop.run())); self.translate_loop_handle = Some(RUNTIME.spawn(translation_loop.run()));
} }
Ok(()) Ok(())
@ -1381,18 +1377,18 @@ impl Default for TranslationPadState {
} }
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
struct TranslationPadSettings { struct TranslatePadSettings {
language_code: Option<String>, language_code: Option<String>,
tokenization_method: TranslationTokenizationMethod, tokenization_method: TranslationTokenizationMethod,
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct TranslationSrcPad { pub struct TranslateSrcPad {
state: Mutex<TranslationPadState>, state: Mutex<TranslationPadState>,
settings: Mutex<TranslationPadSettings>, settings: Mutex<TranslatePadSettings>,
} }
impl TranslationSrcPad { impl TranslateSrcPad {
fn start_task(&self) -> Result<(), gst::LoggableError> { fn start_task(&self) -> Result<(), gst::LoggableError> {
gst::debug!(CAT, imp: self, "Starting task"); gst::debug!(CAT, imp: self, "Starting task");
@ -1457,14 +1453,14 @@ impl TranslationSrcPad {
#[inline] #[inline]
fn our_latency( fn our_latency(
elem_settings: &Settings, elem_settings: &Settings,
pad_settings: &TranslationPadSettings, pad_settings: &TranslatePadSettings,
) -> gst::ClockTime { ) -> gst::ClockTime {
if Self::needs_translation( if Self::needs_translation(
&elem_settings.language_code, &elem_settings.language_code,
pad_settings.language_code.as_deref(), pad_settings.language_code.as_deref(),
) { ) {
elem_settings.transcribe_latency elem_settings.transcribe_latency
+ elem_settings.transcript_lookahead + elem_settings.translate_lookahead
+ elem_settings.translate_latency + elem_settings.translate_latency
} else { } else {
elem_settings.transcribe_latency elem_settings.transcribe_latency
@ -1484,11 +1480,11 @@ impl TranslationSrcPad {
} }
} }
impl TranslationSrcPad { impl TranslateSrcPad {
#[track_caller] #[track_caller]
pub fn activatemode( pub fn activatemode(
_elem: &Transcriber, _elem: &Transcriber,
pad: &super::TranslationSrcPad, pad: &super::TranslateSrcPad,
_mode: gst::PadMode, _mode: gst::PadMode,
active: bool, active: bool,
) -> Result<(), gst::LoggableError> { ) -> Result<(), gst::LoggableError> {
@ -1503,7 +1499,7 @@ impl TranslationSrcPad {
pub fn src_query( pub fn src_query(
elem: &Transcriber, elem: &Transcriber,
pad: &super::TranslationSrcPad, pad: &super::TranslateSrcPad,
query: &mut gst::QueryRef, query: &mut gst::QueryRef,
) -> bool { ) -> bool {
gst::log!(CAT, obj: pad, "Handling query {query:?}"); gst::log!(CAT, obj: pad, "Handling query {query:?}");
@ -1553,9 +1549,9 @@ impl TranslationSrcPad {
} }
#[glib::object_subclass] #[glib::object_subclass]
impl ObjectSubclass for TranslationSrcPad { impl ObjectSubclass for TranslateSrcPad {
const NAME: &'static str = "GstTranslationSrcPad"; const NAME: &'static str = "GstTranslateSrcPad";
type Type = super::TranslationSrcPad; type Type = super::TranslateSrcPad;
type ParentType = gst::Pad; type ParentType = gst::Pad;
fn new() -> Self { fn new() -> Self {
@ -1563,7 +1559,7 @@ impl ObjectSubclass for TranslationSrcPad {
} }
} }
impl ObjectImpl for TranslationSrcPad { impl ObjectImpl for TranslateSrcPad {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![ vec![
@ -1608,6 +1604,6 @@ impl ObjectImpl for TranslationSrcPad {
} }
} }
impl GstObjectImpl for TranslationSrcPad {} impl GstObjectImpl for TranslateSrcPad {}
impl PadImpl for TranslationSrcPad {} impl PadImpl for TranslateSrcPad {}

View file

@ -99,7 +99,7 @@ glib::wrapper! {
} }
glib::wrapper! { glib::wrapper! {
pub struct TranslationSrcPad(ObjectSubclass<imp::TranslationSrcPad>) @extends gst::Pad, gst::Object; pub struct TranslateSrcPad(ObjectSubclass<imp::TranslateSrcPad>) @extends gst::Pad, gst::Object;
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
@ -111,7 +111,7 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
.mark_as_plugin_api(gst::PluginAPIFlags::empty()); .mark_as_plugin_api(gst::PluginAPIFlags::empty());
TranslationTokenizationMethod::static_type() TranslationTokenizationMethod::static_type()
.mark_as_plugin_api(gst::PluginAPIFlags::empty()); .mark_as_plugin_api(gst::PluginAPIFlags::empty());
TranslationSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); TranslateSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
} }
gst::Element::register( gst::Element::register(
Some(plugin), Some(plugin),

View file

@ -16,7 +16,7 @@ use futures::prelude::*;
use std::collections::VecDeque; use std::collections::VecDeque;
use super::imp::TranslationSrcPad; use super::imp::TranslateSrcPad;
use super::transcribe::TranscriptItem; use super::transcribe::TranscriptItem;
use super::{TranslationTokenizationMethod, CAT}; use super::{TranslationTokenizationMethod, CAT};
@ -41,11 +41,11 @@ impl From<&TranscriptItem> for TranslatedItem {
} }
#[derive(Default)] #[derive(Default)]
pub struct TranslationQueue { pub struct TranslateQueue {
items: VecDeque<TranscriptItem>, items: VecDeque<TranscriptItem>,
} }
impl TranslationQueue { impl TranslateQueue {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.items.is_empty() self.items.is_empty()
} }
@ -102,39 +102,39 @@ impl TranslationQueue {
} }
} }
pub struct TranslationLoop { pub struct TranslateLoop {
pad: glib::subclass::ObjectImplRef<TranslationSrcPad>, pad: glib::subclass::ObjectImplRef<TranslateSrcPad>,
client: aws_translate::Client, client: aws_translate::Client,
input_lang: String, input_lang: String,
output_lang: String, output_lang: String,
tokenization_method: TranslationTokenizationMethod, tokenization_method: TranslationTokenizationMethod,
transcript_rx: mpsc::Receiver<Vec<TranscriptItem>>, transcript_rx: mpsc::Receiver<Vec<TranscriptItem>>,
translation_tx: mpsc::Sender<Vec<TranslatedItem>>, translate_tx: mpsc::Sender<Vec<TranslatedItem>>,
} }
impl TranslationLoop { impl TranslateLoop {
pub fn new( pub fn new(
imp: &super::imp::Transcriber, imp: &super::imp::Transcriber,
pad: &TranslationSrcPad, pad: &TranslateSrcPad,
input_lang: &str, input_lang: &str,
output_lang: &str, output_lang: &str,
tokenization_method: TranslationTokenizationMethod, tokenization_method: TranslationTokenizationMethod,
transcript_rx: mpsc::Receiver<Vec<TranscriptItem>>, transcript_rx: mpsc::Receiver<Vec<TranscriptItem>>,
translation_tx: mpsc::Sender<Vec<TranslatedItem>>, translate_tx: mpsc::Sender<Vec<TranslatedItem>>,
) -> Self { ) -> Self {
let aws_config = imp.aws_config.lock().unwrap(); let aws_config = imp.aws_config.lock().unwrap();
let aws_config = aws_config let aws_config = aws_config
.as_ref() .as_ref()
.expect("aws_config must be initialized at this stage"); .expect("aws_config must be initialized at this stage");
TranslationLoop { TranslateLoop {
pad: pad.ref_counted(), pad: pad.ref_counted(),
client: aws_sdk_translate::Client::new(aws_config), client: aws_sdk_translate::Client::new(aws_config),
input_lang: input_lang.to_string(), input_lang: input_lang.to_string(),
output_lang: output_lang.to_string(), output_lang: output_lang.to_string(),
tokenization_method, tokenization_method,
transcript_rx, transcript_rx,
translation_tx, translate_tx,
} }
} }
@ -227,7 +227,7 @@ impl TranslationLoop {
gst::trace!(CAT, imp: self.pad, "Sending {translated_items:?}"); gst::trace!(CAT, imp: self.pad, "Sending {translated_items:?}");
if self.translation_tx.send(translated_items).await.is_err() { if self.translate_tx.send(translated_items).await.is_err() {
gst::info!( gst::info!(
CAT, CAT,
imp: self.pad, imp: self.pad,