gst-plugins-rs/utils/fallbackswitch/src/fallbacksrc/imp.rs
Seungha Yang 930cc41aaa fallbacksrc: Don't forward manual flush events to downstream
fallbackswitch might forward flush event if it's for the currently
active pad. But forwarded flush event will be problematic in various
reasons and that's not a behavior we expected.
2022-04-12 23:21:32 +09:00

2485 lines
89 KiB
Rust

// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use parking_lot::Mutex;
use std::mem;
use std::time::Instant;
use once_cell::sync::Lazy;
use super::custom_source::CustomSource;
use super::video_fallback::VideoFallbackSource;
use super::{RetryReason, Status};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"fallbacksrc",
gst::DebugColorFlags::empty(),
Some("Fallback Source Bin"),
)
});
#[derive(Debug, Clone)]
struct Stats {
num_retry: u64,
last_retry_reason: RetryReason,
buffering_percent: i32,
}
impl Default for Stats {
fn default() -> Self {
Self {
num_retry: 0,
last_retry_reason: RetryReason::None,
buffering_percent: 100,
}
}
}
impl Stats {
fn to_structure(&self) -> gst::Structure {
gst::Structure::builder("application/x-fallbacksrc-stats")
.field("num-retry", self.num_retry)
.field("last-retry-reason", self.last_retry_reason)
.field("buffering-percent", self.buffering_percent)
.build()
}
}
#[derive(Debug, Clone)]
struct Settings {
enable_audio: bool,
enable_video: bool,
uri: Option<String>,
source: Option<gst::Element>,
fallback_uri: Option<String>,
timeout: gst::ClockTime,
restart_timeout: gst::ClockTime,
retry_timeout: gst::ClockTime,
restart_on_eos: bool,
min_latency: gst::ClockTime,
buffer_duration: i64,
immediate_fallback: bool,
manual_unblock: bool,
}
impl Default for Settings {
fn default() -> Self {
Settings {
enable_audio: true,
enable_video: true,
uri: None,
source: None,
fallback_uri: None,
timeout: 5 * gst::ClockTime::SECOND,
restart_timeout: 5 * gst::ClockTime::SECOND,
retry_timeout: 60 * gst::ClockTime::SECOND,
restart_on_eos: false,
min_latency: gst::ClockTime::ZERO,
buffer_duration: -1,
immediate_fallback: false,
manual_unblock: false,
}
}
}
#[derive(Debug)]
enum Source {
Uri(String),
Element(gst::Element),
}
// Blocking buffer pad probe on the source pads. Once blocked we have a running time for the
// current buffer that can later be used for offsetting
//
// This is used for the initial offsetting after starting of the stream and for "pausing" when
// buffering.
struct Block {
pad: gst::Pad,
probe_id: gst::PadProbeId,
running_time: Option<gst::ClockTime>,
}
// Connects one source pad with fallbackswitch and the corresponding fallback input
struct Stream {
// Fallback input stream
// for video: filesrc, decoder, converters, imagefreeze
// for audio: live audiotestsrc, converters
fallback_input: gst::Element,
// source pad from source
source_srcpad: Option<gst::Pad>,
source_srcpad_block: Option<Block>,
// clocksync for source source pad
clocksync: gst::Element,
// imagefreeze if this is an image stream
imagefreeze: Option<gst::Element>,
clocksync_queue: gst::Element,
clocksync_queue_srcpad: gst::Pad,
// fallbackswitch
switch: gst::Element,
// output source pad, connected to switch
srcpad: gst::GhostPad,
}
struct State {
// uridecodebin3 or custom source element
source: gst::Element,
source_is_live: bool,
source_pending_restart: bool,
// For timing out the source and shutting it down to restart it
source_restart_timeout: Option<gst::SingleShotClockId>,
// For restarting the source after shutting it down
source_pending_restart_timeout: Option<gst::SingleShotClockId>,
// For failing completely if we didn't recover after the retry timeout
source_retry_timeout: Option<gst::SingleShotClockId>,
// All our output streams, selected by properties
video_stream: Option<Stream>,
audio_stream: Option<Stream>,
flow_combiner: gst_base::UniqueFlowCombiner,
last_buffering_update: Option<Instant>,
// Stream collection posted by source
streams: Option<gst::StreamCollection>,
// Configure settings
settings: Settings,
configured_source: Source,
// Statistics
stats: Stats,
// When application is using the manual-unblock property
manually_blocked: bool,
// So that we don't schedule a restart when manually unblocking
// and our source hasn't reached the required state
schedule_restart_on_unblock: bool,
is_image: bool,
}
#[derive(Default)]
pub struct FallbackSrc {
settings: Mutex<Settings>,
state: Mutex<Option<State>>,
}
#[glib::object_subclass]
impl ObjectSubclass for FallbackSrc {
const NAME: &'static str = "FallbackSrc";
type Type = super::FallbackSrc;
type ParentType = gst::Bin;
}
impl ObjectImpl for FallbackSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecBoolean::new(
"enable-audio",
"Enable Audio",
"Enable the audio stream, this will output silence if there's no audio in the configured URI",
true,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecBoolean::new(
"enable-video",
"Enable Video",
"Enable the video stream, this will output black or the fallback video if there's no video in the configured URI",
true,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecString::new("uri", "URI", "URI to use", None, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY),
glib::ParamSpecObject::new(
"source",
"Source",
"Source to use instead of the URI",
gst::Element::static_type(),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecString::new(
"fallback-uri",
"Fallback URI",
"Fallback URI to use for video in case the main stream doesn't work",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt64::new(
"timeout",
"Timeout",
"Timeout for switching to the fallback URI",
0,
std::u64::MAX - 1,
5 * *gst::ClockTime::SECOND,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt64::new(
"restart-timeout",
"Timeout",
"Timeout for restarting an active source",
0,
std::u64::MAX - 1,
5 * *gst::ClockTime::SECOND,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt64::new(
"retry-timeout",
"Retry Timeout",
"Timeout for stopping after repeated failure",
0,
std::u64::MAX - 1,
60 * *gst::ClockTime::SECOND,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecBoolean::new(
"restart-on-eos",
"Restart on EOS",
"Restart source on EOS",
false,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecEnum::new(
"status",
"Status",
"Current source status",
Status::static_type(),
Status::Stopped as i32,
glib::ParamFlags::READABLE,
),
glib::ParamSpecUInt64::new(
"min-latency",
"Minimum Latency",
"When the main source has a higher latency than the fallback source \
this allows to configure a minimum latency that would be configured \
if initially the fallback is enabled",
0,
std::u64::MAX - 1,
0,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecInt64::new(
"buffer-duration",
"Buffer Duration",
"Buffer duration when buffering streams (-1 default value)",
-1,
std::i64::MAX - 1,
-1,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecBoxed::new(
"statistics",
"Statistics",
"Various statistics",
gst::Structure::static_type(),
glib::ParamFlags::READABLE,
),
glib::ParamSpecBoolean::new(
"manual-unblock",
"Manual unblock",
"When enabled, the application must call the unblock signal, except for live streams",
false,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecBoolean::new(
"immediate-fallback",
"Immediate fallback",
"Forward the fallback streams immediately at startup, when the primary streams are slow to start up and immediate output is required",
false,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
]
});
PROPERTIES.as_ref()
}
fn set_property(
&self,
obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"enable-audio" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing enable-audio from {:?} to {:?}",
settings.enable_audio,
new_value,
);
settings.enable_audio = new_value;
}
"enable-video" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing enable-video from {:?} to {:?}",
settings.enable_video,
new_value,
);
settings.enable_video = new_value;
}
"uri" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing URI from {:?} to {:?}",
settings.uri,
new_value,
);
settings.uri = new_value;
}
"source" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing source from {:?} to {:?}",
settings.source,
new_value,
);
settings.source = new_value;
}
"fallback-uri" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing Fallback URI from {:?} to {:?}",
settings.fallback_uri,
new_value,
);
settings.fallback_uri = new_value;
}
"timeout" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing timeout from {:?} to {:?}",
settings.timeout,
new_value,
);
settings.timeout = new_value;
}
"restart-timeout" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing Restart Timeout from {:?} to {:?}",
settings.restart_timeout,
new_value,
);
settings.restart_timeout = new_value;
}
"retry-timeout" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing Retry Timeout from {:?} to {:?}",
settings.retry_timeout,
new_value,
);
settings.retry_timeout = new_value;
}
"restart-on-eos" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing restart-on-eos from {:?} to {:?}",
settings.restart_on_eos,
new_value,
);
settings.restart_on_eos = new_value;
}
"min-latency" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing Minimum Latency from {:?} to {:?}",
settings.min_latency,
new_value,
);
settings.min_latency = new_value;
}
"buffer-duration" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing Buffer Duration from {:?} to {:?}",
settings.buffer_duration,
new_value,
);
settings.buffer_duration = new_value;
}
"immediate-fallback" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing immediate-fallback from {:?} to {:?}",
settings.immediate_fallback,
new_value,
);
settings.immediate_fallback = new_value;
}
"manual-unblock" => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
obj: obj,
"Changing manual-unblock from {:?} to {:?}",
settings.manual_unblock,
new_value,
);
settings.manual_unblock = new_value;
}
_ => unimplemented!(),
}
}
// Called whenever a value of a property is read. It can be called
// at any time from any thread.
#[allow(clippy::blocks_in_if_conditions)]
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"enable-audio" => {
let settings = self.settings.lock();
settings.enable_audio.to_value()
}
"enable-video" => {
let settings = self.settings.lock();
settings.enable_video.to_value()
}
"uri" => {
let settings = self.settings.lock();
settings.uri.to_value()
}
"source" => {
let settings = self.settings.lock();
settings.source.to_value()
}
"fallback-uri" => {
let settings = self.settings.lock();
settings.fallback_uri.to_value()
}
"timeout" => {
let settings = self.settings.lock();
settings.timeout.to_value()
}
"restart-timeout" => {
let settings = self.settings.lock();
settings.restart_timeout.to_value()
}
"retry-timeout" => {
let settings = self.settings.lock();
settings.retry_timeout.to_value()
}
"restart-on-eos" => {
let settings = self.settings.lock();
settings.restart_on_eos.to_value()
}
"status" => {
let state_guard = self.state.lock();
// If we have no state then we'r stopped
let state = match &*state_guard {
None => return Status::Stopped.to_value(),
Some(ref state) => state,
};
// If any restarts/retries are pending, we're retrying
if state.source_pending_restart
|| state.source_pending_restart_timeout.is_some()
|| state.source_retry_timeout.is_some()
{
return Status::Retrying.to_value();
}
// Otherwise if buffering < 100, we have no streams yet or of the expected
// streams there is no source pad yet, we're buffering
let mut have_audio = false;
let mut have_video = false;
if let Some(ref streams) = state.streams {
for stream in streams.iter() {
have_audio =
have_audio || stream.stream_type().contains(gst::StreamType::AUDIO);
have_video =
have_video || stream.stream_type().contains(gst::StreamType::VIDEO);
}
}
if state.stats.buffering_percent < 100
|| state.source_restart_timeout.is_some()
|| state.streams.is_none()
|| (have_audio
&& state
.audio_stream
.as_ref()
.map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some())
.unwrap_or(true))
|| (have_video
&& state
.video_stream
.as_ref()
.map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some())
.unwrap_or(true))
{
return Status::Buffering.to_value();
}
// Otherwise we're running now
Status::Running.to_value()
}
"min-latency" => {
let settings = self.settings.lock();
settings.min_latency.to_value()
}
"buffer-duration" => {
let settings = self.settings.lock();
settings.buffer_duration.to_value()
}
"statistics" => self.stats().to_value(),
"immediate-fallback" => {
let settings = self.settings.lock();
settings.immediate_fallback.to_value()
}
"manual-unblock" => {
let settings = self.settings.lock();
settings.manual_unblock.to_value()
}
_ => unimplemented!(),
}
}
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![
glib::subclass::Signal::builder(
"update-uri",
&[String::static_type().into()],
String::static_type().into(),
)
.class_handler(|_token, args| {
// Simply return the input by default
Some(args[1].clone())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
glib::subclass::Signal::builder("unblock", &[], glib::types::Type::UNIT.into())
.action()
.class_handler(|_token, args| {
let element = args[0].get::<super::FallbackSrc>().expect("signal arg");
let src = element.imp();
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
None => {
return None;
}
Some(state) => state,
};
state.manually_blocked = false;
if state.schedule_restart_on_unblock
&& src.have_fallback_activated(&element, state)
{
src.schedule_source_restart_timeout(
&element,
state,
gst::ClockTime::ZERO,
);
}
src.unblock_pads(&element, state);
None
})
.build(),
]
});
SIGNALS.as_ref()
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
obj.set_element_flags(gst::ElementFlags::SOURCE);
obj.set_bin_flags(gst::BinFlags::STREAMS_AWARE);
}
}
impl GstObjectImpl for FallbackSrc {}
impl ElementImpl for FallbackSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Fallback Source",
"Generic/Source",
"Live source with uridecodebin3 or custom source, and fallback image stream",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let audio_src_pad_template = gst::PadTemplate::new(
"audio",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
let video_src_pad_template = gst::PadTemplate::new(
"video",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
vec![audio_src_pad_template, video_src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
#[allow(clippy::single_match)]
fn change_state(
&self,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::NullToReady => {
self.start(element)?;
}
_ => (),
}
self.parent_change_state(element, transition)?;
// Change the source state manually here to be able to catch errors. State changes always
// happen from sink to source, so we do this after chaining up.
self.change_source_state(element, transition);
// Ignore parent state change return to prevent spurious async/no-preroll return values
// due to core state change bugs
match transition {
gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
Ok(gst::StateChangeSuccess::NoPreroll)
}
gst::StateChange::ReadyToNull => {
self.stop(element);
Ok(gst::StateChangeSuccess::Success)
}
_ => Ok(gst::StateChangeSuccess::Success),
}
}
fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
match event.view() {
gst::EventView::Eos(..) => {
gst::debug!(CAT, "Handling element-level EOS, forwarding to all streams");
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return true;
}
Some(state) => state,
};
// We don't want to hold the state lock while pushing out EOS
let mut send_eos_elements: Vec<gst::Element> = vec![];
let mut send_eos_pads: Vec<gst::Pad> = vec![];
send_eos_elements.push(state.source.clone());
for stream in [&mut state.video_stream, &mut state.audio_stream]
.iter_mut()
.filter_map(|v| v.as_mut())
{
// Not strictly necessary as the switch will EOS when receiving
// EOS on its primary pad, just good form.
send_eos_elements.push(stream.fallback_input.clone());
// If our source hadn't been connected to the switch as a primary
// stream, we need to send EOS there ourselves
if stream.source_srcpad.is_none() {
let clocksync_queue_sinkpad =
stream.clocksync_queue.static_pad("sink").unwrap();
send_eos_pads.push(clocksync_queue_sinkpad.clone());
}
}
drop(state_guard);
for elem in send_eos_elements.drain(..) {
elem.send_event(event.clone());
}
for pad in send_eos_pads.drain(..) {
pad.send_event(event.clone());
}
true
}
_ => true,
}
}
}
impl BinImpl for FallbackSrc {
fn handle_message(&self, bin: &Self::Type, msg: gst::Message) {
use gst::MessageView;
match msg.view() {
MessageView::Buffering(m) => {
// Don't forward upwards, we handle this internally
self.handle_buffering(bin, m);
}
MessageView::StreamsSelected(m) => {
// Don't forward upwards, we are exposing streams based on properties
// TODO: Do stream configuration via our own stream collection and handling
// of stream select events
// TODO: Also needs updating of StreamCollection handling in CustomSource
self.handle_streams_selected(bin, m);
}
MessageView::Error(m) => {
if !self.handle_error(bin, m) {
self.parent_handle_message(bin, msg);
}
}
_ => self.parent_handle_message(bin, msg),
}
}
}
impl FallbackSrc {
fn create_main_input(
&self,
element: &super::FallbackSrc,
source: &Source,
buffer_duration: i64,
) -> gst::Element {
let source = match source {
Source::Uri(ref uri) => {
let source = gst::ElementFactory::make("uridecodebin3", Some("uridecodebin"))
.expect("No uridecodebin3 found");
let uri = element.emit_by_name::<glib::GString>("update-uri", &[uri]);
source.set_property("uri", uri);
source.set_property("use-buffering", true);
source.set_property("buffer-duration", buffer_duration);
source
}
Source::Element(ref source) => CustomSource::new(source).upcast(),
};
// Handle any async state changes internally, they don't affect the pipeline because we
// convert everything to a live stream
source.set_property("async-handling", true);
// Don't let the bin handle state changes of the source. We want to do it manually to catch
// possible errors and retry, without causing the whole bin state change to fail
source.set_locked_state(true);
let element_weak = element.downgrade();
source.connect_pad_added(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = element.imp();
if let Err(msg) = src.handle_source_pad_added(&element, pad) {
element.post_error_message(msg);
}
});
let element_weak = element.downgrade();
source.connect_pad_removed(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = element.imp();
src.handle_source_pad_removed(&element, pad);
});
element.add_many(&[&source]).unwrap();
source
}
fn create_fallback_video_input(
&self,
_element: &super::FallbackSrc,
min_latency: gst::ClockTime,
fallback_uri: Option<&str>,
) -> gst::Element {
VideoFallbackSource::new(fallback_uri, min_latency).upcast()
}
fn create_fallback_audio_input(&self, _element: &super::FallbackSrc) -> gst::Element {
let input = gst::Bin::new(Some("fallback_audio"));
let audiotestsrc = gst::ElementFactory::make("audiotestsrc", Some("fallback_audiosrc"))
.expect("No audiotestsrc found");
input.add_many(&[&audiotestsrc]).unwrap();
audiotestsrc.set_property_from_str("wave", "silence");
audiotestsrc.set_property("is-live", true);
let srcpad = audiotestsrc.static_pad("src").unwrap();
input
.add_pad(
&gst::GhostPad::builder(Some("src"), gst::PadDirection::Src)
.build_with_target(&srcpad)
.unwrap(),
)
.unwrap();
input.upcast()
}
fn create_stream(
&self,
element: &super::FallbackSrc,
timeout: gst::ClockTime,
min_latency: gst::ClockTime,
is_audio: bool,
fallback_uri: Option<&str>,
immediate_fallback: bool,
) -> Stream {
let fallback_input = if is_audio {
self.create_fallback_audio_input(element)
} else {
self.create_fallback_video_input(element, min_latency, fallback_uri)
};
let switch =
gst::ElementFactory::make("fallbackswitch", None).expect("No fallbackswitch found");
let clocksync = gst::ElementFactory::make("clocksync", None)
.or_else(|_| -> Result<_, glib::BoolError> {
let identity = gst::ElementFactory::make("identity", None)?;
identity.set_property("sync", true);
Ok(identity)
})
.expect("No clocksync or identity found");
// Workaround for issues caused by https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
let clocksync_queue = gst::ElementFactory::make("queue", None).expect("No queue found");
clocksync_queue.set_properties(&[
("max-size-buffers", &0u32),
("max-size-bytes", &0u32),
("max-size-time", &gst::ClockTime::SECOND),
]);
element
.add_many(&[&fallback_input, &switch, &clocksync_queue, &clocksync])
.unwrap();
switch.set_property("timeout", timeout.nseconds());
switch.set_property("min-upstream-latency", min_latency.nseconds());
switch.set_property("immediate-fallback", immediate_fallback);
gst::Element::link_pads(&clocksync_queue, Some("src"), &clocksync, Some("sink")).unwrap();
let clocksync_srcpad = clocksync.static_pad("src").unwrap();
let switch_mainsink = switch.request_pad_simple("sink_%u").unwrap();
clocksync_srcpad.link(&switch_mainsink).unwrap();
switch_mainsink.set_property("priority", 0u32);
// clocksync_queue sink pad is not connected to anything yet at this point!
let fallback_srcpad = fallback_input.static_pad("src").unwrap();
let switch_fallbacksink = switch.request_pad_simple("sink_%u").unwrap();
fallback_srcpad.link(&switch_fallbacksink).unwrap();
switch_fallbacksink.set_property("priority", 1u32);
let element_weak = element.downgrade();
switch.connect_notify(Some("active-pad"), move |_switch, _pspec| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = element.imp();
src.handle_switch_active_pad_change(&element);
});
let srcpad = switch.static_pad("src").unwrap();
let templ = element
.pad_template(if is_audio { "audio" } else { "video" })
.unwrap();
let ghostpad = gst::GhostPad::builder_with_template(&templ, Some(&templ.name()))
.proxy_pad_chain_function({
let element_weak = element.downgrade();
move |pad, _parent, buffer| {
let element = match element_weak.upgrade() {
None => return Err(gst::FlowError::Flushing),
Some(element) => element,
};
let src = element.imp();
src.proxy_pad_chain(&element, pad, buffer)
}
})
.build_with_target(&srcpad)
.unwrap();
element.add_pad(&ghostpad).unwrap();
Stream {
fallback_input,
source_srcpad: None,
source_srcpad_block: None,
clocksync,
imagefreeze: None,
clocksync_queue_srcpad: clocksync_queue.static_pad("src").unwrap(),
clocksync_queue,
switch,
srcpad: ghostpad.upcast(),
}
}
fn start(&self, element: &super::FallbackSrc) -> Result<(), gst::StateChangeError> {
gst::debug!(CAT, obj: element, "Starting");
let mut state_guard = self.state.lock();
if state_guard.is_some() {
return Err(gst::StateChangeError);
}
let settings = self.settings.lock().clone();
let configured_source = match settings
.uri
.as_ref()
.cloned()
.map(Source::Uri)
.or_else(|| settings.source.as_ref().cloned().map(Source::Element))
{
Some(source) => source,
None => {
gst::error!(CAT, obj: element, "No URI or source element configured");
gst::element_error!(
element,
gst::LibraryError::Settings,
["No URI or source element configured"]
);
return Err(gst::StateChangeError);
}
};
let fallback_uri = &settings.fallback_uri;
// Create main input
let source = self.create_main_input(element, &configured_source, settings.buffer_duration);
let mut flow_combiner = gst_base::UniqueFlowCombiner::new();
// Create video stream
let video_stream = if settings.enable_video {
let stream = self.create_stream(
element,
settings.timeout,
settings.min_latency,
false,
fallback_uri.as_deref(),
settings.immediate_fallback,
);
flow_combiner.add_pad(&stream.srcpad);
Some(stream)
} else {
None
};
// Create audio stream
let audio_stream = if settings.enable_audio {
let stream = self.create_stream(
element,
settings.timeout,
settings.min_latency,
true,
None,
settings.immediate_fallback,
);
flow_combiner.add_pad(&stream.srcpad);
Some(stream)
} else {
None
};
let manually_blocked = settings.manual_unblock;
*state_guard = Some(State {
source,
source_is_live: false,
source_pending_restart: false,
source_restart_timeout: None,
source_pending_restart_timeout: None,
source_retry_timeout: None,
video_stream,
audio_stream,
flow_combiner,
last_buffering_update: None,
streams: None,
settings,
configured_source,
stats: Stats::default(),
manually_blocked,
schedule_restart_on_unblock: false,
is_image: false,
});
drop(state_guard);
element.no_more_pads();
element.notify("status");
gst::debug!(CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &super::FallbackSrc) {
gst::debug!(CAT, obj: element, "Stopping");
let mut state_guard = self.state.lock();
let mut state = match state_guard.take() {
Some(state) => state,
None => return,
};
drop(state_guard);
element.notify("status");
// In theory all streams should've been removed from the source's pad-removed signal
// handler when going from Paused to Ready but better safe than sorry here
for stream in [&state.video_stream, &state.audio_stream]
.iter()
.filter_map(|v| v.as_ref())
{
element.remove(&stream.switch).unwrap();
element.remove(&stream.clocksync_queue).unwrap();
element.remove(&stream.clocksync).unwrap();
element.remove(&stream.fallback_input).unwrap();
let _ = stream.srcpad.set_target(None::<&gst::Pad>);
let _ = element.remove_pad(&stream.srcpad);
}
state.video_stream = None;
state.audio_stream = None;
if let Source::Element(ref source) = state.configured_source {
// Explicitly remove the source element from the CustomSource so that we can
// later create a new CustomSource and add it again there.
if source.has_as_parent(&state.source) {
let _ = source.set_state(gst::State::Null);
let _ = state
.source
.downcast_ref::<gst::Bin>()
.unwrap()
.remove(source);
}
}
element.remove(&state.source).unwrap();
if let Some(timeout) = state.source_pending_restart_timeout.take() {
timeout.unschedule();
}
if let Some(timeout) = state.source_retry_timeout.take() {
timeout.unschedule();
}
if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule();
}
gst::debug!(CAT, obj: element, "Stopped");
}
fn change_source_state(&self, element: &super::FallbackSrc, transition: gst::StateChange) {
gst::debug!(CAT, obj: element, "Changing source state: {:?}", transition);
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
Some(state) => state,
None => return,
};
if transition.current() <= transition.next() && state.source_pending_restart {
gst::debug!(
CAT,
obj: element,
"Not starting source because pending restart"
);
return;
} else if transition.next() <= gst::State::Ready && state.source_pending_restart {
gst::debug!(
CAT,
obj: element,
"Unsetting pending restart because shutting down"
);
state.source_pending_restart = false;
if let Some(timeout) = state.source_pending_restart_timeout.take() {
timeout.unschedule();
}
}
let source = state.source.clone();
drop(state_guard);
element.notify("status");
let res = source.set_state(transition.next());
match res {
Err(_) => {
gst::error!(CAT, obj: element, "Source failed to change state");
// Try again later if we're not shutting down
if transition != gst::StateChange::ReadyToNull {
let _ = source.set_state(gst::State::Null);
let mut state_guard = self.state.lock();
let state = state_guard.as_mut().expect("no state");
self.handle_source_error(element, state, RetryReason::StateChangeFailure);
drop(state_guard);
element.notify("statistics");
}
}
Ok(res) => {
gst::debug!(
CAT,
obj: element,
"Source changed state successfully: {:?}",
res
);
let mut state_guard = self.state.lock();
let state = state_guard.as_mut().expect("no state");
// Remember if the source is live
if transition == gst::StateChange::ReadyToPaused {
state.source_is_live = res == gst::StateChangeSuccess::NoPreroll;
}
if (state.source_is_live && transition == gst::StateChange::ReadyToPaused)
|| (!state.source_is_live && transition == gst::StateChange::PausedToPlaying)
{
assert!(state.source_restart_timeout.is_none());
state.schedule_restart_on_unblock = true;
self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
}
}
}
}
fn proxy_pad_chain(
&self,
element: &super::FallbackSrc,
pad: &gst::ProxyPad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let res = gst::ProxyPad::chain_default(pad, Some(element), buffer);
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => return res,
Some(state) => state,
};
state.flow_combiner.update_pad_flow(pad, res)
}
fn handle_source_pad_added(
&self,
element: &super::FallbackSrc,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pad {} added to source", pad.name(),);
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return Ok(());
}
Some(state) => state,
};
let mut is_image = false;
if let Some(ev) = pad.sticky_event::<gst::event::StreamStart>(0) {
let stream = ev.stream();
if let Some(stream) = stream {
if let Some(caps) = stream.caps() {
if let Some(s) = caps.structure(0) {
is_image = s.name().starts_with("image/");
}
}
}
}
if is_image {
if let Some(timeout) = state.source_pending_restart_timeout.take() {
timeout.unschedule();
}
if let Some(timeout) = state.source_retry_timeout.take() {
timeout.unschedule();
}
if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule();
}
}
state.is_image |= is_image;
let (is_video, stream) = match pad.name() {
x if x.starts_with("audio_") => (false, &mut state.audio_stream),
x if x.starts_with("video_") => (true, &mut state.video_stream),
_ => {
let caps = match pad.current_caps().unwrap_or_else(|| pad.query_caps(None)) {
caps if !caps.is_any() && !caps.is_empty() => caps,
_ => return Ok(()),
};
let s = caps.structure(0).unwrap();
if s.name().starts_with("audio/") {
(false, &mut state.audio_stream)
} else if s.name().starts_with("video/") {
(true, &mut state.video_stream)
} else {
// TODO: handle subtitles etc
return Ok(());
}
}
};
let type_ = if is_video { "video" } else { "audio" };
let stream = match stream {
None => {
gst::debug!(CAT, obj: element, "No {} stream enabled", type_);
return Ok(());
}
Some(Stream {
source_srcpad: Some(_),
..
}) => {
gst::debug!(CAT, obj: element, "Already configured a {} stream", type_);
return Ok(());
}
Some(ref mut stream) => stream,
};
let sinkpad = if is_image {
let imagefreeze = if let Some(ref imagefreeze) = stream.imagefreeze {
imagefreeze
} else {
let imagefreeze =
gst::ElementFactory::make("imagefreeze", None).expect("no imagefreeze found");
gst::debug!(CAT, "image stream, inserting imagefreeze");
element.add(&imagefreeze).unwrap();
imagefreeze.set_property("is-live", true);
stream.imagefreeze = Some(imagefreeze);
stream.imagefreeze.as_ref().unwrap()
};
if imagefreeze.sync_state_with_parent().is_err() {
gst::error!(CAT, obj: element, "imagefreeze failed to change state",);
return Err(gst::error_msg!(
gst::CoreError::StateChange,
["Failed to change imagefreeze state"]
));
}
imagefreeze.link(&stream.clocksync_queue).unwrap();
imagefreeze.static_pad("sink").unwrap()
} else {
if let Some(imagefreeze) = stream.imagefreeze.take() {
imagefreeze.set_locked_state(true);
let _ = imagefreeze.set_state(gst::State::Null);
element.remove(&imagefreeze).unwrap();
}
stream.clocksync_queue.static_pad("sink").unwrap()
};
pad.link(&sinkpad).map_err(|err| {
gst::error!(
CAT,
obj: element,
"Failed to link source pad to clocksync: {}",
err
);
gst::error_msg!(
gst::CoreError::Negotiation,
["Failed to link source pad to clocksync: {}", err]
)
})?;
let element_weak = element.downgrade();
pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |pad, info| {
let element = match element_weak.upgrade() {
None => return gst::PadProbeReturn::Ok,
Some(element) => element,
};
let src = element.imp();
match info.data {
Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => {
gst::debug!(
CAT,
obj: &element,
"Received EOS from source on pad {}",
pad.name()
);
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
None => {
return gst::PadProbeReturn::Ok;
}
Some(state) => state,
};
if is_image {
gst::PadProbeReturn::Ok
} else if state.settings.restart_on_eos {
src.handle_source_error(&element, state, RetryReason::Eos);
drop(state_guard);
element.notify("statistics");
gst::PadProbeReturn::Drop
} else {
if let Some(other_stream) = {
if is_video {
state.audio_stream.as_ref()
} else {
state.video_stream.as_ref()
}
} {
if other_stream.source_srcpad.is_none() {
let fallback_input = &other_stream.fallback_input;
let clocksync_queue_sinkpad =
other_stream.clocksync_queue.static_pad("sink").unwrap();
fallback_input.call_async(move |fallback_input| {
fallback_input.send_event(gst::event::Eos::new());
clocksync_queue_sinkpad.send_event(gst::event::Eos::new());
});
}
}
gst::PadProbeReturn::Ok
}
}
_ => gst::PadProbeReturn::Ok,
}
});
assert!(stream.source_srcpad_block.is_none());
stream.source_srcpad = Some(pad.clone());
stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
drop(state_guard);
element.notify("status");
Ok(())
}
fn add_pad_probe(&self, element: &super::FallbackSrc, stream: &mut Stream) -> Block {
// FIXME: Not literally correct as we add the probe to the queue source pad but that's only
// a workaround until
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
// is fixed.
gst::debug!(
CAT,
obj: element,
"Adding probe to pad {}",
stream.source_srcpad.as_ref().unwrap().name()
);
let element_weak = element.downgrade();
let probe_id = stream
.clocksync_queue_srcpad
.add_probe(
gst::PadProbeType::BLOCK
| gst::PadProbeType::BUFFER
| gst::PadProbeType::EVENT_DOWNSTREAM,
move |pad, info| {
let element = match element_weak.upgrade() {
None => return gst::PadProbeReturn::Pass,
Some(element) => element,
};
let pts = match info.data {
Some(gst::PadProbeData::Buffer(ref buffer)) => buffer.pts(),
Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
gst::EventView::Gap(ev) => Some(ev.get().0),
_ => return gst::PadProbeReturn::Pass,
},
_ => unreachable!(),
};
let src = element.imp();
if let Err(msg) = src.handle_pad_blocked(&element, pad, pts) {
element.post_error_message(msg);
}
gst::PadProbeReturn::Ok
},
)
.unwrap();
Block {
pad: stream.clocksync_queue_srcpad.clone(),
probe_id,
running_time: gst::ClockTime::NONE,
}
}
fn handle_pad_blocked(
&self,
element: &super::FallbackSrc,
pad: &gst::Pad,
pts: impl Into<Option<gst::ClockTime>>,
) -> Result<(), gst::ErrorMessage> {
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return Ok(());
}
Some(state) => state,
};
// FIXME: Not literally correct as we added the probe to the queue source pad but that's only
// a workaround until
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
// is fixed.
let stream = if let Some(stream) = state
.audio_stream
.as_mut()
.filter(|s| &s.clocksync_queue_srcpad == pad)
{
gst::debug!(
CAT,
obj: element,
"Called probe on pad {}",
stream.source_srcpad.as_ref().unwrap().name()
);
stream
} else if let Some(stream) = state
.video_stream
.as_mut()
.filter(|s| &s.clocksync_queue_srcpad == pad)
{
gst::debug!(
CAT,
obj: element,
"Called probe on pad {}",
stream.source_srcpad.as_ref().unwrap().name()
);
stream
} else {
unreachable!();
};
// Directly unblock for live streams
if state.source_is_live {
for (source_srcpad, block) in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
.iter_mut()
.filter_map(|s| s.as_mut())
.filter_map(|s| {
if let Some(block) = s.source_srcpad_block.take() {
Some((s.source_srcpad.as_ref().unwrap(), block))
} else {
None
}
})
{
gst::debug!(
CAT,
obj: element,
"Removing pad probe for pad {}",
source_srcpad.name()
);
block.pad.remove_probe(block.probe_id);
}
gst::debug!(CAT, obj: element, "Live source, unblocking directly");
drop(state_guard);
element.notify("status");
return Ok(());
}
// Update running time for this block
let block = match stream.source_srcpad_block {
Some(ref mut block) => block,
None => return Ok(()),
};
let segment = match pad.sticky_event::<gst::event::Segment>(0) {
Some(ev) => ev.segment().clone(),
None => {
gst::warning!(CAT, obj: element, "Have no segment event yet");
return Ok(());
}
};
let segment = segment.downcast::<gst::ClockTime>().map_err(|_| {
gst::error!(CAT, obj: element, "Have no time segment");
gst::error_msg!(gst::CoreError::Clock, ["Have no time segment"])
})?;
let pts = pts.into();
let running_time = if let Some((_, start)) =
pts.zip(segment.start()).filter(|(pts, start)| pts < start)
{
segment.to_running_time(start)
} else if let Some((_, stop)) = pts.zip(segment.stop()).filter(|(pts, stop)| pts >= stop) {
segment.to_running_time(stop)
} else {
segment.to_running_time(pts)
};
gst::debug!(
CAT,
obj: element,
"Have block running time {}",
running_time.display(),
);
block.running_time = running_time;
self.unblock_pads(element, state);
drop(state_guard);
element.notify("status");
Ok(())
}
fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) {
if state.manually_blocked {
gst::debug!(CAT, obj: element, "Not unblocking yet: manual unblock",);
return;
}
// Check if all streams are blocked and have a running time and we have
// 100% buffering
if state.stats.buffering_percent < 100 {
gst::debug!(
CAT,
obj: element,
"Not unblocking yet: buffering {}%",
state.stats.buffering_percent
);
return;
}
let streams = match state.streams {
None => {
gst::debug!(CAT, obj: element, "Have no stream collection yet");
return;
}
Some(ref streams) => streams,
};
let mut have_audio = false;
let mut have_video = false;
for stream in streams.iter() {
have_audio = have_audio || stream.stream_type().contains(gst::StreamType::AUDIO);
have_video = have_video || stream.stream_type().contains(gst::StreamType::VIDEO);
}
let want_audio = state.settings.enable_audio;
let want_video = state.settings.enable_video;
let audio_running_time = state
.audio_stream
.as_ref()
.and_then(|s| s.source_srcpad_block.as_ref())
.and_then(|b| b.running_time);
let video_running_time = state
.video_stream
.as_ref()
.and_then(|s| s.source_srcpad_block.as_ref())
.and_then(|b| b.running_time);
let audio_srcpad = state
.audio_stream
.as_ref()
.and_then(|s| s.source_srcpad.as_ref().cloned());
let video_srcpad = state
.video_stream
.as_ref()
.and_then(|s| s.source_srcpad.as_ref().cloned());
let audio_is_eos = audio_srcpad
.as_ref()
.map(|p| p.pad_flags().contains(gst::PadFlags::EOS))
.unwrap_or(false);
let video_is_eos = video_srcpad
.as_ref()
.map(|p| p.pad_flags().contains(gst::PadFlags::EOS))
.unwrap_or(false);
// If we need both, wait for both and take the minimum, otherwise take the one we need.
// Also consider EOS, we'd never get a new running time after EOS so don't need to wait.
// FIXME: All this surely can be simplified somehow
// FIXME I guess this could be moved up
let current_running_time = match element.current_running_time() {
Some(current_running_time) => current_running_time,
None => {
gst::debug!(CAT, obj: element, "Waiting for current_running_time");
return;
}
};
if have_audio && want_audio && have_video && want_video {
if audio_running_time.is_none()
&& !audio_is_eos
&& video_running_time.is_none()
&& !video_is_eos
{
gst::debug!(
CAT,
obj: element,
"Waiting for audio and video pads to block"
);
return;
} else if audio_running_time.is_none() && !audio_is_eos {
gst::debug!(CAT, obj: element, "Waiting for audio pad to block");
return;
} else if video_running_time.is_none() && !video_is_eos {
gst::debug!(CAT, obj: element, "Waiting for video pad to block");
return;
}
let audio_running_time = audio_running_time.expect("checked above");
let video_running_time = video_running_time.expect("checked above");
let min_running_time = if audio_is_eos {
video_running_time
} else if video_is_eos {
audio_running_time
} else {
audio_running_time.min(video_running_time)
};
let offset = if current_running_time > min_running_time {
(current_running_time - min_running_time).nseconds() as i64
} else {
-((min_running_time - current_running_time).nseconds() as i64)
};
gst::debug!(
CAT,
obj: element,
"Unblocking at {} with pad offset {} (audio: {} eos {}, video {} eos {})",
current_running_time,
offset,
audio_running_time,
audio_is_eos,
video_running_time,
video_is_eos,
);
if let Some(block) = state
.audio_stream
.as_mut()
.and_then(|s| s.source_srcpad_block.take())
{
if !audio_is_eos {
block.pad.set_offset(offset);
}
block.pad.remove_probe(block.probe_id);
}
if let Some(block) = state
.video_stream
.as_mut()
.and_then(|s| s.source_srcpad_block.take())
{
if !video_is_eos {
block.pad.set_offset(offset);
}
block.pad.remove_probe(block.probe_id);
}
} else if have_audio && want_audio {
let audio_running_time = match audio_running_time {
Some(audio_running_time) => audio_running_time,
None => {
gst::debug!(CAT, obj: element, "Waiting for audio pad to block");
return;
}
};
let offset = if current_running_time > audio_running_time {
(current_running_time - audio_running_time).nseconds() as i64
} else {
-((audio_running_time - current_running_time).nseconds() as i64)
};
gst::debug!(
CAT,
obj: element,
"Unblocking at {} with pad offset {} (audio: {} eos {})",
current_running_time,
offset,
audio_running_time,
audio_is_eos
);
if let Some(block) = state
.audio_stream
.as_mut()
.and_then(|s| s.source_srcpad_block.take())
{
if !audio_is_eos {
block.pad.set_offset(offset);
}
block.pad.remove_probe(block.probe_id);
}
} else if have_video && want_video {
let video_running_time = match video_running_time {
Some(video_running_time) => video_running_time,
None => {
gst::debug!(CAT, obj: element, "Waiting for video pad to block");
return;
}
};
let offset = if current_running_time > video_running_time {
(current_running_time - video_running_time).nseconds() as i64
} else {
-((video_running_time - current_running_time).nseconds() as i64)
};
gst::debug!(
CAT,
obj: element,
"Unblocking at {} with pad offset {} (video: {} eos {})",
current_running_time,
offset,
video_running_time,
video_is_eos
);
if let Some(block) = state
.video_stream
.as_mut()
.and_then(|s| s.source_srcpad_block.take())
{
if !video_is_eos {
block.pad.set_offset(offset);
}
block.pad.remove_probe(block.probe_id);
}
}
}
fn handle_source_pad_removed(&self, element: &super::FallbackSrc, pad: &gst::Pad) {
gst::debug!(CAT, obj: element, "Pad {} removed from source", pad.name());
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return;
}
Some(state) => state,
};
// Don't have to do anything here other than forgetting about the pad. Unlinking will
// automatically happen while the pad is being removed from source and thus leaves the
// bin hierarchy
let stream = if let Some(stream) = state
.audio_stream
.as_mut()
.filter(|s| s.source_srcpad.as_ref() == Some(pad))
{
stream
} else if let Some(stream) = state
.video_stream
.as_mut()
.filter(|s| s.source_srcpad.as_ref() == Some(pad))
{
stream
} else {
return;
};
stream.source_srcpad = None;
self.unblock_pads(element, state);
drop(state_guard);
element.notify("status");
}
fn handle_buffering(&self, element: &super::FallbackSrc, m: &gst::message::Buffering) {
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return;
}
Some(state) => state,
};
if state.source_pending_restart {
gst::debug!(CAT, obj: element, "Has pending restart");
return;
}
gst::debug!(CAT, obj: element, "Got buffering {}%", m.percent());
state.stats.buffering_percent = m.percent();
if state.stats.buffering_percent < 100 {
state.last_buffering_update = Some(Instant::now());
// Block source pads if needed to pause
if let Some(ref mut stream) = state.audio_stream {
if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() {
stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
}
}
if let Some(ref mut stream) = state.video_stream {
if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() {
stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
}
}
} else {
// Check if we can unblock now
self.unblock_pads(element, state);
}
drop(state_guard);
element.notify("status");
element.notify("statistics");
}
fn handle_streams_selected(
&self,
element: &super::FallbackSrc,
m: &gst::message::StreamsSelected,
) {
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return;
}
Some(state) => state,
};
let streams = m.stream_collection();
gst::debug!(
CAT,
obj: element,
"Got stream collection {:?}",
streams.debug()
);
let mut have_audio = false;
let mut have_video = false;
for stream in streams.iter() {
have_audio = have_audio || stream.stream_type().contains(gst::StreamType::AUDIO);
have_video = have_video || stream.stream_type().contains(gst::StreamType::VIDEO);
}
if !have_audio && state.settings.enable_audio {
gst::warning!(
CAT,
obj: element,
"Have no audio streams but audio is enabled"
);
}
if !have_video && state.settings.enable_video {
gst::warning!(
CAT,
obj: element,
"Have no video streams but video is enabled"
);
}
state.streams = Some(streams);
// This might not be the first stream collection and we might have some unblocked pads from
// before already, which would need to be blocked again now for keeping things in sync
for stream in [&mut state.video_stream, &mut state.audio_stream]
.iter_mut()
.filter_map(|v| v.as_mut())
{
if stream.source_srcpad.is_some() && stream.source_srcpad_block.is_none() {
stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
}
}
self.unblock_pads(element, state);
drop(state_guard);
element.notify("status");
}
fn handle_error(&self, element: &super::FallbackSrc, m: &gst::message::Error) -> bool {
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return false;
}
Some(state) => state,
};
let src = match m.src().and_then(|s| s.downcast::<gst::Element>().ok()) {
None => return false,
Some(src) => src,
};
gst::debug!(
CAT,
obj: element,
"Got error message from {}",
src.path_string()
);
if src == state.source || src.has_as_ancestor(&state.source) {
self.handle_source_error(element, state, RetryReason::Error);
drop(state_guard);
element.notify("status");
element.notify("statistics");
return true;
}
// Check if error is from video fallback input and if so, try another
// fallback to videotestsrc
if let Some(ref mut video_stream) = state.video_stream {
if src == video_stream.fallback_input
|| src.has_as_ancestor(&video_stream.fallback_input)
{
gst::debug!(CAT, obj: element, "Got error from video fallback input");
let prev_fallback_uri = video_stream
.fallback_input
.property::<Option<String>>("uri");
// This means previously videotestsrc was configured
// Something went wrong and there is no other way than to error out
if prev_fallback_uri.is_none() {
return false;
}
let fallback_input = &video_stream.fallback_input;
fallback_input.call_async(|fallback_input| {
// Re-run video fallback input with videotestsrc
let _ = fallback_input.set_state(gst::State::Null);
let _ = fallback_input.set_property("uri", None::<&str>);
let _ = fallback_input.sync_state_with_parent();
});
return true;
}
}
gst::error!(
CAT,
obj: element,
"Give up for error message from {}",
src.path_string()
);
false
}
fn handle_source_error(
&self,
element: &super::FallbackSrc,
state: &mut State,
reason: RetryReason,
) {
gst::debug!(CAT, obj: element, "Handling source error: {:?}", reason);
state.stats.last_retry_reason = reason;
if state.source_pending_restart {
gst::debug!(CAT, obj: element, "Source is already pending restart");
return;
}
// Increase retry count only if there was no pending restart
state.stats.num_retry += 1;
// Unschedule pending timeout, we're restarting now
if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule();
}
// Prevent state changes from changing the state in an uncoordinated way
state.source_pending_restart = true;
// Drop any EOS events from any source pads of the source that might happen because of the
// error. We don't need to remove these pad probes because restarting the source will also
// remove/add the pads again.
for pad in state.source.src_pads() {
pad.add_probe(
gst::PadProbeType::EVENT_DOWNSTREAM,
|_pad, info| match info.data {
Some(gst::PadProbeData::Event(ref event)) => {
if event.type_() == gst::EventType::Eos {
gst::PadProbeReturn::Drop
} else {
gst::PadProbeReturn::Ok
}
}
_ => unreachable!(),
},
)
.unwrap();
}
let source_weak = state.source.downgrade();
element.call_async(move |element| {
let src = element.imp();
let source = match source_weak.upgrade() {
None => return,
Some(source) => source,
};
// Remove blocking pad probes if they are still there as otherwise shutting down the
// source will deadlock on the probes.
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
None
| Some(State {
source_pending_restart: false,
..
}) => {
gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
return;
}
Some(state) => state,
};
for (source_srcpad_name, block) in
[state.video_stream.as_mut(), state.audio_stream.as_mut()]
.iter_mut()
.filter_map(|s| s.as_mut())
.filter_map(|s| {
if let Some(block) = s.source_srcpad_block.take() {
Some((s.source_srcpad.as_ref().map(|pad| pad.name()), block))
} else {
None
}
})
{
gst::debug!(
CAT,
obj: element,
"Removing pad probe for pad {}",
source_srcpad_name.as_deref().unwrap_or("UNKNOWN")
);
block.pad.remove_probe(block.probe_id);
}
let stream_sinkpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()]
.into_iter()
.flatten()
.map(|s| {
if let Some(ref imagefreeze) = s.imagefreeze {
imagefreeze.static_pad("sink").unwrap()
} else {
s.clocksync_queue.static_pad("sink").unwrap()
}
})
.collect::<Vec<_>>();
let stream_srcpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()]
.into_iter()
.flatten()
.map(|s| {
let srcpad = s.srcpad.clone();
let probe_id = srcpad
.add_probe(
gst::PadProbeType::EVENT_DOWNSTREAM | gst::PadProbeType::EVENT_FLUSH,
move |_pad, info| match info.data {
Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
gst::EventView::FlushStart(_) => gst::PadProbeReturn::Drop,
gst::EventView::FlushStop(_) => gst::PadProbeReturn::Drop,
_ => gst::PadProbeReturn::Ok,
},
_ => gst::PadProbeReturn::Ok,
},
)
.unwrap();
(probe_id, srcpad)
})
.collect::<Vec<_>>();
drop(state_guard);
gst::debug!(CAT, obj: element, "Flushing source");
let _ = source.send_event(gst::event::FlushStart::builder().build());
gst::debug!(CAT, obj: element, "Shutting down source");
let _ = source.set_state(gst::State::Null);
gst::debug!(CAT, obj: element, "Stop flushing downstream of source");
for pad in stream_sinkpads {
let _ = pad.send_event(gst::event::FlushStop::builder(true).build());
}
for (probe_id, pad) in stream_srcpads {
pad.remove_probe(probe_id);
}
// Sleep for 1s before retrying
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
None
| Some(State {
source_pending_restart: false,
..
}) => {
gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
return;
}
Some(state) => state,
};
for stream in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
.iter_mut()
.filter_map(|s| s.as_mut())
{
stream.source_srcpad_block = None;
stream.source_srcpad = None;
}
gst::debug!(CAT, obj: element, "Waiting for 1s before retrying");
let clock = gst::SystemClock::obtain();
let wait_time = clock.time().unwrap() + gst::ClockTime::SECOND;
assert!(state.source_pending_restart_timeout.is_none());
let timeout = clock.new_single_shot_id(wait_time);
let element_weak = element.downgrade();
timeout
.wait_async(move |_clock, _time, _id| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
gst::debug!(CAT, obj: &element, "Woke up, retrying");
element.call_async(|element| {
let src = element.imp();
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
None
| Some(State {
source_pending_restart: false,
..
}) => {
gst::debug!(
CAT,
obj: element,
"Restarting source not needed anymore"
);
return;
}
Some(state) => state,
};
let (source, old_source) = if let Source::Uri(..) = state.configured_source
{
// FIXME: Create a new uridecodebin3 because it currently is not reusable
// See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/746
element.remove(&state.source).unwrap();
let source = src.create_main_input(
element,
&state.configured_source,
state.settings.buffer_duration,
);
(
source.clone(),
Some(mem::replace(&mut state.source, source)),
)
} else {
(state.source.clone(), None)
};
state.source_pending_restart = false;
state.source_pending_restart_timeout = None;
state.stats.buffering_percent = 100;
state.last_buffering_update = None;
if let Some(timeout) = state.source_restart_timeout.take() {
gst::debug!(CAT, obj: element, "Unscheduling restart timeout");
timeout.unschedule();
}
drop(state_guard);
if let Some(old_source) = old_source {
// Drop old source after releasing the lock, it might call the pad-removed callback
// still
drop(old_source);
}
if source.sync_state_with_parent().is_err() {
gst::error!(CAT, obj: element, "Source failed to change state");
let _ = source.set_state(gst::State::Null);
let mut state_guard = src.state.lock();
let state = state_guard.as_mut().expect("no state");
src.handle_source_error(
element,
state,
RetryReason::StateChangeFailure,
);
drop(state_guard);
element.notify("statistics");
} else {
let mut state_guard = src.state.lock();
let state = state_guard.as_mut().expect("no state");
assert!(state.source_restart_timeout.is_none());
src.schedule_source_restart_timeout(
element,
state,
gst::ClockTime::ZERO,
);
}
});
})
.expect("Failed to wait async");
state.source_pending_restart_timeout = Some(timeout);
});
}
#[allow(clippy::blocks_in_if_conditions)]
fn schedule_source_restart_timeout(
&self,
element: &super::FallbackSrc,
state: &mut State,
elapsed: gst::ClockTime,
) {
if state.source_pending_restart {
gst::debug!(
CAT,
obj: element,
"Not scheduling source restart timeout because source is pending restart already",
);
return;
}
if state.is_image {
gst::debug!(
CAT,
obj: element,
"Not scheduling source restart timeout because we are playing back an image",
);
return;
}
if state.manually_blocked {
gst::debug!(
CAT,
obj: element,
"Not scheduling source restart timeout because we are manually blocked",
);
return;
}
let clock = gst::SystemClock::obtain();
let wait_time = clock.time().unwrap() + state.settings.restart_timeout - elapsed;
gst::debug!(
CAT,
obj: element,
"Scheduling source restart timeout for {}",
wait_time,
);
let timeout = clock.new_single_shot_id(wait_time);
let element_weak = element.downgrade();
timeout
.wait_async(move |_clock, _time, _id| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
element.call_async(move |element| {
let src = element.imp();
gst::debug!(CAT, obj: element, "Source restart timeout triggered");
let mut state_guard = src.state.lock();
let state = match &mut *state_guard {
None => {
gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
return;
}
Some(state) => state,
};
state.source_restart_timeout = None;
// If we have the fallback activated then restart the source now.
if src.have_fallback_activated(element, state) {
// If we're not actively buffering right now let's restart the source
if state
.last_buffering_update
.map(|i| i.elapsed() >= state.settings.restart_timeout.into())
.unwrap_or(state.stats.buffering_percent == 100)
{
gst::debug!(CAT, obj: element, "Not buffering, restarting source");
src.handle_source_error(element, state, RetryReason::Timeout);
drop(state_guard);
element.notify("statistics");
} else {
gst::debug!(CAT, obj: element, "Buffering, restarting source later");
let elapsed = state
.last_buffering_update
.and_then(|last_buffering_update| {
gst::ClockTime::try_from(last_buffering_update.elapsed()).ok()
})
.unwrap_or(gst::ClockTime::ZERO);
src.schedule_source_restart_timeout(element, state, elapsed);
}
} else {
gst::debug!(CAT, obj: element, "Restarting source not needed anymore");
}
});
})
.expect("Failed to wait async");
state.source_restart_timeout = Some(timeout);
}
#[allow(clippy::blocks_in_if_conditions)]
fn have_fallback_activated(&self, _element: &super::FallbackSrc, state: &State) -> bool {
let mut have_audio = false;
let mut have_video = false;
if let Some(ref streams) = state.streams {
for stream in streams.iter() {
have_audio = have_audio || stream.stream_type().contains(gst::StreamType::AUDIO);
have_video = have_video || stream.stream_type().contains(gst::StreamType::VIDEO);
}
}
// If we have neither audio nor video (no streams yet), or active pad for the ones we have
// is the fallback pad then we have the fallback activated.
(!have_audio && !have_video)
|| (have_audio
&& state.audio_stream.is_some()
&& state
.audio_stream
.as_ref()
.and_then(|s| s.switch.property::<Option<gst::Pad>>("active-pad"))
.map(|p| p.property::<u32>("priority") != 0)
.unwrap_or(true))
|| (have_video
&& state.video_stream.is_some()
&& state
.video_stream
.as_ref()
.and_then(|s| s.switch.property::<Option<gst::Pad>>("active-pad"))
.map(|p| p.property::<u32>("priority") != 0)
.unwrap_or(true))
}
fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc) {
let mut state_guard = self.state.lock();
let state = match &mut *state_guard {
None => {
return;
}
Some(state) => state,
};
// If we have the fallback activated then start the retry timeout unless it was started
// already. Otherwise cancel the retry timeout.
if self.have_fallback_activated(element, state) {
gst::warning!(CAT, obj: element, "Switched to fallback stream");
if state.source_restart_timeout.is_none() {
self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
}
} else {
gst::debug!(CAT, obj: element, "Switched to main stream");
if let Some(timeout) = state.source_retry_timeout.take() {
gst::debug!(CAT, obj: element, "Unscheduling retry timeout");
timeout.unschedule();
}
if let Some(timeout) = state.source_restart_timeout.take() {
gst::debug!(CAT, obj: element, "Unscheduling restart timeout");
timeout.unschedule();
}
}
drop(state_guard);
element.notify("status");
}
fn stats(&self) -> gst::Structure {
let state_guard = self.state.lock();
let state = match &*state_guard {
None => return Stats::default().to_structure(),
Some(ref state) => state,
};
state.stats.to_structure()
}
}