threadshare: simplify Pad{Src,Sink} implementations

Pad{Src,Sink}[Ref] delegate some functions to their respective
Pad{Src,Sink}Inner. Since they act as smart pointers, we can
safely implement the Deref trait to simplify the implementations.
This commit is contained in:
François Laignel 2020-05-26 11:30:14 +02:00 committed by Sebastian Dröge
parent 13d3fd1cc8
commit dfaf59a59b

View file

@ -42,15 +42,13 @@
//! ╰─────────────────╯ ╭─>│ │╌╌╌>│ │─╮ │
//! ╭───────╯ │ │ ╰──┰──╯ ╰──┰──╯ ╰───────╮ │
//! ╭────────────╮ ╭────────╮ push* │ ┃ ┃ ╭─────────╮
//! │ Pad Task ↺ │<──│ PadSrc │───────╯ ┃ ┃ │ PadSink │
//! │ Pad Task ↺ │──>│ PadSrc │───────╯ ┃ ┃ │ PadSink │
//! ╰────────────╯ ╰────────╯ ┃ ┃ ╰─────────╯
//! ━━━━━━━━━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━
//! ╰───────────────────╮ ╭─────────────────╯
//! ╭──────────────╮
//! │ PadContext │
//! │╭────────────╮│
//! ││ Context ↺ ││
//! ╰╰────────────╯╯
//! ━━━━━━━│━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━
//! ╰──────────────┴───────────────────╮ ╭─────────────────╯
//! ╭────────────╮
//! │ Context ↺ │
//! ╰────────────╯
//! ```
//!
//! Asynchronous operations for both [`PadSrc`] in `Element A` and [`PadSink`] in `Element B` run on
@ -78,6 +76,7 @@ use gst::{gst_debug, gst_error, gst_fixme, gst_log, gst_loggable_error};
use gst::{FlowError, FlowSuccess};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, Weak};
use super::executor::{block_on_or_add_sub_task, Context};
@ -204,7 +203,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
}
#[derive(Debug)]
struct PadSrcInner {
pub struct PadSrcInner {
gst_pad: gst::Pad,
}
@ -216,6 +215,66 @@ impl PadSrcInner {
PadSrcInner { gst_pad }
}
pub fn gst_pad(&self) -> &gst::Pad {
&self.gst_pad
}
pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
let success = self.gst_pad.push(buffer).map_err(|err| {
gst_error!(RUNTIME_CAT,
obj: self.gst_pad(),
"Failed to push Buffer to PadSrc: {:?}",
err,
);
err
})?;
gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
while Context::current_has_sub_tasks() {
Context::drain_sub_tasks().await?;
}
Ok(success)
}
pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
let success = self.gst_pad.push_list(list).map_err(|err| {
gst_error!(
RUNTIME_CAT,
obj: self.gst_pad(),
"Failed to push BufferList to PadSrc: {:?}",
err,
);
err
})?;
gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
while Context::current_has_sub_tasks() {
Context::drain_sub_tasks().await?;
}
Ok(success)
}
pub async fn push_event(&self, event: gst::Event) -> bool {
gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Pushing {:?}", event);
let was_handled = self.gst_pad().push_event(event);
gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
while Context::current_has_sub_tasks() {
if Context::drain_sub_tasks().await.is_err() {
return false;
}
}
was_handled
}
}
/// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s.
@ -249,51 +308,20 @@ impl PadSrcWeak {
/// [`pad` module]: index.html
#[derive(Debug)]
pub struct PadSrcRef<'a> {
strong: PadSrcStrong,
phantom: PhantomData<&'a PadSrcStrong>,
strong: Arc<PadSrcInner>,
phantom: PhantomData<&'a Self>,
}
impl<'a> PadSrcRef<'a> {
fn new(inner_arc: Arc<PadSrcInner>) -> Self {
PadSrcRef {
strong: PadSrcStrong(inner_arc),
strong: inner_arc,
phantom: PhantomData,
}
}
pub fn gst_pad(&self) -> &gst::Pad {
self.strong.gst_pad()
}
///// Spawns `future` using current [`PadContext`].
/////
///// # Panics
/////
///// This function panics if the `PadSrc` is not prepared.
/////
///// [`PadContext`]: ../struct.PadContext.html
//pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
//where
// Fut: Future + Send + 'static,
// Fut::Output: Send + 'static,
//{
// self.strong.spawn(future)
//}
pub fn downgrade(&self) -> PadSrcWeak {
self.strong.downgrade()
}
pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
self.strong.push(buffer).await
}
pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
self.strong.push_list(list).await
}
pub async fn push_event(&self, event: gst::Event) -> bool {
self.strong.push_event(event).await
PadSrcWeak(Arc::downgrade(&self.strong))
}
fn activate_mode_hook(
@ -317,94 +345,11 @@ impl<'a> PadSrcRef<'a> {
}
}
#[derive(Debug)]
struct PadSrcStrong(Arc<PadSrcInner>);
impl<'a> Deref for PadSrcRef<'a> {
type Target = PadSrcInner;
impl PadSrcStrong {
fn new(gst_pad: gst::Pad) -> Self {
PadSrcStrong(Arc::new(PadSrcInner::new(gst_pad)))
}
#[inline]
fn gst_pad(&self) -> &gst::Pad {
&self.0.gst_pad
}
//#[inline]
//fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
//where
// Fut: Future + Send + 'static,
// Fut::Output: Send + 'static,
//{
// let pad_ctx = self.pad_context_priv();
// pad_ctx
// .as_ref()
// .expect("PadContext not initialized")
// .spawn(future)
//}
#[inline]
fn downgrade(&self) -> PadSrcWeak {
PadSrcWeak(Arc::downgrade(&self.0))
}
#[inline]
async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
let success = self.gst_pad().push(buffer).map_err(|err| {
gst_error!(RUNTIME_CAT,
obj: self.gst_pad(),
"Failed to push Buffer to PadSrc: {:?}",
err,
);
err
})?;
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks");
while Context::current_has_sub_tasks() {
Context::drain_sub_tasks().await?;
}
Ok(success)
}
#[inline]
async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
let success = self.gst_pad().push_list(list).map_err(|err| {
gst_error!(
RUNTIME_CAT,
obj: self.gst_pad(),
"Failed to push BufferList to PadSrc: {:?}",
err,
);
err
})?;
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks");
while Context::current_has_sub_tasks() {
Context::drain_sub_tasks().await?;
}
Ok(success)
}
#[inline]
async fn push_event(&self, event: gst::Event) -> bool {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", event);
let was_handled = self.gst_pad().push_event(event);
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing any pending sub tasks");
while Context::current_has_sub_tasks() {
if Context::drain_sub_tasks().await.is_err() {
return false;
}
}
was_handled
fn deref(&self) -> &Self::Target {
&self.strong
}
}
@ -417,36 +362,33 @@ impl PadSrcStrong {
/// [`downgrade`]: struct.PadSrc.html#method.downgrade
/// [`pad` module]: index.html
#[derive(Debug)]
pub struct PadSrc(PadSrcStrong);
pub struct PadSrc(Arc<PadSrcInner>);
impl PadSrc {
pub fn new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self {
let this = PadSrc(PadSrcStrong::new(gst_pad));
let this = PadSrc(Arc::new(PadSrcInner::new(gst_pad)));
this.init_pad_functions(handler);
this
}
pub fn as_ref(&self) -> PadSrcRef<'_> {
PadSrcRef::new(Arc::clone(&(self.0).0))
}
pub fn gst_pad(&self) -> &gst::Pad {
self.0.gst_pad()
}
pub fn downgrade(&self) -> PadSrcWeak {
self.0.downgrade()
PadSrcWeak(Arc::downgrade(&self.0))
}
pub fn as_ref(&self) -> PadSrcRef<'_> {
PadSrcRef::new(Arc::clone(&self.0))
}
pub fn check_reconfigure(&self) -> bool {
self.gst_pad().check_reconfigure()
self.0.gst_pad().check_reconfigure()
}
fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
let inner_arc = Arc::clone(&self.0);
self.0
.gst_pad()
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
@ -464,7 +406,7 @@ impl PadSrc {
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
@ -489,7 +431,7 @@ impl PadSrc {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `src_event` when necessary
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
let handler = handler_clone.clone();
@ -504,7 +446,7 @@ impl PadSrc {
)
});
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
@ -524,18 +466,6 @@ impl PadSrc {
)
});
}
pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
self.0.push(buffer).await
}
pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
self.0.push_list(list).await
}
pub async fn push_event(&self, event: gst::Event) -> bool {
self.0.push_event(event).await
}
}
impl Drop for PadSrc {
@ -557,6 +487,14 @@ impl Drop for PadSrc {
}
}
impl Deref for PadSrc {
type Target = PadSrcInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A trait to define `handler`s for [`PadSink`] callbacks.
///
/// *See the [`pad` module] documentation for a description of the model.*
@ -710,7 +648,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
}
#[derive(Debug)]
struct PadSinkInner {
pub struct PadSinkInner {
gst_pad: gst::Pad,
}
@ -722,6 +660,10 @@ impl PadSinkInner {
PadSinkInner { gst_pad }
}
pub fn gst_pad(&self) -> &gst::Pad {
&self.gst_pad
}
}
/// A [`PadSink`] which can be moved in `Handler`s functions and `Future`s.
@ -754,24 +696,20 @@ impl PadSinkWeak {
/// [`pad` module]: index.html
#[derive(Debug)]
pub struct PadSinkRef<'a> {
strong: PadSinkStrong,
phantom: PhantomData<&'a PadSrcStrong>,
strong: Arc<PadSinkInner>,
phantom: PhantomData<&'a Self>,
}
impl<'a> PadSinkRef<'a> {
fn new(inner_arc: Arc<PadSinkInner>) -> Self {
PadSinkRef {
strong: PadSinkStrong(inner_arc),
strong: inner_arc,
phantom: PhantomData,
}
}
pub fn gst_pad(&self) -> &gst::Pad {
self.strong.gst_pad()
}
pub fn downgrade(&self) -> PadSinkWeak {
self.strong.downgrade()
PadSinkWeak(Arc::downgrade(&self.strong))
}
fn activate_mode_hook(
@ -798,20 +736,7 @@ impl<'a> PadSinkRef<'a> {
&self,
fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
) -> Result<FlowSuccess, FlowError> {
// First try to add it as a sub task to the current task, if any
if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) {
// FIXME: update comments below
// Not on a context thread: execute the Future immediately.
//
// - If there is no PadContext, we don't have any other options.
// - If there is a PadContext, it means that we received it from
// an upstream element, but there is at least one non-ts element
// operating on another thread in between, so we can't take
// advantage of the task queue.
//
// Note: we don't use `crate::runtime::executor::block_on` here
// because `Context::is_context_thread()` is checked in the `if`
// statement above.
block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok)))
.unwrap_or(Ok(gst::FlowSuccess::Ok))
} else {
@ -820,20 +745,11 @@ impl<'a> PadSinkRef<'a> {
}
}
#[derive(Debug)]
struct PadSinkStrong(Arc<PadSinkInner>);
impl<'a> Deref for PadSinkRef<'a> {
type Target = PadSinkInner;
impl PadSinkStrong {
fn new(gst_pad: gst::Pad) -> Self {
PadSinkStrong(Arc::new(PadSinkInner::new(gst_pad)))
}
fn gst_pad(&self) -> &gst::Pad {
&self.0.gst_pad
}
fn downgrade(&self) -> PadSinkWeak {
PadSinkWeak(Arc::downgrade(&self.0))
fn deref(&self) -> &Self::Target {
&self.strong
}
}
@ -846,31 +762,27 @@ impl PadSinkStrong {
/// [`downgrade`]: struct.PadSink.html#method.downgrade
/// [`pad` module]: index.html
#[derive(Debug)]
pub struct PadSink(PadSinkStrong);
pub struct PadSink(Arc<PadSinkInner>);
impl PadSink {
pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self {
let this = PadSink(PadSinkStrong::new(gst_pad));
let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
this.init_pad_functions(handler);
this
}
pub fn as_ref(&self) -> PadSinkRef<'_> {
PadSinkRef::new(Arc::clone(&(self.0).0))
}
pub fn gst_pad(&self) -> &gst::Pad {
self.0.gst_pad()
}
pub fn downgrade(&self) -> PadSinkWeak {
self.0.downgrade()
PadSinkWeak(Arc::downgrade(&self.0))
}
pub fn as_ref(&self) -> PadSinkRef<'_> {
PadSinkRef::new(Arc::clone(&self.0))
}
fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) {
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
@ -892,7 +804,7 @@ impl PadSink {
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
@ -916,7 +828,7 @@ impl PadSink {
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_chain_function(move |_gst_pad, parent, buffer| {
let handler = handler_clone.clone();
@ -949,7 +861,7 @@ impl PadSink {
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_chain_list_function(move |_gst_pad, parent, list| {
let handler = handler_clone.clone();
@ -987,7 +899,7 @@ impl PadSink {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `sink_event` when necessary
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
let handler = handler_clone.clone();
@ -1028,7 +940,7 @@ impl PadSink {
)
});
let inner_arc = Arc::clone(&(self.0).0);
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
@ -1072,3 +984,11 @@ impl Drop for PadSink {
.set_query_function(move |_gst_pad, _parent, _query| false);
}
}
impl Deref for PadSink {
type Target = PadSinkInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}