ts/scheduler: fix shutdown

A strong handle reference was held in the `block_on_priv` `Result`
handler in the thread for the `Scheduler::start` code path, which
lead to the `Handler` strong count not dropping to 0 when it
should, leading to the shutdown request not being triggered.

Use an Arc<AtomicBool> instead of a oneshot channel for shutdown.
The main Future is always polled and never relies on a waker, a
`poll_fn` is cheap and does the job.

Unpark the scheduler after posting a request to shutdown.
This commit is contained in:
François Laignel 2022-09-11 19:57:36 +02:00 committed by Sebastian Dröge
parent ab327be9af
commit 1be30b8ecc

View file

@ -3,7 +3,7 @@
//
// Take a look at the license at the top of the repository in the LICENSE file.
use futures::channel::oneshot;
use futures::future::poll_fn;
use futures::pin_mut;
use gio::glib::clone::Downgrade;
@ -12,7 +12,8 @@ use std::cell::RefCell;
use std::future::Future;
use std::panic;
#[cfg(feature = "tuning")]
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc as sync_mpsc;
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::task::Poll;
@ -49,7 +50,6 @@ impl Scheduler {
let thread = thread::Builder::new().name(context_name.to_string());
let (handle_sender, handle_receiver) = sync_mpsc::channel();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let context_name = Arc::from(context_name);
let thread_ctx_name = Arc::clone(&context_name);
let join = thread
@ -62,9 +62,20 @@ impl Scheduler {
let handle = Scheduler::init(Arc::clone(&thread_ctx_name), max_throttling);
let this = Arc::clone(&handle.0.scheduler);
handle_sender.send(handle.clone()).unwrap();
let must_shutdown = handle.0.must_shutdown.clone();
let handle_weak = handle.downgrade();
handle_sender.send(handle).unwrap();
match this.block_on_priv(shutdown_receiver) {
let shutdown_fut = poll_fn(move |_| {
if must_shutdown.load(Ordering::SeqCst) {
Poll::Ready(())
} else {
Poll::Pending
}
});
// Blocking on `shutdown_fut` which is cheap to `poll`.
match this.block_on_priv(shutdown_fut) {
Ok(_) => {
gst::debug!(
RUNTIME_CAT,
@ -79,9 +90,8 @@ impl Scheduler {
thread_ctx_name
);
// We are shutting down on our own initiative
if let Ok(mut shutdown) = handle.0.shutdown.lock() {
shutdown.clear();
if let Some(handle) = handle_weak.upgrade() {
handle.self_shutdown();
}
panic::resume_unwind(e);
@ -91,7 +101,7 @@ impl Scheduler {
.expect("Failed to spawn Scheduler thread");
let handle = handle_receiver.recv().expect("Context thread init failed");
handle.set_shutdown(shutdown_sender, join);
handle.set_join_handle(join);
handle
}
@ -132,9 +142,11 @@ impl Scheduler {
!Scheduler::is_scheduler_thread(),
"Attempt to block within an existing Scheduler thread."
);
let handle = Scheduler::init(Scheduler::DUMMY_NAME.into(), Duration::ZERO);
let this = Arc::clone(&handle.0.scheduler);
// Move the (only) handle for this scheduler in the main task.
let (task_id, task) = this.tasks.add(async move {
let res = future.await;
@ -154,6 +166,7 @@ impl Scheduler {
);
});
// Blocking on `task` which is cheap to `poll`.
match this.block_on_priv(task) {
Ok(res) => res,
Err(e) => {
@ -168,14 +181,21 @@ impl Scheduler {
}
}
fn block_on_priv<F>(&self, future: F) -> std::thread::Result<F::Output>
// Important: the `termination_future` MUST be cheap to poll.
//
// Examples of appropriate `termination_future` are:
//
// - an `executor::Task` returned by `self.tasks.add(..)`.
// - a `JoinHandle` returned by `Handle::spawn`.
// - a custom future with few cycles (ex. checking an `AtomicBool`).
fn block_on_priv<F>(&self, termination_future: F) -> std::thread::Result<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let waker = waker_fn(|| ());
let cx = &mut std::task::Context::from_waker(&waker);
pin_mut!(future);
pin_mut!(termination_future);
let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name)));
@ -191,7 +211,7 @@ impl Scheduler {
Reactor::with_mut(|reactor| reactor.react(now).ok());
}
if let Poll::Ready(t) = future.as_mut().poll(cx) {
if let Poll::Ready(t) = termination_future.as_mut().poll(cx) {
return Ok(t);
}
@ -210,15 +230,6 @@ impl Scheduler {
tasks_checked += 1;
} else {
// No more ready tasks.
if tasks_checked > 0 {
// Check if the main future is ready before parking.
if let Poll::Ready(t) = future.as_mut().poll(cx) {
return Ok(t);
}
}
// else: main future has just been checked.
let mut must_unpark = self.must_unpark.lock().unwrap();
loop {
if *must_unpark {
@ -297,51 +308,38 @@ impl Scheduler {
}
}
impl Drop for Scheduler {
fn drop(&mut self) {
gst::debug!(
RUNTIME_CAT,
"Terminated: Scheduler for Context {}",
self.context_name
);
}
}
#[derive(Debug)]
struct SchedulerShutdown {
struct HandleInner {
scheduler: Arc<Scheduler>,
sender: Option<oneshot::Sender<()>>,
join: Option<thread::JoinHandle<()>>,
must_shutdown: Arc<AtomicBool>,
join: Mutex<Option<thread::JoinHandle<()>>>,
}
impl SchedulerShutdown {
impl HandleInner {
fn new(scheduler: Arc<Scheduler>) -> Self {
SchedulerShutdown {
HandleInner {
scheduler,
sender: None,
join: None,
must_shutdown: Default::default(),
join: Default::default(),
}
}
fn clear(&mut self) {
self.sender = None;
self.join = None;
}
}
impl Drop for SchedulerShutdown {
impl Drop for HandleInner {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
gst::debug!(
if !self.must_shutdown.fetch_or(true, Ordering::SeqCst) {
// Was not already shutting down.
self.scheduler.unpark();
gst::trace!(
RUNTIME_CAT,
"Shutting down Scheduler thread for Context {}",
self.scheduler.context_name
);
drop(sender);
// Don't block shutting down itself
if !self.scheduler.is_current() {
if let Some(join_handler) = self.join.take() {
if let Some(join_handler) = self.join.lock().unwrap().take() {
gst::trace!(
RUNTIME_CAT,
"Waiting for Scheduler thread to shutdown for Context {}",
@ -355,12 +353,6 @@ impl Drop for SchedulerShutdown {
}
}
#[derive(Debug)]
struct HandleInner {
scheduler: Arc<Scheduler>,
shutdown: Mutex<SchedulerShutdown>,
}
#[derive(Clone, Debug)]
pub(super) struct HandleWeak(Weak<HandleInner>);
@ -375,16 +367,16 @@ pub(super) struct Handle(Arc<HandleInner>);
impl Handle {
fn new(scheduler: Arc<Scheduler>) -> Self {
Handle(Arc::new(HandleInner {
shutdown: Mutex::new(SchedulerShutdown::new(Arc::clone(&scheduler))),
scheduler,
}))
Handle(Arc::new(HandleInner::new(scheduler)))
}
fn set_shutdown(&self, sender: oneshot::Sender<()>, join: thread::JoinHandle<()>) {
let mut shutdown = self.0.shutdown.lock().unwrap();
shutdown.sender = Some(sender);
shutdown.join = Some(join);
fn set_join_handle(&self, join: thread::JoinHandle<()>) {
*self.0.join.lock().unwrap() = Some(join);
}
fn self_shutdown(self) {
self.0.must_shutdown.store(true, Ordering::SeqCst);
*self.0.join.lock().unwrap() = None;
}
pub fn context_name(&self) -> &str {