streamproducer: extract process_sample function

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1280>
This commit is contained in:
Mathieu Duponchelle 2023-06-24 00:29:11 +02:00
parent 214f61abc5
commit a4247d5199

View file

@ -217,6 +217,113 @@ impl StreamProducer {
Ok(())
}
fn process_sample(
sample: gst::Sample,
appsink: &gst_app::AppSink,
consumers: &mut StreamConsumers,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
let flags = buf.flags();
(
flags.contains(gst::BufferFlags::DISCONT),
!flags.contains(gst::BufferFlags::DELTA_UNIT),
)
} else {
(false, true)
};
gst::trace!(
CAT,
obj: appsink,
"processing preroll {:?}",
sample.buffer()
);
let latency = consumers.current_latency;
let latency_updated = mem::replace(&mut consumers.latency_updated, false);
let mut needs_keyframe_request = false;
let current_consumers = consumers
.consumers
.values()
.filter_map(|consumer| {
if let Some(latency) = latency {
if consumer
.forwarded_latency
.compare_exchange(
false,
true,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok()
|| latency_updated
{
consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
}
}
if consumer.discard.load(atomic::Ordering::SeqCst) {
consumer
.needs_keyframe
.store(false, atomic::Ordering::SeqCst);
return None;
}
if is_discont && !is_keyframe {
// Whenever we have a discontinuity, we need a new keyframe
consumer
.needs_keyframe
.store(true, atomic::Ordering::SeqCst);
}
if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
// If we need a keyframe (and this one isn't) request a keyframe upstream
if !needs_keyframe_request {
gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer");
needs_keyframe_request = true;
}
consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
gst::debug!(
CAT,
obj: appsink,
"Ignoring frame for {} while waiting for a keyframe",
consumer.appsrc.name()
);
None
} else {
consumer
.needs_keyframe
.store(false, atomic::Ordering::SeqCst);
consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
Some(consumer.appsrc.clone())
}
})
.collect::<Vec<_>>();
drop(consumers);
if needs_keyframe_request {
appsink.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
);
}
for consumer in current_consumers {
if let Err(err) = consumer.push_sample(&sample) {
gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err);
}
}
Ok(gst::FlowSuccess::Ok)
}
/// Remove a consumer appsrc by id
pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
let name = consumer.name();
@ -290,92 +397,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
}
};
let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
let flags = buf.flags();
(flags.contains(gst::BufferFlags::DISCONT),
!flags.contains(gst::BufferFlags::DELTA_UNIT))
} else {
(false, true)
};
gst::trace!(CAT, obj: appsink, "processing sample");
let latency = consumers.current_latency;
let latency_updated = mem::replace(&mut consumers.latency_updated, false);
let mut needs_keyframe_request = false;
let current_consumers = consumers
.consumers
.values()
.filter_map(|consumer| {
if let Some(latency) = latency {
if consumer.forwarded_latency
.compare_exchange(
false,
true,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok()
|| latency_updated
{
consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
}
}
if consumer.discard.load(atomic::Ordering::SeqCst) {
consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst);
return None;
}
if is_discont && !is_keyframe {
// Whenever we have a discontinuity, we need a new keyframe
consumer.needs_keyframe.store(
true,
atomic::Ordering::SeqCst,
);
}
if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst)
{
// If we need a keyframe (and this one isn't) request a keyframe upstream
if !needs_keyframe_request {
gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer");
needs_keyframe_request = true;
}
consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe",
consumer.appsrc.name());
None
} else {
consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst);
consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
Some(consumer.appsrc.clone())
}
})
.collect::<Vec<_>>();
drop(consumers);
if needs_keyframe_request {
appsink.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
);
}
for consumer in current_consumers {
if let Err(err) = consumer.push_sample(&sample) {
gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err);
}
}
Ok(gst::FlowSuccess::Ok)
StreamProducer::process_sample(sample, appsink, &mut consumers)
}))
.new_event(glib::clone!(@strong consumers => move |appsink| {
match appsink.pull_object().map(|obj| obj.downcast::<gst::Event>()) {