diff --git a/tutorials/Cargo.toml b/tutorials/Cargo.toml index 7e61f7325..4e6578ae3 100644 --- a/tutorials/Cargo.toml +++ b/tutorials/Cargo.toml @@ -16,6 +16,8 @@ gst-pbutils = { package = "gstreamer-pbutils", path = "../gstreamer-pbutils" } byte-slice-cast = "1" anyhow = "1" termion = { version = "2", optional = true } +async-channel = "2.0.0" +futures = "0.3" [target.'cfg(target_os = "macos")'.dependencies] cocoa = "0.25" diff --git a/tutorials/src/bin/basic-tutorial-13.rs b/tutorials/src/bin/basic-tutorial-13.rs index aa9fa55c0..34cb8c8bd 100644 --- a/tutorials/src/bin/basic-tutorial-13.rs +++ b/tutorials/src/bin/basic-tutorial-13.rs @@ -65,7 +65,7 @@ fn send_seek_event(pipeline: &Element, rate: f64) -> bool { } // This is where we get the user input from the terminal. -fn handle_keyboard(ready_tx: glib::Sender) { +fn handle_keyboard(ready_tx: async_channel::Sender) { // We set the terminal in "raw mode" so that we can get the keys without waiting for the user // to press return. let _stdout = io::stdout().into_raw_mode().unwrap(); @@ -84,7 +84,7 @@ fn handle_keyboard(ready_tx: glib::Sender) { _ => continue, }; ready_tx - .send(command) + .send_blocking(command) .expect("failed to send data through channel"); if command == Command::Quit { break; @@ -116,7 +116,7 @@ USAGE: Choose one of the following options, then press enter: let _guard = main_context.acquire().unwrap(); // Build the channel to get the terminal inputs from a different thread. - let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT); + let (ready_tx, ready_rx) = async_channel::bounded(5); thread::spawn(move || handle_keyboard(ready_tx)); @@ -135,51 +135,52 @@ USAGE: Choose one of the following options, then press enter: let mut playing = true; let mut rate = 1.; - ready_rx.attach(Some(&main_loop.context()), move |command: Command| { - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::ControlFlow::Continue, - }; - match command { - Command::PlayPause => { - let status = if playing { - let _ = pipeline.set_state(State::Paused); - "PAUSE" - } else { - let _ = pipeline.set_state(State::Playing); - "PLAYING" - }; - playing = !playing; - println!("Setting state to {status}\r"); - } - Command::DataRateUp => { - if send_seek_event(&pipeline, rate * 2.) { - rate *= 2.; + main_context.spawn_local(async move { + while let Ok(command) = ready_rx.recv().await { + let Some(pipeline) = pipeline_weak.upgrade() else { + break; + }; + + match command { + Command::PlayPause => { + let status = if playing { + let _ = pipeline.set_state(State::Paused); + "PAUSE" + } else { + let _ = pipeline.set_state(State::Playing); + "PLAYING" + }; + playing = !playing; + println!("Setting state to {status}\r"); } - } - Command::DataRateDown => { - if send_seek_event(&pipeline, rate / 2.) { - rate /= 2.; + Command::DataRateUp => { + if send_seek_event(&pipeline, rate * 2.) { + rate *= 2.; + } } - } - Command::ReverseRate => { - if send_seek_event(&pipeline, rate * -1.) { - rate *= -1.; + Command::DataRateDown => { + if send_seek_event(&pipeline, rate / 2.) { + rate /= 2.; + } } - } - Command::NextFrame => { - if let Some(video_sink) = pipeline.property::>("video-sink") { - // Send the event - let step = Step::new(gst::format::Buffers::ONE, rate.abs(), true, false); - video_sink.send_event(step); - println!("Stepping one frame\r"); + Command::ReverseRate => { + if send_seek_event(&pipeline, rate * -1.) { + rate *= -1.; + } + } + Command::NextFrame => { + if let Some(video_sink) = pipeline.property::>("video-sink") { + // Send the event + let step = Step::new(gst::format::Buffers::ONE, rate.abs(), true, false); + video_sink.send_event(step); + println!("Stepping one frame\r"); + } + } + Command::Quit => { + main_loop_clone.quit(); } - } - Command::Quit => { - main_loop_clone.quit(); } } - glib::ControlFlow::Continue }); main_loop.run(); diff --git a/tutorials/src/bin/playback-tutorial-5.rs b/tutorials/src/bin/playback-tutorial-5.rs index f329a1acf..7f7454cf6 100644 --- a/tutorials/src/bin/playback-tutorial-5.rs +++ b/tutorials/src/bin/playback-tutorial-5.rs @@ -15,7 +15,7 @@ enum Command { Quit, } -fn handle_keyboard(ready_tx: glib::Sender) { +fn handle_keyboard(ready_tx: async_channel::Sender) { let mut stdin = termion::async_stdin().keys(); loop { @@ -36,7 +36,7 @@ fn handle_keyboard(ready_tx: glib::Sender) { _ => continue, }; ready_tx - .send(command.clone()) + .send_blocking(command.clone()) .expect("Failed to send command to the main thread."); if command == Command::Quit { break; @@ -104,7 +104,7 @@ fn tutorial_main() -> Result<(), Error> { let _guard = main_context.acquire().unwrap(); // Build the channel to get the terminal inputs from a different thread. - let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT); + let (ready_tx, ready_rx) = async_channel::bounded(5); // Start the keyboard handling thread thread::spawn(move || handle_keyboard(ready_tx)); @@ -121,61 +121,65 @@ fn tutorial_main() -> Result<(), Error> { // Start playing pipeline.set_state(gst::State::Playing)?; - ready_rx.attach(Some(&main_loop.context()), move |command: Command| { - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::ControlFlow::Continue, - }; + main_context.spawn_local(async move { + while let Ok(command) = ready_rx.recv().await { + let Some(pipeline) = pipeline_weak.upgrade() else { + break; + }; - match command { - Command::UpdateChannel(ref name, increase) => { - let balance = pipeline - .dynamic_cast_ref::() - .unwrap(); - update_color_channel(name, increase, balance); - print_current_values(&pipeline); - } - Command::Quit => { - main_loop_clone.quit(); + match command { + Command::UpdateChannel(ref name, increase) => { + let balance = pipeline + .dynamic_cast_ref::() + .unwrap(); + update_color_channel(name, increase, balance); + print_current_values(&pipeline); + } + Command::Quit => { + main_loop_clone.quit(); + } } } - glib::ControlFlow::Continue }); // Handle bus errors / EOS correctly let main_loop_clone = main_loop.clone(); let bus = pipeline.bus().unwrap(); + let mut bus_stream = bus.stream(); let pipeline_weak = pipeline.downgrade(); - let _bus_watch = bus.add_watch(move |_bus, message| { - use gst::MessageView; + main_context.spawn_local(async move { + use futures::prelude::*; - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::ControlFlow::Continue, - }; + while let Some(message) = bus_stream.next().await { + use gst::MessageView; - match message.view() { - MessageView::Error(err) => { - eprintln!( - "Error received from element {:?} {}", - err.src().map(|s| s.path_string()), - err.error() - ); - eprintln!("Debugging information: {:?}", err.debug()); - main_loop_clone.quit(); - glib::ControlFlow::Break + let Some(pipeline) = pipeline_weak.upgrade() else { + break; + }; + + match message.view() { + MessageView::Error(err) => { + eprintln!( + "Error received from element {:?} {}", + err.src().map(|s| s.path_string()), + err.error() + ); + eprintln!("Debugging information: {:?}", err.debug()); + main_loop_clone.quit(); + break; + } + MessageView::Eos(..) => { + println!("Reached end of stream"); + pipeline + .set_state(gst::State::Ready) + .expect("Unable to set the pipeline to the `Ready` state"); + main_loop_clone.quit(); + break; + } + _ => (), } - MessageView::Eos(..) => { - println!("Reached end of stream"); - pipeline - .set_state(gst::State::Ready) - .expect("Unable to set the pipeline to the `Ready` state"); - main_loop_clone.quit(); - glib::ControlFlow::Break - } - _ => glib::ControlFlow::Continue, } - })?; + }); // Print initial values for all channels print_current_values(&pipeline);