threadshare: spawn StateMachine on the futures::executor::ThreadPool

StateMachines are spawned on a runtime::Context which uses a tokio
runtime. The StateMachine doesn't need all the features from tokio
such as the IO and timers drivers.

This commit makes use of a light-weight futures executor to spawn
the StateMachines.
This commit is contained in:
François Laignel 2020-05-15 23:37:56 +02:00
parent f0793587f6
commit 725eb0a093
2 changed files with 26 additions and 30 deletions

View file

@ -21,8 +21,9 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
pin-project = "0.4"
once_cell = "1"
tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/throttling", features = ["io-util", "macros", "rt-core", "sync", "stream", "time", "tcp", "udp", "rt-util"] }
futures = "0.3"
futures = { version = "0.3", features = ["thread-pool"] }
lazy_static = "1.0"
rand = "0.7"
net2 = "0.2"

View file

@ -20,7 +20,7 @@
use futures::channel::mpsc as async_mpsc;
use futures::channel::oneshot;
use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture};
use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture, RemoteHandle};
use futures::prelude::*;
use futures::stream::StreamExt;
@ -32,7 +32,7 @@ use std::stringify;
use std::sync::{Arc, Mutex, MutexGuard};
use super::executor::{block_on_or_add_sub_task, TaskId};
use super::{Context, JoinHandle, RUNTIME_CAT};
use super::{Context, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum TaskState {
@ -277,18 +277,9 @@ impl fmt::Debug for TransitionRequest {
#[derive(Debug)]
struct TaskInner {
// The state machine needs an un-throttling Context because otherwise
// `transition_rx.next()` would throttle at each transition request.
// Since pipelines serialize state changes, this would lead to long starting / stopping
// when a pipeline consists in a large number of elements.
// See also the comment about transition_tx below.
state_machine_context: Context,
// The TaskImpl processings are spawned on the Task Context by the state machine.
context: Option<Context>,
state: TaskState,
state_machine_handle: Option<JoinHandle<()>>,
// The transition channel allows serializing transitions handling,
// preventing race conditions when transitions are run in //.
state_machine_handle: Option<RemoteHandle<()>>,
transition_tx: Option<async_mpsc::Sender<TransitionRequest>>,
prepare_abort_handle: Option<AbortHandle>,
loop_abort_handle: Option<AbortHandle>,
@ -298,7 +289,6 @@ struct TaskInner {
impl Default for TaskInner {
fn default() -> Self {
TaskInner {
state_machine_context: Context::acquire("state_machine", 0).unwrap(),
context: None,
state: TaskState::Unprepared,
state_machine_handle: None,
@ -458,24 +448,17 @@ impl Task {
inner.state = TaskState::Preparing;
gst_log!(RUNTIME_CAT, "Starting task state machine");
gst_log!(RUNTIME_CAT, "Spawning task state machine");
// FIXME allow configuration of the channel buffer size,
// this determines the contention on the Task.
let (transition_tx, transition_rx) = async_mpsc::channel(4);
let state_machine = StateMachine::new(Box::new(task_impl), transition_rx);
let (transition_req, _) = TransitionRequest::new(Transition::Prepare);
inner.state_machine_handle = Some(inner.state_machine_context.spawn(state_machine.run(
Arc::clone(&self.0),
context.clone(),
transition_req,
)));
inner.state_machine_handle = Some(self.spawn_state_machine(state_machine, &context));
inner.transition_tx = Some(transition_tx);
inner.context = Some(context);
gst_log!(RUNTIME_CAT, "Task state machine started");
Ok(TransitionStatus::Async {
transition: Transition::Prepare,
origin,
@ -540,7 +523,7 @@ impl Task {
state_machine_handle,
);
let join_fut = block_on_or_add_sub_task(async {
state_machine_handle.await.unwrap();
state_machine_handle.await;
drop(transition_tx);
drop(context);
@ -689,6 +672,22 @@ impl Task {
Ok(TransitionStatus::Async { transition, origin })
})
}
fn spawn_state_machine(
&self,
state_machine: StateMachine,
context: &Context,
) -> RemoteHandle<()> {
use futures::executor::ThreadPool;
use futures::task::SpawnExt;
use once_cell::sync::OnceCell;
static EXECUTOR: OnceCell<ThreadPool> = OnceCell::new();
EXECUTOR
.get_or_init(|| ThreadPool::builder().pool_size(1).create().unwrap())
.spawn_with_handle(state_machine.run(Arc::clone(&self.0), context.clone()))
.unwrap()
}
}
struct StateMachine {
@ -783,15 +782,11 @@ impl StateMachine {
}
}
async fn run(
mut self,
task_inner: Arc<Mutex<TaskInner>>,
context: Context,
mut transition_req: TransitionRequest,
) {
async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>, context: Context) {
gst_trace!(RUNTIME_CAT, "Preparing task");
{
let (mut transition_req, _) = TransitionRequest::new(Transition::Prepare);
let res = exec_hook!(
self,
prepare,