tracers: Add a pad push durations tracer

This tracer measures the time it takes for a buffer/buffer list push to return.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1506>
This commit is contained in:
Sebastian Dröge 2024-03-20 12:23:54 +02:00
parent b3d3895ae7
commit 66030f36ad
5 changed files with 449 additions and 0 deletions

View file

@ -7808,6 +7808,7 @@
"source": "gst-plugin-tracers",
"tracers": {
"buffer-lateness": {},
"pad-push-timings": {},
"pipeline-snapshot": {},
"queue-levels": {}
},

View file

@ -0,0 +1,77 @@
import argparse
import csv
import re
import statistics
import matplotlib
import matplotlib.pyplot as plt
parser = argparse.ArgumentParser()
parser.add_argument("file", help="Input file with pad push timings")
parser.add_argument("--include-filter", help="Regular expression for element:pad names that should be included")
parser.add_argument("--exclude-filter", help="Regular expression for element:pad names that should be excluded")
args = parser.parse_args()
include_filter = None
if args.include_filter is not None:
include_filter = re.compile(args.include_filter)
exclude_filter = None
if args.exclude_filter is not None:
exclude_filter = re.compile(args.exclude_filter)
pads = {}
with open(args.file, mode='r', encoding='utf_8', newline='') as csvfile:
reader = csv.reader(csvfile, delimiter=',', quotechar='|')
for row in reader:
if len(row) != 4:
continue
if include_filter is not None and not include_filter.match(row[1]):
continue
if exclude_filter is not None and exclude_filter.match(row[1]):
continue
if not row[1] in pads:
pads[row[1]] = {
'push-duration': [],
}
push_duration = float(row[3]) / 1000000.0
wallclock = float(row[0]) / 1000000000.0
pads[row[1]]['push-duration'].append((wallclock, push_duration))
matplotlib.rcParams['figure.dpi'] = 200
prop_cycle = plt.rcParams['axes.prop_cycle']
colors = prop_cycle.by_key()['color']
fig, ax1 = plt.subplots()
ax1.set_xlabel("wallclock (s)")
ax1.set_ylabel("push duration (ms)")
ax1.tick_params(axis='y')
for (i, (pad, values)) in enumerate(pads.items()):
# cycle colors
i = i % len(colors)
push_durations = [x[1] for x in values['push-duration']]
ax1.plot(
[x[0] for x in values['push-duration']],
push_durations,
'.', label = pad,
color = colors[i],
)
print("{} push durations: min: {}ms max: {}ms mean: {}ms".format(
pad,
min(push_durations),
max(push_durations),
statistics.mean(push_durations)))
fig.tight_layout()
plt.legend(loc='best')
plt.show()

View file

@ -15,6 +15,7 @@
use gst::glib;
mod buffer_lateness;
mod pad_push_timings;
#[cfg(unix)]
mod pipeline_snapshot;
mod queue_levels;
@ -24,6 +25,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
pipeline_snapshot::register(plugin)?;
queue_levels::register(plugin)?;
buffer_lateness::register(plugin)?;
pad_push_timings::register(plugin)?;
Ok(())
}

View file

@ -0,0 +1,345 @@
// Copyright (C) 2024 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* tracer-pad-push-timings:
*
* This tracer measures how long it takes to push a buffer or buffer list out of a pad.
*
* Example:
*
* ```console
* $ GST_TRACERS='pad-push-timings(file="/tmp/pad_push_timings.log")' gst-launch-1.0 audiotestsrc ! queue ! fakesink
* ```
*
* The generated file is a CSV file of the format
*
* ```csv
* timestamp,element:pad name,pad pointer,push duration
* ```
*
* ## Parameters
*
* ### `file`
*
* Specifies the path to the file that will collect the CSV file with the push timings.
*
* By default the file is written to `/tmp/pad_push_timings.log`.
*
* ### `include-filter`
*
* Specifies a regular expression for the `element:pad` names that should be included.
*
* By default this is not set.
*
* ### `exclude-filter`
*
* Specifies a regular expression for the `element:pad` names that should **not** be included.
*
* By default this is not set.
*/
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use regex::Regex;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"pad-push-timings",
gst::DebugColorFlags::empty(),
Some("Tracer to pad push timings"),
)
});
#[derive(Debug)]
struct Settings {
file: PathBuf,
include_filter: Option<Regex>,
exclude_filter: Option<Regex>,
}
impl Default for Settings {
fn default() -> Self {
let mut file = glib::tmp_dir();
file.push("pad_push_timings.log");
Self {
file,
include_filter: None,
exclude_filter: None,
}
}
}
impl Settings {
fn update_from_params(&mut self, imp: &PadPushTimings, params: String) {
let s = match gst::Structure::from_str(&format!("pad-push-timings,{params}")) {
Ok(s) => s,
Err(err) => {
gst::warning!(CAT, imp: imp, "failed to parse tracer parameters: {}", err);
return;
}
};
if let Ok(file) = s.get::<&str>("file") {
gst::log!(CAT, imp: imp, "file= {}", file);
self.file = PathBuf::from(file);
}
if let Ok(filter) = s.get::<&str>("include-filter") {
gst::log!(CAT, imp: imp, "include filter= {}", filter);
let filter = match Regex::new(filter) {
Ok(filter) => Some(filter),
Err(err) => {
gst::error!(
CAT,
imp: imp,
"Failed to compile include-filter regex: {}",
err
);
None
}
};
self.include_filter = filter;
}
if let Ok(filter) = s.get::<&str>("exclude-filter") {
gst::log!(CAT, imp: imp, "exclude filter= {}", filter);
let filter = match Regex::new(filter) {
Ok(filter) => Some(filter),
Err(err) => {
gst::error!(
CAT,
imp: imp,
"Failed to compile exclude-filter regex: {}",
err
);
None
}
};
self.exclude_filter = filter;
}
}
}
#[derive(Default)]
struct State {
pads: HashMap<usize, Pad>,
log: Vec<LogLine>,
settings: Settings,
}
struct Pad {
parent_name: Option<Arc<glib::GString>>,
pad_name: Arc<glib::GString>,
pending_push_start: Option<u64>,
include: bool,
}
struct LogLine {
timestamp: u64,
parent_name: Option<Arc<glib::GString>>,
pad_name: Arc<glib::GString>,
ptr: usize,
push_duration: u64,
}
#[derive(Default)]
pub struct PadPushTimings {
state: Mutex<State>,
}
#[glib::object_subclass]
impl ObjectSubclass for PadPushTimings {
const NAME: &'static str = "GstPadPushTimings";
type Type = super::PadPushTimings;
type ParentType = gst::Tracer;
}
impl ObjectImpl for PadPushTimings {
fn constructed(&self) {
self.parent_constructed();
if let Some(params) = self.obj().property::<Option<String>>("params") {
let mut state = self.state.lock().unwrap();
state.settings.update_from_params(self, params);
}
self.register_hook(TracerHook::PadPushPre);
self.register_hook(TracerHook::PadPushListPre);
self.register_hook(TracerHook::PadPushPost);
self.register_hook(TracerHook::PadPushListPost);
self.register_hook(TracerHook::ObjectDestroyed);
}
fn dispose(&self) {
use std::io::prelude::*;
let state = self.state.lock().unwrap();
let mut file = match std::fs::File::create(&state.settings.file) {
Ok(file) => file,
Err(err) => {
gst::error!(CAT, imp: self, "Failed to create file: {err}");
return;
}
};
gst::debug!(
CAT,
imp: self,
"Writing file {}",
state.settings.file.display()
);
for LogLine {
timestamp,
parent_name,
pad_name,
ptr,
push_duration,
} in &state.log
{
let res = if let Some(parent_name) = parent_name {
writeln!(
&mut file,
"{timestamp},{parent_name}:{pad_name},0x{ptr:08x},{push_duration}"
)
} else {
writeln!(
&mut file,
"{timestamp},:{pad_name},0x{ptr:08x},{push_duration}"
)
};
if let Err(err) = res {
gst::error!(CAT, imp: self, "Failed to write to file: {err}");
return;
}
}
}
}
impl GstObjectImpl for PadPushTimings {}
impl TracerImpl for PadPushTimings {
fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, _buffer: &gst::Buffer) {
self.push_pre(ts, pad);
}
fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, _list: &gst::BufferList) {
self.push_pre(ts, pad);
}
fn pad_push_post(
&self,
ts: u64,
pad: &gst::Pad,
_result: Result<gst::FlowSuccess, gst::FlowError>,
) {
self.push_post(ts, pad);
}
fn pad_push_list_post(
&self,
ts: u64,
pad: &gst::Pad,
_result: Result<gst::FlowSuccess, gst::FlowError>,
) {
self.push_post(ts, pad);
}
fn object_destroyed(&self, _ts: u64, object: std::ptr::NonNull<gst::ffi::GstObject>) {
let ptr = object.as_ptr() as usize;
let mut state = self.state.lock().unwrap();
state.pads.remove(&ptr);
}
}
impl PadPushTimings {
fn push_pre(&self, ts: u64, pad: &gst::Pad) {
let ptr = pad.as_ptr() as usize;
let mut state = self.state.lock().unwrap();
let State {
ref mut pads,
ref settings,
..
} = &mut *state;
let pad = pads.entry(ptr).or_insert_with(|| {
let parent_name = pad.parent().map(|p| p.name());
let pad_name = pad.name();
let mut include = true;
let name = if let Some(ref parent_name) = parent_name {
format!("{}:{}", parent_name, pad_name)
} else {
format!(":{}", pad_name)
};
if let Some(ref filter) = settings.include_filter {
if !filter.is_match(&name) {
include = false;
}
}
if let Some(ref filter) = settings.exclude_filter {
if filter.is_match(&name) {
include = false;
}
}
Pad {
parent_name: parent_name.map(Arc::new),
pad_name: Arc::new(pad_name),
pending_push_start: None,
include,
}
});
if !pad.include {
return;
}
assert!(pad.pending_push_start.is_none());
pad.pending_push_start = Some(ts);
}
fn push_post(&self, ts: u64, pad: &gst::Pad) {
let ptr = pad.as_ptr() as usize;
let mut state = self.state.lock().unwrap();
let State {
ref mut pads,
ref mut log,
..
} = &mut *state;
let Some(pad) = pads.get_mut(&ptr) else {
return;
};
if !pad.include {
return;
}
let push_start = pad.pending_push_start.take().unwrap();
log.push(LogLine {
timestamp: push_start,
parent_name: pad.parent_name.clone(),
pad_name: pad.pad_name.clone(),
ptr,
push_duration: ts - push_start,
});
}
}

View file

@ -0,0 +1,24 @@
// Copyright (C) 2024 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct PadPushTimings(ObjectSubclass<imp::PadPushTimings>) @extends gst::Tracer, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Tracer::register(
Some(plugin),
"pad-push-timings",
PadPushTimings::static_type(),
)
}