Merge branch 'cmafmux-max-fragment-size' into 'main'

Draft: fmp4mux: Add support for creating fragment based on size

See merge request gstreamer/gst-plugins-rs!1359
This commit is contained in:
Oscar Linde 2024-04-27 23:13:51 +00:00
commit 6673e63dc5

View file

@ -101,6 +101,7 @@ const DEFAULT_WRITE_MFRA: bool = false;
const DEFAULT_WRITE_MEHD: bool = false;
const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None;
const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(250));
const DEFAULT_FRAGMENT_MAX_SIZE: u32 = u32::MAX;
#[derive(Debug, Clone)]
struct Settings {
@ -113,6 +114,7 @@ struct Settings {
interleave_time: Option<gst::ClockTime>,
movie_timescale: u32,
offset_to_zero: bool,
fragment_max_size: u32,
}
impl Default for Settings {
@ -127,6 +129,7 @@ impl Default for Settings {
interleave_time: DEFAULT_INTERLEAVE_TIME,
movie_timescale: 0,
offset_to_zero: false,
fragment_max_size: DEFAULT_FRAGMENT_MAX_SIZE,
}
}
}
@ -214,6 +217,10 @@ struct Stream {
fragment_filled: bool,
/// Whether a whole chunk is queued.
chunk_filled: bool,
/// Current size of the stream
size: usize,
/// Whether the fragment max size is reached.
max_size_reached: bool,
/// Difference between the first DTS and 0 in case of negative DTS
dts_offset: Option<gst::ClockTime>,
@ -688,6 +695,7 @@ impl FMP4Mux {
/// Queue incoming buffer as individual GOPs.
fn queue_gops(
&self,
settings: &Settings,
stream: &mut Stream,
mut pre_queued_buffer: PreQueuedBuffer,
) -> Result<(), gst::FlowError> {
@ -798,6 +806,7 @@ impl FMP4Mux {
stream.dts_offset.display(),
);
stream.size += buffer.size();
let gop = Gop {
start_pts: pts,
start_dts: dts,
@ -829,7 +838,7 @@ impl FMP4Mux {
prev_gop.end_pts = std::cmp::max(prev_gop.end_pts, pts);
prev_gop.end_dts = std::cmp::max(prev_gop.end_dts, dts);
if !delta_frames.requires_dts() {
if !delta_frames.requires_dts() || settings.chunk_duration.is_none() {
prev_gop.final_end_pts = true;
}
@ -854,6 +863,7 @@ impl FMP4Mux {
} else if let Some(gop) = stream.queued_gops.front_mut() {
assert!(!delta_frames.intra_only());
stream.size += buffer.size();
gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
gop.end_dts = gop.end_dts.opt_max(end_dts);
gop.buffers.push(GopBuffer {
@ -962,7 +972,7 @@ impl FMP4Mux {
let pre_queued_buffer = Self::pop_buffer(self, stream);
// Queue up the buffer and update GOP tracking state
self.queue_gops(stream, pre_queued_buffer)?;
self.queue_gops(settings, stream, pre_queued_buffer)?;
// Check if this stream is filled enough now.
self.check_stream_filled(settings, stream, fragment_start_pts, chunk_start_pts, false);
@ -988,6 +998,22 @@ impl FMP4Mux {
_ => return,
};
if !stream.max_size_reached {
if stream.size > settings.fragment_max_size as usize {
gst::debug!(CAT, "Reached fragment max size ({} bytes)", stream.size);
stream.max_size_reached = true;
// Send force-keyunit event to instatly request a new I-frame
// so that the current gop and fragment can be completed.
let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(None)
.all_headers(true)
.build();
stream.sinkpad.push_event(fku);
}
}
// Check if this stream is filled enough now.
if let Some(chunk_duration) = settings.chunk_duration {
// In chunk mode
@ -1183,6 +1209,17 @@ impl FMP4Mux {
gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment");
stream.fragment_filled = true;
}
// Check if we have complete GOPs that have reached fragment max size
if stream.max_size_reached {
let complete_gops_size = stream.queued_gops.iter()
.filter(|gop| gop.final_end_pts)
.map(|gop| gop.buffers.iter().map(|gb| gb.buffer.size()).sum::<usize>())
.sum::<usize>();
if complete_gops_size >= settings.fragment_max_size as usize {
stream.fragment_filled = true;
}
}
}
}
@ -1619,6 +1656,7 @@ impl FMP4Mux {
obj: stream.sinkpad,
"Pushing complete GOP",
);
stream.size -= gop.buffers.iter().map(|gb| gb.buffer.size()).sum::<usize>();
gops.push(stream.queued_gops.pop_back().unwrap());
}
}
@ -1897,6 +1935,7 @@ impl FMP4Mux {
)?;
stream.fragment_filled = false;
stream.chunk_filled = false;
stream.max_size_reached = false;
// If we don't have a next chunk start PTS then this is the first stream as above.
if chunk_end_pts.is_none() {
@ -2627,6 +2666,8 @@ impl FMP4Mux {
queued_gops: VecDeque::new(),
fragment_filled: false,
chunk_filled: false,
size: 0,
max_size_reached: false,
dts_offset: None,
current_position: gst::ClockTime::ZERO,
running_time_utc_time_mapping: None,
@ -2883,6 +2924,12 @@ impl ObjectImpl for FMP4Mux {
.blurb("Timescale to use for the movie (units per second, 0 is automatic)")
.mutable_ready()
.build(),
glib::ParamSpecUInt::builder("fragment-max-size")
.nick("Fragment max size in bytes")
.blurb("Maximum byte size for each FMP4 fragment, triumphs fragment-duration")
.default_value(DEFAULT_FRAGMENT_MAX_SIZE)
.mutable_ready()
.build(),
]
});
@ -2953,6 +3000,14 @@ impl ObjectImpl for FMP4Mux {
settings.movie_timescale = value.get().expect("type checked upstream");
}
"fragment-max-size" => {
let mut settings = self.settings.lock().unwrap();
let fragment_max_size = value.get().expect("type checked upstream");
if settings.fragment_max_size != fragment_max_size {
settings.fragment_max_size = fragment_max_size;
}
}
_ => unimplemented!(),
}
}
@ -2999,6 +3054,11 @@ impl ObjectImpl for FMP4Mux {
settings.movie_timescale.to_value()
}
"fragment-max-size" => {
let settings = self.settings.lock().unwrap();
settings.fragment_max_size.to_value()
}
_ => unimplemented!(),
}
}