shmsink: Send streamheaders to every client on connection

This commit is contained in:
Olivier Crête 2024-01-19 18:33:50 -05:00
parent 18db1d530e
commit eb975af5e7
4 changed files with 94 additions and 9 deletions

View file

@ -94,7 +94,8 @@ static void gst_shm_sink_get_property (GObject * object, guint prop_id,
static gboolean gst_shm_sink_start (GstBaseSink * bsink);
static gboolean gst_shm_sink_stop (GstBaseSink * bsink);
static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf);
static gboolean gst_shm_sink_set_caps (GstBaseSink *bsink, GstCaps *caps);
static GstFlowReturn gst_shm_sink_render (GstBaseSink *bsink, GstBuffer *buf);
static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event);
static gboolean gst_shm_sink_unlock (GstBaseSink * bsink);
@ -378,6 +379,7 @@ gst_shm_sink_class_init (GstShmSinkClass * klass)
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_shm_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_shm_sink_stop);
gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_shm_sink_set_caps);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_shm_sink_render);
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock);
@ -623,6 +625,8 @@ gst_shm_sink_stop (GstBaseSink * bsink)
GST_DEBUG_OBJECT (self, "Stopping");
gst_clear_buffer_list (&self->streamheaders);
while (self->clients) {
struct GstShmClient *client = self->clients->data;
self->clients = g_list_remove (self->clients, client);
@ -642,6 +646,54 @@ gst_shm_sink_stop (GstBaseSink * bsink)
return TRUE;
}
static gboolean
gst_shm_sink_set_caps (GstBaseSink *bsink, GstCaps *caps)
{
GstShmSink *self = GST_SHM_SINK (bsink);
GstStructure *s;
s = gst_caps_get_structure (caps, 0);
GST_OBJECT_LOCK (self);
gst_clear_buffer_list (&self->streamheaders);
if (gst_structure_has_field_typed (s, "streamheader", GST_TYPE_ARRAY)) {
const GValue *streamheader;
guint i, size;
streamheader = gst_structure_get_value (s, "streamheader");
GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
size = gst_value_array_get_size (streamheader);
self->streamheaders = gst_buffer_list_new_sized (size);
for (i = 0; i < size; i++) {
const GValue *v = gst_value_array_get_value (streamheader, i);
if (!GST_VALUE_HOLDS_BUFFER (v)) {
GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
G_VALUE_TYPE_NAME (v));
GST_OBJECT_UNLOCK (self);
return FALSE;
}
gst_buffer_list_add (self->streamheaders, g_value_dup_boxed (v));
}
} else if (gst_structure_has_field_typed (s, "streamheader",
GST_TYPE_BUFFER)) {
GstBuffer *buf;
gst_structure_get (s, "streamheader", GST_TYPE_BUFFER, &buf, NULL);
gst_buffer_list_add (self->streamheaders, buf);
}
GST_OBJECT_UNLOCK (self);
return TRUE;
}
static gboolean
gst_shm_sink_can_render (GstShmSink * self, GstClockTime time)
{
@ -794,7 +846,7 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
* We know it's not mapped for writing anywhere as we just mapped it for
* reading
*/
rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf);
rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf, NULL);
if (rv == -1) {
GST_ELEMENT_ERROR (self, STREAM, FAILED,
(NULL), ("Failed to send data over SHM"));
@ -881,6 +933,22 @@ pollthread_func (gpointer data)
return NULL;
}
if (self->streamheaders) {
guint i;
for (i = 0; i < gst_buffer_list_length (self->streamheaders); i++) {
GstBuffer *buf = gst_buffer_list_get (self->streamheaders, i);
gsize size = gst_buffer_get_size (buf);
ShmBlock *block = sp_writer_alloc_block (self->pipe, size);
g_assert (block);
gst_buffer_extract (buf, 0, sp_writer_block_get_buf (block), size);
sp_writer_send_buf (self->pipe, sp_writer_block_get_buf (block), size, NULL, client);
sp_writer_free_block (block);
}
}
gclient = g_new (struct GstShmClient, 1);
gclient->client = client;
gst_poll_fd_init (&gclient->pollfd);
@ -930,7 +998,7 @@ pollthread_func (gpointer data)
g_assert (rv == 0 || tag == NULL);
if (rv == 0)
if (tag && rv == 0)
gst_buffer_unref (tag);
}
continue;

View file

@ -64,6 +64,8 @@ struct _GstShmSink
gboolean unlock;
GstClockTimeDiff buffer_time;
GstBufferList *streamheaders;
GCond cond;
GstShmSinkAllocator *allocator;

View file

@ -575,7 +575,7 @@ sp_writer_free_block (ShmBlock * block)
/* Returns the number of client this has successfully been sent to */
int
sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag, ShmClient *single_client)
{
ShmArea *area = NULL;
unsigned long offset = 0;
@ -583,12 +583,25 @@ sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
ShmBuffer *sb;
ShmClient *client = NULL;
ShmAllocBlock *ablock = NULL;
int num_clients = 0;
int i = 0;
int c = 0;
if (self->num_clients == 0)
return 0;
if (single_client) {
for (client = self->clients; client; client = client->next) {
if (client == single_client)
break;
}
if (client == NULL)
return -1;
num_clients = 1;
} else {
num_clients = self->num_clients;
}
for (area = self->shm_area; area; area = area->next) {
if (buf >= area->shm_area_buf &&
buf < (area->shm_area_buf + area->shm_area_len)) {
@ -602,18 +615,20 @@ sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
if (!ablock)
return -1;
sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients);
sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * num_clients);
memset (sb, 0, sizeof (ShmBuffer));
memset (sb->clients, -1, sizeof (int) * self->num_clients);
memset (sb->clients, -1, sizeof (int) * num_clients);
sb->shm_area = area;
sb->offset = offset;
sb->size = size;
sb->num_clients = self->num_clients;
sb->num_clients = num_clients;
sb->ablock = ablock;
sb->tag = tag;
for (client = self->clients; client; client = client->next) {
struct CommandBuffer cb = { 0 };
if (single_client && client != single_client)
continue;
cb.payload.buffer.offset = offset;
cb.payload.buffer.size = bsize;
if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id))
@ -623,7 +638,7 @@ sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
}
if (c == 0) {
spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * sb->num_clients, sb);
spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * num_clients, sb);
return 0;
}

View file

@ -96,7 +96,7 @@ int sp_writer_get_client_fd (ShmClient * client);
ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size);
void sp_writer_free_block (ShmBlock *block);
int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void * tag);
int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void * tag, ShmClient *single_client);
char *sp_writer_block_get_buf (ShmBlock *block);
ShmPipe *sp_writer_block_get_pipe (ShmBlock *block);
size_t sp_writer_get_max_buf_size (ShmPipe * self);