threadshare: benchmark: add throughput tracking

This commit is contained in:
François Laignel 2020-04-25 23:18:20 +02:00
parent 36f032ef15
commit dae38eb0a3

View file

@ -19,6 +19,12 @@ use glib::prelude::*;
use gst::prelude::*;
use std::env;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
const THROUGHPUT_PERIOD: Duration = Duration::from_secs(20);
fn main() {
gst::init().unwrap();
@ -55,6 +61,7 @@ fn main() {
let l = glib::MainLoop::new(None, false);
let pipeline = gst::Pipeline::new(None);
let counter = Arc::new(AtomicU64::new(0));
for i in 0..n_streams {
let sink =
@ -62,6 +69,15 @@ fn main() {
sink.set_property("sync", &false).unwrap();
sink.set_property("async", &false).unwrap();
let counter_clone = Arc::clone(&counter);
sink.get_static_pad("sink").unwrap().add_probe(
gst::PadProbeType::BUFFER,
move |_pad, _probe_info| {
let _ = counter_clone.fetch_add(1, Ordering::SeqCst);
gst::PadProbeReturn::Ok
},
);
let source = match source.as_str() {
"udpsrc" => {
let source =
@ -90,12 +106,24 @@ fn main() {
source
}
"tcpclientsrc" => {
let source = gst::ElementFactory::make(
"tcpclientsrc",
Some(format!("source-{}", i).as_str()),
)
.unwrap();
source.set_property("host", &"127.0.0.1").unwrap();
source.set_property("port", &(40000i32)).unwrap();
source
}
"ts-tcpclientsrc" => {
let source = gst::ElementFactory::make(
"ts-tcpclientsrc",
Some(format!("source-{}", i).as_str()),
)
.unwrap();
source.set_property("address", &"127.0.0.1").unwrap();
source.set_property("port", &(40000u32)).unwrap();
source
.set_property("context", &format!("context-{}", (i as u32) % n_groups))
@ -166,5 +194,31 @@ fn main() {
println!("started");
thread::spawn(move || {
let throughput_factor = 1_000f32 / (n_streams as f32);
let mut prev_reset_instant: Option<Instant> = None;
let mut count;
let mut reset_instant;
loop {
count = counter.fetch_and(0, Ordering::SeqCst);
reset_instant = Instant::now();
if let Some(prev_reset_instant) = prev_reset_instant {
println!(
"{:>5.1} / s / stream",
(count as f32) * throughput_factor
/ ((reset_instant - prev_reset_instant).as_millis() as f32)
);
}
if let Some(sleep_duration) = THROUGHPUT_PERIOD.checked_sub(reset_instant.elapsed()) {
thread::sleep(sleep_duration);
}
prev_reset_instant = Some(reset_instant);
}
});
l.run();
}