diff --git a/net/rtp/src/av1/pay/imp.rs b/net/rtp/src/av1/pay/imp.rs index bd6aa8d1..09fa9519 100644 --- a/net/rtp/src/av1/pay/imp.rs +++ b/net/rtp/src/av1/pay/imp.rs @@ -114,6 +114,7 @@ impl RTPAv1Pay { &self, state: &mut State, data: &[u8], + marker: bool, dts: Option, pts: Option, ) -> Result { @@ -193,7 +194,7 @@ impl RTPAv1Pay { let mut list = gst::BufferList::new(); { let list = list.get_mut().unwrap(); - while let Some(packet_data) = self.consider_new_packet(state, false) { + while let Some(packet_data) = self.consider_new_packet(state, false, marker) { let buffer = self.generate_new_packet(state, packet_data)?; list.add(buffer); } @@ -208,13 +209,21 @@ impl RTPAv1Pay { /// /// If `true` is passed for `force`, packets of any size will be accepted, /// which is used in flushing the last OBUs after receiving an EOS for example. - fn consider_new_packet(&self, state: &mut State, force: bool) -> Option { + /// + /// If `true` is passed for `marker` then all queued OBUs are considered to finish this TU. + fn consider_new_packet( + &self, + state: &mut State, + force: bool, + marker: bool, + ) -> Option { gst::trace!( CAT, imp: self, - "{} new packet, currently storing {} OBUs", + "{} new packet, currently storing {} OBUs (marker {})", if force { "forcing" } else { "considering" }, - state.obus.len() + state.obus.len(), + marker, ); let payload_limit = gst_rtp::calc_payload_len(self.obj().mtu(), 0, 0); @@ -226,7 +235,7 @@ impl RTPAv1Pay { let mut required_ids = None::<(u8, u8)>; // figure out how many OBUs we can fit into this packet - for obu in &state.obus { + for (idx, obu) in state.obus.iter().enumerate() { // for OBUs with extension headers, spatial and temporal IDs must be equal // to all other such OBUs in the packet let matching_obu_ids = |obu: &SizedObu, required_ids: &mut Option<(u8, u8)>| -> bool { @@ -247,11 +256,20 @@ impl RTPAv1Pay { gst::log!(CAT, imp: self, "ignoring temporal delimiter OBU"); if packet.obu_count > 0 { + if marker { + gst::warning!( + CAT, + imp: self, + "Temporal delimited in the middle of a frame" + ); + } + packet.ends_temporal_unit = true; if packet.obu_count > 3 { packet.payload_size += pending_bytes; packet.omit_last_size_field = false; } + return Some(packet); } @@ -264,6 +282,7 @@ impl RTPAv1Pay { packet.payload_size += pending_bytes; packet.omit_last_size_field = false; } + packet.ends_temporal_unit = marker && idx == state.obus.len() - 1; return Some(packet); } @@ -279,6 +298,7 @@ impl RTPAv1Pay { { packet.obu_count += 1; packet.payload_size += current.partial_size() + pending_bytes; + packet.ends_temporal_unit = marker && idx == state.obus.len() - 1; return Some(packet); } @@ -302,6 +322,7 @@ impl RTPAv1Pay { packet.payload_size = payload_limit; packet.omit_last_size_field = leb_size == 0; } else if packet.obu_count > 3 { + packet.ends_temporal_unit = marker && idx == state.obus.len() - 1; packet.payload_size += pending_bytes; } @@ -309,11 +330,13 @@ impl RTPAv1Pay { } } - if force && packet.obu_count > 0 { + if (force || marker) && packet.obu_count > 0 { if packet.obu_count > 3 { packet.payload_size += pending_bytes; packet.omit_last_size_field = false; } + packet.ends_temporal_unit = true; + Some(packet) } else { // if we ran out of OBUs with space in the packet to spare, wait a bit longer @@ -621,7 +644,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay { let dts = buffer.dts(); let pts = buffer.pts(); - let buffer = buffer.into_mapped_buffer_readable().map_err(|_| { + let map = buffer.map_readable().map_err(|_| { gst::element_imp_error!( self, gst::ResourceError::Read, @@ -631,7 +654,10 @@ impl RTPBasePayloadImpl for RTPAv1Pay { gst::FlowError::Error })?; - let list = self.handle_new_obus(&mut state, buffer.as_slice(), dts, pts)?; + // Does the buffer finished a full TU? + let marker = buffer.flags().contains(gst::BufferFlags::MARKER); + let list = self.handle_new_obus(&mut state, map.as_slice(), marker, dts, pts)?; + drop(map); drop(state); if !list.is_empty() { @@ -652,7 +678,7 @@ impl RTPBasePayloadImpl for RTPAv1Pay { let mut state = self.state.lock().unwrap(); let list = list.get_mut().unwrap(); - while let Some(packet_data) = self.consider_new_packet(&mut state, true) { + while let Some(packet_data) = self.consider_new_packet(&mut state, true, true) { match self.generate_new_packet(&mut state, packet_data) { Ok(buffer) => list.add(buffer), Err(_) => break, @@ -804,15 +830,25 @@ mod tests { ( false, State { - obus: VecDeque::from(vec![ObuData { - info: SizedObu { - obu_type: ObuType::Frame, - size: 4, - ..base_obu + obus: VecDeque::from(vec![ + ObuData { + info: SizedObu { + obu_type: ObuType::TemporalDelimiter, + size: 0, + ..base_obu + }, + ..ObuData::default() }, - bytes: vec![1, 2, 3, 4], - ..ObuData::default() - }]), + ObuData { + info: SizedObu { + obu_type: ObuType::Frame, + size: 4, + ..base_obu + }, + bytes: vec![1, 2, 3, 4], + ..ObuData::default() + }, + ]), ..State::default() }, ), @@ -843,7 +879,7 @@ mod tests { payload_size: 34, last_obu_fragment_size: None, omit_last_size_field: false, - ends_temporal_unit: false, + ends_temporal_unit: true, }), State { obus: { @@ -854,7 +890,17 @@ mod tests { ..input_data[1].1 }, ), - (None, input_data[2].1.clone()), + ( + None, + State { + obus: { + let mut copy = input_data[2].1.obus.clone(); + copy.pop_front().unwrap(); + copy + }, + ..input_data[2].1 + }, + ), ]; let element = ::Type::new(); @@ -866,7 +912,7 @@ mod tests { *state = input_data[idx].1.clone(); assert_eq!( - pay.consider_new_packet(&mut state, input_data[idx].0), + pay.consider_new_packet(&mut state, input_data[idx].0, false), results[idx].0, ); assert_eq!( diff --git a/net/rtp/tests/rtpav1.rs b/net/rtp/tests/rtpav1.rs index cf2479d4..3ae68b42 100644 --- a/net/rtp/tests/rtpav1.rs +++ b/net/rtp/tests/rtpav1.rs @@ -167,7 +167,7 @@ fn test_payloader() { 0b0011_0100, 0b0100_1000, 1, ] ), ( - false, + true, // because of EOS 90_000, vec![ 0b0001_0000,