diff --git a/Cargo.lock b/Cargo.lock
index ed23933..464f83e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -284,6 +284,7 @@
 dependencies = [
  "tokio",
  "tokio-stream",
  "tonic",
+ "tracing",
  "tracing-subscriber",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 7a3251b..2b7655c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
 [dependencies]
 eyre = "0.6.6"
 log = "0.4"
+tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "fs"] }
 google-authz = { version = "1.0.0-alpha.2", features = ["tonic"] }
@@ -18,4 +19,7 @@ prost-types = { version = "0.9" }
 google-api-proto = { version = "1.0.0-alpha", features = ["google-cloud-translation-v3", "google-cloud-speech-v1"] }
 tokio-stream = "0.1.8"
 futures-util = "0.3"
-async-stream = "*"
\ No newline at end of file
+async-stream = "*"
+#glib = "0.15.4"
+#gst = { package = "gstreamer", version = "0.18.3" }
+#gstreamer-base = "0.18.0"
diff --git a/src/main.rs b/src/main.rs
index 3493798..230eeb7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,6 +8,7 @@ use google_authz::{Credentials, GoogleAuthz};
 use log::{debug, info};
 use tokio::io::AsyncReadExt;
 use tonic::transport::Channel;
+use tracing::Instrument;
 
 #[tokio::main]
 async fn main() -> eyre::Result<()> {
@@ -36,28 +37,31 @@ async fn main() -> eyre::Result<()> {
 
     let mut client = SpeechClient::new(channel);
 
-    let outbound = async_stream::stream! {
-        let request = StreamingRecognizeRequest {
-            streaming_request: Some(StreamingRequest::StreamingConfig(
-                StreamingRecognitionConfig {
-                    config: Some(RecognitionConfig {
-                        encoding: AudioEncoding::Flac.into(), // matching current example file
-                        sample_rate_hertz: 44_100, // matching current example file
-                        audio_channel_count: 2,
-                        language_code: "en-US".to_string(), // we only support en-US to start with
-                        model: "video".to_string(), // dictate does not set this option
-                        use_enhanced: true, // dictate does not set this option
-                        profanity_filter: true, // used by Dictate, so we also use it here
-                        enable_word_time_offsets: true, // important so we can get the spoken word time ranges
-                        max_alternatives: 1, // make sure the default is used
-                        ..Default::default()
-                    }),
-                    single_utterance: false,
-                    interim_results: false,
-                },
-            )),
-        };
-        yield request;
+
+    let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+    sender.send(StreamingRecognizeRequest {
+        streaming_request: Some(StreamingRequest::StreamingConfig(
+            StreamingRecognitionConfig {
+                config: Some(RecognitionConfig {
+                    encoding: AudioEncoding::Flac.into(), // matching current example file
+                    sample_rate_hertz: 44_100, // matching current example file
+                    audio_channel_count: 2,
+                    language_code: "en-US".to_string(), // we only support en-US to start with
+                    model: "video".to_string(), // dictate does not set this option
+                    use_enhanced: true, // dictate does not set this option
+                    profanity_filter: true, // used by Dictate, so we also use it here
+                    enable_word_time_offsets: true, // important so we can get the spoken word time ranges
+                    max_alternatives: 1, // make sure the default is used
+                    ..Default::default()
+                }),
+                single_utterance: false,
+                interim_results: false,
+            },
+        )),
+    })?;
+
+    tokio::spawn(async move {
         let file = tokio::fs::File::open("some-audio.flac").await.unwrap();
         let mut audio_file = tokio::io::BufReader::new(file);
         // read file chunk
@@ -69,18 +73,26 @@ async fn main() -> eyre::Result<()> {
             let n = audio_file.read(&mut buffer).await?;
                     BytesMut::from(&buffer.as_slice()[..n]).freeze(),
                 )),
             };
-            yield request;
-            // debug!("added a buffer to the sender queue: {} bytes", n);
+            sender.send(request).unwrap();
+            //debug!("added a buffer to the sender queue: {} bytes", n);
             tokio::time::sleep(std::time::Duration::from_millis(100)).await;
         }
+    }).instrument(tracing::info_span!("audio-source")).await?;
+
+
+    let message = async_stream::stream! {
+        while let Some(message) = receiver.recv().await {
+            debug!("drained message inside stream...");
+            yield message;
+        }
     };
 
     let response = client
-        .streaming_recognize(tonic::Request::new(outbound))
+        .streaming_recognize(tonic::Request::new(message))
         .await?;
     let mut inbound = response.into_inner();
-    while let Some(response) = inbound.message().await? {
+    while let Some(response) = inbound.message().instrument(tracing::info_span!("transcription-results")).await? {
         let mut num_results = 0;
         for res in &response.results {
             num_results = num_results + 1;
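Aside: the `async_stream::stream!` drain loop introduced above exists only to adapt the channel's receiving half into a `Stream` that tonic can consume. A shorter alternative (a sketch, not part of the commit above) is the ready-made wrapper in the `tokio-stream` crate already listed in Cargo.toml:

    use tokio_stream::wrappers::UnboundedReceiverStream;

    // Wrap the receiver directly as a `Stream` of StreamingRecognizeRequest
    // values; no manual recv()/yield loop is needed. `receiver` and `client`
    // are the same variables created earlier in main.rs.
    let message = UnboundedReceiverStream::new(receiver);

    let response = client
        .streaming_recognize(tonic::Request::new(message))
        .await?;

With this, the `async-stream` dependency would no longer be needed for this code path, though the hand-rolled loop does make the per-message `debug!` logging easy to keep.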