rtpav1pay: Consider the marker flag to output packets immediately at the end of a frame

Otherwise it is necessary to wait for the beginning of the following
frame, which unnecessarily increases the latency.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/255

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1072>
This commit is contained in:
Sebastian Dröge 2023-02-02 19:46:46 +02:00
parent 49350f738f
commit 194c4e9e9f
2 changed files with 67 additions and 21 deletions

View file

@ -114,6 +114,7 @@ impl RTPAv1Pay {
&self,
state: &mut State,
data: &[u8],
marker: bool,
dts: Option<gst::ClockTime>,
pts: Option<gst::ClockTime>,
) -> Result<gst::BufferList, gst::FlowError> {
@ -193,7 +194,7 @@ impl RTPAv1Pay {
let mut list = gst::BufferList::new();
{
let list = list.get_mut().unwrap();
while let Some(packet_data) = self.consider_new_packet(state, false) {
while let Some(packet_data) = self.consider_new_packet(state, false, marker) {
let buffer = self.generate_new_packet(state, packet_data)?;
list.add(buffer);
}
@ -208,13 +209,21 @@ impl RTPAv1Pay {
///
/// If `true` is passed for `force`, packets of any size will be accepted,
/// which is used in flushing the last OBUs after receiving an EOS for example.
fn consider_new_packet(&self, state: &mut State, force: bool) -> Option<PacketOBUData> {
///
/// If `true` is passed for `marker` then all queued OBUs are considered to finish this TU.
fn consider_new_packet(
&self,
state: &mut State,
force: bool,
marker: bool,
) -> Option<PacketOBUData> {
gst::trace!(
CAT,
imp: self,
"{} new packet, currently storing {} OBUs",
"{} new packet, currently storing {} OBUs (marker {})",
if force { "forcing" } else { "considering" },
state.obus.len()
state.obus.len(),
marker,
);
let payload_limit = gst_rtp::calc_payload_len(self.obj().mtu(), 0, 0);
@ -226,7 +235,7 @@ impl RTPAv1Pay {
let mut required_ids = None::<(u8, u8)>;
// figure out how many OBUs we can fit into this packet
for obu in &state.obus {
for (idx, obu) in state.obus.iter().enumerate() {
// for OBUs with extension headers, spatial and temporal IDs must be equal
// to all other such OBUs in the packet
let matching_obu_ids = |obu: &SizedObu, required_ids: &mut Option<(u8, u8)>| -> bool {
@ -247,11 +256,20 @@ impl RTPAv1Pay {
gst::log!(CAT, imp: self, "ignoring temporal delimiter OBU");
if packet.obu_count > 0 {
if marker {
gst::warning!(
CAT,
imp: self,
"Temporal delimited in the middle of a frame"
);
}
packet.ends_temporal_unit = true;
if packet.obu_count > 3 {
packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
return Some(packet);
}
@ -264,6 +282,7 @@ impl RTPAv1Pay {
packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
return Some(packet);
}
@ -279,6 +298,7 @@ impl RTPAv1Pay {
{
packet.obu_count += 1;
packet.payload_size += current.partial_size() + pending_bytes;
packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
return Some(packet);
}
@ -302,6 +322,7 @@ impl RTPAv1Pay {
packet.payload_size = payload_limit;
packet.omit_last_size_field = leb_size == 0;
} else if packet.obu_count > 3 {
packet.ends_temporal_unit = marker && idx == state.obus.len() - 1;
packet.payload_size += pending_bytes;
}
@ -309,11 +330,13 @@ impl RTPAv1Pay {
}
}
if force && packet.obu_count > 0 {
if (force || marker) && packet.obu_count > 0 {
if packet.obu_count > 3 {
packet.payload_size += pending_bytes;
packet.omit_last_size_field = false;
}
packet.ends_temporal_unit = true;
Some(packet)
} else {
// if we ran out of OBUs with space in the packet to spare, wait a bit longer
@ -621,7 +644,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
let dts = buffer.dts();
let pts = buffer.pts();
let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(
self,
gst::ResourceError::Read,
@ -631,7 +654,10 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
gst::FlowError::Error
})?;
let list = self.handle_new_obus(&mut state, buffer.as_slice(), dts, pts)?;
// Does the buffer finished a full TU?
let marker = buffer.flags().contains(gst::BufferFlags::MARKER);
let list = self.handle_new_obus(&mut state, map.as_slice(), marker, dts, pts)?;
drop(map);
drop(state);
if !list.is_empty() {
@ -652,7 +678,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay {
let mut state = self.state.lock().unwrap();
let list = list.get_mut().unwrap();
while let Some(packet_data) = self.consider_new_packet(&mut state, true) {
while let Some(packet_data) = self.consider_new_packet(&mut state, true, true) {
match self.generate_new_packet(&mut state, packet_data) {
Ok(buffer) => list.add(buffer),
Err(_) => break,
@ -804,15 +830,25 @@ mod tests {
(
false,
State {
obus: VecDeque::from(vec![ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 4,
..base_obu
obus: VecDeque::from(vec![
ObuData {
info: SizedObu {
obu_type: ObuType::TemporalDelimiter,
size: 0,
..base_obu
},
..ObuData::default()
},
bytes: vec![1, 2, 3, 4],
..ObuData::default()
}]),
ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 4,
..base_obu
},
bytes: vec![1, 2, 3, 4],
..ObuData::default()
},
]),
..State::default()
},
),
@ -843,7 +879,7 @@ mod tests {
payload_size: 34,
last_obu_fragment_size: None,
omit_last_size_field: false,
ends_temporal_unit: false,
ends_temporal_unit: true,
}),
State {
obus: {
@ -854,7 +890,17 @@ mod tests {
..input_data[1].1
},
),
(None, input_data[2].1.clone()),
(
None,
State {
obus: {
let mut copy = input_data[2].1.obus.clone();
copy.pop_front().unwrap();
copy
},
..input_data[2].1
},
),
];
let element = <RTPAv1Pay as ObjectSubclass>::Type::new();
@ -866,7 +912,7 @@ mod tests {
*state = input_data[idx].1.clone();
assert_eq!(
pay.consider_new_packet(&mut state, input_data[idx].0),
pay.consider_new_packet(&mut state, input_data[idx].0, false),
results[idx].0,
);
assert_eq!(

View file

@ -167,7 +167,7 @@ fn test_payloader() {
0b0011_0100, 0b0100_1000, 1,
]
), (
false,
true, // because of EOS
90_000,
vec![
0b0001_0000,