Merge branch 'unixfd-wait-for-connection' into 'main'

unixfd: Fix racy unit test by adding wait-for-connection property

See merge request gstreamer/gstreamer!6765
This commit is contained in:
Xavier Claessens 2024-05-04 00:07:34 +00:00
commit a45f08ded3
3 changed files with 110 additions and 11 deletions

View file

@ -241517,6 +241517,18 @@
"readable": true,
"type": "GUnixSocketAddressType",
"writable": true
},
"wait-for-connection": {
"blurb": "Block the stream until a least one client is connected",
"conditionally-available": false,
"construct": true,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
}
},
"rank": "none"
@ -241565,6 +241577,18 @@
"readable": true,
"type": "GUnixSocketAddressType",
"writable": true
},
"wait-for-connection": {
"blurb": "Block the stream until a least one client is connected",
"conditionally-available": false,
"construct": true,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
}
},
"rank": "none"

View file

@ -87,6 +87,10 @@ struct _GstUnixFdSink
GstCaps *caps;
gboolean uses_monotonic_clock;
GByteArray *payload;
gboolean wait_for_connection;
GCond wait_for_connection_cond;
gboolean unlock;
};
G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK);
@ -94,12 +98,14 @@ GST_ELEMENT_REGISTER_DEFINE (unixfdsink, "unixfdsink", GST_RANK_NONE,
GST_TYPE_UNIX_FD_SINK);
#define DEFAULT_SOCKET_TYPE G_UNIX_SOCKET_ADDRESS_PATH
#define DEFAULT_WAIT_FOR_CONNECTION FALSE
enum
{
PROP_0,
PROP_SOCKET_PATH,
PROP_SOCKET_TYPE,
PROP_WAIT_FOR_CONNECTION,
};
@ -122,6 +128,7 @@ gst_unix_fd_sink_init (GstUnixFdSink * self)
self->clients =
g_hash_table_new_full (NULL, NULL, g_object_unref,
(GDestroyNotify) client_free);
g_cond_init (&self->wait_for_connection_cond);
}
static void
@ -133,6 +140,7 @@ gst_unix_fd_sink_finalize (GObject * object)
g_main_context_unref (self->context);
g_main_loop_unref (self->loop);
g_hash_table_unref (self->clients);
g_cond_clear (&self->wait_for_connection_cond);
G_OBJECT_CLASS (gst_unix_fd_sink_parent_class)->finalize (object);
}
@ -163,6 +171,10 @@ gst_unix_fd_sink_set_property (GObject * object, guint prop_id,
}
self->socket_type = g_value_get_enum (value);
break;
case PROP_WAIT_FOR_CONNECTION:
self->wait_for_connection = g_value_get_boolean (value);
g_cond_signal (&self->wait_for_connection_cond);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -186,6 +198,9 @@ gst_unix_fd_sink_get_property (GObject * object, guint prop_id,
case PROP_SOCKET_TYPE:
g_value_set_enum (value, self->socket_type);
break;
case PROP_WAIT_FOR_CONNECTION:
g_value_set_boolean (value, self->wait_for_connection);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -314,6 +329,8 @@ new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
}
g_free (payload);
g_cond_signal (&self->wait_for_connection_cond);
GST_OBJECT_UNLOCK (self);
return G_SOURCE_CONTINUE;
@ -551,8 +568,21 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
new_buffer->type = MEMORY_TYPE_DMABUF;
GST_OBJECT_LOCK (self);
while (self->wait_for_connection && g_hash_table_size (self->clients) == 0) {
g_cond_wait (&self->wait_for_connection_cond, GST_OBJECT_GET_LOCK (self));
if (self->unlock) {
GST_OBJECT_UNLOCK (self);
ret = gst_base_sink_wait_preroll (bsink);
if (ret != GST_FLOW_OK)
goto out;
GST_OBJECT_LOCK (self);
}
}
send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds,
self->payload->data, self->payload->len, buffer);
GST_OBJECT_UNLOCK (self);
out:
@ -561,6 +591,31 @@ out:
return ret;
}
static gboolean
gst_unix_fd_sink_unlock (GstBaseSink * bsink)
{
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
GST_OBJECT_LOCK (self);
self->unlock = TRUE;
g_cond_signal (&self->wait_for_connection_cond);
GST_OBJECT_UNLOCK (self);
return TRUE;
}
static gboolean
gst_unix_fd_sink_unlock_stop (GstBaseSink * bsink)
{
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
GST_OBJECT_LOCK (self);
self->unlock = FALSE;
GST_OBJECT_UNLOCK (self);
return TRUE;
}
static gboolean
gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event)
{
@ -650,6 +705,9 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_event);
gstbasesink_class->propose_allocation =
GST_DEBUG_FUNCPTR (gst_unix_fd_sink_propose_allocation);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock_stop);
g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
g_param_spec_string ("socket-path",
@ -666,4 +724,18 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
G_TYPE_UNIX_SOCKET_ADDRESS_TYPE, DEFAULT_SOCKET_TYPE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT |
GST_PARAM_MUTABLE_READY));
/**
* GstUnixFdSink:wait-for-connection:
*
* Block the stream until a least one client is connected.
*
* Since: 1.26
*/
g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION,
g_param_spec_boolean ("wait-for-connection",
"Wait for a connection until rendering",
"Block the stream until a least one client is connected",
DEFAULT_WAIT_FOR_CONNECTION,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
}

View file

@ -59,11 +59,9 @@ GST_START_TEST (test_unixfd_videotestsrc)
gst_meta_register_custom ("unix-fd-custom-meta", tags, NULL, NULL, NULL);
/* Ensure we don't have socket from previous failed test */
gchar *socket_path =
g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ());
if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) {
g_unlink (socket_path);
}
gchar *tempdir = g_dir_make_tmp ("unixfd-test-XXXXXX", &error);
g_assert_no_error (error);
gchar *socket_path = g_strdup_printf ("%s/socket", tempdir);
/* Setup source */
gchar *pipeline_str =
@ -126,6 +124,9 @@ GST_START_TEST (test_unixfd_videotestsrc)
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
fail_if (g_file_test (socket_path, G_FILE_TEST_EXISTS));
g_rmdir (tempdir);
g_free (tempdir);
gst_object_unref (pipeline_service);
gst_object_unref (pipeline_client_1);
gst_object_unref (pipeline_client_2);
@ -140,18 +141,16 @@ GST_START_TEST (test_unixfd_segment)
GError *error = NULL;
/* Ensure we don't have socket from previous failed test */
gchar *socket_path =
g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ());
if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) {
g_unlink (socket_path);
}
gchar *tempdir = g_dir_make_tmp ("unixfd-test-XXXXXX", &error);
g_assert_no_error (error);
gchar *socket_path = g_strdup_printf ("%s/socket", tempdir);
GstCaps *caps = gst_caps_new_empty_simple ("video/x-raw");
/* Setup service */
gchar *pipeline_str =
g_strdup_printf
("appsrc name=src format=time handle-segment-change=true ! unixfdsink socket-path=%s sync=false async=false",
("appsrc name=src format=time handle-segment-change=true ! unixfdsink socket-path=%s sync=false async=false wait-for-connection=true",
socket_path);
GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error);
g_assert_no_error (error);
@ -213,6 +212,10 @@ GST_START_TEST (test_unixfd_segment)
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
fail_unless (gst_element_set_state (pipeline_service,
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
g_rmdir (tempdir);
g_free (tempdir);
gst_object_unref (pipeline_service);
gst_object_unref (pipeline_client);
g_free (socket_path);