ts/pad: use gst::Pad in handlers trait functions...

... instead of the `Pad{Src,Sink}Ref` wrappers:

- In practice, only the `gst::Pad` is useful in these functions.
  Some of these which need a `Pad{Src,Sink}Ref`, but it's the one
  for the opposite stream direction. In those cases, it is accessed
  via the element's implementation.
- It saves a few `clone`s.
- The implementations usually use the `gst::Pad` for logging.
  They no longer need to access it via `pad.gst_pad()`.
This commit is contained in:
François Laignel 2022-10-24 13:15:13 +02:00
parent 554ce7e7d6
commit 5ca033049e
11 changed files with 333 additions and 364 deletions

View file

@ -19,7 +19,7 @@ use gst::EventView;
use once_cell::sync::Lazy;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, Task};
use gstthreadshare::runtime::{Context, PadSink, Task};
use std::sync::Mutex;
use std::task::Poll;
@ -77,7 +77,7 @@ impl PadSinkHandler for TestSinkPadHandler {
fn sink_chain(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::TestSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@ -95,7 +95,7 @@ impl PadSinkHandler for TestSinkPadHandler {
fn sink_chain_list(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::TestSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@ -115,7 +115,7 @@ impl PadSinkHandler for TestSinkPadHandler {
fn sink_event_serialized(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::TestSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@ -133,7 +133,7 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed()
}
fn sink_event(self, _pad: &PadSinkRef, imp: &TestSink, event: gst::Event) -> bool {
fn sink_event(self, _pad: &gst::Pad, imp: &TestSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
return imp.task.flush_start().await_maybe_on_context().is_ok();
}

View file

@ -33,7 +33,7 @@ use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState};
use crate::runtime::{Context, PadSrc, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
@ -82,8 +82,8 @@ struct AppSrcPadHandler;
impl PadSrcHandler for AppSrcPadHandler {
type ElementImpl = AppSrc;
fn src_event(self, pad: &PadSrcRef, imp: &AppSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
fn src_event(self, pad: &gst::Pad, imp: &AppSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() {
@ -95,16 +95,16 @@ impl PadSrcHandler for AppSrcPadHandler {
};
if ret {
gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
gst::log!(CAT, obj: pad, "Handled {:?}", event);
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
gst::log!(CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
}
fn src_query(self, pad: &PadSrcRef, imp: &AppSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn src_query(self, pad: &gst::Pad, imp: &AppSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@ -136,9 +136,9 @@ impl PadSrcHandler for AppSrcPadHandler {
};
if ret {
gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
gst::log!(CAT, obj: pad, "Handled {:?}", query);
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
gst::log!(CAT, obj: pad, "Didn't handle {:?}", query);
}
ret
}

View file

@ -33,7 +33,7 @@ use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{self, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef};
use crate::runtime::{self, PadSink, PadSrc};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
@ -88,7 +88,7 @@ impl InputSelectorPadSinkHandler {
async fn handle_item(
&self,
pad: &PadSinkRef<'_>,
pad: &gst::Pad,
elem: &super::InputSelector,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -111,9 +111,9 @@ impl InputSelectorPadSinkHandler {
}
let is_active = {
if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) {
if state.active_sinkpad.as_ref() == Some(pad) {
if inner.send_sticky || state.switched_pad {
pad.gst_pad().sticky_events_foreach(|event| {
pad.sticky_events_foreach(|event| {
use std::ops::ControlFlow;
stickies.push(event.clone());
ControlFlow::Continue(gst::EventForeachAction::Keep)
@ -140,7 +140,7 @@ impl InputSelectorPadSinkHandler {
}
if is_active {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
gst::log!(CAT, obj: pad, "Forwarding {:?}", buffer);
if switched_pad && !buffer.flags().contains(gst::BufferFlags::DISCONT) {
let buffer = buffer.make_mut();
@ -159,26 +159,21 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
fn sink_chain(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::InputSelector,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
self.handle_item(&pad, &elem, buffer).await
}
.boxed()
async move { self.handle_item(&pad, &elem, buffer).await }.boxed()
}
fn sink_chain_list(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::InputSelector,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list);
gst::log!(CAT, obj: pad, "Handling buffer list {:?}", list);
// TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() {
self.handle_item(&pad, &elem, buffer).await?;
@ -191,7 +186,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
fn sink_event_serialized(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
_elem: super::InputSelector,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@ -219,7 +214,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
.boxed()
}
fn sink_event(self, _pad: &PadSinkRef, imp: &InputSelector, event: gst::Event) -> bool {
fn sink_event(self, _pad: &gst::Pad, imp: &InputSelector, event: gst::Event) -> bool {
/* Drop all events for now */
if let gst::EventView::FlushStart(..) = event.view() {
/* Unblock downstream */
@ -234,15 +229,15 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
true
}
fn sink_query(self, pad: &PadSinkRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
fn sink_query(self, pad: &gst::Pad, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling query {:?}", query);
if query.is_serialized() {
// FIXME: How can we do this (drops ALLOCATION and DRAIN)?
gst::log!(CAT, obj: pad.gst_pad(), "Dropping serialized query {:?}", query);
gst::log!(CAT, obj: pad, "Dropping serialized query {:?}", query);
false
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
gst::log!(CAT, obj: pad, "Forwarding query {:?}", query);
imp.src_pad.gst_pad().peer_query(query)
}
}
@ -254,8 +249,8 @@ struct InputSelectorPadSrcHandler;
impl PadSrcHandler for InputSelectorPadSrcHandler {
type ElementImpl = InputSelector;
fn src_query(self, pad: &PadSrcRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn src_query(self, pad: &gst::Pad, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
match query.view_mut() {

View file

@ -36,7 +36,7 @@ use std::sync::Mutex as StdMutex;
use std::time::Duration;
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
use crate::runtime::{self, Context, PadSink, PadSrc, Task};
use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
@ -488,7 +488,7 @@ impl SinkHandler {
fn enqueue_item(
&self,
pad: &gst::Pad,
pad: gst::Pad,
jb: &JitterBuffer,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -501,7 +501,7 @@ impl SinkHandler {
// This is to avoid recursion with `store`, `reset` and `enqueue_item`
while let Some(buf) = buffers.pop_front() {
if let Err(err) = self.store(&mut inner, pad, jb, buf) {
if let Err(err) = self.store(&mut inner, &pad, jb, buf) {
match err {
gst::FlowError::CustomError => {
for gap_packet in self.reset(&mut inner, jb) {
@ -550,26 +550,25 @@ impl PadSinkHandler for SinkHandler {
fn sink_chain(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::JitterBuffer,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
self.enqueue_item(pad.gst_pad(), elem.imp(), Some(buffer))
gst::debug!(CAT, obj: pad, "Handling {:?}", buffer);
self.enqueue_item(pad, elem.imp(), Some(buffer))
}
.boxed()
}
fn sink_event(self, pad: &PadSinkRef, jb: &JitterBuffer, event: gst::Event) -> bool {
fn sink_event(self, pad: &gst::Pad, jb: &JitterBuffer, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
gst::log!(CAT, obj: pad, "Handling {:?}", event);
if let EventView::FlushStart(..) = event.view() {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
jb,
gst::StreamError::Failed,
@ -580,20 +579,18 @@ impl PadSinkHandler for SinkHandler {
}
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
gst::log!(CAT, obj: pad, "Forwarding {:?}", event);
jb.src_pad.gst_pad().push_event(event)
}
fn sink_event_serialized(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::JitterBuffer,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
gst::log!(CAT, obj: pad, "Handling {:?}", event);
let jb = elem.imp();
@ -606,7 +603,7 @@ impl PadSinkHandler for SinkHandler {
}
EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_error!(
elem,
gst::StreamError::Failed,
@ -629,7 +626,7 @@ impl PadSinkHandler for SinkHandler {
if forward {
// FIXME: These events should really be queued up and stay in order
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
gst::log!(CAT, obj: pad, "Forwarding serialized {:?}", event);
jb.src_pad.push_event(event).await
} else {
true
@ -870,15 +867,15 @@ impl SrcHandler {
impl PadSrcHandler for SrcHandler {
type ElementImpl = JitterBuffer;
fn src_event(self, pad: &PadSrcRef, jb: &JitterBuffer, event: gst::Event) -> bool {
fn src_event(self, pad: &gst::Pad, jb: &JitterBuffer, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
gst::log!(CAT, obj: pad, "Handling {:?}", event);
match event.view() {
EventView::FlushStart(..) => {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
jb,
gst::StreamError::Failed,
@ -890,7 +887,7 @@ impl PadSrcHandler for SrcHandler {
}
EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
jb,
gst::StreamError::Failed,
@ -903,14 +900,14 @@ impl PadSrcHandler for SrcHandler {
_ => (),
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
gst::log!(CAT, obj: pad, "Forwarding {:?}", event);
jb.sink_pad.gst_pad().push_event(event)
}
fn src_query(self, pad: &PadSrcRef, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool {
fn src_query(self, pad: &gst::Pad, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
gst::log!(CAT, obj: pad, "Forwarding {:?}", query);
match query.view_mut() {
QueryViewMut::Latency(q) => {

View file

@ -34,9 +34,7 @@ use std::time::Duration;
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{
Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, Task,
};
use crate::runtime::{Context, PadSink, PadSinkWeak, PadSrc, PadSrcWeak, Task};
use crate::dataqueue::{DataQueue, DataQueueItem};
@ -215,13 +213,12 @@ impl PadSinkHandler for ProxySinkPadHandler {
fn sink_chain(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::ProxySink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
gst::log!(SINK_CAT, obj: pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
@ -230,21 +227,20 @@ impl PadSinkHandler for ProxySinkPadHandler {
fn sink_chain_list(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::ProxySink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list);
gst::log!(SINK_CAT, obj: pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
}
fn sink_event(self, pad: &PadSinkRef, imp: &ProxySink, event: gst::Event) -> bool {
gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
fn sink_event(self, pad: &gst::Pad, imp: &ProxySink, event: gst::Event) -> bool {
gst::debug!(SINK_CAT, obj: pad, "Handling non-serialized {:?}", event);
let src_pad = {
let proxy_ctx = imp.proxy_ctx.lock().unwrap();
@ -262,23 +258,27 @@ impl PadSinkHandler for ProxySinkPadHandler {
}
if let Some(src_pad) = src_pad {
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
gst::log!(SINK_CAT, obj: pad, "Forwarding non-serialized {:?}", event);
src_pad.push_event(event)
} else {
gst::error!(SINK_CAT, obj: pad.gst_pad(), "No src pad to forward non-serialized {:?} to", event);
gst::error!(
SINK_CAT,
obj: pad,
"No src pad to forward non-serialized {:?} to",
event
);
true
}
}
fn sink_event_serialized(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::ProxySink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
gst::log!(SINK_CAT, obj: pad, "Handling serialized {:?}", event);
let imp = elem.imp();
@ -291,7 +291,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
_ => (),
}
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
gst::log!(SINK_CAT, obj: pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
.boxed()
@ -666,8 +666,8 @@ struct ProxySrcPadHandler;
impl PadSrcHandler for ProxySrcPadHandler {
type ElementImpl = ProxySrc;
fn src_event(self, pad: &PadSrcRef, imp: &ProxySrc, event: gst::Event) -> bool {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
fn src_event(self, pad: &gst::Pad, imp: &ProxySrc, event: gst::Event) -> bool {
gst::log!(SRC_CAT, obj: pad, "Handling {:?}", event);
let sink_pad = {
let proxy_ctx = imp.proxy_ctx.lock().unwrap();
@ -684,7 +684,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
match event.view() {
EventView::FlushStart(..) => {
if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::error!(SRC_CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@ -696,7 +696,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
}
EventView::FlushStop(..) => {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::error!(SRC_CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@ -710,16 +710,16 @@ impl PadSrcHandler for ProxySrcPadHandler {
}
if let Some(sink_pad) = sink_pad {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
gst::log!(SRC_CAT, obj: pad, "Forwarding {:?}", event);
sink_pad.push_event(event)
} else {
gst::error!(SRC_CAT, obj: pad.gst_pad(), "No sink pad to forward {:?} to", event);
gst::error!(SRC_CAT, obj: pad, "No sink pad to forward {:?} to", event);
false
}
}
fn src_query(self, pad: &PadSrcRef, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn src_query(self, pad: &gst::Pad, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool {
gst::log!(SRC_CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@ -733,7 +733,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
true
}
QueryViewMut::Caps(q) => {
let caps = if let Some(ref caps) = pad.gst_pad().current_caps() {
let caps = if let Some(ref caps) = pad.current_caps() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@ -751,9 +751,9 @@ impl PadSrcHandler for ProxySrcPadHandler {
};
if ret {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", query);
gst::log!(SRC_CAT, obj: pad, "Handled {:?}", query);
} else {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
gst::log!(SRC_CAT, obj: pad, "Didn't handle {:?}", query);
}
ret

View file

@ -33,7 +33,7 @@ use std::time::Duration;
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
use crate::runtime::{Context, PadSink, PadSrc, Task};
use crate::dataqueue::{DataQueue, DataQueueItem};
@ -85,13 +85,12 @@ impl PadSinkHandler for QueuePadSinkHandler {
fn sink_chain(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::Queue,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
gst::log!(CAT, obj: pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
@ -100,25 +99,24 @@ impl PadSinkHandler for QueuePadSinkHandler {
fn sink_chain_list(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::Queue,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list);
gst::log!(CAT, obj: pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
}
fn sink_event(self, pad: &PadSinkRef, imp: &Queue, event: gst::Event) -> bool {
gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
fn sink_event(self, pad: &gst::Pad, imp: &Queue, event: gst::Event) -> bool {
gst::debug!(CAT, obj: pad, "Handling non-serialized {:?}", event);
if let gst::EventView::FlushStart(..) = event.view() {
if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@ -129,25 +127,24 @@ impl PadSinkHandler for QueuePadSinkHandler {
}
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
gst::log!(CAT, obj: pad, "Forwarding non-serialized {:?}", event);
imp.src_pad.gst_pad().push_event(event)
}
fn sink_event_serialized(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::Queue,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
gst::log!(CAT, obj: pad, "Handling serialized {:?}", event);
let imp = elem.imp();
if let gst::EventView::FlushStop(..) = event.view() {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@ -158,21 +155,21 @@ impl PadSinkHandler for QueuePadSinkHandler {
}
}
gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
gst::log!(CAT, obj: pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
.boxed()
}
fn sink_query(self, pad: &PadSinkRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn sink_query(self, pad: &gst::Pad, imp: &Queue, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", query);
if query.is_serialized() {
// FIXME: How can we do this?
gst::log!(CAT, obj: pad.gst_pad(), "Dropping serialized {:?}", query);
gst::log!(CAT, obj: pad, "Dropping serialized {:?}", query);
false
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
gst::log!(CAT, obj: pad, "Forwarding {:?}", query);
imp.src_pad.gst_pad().peer_query(query)
}
}
@ -184,19 +181,19 @@ struct QueuePadSrcHandler;
impl PadSrcHandler for QueuePadSrcHandler {
type ElementImpl = Queue;
fn src_event(self, pad: &PadSrcRef, imp: &Queue, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
fn src_event(self, pad: &gst::Pad, imp: &Queue, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
match event.view() {
EventView::FlushStart(..) => {
if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err);
}
}
EventView::FlushStop(..) => {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
@ -209,12 +206,12 @@ impl PadSrcHandler for QueuePadSrcHandler {
_ => (),
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
gst::log!(CAT, obj: pad, "Forwarding {:?}", event);
imp.sink_pad.gst_pad().push_event(event)
}
fn src_query(self, pad: &PadSrcRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn src_query(self, pad: &gst::Pad, imp: &Queue, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", query);
if let gst::QueryViewMut::Scheduling(q) = query.view_mut() {
let mut new_query = gst::query::Scheduling::new();
@ -223,7 +220,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
return res;
}
gst::log!(CAT, obj: pad.gst_pad(), "Upstream returned {:?}", new_query);
gst::log!(CAT, obj: pad, "Upstream returned {:?}", new_query);
let (flags, min, max, align) = new_query.result();
q.set(flags, min, max, align);
@ -235,11 +232,11 @@ impl PadSrcHandler for QueuePadSrcHandler {
.filter(|m| m != &gst::PadMode::Pull)
.collect::<Vec<_>>(),
);
gst::log!(CAT, obj: pad.gst_pad(), "Returning {:?}", q.query_mut());
gst::log!(CAT, obj: pad, "Returning {:?}", q.query_mut());
return true;
}
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
gst::log!(CAT, obj: pad, "Forwarding {:?}", query);
imp.sink_pad.gst_pad().peer_query(query)
}
}

View file

@ -123,36 +123,28 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
fn src_activate(
self,
pad: &PadSrcRef,
pad: &gst::Pad,
_imp: &Self::ElementImpl,
) -> Result<(), gst::LoggableError> {
let gst_pad = pad.gst_pad();
if gst_pad.is_active() {
if pad.is_active() {
gst::debug!(
RUNTIME_CAT,
obj: gst_pad,
obj: pad,
"Already activated in {:?} mode ",
gst_pad.mode()
pad.mode()
);
return Ok(());
}
gst_pad
.activate_mode(gst::PadMode::Push, true)
.map_err(|err| {
gst::error!(
RUNTIME_CAT,
obj: gst_pad,
"Error in PadSrc activate: {:?}",
err
);
gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
})
pad.activate_mode(gst::PadMode::Push, true).map_err(|err| {
gst::error!(RUNTIME_CAT, obj: pad, "Error in PadSrc activate: {:?}", err);
gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
})
}
fn src_activatemode(
self,
_pad: &PadSrcRef,
_pad: &gst::Pad,
_imp: &Self::ElementImpl,
_mode: gst::PadMode,
_active: bool,
@ -160,8 +152,8 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
Ok(())
}
fn src_event(self, pad: &PadSrcRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
fn src_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool {
gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@ -170,12 +162,12 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
gst::Pad::event_default(pad.gst_pad(), Some(element), event)
gst::Pad::event_default(pad, Some(element), event)
}
fn src_event_full(
self,
pad: &PadSrcRef,
pad: &gst::Pad,
imp: &Self::ElementImpl,
event: gst::Event,
) -> Result<FlowSuccess, FlowError> {
@ -185,19 +177,14 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
event_to_event_full(self.src_event(pad, imp, event), event_type)
}
fn src_query(
self,
pad: &PadSrcRef,
imp: &Self::ElementImpl,
query: &mut gst::QueryRef,
) -> bool {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool {
gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query);
if query.is_serialized() {
// FIXME serialized queries should be handled with the dataflow
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime
false
} else {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@ -206,7 +193,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
gst::Pad::query_default(pad.gst_pad(), Some(element), query)
gst::Pad::query_default(pad, Some(element), query)
}
}
}
@ -230,11 +217,11 @@ impl PadSrcInner {
}
pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", buffer);
let success = self.gst_pad.push(buffer).map_err(|err| {
gst::error!(RUNTIME_CAT,
obj: self.gst_pad(),
obj: self.gst_pad,
"Failed to push Buffer to PadSrc: {:?}",
err,
);
@ -248,12 +235,12 @@ impl PadSrcInner {
}
pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", list);
let success = self.gst_pad.push_list(list).map_err(|err| {
gst::error!(
RUNTIME_CAT,
obj: self.gst_pad(),
obj: self.gst_pad,
"Failed to push BufferList to PadSrc: {:?}",
err,
);
@ -269,7 +256,7 @@ impl PadSrcInner {
pub async fn push_event(&self, event: gst::Event) -> bool {
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", event);
let was_handled = self.gst_pad().push_event(event);
let was_handled = self.gst_pad.push_event(event);
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Processing any pending sub tasks");
if Context::drain_sub_tasks().await.is_err() {
@ -326,26 +313,6 @@ impl<'a> PadSrcRef<'a> {
pub fn downgrade(&self) -> PadSrcWeak {
PadSrcWeak(Arc::downgrade(&self.strong))
}
fn activate_mode_hook(
&self,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
// Important: don't panic here as the hook is used without `catch_panic_pad_function`
// in the default `activatemode` handling
gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
if mode == gst::PadMode::Pull {
gst::error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSrc");
return Err(gst::loggable_error!(
RUNTIME_CAT,
"Pull mode not supported by PadSrc"
));
}
Ok(())
}
}
impl<'a> Deref for PadSrcRef<'a> {
@ -384,19 +351,17 @@ impl PadSrc {
}
pub fn check_reconfigure(&self) -> bool {
self.0.gst_pad().check_reconfigure()
self.0.gst_pad.check_reconfigure()
}
fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
// FIXME: Do this better
unsafe {
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.0
.gst_pad()
.gst_pad
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -406,16 +371,15 @@ impl PadSrc {
"Panic in PadSrc activate"
))
},
move |imp| H::src_activate(handler, &PadSrcRef::new(inner_arc), imp),
move |imp| H::src_activate(handler, gst_pad, imp),
)
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
self.0
.gst_pad
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -426,9 +390,27 @@ impl PadSrc {
))
},
move |imp| {
let this_ref = PadSrcRef::new(inner_arc);
this_ref.activate_mode_hook(mode, active)?;
H::src_activatemode(handler, &this_ref, imp, mode, active)
gst::log!(
RUNTIME_CAT,
obj: gst_pad,
"ActivateMode {:?}, {}",
mode,
active
);
if mode == gst::PadMode::Pull {
gst::error!(
RUNTIME_CAT,
obj: gst_pad,
"Pull mode not supported by PadSrc"
);
return Err(gst::loggable_error!(
RUNTIME_CAT,
"Pull mode not supported by PadSrc"
));
}
H::src_activatemode(handler, gst_pad, imp, mode, active)
},
)
});
@ -436,38 +418,38 @@ impl PadSrc {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `src_event` when necessary
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
self.0
.gst_pad
.set_event_full_function(move |gst_pad, parent, event| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp| {
H::src_event_full(handler, &PadSrcRef::new(inner_arc), imp, event)
},
move |imp| H::src_event_full(handler, gst_pad, imp, event),
)
});
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
move |imp| {
if !query.is_serialized() {
H::src_query(handler, &PadSrcRef::new(inner_arc), imp, query)
} else {
gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
false
}
},
)
});
self.0
.gst_pad
.set_query_function(move |gst_pad, parent, query| {
let handler = handler.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
move |imp| {
if !query.is_serialized() {
H::src_query(handler, gst_pad, imp, query)
} else {
gst::fixme!(
RUNTIME_CAT,
obj: gst_pad,
"Serialized Query not supported"
);
false
}
},
)
});
}
}
}
@ -476,19 +458,24 @@ impl Drop for PadSrc {
fn drop(&mut self) {
// FIXME: Do this better
unsafe {
self.gst_pad()
self.0
.gst_pad
.set_activate_function(move |_gst_pad, _parent| {
Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
});
self.gst_pad()
self.0
.gst_pad
.set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
});
self.gst_pad()
self.0
.gst_pad
.set_event_function(move |_gst_pad, _parent, _event| false);
self.gst_pad()
self.0
.gst_pad
.set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
self.gst_pad()
self.0
.gst_pad
.set_query_function(move |_gst_pad, _parent, _query| false);
}
}
@ -514,36 +501,33 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_activate(
self,
pad: &PadSinkRef,
pad: &gst::Pad,
_imp: &Self::ElementImpl,
) -> Result<(), gst::LoggableError> {
let gst_pad = pad.gst_pad();
if gst_pad.is_active() {
if pad.is_active() {
gst::debug!(
RUNTIME_CAT,
obj: gst_pad,
obj: pad,
"Already activated in {:?} mode ",
gst_pad.mode()
pad.mode()
);
return Ok(());
}
gst_pad
.activate_mode(gst::PadMode::Push, true)
.map_err(|err| {
gst::error!(
RUNTIME_CAT,
obj: gst_pad,
"Error in PadSink activate: {:?}",
err
);
gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
})
pad.activate_mode(gst::PadMode::Push, true).map_err(|err| {
gst::error!(
RUNTIME_CAT,
obj: pad,
"Error in PadSink activate: {:?}",
err
);
gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
})
}
fn sink_activatemode(
self,
_pad: &PadSinkRef,
_pad: &gst::Pad,
_imp: &Self::ElementImpl,
_mode: gst::PadMode,
_active: bool,
@ -553,7 +537,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_chain(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
_elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer: gst::Buffer,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
@ -562,16 +546,16 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_chain_list(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
_elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer_list: gst::BufferList,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed()
}
fn sink_event(self, pad: &PadSinkRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
fn sink_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool {
assert!(!event.is_serialized());
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@ -580,12 +564,12 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
gst::Pad::event_default(pad.gst_pad(), Some(element), event)
gst::Pad::event_default(pad, Some(element), event)
}
fn sink_event_serialized(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@ -597,17 +581,16 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
let element = unsafe { elem.unsafe_cast::<gst::Element>() };
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event);
gst::Pad::event_default(pad.gst_pad(), Some(&element), event)
gst::Pad::event_default(&pad, Some(&element), event)
}
.boxed()
}
fn sink_event_full(
self,
pad: &PadSinkRef,
pad: &gst::Pad,
imp: &Self::ElementImpl,
event: gst::Event,
) -> Result<FlowSuccess, FlowError> {
@ -620,7 +603,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_event_full_serialized(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
@ -636,17 +619,17 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_query(
self,
pad: &PadSinkRef,
pad: &gst::Pad,
imp: &Self::ElementImpl,
query: &mut gst::QueryRef,
) -> bool {
if query.is_serialized() {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Dropping {:?}", query);
gst::log!(RUNTIME_CAT, obj: pad, "Dropping {:?}", query);
// FIXME serialized queries should be handled with the dataflow
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime
false
} else {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query);
let elem = imp.obj();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
@ -655,7 +638,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
gst::Pad::query_default(pad.gst_pad(), Some(element), query)
gst::Pad::query_default(pad, Some(element), query)
}
}
}
@ -724,26 +707,6 @@ impl<'a> PadSinkRef<'a> {
pub fn downgrade(&self) -> PadSinkWeak {
PadSinkWeak(Arc::downgrade(&self.strong))
}
fn activate_mode_hook(
&self,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
// Important: don't panic here as the hook is used without `catch_panic_pad_function`
// in the default `activatemode` handling
gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
if mode == gst::PadMode::Pull {
gst::error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSink");
return Err(gst::loggable_error!(
RUNTIME_CAT,
"Pull mode not supported by PadSink"
));
}
Ok(())
}
}
impl<'a> Deref for PadSinkRef<'a> {
@ -794,12 +757,10 @@ impl PadSink {
{
unsafe {
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
self.0
.gst_pad
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -809,16 +770,15 @@ impl PadSink {
"Panic in PadSink activate"
))
},
move |imp| H::sink_activate(handler, &PadSinkRef::new(inner_arc), imp),
move |imp| H::sink_activate(handler, gst_pad, imp),
)
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
self.0
.gst_pad
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -829,36 +789,53 @@ impl PadSink {
))
},
move |imp| {
let this_ref = PadSinkRef::new(inner_arc);
this_ref.activate_mode_hook(mode, active)?;
H::sink_activatemode(handler, &this_ref, imp, mode, active)
gst::log!(
RUNTIME_CAT,
obj: gst_pad,
"ActivateMode {:?}, {}",
mode,
active
);
if mode == gst::PadMode::Pull {
gst::error!(
RUNTIME_CAT,
obj: gst_pad,
"Pull mode not supported by PadSink"
);
return Err(gst::loggable_error!(
RUNTIME_CAT,
"Pull mode not supported by PadSink"
));
}
H::sink_activatemode(handler, gst_pad, imp, mode, active)
},
)
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_chain_function(move |_gst_pad, parent, buffer| {
self.0
.gst_pad
.set_chain_function(move |gst_pad, parent, buffer| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp| {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.obj().clone();
let gst_pad = gst_pad.clone();
if let Some((ctx, task_id)) = Context::current_task() {
let delayed_fut = async move {
H::sink_chain(handler, this_weak, elem, buffer).await
H::sink_chain(handler, gst_pad, elem, buffer).await
};
let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok)
} else {
let chain_fut = H::sink_chain(handler, this_weak, elem, buffer);
let chain_fut = H::sink_chain(handler, gst_pad, elem, buffer);
executor::block_on(chain_fut)
}
},
@ -866,21 +843,20 @@ impl PadSink {
});
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_chain_list_function(move |_gst_pad, parent, list| {
self.0
.gst_pad
.set_chain_list_function(move |gst_pad, parent, list| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp| {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.obj().clone();
let gst_pad = gst_pad.clone();
if let Some((ctx, task_id)) = Context::current_task() {
let delayed_fut = async move {
H::sink_chain_list(handler, this_weak, elem, list).await
H::sink_chain_list(handler, gst_pad, elem, list).await
};
let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
@ -888,7 +864,7 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let chain_list_fut =
H::sink_chain_list(handler, this_weak, elem, list);
H::sink_chain_list(handler, gst_pad, elem, list);
executor::block_on(chain_list_fut)
}
},
@ -898,25 +874,22 @@ impl PadSink {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `sink_event` when necessary
let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
self.0
.gst_pad
.set_event_full_function(move |gst_pad, parent, event| {
let handler = handler_clone.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp| {
if event.is_serialized() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.obj().clone();
let gst_pad = gst_pad.clone();
if let Some((ctx, task_id)) = Context::current_task() {
let delayed_fut = async move {
H::sink_event_full_serialized(
handler, this_weak, elem, event,
)
.await
H::sink_event_full_serialized(handler, gst_pad, elem, event)
.await
};
let _ = ctx.add_sub_task(
task_id,
@ -926,35 +899,38 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let event_fut = H::sink_event_full_serialized(
handler, this_weak, elem, event,
handler, gst_pad, elem, event,
);
executor::block_on(event_fut)
}
} else {
handler.sink_event_full(&PadSinkRef::new(inner_arc), imp, event)
handler.sink_event_full(gst_pad, imp, event)
}
},
)
});
let inner_arc = Arc::clone(&self.0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
move |imp| {
if !query.is_serialized() {
H::sink_query(handler, &PadSinkRef::new(inner_arc), imp, query)
} else {
gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
false
}
},
)
});
self.0
.gst_pad
.set_query_function(move |gst_pad, parent, query| {
let handler = handler.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
move |imp| {
if !query.is_serialized() {
H::sink_query(handler, gst_pad, imp, query)
} else {
gst::fixme!(
RUNTIME_CAT,
obj: gst_pad,
"Serialized Query not supported"
);
false
}
},
)
});
}
}
}
@ -963,29 +939,36 @@ impl Drop for PadSink {
fn drop(&mut self) {
// FIXME: Do this better
unsafe {
self.gst_pad()
self.0
.gst_pad
.set_activate_function(move |_gst_pad, _parent| {
Err(gst::loggable_error!(
RUNTIME_CAT,
"PadSink no longer exists"
))
});
self.gst_pad()
self.0
.gst_pad
.set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
Err(gst::loggable_error!(
RUNTIME_CAT,
"PadSink no longer exists"
))
});
self.gst_pad()
self.0
.gst_pad
.set_chain_function(move |_gst_pad, _parent, _buffer| Err(FlowError::Flushing));
self.gst_pad()
self.0
.gst_pad
.set_chain_list_function(move |_gst_pad, _parent, _list| Err(FlowError::Flushing));
self.gst_pad()
self.0
.gst_pad
.set_event_function(move |_gst_pad, _parent, _event| false);
self.gst_pad()
self.0
.gst_pad
.set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
self.gst_pad()
self.0
.gst_pad
.set_query_function(move |_gst_pad, _parent, _query| false);
}
}

View file

@ -36,7 +36,7 @@ use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::task;
use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState};
use crate::runtime::{Context, PadSrc, Task, TaskState};
use crate::runtime::Async;
use crate::socket::{Socket, SocketError, SocketRead};
@ -96,8 +96,8 @@ struct TcpClientSrcPadHandler;
impl PadSrcHandler for TcpClientSrcPadHandler {
type ElementImpl = TcpClientSrc;
fn src_event(self, pad: &PadSrcRef, imp: &TcpClientSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
fn src_event(self, pad: &gst::Pad, imp: &TcpClientSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() {
@ -109,16 +109,16 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
};
if ret {
gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
gst::log!(CAT, obj: pad, "Handled {:?}", event);
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
gst::log!(CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
}
fn src_query(self, pad: &PadSrcRef, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn src_query(self, pad: &gst::Pad, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@ -150,9 +150,9 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
};
if ret {
gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
gst::log!(CAT, obj: pad, "Handled {:?}", query);
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
gst::log!(CAT, obj: pad, "Didn't handle {:?}", query);
}
ret

View file

@ -30,7 +30,7 @@ use gst::{element_error, error_msg};
use once_cell::sync::Lazy;
use crate::runtime::prelude::*;
use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, PadSinkWeak, Task};
use crate::runtime::{self, Async, Context, PadSink, Task};
use crate::socket::{wrap_socket, GioSocketWrapper};
use std::collections::BTreeSet;
@ -134,7 +134,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
fn sink_chain(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::UdpSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@ -152,7 +152,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
fn sink_chain_list(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::UdpSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@ -172,7 +172,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
fn sink_event_serialized(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::UdpSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
@ -190,7 +190,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
.boxed()
}
fn sink_event(self, _pad: &PadSinkRef, imp: &UdpSink, event: gst::Event) -> bool {
fn sink_event(self, _pad: &gst::Pad, imp: &UdpSink, event: gst::Event) -> bool {
if let EventView::FlushStart(..) = event.view() {
return imp.task.flush_start().await_maybe_on_context().is_ok();
}

View file

@ -35,7 +35,7 @@ use std::time::Duration;
use std::u16;
use crate::runtime::prelude::*;
use crate::runtime::{Async, Context, PadSrc, PadSrcRef, Task};
use crate::runtime::{Async, Context, PadSrc, Task};
use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
@ -113,8 +113,8 @@ struct UdpSrcPadHandler;
impl PadSrcHandler for UdpSrcPadHandler {
type ElementImpl = UdpSrc;
fn src_event(self, pad: &PadSrcRef, imp: &UdpSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
fn src_event(self, pad: &gst::Pad, imp: &UdpSrc, event: gst::Event) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() {
@ -126,16 +126,16 @@ impl PadSrcHandler for UdpSrcPadHandler {
};
if ret {
gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event);
gst::log!(CAT, obj: pad, "Handled {:?}", event);
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
gst::log!(CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
}
fn src_query(self, pad: &PadSrcRef, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
fn src_query(self, pad: &gst::Pad, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, obj: pad, "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() {
@ -167,9 +167,9 @@ impl PadSrcHandler for UdpSrcPadHandler {
};
if ret {
gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query);
gst::log!(CAT, obj: pad, "Handled {:?}", query);
} else {
gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query);
gst::log!(CAT, obj: pad, "Didn't handle {:?}", query);
}
ret

View file

@ -36,9 +36,7 @@ use std::sync::Mutex;
use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{
Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task, TaskState,
};
use gstthreadshare::runtime::{Context, PadSink, PadSrc, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: Duration = Duration::from_millis(2);
@ -89,8 +87,8 @@ mod imp_src {
impl PadSrcHandler for PadSrcTestHandler {
type ElementImpl = ElementSrcTest;
fn src_event(self, pad: &PadSrcRef, imp: &ElementSrcTest, event: gst::Event) -> bool {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
fn src_event(self, pad: &gst::Pad, imp: &ElementSrcTest, event: gst::Event) -> bool {
gst::log!(SRC_CAT, obj: pad, "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
@ -102,9 +100,9 @@ mod imp_src {
};
if ret {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event);
gst::log!(SRC_CAT, obj: pad, "Handled {:?}", event);
} else {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
gst::log!(SRC_CAT, obj: pad, "Didn't handle {:?}", event);
}
ret
@ -441,7 +439,7 @@ mod imp_sink {
fn sink_chain(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::ElementSinkTest,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@ -454,7 +452,7 @@ mod imp_sink {
fn sink_chain_list(
self,
_pad: PadSinkWeak,
_pad: gst::Pad,
elem: super::ElementSinkTest,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
@ -465,8 +463,8 @@ mod imp_sink {
.boxed()
}
fn sink_event(self, pad: &PadSinkRef, imp: &ElementSinkTest, event: gst::Event) -> bool {
gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
fn sink_event(self, pad: &gst::Pad, imp: &ElementSinkTest, event: gst::Event) -> bool {
gst::debug!(SINK_CAT, obj: pad, "Handling non-serialized {:?}", event);
match event.view() {
EventView::FlushStart(..) => {
@ -479,13 +477,12 @@ mod imp_sink {
fn sink_event_serialized(
self,
pad: PadSinkWeak,
pad: gst::Pad,
elem: super::ElementSinkTest,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
gst::log!(SINK_CAT, obj: pad, "Handling serialized {:?}", event);
let imp = elem.imp();
if let EventView::FlushStop(..) = event.view() {