ts/standalone: multiple improvements

- Reworked buffer push.
- Reworked stats.
- Make first elements logs stand out. This make it possible to
  follow what's going on with pipelines containing 1000s of
  elements.
- Actually handle EOS.
- Use more significant defaults.
- Allow building without `clap` feature.
This commit is contained in:
François Laignel 2022-08-19 19:40:05 +02:00 committed by Sebastian Dröge
parent 348a1d0207
commit 72acbebff0
5 changed files with 698 additions and 348 deletions

View file

@ -54,9 +54,8 @@ name = "tcpclientsrc-benchmark-sender"
path = "examples/tcpclientsrc_benchmark_sender.rs"
[[example]]
name = "standalone"
name = "ts-standalone"
path = "examples/standalone/main.rs"
required-features = ["clap"]
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }

View file

@ -31,14 +31,18 @@ gst::plugin_define!(
env!("BUILD_REL_DATE")
);
#[cfg(feature = "clap")]
use clap::Parser;
#[cfg(feature = "clap")]
#[derive(Parser, Debug)]
#[clap(version)]
#[clap(about = "Standalone pipeline threadshare runtime test")]
#[clap(
about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats"
)]
struct Args {
/// Parallel streams to process.
#[clap(short, long, default_value_t = 100)]
#[clap(short, long, default_value_t = 5000)]
streams: u32,
/// Threadshare groups.
@ -49,13 +53,67 @@ struct Args {
#[clap(short, long, default_value_t = 20)]
wait: u32,
/// Buffer push period in ms.
#[clap(short, long, default_value_t = 20)]
push_period: u32,
/// Number of buffers per stream to output before sending EOS (-1 = unlimited).
#[clap(short, long, default_value_t = 6000)]
#[clap(short, long, default_value_t = 5000)]
num_buffers: i32,
/// Enables statistics logging (use GST_DEBUG=ts-standalone*:4).
/// Disables statistics logging.
#[clap(short, long)]
log_stats: bool,
disable_stats_log: bool,
}
#[cfg(not(feature = "clap"))]
#[derive(Debug)]
struct Args {
streams: u32,
groups: u32,
wait: u32,
push_period: u32,
num_buffers: i32,
disable_stats_log: bool,
}
#[cfg(not(feature = "clap"))]
impl Default for Args {
fn default() -> Self {
Args {
streams: 5000,
groups: 2,
wait: 20,
push_period: 20,
num_buffers: 5000,
disable_stats_log: false,
}
}
}
fn args() -> Args {
#[cfg(feature = "clap")]
let args = {
let args = Args::parse();
gst::info!(CAT, "{:?}", args);
args
};
#[cfg(not(feature = "clap"))]
let args = {
if std::env::args().len() > 1 {
gst::warning!(CAT, "Ignoring command line arguments");
gst::warning!(CAT, "Build with `--features=clap`");
}
let args = Args::default();
gst::warning!(CAT, "{:?}", args);
args
};
args
}
fn main() {
@ -68,7 +126,7 @@ fn main() {
#[cfg(debug_assertions)]
gst::warning!(CAT, "RUNNING DEBUG BUILD");
let args = Args::parse();
let args = args();
let pipeline = gst::Pipeline::new(None);
@ -82,6 +140,7 @@ fn main() {
.unwrap();
src.set_property("context", &ctx_name);
src.set_property("context-wait", args.wait);
src.set_property("push-period", args.push_period);
src.set_property("num-buffers", args.num_buffers);
let sink = gst::ElementFactory::make(
@ -91,8 +150,27 @@ fn main() {
.unwrap();
sink.set_property("context", &ctx_name);
sink.set_property("context-wait", args.wait);
if i == 0 && args.log_stats {
sink.set_property("must-log-stats", true);
if i == 0 {
src.set_property("raise-log-level", true);
sink.set_property("raise-log-level", true);
if !args.disable_stats_log {
// Don't use the last 5 secs in stats
// otherwise we get outliers when reaching EOS.
// Note that stats don't start before the 20 first seconds
// and we get 50 buffers per sec.
const BUFFERS_BEFORE_LOGS: i32 = 20 * 50;
const BUFFERS_TO_SKIP: i32 = BUFFERS_BEFORE_LOGS + 5 * 50;
if args.num_buffers > BUFFERS_TO_SKIP {
sink.set_property("push-period", args.push_period);
sink.set_property("logs-stats", true);
let max_buffers = args.num_buffers - BUFFERS_TO_SKIP;
sink.set_property("max-buffers", max_buffers);
} else {
gst::warning!(CAT, "Not enough buffers to log, disabling stats");
}
}
}
let elements = &[&src, &sink];
@ -109,11 +187,8 @@ fn main() {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
gst::info!(CAT, "Shuting down");
let stop = Instant::now();
pipeline_clone.set_state(gst::State::Null).unwrap();
gst::info!(CAT, "Shuting down took {:.2?}", stop.elapsed());
MessageView::Eos(_) => {
gst::info!(CAT, "Received eos");
l_clone.quit();
}
MessageView::Error(err) => {
@ -133,10 +208,25 @@ fn main() {
})
.expect("Failed to add bus watch");
gst::info!(CAT, "Starting");
gst::info!(CAT, "Switching to Ready");
let start = Instant::now();
pipeline.set_state(gst::State::Ready).unwrap();
gst::info!(CAT, "Switching to Ready took {:.2?}", start.elapsed());
gst::info!(CAT, "Switching to Playing");
let start = Instant::now();
pipeline.set_state(gst::State::Playing).unwrap();
gst::info!(CAT, "Starting took {:.2?}", start.elapsed());
gst::info!(CAT, "Switching to Playing took {:.2?}", start.elapsed());
l.run();
gst::info!(CAT, "Switching to Ready");
let stop = Instant::now();
pipeline_clone.set_state(gst::State::Ready).unwrap();
gst::info!(CAT, "Switching to Ready took {:.2?}", stop.elapsed());
gst::info!(CAT, "Shutting down");
let stop = Instant::now();
pipeline_clone.set_state(gst::State::Null).unwrap();
gst::info!(CAT, "Shutting down took {:.2?}", stop.elapsed());
}

View file

@ -10,18 +10,17 @@ use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::Peekable;
use gst::error_msg;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::EventView;
use gst::{element_error, error_msg};
use once_cell::sync::Lazy;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{self, Context, PadSink, PadSinkRef, Task};
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, Task};
use std::pin::Pin;
use std::sync::Mutex;
use std::task::Poll;
use std::time::{Duration, Instant};
@ -36,32 +35,36 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
const DEFAULT_SYNC: bool = true;
const DEFAULT_MUST_LOG_STATS: bool = false;
const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20);
const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25);
const LOG_PERIOD: Duration = Duration::from_secs(20);
#[derive(Debug, Clone)]
struct Settings {
sync: bool,
context: String,
context_wait: Duration,
must_log_stats: bool,
raise_log_level: bool,
logs_stats: bool,
push_period: Duration,
max_buffers: Option<u32>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
sync: DEFAULT_SYNC,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
must_log_stats: DEFAULT_MUST_LOG_STATS,
raise_log_level: false,
logs_stats: false,
push_period: DEFAULT_PUSH_PERIOD,
max_buffers: Some(DEFAULT_MAX_BUFFERS as u32),
}
}
}
#[derive(Debug)]
enum TaskItem {
enum StreamItem {
Buffer(gst::Buffer),
Event(gst::Event),
}
@ -83,7 +86,7 @@ impl PadSinkHandler for TestSinkPadHandler {
let element = element.clone().downcast::<super::TestSink>().unwrap();
async move {
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing");
return Err(gst::FlowError::Flushing);
}
@ -105,7 +108,7 @@ impl PadSinkHandler for TestSinkPadHandler {
async move {
for buffer in list.iter_owned() {
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing");
return Err(gst::FlowError::Flushing);
}
@ -130,7 +133,7 @@ impl PadSinkHandler for TestSinkPadHandler {
if let EventView::FlushStop(_) = event.view() {
let test_sink = element.imp();
return test_sink.task.flush_stop().await_maybe_on_context().is_ok();
} else if sender.send_async(TaskItem::Event(event)).await.is_err() {
} else if sender.send_async(StreamItem::Event(event)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing");
}
@ -161,20 +164,31 @@ impl PadSinkHandler for TestSinkPadHandler {
#[derive(Default)]
struct Stats {
must_log: bool,
sync: bool,
ramp_up_instant: Option<Instant>,
log_start_instant: Option<Instant>,
last_delta_instant: Option<Instant>,
buffer_count: u32,
buffer_count_delta: u32,
buffer_headroom: Duration,
buffer_headroom_delta: Duration,
late_buffer_count: u32,
lateness: Duration,
max_lateness: Duration,
late_buffer_count_delta: u32,
lateness_delta: Duration,
max_lateness_delta: Duration,
max_buffers: Option<f32>,
buffer_count: f32,
buffer_count_delta: f32,
latency_sum: f32,
latency_square_sum: f32,
latency_sum_delta: f32,
latency_square_sum_delta: f32,
latency_min: Duration,
latency_min_delta: Duration,
latency_max: Duration,
latency_max_delta: Duration,
interval_sum: f32,
interval_square_sum: f32,
interval_sum_delta: f32,
interval_square_sum_delta: f32,
interval_min: Duration,
interval_min_delta: Duration,
interval_max: Duration,
interval_max_delta: Duration,
interval_late_warn: Duration,
interval_late_count: f32,
interval_late_count_delta: f32,
}
impl Stats {
@ -183,24 +197,34 @@ impl Stats {
return;
}
self.buffer_count = 0;
self.buffer_count_delta = 0;
self.buffer_headroom = Duration::ZERO;
self.buffer_headroom_delta = Duration::ZERO;
self.late_buffer_count = 0;
self.lateness = Duration::ZERO;
self.max_lateness = Duration::ZERO;
self.late_buffer_count_delta = 0;
self.lateness_delta = Duration::ZERO;
self.max_lateness_delta = Duration::ZERO;
self.buffer_count = 0.0;
self.buffer_count_delta = 0.0;
self.latency_sum = 0.0;
self.latency_square_sum = 0.0;
self.latency_sum_delta = 0.0;
self.latency_square_sum_delta = 0.0;
self.latency_min = Duration::MAX;
self.latency_min_delta = Duration::MAX;
self.latency_max = Duration::ZERO;
self.latency_max_delta = Duration::ZERO;
self.interval_sum = 0.0;
self.interval_square_sum = 0.0;
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min = Duration::MAX;
self.interval_min_delta = Duration::MAX;
self.interval_max = Duration::ZERO;
self.interval_max_delta = Duration::ZERO;
self.interval_late_count = 0.0;
self.interval_late_count_delta = 0.0;
self.last_delta_instant = None;
self.log_start_instant = None;
self.ramp_up_instant = Some(Instant::now());
gst::info!(CAT, "First stats logs in {:.2?}", 2 * LOG_PERIOD);
gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD);
}
fn can_count(&mut self) -> bool {
fn is_active(&mut self) -> bool {
if !self.must_log {
return false;
}
@ -211,53 +235,64 @@ impl Stats {
}
self.ramp_up_instant = None;
gst::info!(CAT, "Ramp up complete. Stats logs in {:.2?}", LOG_PERIOD);
gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
self.log_start_instant = Some(Instant::now());
self.last_delta_instant = self.log_start_instant;
}
true
use std::cmp::Ordering::*;
match self.max_buffers.opt_cmp(self.buffer_count) {
Some(Equal) => {
self.log_global();
self.buffer_count += 1.0;
false
}
Some(Less) => false,
_ => true,
}
}
fn notify_buffer(&mut self) {
if !self.can_count() {
fn add_buffer(&mut self, latency: Duration, interval: Duration) {
if !self.is_active() {
return;
}
self.buffer_count += 1;
self.buffer_count_delta += 1;
}
self.buffer_count += 1.0;
self.buffer_count_delta += 1.0;
fn notify_buffer_headroom(&mut self, headroom: Duration) {
if !self.can_count() {
return;
// Latency
let latency_f32 = latency.as_nanos() as f32;
let latency_square = latency_f32.powi(2);
self.latency_sum += latency_f32;
self.latency_square_sum += latency_square;
self.latency_min = self.latency_min.min(latency);
self.latency_max = self.latency_max.max(latency);
self.latency_sum_delta += latency_f32;
self.latency_square_sum_delta += latency_square;
self.latency_min_delta = self.latency_min_delta.min(latency);
self.latency_max_delta = self.latency_max_delta.max(latency);
// Interval
let interval_f32 = interval.as_nanos() as f32;
let interval_square = interval_f32.powi(2);
self.interval_sum += interval_f32;
self.interval_square_sum += interval_square;
self.interval_min = self.interval_min.min(interval);
self.interval_max = self.interval_max.max(interval);
self.interval_sum_delta += interval_f32;
self.interval_square_sum_delta += interval_square;
self.interval_min_delta = self.interval_min_delta.min(interval);
self.interval_max_delta = self.interval_max_delta.max(interval);
if interval > self.interval_late_warn {
self.interval_late_count += 1.0;
self.interval_late_count_delta += 1.0;
}
self.buffer_headroom += headroom;
self.buffer_headroom_delta += headroom;
}
fn notify_late_buffer(&mut self, now: Option<gst::ClockTime>, pts: gst::ClockTime) {
if !self.can_count() {
return;
}
let lateness = now
.opt_checked_sub(pts)
.ok()
.flatten()
.map_or(Duration::ZERO, Duration::from);
self.late_buffer_count += 1;
self.lateness += lateness;
self.max_lateness = self.max_lateness.max(lateness);
self.late_buffer_count_delta += 1;
self.lateness_delta += lateness;
self.max_lateness_delta = self.max_lateness_delta.max(lateness);
}
fn log_delta(&mut self) {
let delta_duration = match self.last_delta_instant {
Some(last_delta) => last_delta.elapsed(),
None => return,
@ -269,94 +304,121 @@ impl Stats {
self.last_delta_instant = Some(Instant::now());
gst::info!(CAT, "Delta stats:");
gst::info!(
CAT,
"o {:>5.2} buffers / s",
self.buffer_count_delta as f32 / delta_duration.as_millis() as f32 * 1_000f32,
);
if self.buffer_count_delta > 1.0 {
gst::info!(CAT, "Delta stats:");
if self.sync && self.buffer_count_delta > 0 {
let early_buffers_count = self
.buffer_count_delta
.saturating_sub(self.late_buffer_count_delta);
if early_buffers_count > 0 {
gst::info!(
CAT,
"o {:>5.2?} headroom / early buffers",
self.buffer_headroom_delta / early_buffers_count,
);
let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
let interval_std_dev = f32::sqrt(
self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
);
gst::info!(
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min_delta,
self.interval_max_delta,
);
if self.interval_late_count_delta > 1.0 {
gst::warning!(
CAT,
"o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}",
self.late_buffer_count_delta as f32 / self.buffer_count_delta as f32 * 100f32,
self.lateness_delta
.checked_div(self.late_buffer_count_delta)
.unwrap_or(Duration::ZERO),
self.max_lateness_delta,
"o {:5.2}% late buffers",
100f32 * self.interval_late_count_delta / self.buffer_count_delta
);
}
self.buffer_headroom_delta = Duration::ZERO;
self.late_buffer_count_delta = 0;
self.lateness_delta = Duration::ZERO;
self.max_lateness_delta = Duration::ZERO;
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min_delta = Duration::MAX;
self.interval_max_delta = Duration::ZERO;
self.interval_late_count_delta = 0.0;
let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
let latency_std_dev = f32::sqrt(
self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
);
gst::info!(
CAT,
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(latency_mean as u64),
Duration::from_nanos(latency_std_dev as u64),
self.latency_min_delta,
self.latency_max_delta,
);
self.latency_sum_delta = 0.0;
self.latency_square_sum_delta = 0.0;
self.latency_min_delta = Duration::MAX;
self.latency_max_delta = Duration::ZERO;
}
self.buffer_count_delta = 0;
self.buffer_count_delta = 0.0;
}
fn log_global(&mut self) {
let log_duration = match self.log_start_instant {
Some(start) => start.elapsed(),
None => return,
};
if self.log_start_instant.is_none() {
return;
}
gst::info!(CAT, "Global stats:");
gst::info!(
CAT,
"o {:>5.2} buffers / s",
self.buffer_count as f32 / log_duration.as_millis() as f32 * 1_000f32,
);
if self.buffer_count > 1.0 {
gst::info!(CAT, "Global stats:");
if self.sync && self.buffer_count > 0 {
let early_buffers_count = self.buffer_count.saturating_sub(self.late_buffer_count);
if early_buffers_count > 0 {
gst::info!(
let interval_mean = self.interval_sum / self.buffer_count;
let interval_std_dev =
f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min,
self.interval_max,
);
if self.interval_late_count > f32::EPSILON {
gst::warning!(
CAT,
"o {:>5.2?} headroom / early buffers",
self.buffer_headroom / early_buffers_count,
);
gst::info!(
CAT,
"o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}",
self.late_buffer_count as f32 / self.buffer_count as f32 * 100f32,
self.lateness
.checked_div(self.late_buffer_count)
.unwrap_or(Duration::ZERO),
self.max_lateness,
"o {:5.2}% late buffers",
100f32 * self.interval_late_count / self.buffer_count
);
}
let latency_mean = self.latency_sum / self.buffer_count;
let latency_std_dev =
f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
gst::info!(
CAT,
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(latency_mean as u64),
Duration::from_nanos(latency_std_dev as u64),
self.latency_min,
self.latency_max,
);
}
}
}
struct TestSinkTask {
element: super::TestSink,
item_receiver: Peekable<flume::r#async::RecvStream<'static, TaskItem>>,
sync: bool,
raise_log_level: bool,
last_dts: Option<gst::ClockTime>,
item_receiver: Peekable<flume::r#async::RecvStream<'static, StreamItem>>,
stats: Stats,
segment: Option<gst::Segment>,
}
impl TestSinkTask {
fn new(element: &super::TestSink, item_receiver: flume::Receiver<TaskItem>) -> Self {
fn new(element: &super::TestSink, item_receiver: flume::Receiver<StreamItem>) -> Self {
TestSinkTask {
element: element.clone(),
raise_log_level: false,
last_dts: None,
item_receiver: item_receiver.into_stream().peekable(),
sync: DEFAULT_SYNC,
stats: Stats::default(),
segment: None,
}
@ -369,17 +431,23 @@ impl TestSinkTask {
}
impl TaskImpl for TestSinkTask {
type Item = TaskItem;
type Item = StreamItem;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Preparing Task");
let sink = self.element.imp();
let settings = sink.settings.lock().unwrap();
self.sync = settings.sync;
self.stats.sync = self.sync;
self.stats.must_log = settings.must_log_stats;
self.raise_log_level = settings.raise_log_level;
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Preparing Task");
} else {
gst::trace!(CAT, obj: &self.element, "Preparing Task");
}
self.stats.must_log = settings.logs_stats;
self.stats.max_buffers = settings.max_buffers.map(|max_buffers| max_buffers as f32);
self.stats.interval_late_warn = settings.push_period + settings.context_wait / 2;
Ok(())
}
@ -388,7 +456,13 @@ impl TaskImpl for TestSinkTask {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
gst::log!(CAT, obj: &self.element, "Starting Task");
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Starting Task");
} else {
gst::trace!(CAT, obj: &self.element, "Starting Task");
}
self.last_dts = None;
self.stats.start();
Ok(())
}
@ -397,81 +471,92 @@ impl TaskImpl for TestSinkTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
gst::log!(CAT, obj: &self.element, "Stopping Task");
self.flush().await;
Ok(())
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
gst::log!(CAT, obj: &self.element, "Starting Task Flush");
self.flush().await;
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
async move {
let item_opt = Pin::new(&mut self.item_receiver).peek().await;
// Check the peeked item in case we need to sync.
// The item will still be available in the channel
// in case this is cancelled by a state transition.
match item_opt {
Some(TaskItem::Buffer(buffer)) => {
self.stats.notify_buffer();
if self.sync {
let rtime = self.segment.as_ref().and_then(|segment| {
segment
.downcast_ref::<gst::format::Time>()
.and_then(|segment| segment.to_running_time(buffer.pts()))
});
if let Some(pts) = rtime {
// This can be cancelled by a state transition.
self.sync(pts).await;
}
}
}
Some(_) => (),
None => {
panic!("Internal channel sender dropped while Task is Started");
}
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Stopping Task");
} else {
gst::trace!(CAT, obj: &self.element, "Stopping Task");
}
// An item was peeked above, we can now pop it without losing it.
Ok(self.item_receiver.next().await.unwrap())
self.flush().await;
Ok(())
}
.boxed()
}
fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
async move {
gst::debug!(CAT, obj: &self.element, "Handling {:?}", item);
let item = self.item_receiver.next().await.unwrap();
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Popped item");
} else {
gst::trace!(CAT, obj: &self.element, "Popped item");
}
Ok(item)
}
.boxed()
}
fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "Received {:?}", item);
} else {
gst::trace!(CAT, obj: &self.element, "Received {:?}", item);
}
match item {
TaskItem::Buffer(buffer) => {
self.render(buffer).await.map_err(|err| {
element_error!(
&self.element,
gst::StreamError::Failed,
["Failed to render item, stopping task: {}", err]
);
gst::FlowError::Error
})?;
StreamItem::Buffer(buffer) => {
let dts = self
.segment
.as_ref()
.and_then(|segment| {
segment
.downcast_ref::<gst::format::Time>()
.and_then(|segment| segment.to_running_time(buffer.dts()))
})
.unwrap();
self.stats.log_delta();
if let Some(last_dts) = self.last_dts {
let cur_ts = self.element.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
self.stats.add_buffer(latency, interval);
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "o latency {:.2?}", latency);
gst::debug!(CAT, obj: &self.element, "o interval {:.2?}", interval);
} else {
gst::trace!(CAT, obj: &self.element, "o latency {:.2?}", latency);
gst::trace!(CAT, obj: &self.element, "o interval {:.2?}", interval);
}
}
self.last_dts = Some(dts);
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Buffer processed");
} else {
gst::trace!(CAT, obj: &self.element, "Buffer processed");
}
}
TaskItem::Event(event) => match event.view() {
StreamItem::Event(event) => match event.view() {
EventView::Eos(_) => {
self.stats.log_global();
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "EOS");
} else {
gst::trace!(CAT, obj: &self.element, "EOS");
}
let _ = self
.element
.post_message(gst::message::Eos::builder().src(&self.element).build());
let elem = self.element.clone();
self.element.call_async(move |_| {
let _ =
elem.post_message(gst::message::Eos::builder().src(&elem).build());
});
return Err(gst::FlowError::Eos);
}
EventView::Segment(e) => {
self.segment = Some(e.segment().clone());
@ -489,54 +574,27 @@ impl TaskImpl for TestSinkTask {
}
}
impl TestSinkTask {
async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
let _data = buffer.map_readable().map_err(|_| {
element_error!(
self.element,
gst::StreamError::Format,
["Failed to map buffer readable"]
);
gst::FlowError::Error
})?;
gst::log!(CAT, obj: &self.element, "buffer {:?} rendered", buffer);
Ok(())
}
/// Waits until specified time.
async fn sync(&mut self, pts: gst::ClockTime) {
let now = self.element.current_running_time();
if let Ok(Some(delay)) = pts.opt_checked_sub(now) {
let delay = delay.into();
gst::trace!(CAT, obj: &self.element, "sync: waiting {:?}", delay);
runtime::time::delay_for(delay).await;
self.stats.notify_buffer_headroom(delay);
} else {
self.stats.notify_late_buffer(now, pts);
}
}
}
#[derive(Debug)]
pub struct TestSink {
sink_pad: PadSink,
task: Task,
item_sender: Mutex<Option<flume::Sender<TaskItem>>>,
item_sender: Mutex<Option<flume::Sender<StreamItem>>>,
settings: Mutex<Settings>,
}
impl TestSink {
#[track_caller]
fn clone_item_sender(&self) -> flume::Sender<TaskItem> {
fn clone_item_sender(&self) -> flume::Sender<StreamItem> {
self.item_sender.lock().unwrap().as_ref().unwrap().clone()
}
fn prepare(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Preparing");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Preparing");
} else {
gst::trace!(CAT, obj: element, "Preparing");
}
let context = {
let settings = self.settings.lock().unwrap();
@ -556,28 +614,67 @@ impl TestSink {
*self.item_sender.lock().unwrap() = Some(item_sender);
gst::debug!(CAT, obj: element, "Started preparation");
if raise_log_level {
gst::debug!(CAT, obj: element, "Prepared");
} else {
gst::trace!(CAT, obj: element, "Prepared");
}
Ok(())
}
fn unprepare(&self, element: &super::TestSink) {
gst::debug!(CAT, obj: element, "Unpreparing");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Unpreparing");
} else {
gst::trace!(CAT, obj: element, "Unpreparing");
}
self.task.unprepare().block_on().unwrap();
gst::debug!(CAT, obj: element, "Unprepared");
if raise_log_level {
gst::debug!(CAT, obj: element, "Unprepared");
} else {
gst::trace!(CAT, obj: element, "Unprepared");
}
}
fn stop(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Stopping");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Stopping");
} else {
gst::trace!(CAT, obj: element, "Stopping");
}
self.task.stop().block_on()?;
gst::debug!(CAT, obj: element, "Stopped");
if raise_log_level {
gst::debug!(CAT, obj: element, "Stopped");
} else {
gst::trace!(CAT, obj: element, "Stopped");
}
Ok(())
}
fn start(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Starting");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Starting");
} else {
gst::trace!(CAT, obj: element, "Starting");
}
self.task.start().block_on()?;
gst::debug!(CAT, obj: element, "Started");
if raise_log_level {
gst::debug!(CAT, obj: element, "Started");
} else {
gst::trace!(CAT, obj: element, "Started");
}
Ok(())
}
}
@ -616,17 +713,27 @@ impl ObjectImpl for TestSink {
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
glib::ParamSpecBoolean::builder("sync")
.nick("Sync")
.blurb("Sync on the clock")
.default_value(DEFAULT_SYNC)
.build(),
glib::ParamSpecBoolean::builder("must-log-stats")
.nick("Must Log Stats")
.blurb("Whether statistics should be logged")
.default_value(DEFAULT_MUST_LOG_STATS)
glib::ParamSpecBoolean::builder("raise-log-level")
.nick("Raise log level")
.blurb("Raises the log level so that this element stands out")
.write_only()
.build(),
glib::ParamSpecBoolean::builder("logs-stats")
.nick("Logs Stats")
.blurb("Whether statistics should be logged")
.write_only()
.build(),
glib::ParamSpecUInt::builder("push-period")
.nick("Src buffer Push Period")
.blurb("Push period used by `src` element (used for stats warnings)")
.default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32)
.build(),
glib::ParamSpecInt::builder("max-buffers")
.nick("Max Buffers")
.blurb("Number of buffers to count before stopping stats (-1 = unlimited)")
.minimum(-1i32)
.default_value(DEFAULT_MAX_BUFFERS)
.build(),
]
});
@ -642,10 +749,6 @@ impl ObjectImpl for TestSink {
) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"sync" => {
let sync = value.get().expect("type checked upstream");
settings.sync = sync;
}
"context" => {
settings.context = value
.get::<Option<String>>()
@ -657,9 +760,21 @@ impl ObjectImpl for TestSink {
value.get::<u32>().expect("type checked upstream").into(),
);
}
"must-log-stats" => {
let must_log_stats = value.get().expect("type checked upstream");
settings.must_log_stats = must_log_stats;
"raise-log-level" => {
settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
}
"logs-stats" => {
let logs_stats = value.get().expect("type checked upstream");
settings.logs_stats = logs_stats;
}
"push-period" => {
settings.push_period = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"max-buffers" => {
let value = value.get::<i32>().expect("type checked upstream");
settings.max_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
}
@ -668,9 +783,15 @@ impl ObjectImpl for TestSink {
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"sync" => settings.sync.to_value(),
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"raise-log-level" => settings.raise_log_level.to_value(),
"push-period" => (settings.push_period.as_millis() as u32).to_value(),
"max-buffers" => settings
.max_buffers
.and_then(|val| val.try_into().ok())
.unwrap_or(-1i32)
.to_value(),
_ => unimplemented!(),
}
}

View file

@ -16,7 +16,7 @@ use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSrc, Task, Timer};
@ -29,17 +29,18 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
const BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(20);
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
const DEFAULT_NUM_BUFFERS: i32 = 50 * 60 * 2;
const DEFAULT_PUSH_PERIOD: gst::ClockTime = gst::ClockTime::from_mseconds(20);
const DEFAULT_NUM_BUFFERS: i32 = 50 * 100;
#[derive(Debug, Clone)]
struct Settings {
context: String,
context_wait: Duration,
num_buffers: Option<i32>,
push_period: gst::ClockTime,
raise_log_level: bool,
num_buffers: Option<u32>,
}
impl Default for Settings {
@ -47,7 +48,9 @@ impl Default for Settings {
Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
num_buffers: Some(DEFAULT_NUM_BUFFERS),
push_period: DEFAULT_PUSH_PERIOD,
raise_log_level: false,
num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
}
}
}
@ -62,13 +65,13 @@ impl PadSrcHandler for TestSrcPadHandler {
struct SrcTask {
element: super::TestSrc,
buffer_pool: gst::BufferPool,
last_pts: gst::ClockTime,
last_buf_instant: Option<Instant>,
push_period: Duration,
timer: Option<Timer>,
raise_log_level: bool,
push_period: gst::ClockTime,
need_initial_events: bool,
need_segment: bool,
num_buffers: Option<i32>,
buffer_count: i32,
num_buffers: Option<u32>,
buffer_count: u32,
}
impl SrcTask {
@ -83,12 +86,12 @@ impl SrcTask {
SrcTask {
element,
buffer_pool,
last_pts: gst::ClockTime::ZERO,
last_buf_instant: None,
push_period: Duration::ZERO,
timer: None,
raise_log_level: false,
push_period: gst::ClockTime::ZERO,
need_initial_events: true,
need_segment: true,
num_buffers: Some(DEFAULT_NUM_BUFFERS),
num_buffers: Some(DEFAULT_NUM_BUFFERS as u32),
buffer_count: 0,
}
}
@ -99,11 +102,17 @@ impl TaskImpl for SrcTask {
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Preparing Task");
let src = self.element.imp();
let settings = src.settings.lock().unwrap();
self.push_period = settings.context_wait;
self.raise_log_level = settings.raise_log_level;
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Preparing Task");
} else {
gst::trace!(CAT, obj: &self.element, "Preparing Task");
}
self.push_period = settings.push_period;
self.num_buffers = settings.num_buffers;
Ok(())
@ -113,9 +122,18 @@ impl TaskImpl for SrcTask {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
gst::log!(CAT, obj: &self.element, "Starting Task");
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Starting Task");
} else {
gst::trace!(CAT, obj: &self.element, "Starting Task");
}
self.timer = Some(Timer::interval_delayed_by(
// Delay first buffer push so as to let others start.
Duration::from_secs(2),
self.push_period.into(),
));
self.buffer_count = 0;
self.last_buf_instant = None;
self.buffer_pool.set_active(true).unwrap();
Ok(())
}
@ -124,27 +142,17 @@ impl TaskImpl for SrcTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task");
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Stopping Task");
} else {
gst::trace!(CAT, obj: &self.element, "Stopping Task");
}
self.buffer_pool.set_active(false).unwrap();
self.last_pts = gst::ClockTime::ZERO;
self.timer = None;
self.need_initial_events = true;
self.need_segment = true;
gst::log!(CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Starting task flush");
self.buffer_pool.set_active(false).unwrap();
self.need_segment = true;
gst::log!(CAT, obj: &self.element, "Task flush started");
Ok(())
}
.boxed()
@ -152,30 +160,34 @@ impl TaskImpl for SrcTask {
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
if let Some(delay) = self
.last_buf_instant
.map(|last| last.elapsed())
.opt_checked_sub(self.push_period)
.ok()
.flatten()
{
Timer::after(delay).await;
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Awaiting timer");
} else {
gst::trace!(CAT, obj: &self.element, "Awaiting timer");
}
self.last_buf_instant = Some(Instant::now());
self.timer.as_mut().unwrap().next().await;
let start = self.last_pts;
self.last_pts = start + BUFFER_DURATION;
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Timer ticked");
} else {
gst::trace!(CAT, obj: &self.element, "Timer ticked");
}
self.buffer_pool.acquire_buffer(None).map(|mut buffer| {
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(start);
buffer.set_duration(BUFFER_DURATION);
}
buffer
})
self.buffer_pool
.acquire_buffer(None)
.map(|mut buffer| {
{
let buffer = buffer.get_mut().unwrap();
let rtime = self.element.current_running_time().unwrap();
buffer.set_dts(rtime);
}
buffer
})
.map_err(|err| {
gst::error!(CAT, obj: &self.element, "Failed to acquire buffer {}", err);
err
})
}
.boxed()
}
@ -185,15 +197,29 @@ impl TaskImpl for SrcTask {
let res = self.push(buffer).await;
match res {
Ok(_) => {
gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
if self.raise_log_level {
gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
} else {
gst::trace!(CAT, obj: &self.element, "Successfully pushed buffer");
}
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj: &self.element, "EOS");
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "EOS");
} else {
gst::trace!(CAT, obj: &self.element, "EOS");
}
let test_src = self.element.imp();
test_src.src_pad.push_event(gst::event::Eos::new()).await;
return Err(gst::FlowError::Eos);
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj: &self.element, "Flushing");
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "Flushing");
} else {
gst::trace!(CAT, obj: &self.element, "Flushing");
}
}
Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err);
@ -214,11 +240,20 @@ impl TaskImpl for SrcTask {
impl SrcTask {
async fn push(&mut self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "Pushing {:?}", buffer);
} else {
gst::trace!(CAT, obj: &self.element, "Pushing {:?}", buffer);
}
let test_src = self.element.imp();
if self.need_initial_events {
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
} else {
gst::trace!(CAT, obj: &self.element, "Pushing initial events");
}
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
@ -244,12 +279,27 @@ impl SrcTask {
self.need_segment = false;
}
gst::debug!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "Forwarding buffer");
} else {
gst::trace!(CAT, obj: &self.element, "Forwarding buffer");
}
let ok = test_src.src_pad.push(buffer).await?;
self.buffer_count += 1;
if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
if self.raise_log_level {
gst::debug!(CAT, obj: &self.element, "Pushing EOS");
} else {
gst::trace!(CAT, obj: &self.element, "Pushing EOS");
}
let test_src = self.element.imp();
if !test_src.src_pad.push_event(gst::event::Eos::new()).await {
gst::error!(CAT, obj: &self.element, "Error pushing EOS");
}
return Err(gst::FlowError::Eos);
}
@ -266,7 +316,12 @@ pub struct TestSrc {
impl TestSrc {
fn prepare(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Preparing");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Preparing");
} else {
gst::trace!(CAT, obj: element, "Preparing");
}
let settings = self.settings.lock().unwrap();
let context =
@ -282,35 +337,86 @@ impl TestSrc {
.prepare(SrcTask::new(element.clone()), context)
.block_on()?;
gst::debug!(CAT, obj: element, "Prepared");
if raise_log_level {
gst::debug!(CAT, obj: element, "Prepared");
} else {
gst::trace!(CAT, obj: element, "Prepared");
}
Ok(())
}
fn unprepare(&self, element: &super::TestSrc) {
gst::debug!(CAT, obj: element, "Unpreparing");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Unpreparing");
} else {
gst::trace!(CAT, obj: element, "Unpreparing");
}
self.task.unprepare().block_on().unwrap();
gst::debug!(CAT, obj: element, "Unprepared");
if raise_log_level {
gst::debug!(CAT, obj: element, "Unprepared");
} else {
gst::trace!(CAT, obj: element, "Unprepared");
}
}
fn stop(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Stopping");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Stopping");
} else {
gst::trace!(CAT, obj: element, "Stopping");
}
self.task.stop().block_on()?;
gst::debug!(CAT, obj: element, "Stopped");
if raise_log_level {
gst::debug!(CAT, obj: element, "Stopped");
} else {
gst::trace!(CAT, obj: element, "Stopped");
}
Ok(())
}
fn start(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Starting");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Starting");
} else {
gst::trace!(CAT, obj: element, "Starting");
}
self.task.start().block_on()?;
gst::debug!(CAT, obj: element, "Started");
if raise_log_level {
gst::debug!(CAT, obj: element, "Started");
} else {
gst::trace!(CAT, obj: element, "Started");
}
Ok(())
}
fn pause(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
let raise_log_level = self.settings.lock().unwrap().raise_log_level;
if raise_log_level {
gst::debug!(CAT, obj: element, "Pausing");
} else {
gst::trace!(CAT, obj: element, "Pausing");
}
self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
if raise_log_level {
gst::debug!(CAT, obj: element, "Paused");
} else {
gst::trace!(CAT, obj: element, "Paused");
}
Ok(())
}
}
@ -348,6 +454,16 @@ impl ObjectImpl for TestSrc {
.maximum(1000)
.default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32)
.build(),
glib::ParamSpecUInt::builder("push-period")
.nick("Buffer Push Period")
.blurb("Push a new buffer every this many ms")
.default_value(DEFAULT_PUSH_PERIOD.mseconds() as u32)
.build(),
glib::ParamSpecBoolean::builder("raise-log-level")
.nick("Raise log level")
.blurb("Raises the log level so that this element stands out")
.write_only()
.build(),
glib::ParamSpecInt::builder("num-buffers")
.nick("Num Buffers")
.blurb("Number of buffers to output before sending EOS (-1 = unlimited)")
@ -380,9 +496,17 @@ impl ObjectImpl for TestSrc {
value.get::<u32>().expect("type checked upstream").into(),
);
}
"push-period" => {
settings.push_period = gst::ClockTime::from_mseconds(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"raise-log-level" => {
settings.raise_log_level = value.get::<bool>().expect("type checked upstream");
}
"num-buffers" => {
let value = value.get::<i32>().expect("type checked upstream");
settings.num_buffers = if value > 0 { Some(value) } else { None };
settings.num_buffers = if value > 0 { Some(value as u32) } else { None };
}
_ => unimplemented!(),
}
@ -393,7 +517,13 @@ impl ObjectImpl for TestSrc {
match pspec.name() {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"num-buffers" => settings.num_buffers.unwrap_or(-1).to_value(),
"push-period" => (settings.push_period.mseconds() as u32).to_value(),
"raise-log-level" => settings.raise_log_level.to_value(),
"num-buffers" => settings
.num_buffers
.and_then(|val| val.try_into().ok())
.unwrap_or(-1i32)
.to_value(),
_ => unimplemented!(),
}
}

View file

@ -83,6 +83,16 @@ impl Timer {
Timer::interval_at(Instant::now() + period, period)
}
/// Creates a timer that emits events periodically, starting after `delay`.
///
/// When throttling is activated (i.e. when using a non-`0` `wait`
/// duration in `Context::acquire`), timer entries are assigned to
/// the nearest time frame, meaning that the delay might elapse
/// `wait` / 2 ms earlier or later than the expected instant.
pub fn interval_delayed_by(delay: Duration, period: Duration) -> Timer {
Timer::interval_at(Instant::now() + delay, period)
}
/// Creates a timer that emits events periodically, starting at `start`.
///
/// When throttling is activated (i.e. when using a non-`0` `wait`