Use StreamConsumer from gstreamer itself

Merged in https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1022
This commit is contained in:
Thibault Saunier 2022-05-11 10:41:11 -04:00 committed by Mathieu Duponchelle
parent c02a0d8757
commit f6f079f3a8
5 changed files with 86 additions and 431 deletions

150
Cargo.lock generated
View file

@ -313,9 +313,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.1.17"
version = "3.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47582c09be7c8b32c0ab3a6181825ababb713fde6fff20fc573a3870dd45c6a0"
checksum = "d2dbdf4bdacb33466e854ce889eee8dfd5729abf7ccd7664d0a2d60cd384440b"
dependencies = [
"atty",
"bitflags",
@ -330,9 +330,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "3.1.7"
version = "3.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1"
checksum = "25320346e922cffe59c0bbc5410c8d8784509efb321488971081313cb1e1a33c"
dependencies = [
"heck",
"proc-macro-error",
@ -598,7 +598,7 @@ dependencies = [
[[package]]
name = "glib"
version = "0.16.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4"
source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6"
dependencies = [
"bitflags",
"futures-channel",
@ -618,7 +618,7 @@ dependencies = [
[[package]]
name = "glib-macros"
version = "0.16.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4"
source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6"
dependencies = [
"anyhow",
"heck",
@ -632,7 +632,7 @@ dependencies = [
[[package]]
name = "glib-sys"
version = "0.16.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4"
source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6"
dependencies = [
"libc",
"system-deps",
@ -653,7 +653,7 @@ dependencies = [
[[package]]
name = "gobject-sys"
version = "0.16.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#f9fa78ab81999b95531729f81a81a29e743898d4"
source = "git+https://github.com/gtk-rs/gtk-rs-core#98859614e78337b0a37751ccd75e1abeca1295c6"
dependencies = [
"glib-sys",
"libc",
@ -672,7 +672,7 @@ dependencies = [
[[package]]
name = "gstreamer"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"bitflags",
"cfg-if",
@ -697,7 +697,7 @@ dependencies = [
[[package]]
name = "gstreamer-app"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"bitflags",
"futures-core",
@ -713,7 +713,7 @@ dependencies = [
[[package]]
name = "gstreamer-app-sys"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
@ -725,7 +725,7 @@ dependencies = [
[[package]]
name = "gstreamer-base"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"bitflags",
"cfg-if",
@ -738,7 +738,7 @@ dependencies = [
[[package]]
name = "gstreamer-base-sys"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib-sys",
"gobject-sys",
@ -750,7 +750,7 @@ dependencies = [
[[package]]
name = "gstreamer-rtp"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"bitflags",
"glib",
@ -763,7 +763,7 @@ dependencies = [
[[package]]
name = "gstreamer-rtp-sys"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
@ -775,7 +775,7 @@ dependencies = [
[[package]]
name = "gstreamer-sdp"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib",
"gstreamer",
@ -785,7 +785,7 @@ dependencies = [
[[package]]
name = "gstreamer-sdp-sys"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib-sys",
"gstreamer-sys",
@ -796,7 +796,7 @@ dependencies = [
[[package]]
name = "gstreamer-sys"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib-sys",
"gobject-sys",
@ -804,10 +804,22 @@ dependencies = [
"system-deps",
]
[[package]]
name = "gstreamer-utils"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"gstreamer",
"gstreamer-app",
"gstreamer-video",
"once_cell",
"thiserror",
]
[[package]]
name = "gstreamer-video"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"bitflags",
"cfg-if",
@ -824,7 +836,7 @@ dependencies = [
[[package]]
name = "gstreamer-video-sys"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib-sys",
"gobject-sys",
@ -837,7 +849,7 @@ dependencies = [
[[package]]
name = "gstreamer-webrtc"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib",
"gstreamer",
@ -849,7 +861,7 @@ dependencies = [
[[package]]
name = "gstreamer-webrtc-sys"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#45e16f7753e4a815a42b13a6932fb6ee48e40bda"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#5349822962bbc19c049e807aade0c3b3ba554537"
dependencies = [
"glib-sys",
"gstreamer-sdp-sys",
@ -933,9 +945,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d"
[[package]]
name = "js-sys"
@ -963,9 +975,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.125"
version = "0.2.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b"
checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836"
[[package]]
name = "log"
@ -1125,9 +1137,9 @@ dependencies = [
[[package]]
name = "os_str_bytes"
version = "6.0.0"
version = "6.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435"
[[package]]
name = "parking"
@ -1226,11 +1238,11 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.38"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa"
checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f"
dependencies = [
"unicode-xid",
"unicode-ident",
]
[[package]]
@ -1316,9 +1328,9 @@ dependencies = [
[[package]]
name = "ryu"
version = "1.0.9"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695"
[[package]]
name = "schannel"
@ -1415,9 +1427,9 @@ dependencies = [
[[package]]
name = "signal-hook"
version = "0.3.13"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d"
checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d"
dependencies = [
"libc",
"signal-hook-registry",
@ -1446,12 +1458,12 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
version = "0.4.5"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca642ba17f8b2995138b1d7711829c92e98c0a25ea019de790f4f09279c4e296"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"windows-sys",
"winapi",
]
[[package]]
@ -1462,13 +1474,13 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.92"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52"
checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
"unicode-ident",
]
[[package]]
@ -1672,6 +1684,12 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee"
[[package]]
name = "unicode-normalization"
version = "0.1.19"
@ -1681,12 +1699,6 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-xid"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04"
[[package]]
name = "url"
version = "2.2.2"
@ -1852,6 +1864,7 @@ dependencies = [
"gstreamer-app",
"gstreamer-rtp",
"gstreamer-sdp",
"gstreamer-utils",
"gstreamer-video",
"gstreamer-webrtc",
"human_bytes",
@ -1936,46 +1949,3 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"

View file

@ -15,6 +15,7 @@ gst-video = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", packa
gst-webrtc = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-webrtc", features = ["v1_20"] }
gst-sdp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-sdp", features = ["v1_20"] }
gst-rtp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-rtp", features = ["v1_20"] }
gst-utils = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-utils" }
once_cell = "1.0"
smallvec = "1"
anyhow = "1"

View file

@ -4,6 +4,7 @@ use gst::glib::value::FromValue;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_rtp::prelude::*;
use gst_utils::StreamProducer;
use gst_video::prelude::*;
use gst_video::subclass::prelude::*;
use gst_webrtc::WebRTCDataChannel;
@ -17,7 +18,6 @@ use std::collections::HashMap;
use std::ops::Mul;
use std::sync::Mutex;
use super::utils::{make_element, StreamProducer};
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
use crate::signaller::Signaller;
use std::collections::BTreeMap;
@ -202,6 +202,7 @@ struct Consumer {
max_bitrate: u32,
links: HashMap<u32, gst_utils::ConsumptionLink>,
stats_sigid: Option<glib::SignalHandlerId>,
}
@ -240,6 +241,13 @@ fn create_navigation_event<N: IsA<gst_video::Navigation>>(sink: &N, msg: &str) {
}
}
/// Wrapper around `gst::ElementFactory::make` with a better error
/// message
pub fn make_element(element: &str, name: Option<&str>) -> Result<gst::Element, Error> {
gst::ElementFactory::make(element, name)
.with_context(|| format!("Failed to make element {}", element))
}
/// Simple utility for tearing down a pipeline cleanly
struct PipelineWrapper(gst::Pipeline);
@ -965,7 +973,7 @@ impl State {
fn finalize_consumer(
&mut self,
element: &super::WebRTCSink,
consumer: &Consumer,
consumer: &mut Consumer,
signal: bool,
) {
consumer.pipeline.debug_to_dot_file_with_ts(
@ -973,14 +981,8 @@ impl State {
format!("removing-peer-{}-", consumer.peer_id,),
);
for webrtc_pad in consumer.webrtc_pads.values() {
if let Some(producer) = self
.streams
.get(&webrtc_pad.stream_name)
.and_then(|stream| stream.producer.as_ref())
{
consumer.disconnect_input_stream(producer);
}
for ssrc in consumer.webrtc_pads.keys() {
consumer.links.remove(ssrc);
}
consumer.pipeline.call_async(|pipeline| {
@ -998,8 +1000,8 @@ impl State {
peer_id: &str,
signal: bool,
) -> Option<Consumer> {
if let Some(consumer) = self.consumers.remove(peer_id) {
self.finalize_consumer(element, &consumer, signal);
if let Some(mut consumer) = self.consumers.remove(peer_id) {
self.finalize_consumer(element, &mut consumer, signal);
Some(consumer)
} else {
None
@ -1052,6 +1054,7 @@ impl Consumer {
stats: gst::Structure::new_empty("application/x-webrtc-stats"),
webrtc_pads: HashMap::new(),
encoders: Vec::new(),
links: HashMap::new(),
stats_sigid: None,
}
}
@ -1168,7 +1171,7 @@ impl Consumer {
.get(&payload)
.ok_or_else(|| anyhow!("No codec for payload {}", payload))?;
let appsrc = make_element("appsrc", None)?;
let appsrc = make_element("appsrc", Some(&webrtc_pad.stream_name))?;
self.pipeline.add(&appsrc).unwrap();
let pay_filter = make_element("capsfilter", None)?;
@ -1258,11 +1261,7 @@ impl Consumer {
}
let appsrc = appsrc.downcast::<gst_app::AppSrc>().unwrap();
appsrc.set_format(gst::Format::Time);
appsrc.set_is_live(true);
appsrc.set_handle_segment_change(true);
gst_utils::StreamProducer::configure_consumer(&appsrc);
self.pipeline
.sync_children_states()
.with_context(|| format!("Connecting input stream for {}", self.peer_id))?;
@ -1275,14 +1274,13 @@ impl Consumer {
.link(&webrtc_pad.pad)
.with_context(|| format!("Connecting input stream for {}", self.peer_id))?;
producer.add_consumer(&appsrc, &self.peer_id);
Ok(())
}
/// Called when tearing down the consumer
fn disconnect_input_stream(&self, producer: &StreamProducer) {
producer.remove_consumer(&self.peer_id);
match producer.add_consumer(&appsrc) {
Ok(link) => {
self.links.insert(webrtc_pad.ssrc, link);
Ok(())
}
Err(err) => Err(anyhow!("Could not link producer: {:?}", err)),
}
}
}
@ -2038,7 +2036,7 @@ impl WebRTCSink {
});
if remove {
state.finalize_consumer(element, &consumer, true);
state.finalize_consumer(element, &mut consumer, true);
} else {
state.consumers.insert(consumer.peer_id.clone(), consumer);
}

View file

@ -4,7 +4,6 @@ use gst::subclass::prelude::ObjectSubclassExt;
use std::error::Error;
mod imp;
mod utils;
glib::wrapper! {
pub struct WebRTCSink(ObjectSubclass<imp::WebRTCSink>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;

View file

@ -1,313 +0,0 @@
use std::collections::HashMap;
use std::mem;
use std::sync::{atomic, Arc, Mutex};
use anyhow::{Context, Error};
use once_cell::sync::Lazy;
use gst::prelude::*;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"app-stream-producer",
gst::DebugColorFlags::empty(),
Some("gst_app Stream Producer interface"),
)
});
/// Wrapper around `gst::ElementFactory::make` with a better error
/// message
pub fn make_element(element: &str, name: Option<&str>) -> Result<gst::Element, Error> {
gst::ElementFactory::make(element, name)
.with_context(|| format!("Failed to make element {}", element))
}
/// The interface for transporting media data from one node
/// to another.
///
/// A producer is essentially a GStreamer `appsink` whose output
/// is sent to a set of consumers, who are essentially `appsrc` wrappers
#[derive(Debug, Clone)]
pub struct StreamProducer {
/// The appsink to dispatch data for
appsink: gst_app::AppSink,
/// The consumers to dispatch data to
consumers: Arc<Mutex<StreamConsumers>>,
}
impl PartialEq for StreamProducer {
fn eq(&self, other: &Self) -> bool {
self.appsink.eq(&other.appsink)
}
}
impl Eq for StreamProducer {}
impl StreamProducer {
/// Add an appsrc to dispatch data to
pub fn add_consumer(&self, consumer: &gst_app::AppSrc, consumer_id: &str) {
let mut consumers = self.consumers.lock().unwrap();
if consumers.consumers.get(consumer_id).is_some() {
gst::error!(CAT, "Consumer already added");
return;
}
gst::debug!(CAT, "Adding consumer");
consumer.set_property("max-buffers", 0u64);
consumer.set_property("max-bytes", 0u64);
consumer.set_property("max-time", 500 * gst::ClockTime::MSECOND);
consumer.set_property_from_str("leaky-type", "downstream");
// Forward force-keyunit events upstream to the appsink
let srcpad = consumer.static_pad("src").unwrap();
let appsink_clone = self.appsink.clone();
let fku_probe_id = srcpad
.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |_pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() {
gst::debug!(CAT, "Requesting keyframe");
let _ = appsink_clone.send_event(ev.clone());
}
}
gst::PadProbeReturn::Ok
})
.unwrap();
consumers.consumers.insert(
consumer_id.to_string(),
StreamConsumer::new(consumer, fku_probe_id, consumer_id),
);
}
/// Remove a consumer appsrc by id
pub fn remove_consumer(&self, consumer_id: &str) {
if let Some(consumer) = self.consumers.lock().unwrap().consumers.remove(consumer_id) {
gst::debug!(CAT, "Removed consumer {}", consumer.appsrc.name());
} else {
gst::debug!(CAT, "Consumer {} not found", consumer_id);
}
}
/// Stop discarding data samples and start forwarding them to the consumers.
///
/// This is useful for example for prerolling live sources.
pub fn forward(&self) {
self.consumers.lock().unwrap().discard = false;
}
/// Get the GStreamer `appsink` wrapped by this producer
pub fn appsink(&self) -> &gst_app::AppSink {
&self.appsink
}
}
impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
fn from(appsink: &'a gst_app::AppSink) -> Self {
let consumers = Arc::new(Mutex::new(StreamConsumers {
current_latency: None,
latency_updated: false,
consumers: HashMap::new(),
discard: true,
}));
let consumers_clone = consumers.clone();
let consumers_clone2 = consumers.clone();
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |appsink| {
let mut consumers = consumers_clone.lock().unwrap();
let sample = match appsink.pull_sample() {
Ok(sample) => sample,
Err(_err) => {
gst::debug!(CAT, "Failed to pull sample");
return Err(gst::FlowError::Flushing);
}
};
if consumers.discard {
return Ok(gst::FlowSuccess::Ok);
}
gst::trace!(CAT, "processing sample");
let latency = consumers.current_latency;
let latency_updated = mem::replace(&mut consumers.latency_updated, false);
let mut requested_keyframe = false;
let current_consumers = consumers
.consumers
.values()
.map(|c| {
if let Some(latency) = latency {
if c.forwarded_latency
.compare_exchange(
false,
true,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok()
|| latency_updated
{
c.appsrc.set_latency(latency, gst::ClockTime::NONE);
}
}
if c.first_buffer
.compare_exchange(
true,
false,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok()
&& !requested_keyframe
{
gst::debug!(CAT, "Requesting keyframe for first buffer");
appsink.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
);
requested_keyframe = true;
}
c.appsrc.clone()
})
.collect::<smallvec::SmallVec<[_; 16]>>();
drop(consumers);
for consumer in current_consumers {
if let Err(err) = consumer.push_sample(&sample) {
gst::warning!(CAT, "Failed to push sample: {}", err);
}
}
Ok(gst::FlowSuccess::Ok)
})
.eos(move |_| {
let current_consumers = consumers_clone2
.lock()
.unwrap()
.consumers
.values()
.map(|c| c.appsrc.clone())
.collect::<smallvec::SmallVec<[_; 16]>>();
for consumer in current_consumers {
let _ = consumer.end_of_stream();
}
})
.build(),
);
let consumers_clone = consumers.clone();
let sinkpad = appsink.static_pad("sink").unwrap();
sinkpad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
use gst::EventView;
if let EventView::Latency(ev) = ev.view() {
if pad.parent().is_some() {
let latency = ev.latency();
let mut consumers = consumers_clone.lock().unwrap();
consumers.current_latency = Some(latency);
consumers.latency_updated = true;
}
}
}
gst::PadProbeReturn::Ok
});
StreamProducer {
appsink: appsink.clone(),
consumers,
}
}
}
/// Wrapper around a HashMap of consumers, exists for thread safety
/// and also protects some of the producer state
#[derive(Debug)]
struct StreamConsumers {
/// The currently-observed latency
current_latency: Option<gst::ClockTime>,
/// Whether the consumers' appsrc latency needs updating
latency_updated: bool,
/// The consumers, link id -> consumer
consumers: HashMap<String, StreamConsumer>,
/// Whether appsrc samples should be forwarded to consumers yet
discard: bool,
}
/// Wrapper around a consumer's `appsrc`
#[derive(Debug)]
struct StreamConsumer {
/// The GStreamer `appsrc` of the consumer
appsrc: gst_app::AppSrc,
/// The id of a pad probe that intercepts force-key-unit events
fku_probe_id: Option<gst::PadProbeId>,
/// Whether an initial latency was forwarded to the `appsrc`
forwarded_latency: atomic::AtomicBool,
/// Whether a first buffer has made it through, used to determine
/// whether a new key unit should be requested. Only useful for encoded
/// streams.
first_buffer: atomic::AtomicBool,
}
impl StreamConsumer {
/// Create a new consumer
fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId, consumer_id: &str) -> Self {
let consumer_id = consumer_id.to_string();
appsrc.set_callbacks(
gst_app::AppSrcCallbacks::builder()
.enough_data(move |_appsrc| {
gst::debug!(
CAT,
"consumer {} is not consuming fast enough, old samples are getting dropped",
consumer_id
);
})
.build(),
);
StreamConsumer {
appsrc: appsrc.clone(),
fku_probe_id: Some(fku_probe_id),
forwarded_latency: atomic::AtomicBool::new(false),
first_buffer: atomic::AtomicBool::new(true),
}
}
}
impl Drop for StreamConsumer {
fn drop(&mut self) {
if let Some(fku_probe_id) = self.fku_probe_id.take() {
let srcpad = self.appsrc.static_pad("src").unwrap();
srcpad.remove_probe(fku_probe_id);
}
}
}
impl PartialEq for StreamConsumer {
fn eq(&self, other: &Self) -> bool {
self.appsrc.eq(&other.appsrc)
}
}
impl Eq for StreamConsumer {}
impl std::hash::Hash for StreamConsumer {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
std::hash::Hash::hash(&self.appsrc, state);
}
}
impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
fn borrow(&self) -> &gst_app::AppSrc {
&self.appsrc
}
}