clocksync: Add "QoS" support

When ClockSync synchronizes the data stream on the clock, it should also
push `QoS` events if the user wants to do it as, as stated in [the QoS
design doc] "Elements that synchronize buffers on the pipeline clock
will usually measure the current QoS".

The logic has been replicated from `GstBaseSink`.

[the QoS design doc]: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/qos.html

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2225>
This commit is contained in:
Thibault Saunier 2022-04-18 16:33:40 +00:00 committed by GStreamer Marge Bot
parent dfc6545741
commit 35e2ecd48b
3 changed files with 289 additions and 2 deletions

View file

@ -80,6 +80,18 @@
}
},
"properties": {
"qos": {
"blurb": "Generate Quality-of-Service events upstream",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
},
"sync": {
"blurb": "Synchronize to pipeline clock",
"conditionally-available": false,

View file

@ -57,6 +57,7 @@ GST_DEBUG_CATEGORY_STATIC (gst_clock_sync_debug);
#define DEFAULT_SYNC TRUE
#define DEFAULT_TS_OFFSET 0
#define DEFAULT_SYNC_TO_FIRST FALSE
#define DEFAULT_QOS FALSE
enum
{
@ -64,6 +65,7 @@ enum
PROP_SYNC,
PROP_TS_OFFSET,
PROP_SYNC_TO_FIRST,
PROP_QOS,
PROP_LAST
};
@ -89,6 +91,17 @@ G_DEFINE_TYPE_WITH_CODE (GstClockSync, gst_clock_sync, GST_TYPE_ELEMENT,
GST_ELEMENT_REGISTER_DEFINE (clocksync, "clocksync", GST_RANK_NONE,
GST_TYPE_CLOCKSYNC);
#define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
/* generic running average, this has a neutral window size */
#define UPDATE_RUNNING_AVG(avg,val) DO_RUNNING_AVG(avg,val,8)
/* the windows for these running averages are experimentally obtained.
* positive values get averaged more while negative values use a small
* window so we can react faster to badness. */
#define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
#define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
static void gst_clock_sync_finalize (GObject * object);
static void gst_clock_sync_set_property (GObject * object, guint prop_id,
@ -104,6 +117,8 @@ static GstFlowReturn gst_clock_sync_chain_list (GstPad * pad,
GstObject * parent, GstBufferList * buflist);
static gboolean gst_clock_sync_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static gboolean gst_clock_sync_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstStateChangeReturn gst_clocksync_change_state (GstElement * element,
GstStateChange transition);
static GstClock *gst_clocksync_provide_clock (GstElement * element);
@ -156,6 +171,19 @@ gst_clock_sync_class_init (GstClockSyncClass * klass)
"Note that mixed use of ts-offset and this property would be racy "
"if clocksync element is running already.",
DEFAULT_SYNC_TO_FIRST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
/**
* GstClockSync:qos:
*
* Generate Quality-of-Service events upstream.
*
* Since: 1.22
*/
properties[PROP_QOS] =
g_param_spec_boolean ("qos", "Qos",
"Generate Quality-of-Service events upstream", DEFAULT_QOS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
gstelement_class->change_state =
@ -200,11 +228,15 @@ gst_clock_sync_init (GstClockSync * clocksync)
gst_pad_set_query_function (clocksync->srcpad, gst_clock_sync_src_query);
GST_PAD_SET_PROXY_CAPS (clocksync->srcpad);
gst_pad_set_event_function (clocksync->srcpad,
GST_DEBUG_FUNCPTR (gst_clock_sync_src_event));
gst_element_add_pad (GST_ELEMENT (clocksync), clocksync->srcpad);
clocksync->ts_offset = DEFAULT_TS_OFFSET;
clocksync->sync = DEFAULT_SYNC;
clocksync->sync_to_first = DEFAULT_SYNC_TO_FIRST;
g_atomic_int_set (&clocksync->qos_enabled, DEFAULT_QOS);
g_cond_init (&clocksync->blocked_cond);
GST_OBJECT_FLAG_SET (clocksync, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
@ -248,6 +280,9 @@ gst_clock_sync_set_property (GObject * object, guint prop_id,
case PROP_SYNC_TO_FIRST:
clocksync->sync_to_first = g_value_get_boolean (value);
break;
case PROP_QOS:
g_atomic_int_set (&clocksync->qos_enabled, g_value_get_boolean (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -270,18 +305,168 @@ gst_clock_sync_get_property (GObject * object, guint prop_id,
case PROP_SYNC_TO_FIRST:
g_value_set_boolean (value, clocksync->sync_to_first);
break;
case PROP_QOS:
g_value_set_boolean (value, g_atomic_int_get (&clocksync->qos_enabled));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* With STREAM_LOCK
* reset all qos measuring */
static void
gst_clock_sync_reset_qos (GstClockSync * clocksync)
{
clocksync->prev_rstart = GST_CLOCK_TIME_NONE;
clocksync->earliest_in_time = GST_CLOCK_TIME_NONE;
clocksync->last_left = GST_CLOCK_TIME_NONE;
clocksync->avg_pt = GST_CLOCK_TIME_NONE;
clocksync->avg_rate = -1.0;
clocksync->avg_in_diff = GST_CLOCK_TIME_NONE;
}
static gboolean
gst_clock_sync_send_qos (GstClockSync * clocksync, GstQOSType type,
gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
{
GstEvent *event;
gboolean res;
/* generate Quality-of-Service event */
GST_DEBUG_OBJECT (clocksync,
"qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
event = gst_event_new_qos (type, proportion, diff, time);
/* send upstream */
res = gst_pad_push_event (clocksync->sinkpad, event);
return res;
}
static gboolean
gst_clock_sync_perform_qos (GstClockSync * clocksync)
{
GstClockTime start;
GstClockTimeDiff jitter;
GstClockTime pt, entered;
GstClockTime duration;
gdouble rate;
start = clocksync->current_rstart;
/* if Quality-of-Service disabled, do nothing */
if (!g_atomic_int_get (&clocksync->qos_enabled) ||
!GST_CLOCK_TIME_IS_VALID (start) || !clocksync->sync) {
return FALSE;
}
jitter = clocksync->current_jitter;
if (jitter < 0) {
/* this is the time the buffer entered the clocksync */
if (start < -jitter)
entered = 0;
else
entered = start + jitter;
} else {
/* this is the time the buffer entered the clocksync */
entered = start + jitter;
}
/* Average duration between each buffer timestamps */
duration = clocksync->avg_in_diff;
/* if we have the time when the last buffer left us, calculate
* processing time */
if (GST_CLOCK_TIME_IS_VALID (clocksync->last_left)) {
if (entered > clocksync->last_left) {
pt = entered - clocksync->last_left;
} else {
pt = 0;
}
} else {
pt = clocksync->avg_pt;
}
GST_DEBUG_OBJECT (clocksync, "start: %" GST_TIME_FORMAT
", entered %" GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %"
GST_TIME_FORMAT ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start),
GST_TIME_ARGS (entered), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
jitter);
GST_DEBUG_OBJECT (clocksync,
"avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
GST_TIME_ARGS (clocksync->avg_pt), clocksync->avg_rate);
/* collect running averages. for first observations, we copy the
* values */
if (!GST_CLOCK_TIME_IS_VALID (clocksync->avg_pt))
clocksync->avg_pt = pt;
else
clocksync->avg_pt = UPDATE_RUNNING_AVG (clocksync->avg_pt, pt);
if (duration != -1 && duration != 0) {
rate =
gst_guint64_to_gdouble (clocksync->avg_pt) /
gst_guint64_to_gdouble (duration);
} else {
rate = 1.0;
}
if (GST_CLOCK_TIME_IS_VALID (clocksync->last_left)) {
if (clocksync->avg_rate < 0.0) {
clocksync->avg_rate = rate;
} else {
if (rate > 1.0)
clocksync->avg_rate = UPDATE_RUNNING_AVG_N (clocksync->avg_rate, rate);
else
clocksync->avg_rate = UPDATE_RUNNING_AVG_P (clocksync->avg_rate, rate);
}
}
GST_DEBUG_OBJECT (clocksync,
"updated: avg_pt: %" GST_TIME_FORMAT
", avg_rate: %g", GST_TIME_ARGS (clocksync->avg_pt), clocksync->avg_rate);
if (clocksync->avg_rate >= 0.0) {
GstQOSType type;
GstClockTimeDiff diff;
/* if we have a valid rate, start sending QoS messages */
if (clocksync->current_jitter < 0) {
/* make sure we never go below 0 when adding the jitter to the
* timestamp. */
if (clocksync->current_rstart < -clocksync->current_jitter)
clocksync->current_jitter = -clocksync->current_rstart;
}
diff = clocksync->current_jitter;
if (diff <= 0)
type = GST_QOS_TYPE_OVERFLOW;
else
type = GST_QOS_TYPE_UNDERFLOW;
gst_clock_sync_send_qos (clocksync, type, clocksync->avg_rate,
clocksync->current_rstart, diff);
}
return TRUE;
}
static GstFlowReturn
gst_clocksync_do_sync (GstClockSync * clocksync, GstClockTime running_time)
{
GstFlowReturn ret = GST_FLOW_OK;
GstClock *clock;
clocksync->current_rstart = GST_CLOCK_TIME_NONE;
if (!clocksync->sync)
return GST_FLOW_OK;
@ -355,7 +540,29 @@ gst_clocksync_do_sync (GstClockSync * clocksync, GstClockTime running_time)
}
if (cret == GST_CLOCK_UNSCHEDULED || clocksync->flushing)
ret = GST_FLOW_FLUSHING;
clocksync->current_jitter = jitter;
}
clocksync->current_rstart = running_time;
/* calculate inter frame spacing */
if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (clocksync->prev_rstart) &&
clocksync->prev_rstart < running_time)) {
GstClockTime in_diff;
in_diff = running_time - clocksync->prev_rstart;
if (clocksync->avg_in_diff == -1)
clocksync->avg_in_diff = in_diff;
else
clocksync->avg_in_diff =
UPDATE_RUNNING_AVG (clocksync->avg_in_diff, in_diff);
GST_LOG_OBJECT (clocksync, "avg frame diff %" GST_TIME_FORMAT,
GST_TIME_ARGS (clocksync->avg_in_diff));
}
clocksync->prev_rstart = running_time;
GST_OBJECT_UNLOCK (clocksync);
return ret;
@ -374,6 +581,8 @@ gst_clock_sync_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
case GST_EVENT_SEGMENT:
/* store the event for synching */
gst_event_copy_segment (event, &clocksync->segment);
gst_clock_sync_reset_qos (clocksync);
break;
case GST_EVENT_GAP:
{
@ -400,6 +609,11 @@ gst_clock_sync_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_clock_id_unschedule (clocksync->clock_id);
}
GST_OBJECT_UNLOCK (clocksync);
GST_PAD_STREAM_LOCK (pad);
gst_clock_sync_reset_qos (clocksync);
GST_PAD_STREAM_UNLOCK (pad);
break;
case GST_EVENT_FLUSH_STOP:
GST_OBJECT_LOCK (clocksync);
@ -417,6 +631,31 @@ gst_clock_sync_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
return ret;
}
static gboolean
gst_clock_sync_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstClockSync *clocksync = GST_CLOCKSYNC (parent);
GST_LOG_OBJECT (clocksync, "Received %s event: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_QOS:
if (g_atomic_int_get (&clocksync->qos_enabled)) {
GST_LOG_OBJECT (clocksync,
"Dropping downstream QoS event as we are responsible for handling QoS");
gst_event_unref (event);
return TRUE;
}
break;
default:
break;
}
/* Always handle all events as normal: */
return gst_pad_event_default (pad, parent, event);
}
static void
gst_clock_sync_update_ts_offset (GstClockSync * clocksync,
GstClockTime runtimestamp)
@ -456,6 +695,7 @@ gst_clock_sync_update_ts_offset (GstClockSync * clocksync,
static GstFlowReturn
gst_clock_sync_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
gboolean performed_qos = FALSE;
GstClockSync *clocksync = GST_CLOCKSYNC (parent);
GstFlowReturn ret = GST_FLOW_OK;
@ -495,16 +735,25 @@ gst_clock_sync_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
gst_buffer_unref (buf);
return ret;
}
performed_qos = gst_clock_sync_perform_qos (clocksync);
}
/* Forward the buffer */
return gst_pad_push (clocksync->srcpad, buf);
ret = gst_pad_push (clocksync->srcpad, buf);
if (performed_qos)
clocksync->last_left =
gst_element_get_current_running_time (GST_ELEMENT (parent));
return ret;
}
static GstFlowReturn
gst_clock_sync_chain_list (GstPad * pad, GstObject * parent,
GstBufferList * buffer_list)
{
gboolean performed_qos = FALSE;
GstClockSync *clocksync = GST_CLOCKSYNC (parent);
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
@ -538,11 +787,19 @@ gst_clock_sync_chain_list (GstPad * pad, GstObject * parent,
gst_buffer_list_unref (buffer_list);
return ret;
}
performed_qos = gst_clock_sync_perform_qos (clocksync);
}
/* Forward the buffer list */
done:
return gst_pad_push_list (clocksync->srcpad, buffer_list);
ret = gst_pad_push_list (clocksync->srcpad, buffer_list);
if (performed_qos)
clocksync->last_left =
gst_element_get_current_running_time (GST_ELEMENT (parent));
return ret;
}
static gboolean
@ -647,6 +904,7 @@ gst_clocksync_change_state (GstElement * element, GstStateChange transition)
GST_OBJECT_UNLOCK (clocksync);
if (clocksync->sync)
no_preroll = TRUE;
gst_clock_sync_reset_qos (clocksync);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
break;

View file

@ -63,6 +63,23 @@ struct _GstClockSync
gboolean is_first;
GstClockTime upstream_latency;
/* QoS */
gboolean qos_enabled;
// With STREAM_LOCK {
GstClockTime earliest_in_time;
GstClockTime current_rstart;
GstClockTimeDiff current_jitter;
GstClockTime avg_pt, avg_in_diff;
gdouble avg_rate; /* average with infinite window */
/* when the last buffer left the sink, running time */
GstClockTime last_left;
/* the running time of the previous buffer */
GstClockTime prev_rstart;
// } With STREAM_LOCK
};
struct _GstClockSyncClass