ts/scheduler: improve tasks / io & timers polling balance

Set a limit to the nb of task checked before checking the reactor
and the main future again.
This commit is contained in:
François Laignel 2022-08-30 23:56:01 +02:00 committed by Sebastian Dröge
parent d39aabe054
commit ab327be9af

View file

@ -42,6 +42,7 @@ pub(super) struct Scheduler {
impl Scheduler {
pub const DUMMY_NAME: &'static str = "DUMMY";
const MAX_SUCCESSIVE_TASKS: usize = 64;
pub fn start(context_name: &str, max_throttling: Duration) -> Handle {
// Name the thread so that it appears in panic messages.
@ -178,52 +179,73 @@ impl Scheduler {
let _guard = CallOnDrop::new(|| Scheduler::close(Arc::clone(&self.context_name)));
if let Poll::Ready(t) = future.as_mut().poll(cx) {
return Ok(t);
}
let mut last;
loop {
last = Instant::now();
Reactor::with_mut(|reactor| reactor.react(last).ok());
let mut now;
// This is to ensure reactor invocation on the first iteration.
let mut last_react = Instant::now() - self.max_throttling;
let mut tasks_checked;
'main: loop {
// Only check I/O and timers every `max_throttling`.
now = Instant::now();
if now - last_react >= self.max_throttling {
last_react = now;
Reactor::with_mut(|reactor| reactor.react(now).ok());
}
if let Poll::Ready(t) = future.as_mut().poll(cx) {
return Ok(t);
}
while let Ok(runnable) = self.tasks.pop_runnable() {
panic::catch_unwind(|| runnable.run()).map_err(|err| {
gst::error!(
RUNTIME_CAT,
"A task has panicked within Context {}",
self.context_name
);
tasks_checked = 0;
while tasks_checked < Self::MAX_SUCCESSIVE_TASKS {
if let Ok(runnable) = self.tasks.pop_runnable() {
panic::catch_unwind(|| runnable.run()).map_err(|err| {
gst::error!(
RUNTIME_CAT,
"A task has panicked within Context {}",
self.context_name
);
err
})?;
}
err
})?;
let mut must_unpark = self.must_unpark.lock().unwrap();
loop {
if *must_unpark {
*must_unpark = false;
break;
}
if let Some(parking_duration) = self.max_throttling.checked_sub(last.elapsed()) {
let result = self
.must_unpark_cvar
.wait_timeout(must_unpark, parking_duration)
.unwrap();
#[cfg(feature = "tuning")]
self.parked_duration
.fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed);
must_unpark = result.0;
tasks_checked += 1;
} else {
*must_unpark = false;
break;
// No more ready tasks.
if tasks_checked > 0 {
// Check if the main future is ready before parking.
if let Poll::Ready(t) = future.as_mut().poll(cx) {
return Ok(t);
}
}
// else: main future has just been checked.
let mut must_unpark = self.must_unpark.lock().unwrap();
loop {
if *must_unpark {
*must_unpark = false;
continue 'main;
}
if let Some(parking_duration) =
self.max_throttling.checked_sub(last_react.elapsed())
{
#[cfg(feature = "tuning")]
self.parked_duration.fetch_add(
parking_duration.subsec_nanos() as u64,
Ordering::Relaxed,
);
let result = self
.must_unpark_cvar
.wait_timeout(must_unpark, parking_duration)
.unwrap();
must_unpark = result.0;
} else {
*must_unpark = false;
continue 'main;
}
}
}
}
}