Implement remote/local clock estimation with the same algorithm as the RTP jitterbuffer

This gives fewer jumps and generally leads to smoother and more correct
results, while at the same time also being faster.
This commit is contained in:
Sebastian Dröge 2021-09-30 13:24:46 +03:00
parent 0c89e0819f
commit 9a53bcd405

View file

@ -1,6 +1,6 @@
use glib::prelude::*;
use gst::prelude::*;
use gst::{gst_debug, gst_error, gst_log, gst_warning};
use gst::{gst_debug, gst_error, gst_log, gst_trace, gst_warning};
use gst_video::prelude::*;
use byte_slice_cast::AsMutSliceOf;
@ -74,29 +74,34 @@ struct ReceiverQueueInner {
timeout: bool,
}
// 100 frames observations window over which we calculate the timestamp drift
// between sender and receiver. A bigger window allows more smoothing out of
// network effects
const WINDOW_LENGTH: usize = 100;
const WINDOW_LENGTH: u64 = 512;
const WINDOW_DURATION: u64 = 2_000_000_000;
#[derive(Clone)]
struct Observations(Arc<Mutex<ObservationsInner>>);
struct ObservationsInner {
// NDI timestamp - GStreamer clock time tuples
values: Vec<(u64, u64)>,
values_tmp: [(u64, u64); WINDOW_LENGTH],
current_mapping: TimeMapping,
next_mapping: TimeMapping,
time_mapping_pending: bool,
// How many frames we skipped since last observation
// we took
skip_count: usize,
// How many frames we skip in this period. once skip_count
// reaches this, we take another observation
skip_period: usize,
// How many observations are left until we update the skip_period
// again. This is always initialized to WINDOW_LENGTH
skip_period_update_in: usize,
struct ObservationsInner {
base_remote_time: Option<u64>,
base_local_time: Option<u64>,
deltas: VecDeque<i64>,
min_delta: i64,
skew: i64,
filling: bool,
window_size: usize,
}
impl Default for ObservationsInner {
fn default() -> ObservationsInner {
ObservationsInner {
base_local_time: None,
base_remote_time: None,
deltas: VecDeque::new(),
min_delta: 0,
skew: 0,
filling: true,
window_size: 0,
}
}
}
#[derive(Clone, Copy, Debug)]
@ -109,18 +114,12 @@ struct TimeMapping {
impl Observations {
fn new() -> Self {
Self(Arc::new(Mutex::new(ObservationsInner {
values: Vec::with_capacity(WINDOW_LENGTH),
values_tmp: [(0, 0); WINDOW_LENGTH],
current_mapping: TimeMapping::default(),
next_mapping: TimeMapping::default(),
time_mapping_pending: false,
skip_count: 0,
skip_period: 1,
skip_period_update_in: WINDOW_LENGTH,
})))
Self(Arc::new(Mutex::new(ObservationsInner::default())))
}
// Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
// Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
// http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
fn process(
&self,
element: &gst_base::BaseSrc,
@ -132,151 +131,152 @@ impl Observations {
}
let time = (time.0.unwrap(), time.1);
let remote_time = time.0.nseconds();
let local_time = time.1.nseconds();
let mut inner = self.0.lock().unwrap();
let ObservationsInner {
ref mut values,
ref mut values_tmp,
ref mut current_mapping,
ref mut next_mapping,
ref mut time_mapping_pending,
ref mut skip_count,
ref mut skip_period,
ref mut skip_period_update_in,
} = *inner;
if values.is_empty() {
current_mapping.xbase = time.0.nseconds();
current_mapping.b = time.1.nseconds();
current_mapping.num = 1;
current_mapping.den = 1;
}
if *skip_count == 0 {
*skip_count += 1;
if *skip_count >= *skip_period {
*skip_count = 0;
}
*skip_period_update_in -= 1;
if *skip_period_update_in == 0 {
*skip_period_update_in = WINDOW_LENGTH;
// Start by first updating every frame, then every second frame, then every third
// frame, etc. until we update once every quarter second
let framerate = gst::ClockTime::SECOND
.checked_div(duration.unwrap_or(40 * gst::ClockTime::MSECOND).nseconds())
.unwrap_or(25) as usize;
if *skip_period < framerate / 4 + 1 {
*skip_period += 1;
} else {
*skip_period = framerate / 4 + 1;
}
}
assert!(values.len() <= WINDOW_LENGTH);
if values.len() == WINDOW_LENGTH {
values.remove(0);
}
values.push((time.0.nseconds(), time.1.nseconds()));
if let Some((num, den, b, xbase, r_squared)) =
gst::calculate_linear_regression(values, Some(values_tmp))
{
next_mapping.xbase = xbase;
next_mapping.b = b;
next_mapping.num = num;
next_mapping.den = den;
*time_mapping_pending = true;
gst_debug!(
CAT,
obj: element,
"Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})",
next_mapping.num as f64 / next_mapping.den as f64,
gst::ClockTime::from_nseconds(next_mapping.xbase),
gst::ClockTime::from_nseconds(next_mapping.b),
r_squared,
);
}
} else {
*skip_count += 1;
if *skip_count >= *skip_period {
*skip_count = 0;
}
}
if *time_mapping_pending {
let expected = gst::Clock::adjust_with_calibration(
time.0,
gst::ClockTime::from_nseconds(current_mapping.xbase),
gst::ClockTime::from_nseconds(current_mapping.b),
gst::ClockTime::from_nseconds(current_mapping.num),
gst::ClockTime::from_nseconds(current_mapping.den),
);
let new_calculated = gst::Clock::adjust_with_calibration(
time.0,
gst::ClockTime::from_nseconds(next_mapping.xbase),
gst::ClockTime::from_nseconds(next_mapping.b),
gst::ClockTime::from_nseconds(next_mapping.num),
gst::ClockTime::from_nseconds(next_mapping.den),
);
let diff = if new_calculated > expected {
new_calculated - expected
} else {
expected - new_calculated
};
// Allow at most 5% frame duration or 2ms difference per frame
let max_diff = cmp::max(
(duration.map(|d| d / 10)).unwrap_or(2 * gst::ClockTime::MSECOND),
2 * gst::ClockTime::MSECOND,
);
if diff > max_diff {
gst_debug!(
CAT,
obj: element,
"New time mapping causes difference {} but only {} allowed",
diff,
max_diff,
);
if new_calculated > expected {
current_mapping.b = (expected + max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
} else {
current_mapping.b = (expected - max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
}
} else {
*current_mapping = *next_mapping;
}
}
let converted_timestamp = gst::Clock::adjust_with_calibration(
time.0,
gst::ClockTime::from_nseconds(current_mapping.xbase),
gst::ClockTime::from_nseconds(current_mapping.b),
gst::ClockTime::from_nseconds(current_mapping.num),
gst::ClockTime::from_nseconds(current_mapping.den),
);
let converted_duration =
duration.and_then(|d| d.mul_div_floor(current_mapping.num, current_mapping.den));
gst_debug!(
gst_trace!(
CAT,
obj: element,
"Converted timestamp {}/{} to {}, duration {} to {}",
time.0,
time.1,
converted_timestamp.display(),
duration.display(),
converted_duration.display(),
"Local time {}, remote time {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
(converted_timestamp, converted_duration)
let mut inner = self.0.lock().unwrap();
let (base_remote_time, base_local_time) =
match (inner.base_remote_time, inner.base_local_time) {
(Some(remote), Some(local)) => (remote, local),
_ => {
gst_debug!(
CAT,
obj: element,
"Initializing base time: local {}, remote {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return (gst::ClockTime::from_nseconds(local_time), duration);
}
};
let remote_diff = remote_time.saturating_sub(base_remote_time);
let local_diff = local_time.saturating_sub(base_local_time);
let delta = (local_diff as i64) - (remote_diff as i64);
gst_trace!(
CAT,
obj: element,
"Local diff {}, remote diff {}, delta {}",
gst::ClockTime::from_nseconds(local_diff),
gst::ClockTime::from_nseconds(remote_diff),
delta,
);
if remote_diff > 0 && local_diff > 0 {
let slope = (local_diff as f64) / (remote_diff as f64);
if slope < 0.8 || slope > 1.2 {
gst_warning!(
CAT,
obj: element,
"Too small/big slope {}, resetting",
slope
);
*inner = ObservationsInner::default();
gst_debug!(
CAT,
obj: element,
"Initializing base time: local {}, remote {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return (gst::ClockTime::from_nseconds(local_time), duration);
}
}
if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
|| (delta < inner.skew && inner.skew - delta > 1_000_000_000)
{
gst_warning!(
CAT,
obj: element,
"Delta {} too far from skew {}, resetting",
delta,
inner.skew
);
*inner = ObservationsInner::default();
gst_debug!(
CAT,
obj: element,
"Initializing base time: local {}, remote {}",
gst::ClockTime::from_nseconds(local_time),
gst::ClockTime::from_nseconds(remote_time),
);
inner.base_remote_time = Some(remote_time);
inner.base_local_time = Some(local_time);
return (gst::ClockTime::from_nseconds(local_time), duration);
}
if inner.filling {
if inner.deltas.is_empty() || delta < inner.min_delta {
inner.min_delta = delta;
}
inner.deltas.push_back(delta);
if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
inner.window_size = inner.deltas.len();
inner.skew = inner.min_delta;
inner.filling = false;
} else {
let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
let perc_window = (inner.deltas.len() as u64)
.mul_div_floor(100, WINDOW_LENGTH)
.unwrap() as i64;
let perc = cmp::max(perc_time, perc_window);
inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000;
}
} else {
let old = inner.deltas.pop_front().unwrap();
inner.deltas.push_back(delta);
if delta <= inner.min_delta {
inner.min_delta = delta;
} else if old == inner.min_delta {
inner.min_delta = inner.deltas.iter().copied().min().unwrap();
}
inner.skew = (inner.min_delta + (124 * inner.skew)) / 125;
}
let out_time = base_local_time + remote_diff;
let out_time = if inner.skew < 0 {
out_time.saturating_sub((-inner.skew) as u64)
} else {
out_time + (inner.skew as u64)
};
gst_trace!(
CAT,
obj: element,
"Skew {}, min delta {}",
inner.skew,
inner.min_delta
);
gst_trace!(
CAT,
obj: element,
"Outputting {}",
gst::ClockTime::from_nseconds(out_time)
);
(gst::ClockTime::from_nseconds(out_time), duration)
}
}
@ -285,8 +285,8 @@ impl Default for TimeMapping {
Self {
xbase: 0,
b: 0,
num: 1,
den: 1,
num: 0,
den: 0,
}
}
}