threadshare: save upgrade in Pad functions

When initializing Pad functions in `Pad{Src,Sink}`, we downgrade the
`Pad{Src,Sink}` and upgrade it when necessary. This was implemented
to avoid reference cycles:

`gst::Pad` -> pad function -> `Pad{Src,Sink}` -> `gst::Pad`.

Since `Pad{Src,Sink}` reset the pad functions when dropping, there is
no cycles, so we can use an `Arc<Pad{Src,Sink}>` in the pad functions,
thus saving an `upgrade`.
This commit is contained in:
François Laignel 2020-04-29 15:47:57 +02:00
parent 26634f591a
commit 8e38d861b4

View file

@ -445,11 +445,11 @@ impl PadSrc {
fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -457,18 +457,18 @@ impl PadSrc {
Err(gst_loggable_error!(RUNTIME_CAT, "Panic in PadSrc activate"))
},
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
let this_ref = PadSrcRef::new(inner_arc);
handler.src_activate(&this_ref, imp, element)
},
)
});
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -479,7 +479,7 @@ impl PadSrc {
))
},
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
let this_ref = PadSrcRef::new(inner_arc);
this_ref.activate_mode_hook(mode, active)?;
handler.src_activatemode(&this_ref, imp, element, mode, active)
},
@ -489,31 +489,31 @@ impl PadSrc {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `src_event` when necessary
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
let this_ref = PadSrcRef::new(inner_arc);
handler.src_event_full(&this_ref, imp, &element, event)
},
)
});
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSrc no longer exists");
let this_ref = PadSrcRef::new(inner_arc);
if !query.is_serialized() {
handler.src_query(&this_ref, imp, &element, query)
} else {
@ -870,11 +870,11 @@ impl PadSink {
fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) {
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -885,18 +885,18 @@ impl PadSink {
))
},
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
let this_ref = PadSinkRef::new(inner_arc);
handler.sink_activate(&this_ref, imp, element)
},
)
});
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_activatemode_function(move |gst_pad, parent, mode, active| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| {
@ -907,7 +907,7 @@ impl PadSink {
))
},
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
let this_ref = PadSinkRef::new(inner_arc);
this_ref.activate_mode_hook(mode, active)?;
handler.sink_activatemode(&this_ref, imp, element, mode, active)
@ -916,17 +916,17 @@ impl PadSink {
});
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_chain_function(move |_gst_pad, parent, buffer| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp, element| {
if Context::current_has_sub_tasks() {
let this_weak = this_weak.clone();
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element = element.clone();
let delayed_fut = async move {
@ -940,7 +940,7 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
let this_ref = PadSinkRef::new(inner_arc);
let chain_fut = handler.sink_chain(&this_ref, imp, &element, buffer);
this_ref.handle_future(chain_fut)
}
@ -949,17 +949,17 @@ impl PadSink {
});
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_chain_list_function(move |_gst_pad, parent, list| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp, element| {
if Context::current_has_sub_tasks() {
let this_weak = this_weak.clone();
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element = element.clone();
let delayed_fut = async move {
@ -975,7 +975,7 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
let this_ref = PadSinkRef::new(inner_arc);
let chain_list_fut =
handler.sink_chain_list(&this_ref, imp, &element, list);
this_ref.handle_future(chain_list_fut)
@ -987,19 +987,18 @@ impl PadSink {
// No need to `set_event_function` since `set_event_full_function`
// overrides it and dispatches to `sink_event` when necessary
let handler_clone = handler.clone();
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_event_full_function(move |_gst_pad, parent, event| {
let handler = handler_clone.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| Err(FlowError::Error),
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
if event.is_serialized() {
if Context::current_has_sub_tasks() {
let this_weak = this_weak.clone();
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element = element.clone();
let delayed_fut = async move {
@ -1016,27 +1015,29 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok)
} else {
let this_ref = PadSinkRef::new(inner_arc);
let event_fut = handler
.sink_event_full_serialized(&this_ref, imp, &element, event);
this_ref.handle_future(event_fut)
}
} else {
let this_ref = PadSinkRef::new(inner_arc);
handler.sink_event_full(&this_ref, imp, &element, event)
}
},
)
});
let this_weak = self.downgrade();
let inner_arc = Arc::clone(&(self.0).0);
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler.clone();
let this_weak = this_weak.clone();
let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
|| false,
move |imp, element| {
let this_ref = this_weak.upgrade().expect("PadSink no longer exists");
let this_ref = PadSinkRef::new(inner_arc);
if !query.is_serialized() {
handler.sink_query(&this_ref, imp, &element, query)
} else {