diff --git a/generic/threadshare/src/runtime/executor.rs b/generic/threadshare/src/runtime/executor.rs index 445c56eb..a4f614fc 100644 --- a/generic/threadshare/src/runtime/executor.rs +++ b/generic/threadshare/src/runtime/executor.rs @@ -209,6 +209,7 @@ impl ContextThread { let context = Context(Arc::new(ContextInner { real: Some(ContextRealInner { name: self.name.clone(), + wait_duration: wait, handle: Mutex::new(runtime.handle().clone()), shutdown, }), @@ -352,6 +353,7 @@ impl fmt::Debug for JoinHandle { struct ContextRealInner { name: String, handle: Mutex, + wait_duration: Duration, // Only used for dropping shutdown: ContextShutdown, } @@ -444,6 +446,13 @@ impl Context { } } + pub fn wait_duration(&self) -> Duration { + match self.0.real { + Some(ref real) => real.wait_duration, + None => Duration::ZERO, + } + } + /// Returns `true` if a `Context` is running on current thread. pub fn is_context_thread() -> bool { CURRENT_THREAD_CONTEXT.with(|cur_ctx| cur_ctx.borrow().is_some()) diff --git a/generic/threadshare/src/runtime/time.rs b/generic/threadshare/src/runtime/time.rs index 5dac6c49..b085a2a0 100644 --- a/generic/threadshare/src/runtime/time.rs +++ b/generic/threadshare/src/runtime/time.rs @@ -22,18 +22,88 @@ use futures::stream::StreamExt; use std::time::Duration; +use super::Context; + /// Wait until the given `delay` has elapsed. /// /// This must be called from within the target runtime environment. +/// +/// When throttling is activated (i.e. when using a non-`0` `wait` +/// duration in `Context::acquire`), timer entries are assigned to +/// the nearest time frame, meaning that the delay might elapse +/// `wait` / 2 ms earlier or later than the expected instant. +/// +/// Use [`delay_for_at_least`] when it's preferable not to return +/// before the expected instant. pub async fn delay_for(delay: Duration) { - if delay > Duration::from_nanos(0) { + if delay > Duration::ZERO { tokio::time::delay_for(delay).map(drop).await; } } +/// Wait until at least the given `delay` has elapsed. +/// +/// This must be called from within the target runtime environment. +/// +/// See [`delay_for`] for details. This method won't return before +/// the expected delay has elapsed. +pub async fn delay_for_at_least(delay: Duration) { + if delay > Duration::ZERO { + tokio::time::delay_for( + delay + Context::current().map_or(Duration::ZERO, |ctx| ctx.wait_duration() / 2), + ) + .map(drop) + .await; + } +} + /// Builds a `Stream` that yields at `interval. /// /// This must be called from within the target runtime environment. pub fn interval(interval: Duration) -> impl Stream { tokio::time::interval(interval).map(drop) } + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use crate::runtime::Context; + + const MAX_THROTTLING: Duration = Duration::from_millis(10); + const DELAY: Duration = Duration::from_millis(12); + + #[tokio::test] + async fn delay_for() { + gst::init().unwrap(); + + let context = Context::acquire("delay_for", MAX_THROTTLING).unwrap(); + + let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let now = Instant::now(); + crate::runtime::time::delay_for(DELAY).await; + now.elapsed() + })) + .unwrap(); + + // Due to throttling, timer may be fired earlier + assert!(elapsed + MAX_THROTTLING / 2 >= DELAY); + } + + #[tokio::test] + async fn delay_for_at_least() { + gst::init().unwrap(); + + let context = Context::acquire("delay_for_at_least", MAX_THROTTLING).unwrap(); + + let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let now = Instant::now(); + crate::runtime::time::delay_for_at_least(DELAY).await; + now.elapsed() + })) + .unwrap(); + + // Never returns earlier that DELAY + assert!(elapsed >= DELAY); + } +}