Merge branch 'shmsink-streamheaders' into 'main'

Draft: shmsink: Send streamheaders from caps to every new client

See merge request gstreamer/gstreamer!5946
This commit is contained in:
Olivier Crête 2024-05-03 20:24:01 +00:00
commit 963421ac36
4 changed files with 122 additions and 30 deletions

View file

@ -94,7 +94,8 @@ static void gst_shm_sink_get_property (GObject * object, guint prop_id,
static gboolean gst_shm_sink_start (GstBaseSink * bsink);
static gboolean gst_shm_sink_stop (GstBaseSink * bsink);
static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf);
static gboolean gst_shm_sink_set_caps (GstBaseSink *bsink, GstCaps *caps);
static GstFlowReturn gst_shm_sink_render (GstBaseSink *bsink, GstBuffer *buf);
static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event);
static gboolean gst_shm_sink_unlock (GstBaseSink * bsink);
@ -378,6 +379,7 @@ gst_shm_sink_class_init (GstShmSinkClass * klass)
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_shm_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_shm_sink_stop);
gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_shm_sink_set_caps);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_shm_sink_render);
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock);
@ -623,6 +625,8 @@ gst_shm_sink_stop (GstBaseSink * bsink)
GST_DEBUG_OBJECT (self, "Stopping");
gst_clear_buffer_list (&self->streamheaders);
while (self->clients) {
struct GstShmClient *client = self->clients->data;
self->clients = g_list_remove (self->clients, client);
@ -642,6 +646,54 @@ gst_shm_sink_stop (GstBaseSink * bsink)
return TRUE;
}
static gboolean
gst_shm_sink_set_caps (GstBaseSink *bsink, GstCaps *caps)
{
GstShmSink *self = GST_SHM_SINK (bsink);
GstStructure *s;
s = gst_caps_get_structure (caps, 0);
GST_OBJECT_LOCK (self);
gst_clear_buffer_list (&self->streamheaders);
if (gst_structure_has_field_typed (s, "streamheader", GST_TYPE_ARRAY)) {
const GValue *streamheader;
guint i, size;
streamheader = gst_structure_get_value (s, "streamheader");
GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
size = gst_value_array_get_size (streamheader);
self->streamheaders = gst_buffer_list_new_sized (size);
for (i = 0; i < size; i++) {
const GValue *v = gst_value_array_get_value (streamheader, i);
if (!GST_VALUE_HOLDS_BUFFER (v)) {
GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
G_VALUE_TYPE_NAME (v));
GST_OBJECT_UNLOCK (self);
return FALSE;
}
gst_buffer_list_add (self->streamheaders, g_value_dup_boxed (v));
}
} else if (gst_structure_has_field_typed (s, "streamheader",
GST_TYPE_BUFFER)) {
GstBuffer *buf;
gst_structure_get (s, "streamheader", GST_TYPE_BUFFER, &buf, NULL);
gst_buffer_list_add (self->streamheaders, buf);
}
GST_OBJECT_UNLOCK (self);
return TRUE;
}
static gboolean
gst_shm_sink_can_render (GstShmSink * self, GstClockTime time)
{
@ -660,13 +712,32 @@ gst_shm_sink_can_render (GstShmSink * self, GstClockTime time)
return TRUE;
}
static gboolean
gst_shm_sink_is_usable_mem (GstShmSink * self, GstBuffer * buf)
{
if (gst_buffer_n_memory (buf) > 1) {
GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single"
" one, need to do a memcpy", buf, gst_buffer_n_memory (buf));
return FALSE;
} else {
GstMemory *memory = gst_buffer_peek_memory (buf, 0);
if (memory->allocator != GST_ALLOCATOR (self->allocator)) {
GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by "
"%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator);
return FALSE;
}
}
return TRUE;
}
static GstFlowReturn
gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstShmSink *self = GST_SHM_SINK (bsink);
int rv = 0;
GstMapInfo map;
gboolean need_new_memory = FALSE;
GstFlowReturn ret = GST_FLOW_OK;
GstMemory *memory = NULL;
GstBuffer *sendbuf = NULL;
@ -702,22 +773,7 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
}
}
if (gst_buffer_n_memory (buf) > 1) {
GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single"
" one, need to do a memcpy", buf, gst_buffer_n_memory (buf));
need_new_memory = TRUE;
} else {
memory = gst_buffer_peek_memory (buf, 0);
if (memory->allocator != GST_ALLOCATOR (self->allocator)) {
need_new_memory = TRUE;
GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by "
"%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator);
}
}
if (need_new_memory) {
if (!gst_shm_sink_is_usable_mem (self, buf)) {
if (gst_buffer_get_size (buf) > sp_writer_get_max_buf_size (self->pipe)) {
gsize area_size = sp_writer_get_max_buf_size (self->pipe);
GST_ELEMENT_ERROR (self, RESOURCE, NO_SPACE_LEFT, (NULL),
@ -790,7 +846,7 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
* We know it's not mapped for writing anywhere as we just mapped it for
* reading
*/
rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf);
rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf, NULL);
if (rv == -1) {
GST_ELEMENT_ERROR (self, STREAM, FAILED,
(NULL), ("Failed to send data over SHM"));
@ -868,15 +924,31 @@ pollthread_func (gpointer data)
GST_OBJECT_LOCK (self);
client = sp_writer_accept_client (self->pipe);
GST_OBJECT_UNLOCK (self);
if (!client) {
GST_ELEMENT_ERROR (self, RESOURCE, READ,
GST_OBJECT_UNLOCK (self);
GST_ELEMENT_ERROR (self, RESOURCE, READ,
("Failed to read from shmsink"),
("Control socket returns wrong data"));
return NULL;
}
if (self->streamheaders) {
guint i;
for (i = 0; i < gst_buffer_list_length (self->streamheaders); i++) {
GstBuffer *buf = gst_buffer_list_get (self->streamheaders, i);
gsize size = gst_buffer_get_size (buf);
ShmBlock *block = sp_writer_alloc_block (self->pipe, size);
g_assert (block);
gst_buffer_extract (buf, 0, sp_writer_block_get_buf (block), size);
sp_writer_send_buf (self->pipe, sp_writer_block_get_buf (block), size, NULL, client);
sp_writer_free_block (block);
}
}
gclient = g_new (struct GstShmClient, 1);
gclient->client = client;
gst_poll_fd_init (&gclient->pollfd);
@ -884,6 +956,8 @@ pollthread_func (gpointer data)
gst_poll_add_fd (self->poll, &gclient->pollfd);
gst_poll_fd_ctl_read (self->poll, &gclient->pollfd, TRUE);
self->clients = g_list_prepend (self->clients, gclient);
GST_OBJECT_UNLOCK (self);
g_signal_emit (self, signals[SIGNAL_CLIENT_CONNECTED], 0,
gclient->pollfd.fd);
/* we need to call gst_poll_wait before calling gst_poll_* status
@ -924,7 +998,7 @@ pollthread_func (gpointer data)
g_assert (rv == 0 || tag == NULL);
if (rv == 0)
if (tag && rv == 0)
gst_buffer_unref (tag);
}
continue;
@ -934,12 +1008,13 @@ pollthread_func (gpointer data)
GST_OBJECT_LOCK (self);
sp_writer_close_client (self->pipe, gclient->client,
(sp_buffer_free_callback) free_buffer_locked, (void **) &list);
gst_poll_remove_fd (self->poll, &gclient->pollfd);
self->clients = g_list_remove (self->clients, gclient);
GST_OBJECT_UNLOCK (self);
g_slist_free_full (list, (GDestroyNotify) gst_buffer_unref);
}
gst_poll_remove_fd (self->poll, &gclient->pollfd);
self->clients = g_list_remove (self->clients, gclient);
g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
gclient->pollfd.fd);

View file

@ -64,6 +64,8 @@ struct _GstShmSink
gboolean unlock;
GstClockTimeDiff buffer_time;
GstBufferList *streamheaders;
GCond cond;
GstShmSinkAllocator *allocator;

View file

@ -575,7 +575,7 @@ sp_writer_free_block (ShmBlock * block)
/* Returns the number of client this has successfully been sent to */
int
sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag, ShmClient *single_client)
{
ShmArea *area = NULL;
unsigned long offset = 0;
@ -583,12 +583,25 @@ sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
ShmBuffer *sb;
ShmClient *client = NULL;
ShmAllocBlock *ablock = NULL;
int num_clients = 0;
int i = 0;
int c = 0;
if (self->num_clients == 0)
return 0;
if (single_client) {
for (client = self->clients; client; client = client->next) {
if (client == single_client)
break;
}
if (client == NULL)
return -1;
num_clients = 1;
} else {
num_clients = self->num_clients;
}
for (area = self->shm_area; area; area = area->next) {
if (buf >= area->shm_area_buf &&
buf < (area->shm_area_buf + area->shm_area_len)) {
@ -602,18 +615,20 @@ sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
if (!ablock)
return -1;
sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients);
sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * num_clients);
memset (sb, 0, sizeof (ShmBuffer));
memset (sb->clients, -1, sizeof (int) * self->num_clients);
memset (sb->clients, -1, sizeof (int) * num_clients);
sb->shm_area = area;
sb->offset = offset;
sb->size = size;
sb->num_clients = self->num_clients;
sb->num_clients = num_clients;
sb->ablock = ablock;
sb->tag = tag;
for (client = self->clients; client; client = client->next) {
struct CommandBuffer cb = { 0 };
if (single_client && client != single_client)
continue;
cb.payload.buffer.offset = offset;
cb.payload.buffer.size = bsize;
if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id))
@ -623,7 +638,7 @@ sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
}
if (c == 0) {
spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * sb->num_clients, sb);
spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * num_clients, sb);
return 0;
}

View file

@ -96,7 +96,7 @@ int sp_writer_get_client_fd (ShmClient * client);
ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size);
void sp_writer_free_block (ShmBlock *block);
int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void * tag);
int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void * tag, ShmClient *single_client);
char *sp_writer_block_get_buf (ShmBlock *block);
ShmPipe *sp_writer_block_get_pipe (ShmBlock *block);
size_t sp_writer_get_max_buf_size (ShmPipe * self);