Merge branch 'mr-6823-backport-into-1.24-tsmux-live-timeout' into '1.24'

Backport of "mpegtsmux: Correctly time out and mux anyway in live pipelines" into 1.24

See merge request gstreamer/gstreamer!6859
This commit is contained in:
Backport Bot 2024-05-15 19:43:43 +00:00
commit 538a82b7e7

View file

@ -856,18 +856,12 @@ is_valid_pmt_pid (guint16 pmt_pid)
/* Must be called with mux->lock held */
static GstFlowReturn
gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad,
GstCaps * caps)
{
GstCaps *caps = gst_pad_get_current_caps (GST_PAD (ts_pad));
GstFlowReturn ret;
if (caps == NULL) {
GST_DEBUG_OBJECT (ts_pad, "Sink pad caps were not set before pushing");
return GST_FLOW_NOT_NEGOTIATED;
}
ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
gst_caps_unref (caps);
if (ret == GST_FLOW_OK) {
tsmux_program_add_stream (ts_pad->prog, ts_pad->stream);
@ -904,12 +898,22 @@ get_pmt_pcr_sink (GstBaseTsMux * mux, const gchar * prop_name)
/* Must be called with mux->lock held */
static GstFlowReturn
gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad,
gboolean allow_no_caps)
{
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
gchar *name = NULL;
gchar *prop_name;
GstFlowReturn ret = GST_FLOW_OK;
GstCaps *caps = gst_pad_get_current_caps (pad);
if (caps == NULL) {
GST_DEBUG_OBJECT (ts_pad, "Sink pad caps were not set yet");
/* Try again later once the first buffer is pushed */
if (allow_no_caps)
return GST_FLOW_OK;
return GST_FLOW_NOT_NEGOTIATED;
}
if (ts_pad->prog_id == -1) {
name = GST_PAD_NAME (pad);
@ -967,7 +971,7 @@ gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
}
if (ts_pad->stream == NULL) {
ret = gst_base_ts_mux_create_stream (mux, ts_pad);
ret = gst_base_ts_mux_create_stream (mux, ts_pad, caps);
if (ret != GST_FLOW_OK)
goto no_stream;
}
@ -991,6 +995,7 @@ gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
tsmux_program_set_pcr_pid (ts_pad->prog, pcr_pid);
goto have_pcr_pid;
}
gchar *pcr_sink_name = get_pmt_pcr_sink (mux, prop_name);
if (!g_strcmp0 (GST_PAD_NAME (pad), pcr_sink_name)) {
GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
@ -1001,6 +1006,7 @@ gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
have_pcr_pid:
g_clear_pointer (&prop_name, g_free);
gst_clear_caps (&caps);
return ret;
@ -1009,12 +1015,14 @@ no_program:
{
GST_ELEMENT_ERROR (mux, STREAM, MUX,
("Could not create new program"), (NULL));
gst_clear_caps (&caps);
return GST_FLOW_ERROR;
}
no_stream:
{
GST_ELEMENT_ERROR (mux, STREAM, MUX,
("Could not create handler for stream"), (NULL));
gst_clear_caps (&caps);
return ret;
}
}
@ -1026,7 +1034,8 @@ gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
{
GstFlowReturn *ret = user_data;
*ret = gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad);
*ret =
gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad, TRUE);
return *ret == GST_FLOW_OK;
}
@ -1090,9 +1099,13 @@ new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
static GstFlowReturn
gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
{
GstSegment *segment =
&GST_AGGREGATOR_PAD (GST_AGGREGATOR_SRC_PAD (mux))->segment;
GstBufferList *buffer_list;
gint align = mux->alignment;
gint av, packet_size;
GstFlowReturn flow_ret;
GstClockTime pts;
packet_size = mux->packet_size;
@ -1108,8 +1121,16 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
/* no alignment, just push all available data */
if (align == 0) {
buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av);
return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux),
flow_ret = gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux),
buffer_list);
pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
if (GST_CLOCK_TIME_IS_VALID (pts)
&& (!GST_CLOCK_TIME_IS_VALID (segment->position)
|| segment->position < pts))
segment->position = pts;
return flow_ret;
}
align *= packet_size;
@ -1122,7 +1143,6 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
while (align <= av) {
GstBuffer *buf;
GstClockTime pts;
pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
buf = gst_adapter_take_buffer (mux->out_adapter, align);
@ -1135,7 +1155,6 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
if (av > 0 && force) {
GstBuffer *buf;
GstClockTime pts;
guint8 *data;
guint32 header;
gint dummy;
@ -1185,7 +1204,16 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
gst_buffer_list_add (buffer_list, buf);
}
return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux), buffer_list);
flow_ret =
gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux), buffer_list);
pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
if (GST_CLOCK_TIME_IS_VALID (pts)
&& (!GST_CLOCK_TIME_IS_VALID (segment->position)
|| segment->position < pts))
segment->position = pts;
return flow_ret;
}
static GstFlowReturn
@ -1352,7 +1380,13 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
if (prog == NULL) {
GList *cur;
gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best));
ret = gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best), FALSE);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
if (buf)
gst_buffer_unref (buf);
g_mutex_unlock (&mux->lock);
return ret;
}
tsmux_resend_pat (mux->tsmux);
tsmux_resend_si (mux->tsmux);
prog = best->prog;
@ -2802,6 +2836,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
gstagg_class->src_event = gst_base_ts_mux_src_event;
gstagg_class->start = gst_base_ts_mux_start;
gstagg_class->stop = gst_base_ts_mux_stop;
gstagg_class->get_next_time = gst_aggregator_simple_get_next_time;
klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux;
klass->allocate_packet = gst_base_ts_mux_default_allocate_packet;