Make sure to keep around and drop bus watches after usage in all the examples

This commit is contained in:
Sebastian Dröge 2023-04-14 12:46:43 +03:00
parent aabfb61834
commit 47159ad3c2
8 changed files with 217 additions and 206 deletions

View file

@ -192,27 +192,28 @@ fn main() {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
let _bus_watch = bus
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => l_clone.quit(),
MessageView::Error(err) => {
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
l_clone.quit();
}
_ => (),
};
match msg.view() {
MessageView::Eos(..) => l_clone.quit(),
MessageView::Error(err) => {
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
l_clone.quit();
}
_ => (),
};
glib::Continue(true)
})
.expect("Failed to add bus watch");
glib::Continue(true)
})
.expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();

View file

@ -108,45 +108,46 @@ fn main() {
let terminated_count = Arc::new(AtomicU32::new(0));
let pipeline_clone = pipeline.clone();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(_) => {
// Actually, we don't post EOS (see sinks impl).
gst::info!(CAT, "Received eos");
l_clone.quit();
let _bus_watch = bus
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(_) => {
// Actually, we don't post EOS (see sinks impl).
gst::info!(CAT, "Received eos");
l_clone.quit();
glib::Continue(false)
}
MessageView::Error(msg) => {
if let gst::MessageView::Error(msg) = msg.message().view() {
if msg.error().matches(gst::LibraryError::Shutdown) {
if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
gst::info!(CAT, "Received all shutdown requests");
l_clone.quit();
glib::Continue(false)
}
MessageView::Error(msg) => {
if let gst::MessageView::Error(msg) = msg.message().view() {
if msg.error().matches(gst::LibraryError::Shutdown) {
if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
gst::info!(CAT, "Received all shutdown requests");
l_clone.quit();
return glib::Continue(false);
} else {
return glib::Continue(true);
return glib::Continue(false);
} else {
return glib::Continue(true);
}
}
}
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l_clone.quit();
glib::Continue(false)
}
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l_clone.quit();
glib::Continue(false)
_ => glib::Continue(true),
}
_ => glib::Continue(true),
}
})
.expect("Failed to add bus watch");
})
.expect("Failed to add bus watch");
gst::info!(CAT, "Switching to Ready");
let start = Instant::now();

View file

@ -170,31 +170,32 @@ fn run(pipeline: gst::Pipeline) {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(_) => {
gst::info!(CAT, "Received eos");
l_clone.quit();
let _bus_watch = bus
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(_) => {
gst::info!(CAT, "Received eos");
l_clone.quit();
glib::Continue(false)
}
MessageView::Error(msg) => {
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l_clone.quit();
glib::Continue(false)
}
MessageView::Error(msg) => {
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l_clone.quit();
glib::Continue(false)
glib::Continue(false)
}
_ => glib::Continue(true),
}
_ => glib::Continue(true),
}
})
.expect("Failed to add bus watch");
})
.expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
l.run();

View file

@ -135,38 +135,39 @@ fn multiple_contexts_queue() {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
let _bus_watch = bus
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::StateChanged(state_changed) => {
if let Some(source) = state_changed.src() {
if source.type_() == gst::Pipeline::static_type()
&& state_changed.old() == gst::State::Paused
&& state_changed.current() == gst::State::Playing
{
if let Some(test_scenario) = test_scenario.take() {
std::thread::spawn(test_scenario);
match msg.view() {
MessageView::StateChanged(state_changed) => {
if let Some(source) = state_changed.src() {
if source.type_() == gst::Pipeline::static_type()
&& state_changed.old() == gst::State::Paused
&& state_changed.current() == gst::State::Playing
{
if let Some(test_scenario) = test_scenario.take() {
std::thread::spawn(test_scenario);
}
}
}
}
}
MessageView::Error(err) => {
gst::error!(
CAT,
"multiple_contexts_queue: Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
l_clone.quit();
}
_ => (),
};
MessageView::Error(err) => {
gst::error!(
CAT,
"multiple_contexts_queue: Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
l_clone.quit();
}
_ => (),
};
glib::Continue(true)
})
.unwrap();
glib::Continue(true)
})
.unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
@ -281,38 +282,39 @@ fn multiple_contexts_proxy() {
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
let _bus_watch = bus
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::StateChanged(state_changed) => {
if let Some(source) = state_changed.src() {
if source.type_() == gst::Pipeline::static_type()
&& state_changed.old() == gst::State::Paused
&& state_changed.current() == gst::State::Playing
{
if let Some(test_scenario) = test_scenario.take() {
std::thread::spawn(test_scenario);
match msg.view() {
MessageView::StateChanged(state_changed) => {
if let Some(source) = state_changed.src() {
if source.type_() == gst::Pipeline::static_type()
&& state_changed.old() == gst::State::Paused
&& state_changed.current() == gst::State::Playing
{
if let Some(test_scenario) = test_scenario.take() {
std::thread::spawn(test_scenario);
}
}
}
}
}
MessageView::Error(err) => {
gst::error!(
CAT,
"multiple_contexts_proxy: Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
l_clone.quit();
}
_ => (),
};
MessageView::Error(err) => {
gst::error!(
CAT,
"multiple_contexts_proxy: Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
l_clone.quit();
}
_ => (),
};
glib::Continue(true)
})
.unwrap();
glib::Continue(true)
})
.unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
@ -405,7 +407,7 @@ fn eos() {
});
let l_clone = l.clone();
pipeline
let _bus_watch = pipeline
.bus()
.unwrap()
.add_watch(move |_, msg| {
@ -561,7 +563,7 @@ fn premature_shutdown() {
});
let l_clone = l.clone();
pipeline
let _bus_watch = pipeline
.bus()
.unwrap()
.add_watch(move |_, msg| {
@ -657,7 +659,7 @@ fn socket_play_null_play() {
});
let l_clone = l.clone();
pipeline
let _bus_watch = pipeline
.bus()
.unwrap()
.add_watch(move |_, msg| {

View file

@ -154,42 +154,43 @@ fn create_ui(app: &gtk::Application) {
let bus = pipeline.bus().unwrap();
let app_weak = app.downgrade();
bus.add_watch_local(move |_, msg| {
use gst::MessageView;
let bus_watch = bus
.add_watch_local(move |_, msg| {
use gst::MessageView;
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
match msg.view() {
MessageView::Eos(..) => app.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
app.quit();
}
_ => (),
};
match msg.view() {
MessageView::Eos(..) => app.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
app.quit();
}
_ => (),
};
glib::Continue(true)
})
.expect("Failed to add bus watch");
glib::Continue(true)
})
.expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
// Pipeline reference is owned by the closure below, so will be
// destroyed once the app is destroyed
let timeout_id = RefCell::new(Some(timeout_id));
let bus_watch = RefCell::new(Some(bus_watch));
app.connect_shutdown(move |_| {
drop(bus_watch.borrow_mut().take());
pipeline.set_state(gst::State::Null).unwrap();
bus.remove_watch().unwrap();
if let Some(timeout_id) = timeout_id.borrow_mut().take() {
timeout_id.remove();
}

View file

@ -10,7 +10,7 @@
use gio::prelude::*;
use gst::{glib, prelude::*};
use gtk::prelude::*;
use std::cell::Cell;
use std::cell::{Cell, RefCell};
struct DroppingProbe(glib::WeakRef<gst::Pad>, Option<gst::PadProbeId>);
@ -106,7 +106,7 @@ fn create_window(app: &gtk::Application) {
}
});
{
let bus_watch = {
let bus = pipeline.bus().unwrap();
let window = window.downgrade();
bus.add_watch_local(move |_, msg| {
@ -136,8 +136,8 @@ fn create_window(app: &gtk::Application) {
glib::Continue(true)
})
.unwrap();
}
.unwrap()
};
{
let pipeline = pipeline.clone();
@ -148,7 +148,9 @@ fn create_window(app: &gtk::Application) {
});
}
let bus_watch = RefCell::new(Some(bus_watch));
window.connect_unrealize(move |_| {
drop(bus_watch.borrow_mut().take());
pipeline
.set_state(gst::State::Null)
.expect("Failed to stop pipeline");

View file

@ -284,42 +284,43 @@ fn create_ui(app: &gtk::Application) {
let bus = pipeline.bus().unwrap();
let app_weak = app.downgrade();
bus.add_watch_local(move |_, msg| {
use gst::MessageView;
let bus_watch = bus
.add_watch_local(move |_, msg| {
use gst::MessageView;
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
match msg.view() {
MessageView::Eos(..) => app.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
app.quit();
}
_ => (),
};
match msg.view() {
MessageView::Eos(..) => app.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
app.quit();
}
_ => (),
};
glib::Continue(true)
})
.expect("Failed to add bus watch");
glib::Continue(true)
})
.expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
// Pipeline reference is owned by the closure below, so will be
// destroyed once the app is destroyed
let timeout_id = RefCell::new(Some(timeout_id));
let bus_watch = RefCell::new(Some(bus_watch));
app.connect_shutdown(move |_| {
drop(bus_watch.borrow_mut().take());
pipeline.set_state(gst::State::Null).unwrap();
bus.remove_watch().unwrap();
if let Some(timeout_id) = timeout_id.borrow_mut().take() {
timeout_id.remove();
}

View file

@ -95,42 +95,44 @@ fn create_ui(app: &gtk::Application) {
.expect("Unable to set the pipeline to the `Playing` state");
let app_weak = app.downgrade();
bus.add_watch_local(move |_, msg| {
use gst::MessageView;
let bus_watch = bus
.add_watch_local(move |_, msg| {
use gst::MessageView;
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
match msg.view() {
MessageView::Eos(..) => app.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
app.quit();
}
_ => (),
};
match msg.view() {
MessageView::Eos(..) => app.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
app.quit();
}
_ => (),
};
glib::Continue(true)
})
.expect("Failed to add bus watch");
glib::Continue(true)
})
.expect("Failed to add bus watch");
let timeout_id = RefCell::new(Some(timeout_id));
let pipeline = RefCell::new(Some(pipeline));
let bus_watch = RefCell::new(Some(bus_watch));
app.connect_shutdown(move |_| {
window.close();
drop(bus_watch.borrow_mut().take());
if let Some(pipeline) = pipeline.borrow_mut().take() {
pipeline
.set_state(gst::State::Null)
.expect("Unable to set the pipeline to the `Null` state");
pipeline.bus().unwrap().remove_watch().unwrap();
}
if let Some(timeout_id) = timeout_id.borrow_mut().take() {