diff --git a/src/lib.rs b/src/lib.rs index 040f7ce..9f83a20 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,8 @@ // // SPDX-License-Identifier: MPL-2.0 +extern crate core; + use gst::glib; mod transcriber; diff --git a/src/transcriber/imp.rs b/src/transcriber/imp.rs index 94a8973..6eca185 100644 --- a/src/transcriber/imp.rs +++ b/src/transcriber/imp.rs @@ -518,7 +518,12 @@ impl Transcriber { if ret { let (_, min, _) = peer_query.result(); let our_latency = self.settings.lock().unwrap().latency; - gst_info!(CAT, obj: element, "Replying to latency query: {}", our_latency + min); + gst_info!( + CAT, + obj: element, + "Replying to latency query: {}", + our_latency + min + ); // We never drop buffers, so our max latency is set to infinity q.set(true, our_latency + min, gst::ClockTime::NONE); } @@ -624,44 +629,59 @@ impl Transcriber { buffer: Option, ) -> Result { let mut delay = None; - - { + if let Some(buffer) = &buffer { let state = self.state.lock().unwrap(); + let running_time = state.in_segment.to_running_time(buffer.pts()); + let now = element.current_running_time(); - if let Some(buffer) = &buffer { - let running_time = state.in_segment.to_running_time(buffer.pts()); - let now = element.current_running_time(); - - delay = running_time.opt_checked_sub(now).ok().flatten(); - } + delay = running_time.opt_checked_sub(now).ok().flatten(); } // Wait until now is close enough to the buffer's PTS if let Some(delay) = delay { + gst_trace!( + CAT, + obj: element, + "Waiting {:?} before sending buffer", + delay + ); tokio::time::sleep(delay.into()).await; } if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() { if let Some(buffer) = buffer { let data = buffer.map_readable().unwrap(); - for chunk in data.chunks(8000) { - ws_sink - .send(Message::Binary(chunk.to_vec())) - .await - .map_err(|err| { - gst_error!(CAT, obj: element, "Failed sending audio packet to server: {}", err); - gst::FlowError::Error - })?; - } - gst_trace!(CAT, obj: element, "Sent complete buffer to server!"); + + gst_trace!(CAT, obj: element, "Sending {} bytes", data.len()); + + ws_sink + .send(Message::Binary(data.to_vec())) + .await + .map_err(|err| { + gst_error!( + CAT, + obj: element, + "Failed sending audio packet to server: {}", + err + ); + gst::FlowError::Error + })?; } else { - // Send end of stream - gst_info!(CAT, obj: element, "Closing transcription session to Vosk server"); + gst_info!( + CAT, + obj: element, + "Closing transcription session to Vosk server" + ); ws_sink .send(Message::Text("{\"eof\": 1}".to_string())) .await .map_err(|err| { - gst_error!(CAT, obj: element, "Failed sending EOF packet to server: {}", err); + gst_error!( + CAT, + obj: element, + "Failed sending EOF packet to server: {}", + err + ); gst::FlowError::Error })?; } @@ -676,7 +696,7 @@ impl Transcriber { element: &super::Transcriber, buffer: Option, ) -> Result { - gst_log!(CAT, obj: element, "Handling {:?}", buffer); + gst_trace!(CAT, obj: element, "Handling {:?}", buffer); self.ensure_connection(element).map_err(|err| { element_error!( @@ -763,7 +783,12 @@ impl Transcriber { let s = in_caps.structure(0).unwrap(); let sample_rate = s.get::("rate").unwrap(); - gst_debug!(CAT, obj: &element, "Configuring transcription session, sample_rate={}", sample_rate); + gst_debug!( + CAT, + obj: &element, + "Configuring transcription session, sample_rate={}", + sample_rate + ); let config = Configuration::new(sample_rate); let packet = serde_json::to_vec(&config).unwrap(); @@ -788,7 +813,7 @@ impl Transcriber { let transcribe = element.imp(); let msg = match ws_stream.next().await { Some(msg) => msg, - None => continue + None => continue, }; let msg = match msg { @@ -929,7 +954,7 @@ impl ObjectImpl for Transcriber { "Address of the Vosk websocket server", Some(DEFAULT_SERVER_ADDRESS), glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, - ) + ), ] }); @@ -1011,8 +1036,8 @@ impl ElementImpl for Transcriber { .unwrap(); let sink_caps = gst::Caps::builder("audio/x-raw") - .field("format", "S16BE") - .field("rate", gst::IntRange::new(8000_i32, 48000)) + .field("format", "S16LE") + .field("rate", 16000) .field("channels", 1) .build(); let sink_pad_template = gst::PadTemplate::new(