ts: migrate most tests so that they don't use tokio

This commit is contained in:
François Laignel 2021-11-25 19:26:26 +01:00 committed by Sebastian Dröge
parent bba26a9cf5
commit cd0773662f
3 changed files with 173 additions and 136 deletions

View file

@ -706,14 +706,85 @@ mod tests {
const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS);
const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS * 10);
#[tokio::test]
async fn drain_sub_tasks() {
#[test]
fn block_on_task_id() {
gst::init().unwrap();
crate::runtime::executor::block_on(async {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(0));
/* Adding the sub task fails
let res = Context::add_sub_task(async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(0));
Ok(())
});
assert!(res.is_ok());
*/
});
}
#[test]
fn context_task_id() {
gst::init().unwrap();
let context = Context::acquire("context_task_id", SLEEP_DURATION).unwrap();
let join_handle = context.spawn(async {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(0));
});
futures::executor::block_on(join_handle).unwrap();
let ctx_weak = context.downgrade();
let join_handle = context.spawn(async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(1));
let res = Context::add_sub_task(async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(1));
Ok(())
});
assert!(res.is_ok());
ctx_weak
.upgrade()
.unwrap()
.spawn(async {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(2));
let res = Context::add_sub_task(async move {
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(2));
Ok(())
});
assert!(res.is_ok());
assert!(Context::drain_sub_tasks().await.is_ok());
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(2));
})
.await
.unwrap();
assert!(Context::drain_sub_tasks().await.is_ok());
let (_ctx, task_id) = Context::current_task().unwrap();
assert_eq!(task_id, super::TaskId(1));
});
futures::executor::block_on(join_handle).unwrap();
}
#[test]
fn drain_sub_tasks() {
// Setup
gst::init().unwrap();
let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION).unwrap();
let join_handle = context.spawn(async move {
let join_handle = context.spawn(async {
let (sender, mut receiver) = mpsc::channel(1);
let sender: Arc<Mutex<mpsc::Sender<Item>>> = Arc::new(Mutex::new(sender));
@ -754,38 +825,12 @@ mod tests {
receiver
});
let mut receiver = join_handle.await.unwrap();
let mut receiver = futures::executor::block_on(join_handle).unwrap();
// The last sub task should be simply dropped at this point
assert_eq!(receiver.try_next().unwrap(), None);
}
#[tokio::test]
async fn block_on_within_tokio() {
gst::init().unwrap();
let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION).unwrap();
let bytes_sent = crate::runtime::executor::block_on(context.spawn(async {
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
let socket = UdpSocket::bind(saddr).unwrap();
let mut socket = tokio::net::UdpSocket::from_std(socket).unwrap();
let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000);
socket.send_to(&[0; 10], saddr).await.unwrap()
}))
.unwrap();
assert_eq!(bytes_sent, 10);
let elapsed = crate::runtime::executor::block_on(context.spawn(async {
let now = Instant::now();
crate::runtime::time::delay_for(DELAY).await;
now.elapsed()
}))
.unwrap();
// Due to throttling, `Delay` may be fired earlier
assert!(elapsed + SLEEP_DURATION / 2 >= DELAY);
}
#[test]
fn block_on_from_sync() {
gst::init().unwrap();

View file

@ -1150,14 +1150,15 @@ impl StateMachine {
#[cfg(test)]
mod tests {
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use std::time::Duration;
use crate::runtime::Context;
use super::*;
#[tokio::test]
async fn iterate() {
#[test]
fn iterate() {
gst::init().unwrap();
struct TaskTest {
@ -1302,15 +1303,15 @@ mod tests {
assert_eq!(task.state(), TaskState::Started);
// At this point, prepared must be completed
prepared_receiver.next().await.unwrap();
block_on(prepared_receiver.next()).unwrap();
// ... and start executed
started_receiver.next().await.unwrap();
block_on(started_receiver.next()).unwrap();
assert_eq!(task.state(), TaskState::Started);
// unlock task loop and keep looping
iterate_receiver.next().await.unwrap();
complete_iterate_sender.send(Ok(())).await.unwrap();
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Ok(()))).unwrap();
gst_debug!(RUNTIME_CAT, "task_iterate: starting (redundant)");
// start will return immediately
@ -1334,16 +1335,16 @@ mod tests {
// Pause transition is asynchronous
while TaskState::Paused != task.state() {
tokio::time::delay_for(Duration::from_millis(2)).await;
std::thread::sleep(Duration::from_millis(2));
if let Ok(Some(())) = iterate_receiver.try_next() {
// unlock iteration
complete_iterate_sender.send(Ok(())).await.unwrap();
block_on(complete_iterate_sender.send(Ok(()))).unwrap();
}
}
gst_debug!(RUNTIME_CAT, "task_iterate: awaiting pause ack");
paused_receiver.next().await.unwrap();
block_on(paused_receiver.next()).unwrap();
gst_debug!(RUNTIME_CAT, "task_iterate: starting (after pause)");
assert_eq!(
@ -1356,7 +1357,7 @@ mod tests {
assert_eq!(task.state(), TaskState::Started);
// Paused -> Started
let _ = started_receiver.next().await;
let _ = block_on(started_receiver.next());
gst_debug!(RUNTIME_CAT, "task_iterate: stopping");
assert_eq!(
@ -1368,7 +1369,7 @@ mod tests {
);
assert_eq!(task.state(), TaskState::Stopped);
let _ = stopped_receiver.next().await;
let _ = block_on(stopped_receiver.next());
// purge remaining iteration received before stop if any
let _ = iterate_receiver.try_next();
@ -1381,21 +1382,18 @@ mod tests {
target: TaskState::Started
},
);
let _ = started_receiver.next().await;
let _ = block_on(started_receiver.next());
gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Eos");
iterate_receiver.next().await.unwrap();
complete_iterate_sender
.send(Err(gst::FlowError::Eos))
.await
.unwrap();
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Err(gst::FlowError::Eos))).unwrap();
gst_debug!(RUNTIME_CAT, "task_iterate: awaiting stop ack");
stopped_receiver.next().await.unwrap();
block_on(stopped_receiver.next()).unwrap();
// Wait for state machine to reach Stopped
while TaskState::Stopped != task.state() {
tokio::time::delay_for(Duration::from_millis(2)).await;
std::thread::sleep(Duration::from_millis(2));
}
gst_debug!(RUNTIME_CAT, "task_iterate: starting (after stop)");
@ -1406,21 +1404,18 @@ mod tests {
target: TaskState::Started
},
);
let _ = started_receiver.next().await;
let _ = block_on(started_receiver.next());
gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing");
iterate_receiver.next().await.unwrap();
complete_iterate_sender
.send(Err(gst::FlowError::Flushing))
.await
.unwrap();
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Err(gst::FlowError::Flushing))).unwrap();
gst_debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start ack");
flush_start_receiver.next().await.unwrap();
block_on(flush_start_receiver.next()).unwrap();
// Wait for state machine to reach Flushing
while TaskState::Flushing != task.state() {
tokio::time::delay_for(Duration::from_millis(2)).await;
std::thread::sleep(Duration::from_millis(2));
}
gst_debug!(RUNTIME_CAT, "task_iterate: stop flushing");
@ -1431,18 +1426,15 @@ mod tests {
target: TaskState::Started
},
);
let _ = started_receiver.next().await;
let _ = block_on(started_receiver.next());
gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error");
iterate_receiver.next().await.unwrap();
complete_iterate_sender
.send(Err(gst::FlowError::Error))
.await
.unwrap();
block_on(iterate_receiver.next()).unwrap();
block_on(complete_iterate_sender.send(Err(gst::FlowError::Error))).unwrap();
// Wait for state machine to reach Error
while TaskState::Error != task.state() {
tokio::time::delay_for(Duration::from_millis(2)).await;
std::thread::sleep(Duration::from_millis(2));
}
gst_debug!(
@ -1468,11 +1460,11 @@ mod tests {
);
assert_eq!(task.state(), TaskState::Unprepared);
let _ = unprepared_receiver.next().await;
let _ = block_on(unprepared_receiver.next());
}
#[tokio::test]
async fn prepare_error() {
#[test]
fn prepare_error() {
gst::init().unwrap();
struct TaskPrepareTest {
@ -1538,11 +1530,11 @@ mod tests {
RUNTIME_CAT,
"prepare_error: await action error notification"
);
prepare_error_receiver.next().await.unwrap();
block_on(prepare_error_receiver.next()).unwrap();
// Wait for state machine to reach Error
while TaskState::Error != task.state() {
tokio::time::delay_for(Duration::from_millis(2)).await;
std::thread::sleep(Duration::from_millis(2));
}
let res = task.start().unwrap_err();
@ -1558,8 +1550,8 @@ mod tests {
task.unprepare().unwrap();
}
#[tokio::test]
async fn prepare_start_ok() {
#[test]
fn prepare_start_ok() {
// Hold the preparation function so that it completes after the start request is engaged
gst::init().unwrap();
@ -1651,16 +1643,16 @@ mod tests {
});
gst_debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
ready_receiver.await.unwrap();
block_on(ready_receiver).unwrap();
gst_debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
prepare_sender.send(()).await.unwrap();
block_on(prepare_sender.send(())).unwrap();
start_handle.await.unwrap();
block_on(start_handle).unwrap();
}
#[tokio::test]
async fn prepare_start_error() {
#[test]
fn prepare_start_error() {
// Hold the preparation function so that it completes after the start request is engaged
gst::init().unwrap();
@ -1756,25 +1748,25 @@ mod tests {
});
gst_debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
ready_receiver.await.unwrap();
block_on(ready_receiver).unwrap();
gst_debug!(
RUNTIME_CAT,
"prepare_start_error: triggering preparation (failure)"
);
prepare_sender.send(()).await.unwrap();
block_on(prepare_sender.send(())).unwrap();
gst_debug!(
RUNTIME_CAT,
"prepare_start_error: await prepare error notification"
);
prepare_error_receiver.next().await.unwrap();
block_on(prepare_error_receiver.next()).unwrap();
start_handle.await.unwrap();
block_on(start_handle).unwrap();
}
#[tokio::test]
async fn pause_start() {
#[test]
fn pause_start() {
gst::init().unwrap();
struct TaskPauseStartTest {
@ -1836,7 +1828,7 @@ mod tests {
assert_eq!(task.state(), TaskState::Started);
gst_debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration");
iterate_receiver.next().await.unwrap();
block_on(iterate_receiver.next()).unwrap();
gst_debug!(RUNTIME_CAT, "pause_start: pausing (1)");
assert_eq!(
@ -1852,11 +1844,11 @@ mod tests {
// Pause transition is asynchronous
while TaskState::Paused != task.state() {
tokio::time::delay_for(Duration::from_millis(5)).await;
std::thread::sleep(Duration::from_millis(5));
}
gst_debug!(RUNTIME_CAT, "pause_start: awaiting paused");
let _ = paused_receiver.next().await;
let _ = block_on(paused_receiver.next());
// Loop held on due to Pause
iterate_receiver.try_next().unwrap_err();
@ -1871,7 +1863,7 @@ mod tests {
assert_eq!(task.state(), TaskState::Started);
gst_debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration");
iterate_receiver.next().await.unwrap();
block_on(iterate_receiver.next()).unwrap();
gst_debug!(RUNTIME_CAT, "pause_start: sending 2d iteration completion");
complete_sender.try_send(()).unwrap();
@ -1880,8 +1872,8 @@ mod tests {
task.unprepare().unwrap();
}
#[tokio::test]
async fn successive_pause_start() {
#[test]
fn successive_pause_start() {
// Purpose: check pause cancellation.
gst::init().unwrap();
@ -1913,7 +1905,7 @@ mod tests {
task.start().unwrap();
gst_debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 1");
iterate_receiver.next().await.unwrap();
block_on(iterate_receiver.next()).unwrap();
gst_debug!(RUNTIME_CAT, "successive_pause_start: pause and start");
task.pause().unwrap();
@ -1922,15 +1914,15 @@ mod tests {
assert_eq!(task.state(), TaskState::Started);
gst_debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 2");
iterate_receiver.next().await.unwrap();
block_on(iterate_receiver.next()).unwrap();
gst_debug!(RUNTIME_CAT, "successive_pause_start: stopping");
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn flush_regular_sync() {
#[test]
fn flush_regular_sync() {
gst::init().unwrap();
struct TaskFlushTest {
@ -1990,7 +1982,7 @@ mod tests {
);
assert_eq!(task.state(), TaskState::Flushing);
flush_start_receiver.next().await.unwrap();
block_on(flush_start_receiver.next()).unwrap();
gst_debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
assert_eq!(
@ -2002,15 +1994,15 @@ mod tests {
);
assert_eq!(task.state(), TaskState::Started);
flush_stop_receiver.next().await.unwrap();
block_on(flush_stop_receiver.next()).unwrap();
task.pause().unwrap();
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn flush_regular_different_context() {
#[test]
fn flush_regular_different_context() {
// Purpose: make sure a flush sequence triggered from a Context doesn't block.
gst::init().unwrap();
@ -2098,15 +2090,15 @@ mod tests {
assert_eq!(task_clone.state(), TaskState::Started);
});
flush_handle.await.unwrap();
flush_stop_receiver.next().await.unwrap();
block_on(flush_handle).unwrap();
block_on(flush_stop_receiver.next()).unwrap();
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn flush_regular_same_context() {
#[test]
fn flush_regular_same_context() {
// Purpose: make sure a flush sequence triggered from the same Context doesn't block.
gst::init().unwrap();
@ -2181,15 +2173,15 @@ mod tests {
assert_eq!(task_clone.state(), TaskState::Started);
});
flush_handle.await.unwrap();
flush_stop_receiver.next().await.unwrap();
block_on(flush_handle).unwrap();
block_on(flush_stop_receiver.next()).unwrap();
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn flush_from_loop() {
#[test]
fn flush_from_loop() {
// Purpose: make sure a flush_start triggered from an iteration doesn't block.
gst::init().unwrap();
@ -2244,7 +2236,7 @@ mod tests {
RUNTIME_CAT,
"flush_from_loop: awaiting flush_start notification"
);
flush_start_receiver.next().await.unwrap();
block_on(flush_start_receiver.next()).unwrap();
assert_eq!(
task.stop().unwrap(),
@ -2256,8 +2248,8 @@ mod tests {
task.unprepare().unwrap();
}
#[tokio::test]
async fn pause_from_loop() {
#[test]
fn pause_from_loop() {
// Purpose: make sure a start triggered from an iteration doesn't block.
// E.g. an auto pause cancellation after a delay.
gst::init().unwrap();
@ -2314,14 +2306,14 @@ mod tests {
task.start().unwrap();
gst_debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
pause_receiver.next().await.unwrap();
block_on(pause_receiver.next()).unwrap();
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn trigger_from_action() {
#[test]
fn trigger_from_action() {
// Purpose: make sure an event triggered from a transition action doesn't block.
gst::init().unwrap();
@ -2384,14 +2376,14 @@ mod tests {
RUNTIME_CAT,
"trigger_from_action: awaiting flush_stop notification"
);
flush_stop_receiver.next().await.unwrap();
block_on(flush_stop_receiver.next()).unwrap();
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn pause_flush_start() {
#[test]
fn pause_flush_start() {
gst::init().unwrap();
struct TaskFlushTest {
@ -2470,7 +2462,7 @@ mod tests {
},
);
assert_eq!(task.state(), TaskState::PausedFlushing);
flush_start_receiver.next().await;
block_on(flush_start_receiver.next());
gst_debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
assert_eq!(
@ -2481,7 +2473,7 @@ mod tests {
},
);
assert_eq!(task.state(), TaskState::Paused);
flush_stop_receiver.next().await;
block_on(flush_stop_receiver.next());
// start action not executed
started_receiver.try_next().unwrap_err();
@ -2495,14 +2487,14 @@ mod tests {
},
);
assert_eq!(task.state(), TaskState::Started);
started_receiver.next().await;
block_on(started_receiver.next());
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn pause_flushing_start() {
#[test]
fn pause_flushing_start() {
gst::init().unwrap();
struct TaskFlushTest {
@ -2569,7 +2561,7 @@ mod tests {
gst_debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
task.flush_start().unwrap();
assert_eq!(task.state(), TaskState::PausedFlushing);
flush_start_receiver.next().await;
block_on(flush_start_receiver.next());
gst_debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
assert_eq!(
@ -2593,15 +2585,15 @@ mod tests {
},
);
assert_eq!(task.state(), TaskState::Started);
flush_stop_receiver.next().await;
started_receiver.next().await;
block_on(flush_stop_receiver.next());
block_on(started_receiver.next());
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn flush_concurrent_start() {
#[test]
fn flush_concurrent_start() {
// Purpose: check the racy case of start being triggered in // after flush_start
// e.g.: a FlushStart event received on a Pad and the element starting after a Pause
gst::init().unwrap();
@ -2682,7 +2674,7 @@ mod tests {
RUNTIME_CAT,
"flush_concurrent_start: awaiting for oob_context"
);
ready_receiver.await.unwrap();
block_on(ready_receiver).unwrap();
gst_debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
let res = task.start().unwrap();
@ -2698,7 +2690,7 @@ mod tests {
other => unreachable!("{:?}", other),
}
flush_start_handle.await.unwrap();
block_on(flush_start_handle).unwrap();
gst_debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
assert_eq!(
@ -2710,14 +2702,14 @@ mod tests {
);
assert_eq!(task.state(), TaskState::Started);
flush_stop_receiver.next().await;
block_on(flush_stop_receiver.next());
task.stop().unwrap();
task.unprepare().unwrap();
}
#[tokio::test]
async fn start_timer() {
#[test]
fn start_timer() {
// Purpose: make sure a Timer initialized in a transition is
// available when iterating in the loop.
gst::init().unwrap();
@ -2770,7 +2762,7 @@ mod tests {
gst_debug!(RUNTIME_CAT, "start_timer: start");
task.start().unwrap();
timer_elapsed_receiver.await.unwrap();
block_on(timer_elapsed_receiver).unwrap();
gst_debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
task.stop().unwrap();

View file

@ -68,18 +68,18 @@ pub fn interval(interval: Duration) -> impl Stream<Item = ()> {
mod tests {
use std::time::{Duration, Instant};
use crate::runtime::Context;
use crate::runtime::{executor, Context};
const MAX_THROTTLING: Duration = Duration::from_millis(10);
const DELAY: Duration = Duration::from_millis(12);
#[tokio::test]
async fn delay_for() {
#[test]
fn delay_for() {
gst::init().unwrap();
let context = Context::acquire("delay_for", MAX_THROTTLING).unwrap();
let elapsed = crate::runtime::executor::block_on(context.spawn(async {
let elapsed = executor::block_on(context.spawn(async {
let now = Instant::now();
crate::runtime::time::delay_for(DELAY).await;
now.elapsed()
@ -90,13 +90,13 @@ mod tests {
assert!(elapsed + MAX_THROTTLING / 2 >= DELAY);
}
#[tokio::test]
async fn delay_for_at_least() {
#[test]
fn delay_for_at_least() {
gst::init().unwrap();
let context = Context::acquire("delay_for_at_least", MAX_THROTTLING).unwrap();
let elapsed = crate::runtime::executor::block_on(context.spawn(async {
let elapsed = executor::block_on(context.spawn(async {
let now = Instant::now();
crate::runtime::time::delay_for_at_least(DELAY).await;
now.elapsed()