gst/systemclock: wait on each entry individually

Problem:
multiple aggregator elements (audiomixer, compositor) in a live
pipeline use a lot of CPU waiting each other up.  This is because
of the previously unused clock entry unscheduling during regular
operation.

Clock entry unscheduling has the potential to wake up every clock entry
waiting using the system clock which may be a large number.

Solution:
Implement waiting per entry and only wakeup the unscheduled entry.

While this may be possible using GCond, theoretically GCond only gives
us microsecond accuracy and uses relative waits in a number of places.
We can unfortunately do better poking at the platform specifics
ourselves by using futexes on linux and pthread on other unix.  Windows
may have a possible implementation using Waitable timers but that is
not implemented here and instead falls back to the GCond implementation.
GCond waits on Windows is still as accurate as the previous GstPoll-based
implementation.
This commit is contained in:
Matthew Waters 2020-04-15 17:54:21 +10:00
parent 546bf6ec25
commit 6f9a63a10d
4 changed files with 333 additions and 25 deletions

View file

@ -514,5 +514,16 @@ struct _GstDynamicTypeFactoryClass {
/* privat flag used by GstBus / GstMessage */
#define GST_MESSAGE_FLAG_ASYNC_DELIVERY (GST_MINI_OBJECT_FLAG_LAST << 0)
/* private struct used by GstClock and GstSystemClock */
struct _GstClockEntryImpl
{
GstClockEntry entry;
GWeakRef clock;
GDestroyNotify destroy_entry;
gpointer padding[16]; /* padding for allowing e.g. systemclock
* to add data in lieu of overridable
* virtual functions on the clock */
};
G_END_DECLS
#endif /* __GST_PRIVATE_H__ */

View file

@ -167,11 +167,7 @@ struct _GstClockPrivate
gboolean synced;
};
typedef struct
{
GstClockEntry entry;
GWeakRef clock;
} GstClockEntryImpl;
typedef struct _GstClockEntryImpl GstClockEntryImpl;
#define GST_CLOCK_ENTRY_CLOCK_WEAK_REF(entry) (&((GstClockEntryImpl *)(entry))->clock)
@ -249,7 +245,7 @@ gst_clock_entry_new (GstClock * clock, GstClockTime time,
{
GstClockEntry *entry;
entry = (GstClockEntry *) g_slice_new (GstClockEntryImpl);
entry = (GstClockEntry *) g_slice_new0 (GstClockEntryImpl);
/* FIXME: add tracer hook for struct allocations such as clock entries */
@ -363,6 +359,7 @@ static void
_gst_clock_id_free (GstClockID id)
{
GstClockEntry *entry;
GstClockEntryImpl *entry_impl;
g_return_if_fail (id != NULL);
GST_CAT_DEBUG (GST_CAT_CLOCK, "freed entry %p", id);
@ -370,6 +367,10 @@ _gst_clock_id_free (GstClockID id)
if (entry->destroy_data)
entry->destroy_data (entry->user_data);
entry_impl = (GstClockEntryImpl *) id;
if (entry_impl->destroy_entry)
entry_impl->destroy_entry (entry_impl);
g_weak_ref_clear (GST_CLOCK_ENTRY_CLOCK_WEAK_REF (entry));
/* FIXME: add tracer hook for struct allocations such as clock entries */

View file

@ -73,9 +73,265 @@
#define GST_SYSTEM_CLOCK_UNLOCK(clock) g_mutex_unlock(GST_SYSTEM_CLOCK_GET_LOCK(clock))
#define GST_SYSTEM_CLOCK_GET_COND(clock) (&GST_SYSTEM_CLOCK_CAST(clock)->priv->entries_changed)
#define GST_SYSTEM_CLOCK_WAIT(clock) g_cond_wait(GST_SYSTEM_CLOCK_GET_COND(clock),GST_SYSTEM_CLOCK_GET_LOCK(clock))
#define GST_SYSTEM_CLOCK_WAIT_UNTIL(clock,us) g_cond_wait_until(GST_SYSTEM_CLOCK_GET_COND(clock),GST_SYSTEM_CLOCK_GET_LOCK(clock),us)
#define GST_SYSTEM_CLOCK_BROADCAST(clock) g_cond_broadcast(GST_SYSTEM_CLOCK_GET_COND(clock))
#if defined(HAVE_FUTEX)
#include <unistd.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#ifndef FUTEX_WAIT_BITSET_PRIVATE
#define FUTEX_WAIT_BITSET_PRIVATE FUTEX_WAIT_BITSET
#endif
#ifndef FUTEX_WAKE_PRIVATE
#define FUTEX_WAKE_PRIVATE FUTEX_WAKE
#endif
#define GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry) (&(entry)->lock)
#define GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry) (&(entry)->cond_val)
#define GST_SYSTEM_CLOCK_ENTRY_LOCK(entry) (g_mutex_lock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
#define GST_SYSTEM_CLOCK_ENTRY_UNLOCK(entry) (g_mutex_unlock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
#define GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL(entry,ns) gst_futex_cond_wait_until(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry),GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry),ns)
#define GST_SYSTEM_CLOCK_ENTRY_BROADCAST(entry) gst_futex_cond_broadcast(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry))
typedef struct _GstClockEntryFutex GstClockEntryImpl;
struct _GstClockEntryFutex
{
GstClockEntry entry;
GWeakRef clock;
GDestroyNotify destroy_entry;
gboolean initialized;
GMutex lock;
guint cond_val;
};
static void
clear_entry (GstClockEntryImpl * entry)
{
g_mutex_clear (&entry->lock);
}
static void
init_entry (GstClockEntryImpl * entry)
{
g_mutex_init (&entry->lock);
entry->destroy_entry = (GDestroyNotify) clear_entry;
}
static void
gst_futex_cond_broadcast (guint * cond_val)
{
g_atomic_int_inc (cond_val);
syscall (__NR_futex, cond_val, (gsize) FUTEX_WAKE_PRIVATE, (gsize) INT_MAX,
NULL);
}
static gboolean
gst_futex_cond_wait_until (guint * cond_val, GMutex * mutex, gint64 end_time)
{
struct timespec end;
guint sampled;
int res;
gboolean success;
if (end_time < 0)
return FALSE;
end.tv_sec = end_time / 1000000000;
end.tv_nsec = end_time % 1000000000;
sampled = *cond_val;
g_mutex_unlock (mutex);
/* we use FUTEX_WAIT_BITSET_PRIVATE rather than FUTEX_WAIT_PRIVATE to be
* able to use absolute time */
res =
syscall (__NR_futex, cond_val, (gsize) FUTEX_WAIT_BITSET_PRIVATE,
(gsize) sampled, &end, NULL, FUTEX_BITSET_MATCH_ANY);
success = (res < 0 && errno == ETIMEDOUT) ? FALSE : TRUE;
g_mutex_lock (mutex);
return success;
}
#elif defined (G_OS_UNIX)
#define GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry) (&(entry)->lock)
#define GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry) (&(entry)->cond)
#define GST_SYSTEM_CLOCK_ENTRY_LOCK(entry) (pthread_mutex_lock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
#define GST_SYSTEM_CLOCK_ENTRY_UNLOCK(entry) (pthread_mutex_unlock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
#define GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL(entry,ns) gst_pthread_cond_wait_until(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry),GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry),(ns))
#define GST_SYSTEM_CLOCK_ENTRY_BROADCAST(entry) pthread_cond_broadcast(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry))
typedef struct _GstClockEntryPThread GstClockEntryImpl;
struct _GstClockEntryPThread
{
GstClockEntry entry;
GWeakRef clock;
GDestroyNotify destroy_entry;
gboolean initialized;
pthread_cond_t cond;
pthread_mutex_t lock;
};
static gboolean
gst_pthread_cond_wait_until (pthread_cond_t * cond, pthread_mutex_t * lock,
guint64 end_time)
{
struct timespec ts;
gint status;
#if defined (HAVE_PTHREAD_CONDATTR_SETCLOCK) && defined (CLOCK_MONOTONIC)
/* This is the exact check we used during init to set the clock to
* monotonic, so if we're in this branch, timedwait() will already be
* expecting a monotonic clock.
*/
{
ts.tv_sec = end_time / 1000000000;
ts.tv_nsec = end_time % 1000000000;
if ((status = pthread_cond_timedwait (cond, lock, &ts)) == 0)
return TRUE;
}
#elif defined (HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
/* end_time is given relative to the monotonic clock as returned by
* g_get_monotonic_time().
*
* Since this pthreads wants the relative time, convert it back again.
*/
{
gint64 now = g_get_monotonic_time () * 1000;
gint64 relative;
if (end_time <= now)
return FALSE;
relative = end_time - now;
ts.tv_sec = relative / 1000000000;
ts.tv_nsec = relative % 1000000000;
if ((status = pthread_cond_timedwait_relative_np (cond, lock, &ts)) == 0)
return TRUE;
}
#else
#error Cannot use pthread condition variables on your platform.
#endif
if (G_UNLIKELY (status != ETIMEDOUT)) {
g_error ("pthread_cond_timedwait returned %d", status);
}
return FALSE;
}
static void
clear_entry (GstClockEntryImpl * entry)
{
pthread_cond_destroy (&entry->cond);
pthread_mutex_destroy (&entry->lock);
}
static void
init_entry (GstClockEntryImpl * entry)
{
pthread_mutexattr_t *m_pattr = NULL;
#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
pthread_mutexattr_t m_attr;
#endif
pthread_condattr_t c_attr;
gint status;
pthread_condattr_init (&c_attr);
#if defined (HAVE_PTHREAD_CONDATTR_SETCLOCK) && defined (CLOCK_MONOTONIC)
status = pthread_condattr_setclock (&c_attr, CLOCK_MONOTONIC);
if (G_UNLIKELY (status != 0)) {
g_error ("pthread_condattr_setclock returned %d", status);
}
#elif defined (HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
#else
#error Cannot use pthread condition variables on your platform.
#endif
status = pthread_cond_init (&entry->cond, &c_attr);
if (G_UNLIKELY (status != 0)) {
g_error ("pthread_cond_init returned %d", status);
}
pthread_condattr_destroy (&c_attr);
#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
pthread_mutexattr_init (&m_attr);
pthread_mutexattr_settype (&m_attr, PTHREAD_MUTEX_ADAPTIVE_NP);
m_pattr = &m_attr;
#endif
status = pthread_mutex_init (&entry->lock, m_pattr);
if (G_UNLIKELY (status != 0)) {
g_error ("pthread_mutex_init returned %d", status);
}
#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
pthread_mutexattr_destroy (&m_attr);
#endif
entry->destroy_entry = (GDestroyNotify) clear_entry;
}
#else
#define GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry) (&(entry)->lock)
#define GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry) (&(entry)->cond)
#define GST_SYSTEM_CLOCK_ENTRY_LOCK(entry) (g_mutex_lock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
#define GST_SYSTEM_CLOCK_ENTRY_UNLOCK(entry) (g_mutex_unlock(GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry)))
#define GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL(entry,ns) g_cond_wait_until(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry),GST_SYSTEM_CLOCK_ENTRY_GET_LOCK(entry),(ns / 1000))
#define GST_SYSTEM_CLOCK_ENTRY_BROADCAST(entry) g_cond_broadcast(GST_SYSTEM_CLOCK_ENTRY_GET_COND(entry))
typedef struct _GstClockEntryGLib GstClockEntryImpl;
struct _GstClockEntryGLib
{
GstClockEntry entry;
GWeakRef clock;
GDestroyNotify destroy_entry;
gboolean initialized;
GMutex lock;
GCond cond;
};
static void
clear_entry (GstClockEntryImpl * entry)
{
g_cond_clear (&entry->cond);
g_mutex_clear (&entry->lock);
}
static void
init_entry (GstClockEntryImpl * entry)
{
g_cond_init (&entry->cond);
g_mutex_init (&entry->lock);
entry->destroy_entry = (GDestroyNotify) clear_entry;
}
#endif
/* check that our impl is smaller than what will be allocated by gstclock.c */
G_STATIC_ASSERT (sizeof (GstClockEntryImpl) <=
sizeof (struct _GstClockEntryImpl));
static inline void
ensure_entry_initialized (GstClockEntryImpl * entry_impl)
{
if (!entry_impl->initialized) {
init_entry (entry_impl);
entry_impl->initialized = TRUE;
}
}
struct _GstSystemClockPrivate
{
GThread *thread; /* thread for async notify */
@ -226,6 +482,11 @@ gst_system_clock_dispose (GObject * object)
SET_ENTRY_STATUS (entry, GST_CLOCK_UNSCHEDULED);
}
GST_SYSTEM_CLOCK_BROADCAST (clock);
if (priv->entries) {
GstClockEntryImpl *entry = (GstClockEntryImpl *) priv->entries->data;
ensure_entry_initialized (entry);
GST_SYSTEM_CLOCK_ENTRY_BROADCAST (entry);
}
GST_SYSTEM_CLOCK_UNLOCK (clock);
if (priv->thread)
@ -427,11 +688,17 @@ gst_system_clock_async_thread (GstClock * clock)
requested = entry->time;
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "waiting on entry %p", entry);
GST_SYSTEM_CLOCK_UNLOCK (clock);
/* now wait for the entry */
res =
gst_system_clock_id_wait_jitter_unlocked (clock, (GstClockID) entry,
NULL, FALSE);
GST_SYSTEM_CLOCK_LOCK (clock);
switch (res) {
case GST_CLOCK_UNSCHEDULED:
/* entry was unscheduled, move to the next */
@ -604,12 +871,10 @@ gst_system_clock_get_resolution (GstClock * clock)
/* synchronously wait on the given GstClockEntry.
*
* We do this by blocking on the global GstPoll timer with
* the requested timeout. This allows us to unblock the
* entry by writing on the control fd.
*
* Note that writing the global GstPoll unlocks all waiting entries. So
* we need to check if an unlocked entry has changed when it unlocks.
* We do this by blocking on the entry specifically rather than a global
* condition variable so that each possible thread may be woken up
* individually. This ensures that we don't wake up possibly multiple threads
* when unscheduling an entry.
*
* Entries that arrive too late are simply not waited on and a
* GST_CLOCK_EARLY result is returned.
@ -661,7 +926,12 @@ gst_system_clock_id_wait_jitter_unlocked (GstClock * clock,
/* now wait on the entry, it either times out or the cond is signalled.
* The status of the entry is BUSY only around the wait. */
waitret = GST_SYSTEM_CLOCK_WAIT_UNTIL (clock, mono_ts + (diff / 1000));
GST_SYSTEM_CLOCK_ENTRY_LOCK ((GstClockEntryImpl *) entry);
waitret =
GST_SYSTEM_CLOCK_ENTRY_WAIT_UNTIL ((GstClockEntryImpl *) entry,
mono_ts * 1000 + diff);
GST_SYSTEM_CLOCK_ENTRY_UNLOCK ((GstClockEntryImpl *) entry);
/* get the new status, mark as DONE. We do this so that the unschedule
* function knows when we left the poll and doesn't need to wakeup the
@ -784,14 +1054,13 @@ gst_system_clock_id_wait_jitter (GstClock * clock, GstClockEntry * entry,
GstClockTimeDiff * jitter)
{
GstClockReturn status;
GstClockEntryImpl *entry_impl = (GstClockEntryImpl *) entry;
GST_SYSTEM_CLOCK_LOCK (clock);
do {
status = GET_ENTRY_STATUS (entry);
/* stop when we are unscheduled */
if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
GST_SYSTEM_CLOCK_UNLOCK (clock);
return status;
}
@ -803,9 +1072,16 @@ gst_system_clock_id_wait_jitter (GstClock * clock, GstClockEntry * entry,
* statuses */
} while (G_UNLIKELY (!CAS_ENTRY_STATUS (entry, status, GST_CLOCK_BUSY)));
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "waiting on entry %p", entry);
if (!entry_impl->initialized) {
GST_SYSTEM_CLOCK_LOCK (clock);
ensure_entry_initialized (entry_impl);
GST_SYSTEM_CLOCK_UNLOCK (clock);
}
status =
gst_system_clock_id_wait_jitter_unlocked (clock, entry, jitter, TRUE);
GST_SYSTEM_CLOCK_UNLOCK (clock);
return status;
}
@ -875,6 +1151,9 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
/* need to take a ref */
gst_clock_id_ref ((GstClockID) entry);
ensure_entry_initialized ((GstClockEntryImpl *) entry);
/* insert the entry in sorted order */
priv->entries = g_list_insert_sorted (priv->entries, entry,
gst_clock_id_compare_func);
@ -903,7 +1182,7 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
* looks at the new head entry instead, we only need to do this once */
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock,
"head entry was busy. Wakeup async thread");
GST_SYSTEM_CLOCK_BROADCAST (clock);
GST_SYSTEM_CLOCK_ENTRY_BROADCAST ((GstClockEntryImpl *) head);
}
}
}
@ -937,9 +1216,11 @@ gst_system_clock_id_unschedule (GstClock * clock, GstClockEntry * entry)
{
GstClockReturn status;
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "unscheduling entry %p", entry);
GST_SYSTEM_CLOCK_LOCK (clock);
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "unscheduling entry %p time %"
GST_TIME_FORMAT, entry, GST_TIME_ARGS (GST_CLOCK_ENTRY_TIME (entry)));
/* change the entry status to unscheduled */
do {
status = GET_ENTRY_STATUS (entry);
@ -947,13 +1228,11 @@ gst_system_clock_id_unschedule (GstClock * clock, GstClockEntry * entry)
GST_CLOCK_UNSCHEDULED)));
if (G_LIKELY (status == GST_CLOCK_BUSY)) {
/* the entry was being busy, wake up all entries so that they recheck their
* status. We cannot wake up just one entry because allocating such a
* datastructure for each entry would be too heavy and unlocking an entry
* is usually done when shutting down or some other exceptional case. */
/* the entry was being busy, wake up the entry */
GST_CAT_DEBUG_OBJECT (GST_CAT_CLOCK, clock, "entry was BUSY, doing wakeup");
if (!entry->unscheduled) {
GST_SYSTEM_CLOCK_BROADCAST (clock);
ensure_entry_initialized ((GstClockEntryImpl *) entry);
GST_SYSTEM_CLOCK_ENTRY_BROADCAST ((GstClockEntryImpl *) entry);
}
}
GST_SYSTEM_CLOCK_UNLOCK (clock);

View file

@ -257,6 +257,23 @@ if cc.links('''#include <pthread.h>
}''', name : 'pthread_setname_np(const char*)')
cdata.set('HAVE_PTHREAD_SETNAME_NP_WITHOUT_TID', 1)
endif
if cc.has_header_symbol('pthread.h', 'pthread_condattr_setclock')
cdata.set('HAVE_PTHREAD_CONDATTR_SETCLOCK', 1)
endif
if cc.has_header_symbol('pthread.h', 'pthread_cond_timedwait_relative_np')
cdata.set('HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP', 1)
endif
# Check for futex(2)
if cc.links('''#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
int main (int argc, char ** argv) {
syscall (__NR_futex, NULL, FUTEX_WAKE, FUTEX_WAIT);
return 0;
}''', name : 'futex(2) system call')
cdata.set('HAVE_FUTEX', 1)
endif
# Check for posix timers and monotonic clock
time_prefix = '#include <time.h>\n'