diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index c3886158..d6fede5d 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -984,27 +984,6 @@ impl FMP4Mux { // Check if this stream is filled enough now. if let Some(chunk_duration) = settings.chunk_duration { // In chunk mode - let (gop_idx, gop) = match stream - .queued_gops - .iter() - .enumerate() - .find(|(_idx, gop)| gop.final_earliest_pts || all_eos || stream.sinkpad.is_eos()) - { - Some(res) => res, - None => { - gst::trace!(CAT, obj: stream.sinkpad, "Chunked mode but no GOP with final earliest PTS known yet"); - return; - } - }; - - gst::trace!( - CAT, - obj: stream.sinkpad, - "GOP {gop_idx} start PTS {}, GOP end PTS {} (final {})", - gop.start_pts, - gop.end_pts, - gop.final_end_pts || all_eos || stream.sinkpad.is_eos(), - ); gst::trace!( CAT, obj: stream.sinkpad, @@ -1026,10 +1005,53 @@ impl FMP4Mux { // First check if the next split should be the end of a fragment or the end of a chunk. // If both are the same then a fragment split has preference. - if fragment_end_pts <= chunk_end_pts && gop.start_pts >= fragment_end_pts { - gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for finishing this fragment"); - stream.fragment_filled = true; - } else if chunk_end_pts < fragment_end_pts { + if fragment_end_pts <= chunk_end_pts { + // We can only finish a fragment if a full GOP with final end PTS is queued and it + // ends at or after the fragment end PTS. + if let Some((gop_idx, gop)) = stream + .queued_gops + .iter() + .enumerate() + .find(|(_idx, gop)| gop.final_end_pts || all_eos || stream.sinkpad.is_eos()) + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "GOP {gop_idx} start PTS {}, GOP end PTS {}", + gop.start_pts, + gop.end_pts, + ); + if gop.end_pts >= fragment_end_pts { + gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for finishing this fragment"); + stream.fragment_filled = true; + return; + } + } + } + + if !stream.fragment_filled { + let (gop_idx, gop) = match stream.queued_gops.iter().enumerate().find( + |(_idx, gop)| gop.final_earliest_pts || all_eos || stream.sinkpad.is_eos(), + ) { + Some(res) => res, + None => { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Chunked mode and want to finish fragment but no GOP with final end PTS known yet", + ); + return; + } + }; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "GOP {gop_idx} start PTS {}, GOP end PTS {} (final {})", + gop.start_pts, + gop.end_pts, + gop.final_end_pts || all_eos || stream.sinkpad.is_eos(), + ); let last_pts = gop.buffers.last().map(|b| b.pts); if gop.end_pts >= chunk_end_pts @@ -1287,8 +1309,15 @@ impl FMP4Mux { gop.final_end_pts || all_eos || stream.sinkpad.is_eos() ); + // If we have a final GOP then include it as long as it's either + // - ending before the dequeue end PTS + // - no GOPs were dequeued yet and this is the first stream + // + // The second case would happen if no GOP ends between the last chunk of the + // fragment and the fragment duration. if (gop.final_end_pts || all_eos || stream.sinkpad.is_eos()) - && gop.end_pts <= dequeue_end_pts + && (gop.end_pts <= dequeue_end_pts + || (gops.is_empty() && chunk_end_pts.is_none())) { gst::trace!( CAT, @@ -1302,7 +1331,16 @@ impl FMP4Mux { break; } - gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment"); + // Otherwise if this is the first stream and no full GOP is queued then we need + // to wait for more data. + // + // If this is not the first stream then take an incomplete GOP. + if chunk_end_pts.is_none() { + gst::info!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment"); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } else { + gst::info!(CAT, obj: stream.sinkpad, "Including incomplete GOP"); + } } else { gst::trace!( CAT, @@ -1770,7 +1808,7 @@ impl FMP4Mux { // This is handled below generally if nothing was dequeued } else { if settings.chunk_duration.is_some() { - gst::warning!( + gst::debug!( CAT, obj: stream.sinkpad, "Don't have anything to drain for the first stream on timeout in a live pipeline", @@ -2144,6 +2182,19 @@ impl FMP4Mux { let min_earliest_pts = min_earliest_pts.unwrap(); let chunk_end_pts = chunk_end_pts.unwrap(); + gst::debug!( + CAT, + imp: self, + concat!( + "Draining chunk (fragment start: {} fragment end: {}) ", + "from PTS {} to {}" + ), + fragment_start, + fragment_filled, + min_earliest_pts, + chunk_end_pts, + ); + let mut fmp4_header = None; if !state.sent_headers { let mut buffer = state.stream_header.as_ref().unwrap().copy(); @@ -2315,11 +2366,7 @@ impl FMP4Mux { Err(err) => { if err == gst_base::AGGREGATOR_FLOW_NEED_DATA { assert!(!all_eos); - gst::element_imp_warning!( - self, - gst::StreamError::Format, - ["Longer GOPs than fragment duration"] - ); + gst::debug!(CAT, imp: self, "Need more data"); state.timeout_delay += 1.seconds(); }