webrtcsink: Initial congestion control implementation

Naive heuristic lifted from an earlier proof of concept,
augmented with logic from
https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-5.5

A property is exposed to disable congestion control for
testing purposes, it can be extended in the future to allow
selecting a different congestion control scheme.

+ Update the documentation
This commit is contained in:
Mathieu Duponchelle 2021-11-04 18:26:50 +01:00
parent 79fb66f338
commit 8c6ff24052
5 changed files with 764 additions and 61 deletions

70
Cargo.lock generated
View file

@ -98,6 +98,23 @@ dependencies = [
"url",
]
[[package]]
name = "async-process"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b21b63ab5a0db0369deb913540af2892750e42d949faacc7a61495ac418a1692"
dependencies = [
"async-io",
"blocking",
"cfg-if",
"event-listener",
"futures-lite",
"libc",
"once_cell",
"signal-hook",
"winapi",
]
[[package]]
name = "async-std"
version = "1.10.0"
@ -108,6 +125,7 @@ dependencies = [
"async-global-executor",
"async-io",
"async-lock",
"async-process",
"crossbeam-utils",
"futures-channel",
"futures-core",
@ -516,7 +534,7 @@ dependencies = [
[[package]]
name = "glib"
version = "0.15.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047"
source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a"
dependencies = [
"bitflags",
"futures-channel",
@ -534,7 +552,7 @@ dependencies = [
[[package]]
name = "glib-macros"
version = "0.15.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047"
source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a"
dependencies = [
"anyhow",
"heck",
@ -548,7 +566,7 @@ dependencies = [
[[package]]
name = "glib-sys"
version = "0.15.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047"
source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a"
dependencies = [
"libc",
"system-deps",
@ -570,7 +588,7 @@ dependencies = [
[[package]]
name = "gobject-sys"
version = "0.15.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#64cd888c4b02525b2c22a5979f8c5f592c1f4047"
source = "git+https://github.com/gtk-rs/gtk-rs-core#834ea60512aef1df6d861fb1efd06173b3d7083a"
dependencies = [
"glib-sys",
"libc",
@ -663,6 +681,30 @@ dependencies = [
"system-deps",
]
[[package]]
name = "gstreamer-rtp"
version = "0.18.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#b43d785d837738a9071ad0690232e245e487dd53"
dependencies = [
"bitflags",
"glib",
"gstreamer",
"gstreamer-rtp-sys",
"once_cell",
]
[[package]]
name = "gstreamer-rtp-sys"
version = "0.18.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#b43d785d837738a9071ad0690232e245e487dd53"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer-sdp"
version = "0.18.0"
@ -1312,6 +1354,25 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "signal-hook"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.5"
@ -1630,6 +1691,7 @@ dependencies = [
"gst-plugin-version-helper",
"gstreamer",
"gstreamer-app",
"gstreamer-rtp",
"gstreamer-sdp",
"gstreamer-video",
"gstreamer-webrtc",

View file

@ -32,12 +32,17 @@ useful alternative.
While this is not on the roadmap at the moment, nothing in the design prevents
implementing this optimization.
* Congestion control: the element levarages transport-wide congestion control
feedback messages in order to adapt the bitrate of individual consumers' video
encoders to the available bandwidth.
* Configuration: the level of user control over the element is at the moment quite
narrow, as the only interface exposed is control over proposed codecs, as well
as their order of priority. Consult `gst-inspect=1.0` for more information.
as their order of priority, and disabling congestion control. Consult `gst-inspect=1.0`
for more information.
More features are on the roadmap, focusing on mechanisms for mitigating packet
loss and congestion.
loss.
It is important to note that full control over the individual elements used by
`webrtcsink` is *not* on the roadmap, as it will act as a black box in that respect,
@ -52,13 +57,7 @@ expose interfaces to guide and tune the heuristics it employs.
### Prerequisites
The element has only been tested for now against GStreamer master, with an
extra Merge Request pending review:
<https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1233>
The MR should hopefully make it in in time for GStreamer's 1.20 release,
in the meantime the patches must be applied locally.
The element has only been tested for now against GStreamer master.
For testing, it is recommended to simply build GStreamer locally and run
in the uninstalled devenv.
@ -98,7 +97,7 @@ python3 -m http.server
In the third, run:
```
``` shell
export GST_PLUGIN_PATH=$PWD/target/debug:$GST_PLUGIN_PATH
gst-launch-1.0 webrtcsink name=ws videotestsrc ! ws. audiotestsrc ! ws.
```
@ -106,13 +105,77 @@ gst-launch-1.0 webrtcsink name=ws videotestsrc ! ws. audiotestsrc ! ws.
When the pipeline above is running succesfully, open a browser and
point it to the python server:
```
``` shell
xdg-open http://127.0.0.1:8000
```
You should see an identifier listed in the left-hand panel, click on
it. You should see a test video stream, and hear a test tone.
## Configuration
The element itself can be configured through its properties, see
`gst-inspect-1.0 webrtcsink` for more information about that, in addition the
default signaller also exposes properties for configuring it, in
particular setting the signalling server address, those properties
can be accessed through the `gst::ChildProxy` interface, for example
with gst-launch:
``` shell
gst-launch-1.0 webrtcsink signaller::address="ws://127.0.0.1:8443" ..
```
The signaller object can not be inspected, refer to [the source code]
for the list of properties.
[the source code]: plugins/src/signaller/imp.rs
## Testing congestion control
For the purpose of testing congestion in a reproducible manner, a
[simple tool] has been used, I only used it on Linux but it is documented
as usable on MacOS too. I had to run the client browser on a separate
machine on my local network for congestion to actually be applied, I didn't
look into why that was necessary.
My testing procedure was:
* identify the server machine network interface (eg with `ifconfig` on Linux)
* identify the client machine IP address (eg with `ifconfig` on Linux)
* start the various services as explained in the Usage section (use
`GST_DEBUG=webrtcsink:7` to get detailed logs about congestion control)
* start playback in the client browser
* Run a `comcast` command on the server machine, for instance:
``` shell
/home/meh/go/bin/comcast --device=$SERVER_INTERFACE --target-bw 3000 --target-addr=$CLIENT_IP --target-port=1:65535 --target-proto=udp
```
* Observe the bitrate sharply decreasing, playback should slow down briefly
then catch back up
* Remove the bandwidth limitation, and observe the bitrate eventually increasing
back to a maximum:
``` shell
/home/meh/go/bin/comcast --device=$SERVER_INTERFACE --stop
```
For comparison, the congestion control property can be set to disabled on
webrtcsink, then the above procedure applied again, the expected result is
for playback to simply crawl down to a halt until the bandwidth limitation
is lifted:
``` shell
gst-launch-1.0 webrtcsink congestion-control=disabled
```
[simple tool]: https://github.com/tylertreat/comcast
## License
All code in this repository is licensed under the [MIT license].

View file

@ -14,11 +14,12 @@ gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gst
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-webrtc = { package = "gstreamer-webrtc", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-sdp = { package = "gstreamer-sdp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
once_cell = "1.0"
smallvec = "1"
anyhow = "1"
futures = "0.3"
async-std = "1"
async-std = { version = "1", features = ["unstable"] }
async-tungstenite = { version = "0.10", features = ["async-std-runtime", "async-native-tls"] }
serde = "1"
serde_derive = "1"
@ -48,4 +49,4 @@ install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-webrtc >= 1.20, gstreamer-1.0 >= 1.20, gstreamer-app >= 1.20, gstreamer-video >= 1.20, gstreamer-sdp >= 1.20, gobject-2.0, glib-2.0, gmodule-2.0"
requires_private = "gstreamer-rtp >= 1.20, gstreamer-webrtc >= 1.20, gstreamer-1.0 >= 1.20, gstreamer-app >= 1.20, gstreamer-video >= 1.20, gstreamer-sdp >= 1.20, gobject-2.0, glib-2.0, gmodule-2.0"

View file

@ -3,6 +3,7 @@ use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning};
use gst_rtp::prelude::*;
use async_std::task;
use futures::prelude::*;
@ -10,9 +11,11 @@ use futures::prelude::*;
use anyhow::{anyhow, Error};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::ops::Mul;
use std::sync::Mutex;
use super::utils::{make_element, StreamProducer};
use super::WebRTCSinkCongestionControl;
use crate::signaller::Signaller;
use std::collections::BTreeMap;
@ -24,10 +27,17 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
const RTP_TWCC_URI: &str =
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
const DEFAULT_CONGESTION_CONTROL: WebRTCSinkCongestionControl =
WebRTCSinkCongestionControl::Homegrown;
/// User configuration
struct Settings {
video_caps: gst::Caps,
audio_caps: gst::Caps,
cc_heuristic: WebRTCSinkCongestionControl,
}
/// Represents a codec we can offer
@ -57,6 +67,8 @@ struct InputStream {
#[derive(Clone)]
struct WebRTCPad {
pad: gst::Pad,
/// The (fixed) caps of the corresponding input stream
in_caps: gst::Caps,
/// Our offer caps
caps: gst::Caps,
/// The m= line index in the SDP
@ -68,11 +80,66 @@ struct WebRTCPad {
payload: Option<i32>,
}
/// Wrapper around GStreamer encoder element, keeps track of factory
/// name in order to provide a unified set / get bitrate API, also
/// tracks a raw capsfilter used to resize / decimate the input video
/// stream according to the bitrate, thresholds hardcoded for now
struct VideoEncoder {
factory_name: String,
element: gst::Element,
filter: gst::Element,
halved_framerate: gst::Fraction,
full_width: i32,
peer_id: String,
}
struct CongestionController {
/// Overall bitrate target for all video streams.
/// Hasn't been tested with multiple video streams, but
/// current design is simply to divide bitrate equally.
bitrate_ema: Option<f64>,
/// Exponential moving average, updated when bitrate is
/// decreased, discarded when increased again past last
/// congestion window. Smoothing factor hardcoded.
target_bitrate: i32,
/// Exponentially weighted moving variance, recursively
/// updated along with bitrate_ema. sqrt'd to obtain standard
/// deviation, used to determine whether to increase bitrate
/// additively or multiplicatively
bitrate_emvar: f64,
/// Used in additive mode to track last control time, influences
/// calculation of added value according to gcc section 5.5
last_update_time: Option<std::time::Instant>,
/// For logging purposes
peer_id: String,
}
#[derive(Debug)]
enum IncreaseType {
/// Increase bitrate by value
Additive(f64),
/// Increase bitrate by factor
Multiplicative(f64),
}
#[derive(Debug)]
enum CongestionControlOp {
/// Don't update target bitrate
Hold,
/// Decrease target bitrate
Decrease(f64),
/// Increase target bitrate, either additively or multiplicatively
Increase(IncreaseType),
}
struct Consumer {
pipeline: gst::Pipeline,
webrtcbin: gst::Element,
webrtc_pads: HashMap<u32, WebRTCPad>,
peer_id: String,
encoders: Vec<VideoEncoder>,
/// None if congestion control was disabled
congestion_controller: Option<CongestionController>,
}
#[derive(PartialEq)]
@ -120,6 +187,7 @@ impl Default for Settings {
.iter()
.map(|s| gst::Structure::new_empty(s))
.collect::<gst::Caps>(),
cc_heuristic: WebRTCSinkCongestionControl::Homegrown,
}
}
}
@ -151,9 +219,14 @@ fn setup_encoding(
src: &gst::Element,
codec: &Codec,
ssrc: Option<u32>,
) -> Result<gst::Element, Error> {
twcc: bool,
) -> Result<(gst::Element, gst::Element, gst::Element), Error> {
let conv = match codec.is_video {
true => make_element("videoconvert", None)?,
true => gst::parse_bin_from_description(
"videoconvert ! videoscale ! videorate drop-only=true",
true,
)?
.upcast(),
false => gst::parse_bin_from_description("audioresample ! audioconvert", true)?.upcast(),
};
@ -206,11 +279,66 @@ fn setup_encoding(
let conv_caps = if codec.encoder.name() == "nvh264enc" {
gst::Caps::builder("video/x-raw")
.field("format", &gst::List::new(&[&"NV12", &"YV12", &"I420"]))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.build()
} else if codec.is_video {
gst::Caps::builder("video/x-raw")
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.build()
} else {
gst::Caps::new_any()
gst::Caps::builder("audio/x-raw").build()
};
match codec.encoder.name().as_str() {
"vp8enc" | "vp9enc" => {
enc.set_property("deadline", 1i64).unwrap();
enc.set_property("threads", 12i32).unwrap();
enc.set_property("target-bitrate", 2560000i32).unwrap();
enc.set_property("cpu-used", -16i32).unwrap();
enc.set_property("keyframe-max-dist", 2000i32).unwrap();
enc.set_property_from_str("end-usage", "cbr").unwrap();
enc.set_property("buffer-initial-size", 100i32).unwrap();
enc.set_property("buffer-optimal-size", 120i32).unwrap();
enc.set_property("buffer-size", 300i32).unwrap();
enc.set_property("resize-allowed", true).unwrap();
enc.set_property("max-intra-bitrate", 250i32).unwrap();
pay.set_property_from_str("picture-id-mode", "15-bit")
.unwrap();
}
"x264enc" => {
enc.set_property("bitrate", 25608u32).unwrap();
enc.set_property_from_str("tune", "zerolatency").unwrap();
enc.set_property_from_str("speed-preset", "ultrafast")
.unwrap();
enc.set_property("threads", 12u32).unwrap();
enc.set_property("key-int-max", 2560u32).unwrap();
enc.set_property("b-adapt", false).unwrap();
enc.set_property("vbv-buf-capacity", 120u32).unwrap();
}
"nvh264enc" => {
enc.set_property("bitrate", 2048u32).unwrap();
enc.set_property("gop-size", 2560i32).unwrap();
enc.set_property_from_str("rc-mode", "cbr").unwrap();
enc.set_property("vbv-buffer-size", 120u32).unwrap();
enc.set_property("zerolatency", true).unwrap();
}
_ => (),
}
/* We only enforce TWCC in the offer caps, once a remote description
* has been set it will get automatically negotiated. This is necessary
* because the implementor in Firefox had apparently not understood the
* concept of *transport-wide* congestion control, and firefox doesn't
* provide feedback for audio packets.
*/
if twcc {
let twcc_extension = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI).unwrap();
twcc_extension.set_id(1);
pay.emit_by_name("add-extension", &[&twcc_extension])
.unwrap();
}
conv_filter.set_property("caps", conv_caps).unwrap();
let parse_caps = if codec_name == "video/x-h264" {
@ -230,31 +358,380 @@ fn setup_encoding(
gst::Element::link_many(&[&parse_filter, &pay]).with_context(|| "Linking encoding elements")?;
Ok(pay)
Ok((enc, conv_filter, pay))
}
impl State {
fn remove_consumer(&mut self, element: &super::WebRTCSink, peer_id: &str, signal: bool) {
if let Some(consumer) = self.consumers.remove(peer_id) {
for webrtc_pad in consumer.webrtc_pads.values() {
if let Some(producer) = self
.streams
.get(&webrtc_pad.stream_name)
.and_then(|stream| stream.producer.as_ref())
fn lookup_remote_inbound_rtp_stats(stats: &gst::StructureRef) -> Option<gst::Structure> {
for (_, field_value) in stats {
if let Ok(s) = field_value.get::<gst::Structure>() {
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
if type_ == gst_webrtc::WebRTCStatsType::RemoteInboundRtp {
return Some(s);
}
}
}
}
None
}
fn lookup_transport_stats(stats: &gst::StructureRef) -> Option<gst::Structure> {
for (_, field_value) in stats {
if let Ok(s) = field_value.get::<gst::Structure>() {
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
if type_ == gst_webrtc::WebRTCStatsType::Transport && s.has_field("gst-twcc-stats")
{
consumer.disconnect_input_stream(producer);
return Some(s);
}
}
}
}
None
}
impl VideoEncoder {
fn new(
element: gst::Element,
filter: gst::Element,
in_caps: &gst::Caps,
peer_id: &str,
) -> Self {
let s = in_caps.structure(0).unwrap();
let halved_framerate = s
.get::<gst::Fraction>("framerate")
.unwrap()
.mul(gst::Fraction::new(1, 2));
let full_width = s.get::<i32>("width").unwrap();
Self {
factory_name: element.factory().unwrap().name().into(),
element,
filter,
halved_framerate,
full_width,
peer_id: peer_id.to_string(),
}
}
fn bitrate(&self) -> i32 {
match self.factory_name.as_str() {
"vp8enc" | "vp9enc" => self
.element
.property("target-bitrate")
.unwrap()
.get::<i32>()
.unwrap(),
"x264enc" | "nvh264enc" => {
(self
.element
.property("bitrate")
.unwrap()
.get::<u32>()
.unwrap()
* 1000) as i32
}
_ => unreachable!(),
}
}
fn set_bitrate(&self, element: &super::WebRTCSink, bitrate: i32) {
match self.factory_name.as_str() {
"vp8enc" | "vp9enc" => self
.element
.set_property("target-bitrate", bitrate)
.unwrap(),
"x264enc" | "nvh264enc" => self
.element
.set_property("bitrate", (bitrate / 1000) as u32)
.unwrap(),
_ => unreachable!(),
}
let mut s = self
.filter
.property("caps")
.unwrap()
.get::<gst::Caps>()
.unwrap()
.structure(0)
.unwrap()
.to_owned();
// Hardcoded thresholds, may be tuned further in the future, and
// adapted according to the codec in use
if bitrate < 500000 {
s.set("width", 360i32.min(self.full_width));
s.set("framerate", self.halved_framerate);
} else if bitrate < 1000000 {
s.set("width", 360i32.min(self.full_width));
s.remove_field("framerate");
} else if bitrate < 2000000 {
s.set("width", 720i32.min(self.full_width));
s.remove_field("framerate");
} else {
s.remove_field("width");
s.remove_field("framerate");
}
let caps = gst::Caps::builder_full().structure(s).build();
gst_log!(
CAT,
obj: element,
"consumer {}: setting bitrate {} and caps {} on encoder {:?}",
self.peer_id,
bitrate,
caps,
self.element
);
self.filter.set_property("caps", caps).unwrap();
}
}
impl CongestionController {
fn new(peer_id: &str) -> Self {
Self {
target_bitrate: 0,
bitrate_ema: None,
bitrate_emvar: 0.,
last_update_time: None,
peer_id: peer_id.to_string(),
}
}
fn update(
&mut self,
element: &super::WebRTCSink,
twcc_stats: &gst::StructureRef,
rtt: f64,
) -> CongestionControlOp {
let target_bitrate = self.target_bitrate as f64;
// Unwrap, all those fields must be there or there's been an API
// break, which qualifies as programming error
let bitrate_sent = twcc_stats.get::<u32>("bitrate-sent").unwrap();
let bitrate_recv = twcc_stats.get::<u32>("bitrate-recv").unwrap();
let delta_of_delta = twcc_stats.get::<i64>("avg-delta-of-delta").unwrap();
let loss_percentage = twcc_stats.get::<f64>("packet-loss-pct").unwrap();
let sent_minus_received = bitrate_sent.saturating_sub(bitrate_recv);
let delay_factor = sent_minus_received as f64 / target_bitrate;
let last_update_time = self.last_update_time.replace(std::time::Instant::now());
gst_trace!(
CAT,
obj: element,
"consumer {}: considering stats {}",
self.peer_id,
twcc_stats
);
if delay_factor > 0.01 {
CongestionControlOp::Decrease(if delay_factor < 0.64 {
gst_trace!(
CAT,
obj: element,
"consumer {}: low delay factor",
self.peer_id
);
0.96
} else {
gst_trace!(
CAT,
obj: element,
"consumer {}: High delay factor",
self.peer_id
);
delay_factor.sqrt().sqrt().clamp(0.8, 0.96)
})
} else if delta_of_delta > 1000000 || loss_percentage > 2.0 {
CongestionControlOp::Decrease(if loss_percentage > 0. && loss_percentage < 2.0 {
gst_trace!(CAT, obj: element, "consumer {}: low loss", self.peer_id);
0.97
} else {
gst_log!(CAT, obj: element, "consumer: {}: high loss", self.peer_id);
((100. - loss_percentage) / 100.).clamp(0.7, 0.98)
})
} else if loss_percentage > 0.01 {
gst_trace!(CAT, obj: element, "consumer {}: tiny loss", self.peer_id);
CongestionControlOp::Hold
} else {
gst_trace!(
CAT,
obj: element,
"consumer {}: no detected congestion",
self.peer_id
);
CongestionControlOp::Increase(if let Some(ema) = self.bitrate_ema {
let bitrate_stdev = self.bitrate_emvar.sqrt();
gst_trace!(
CAT,
obj: element,
"consumer {}: Old bitrate: {}, ema: {}, stddev: {}",
self.peer_id,
target_bitrate,
ema,
bitrate_stdev,
);
// gcc section 5.5 advises 3 standard deviations, but experiments
// have shown this to be too low, probably related to the rest of
// homegrown algorithm not implementing gcc, revisit when implementing
// the rest of the RFC
if target_bitrate < ema - 7. * bitrate_stdev {
gst_trace!(
CAT,
obj: element,
"consumer {}: below last congestion window",
self.peer_id
);
/* Multiplicative increase */
IncreaseType::Multiplicative(1.03)
} else if target_bitrate > ema + 7. * bitrate_stdev {
gst_trace!(
CAT,
obj: element,
"consumer {}: above last congestion window",
self.peer_id
);
/* We have gone past our last estimated max bandwidth
* network situation may have changed, go back to
* multiplicative increase
*/
self.bitrate_ema.take();
IncreaseType::Multiplicative(1.03)
} else {
let rtt_ms = rtt * 1000.;
let response_time_ms = 100. + rtt_ms;
let time_since_last_update_ms = match last_update_time {
None => 0.,
Some(instant) => {
(self.last_update_time.unwrap() - instant).as_millis() as f64
}
};
// gcc section 5.5 advises 0.95 as the smoothing factor, but that
// seems intuitively much too low, granting disproportionate importance
// to the last measurement. 0.5 seems plenty enough, I don't have maths
// to back that up though :)
let alpha = 0.5 * f64::min(time_since_last_update_ms / response_time_ms, 1.0);
let bits_per_frame = target_bitrate / 30.;
let packets_per_frame = f64::ceil(bits_per_frame / (1200. * 8.));
let avg_packet_size_bits = bits_per_frame / packets_per_frame;
gst_trace!(
CAT,
obj: element,
"consumer {}: still in last congestion window",
self.peer_id,
);
/* Additive increase */
IncreaseType::Additive(f64::max(1000., alpha * avg_packet_size_bits))
}
} else {
/* Multiplicative increase */
gst_trace!(
CAT,
obj: element,
"consumer {}: outside congestion window",
self.peer_id
);
IncreaseType::Multiplicative(1.03)
})
}
}
fn clamp_bitrate(&mut self, bitrate: i32, n_encoders: i32) {
self.target_bitrate = bitrate.clamp(1000 * n_encoders, 8192000 * n_encoders);
}
fn control(
&mut self,
element: &super::WebRTCSink,
stats: &gst::StructureRef,
encoders: &Vec<VideoEncoder>,
) {
let n_encoders = encoders.len() as i32;
let rtt = lookup_remote_inbound_rtp_stats(stats)
.and_then(|s| s.get::<f64>("round-trip-time").ok())
.unwrap_or(0.);
if let Some(twcc_stats) = lookup_transport_stats(stats).and_then(|transport_stats| {
transport_stats.get::<gst::Structure>("gst-twcc-stats").ok()
}) {
let control_op = self.update(element, &twcc_stats, rtt);
gst_trace!(
CAT,
obj: element,
"consumer {}: applying congestion control operation {:?}",
self.peer_id,
control_op
);
match control_op {
CongestionControlOp::Hold => (),
CongestionControlOp::Increase(IncreaseType::Additive(value)) => {
self.clamp_bitrate(self.target_bitrate + value as i32, n_encoders);
}
CongestionControlOp::Increase(IncreaseType::Multiplicative(factor)) => {
self.clamp_bitrate((self.target_bitrate as f64 * factor) as i32, n_encoders);
}
CongestionControlOp::Decrease(factor) => {
self.clamp_bitrate((self.target_bitrate as f64 * factor) as i32, n_encoders);
// Smoothing factor
let alpha = 0.75;
if let Some(ema) = self.bitrate_ema {
let sigma: f64 = (self.target_bitrate as f64) - ema;
self.bitrate_ema = Some(ema + (alpha * sigma));
self.bitrate_emvar =
(1. - alpha) * (self.bitrate_emvar + alpha * sigma.powi(2));
} else {
self.bitrate_ema = Some(self.target_bitrate as f64);
self.bitrate_emvar = 0.;
}
}
}
consumer.pipeline.call_async(|pipeline| {
let _ = pipeline.set_state(gst::State::Null);
});
if signal {
self.signaller.consumer_removed(element, peer_id);
for encoder in encoders {
encoder.set_bitrate(element, self.target_bitrate / n_encoders);
}
}
}
}
impl State {
fn finalize_consumer(&mut self, element: &super::WebRTCSink, consumer: Consumer, signal: bool) {
for webrtc_pad in consumer.webrtc_pads.values() {
if let Some(producer) = self
.streams
.get(&webrtc_pad.stream_name)
.and_then(|stream| stream.producer.as_ref())
{
consumer.disconnect_input_stream(producer);
}
}
consumer.pipeline.call_async(|pipeline| {
let _ = pipeline.set_state(gst::State::Null);
});
if signal {
self.signaller.consumer_removed(element, &consumer.peer_id);
}
}
fn remove_consumer(&mut self, element: &super::WebRTCSink, peer_id: &str, signal: bool) {
if let Some(consumer) = self.consumers.remove(peer_id) {
self.finalize_consumer(element, consumer, signal);
}
}
fn maybe_start_signaller(&mut self, element: &super::WebRTCSink) {
if self.signaller_state == SignallerState::Stopped
@ -296,16 +773,11 @@ impl Consumer {
}
/// Request a sink pad on our webrtcbin, and set its transceiver's codec_preferences
fn request_webrtcbin_pad(
&mut self,
element: &super::WebRTCSink,
caps: &gst::Caps,
stream_name: &str,
) {
fn request_webrtcbin_pad(&mut self, element: &super::WebRTCSink, stream: &InputStream) {
let ssrc = self.generate_ssrc();
let media_idx = self.webrtc_pads.len() as i32;
let mut payloader_caps = caps.to_owned();
let mut payloader_caps = stream.out_caps.as_ref().unwrap().to_owned();
{
let payloader_caps_mut = payloader_caps.make_mut();
@ -346,10 +818,11 @@ impl Consumer {
ssrc,
WebRTCPad {
pad,
in_caps: stream.in_caps.as_ref().unwrap().clone(),
caps: payloader_caps,
media_idx: media_idx as u32,
ssrc,
stream_name: stream_name.to_string(),
stream_name: stream.sink_pad.name().to_string(),
payload: None,
},
);
@ -358,7 +831,7 @@ impl Consumer {
/// Called when we have received an answer, connects an InputStream
/// to a given WebRTCPad
fn connect_input_stream(
&self,
&mut self,
element: &super::WebRTCSink,
producer: &StreamProducer,
webrtc_pad: &WebRTCPad,
@ -381,7 +854,33 @@ impl Consumer {
let appsrc = make_element("appsrc", None)?;
self.pipeline.add(&appsrc).unwrap();
let pay = setup_encoding(&self.pipeline, &appsrc, codec, Some(webrtc_pad.ssrc))?;
let (enc, filter, pay) =
setup_encoding(&self.pipeline, &appsrc, codec, Some(webrtc_pad.ssrc), false)?;
if codec.is_video {
let enc = VideoEncoder::new(
enc.clone(),
filter.clone(),
&webrtc_pad.in_caps,
&self.peer_id,
);
if let Some(congestion_controller) = self.congestion_controller.as_mut() {
congestion_controller.target_bitrate += enc.bitrate();
} else {
/* If congestion control is disabled, we simply use the highest
* known "safe" value for the bitrate.
*
* I have found higher values to cause packet loss *somewhere* in
* my local network, possibly related to chrome's pretty low UDP
* buffer sizes, this probably should be exposed as a property
* eventually.
*/
enc.set_bitrate(element, 8192000i32);
}
self.encoders.push(enc);
}
let appsrc = appsrc.downcast::<gst_app::AppSrc>().unwrap();
@ -723,6 +1222,7 @@ impl WebRTCSink {
/// Called by the signaller to add a new consumer
pub fn add_consumer(&self, element: &super::WebRTCSink, peer_id: &str) -> Result<(), Error> {
let cc_heuristic = self.settings.lock().unwrap().cc_heuristic;
let mut state = self.state.lock().unwrap();
if state.consumers.contains_key(peer_id) {
@ -882,15 +1382,17 @@ impl WebRTCSink {
webrtcbin,
webrtc_pads: HashMap::new(),
peer_id: peer_id.to_string(),
congestion_controller: match cc_heuristic {
WebRTCSinkCongestionControl::Disabled => None,
WebRTCSinkCongestionControl::Homegrown => Some(CongestionController::new(peer_id)),
},
encoders: Vec::new(),
};
state.streams.iter().for_each(|(_, stream)| {
consumer.request_webrtcbin_pad(
element,
stream.out_caps.as_ref().unwrap(),
stream.sink_pad.name().as_str(),
)
});
state
.streams
.iter()
.for_each(|(_, stream)| consumer.request_webrtcbin_pad(element, &stream));
let clock = element.clock();
@ -950,12 +1452,27 @@ impl WebRTCSink {
Ok(())
}
fn process_webrtcbin_stats(
&self,
element: &super::WebRTCSink,
peer_id: &str,
stats: &gst::StructureRef,
) {
let mut state = self.state.lock().unwrap();
if let Some(consumer) = state.consumers.get_mut(peer_id) {
if let Some(congestion_controller) = consumer.congestion_controller.as_mut() {
congestion_controller.control(element, stats, &consumer.encoders);
}
}
}
fn on_remote_description_set(&self, element: &super::WebRTCSink, peer_id: String) {
let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
let mut remove = false;
if let Some(consumer) = state.consumers.get(&peer_id) {
for webrtc_pad in consumer.webrtc_pads.values() {
if let Some(mut consumer) = state.consumers.remove(&peer_id) {
for webrtc_pad in consumer.webrtc_pads.clone().values() {
if let Some(producer) = state
.streams
.get(&webrtc_pad.stream_name)
@ -986,12 +1503,43 @@ impl WebRTCSink {
break;
}
}
}
drop(state);
let element_clone = element.downgrade();
let webrtcbin = consumer.webrtcbin.downgrade();
let peer_id_clone = peer_id.clone();
if remove {
let _ = self.remove_consumer(element, &peer_id, true);
task::spawn(async move {
let mut interval =
async_std::stream::interval(std::time::Duration::from_millis(100));
while let Some(_) = interval.next().await {
let element_clone = element_clone.clone();
let peer_id_clone = peer_id_clone.clone();
if let Some(webrtcbin) = webrtcbin.upgrade() {
let promise = gst::Promise::with_change_func(move |reply| {
if let Some(element) = element_clone.upgrade() {
let this = Self::from_instance(&element);
if let Ok(Some(stats)) = reply {
this.process_webrtcbin_stats(&element, &peer_id_clone, stats);
}
}
});
webrtcbin
.emit_by_name("get-stats", &[&None::<gst::Pad>, &promise])
.unwrap();
} else {
break;
}
}
});
if remove {
state.finalize_consumer(element, consumer, true);
} else {
state.consumers.insert(consumer.peer_id.clone(), consumer);
}
}
}
@ -1110,7 +1658,7 @@ impl WebRTCSink {
src.link(&capsfilter)
.with_context(|| format!("Running discovery pipeline for caps {}", caps))?;
let pay = setup_encoding(&pipe.0, &capsfilter, codec, None)?;
let (_, _, pay) = setup_encoding(&pipe.0, &capsfilter, codec, None, true)?;
let sink = make_element("fakesink", None)?;
@ -1144,6 +1692,7 @@ impl WebRTCSink {
"seqnum-offset",
"ssrc",
"sprop-parameter-sets",
"a-framerate",
]);
s.set("payload", codec.payload);
return Ok(s);
@ -1349,6 +1898,14 @@ impl ObjectImpl for WebRTCSink {
gst::Caps::static_type(),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_enum(
"congestion-control",
"Congestion control",
"Defines how congestion is controlled, if at all",
WebRTCSinkCongestionControl::static_type(),
DEFAULT_CONGESTION_CONTROL as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
]
});
@ -1377,6 +1934,12 @@ impl ObjectImpl for WebRTCSink {
.expect("type checked upstream")
.unwrap_or_else(|| gst::Caps::new_empty());
}
"congestion-control" => {
let mut settings = self.settings.lock().unwrap();
settings.cc_heuristic = value
.get::<WebRTCSinkCongestionControl>()
.expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -1391,6 +1954,10 @@ impl ObjectImpl for WebRTCSink {
let settings = self.settings.lock().unwrap();
settings.audio_caps.to_value()
}
"congestion-control" => {
let settings = self.settings.lock().unwrap();
settings.cc_heuristic.to_value()
}
_ => unimplemented!(),
}
}

View file

@ -110,6 +110,16 @@ impl WebRTCSink {
}
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::GEnum)]
#[repr(u32)]
#[genum(type_name = "GstWebRTCSinkCongestionControl")]
pub enum WebRTCSinkCongestionControl {
#[genum(name = "Disabled: no congestion control is applied", nick = "disabled")]
Disabled,
#[genum(name = "Homegrown: simple sender-side heuristic", nick = "homegrown")]
Homegrown,
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),