threadshare: Use appsink callbacks instead of signals in the tests

This commit is contained in:
Sebastian Dröge 2021-05-31 10:33:50 +03:00
parent 1ec1352c88
commit 94f75c29a1
5 changed files with 130 additions and 95 deletions

View file

@ -77,17 +77,21 @@ fn jb_pipeline() {
let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap();
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
let _sample = appsink gst_app::AppSinkCallbacks::builder()
.emit_by_name("pull-sample", &[]) .new_sample(move |appsink| {
.unwrap() let _sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
sender.send(()).unwrap(); sender.send(()).unwrap();
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();
@ -143,17 +147,21 @@ fn jb_ts_pipeline() {
let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap();
let (sender, receiver) = mpsc::channel(); let (sender, receiver) = mpsc::channel();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
let _sample = appsink gst_app::AppSinkCallbacks::builder()
.emit_by_name("pull-sample", &[]) .new_sample(move |appsink| {
.unwrap() let _sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
sender.send(()).unwrap(); sender.send(()).unwrap();
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();

View file

@ -86,17 +86,21 @@ fn multiple_contexts_queue() {
let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap();
let sender_clone = sender.clone(); let sender_clone = sender.clone();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
let _sample = appsink gst_app::AppSinkCallbacks::builder()
.emit_by_name("pull-sample", &[]) .new_sample(move |appsink| {
.unwrap() let _sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
sender_clone.send(()).unwrap(); sender_clone.send(()).unwrap();
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
} }
let pipeline_clone = pipeline.clone(); let pipeline_clone = pipeline.clone();
@ -244,17 +248,21 @@ fn multiple_contexts_proxy() {
let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap();
let sender_clone = sender.clone(); let sender_clone = sender.clone();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
let _sample = appsink gst_app::AppSinkCallbacks::builder()
.emit_by_name("pull-sample", &[]) .new_sample(move |appsink| {
.unwrap() let _sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
sender_clone.send(()).unwrap(); sender_clone.send(()).unwrap();
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
} }
let pipeline_clone = pipeline.clone(); let pipeline_clone = pipeline.clone();
@ -361,21 +369,24 @@ fn eos() {
let (sample_notifier, sample_notif_rcv) = mpsc::channel(); let (sample_notifier, sample_notif_rcv) = mpsc::channel();
let (eos_notifier, eos_notif_rcv) = mpsc::channel(); let (eos_notifier, eos_notif_rcv) = mpsc::channel();
let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
gst_debug!(CAT, obj: appsink, "eos: pulling sample"); gst_app::AppSinkCallbacks::builder()
let _ = appsink .new_sample(move |appsink| {
.emit_by_name("pull-sample", &[]) gst_debug!(CAT, obj: appsink, "eos: pulling sample");
.unwrap() let _ = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
sample_notifier.send(()).unwrap(); sample_notifier.send(()).unwrap();
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.eos(move |_appsink| eos_notifier.send(()).unwrap())
appsink.connect_eos(move |_appsink| eos_notifier.send(()).unwrap()); .build(),
);
fn push_buffer(src: &gst::Element) -> bool { fn push_buffer(src: &gst::Element) -> bool {
gst_debug!(CAT, obj: src, "eos: pushing buffer"); gst_debug!(CAT, obj: src, "eos: pushing buffer");
@ -504,19 +515,23 @@ fn premature_shutdown() {
let (appsink_sender, appsink_receiver) = mpsc::channel(); let (appsink_sender, appsink_receiver) = mpsc::channel();
let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
gst_debug!(CAT, obj: appsink, "premature_shutdown: pulling sample"); gst_app::AppSinkCallbacks::builder()
let _sample = appsink .new_sample(move |appsink| {
.emit_by_name("pull-sample", &[]) gst_debug!(CAT, obj: appsink, "premature_shutdown: pulling sample");
.unwrap() let _sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
appsink_sender.send(()).unwrap(); appsink_sender.send(()).unwrap();
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
fn push_buffer(src: &gst::Element, intent: &str) -> bool { fn push_buffer(src: &gst::Element, intent: &str) -> bool {
gst_debug!( gst_debug!(

View file

@ -56,18 +56,22 @@ fn test_push() {
let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap();
let samples_clone = samples.clone(); let samples_clone = samples.clone();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
let sample = appsink gst_app::AppSinkCallbacks::builder()
.emit_by_name("pull-sample", &[]) .new_sample(move |appsink| {
.unwrap() let sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
samples_clone.lock().unwrap().push(sample); samples_clone.lock().unwrap().push(sample);
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();

View file

@ -51,18 +51,22 @@ fn test_push() {
let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap();
let samples_clone = samples.clone(); let samples_clone = samples.clone();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
let sample = appsink gst_app::AppSinkCallbacks::builder()
.emit_by_name("pull-sample", &[]) .new_sample(move |appsink| {
.unwrap() let sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
samples_clone.lock().unwrap().push(sample); samples_clone.lock().unwrap().push(sample);
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();

View file

@ -73,18 +73,22 @@ fn test_push() {
let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap(); let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap();
let samples_clone = samples.clone(); let samples_clone = samples.clone();
appsink.connect_new_sample(move |appsink| { appsink.set_callbacks(
let sample = appsink gst_app::AppSinkCallbacks::builder()
.emit_by_name("pull-sample", &[]) .new_sample(move |appsink| {
.unwrap() let sample = appsink
.unwrap() .emit_by_name("pull-sample", &[])
.get::<gst::Sample>() .unwrap()
.unwrap(); .unwrap()
.get::<gst::Sample>()
.unwrap();
let mut samples = samples_clone.lock().unwrap(); let mut samples = samples_clone.lock().unwrap();
samples.push(sample); samples.push(sample);
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); })
.build(),
);
// Wait for the server to listen // Wait for the server to listen
listening_rx.recv().unwrap(); listening_rx.recv().unwrap();