diff --git a/net/rtp/src/gcc/imp.rs b/net/rtp/src/gcc/imp.rs index a2655ba5..0f108bf4 100644 --- a/net/rtp/src/gcc/imp.rs +++ b/net/rtp/src/gcc/imp.rs @@ -19,6 +19,7 @@ */ use gst::{glib, prelude::*, subclass::prelude::*}; use once_cell::sync::Lazy; +use smallvec::SmallVec; use std::{ collections::{BTreeMap, VecDeque}, fmt, @@ -29,6 +30,7 @@ use std::{ use time::Duration; type Bitrate = u32; +type BufferList = SmallVec<[gst::Buffer; 10]>; const DEFAULT_MIN_BITRATE: Bitrate = 1000; const DEFAULT_ESTIMATED_BITRATE: Bitrate = 2_048_000; @@ -748,7 +750,7 @@ impl Default for State { impl State { // 4. sending engine implementing a "leaky bucket" - fn create_buffer_list(&mut self, bwe: &super::BandwidthEstimator) -> gst::BufferList { + fn create_buffer_list(&mut self, bwe: &super::BandwidthEstimator) -> BufferList { let now = time::Instant::now(); let elapsed = now - self.last_push; let mut budget = (elapsed.whole_nanoseconds() as i64) @@ -762,8 +764,8 @@ impl State { let mut remaining = self.buffers.iter().map(|b| b.size() as f64).sum::() * 8.; let total_size = remaining; - let mut list = gst::BufferList::new(); - let mutlist = list.get_mut().unwrap(); + let mut list_size = 0; + let mut list = BufferList::new(); // Leak the bucket so it can hold at most 30ms of data let maximum_remaining_bits = 30. * self.estimated_bitrate as f64 / 1000.; @@ -773,7 +775,8 @@ impl State { let n_bits = buf.size() * 8; leaked = budget <= 0 && remaining > maximum_remaining_bits; - mutlist.add(buf); + list_size += buf.size(); + list.push(buf); budget -= n_bits as i64; remaining -= n_bits as f64; } @@ -786,7 +789,7 @@ impl State { human_kbits(self.estimated_bitrate), human_kbits(budget as f64), human_kbits(total_budget as f64), - human_kbits(list.calculate_size() as f64 * 8.), + human_kbits(list_size as f64 * 8.), human_kbits(remaining), human_kbits(total_size) ); @@ -1026,8 +1029,14 @@ pub struct BandwidthEstimator { } impl BandwidthEstimator { - fn push_list(&self, list: gst::BufferList) -> Result { - let res = self.srcpad.push_list(list); + fn push_list(&self, list: BufferList) -> Result { + let mut res = Ok(gst::FlowSuccess::Ok); + for buf in list { + res = self.srcpad.push(buf); + if res.is_err() { + break; + } + } self.state.lock().unwrap().flow_return = res;