From fc4a0d29c6eae40bc0cc37d250270ca92d7e1927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 30 Oct 2023 10:56:39 +0200 Subject: [PATCH] tutorials: Use async-channel instead of the glib MainContext channel The latter will be removed in favour of using async code in the future, and async code generally allows for more flexible message handling than the callback based MainContext channel. Part-of: --- tutorials/Cargo.toml | 2 + tutorials/src/bin/basic-tutorial-13.rs | 85 ++++++++++----------- tutorials/src/bin/playback-tutorial-5.rs | 94 ++++++++++++------------ 3 files changed, 94 insertions(+), 87 deletions(-) diff --git a/tutorials/Cargo.toml b/tutorials/Cargo.toml index 7e61f7325..4e6578ae3 100644 --- a/tutorials/Cargo.toml +++ b/tutorials/Cargo.toml @@ -16,6 +16,8 @@ gst-pbutils = { package = "gstreamer-pbutils", path = "../gstreamer-pbutils" } byte-slice-cast = "1" anyhow = "1" termion = { version = "2", optional = true } +async-channel = "2.0.0" +futures = "0.3" [target.'cfg(target_os = "macos")'.dependencies] cocoa = "0.25" diff --git a/tutorials/src/bin/basic-tutorial-13.rs b/tutorials/src/bin/basic-tutorial-13.rs index aa9fa55c0..34cb8c8bd 100644 --- a/tutorials/src/bin/basic-tutorial-13.rs +++ b/tutorials/src/bin/basic-tutorial-13.rs @@ -65,7 +65,7 @@ fn send_seek_event(pipeline: &Element, rate: f64) -> bool { } // This is where we get the user input from the terminal. -fn handle_keyboard(ready_tx: glib::Sender) { +fn handle_keyboard(ready_tx: async_channel::Sender) { // We set the terminal in "raw mode" so that we can get the keys without waiting for the user // to press return. let _stdout = io::stdout().into_raw_mode().unwrap(); @@ -84,7 +84,7 @@ fn handle_keyboard(ready_tx: glib::Sender) { _ => continue, }; ready_tx - .send(command) + .send_blocking(command) .expect("failed to send data through channel"); if command == Command::Quit { break; @@ -116,7 +116,7 @@ USAGE: Choose one of the following options, then press enter: let _guard = main_context.acquire().unwrap(); // Build the channel to get the terminal inputs from a different thread. - let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT); + let (ready_tx, ready_rx) = async_channel::bounded(5); thread::spawn(move || handle_keyboard(ready_tx)); @@ -135,51 +135,52 @@ USAGE: Choose one of the following options, then press enter: let mut playing = true; let mut rate = 1.; - ready_rx.attach(Some(&main_loop.context()), move |command: Command| { - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::ControlFlow::Continue, - }; - match command { - Command::PlayPause => { - let status = if playing { - let _ = pipeline.set_state(State::Paused); - "PAUSE" - } else { - let _ = pipeline.set_state(State::Playing); - "PLAYING" - }; - playing = !playing; - println!("Setting state to {status}\r"); - } - Command::DataRateUp => { - if send_seek_event(&pipeline, rate * 2.) { - rate *= 2.; + main_context.spawn_local(async move { + while let Ok(command) = ready_rx.recv().await { + let Some(pipeline) = pipeline_weak.upgrade() else { + break; + }; + + match command { + Command::PlayPause => { + let status = if playing { + let _ = pipeline.set_state(State::Paused); + "PAUSE" + } else { + let _ = pipeline.set_state(State::Playing); + "PLAYING" + }; + playing = !playing; + println!("Setting state to {status}\r"); } - } - Command::DataRateDown => { - if send_seek_event(&pipeline, rate / 2.) { - rate /= 2.; + Command::DataRateUp => { + if send_seek_event(&pipeline, rate * 2.) { + rate *= 2.; + } } - } - Command::ReverseRate => { - if send_seek_event(&pipeline, rate * -1.) { - rate *= -1.; + Command::DataRateDown => { + if send_seek_event(&pipeline, rate / 2.) { + rate /= 2.; + } } - } - Command::NextFrame => { - if let Some(video_sink) = pipeline.property::>("video-sink") { - // Send the event - let step = Step::new(gst::format::Buffers::ONE, rate.abs(), true, false); - video_sink.send_event(step); - println!("Stepping one frame\r"); + Command::ReverseRate => { + if send_seek_event(&pipeline, rate * -1.) { + rate *= -1.; + } + } + Command::NextFrame => { + if let Some(video_sink) = pipeline.property::>("video-sink") { + // Send the event + let step = Step::new(gst::format::Buffers::ONE, rate.abs(), true, false); + video_sink.send_event(step); + println!("Stepping one frame\r"); + } + } + Command::Quit => { + main_loop_clone.quit(); } - } - Command::Quit => { - main_loop_clone.quit(); } } - glib::ControlFlow::Continue }); main_loop.run(); diff --git a/tutorials/src/bin/playback-tutorial-5.rs b/tutorials/src/bin/playback-tutorial-5.rs index f329a1acf..7f7454cf6 100644 --- a/tutorials/src/bin/playback-tutorial-5.rs +++ b/tutorials/src/bin/playback-tutorial-5.rs @@ -15,7 +15,7 @@ enum Command { Quit, } -fn handle_keyboard(ready_tx: glib::Sender) { +fn handle_keyboard(ready_tx: async_channel::Sender) { let mut stdin = termion::async_stdin().keys(); loop { @@ -36,7 +36,7 @@ fn handle_keyboard(ready_tx: glib::Sender) { _ => continue, }; ready_tx - .send(command.clone()) + .send_blocking(command.clone()) .expect("Failed to send command to the main thread."); if command == Command::Quit { break; @@ -104,7 +104,7 @@ fn tutorial_main() -> Result<(), Error> { let _guard = main_context.acquire().unwrap(); // Build the channel to get the terminal inputs from a different thread. - let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT); + let (ready_tx, ready_rx) = async_channel::bounded(5); // Start the keyboard handling thread thread::spawn(move || handle_keyboard(ready_tx)); @@ -121,61 +121,65 @@ fn tutorial_main() -> Result<(), Error> { // Start playing pipeline.set_state(gst::State::Playing)?; - ready_rx.attach(Some(&main_loop.context()), move |command: Command| { - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::ControlFlow::Continue, - }; + main_context.spawn_local(async move { + while let Ok(command) = ready_rx.recv().await { + let Some(pipeline) = pipeline_weak.upgrade() else { + break; + }; - match command { - Command::UpdateChannel(ref name, increase) => { - let balance = pipeline - .dynamic_cast_ref::() - .unwrap(); - update_color_channel(name, increase, balance); - print_current_values(&pipeline); - } - Command::Quit => { - main_loop_clone.quit(); + match command { + Command::UpdateChannel(ref name, increase) => { + let balance = pipeline + .dynamic_cast_ref::() + .unwrap(); + update_color_channel(name, increase, balance); + print_current_values(&pipeline); + } + Command::Quit => { + main_loop_clone.quit(); + } } } - glib::ControlFlow::Continue }); // Handle bus errors / EOS correctly let main_loop_clone = main_loop.clone(); let bus = pipeline.bus().unwrap(); + let mut bus_stream = bus.stream(); let pipeline_weak = pipeline.downgrade(); - let _bus_watch = bus.add_watch(move |_bus, message| { - use gst::MessageView; + main_context.spawn_local(async move { + use futures::prelude::*; - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::ControlFlow::Continue, - }; + while let Some(message) = bus_stream.next().await { + use gst::MessageView; - match message.view() { - MessageView::Error(err) => { - eprintln!( - "Error received from element {:?} {}", - err.src().map(|s| s.path_string()), - err.error() - ); - eprintln!("Debugging information: {:?}", err.debug()); - main_loop_clone.quit(); - glib::ControlFlow::Break + let Some(pipeline) = pipeline_weak.upgrade() else { + break; + }; + + match message.view() { + MessageView::Error(err) => { + eprintln!( + "Error received from element {:?} {}", + err.src().map(|s| s.path_string()), + err.error() + ); + eprintln!("Debugging information: {:?}", err.debug()); + main_loop_clone.quit(); + break; + } + MessageView::Eos(..) => { + println!("Reached end of stream"); + pipeline + .set_state(gst::State::Ready) + .expect("Unable to set the pipeline to the `Ready` state"); + main_loop_clone.quit(); + break; + } + _ => (), } - MessageView::Eos(..) => { - println!("Reached end of stream"); - pipeline - .set_state(gst::State::Ready) - .expect("Unable to set the pipeline to the `Ready` state"); - main_loop_clone.quit(); - glib::ControlFlow::Break - } - _ => glib::ControlFlow::Continue, } - })?; + }); // Print initial values for all channels print_current_values(&pipeline);