From cca3ebf520151935977c7a10c9c333a6df860458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 20 Mar 2024 15:05:39 +0200 Subject: [PATCH] rtp: Switch from chrono to time Which allows to simplify quite a bit of code and avoids us having to handle some API deprecations. Part-of: --- Cargo.lock | 2 +- net/rtp/Cargo.toml | 2 +- net/rtp/src/gcc/imp.rs | 146 +++++++++++++++++++---------------------- 3 files changed, 68 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2efdf935..209dff54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2656,7 +2656,6 @@ version = "0.13.0-alpha.1" dependencies = [ "atomic_refcell", "bitstream-io", - "chrono", "gst-plugin-version-helper", "gstreamer", "gstreamer-app", @@ -2666,6 +2665,7 @@ dependencies = [ "rand", "rtp-types", "smallvec", + "time", ] [[package]] diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index 7361a4ea..c60a1ffb 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -11,13 +11,13 @@ rust-version.workspace = true [dependencies] atomic_refcell = "0.1" bitstream-io = "2.1" -chrono = { version = "0.4", default-features = false } gst = { workspace = true, features = ["v1_20"] } gst-rtp = { workspace = true, features = ["v1_20"] } once_cell.workspace = true rand = { version = "0.8", default-features = false, features = ["std", "std_rng" ] } rtp-types = { version = "0.1" } smallvec = { version = "1.11", features = ["union", "write", "const_generics", "const_new"] } +time = { version = "0.3", default-features = false, features = ["std"] } [dev-dependencies] gst-check = { workspace = true, features = ["v1_20"] } diff --git a/net/rtp/src/gcc/imp.rs b/net/rtp/src/gcc/imp.rs index 4cafd6c6..a2655ba5 100644 --- a/net/rtp/src/gcc/imp.rs +++ b/net/rtp/src/gcc/imp.rs @@ -17,7 +17,6 @@ * responsible for determining what bitrate to give to each encode) * */ -use chrono::Duration; use gst::{glib, prelude::*, subclass::prelude::*}; use once_cell::sync::Lazy; use std::{ @@ -26,8 +25,8 @@ use std::{ fmt::Debug, mem, sync::Mutex, - time, }; +use time::Duration; type Bitrate = u32; @@ -44,7 +43,7 @@ static CAT: Lazy = Lazy::new(|| { }); // Table1. Time limit in milliseconds between packet bursts which identifies a group -static BURST_TIME: Lazy = Lazy::new(|| Duration::milliseconds(5)); +const BURST_TIME: Duration = Duration::milliseconds(5); // Table1. Coefficient used for the measured noise variance // [0.1,0.001] @@ -55,13 +54,13 @@ const ONE_MINUS_CHI: f64 = 1. - CHI; const Q: f64 = 0.001; // Table1. Initial value for the adaptive threshold -static INITIAL_DEL_VAR_TH: Lazy = Lazy::new(|| Duration::microseconds(12500)); +const INITIAL_DEL_VAR_TH: Duration = Duration::microseconds(12500); // Table1. Initial value of the system error covariance const INITIAL_ERROR_COVARIANCE: f64 = 0.1; // Table1. Time required to trigger an overuse signal -static OVERUSE_TIME_TH: Lazy = Lazy::new(|| Duration::milliseconds(10)); +const OVERUSE_TIME_TH: Duration = Duration::milliseconds(10); // from 5.5 "beta is typically chosen to be in the interval [0.8, 0.95], 0.85 is the RECOMMENDED value." const BETA: f64 = 0.85; @@ -76,7 +75,7 @@ const MOVING_AVERAGE_SMOOTHING_FACTOR: f64 = 0.5; // `N(i)` is the number of packets received the past T seconds and `L(j)` is // the payload size of packet j. A window between 0.5 and 1 second is // RECOMMENDED. -static PACKETS_RECEIVED_WINDOW: Lazy = Lazy::new(|| Duration::milliseconds(1000)); // ms +const PACKETS_RECEIVED_WINDOW: Duration = Duration::milliseconds(1000); // ms // from "5.4 Over-use detector" -> // Moreover, del_var_th(i) SHOULD NOT be updated if this condition @@ -85,32 +84,32 @@ static PACKETS_RECEIVED_WINDOW: Lazy = Lazy::new(|| Duration::millisec // ``` // |m(i)| - del_var_th(i) > 15 // ``` -static MAX_M_MINUS_DEL_VAR_TH: Lazy = Lazy::new(|| Duration::milliseconds(15)); +const MAX_M_MINUS_DEL_VAR_TH: Duration = Duration::milliseconds(15); // from 5.4 "It is also RECOMMENDED to clamp del_var_th(i) to the range [6, 600]" -static MIN_THRESHOLD: Lazy = Lazy::new(|| Duration::milliseconds(6)); -static MAX_THRESHOLD: Lazy = Lazy::new(|| Duration::milliseconds(600)); +const MIN_THRESHOLD: Duration = Duration::milliseconds(6); +const MAX_THRESHOLD: Duration = Duration::milliseconds(600); // From 5.5 ""Close" is defined as three standard deviations around this average" const STANDARD_DEVIATION_CLOSE_NUM: f64 = 3.; // Minimal duration between 2 updates on the lost based rate controller -static LOSS_UPDATE_INTERVAL: Lazy = Lazy::new(|| time::Duration::from_millis(200)); -static LOSS_DECREASE_THRESHOLD: f64 = 0.1; -static LOSS_INCREASE_THRESHOLD: f64 = 0.02; -static LOSS_INCREASE_FACTOR: f64 = 1.05; +const LOSS_UPDATE_INTERVAL: Duration = Duration::milliseconds(200); +const LOSS_DECREASE_THRESHOLD: f64 = 0.1; +const LOSS_INCREASE_THRESHOLD: f64 = 0.02; +const LOSS_INCREASE_FACTOR: f64 = 1.05; // Minimal duration between 2 updates on the lost based rate controller -static DELAY_UPDATE_INTERVAL: Lazy = Lazy::new(|| time::Duration::from_millis(100)); +const DELAY_UPDATE_INTERVAL: Duration = Duration::milliseconds(100); -static ROUND_TRIP_TIME_WINDOW_SIZE: usize = 100; +const ROUND_TRIP_TIME_WINDOW_SIZE: usize = 100; -fn ts2dur(t: gst::ClockTime) -> Duration { +const fn ts2dur(t: gst::ClockTime) -> Duration { Duration::nanoseconds(t.nseconds() as i64) } -fn dur2ts(t: Duration) -> gst::ClockTime { - gst::ClockTime::from_nseconds(t.num_nanoseconds().unwrap() as u64) +const fn dur2ts(t: Duration) -> gst::ClockTime { + gst::ClockTime::from_nseconds(t.whole_nanoseconds() as u64) } #[derive(Debug)] @@ -118,7 +117,9 @@ enum BandwidthEstimationOp { /// Don't update target bitrate Hold, /// Decrease target bitrate + #[allow(unused)] Decrease(String /* reason */), + #[allow(unused)] Increase(String /* reason */), } @@ -160,7 +161,7 @@ impl Packet { let seqnum = structure.get::("seqnum").unwrap() as u64; if lost { return Some(Packet { - arrival: Duration::zero(), + arrival: Duration::ZERO, departure: ts2dur(departure), size: structure.get::("size").unwrap() as usize, seqnum, @@ -189,18 +190,12 @@ impl Default for PacketGroup { fn default() -> Self { Self { packets: Default::default(), - departure: Duration::zero(), + departure: Duration::ZERO, arrival: None, } } } -fn pdur(d: &Duration) -> String { - let stdd = time::Duration::from_nanos(d.num_nanoseconds().unwrap().unsigned_abs()); - - format!("{}{stdd:?}", if d.lt(&Duration::zero()) { "-" } else { "" }) -} - impl PacketGroup { fn add(&mut self, packet: Packet) { if self.departure.is_zero() { @@ -310,10 +305,10 @@ impl Debug for Detector { "Network Usage: {:?}. Effective bitrate: {}ps - Measure: {} Estimate: {} threshold {} - overuse_estimate {}", self.usage, human_kbits(self.effective_bitrate()), - pdur(&self.measure), - pdur(&self.estimate), - pdur(&self.threshold), - pdur(&self.last_overuse_estimate), + self.measure, + self.estimate, + self.threshold, + self.last_overuse_estimate, ) } } @@ -323,7 +318,7 @@ impl Detector { Detector { group: Default::default(), prev_group: Default::default(), - measure: Duration::zero(), + measure: Duration::ZERO, /* Smallish value to hold PACKETS_RECEIVED_WINDOW packets */ last_received_packets: BTreeMap::new(), @@ -334,16 +329,16 @@ impl Detector { gain: 0., measurement_uncertainty: 0., estimate_error: INITIAL_ERROR_COVARIANCE, - estimate: Duration::zero(), + estimate: Duration::ZERO, - threshold: *INITIAL_DEL_VAR_TH, + threshold: INITIAL_DEL_VAR_TH, last_threshold_update: None, num_deltas: 0, last_use_detector_update: time::Instant::now(), increasing_counter: 0, - last_overuse_estimate: Duration::zero(), - increasing_duration: Duration::zero(), + last_overuse_estimate: Duration::ZERO, + increasing_duration: Duration::ZERO, rtts: Default::default(), clock: gst::SystemClock::obtain(), @@ -371,7 +366,7 @@ impl Detector { .unwrap() .arrival; - while last_arrival - self.oldest_packet_in_window_ts() > *PACKETS_RECEIVED_WINDOW { + while last_arrival - self.oldest_packet_in_window_ts() > PACKETS_RECEIVED_WINDOW { let oldest_seqnum = *self.last_received_packets.iter().next().unwrap().0; self.last_received_packets.remove(&oldest_seqnum); } @@ -398,9 +393,8 @@ impl Detector { .sum::() * 8.; - (bits - / (duration.num_nanoseconds().unwrap() as f64 - / gst::ClockTime::SECOND.nseconds() as f64)) as Bitrate + (bits / (duration.whole_nanoseconds() as f64 / gst::ClockTime::SECOND.nseconds() as f64)) + as Bitrate } fn oldest_packet_in_window_ts(&self) -> Duration { @@ -425,7 +419,7 @@ impl Detector { (self .rtts .iter() - .map(|d| d.num_nanoseconds().unwrap() as f64) + .map(|d| d.whole_nanoseconds() as f64) .sum::() / self.rtts.len() as f64) as i64, ) @@ -481,7 +475,7 @@ impl Detector { } if pkt.departure >= self.group.departure { - if self.group.inter_departure_time_pkt(pkt) < *BURST_TIME { + if self.group.inter_departure_time_pkt(pkt) < BURST_TIME { self.group.add(*pkt); continue; } @@ -491,8 +485,8 @@ impl Detector { // A Packet which has an inter-arrival time less than burst_time and // an inter-group delay variation d(i) less than 0 is considered // being part of the current group of packets. - if self.group.inter_arrival_time_pkt(pkt) < *BURST_TIME - && self.group.inter_delay_variation_pkt(pkt) < Duration::zero() + if self.group.inter_arrival_time_pkt(pkt) < BURST_TIME + && self.group.inter_delay_variation_pkt(pkt) < Duration::ZERO { self.group.add(*pkt); continue; @@ -502,7 +496,7 @@ impl Detector { gst::trace!( CAT, "Packet group done: {:?}", - gst::ClockTime::from_nseconds(group.departure.num_nanoseconds().unwrap() as u64) + gst::ClockTime::from_nseconds(group.departure.whole_nanoseconds() as u64) ); if let Some(prev_group) = mem::replace(&mut self.prev_group, Some(group.clone())) { // 5.3 Arrival-time filter @@ -514,7 +508,7 @@ impl Detector { gst::debug!( CAT, "Ignoring packet departed at {:?} as we got feedback too late", - gst::ClockTime::from_nseconds(pkt.departure.num_nanoseconds().unwrap() as u64) + gst::ClockTime::from_nseconds(pkt.departure.whole_nanoseconds() as u64) ); } } @@ -527,10 +521,7 @@ impl Detector { if let Some(ref last_update) = self.last_loss_update { self.loss_average = loss_fraction - + (-Duration::from_std(now - *last_update) - .unwrap() - .num_milliseconds() as f64) - .exp() + + (-(now - *last_update).whole_milliseconds() as f64).exp() * (self.loss_average - loss_fraction); } @@ -541,7 +532,7 @@ impl Detector { self.measure = group.inter_delay_variation(prev_group); let z = self.measure - self.estimate; - let zms = z.num_microseconds().unwrap() as f64 / 1000.0; + let zms = z.whole_microseconds() as f64 / 1000.0; // This doesn't exactly follows the spec as we should compute and // use f_max here, no implementation we have found actually uses it. @@ -575,11 +566,11 @@ impl Detector { } let t = Duration::nanoseconds( - self.estimate.num_nanoseconds().unwrap() * i64::min(self.num_deltas, MAX_DELTAS), + self.estimate.whole_nanoseconds() as i64 * i64::min(self.num_deltas, MAX_DELTAS), ); let usage = if t > self.threshold { NetworkUsage::Over - } else if t.num_nanoseconds().unwrap() < -self.threshold.num_nanoseconds().unwrap() { + } else if t.whole_nanoseconds() < -self.threshold.whole_nanoseconds() { NetworkUsage::Under } else { NetworkUsage::Normal @@ -593,15 +584,15 @@ impl Detector { fn update_threshold(&mut self, estimate: &Duration) { const K_U: f64 = 0.01; // Table1. Coefficient for the adaptive threshold const K_D: f64 = 0.00018; // Table1. Coefficient for the adaptive threshold - const MAX_TIME_DELTA: time::Duration = time::Duration::from_millis(100); + const MAX_TIME_DELTA: Duration = Duration::milliseconds(100); let now = time::Instant::now(); if self.last_threshold_update.is_none() { self.last_threshold_update = Some(now); } - let abs_estimate = Duration::nanoseconds(estimate.num_nanoseconds().unwrap().abs()); - if abs_estimate > self.threshold + *MAX_M_MINUS_DEL_VAR_TH { + let abs_estimate = estimate.abs(); + if abs_estimate > self.threshold + MAX_M_MINUS_DEL_VAR_TH { self.last_threshold_update = Some(now); return; } @@ -611,14 +602,12 @@ impl Detector { } else { K_U }; - let time_delta = - Duration::from_std((now - self.last_threshold_update.unwrap()).min(MAX_TIME_DELTA)) - .unwrap(); + let time_delta = (now - self.last_threshold_update.unwrap()).min(MAX_TIME_DELTA); let d = abs_estimate - self.threshold; - let add = k * d.num_milliseconds() as f64 * time_delta.num_milliseconds() as f64; + let add = k * d.whole_milliseconds() as f64 * time_delta.whole_milliseconds() as f64; self.threshold += Duration::nanoseconds((add * 100. * 1_000.) as i64); - self.threshold = self.threshold.clamp(*MIN_THRESHOLD, *MAX_THRESHOLD); + self.threshold = self.threshold.clamp(MIN_THRESHOLD, MAX_THRESHOLD); self.last_threshold_update = Some(now); } @@ -626,22 +615,22 @@ impl Detector { let (th_usage, estimate) = self.compare_threshold(); let now = time::Instant::now(); - let delta = Duration::from_std(now - self.last_use_detector_update).unwrap(); + let delta = now - self.last_use_detector_update; self.last_use_detector_update = now; gst::log!( CAT, "{:?} - self.estimate {} - estimate: {} - th: {}", th_usage, - pdur(&self.estimate), - pdur(&estimate), - pdur(&self.threshold) + self.estimate, + estimate, + self.threshold ); match th_usage { NetworkUsage::Over => { self.increasing_duration += delta; self.increasing_counter += 1; - if self.increasing_duration > *OVERUSE_TIME_TH + if self.increasing_duration > OVERUSE_TIME_TH && self.increasing_counter > 1 && estimate > self.last_overuse_estimate { @@ -649,7 +638,7 @@ impl Detector { } } NetworkUsage::Under | NetworkUsage::Normal => { - self.increasing_duration = Duration::zero(); + self.increasing_duration = Duration::ZERO; self.increasing_counter = 0; self.usage = th_usage; @@ -761,8 +750,8 @@ impl State { // 4. sending engine implementing a "leaky bucket" fn create_buffer_list(&mut self, bwe: &super::BandwidthEstimator) -> gst::BufferList { let now = time::Instant::now(); - let elapsed = Duration::from_std(now - self.last_push).unwrap(); - let mut budget = (elapsed.num_nanoseconds().unwrap()) + let elapsed = now - self.last_push; + let mut budget = (elapsed.whole_nanoseconds() as i64) .mul_div_round( self.estimated_bitrate as i64, gst::ClockTime::SECOND.nseconds() as i64, @@ -793,7 +782,7 @@ impl State { CAT, obj: bwe, "{} bitrate: {}ps budget: {}/{} sending: {} Remaining: {}/{}", - pdur(&elapsed), + elapsed, human_kbits(self.estimated_bitrate), human_kbits(budget as f64), human_kbits(total_budget as f64), @@ -815,11 +804,11 @@ impl State { let time_since_last_update_ms = match self.last_increase_on_delay { None => 0., Some(prev) => { - if now - prev < *DELAY_UPDATE_INTERVAL { + if now - prev < DELAY_UPDATE_INTERVAL { return None; } - (now - prev).as_millis() as f64 + (now - prev).whole_milliseconds() as f64 } }; @@ -839,7 +828,7 @@ impl State { let packets_per_frame = f64::ceil(bits_per_frame / (1200. * 8.)); let avg_packet_size_bits = bits_per_frame / packets_per_frame; - let rtt_ms = self.detector.rtt().num_milliseconds() as f64; + let rtt_ms = self.detector.rtt().whole_milliseconds() as f64; let response_time_ms = 100. + rtt_ms; let alpha = 0.5 * f64::min(time_since_last_update_ms / response_time_ms, 1.0); let threshold_on_effective_bitrate = 1.5 * effective_bitrate as f64; @@ -959,7 +948,7 @@ impl State { let now = time::Instant::now(); if loss_ratio > LOSS_DECREASE_THRESHOLD - && (now - self.last_decrease_on_loss) > *LOSS_UPDATE_INTERVAL + && (now - self.last_decrease_on_loss) > LOSS_UPDATE_INTERVAL { let factor = 1. - (0.5 * loss_ratio); @@ -973,7 +962,7 @@ impl State { ControllerType::Loss, ) } else if loss_ratio < LOSS_INCREASE_THRESHOLD - && (now - self.last_increase_on_loss) > *LOSS_UPDATE_INTERVAL + && (now - self.last_increase_on_loss) > LOSS_UPDATE_INTERVAL { self.last_control_op = BandwidthEstimationOp::Increase("Low loss".into()); self.last_increase_on_loss = now; @@ -1000,7 +989,7 @@ impl State { }, NetworkUsage::Over => { let now = time::Instant::now(); - if now - self.last_decrease_on_delay > *DELAY_UPDATE_INTERVAL { + if now - self.last_decrease_on_delay > DELAY_UPDATE_INTERVAL { let effective_bitrate = self.detector.effective_bitrate(); let target = (self.estimated_bitrate as f64 * 0.95).min(BETA * effective_bitrate as f64); @@ -1051,7 +1040,7 @@ impl BandwidthEstimator { let clock = gst::SystemClock::obtain(); bwe.imp().state.lock().unwrap().clock_entry = - Some(clock.new_single_shot_id(clock.time().unwrap() + dur2ts(*BURST_TIME))); + Some(clock.new_single_shot_id(clock.time().unwrap() + dur2ts(BURST_TIME))); self.srcpad.start_task(move || { let pause = || { @@ -1088,10 +1077,7 @@ impl BandwidthEstimator { let list = { let mut state = lock_state(); clock - .single_shot_id_reinit( - &clock_entry, - clock.time().unwrap() + dur2ts(*BURST_TIME), - ) + .single_shot_id_reinit(&clock_entry, clock.time().unwrap() + dur2ts(BURST_TIME)) .unwrap(); state.clock_entry = Some(clock_entry); state.create_buffer_list(&bwe)