rusoto: Port to tokio 1.0

This commit is contained in:
Sebastian Dröge 2021-01-09 12:14:31 +02:00
parent c380a3ea3d
commit e3aa368d94
3 changed files with 44 additions and 33 deletions

View file

@ -10,19 +10,19 @@ description = "Amazon Web Services plugin"
edition = "2018"
[dependencies]
bytes = "0.5"
bytes = "1.0"
futures = "0.3"
glib = { git = "https://github.com/gtk-rs/gtk-rs" }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] }
rusoto_core = "0.45"
rusoto_s3 = "0.45"
rusoto_credential = "0.45"
rusoto_signature = "0.45"
rusoto_core = "0.46"
rusoto_s3 = "0.46"
rusoto_credential = "0.46"
rusoto_signature = "0.46"
url = "2"
percent-encoding = "2"
tokio = { version = "0.2", features = [ "rt-threaded" ] }
async-tungstenite = { version = "0.9", features = ["tokio", "tokio-runtime", "tokio-native-tls"] }
tokio = { version = "1.0", features = [ "rt-multi-thread" ] }
async-tungstenite = { version = "0.12", features = ["tokio", "tokio-runtime", "tokio-native-tls"] }
nom = "5.1.1"
crc = "1.8.1"
byteorder = "1.3.4"

View file

@ -105,10 +105,9 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new()
.threaded_scheduler()
runtime::Builder::new_multi_thread()
.enable_all()
.core_threads(1)
.worker_threads(1)
.build()
.unwrap()
});
@ -569,7 +568,8 @@ impl Transcriber {
}
};
RUNTIME.enter(|| futures::executor::block_on(future))
let _enter = RUNTIME.enter();
futures::executor::block_on(future)
}
fn start_task(&self, element: &super::Transcriber) -> Result<(), gst::LoggableError> {
@ -800,7 +800,7 @@ impl Transcriber {
}
if let Some(delay) = delay {
tokio::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await;
tokio::time::sleep(Duration::from_nanos(delay.nseconds().unwrap())).await;
}
if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() {
@ -847,7 +847,12 @@ impl Transcriber {
self.state.lock().unwrap().send_abort_handle = Some(abort_handle);
match RUNTIME.enter(|| futures::executor::block_on(future)) {
let res = {
let _enter = RUNTIME.enter();
futures::executor::block_on(future)
};
match res {
Err(_) => Err(gst::FlowError::Flushing),
Ok(res) => res,
}
@ -877,15 +882,18 @@ impl Transcriber {
gst_info!(CAT, obj: element, "Connecting ..");
let creds = RUNTIME
.enter(|| futures::executor::block_on(EnvironmentProvider::default().credentials()))
.map_err(|err| {
gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err);
error_msg!(
gst::CoreError::Failed,
["Failed to generate credentials: {}", err]
)
})?;
let creds = {
let _enter = RUNTIME.enter();
futures::executor::block_on(EnvironmentProvider::default().credentials()).map_err(
|err| {
gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err);
error_msg!(
gst::CoreError::Failed,
["Failed to generate credentials: {}", err]
)
},
)?
};
let language_code = settings
.language_code
@ -909,12 +917,15 @@ impl Transcriber {
signed.add_param("sample-rate", &sample_rate.to_string());
let url = signed.generate_presigned_url(&creds, &std::time::Duration::from_secs(60), true);
let (ws, _) = RUNTIME
.enter(|| futures::executor::block_on(connect_async(format!("wss{}", &url[5..]))))
.map_err(|err| {
gst_error!(CAT, obj: element, "Failed to connect: {}", err);
error_msg!(gst::CoreError::Failed, ["Failed to connect: {}", err])
})?;
let (ws, _) = {
let _enter = RUNTIME.enter();
futures::executor::block_on(connect_async(format!("wss{}", &url[5..]))).map_err(
|err| {
gst_error!(CAT, obj: element, "Failed to connect: {}", err);
error_msg!(gst::CoreError::Failed, ["Failed to connect: {}", err])
},
)?
};
let (ws_sink, mut ws_stream) = ws.split();

View file

@ -15,10 +15,9 @@ use std::sync::Mutex;
use tokio::runtime;
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new()
.threaded_scheduler()
runtime::Builder::new_multi_thread()
.enable_all()
.core_threads(2)
.worker_threads(2)
.thread_name("gst-rusoto-runtime")
.build()
.unwrap()
@ -49,7 +48,8 @@ where
// FIXME: add a timeout as well
let res = RUNTIME.enter(|| {
let res = {
let _enter = RUNTIME.enter();
futures::executor::block_on(async {
match abortable_future.await {
// Future resolved successfully
@ -60,7 +60,7 @@ where
Err(future::Aborted) => Err(WaitError::Cancelled),
}
})
});
};
/* Clear out the canceller */
canceller_guard = canceller.lock().unwrap();