Merge branch 'for_3047' into 'main'

pulsesink: add lock as atomic operation for getting/setting properties

See merge request gstreamer/gstreamer!6588
This commit is contained in:
Jimmy Ohn 2024-05-03 21:15:02 +00:00
commit 22756d712c
4 changed files with 165 additions and 27 deletions

View file

@ -63,6 +63,7 @@
#include "pulsesink.h"
#include "pulseutil.h"
GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
#define GST_CAT_DEFAULT pulse_debug
@ -467,6 +468,7 @@ gst_pulsering_context_subscribe_cb (pa_context * c,
if (idx != pa_stream_get_index (pbuf->stream))
continue;
g_mutex_lock (&psink->prop_lock);
if (psink->device && pbuf->is_pcm &&
!g_str_equal (psink->device,
pa_stream_get_device_name (pbuf->stream))) {
@ -476,6 +478,7 @@ gst_pulsering_context_subscribe_cb (pa_context * c,
g_free (psink->device);
psink->device = g_strdup (pa_stream_get_device_name (pbuf->stream));
g_mutex_unlock (&psink->prop_lock);
GST_INFO_OBJECT (psink, "emitting sink-changed");
@ -486,6 +489,8 @@ gst_pulsering_context_subscribe_cb (pa_context * c,
if (!gst_pad_push_event (GST_BASE_SINK (psink)->sinkpad, renego))
GST_DEBUG_OBJECT (psink, "Emitted sink-changed - nobody was listening");
} else {
g_mutex_unlock (&psink->prop_lock);
}
/* Actually this event is also triggered when other properties of
@ -515,11 +520,13 @@ gst_pulseringbuffer_open_device (GstAudioRingBuffer * buf)
g_assert (!pbuf->stream);
g_assert (psink->client_name);
g_mutex_lock (&psink->prop_lock);
if (psink->server)
pbuf->context_name = g_strdup_printf ("%s@%s", psink->client_name,
psink->server);
else
pbuf->context_name = g_strdup (psink->client_name);
g_mutex_unlock (&psink->prop_lock);
pa_threaded_mainloop_lock (mainloop);
@ -548,11 +555,15 @@ gst_pulseringbuffer_open_device (GstAudioRingBuffer * buf)
/* try to connect to the server and wait for completion, we don't want to
* autospawn a daemon */
g_mutex_lock (&psink->prop_lock);
GST_LOG_OBJECT (psink, "connect to server %s",
GST_STR_NULL (psink->server));
if (pa_context_connect (pctx->context, psink->server,
PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
PA_CONTEXT_NOAUTOSPAWN, NULL) < 0) {
g_mutex_unlock (&psink->prop_lock);
goto connect_failed;
}
g_mutex_unlock (&psink->prop_lock);
} else {
GST_INFO_OBJECT (psink,
"reusing shared context with name %s, pbuf=%p, pctx=%p",
@ -821,8 +832,10 @@ gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
psink->format_lost_time = g_ascii_strtoull (pa_proplist_gets (pl,
"stream-time"), NULL, 0) * 1000;
g_mutex_lock (&psink->prop_lock);
g_free (psink->device);
psink->device = g_strdup (pa_proplist_gets (pl, "device"));
g_mutex_unlock (&psink->prop_lock);
/* FIXME: send reconfigure event instead and let decodebin/playbin
* handle that. Also take care of ac3 alignment */
@ -937,8 +950,13 @@ gst_pulseringbuffer_acquire (GstAudioRingBuffer * buf,
/* create a stream */
formats[0] = pbuf->format;
if (!(pbuf->stream = pa_stream_new_extended (pbuf->context, name, formats, 1,
psink->proplist)))
g_mutex_lock (&psink->prop_lock);
pbuf->stream =
pa_stream_new_extended (pbuf->context, name, formats, 1, psink->proplist);
g_mutex_unlock (&psink->prop_lock);
if (!pbuf->stream)
goto stream_failed;
/* install essential callbacks */
@ -973,16 +991,21 @@ gst_pulseringbuffer_acquire (GstAudioRingBuffer * buf,
GST_INFO_OBJECT (psink, "minreq: %d", wanted.minreq);
/* configure volume when we changed it, else we leave the default */
g_mutex_lock (&psink->prop_lock);
if (psink->volume_set) {
GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
g_mutex_unlock (&psink->prop_lock);
pv = &v;
if (pbuf->is_pcm)
if (pbuf->is_pcm) {
g_mutex_lock (&psink->prop_lock);
gst_pulse_cvolume_from_linear (pv, pbuf->channels, psink->volume);
else {
g_mutex_unlock (&psink->prop_lock);
} else {
GST_DEBUG_OBJECT (psink, "passthrough stream, not setting volume");
pv = NULL;
}
} else {
g_mutex_unlock (&psink->prop_lock);
pv = NULL;
}
@ -990,22 +1013,28 @@ gst_pulseringbuffer_acquire (GstAudioRingBuffer * buf,
flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED;
g_mutex_lock (&psink->prop_lock);
if (psink->mute_set) {
if (psink->mute)
flags |= PA_STREAM_START_MUTED;
else
flags |= PA_STREAM_START_UNMUTED;
}
g_mutex_unlock (&psink->prop_lock);
/* we always start corked (see flags above) */
pbuf->corked = TRUE;
/* try to connect now */
g_mutex_lock (&psink->prop_lock);
GST_LOG_OBJECT (psink, "connect for playback to device %s",
GST_STR_NULL (psink->device));
if (pa_stream_connect_playback (pbuf->stream, psink->device,
&wanted, flags, pv, NULL) < 0)
&wanted, flags, pv, NULL) < 0) {
g_mutex_unlock (&psink->prop_lock);
goto connect_failed;
}
g_mutex_unlock (&psink->prop_lock);
/* our clock will now start from 0 again */
clock = GST_AUDIO_CLOCK (GST_AUDIO_BASE_SINK (psink)->provided_clock);
@ -1014,8 +1043,10 @@ gst_pulseringbuffer_acquire (GstAudioRingBuffer * buf,
if (!gst_pulsering_wait_for_stream_ready (psink, pbuf->stream))
goto connect_failed;
g_mutex_lock (&psink->prop_lock);
g_free (psink->device);
psink->device = g_strdup (pa_stream_get_device_name (pbuf->stream));
g_mutex_unlock (&psink->prop_lock);
#ifndef GST_DISABLE_GST_DEBUG
pa_format_info_snprint (print_buf, sizeof (print_buf),
@ -1025,7 +1056,10 @@ gst_pulseringbuffer_acquire (GstAudioRingBuffer * buf,
/* After we passed the volume off of to PA we never want to set it
again, since it is PA's job to save/restore volumes. */
g_mutex_lock (&psink->prop_lock);
psink->volume_set = psink->mute_set = FALSE;
g_mutex_unlock (&psink->prop_lock);
GST_LOG_OBJECT (psink, "stream is acquired now");
@ -2072,8 +2106,13 @@ gst_pulsesink_create_probe_stream (GstPulseSink * psink,
GST_LOG_OBJECT (psink, "Creating probe stream");
if (!(stream = pa_stream_new_extended (pbuf->context, "pulsesink probe",
formats, 1, psink->proplist)))
g_mutex_lock (&psink->prop_lock);
stream =
pa_stream_new_extended (pbuf->context, "pulsesink probe", formats, 1,
psink->proplist);
g_mutex_unlock (&psink->prop_lock);
if (!stream)
goto error;
/* construct the flags */
@ -2367,6 +2406,8 @@ gst_pulsesink_init (GstPulseSink * pulsesink)
pulsesink->properties = NULL;
pulsesink->proplist = NULL;
g_mutex_init (&pulsesink->prop_lock);
/* override with a custom clock */
if (GST_AUDIO_BASE_SINK (pulsesink)->provided_clock)
gst_object_unref (GST_AUDIO_BASE_SINK (pulsesink)->provided_clock);
@ -2385,6 +2426,7 @@ gst_pulsesink_finalize (GObject * object)
g_free (pulsesink->device);
g_free (pulsesink->client_name);
g_free (pulsesink->current_sink_name);
g_mutex_clear (&pulsesink->prop_lock);
free_device_info (&pulsesink->device_info);
@ -2556,6 +2598,7 @@ gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
* it implies we just recreated the stream (caps change)
*/
if (i->index == pa_stream_get_index (pbuf->stream)) {
g_mutex_lock (&psink->prop_lock);
psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
psink->mute = i->mute;
psink->current_sink_idx = i->sink;
@ -2565,6 +2608,7 @@ gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
MAX_VOLUME);
psink->volume = MAX_VOLUME;
}
g_mutex_unlock (&psink->prop_lock);
}
done:
@ -2602,10 +2646,16 @@ gst_pulsesink_get_sink_input_info (GstPulseSink * psink, gdouble * volume,
}
unlock:
if (volume)
if (volume) {
g_mutex_lock (&psink->prop_lock);
*volume = psink->volume;
if (mute)
g_mutex_unlock (&psink->prop_lock);
}
if (mute) {
g_mutex_lock (&psink->prop_lock);
*mute = psink->mute;
g_mutex_unlock (&psink->prop_lock);
}
if (o)
pa_operation_unref (o);
@ -2617,10 +2667,16 @@ unlock:
/* ERRORS */
no_mainloop:
{
if (volume)
if (volume) {
g_mutex_lock (&psink->prop_lock);
*volume = psink->volume;
if (mute)
g_mutex_unlock (&psink->prop_lock);
}
if (mute) {
g_mutex_lock (&psink->prop_lock);
*mute = psink->mute;
g_mutex_unlock (&psink->prop_lock);
}
GST_DEBUG_OBJECT (psink, "we have no mainloop");
return;
@ -2659,8 +2715,10 @@ gst_pulsesink_current_sink_info_cb (pa_context * c, const pa_sink_info * i,
* it implies we just recreated the stream (caps change)
*/
if (i->index == psink->current_sink_idx) {
g_mutex_lock (&psink->prop_lock);
g_free (psink->current_sink_name);
psink->current_sink_name = g_strdup (i->name);
g_mutex_unlock (&psink->prop_lock);
}
done:
@ -2685,7 +2743,6 @@ gst_pulsesink_get_current_device (GstPulseSink * pulsesink)
gst_pulsesink_get_sink_input_info (pulsesink, NULL, NULL);
pa_threaded_mainloop_lock (mainloop);
if (!(o = pa_context_get_sink_info_by_index (pbuf->context,
pulsesink->current_sink_idx, gst_pulsesink_current_sink_info_cb,
pulsesink)))
@ -2699,7 +2756,9 @@ gst_pulsesink_get_current_device (GstPulseSink * pulsesink)
unlock:
g_mutex_lock (&pulsesink->prop_lock);
current_sink = g_strdup (pulsesink->current_sink_name);
g_mutex_unlock (&pulsesink->prop_lock);
if (o)
pa_operation_unref (o);
@ -2738,15 +2797,22 @@ gst_pulsesink_device_description (GstPulseSink * psink)
if (!mainloop)
goto no_mainloop;
pa_threaded_mainloop_lock (mainloop);
pbuf = GST_PULSERING_BUFFER_CAST (GST_AUDIO_BASE_SINK (psink)->ringbuffer);
GST_OBJECT_LOCK (pbuf);
pa_threaded_mainloop_lock (mainloop);
if (pbuf == NULL)
goto no_buffer;
g_mutex_lock (&psink->prop_lock);
free_device_info (&psink->device_info);
if (!(o = pa_context_get_sink_info_by_name (pbuf->context,
psink->device, gst_pulsesink_sink_info_cb, &psink->device_info)))
psink->device, gst_pulsesink_sink_info_cb,
&psink->device_info))) {
g_mutex_unlock (&psink->prop_lock);
goto info_failed;
}
g_mutex_unlock (&psink->prop_lock);
while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
pa_threaded_mainloop_wait (mainloop);
@ -2758,8 +2824,12 @@ unlock:
if (o)
pa_operation_unref (o);
g_mutex_lock (&psink->prop_lock);
t = g_strdup (psink->device_info.description);
g_mutex_unlock (&psink->prop_lock);
pa_threaded_mainloop_unlock (mainloop);
GST_OBJECT_UNLOCK (pbuf);
return t;
@ -2850,6 +2920,7 @@ gst_pulsesink_set_property (GObject * object,
{
GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
g_mutex_lock (&pulsesink->prop_lock);
switch (prop_id) {
case PROP_SERVER:
g_free (pulsesink->server);
@ -2888,6 +2959,7 @@ gst_pulsesink_set_property (GObject * object,
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
g_mutex_unlock (&pulsesink->prop_lock);
}
static void
@ -2899,10 +2971,14 @@ gst_pulsesink_get_property (GObject * object,
switch (prop_id) {
case PROP_SERVER:
g_mutex_lock (&pulsesink->prop_lock);
g_value_set_string (value, pulsesink->server);
g_mutex_unlock (&pulsesink->prop_lock);
break;
case PROP_DEVICE:
g_mutex_lock (&pulsesink->prop_lock);
g_value_set_string (value, pulsesink->device);
g_mutex_unlock (&pulsesink->prop_lock);
break;
case PROP_CURRENT_DEVICE:
{
@ -2933,10 +3009,14 @@ gst_pulsesink_get_property (GObject * object,
break;
}
case PROP_CLIENT_NAME:
g_mutex_lock (&pulsesink->prop_lock);
g_value_set_string (value, pulsesink->client_name);
g_mutex_unlock (&pulsesink->prop_lock);
break;
case PROP_STREAM_PROPERTIES:
g_mutex_lock (&pulsesink->prop_lock);
gst_value_set_structure (value, pulsesink->properties);
g_mutex_unlock (&pulsesink->prop_lock);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);

View file

@ -74,6 +74,8 @@ struct _GstPulseSink
gint format_lost;
GstClockTime format_lost_time;
GMutex prop_lock;
};
#define PULSE_SINK_TEMPLATE_CAPS \

View file

@ -281,6 +281,8 @@ gst_pulsesrc_init (GstPulseSrc * pulsesrc)
pulsesrc->properties = NULL;
pulsesrc->proplist = NULL;
g_mutex_init (&pulsesrc->prop_lock);
/* this should be the default but it isn't yet */
gst_audio_base_src_set_slave_method (GST_AUDIO_BASE_SRC (pulsesrc),
GST_AUDIO_BASE_SRC_SLAVE_SKEW);
@ -306,8 +308,10 @@ gst_pulsesrc_destroy_stream (GstPulseSrc * pulsesrc)
g_object_notify (G_OBJECT (pulsesrc), "source-output-index");
}
g_mutex_lock (&pulsesrc->prop_lock);
g_free (pulsesrc->device_description);
pulsesrc->device_description = NULL;
g_mutex_unlock (&pulsesrc->prop_lock);
}
static void
@ -338,6 +342,7 @@ gst_pulsesrc_finalize (GObject * object)
g_free (pulsesrc->device);
g_free (pulsesrc->client_name);
g_free (pulsesrc->current_source_name);
g_mutex_clear (&pulsesrc->prop_lock);
if (pulsesrc->properties)
gst_structure_free (pulsesrc->properties);
@ -383,8 +388,10 @@ gst_pulsesrc_source_info_cb (pa_context * c, const pa_source_info * i, int eol,
if (!i)
goto done;
g_mutex_lock (&pulsesrc->prop_lock);
g_free (pulsesrc->device_description);
pulsesrc->device_description = g_strdup (i->description);
g_mutex_unlock (&pulsesrc->prop_lock);
done:
pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
@ -423,7 +430,9 @@ unlock:
if (o)
pa_operation_unref (o);
g_mutex_lock (&pulsesrc->prop_lock);
t = g_strdup (pulsesrc->device_description);
g_mutex_unlock (&pulsesrc->prop_lock);
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
@ -451,6 +460,7 @@ gst_pulsesrc_source_output_info_cb (pa_context * c,
* it implies we just recreated the stream (caps change)
*/
if (i->index == psrc->source_output_idx) {
g_mutex_lock (&psrc->prop_lock);
psrc->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
psrc->mute = i->mute;
psrc->current_source_idx = i->source;
@ -460,6 +470,7 @@ gst_pulsesrc_source_output_info_cb (pa_context * c,
psrc->volume, MAX_VOLUME);
psrc->volume = MAX_VOLUME;
}
g_mutex_unlock (&psrc->prop_lock);
}
done:
@ -493,10 +504,16 @@ gst_pulsesrc_get_source_output_info (GstPulseSrc * pulsesrc, gdouble * volume,
unlock:
if (volume)
if (volume) {
g_mutex_lock (&pulsesrc->prop_lock);
*volume = pulsesrc->volume;
if (mute)
g_mutex_unlock (&pulsesrc->prop_lock);
}
if (mute) {
g_mutex_lock (&pulsesrc->prop_lock);
*mute = pulsesrc->mute;
g_mutex_unlock (&pulsesrc->prop_lock);
}
if (o)
pa_operation_unref (o);
@ -509,19 +526,31 @@ unlock:
no_mainloop:
{
GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
if (volume)
if (volume) {
g_mutex_lock (&pulsesrc->prop_lock);
*volume = pulsesrc->volume;
if (mute)
g_mutex_unlock (&pulsesrc->prop_lock);
}
if (mute) {
g_mutex_lock (&pulsesrc->prop_lock);
*mute = pulsesrc->mute;
g_mutex_unlock (&pulsesrc->prop_lock);
}
return;
}
no_index:
{
GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
if (volume)
if (volume) {
g_mutex_lock (&pulsesrc->prop_lock);
*volume = pulsesrc->volume;
if (mute)
g_mutex_unlock (&pulsesrc->prop_lock);
}
if (mute) {
g_mutex_lock (&pulsesrc->prop_lock);
*mute = pulsesrc->mute;
g_mutex_unlock (&pulsesrc->prop_lock);
}
return;
}
info_failed:
@ -548,8 +577,10 @@ gst_pulsesrc_current_source_info_cb (pa_context * c, const pa_source_info * i,
* it implies we just recreated the stream (caps change)
*/
if (i->index == psrc->current_source_idx) {
g_mutex_lock (&psrc->prop_lock);
g_free (psrc->current_source_name);
psrc->current_source_name = g_strdup (i->name);
g_mutex_unlock (&psrc->prop_lock);
}
done:
@ -586,7 +617,9 @@ gst_pulsesrc_get_current_device (GstPulseSrc * pulsesrc)
unlock:
g_mutex_lock (&pulsesrc->prop_lock);
current_src = g_strdup (pulsesrc->current_source_name);
g_mutex_unlock (&pulsesrc->prop_lock);
if (o)
pa_operation_unref (o);
@ -780,6 +813,7 @@ gst_pulsesrc_set_property (GObject * object,
GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);
g_mutex_lock (&pulsesrc->prop_lock);
switch (prop_id) {
case PROP_SERVER:
g_free (pulsesrc->server);
@ -818,6 +852,7 @@ gst_pulsesrc_set_property (GObject * object,
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
g_mutex_unlock (&pulsesrc->prop_lock);
}
static void
@ -829,10 +864,14 @@ gst_pulsesrc_get_property (GObject * object,
switch (prop_id) {
case PROP_SERVER:
g_mutex_lock (&pulsesrc->prop_lock);
g_value_set_string (value, pulsesrc->server);
g_mutex_unlock (&pulsesrc->prop_lock);
break;
case PROP_DEVICE:
g_mutex_lock (&pulsesrc->prop_lock);
g_value_set_string (value, pulsesrc->device);
g_mutex_unlock (&pulsesrc->prop_lock);
break;
case PROP_CURRENT_DEVICE:
{
@ -847,13 +886,19 @@ gst_pulsesrc_get_property (GObject * object,
g_value_take_string (value, gst_pulsesrc_device_description (pulsesrc));
break;
case PROP_CLIENT_NAME:
g_mutex_lock (&pulsesrc->prop_lock);
g_value_set_string (value, pulsesrc->client_name);
g_mutex_unlock (&pulsesrc->prop_lock);
break;
case PROP_STREAM_PROPERTIES:
g_mutex_lock (&pulsesrc->prop_lock);
gst_value_set_structure (value, pulsesrc->properties);
g_mutex_unlock (&pulsesrc->prop_lock);
break;
case PROP_SOURCE_OUTPUT_INDEX:
g_mutex_lock (&pulsesrc->prop_lock);
g_value_set_uint (value, pulsesrc->source_output_idx);
g_mutex_unlock (&pulsesrc->prop_lock);
break;
case PROP_VOLUME:
{
@ -996,9 +1041,13 @@ gst_pulsesrc_open (GstAudioSrc * asrc)
GST_DEBUG_OBJECT (pulsesrc, "opening device");
if (!(pulsesrc->context =
pa_context_new (pa_threaded_mainloop_get_api (pulsesrc->mainloop),
pulsesrc->client_name))) {
g_mutex_lock (&pulsesrc->prop_lock);
pulsesrc->context =
pa_context_new (pa_threaded_mainloop_get_api (pulsesrc->mainloop),
pulsesrc->client_name);
g_mutex_unlock (&pulsesrc->prop_lock);
if (!pulsesrc->context) {
GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Failed to create context"),
(NULL));
goto unlock_and_fail;
@ -1310,17 +1359,22 @@ gst_pulsesrc_create_stream (GstPulseSrc * pulsesrc, GstCaps ** caps,
goto bad_context;
name = "Record Stream";
g_mutex_lock (&pulsesrc->prop_lock);
if (pulsesrc->proplist) {
if (!(pulsesrc->stream = pa_stream_new_with_proplist (pulsesrc->context,
name, &pulsesrc->sample_spec,
(need_channel_layout) ? NULL : &channel_map,
pulsesrc->proplist)))
pulsesrc->proplist))) {
g_mutex_unlock (&pulsesrc->prop_lock);
goto create_failed;
}
} else if (!(pulsesrc->stream = pa_stream_new (pulsesrc->context,
name, &pulsesrc->sample_spec,
(need_channel_layout) ? NULL : &channel_map)))
(need_channel_layout) ? NULL : &channel_map))) {
g_mutex_unlock (&pulsesrc->prop_lock);
goto create_failed;
}
g_mutex_unlock (&pulsesrc->prop_lock);
if (caps) {
m = pa_stream_get_channel_map (pulsesrc->stream);

View file

@ -72,6 +72,8 @@ struct _GstPulseSrc
GstStructure *properties;
pa_proplist *proplist;
GMutex prop_lock;
};
G_END_DECLS