diff --git a/gstreamer/Gir.toml b/gstreamer/Gir.toml index c199be555..faab82f5a 100644 --- a/gstreamer/Gir.toml +++ b/gstreamer/Gir.toml @@ -2300,6 +2300,42 @@ manual_traits = ["TagSetterExtManual"] # Takes a raw pointer ignore = true +[[object]] +name = "Gst.Task" +status = "generate" + # Need work + [[object.function]] + name = "new" + ignore = true + [[object.function]] + name = "set_enter_callback" + ignore = true + [[object.function]] + name = "set_leave_callback" + ignore = true + [[object.function]] + name = "set_lock" + ignore = true + [[object.function]] + name = "set_state" + ignore = true + +[[object]] +name = "Gst.TaskPool" +status = "generate" +manual_traits = ["TaskPoolExtManual"] + [[object.function]] + name = "push" + manual = true + + # Moved to TaskHandle + [[object.function]] + name = "join" + ignore = true + [[object.function]] + name = "dispose_handle" + ignore = true + [[object]] name = "Gst.Toc" status = "manual" diff --git a/gstreamer/src/auto/mod.rs b/gstreamer/src/auto/mod.rs index b089431c5..958649059 100644 --- a/gstreamer/src/auto/mod.rs +++ b/gstreamer/src/auto/mod.rs @@ -95,6 +95,12 @@ pub use self::system_clock::SystemClock; mod tag_setter; pub use self::tag_setter::TagSetter; +mod task; +pub use self::task::Task; + +mod task_pool; +pub use self::task_pool::TaskPool; + mod toc_setter; pub use self::toc_setter::TocSetter; @@ -225,6 +231,8 @@ pub mod traits { pub use super::proxy_pad::ProxyPadExt; pub use super::system_clock::SystemClockExt; pub use super::tag_setter::TagSetterExt; + pub use super::task::TaskExt; + pub use super::task_pool::TaskPoolExt; pub use super::toc_setter::TocSetterExt; pub use super::tracer::TracerExt; pub use super::uri_handler::URIHandlerExt; diff --git a/gstreamer/src/auto/task.rs b/gstreamer/src/auto/task.rs new file mode 100644 index 000000000..55da4ef7b --- /dev/null +++ b/gstreamer/src/auto/task.rs @@ -0,0 +1,105 @@ +// This file was generated by gir (https://github.com/gtk-rs/gir) +// from gir-files (https://github.com/gtk-rs/gir-files) +// from gst-gir-files (https://gitlab.freedesktop.org/gstreamer/gir-files-rs.git) +// DO NOT EDIT + +use crate::Object; +use crate::TaskPool; +use crate::TaskState; +use glib::object::IsA; +use glib::translate::*; + +glib::wrapper! { + #[doc(alias = "GstTask")] + pub struct Task(Object) @extends Object; + + match fn { + type_ => || ffi::gst_task_get_type(), + } +} + +impl Task { + pub const NONE: Option<&'static Task> = None; + + #[doc(alias = "gst_task_cleanup_all")] + pub fn cleanup_all() { + assert_initialized_main_thread!(); + unsafe { + ffi::gst_task_cleanup_all(); + } + } +} + +unsafe impl Send for Task {} +unsafe impl Sync for Task {} + +pub trait TaskExt: 'static { + #[doc(alias = "gst_task_get_pool")] + #[doc(alias = "get_pool")] + fn pool(&self) -> TaskPool; + + #[doc(alias = "gst_task_get_state")] + #[doc(alias = "get_state")] + fn state(&self) -> TaskState; + + #[doc(alias = "gst_task_join")] + fn join(&self) -> bool; + + #[doc(alias = "gst_task_pause")] + fn pause(&self) -> bool; + + #[cfg(any(feature = "v1_18", feature = "dox"))] + #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))] + #[doc(alias = "gst_task_resume")] + fn resume(&self) -> bool; + + #[doc(alias = "gst_task_set_pool")] + fn set_pool(&self, pool: &impl IsA); + + #[doc(alias = "gst_task_start")] + fn start(&self) -> bool; + + #[doc(alias = "gst_task_stop")] + fn stop(&self) -> bool; +} + +impl> TaskExt for O { + fn pool(&self) -> TaskPool { + unsafe { from_glib_full(ffi::gst_task_get_pool(self.as_ref().to_glib_none().0)) } + } + + fn state(&self) -> TaskState { + unsafe { from_glib(ffi::gst_task_get_state(self.as_ref().to_glib_none().0)) } + } + + fn join(&self) -> bool { + unsafe { from_glib(ffi::gst_task_join(self.as_ref().to_glib_none().0)) } + } + + fn pause(&self) -> bool { + unsafe { from_glib(ffi::gst_task_pause(self.as_ref().to_glib_none().0)) } + } + + #[cfg(any(feature = "v1_18", feature = "dox"))] + #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))] + fn resume(&self) -> bool { + unsafe { from_glib(ffi::gst_task_resume(self.as_ref().to_glib_none().0)) } + } + + fn set_pool(&self, pool: &impl IsA) { + unsafe { + ffi::gst_task_set_pool( + self.as_ref().to_glib_none().0, + pool.as_ref().to_glib_none().0, + ); + } + } + + fn start(&self) -> bool { + unsafe { from_glib(ffi::gst_task_start(self.as_ref().to_glib_none().0)) } + } + + fn stop(&self) -> bool { + unsafe { from_glib(ffi::gst_task_stop(self.as_ref().to_glib_none().0)) } + } +} diff --git a/gstreamer/src/auto/task_pool.rs b/gstreamer/src/auto/task_pool.rs new file mode 100644 index 000000000..52d4d6eb3 --- /dev/null +++ b/gstreamer/src/auto/task_pool.rs @@ -0,0 +1,65 @@ +// This file was generated by gir (https://github.com/gtk-rs/gir) +// from gir-files (https://github.com/gtk-rs/gir-files) +// from gst-gir-files (https://gitlab.freedesktop.org/gstreamer/gir-files-rs.git) +// DO NOT EDIT + +use crate::Object; +use glib::object::IsA; +use glib::translate::*; +use std::ptr; + +glib::wrapper! { + #[doc(alias = "GstTaskPool")] + pub struct TaskPool(Object) @extends Object; + + match fn { + type_ => || ffi::gst_task_pool_get_type(), + } +} + +impl TaskPool { + pub const NONE: Option<&'static TaskPool> = None; + + #[doc(alias = "gst_task_pool_new")] + pub fn new() -> TaskPool { + assert_initialized_main_thread!(); + unsafe { from_glib_full(ffi::gst_task_pool_new()) } + } +} + +impl Default for TaskPool { + fn default() -> Self { + Self::new() + } +} + +unsafe impl Send for TaskPool {} +unsafe impl Sync for TaskPool {} + +pub trait TaskPoolExt: 'static { + #[doc(alias = "gst_task_pool_cleanup")] + fn cleanup(&self); + + #[doc(alias = "gst_task_pool_prepare")] + fn prepare(&self) -> Result<(), glib::Error>; +} + +impl> TaskPoolExt for O { + fn cleanup(&self) { + unsafe { + ffi::gst_task_pool_cleanup(self.as_ref().to_glib_none().0); + } + } + + fn prepare(&self) -> Result<(), glib::Error> { + unsafe { + let mut error = ptr::null_mut(); + let _ = ffi::gst_task_pool_prepare(self.as_ref().to_glib_none().0, &mut error); + if error.is_null() { + Ok(()) + } else { + Err(from_glib_full(error)) + } + } + } +} diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index 5af345353..3b800d0f8 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -182,11 +182,13 @@ mod control_source; mod parse_context; mod proxy_pad; mod tag_setter; +mod task_pool; pub use crate::element::{ElementMessageType, NotifyWatchId}; pub use crate::element::{ ELEMENT_METADATA_AUTHOR, ELEMENT_METADATA_DESCRIPTION, ELEMENT_METADATA_DOC_URI, ELEMENT_METADATA_ICON_NAME, ELEMENT_METADATA_KLASS, ELEMENT_METADATA_LONGNAME, }; +pub use crate::task_pool::{TaskHandle, TaskPoolTaskHandle}; pub use self::iterator::{Iterator, IteratorError, IteratorImpl, StdIterator}; pub use crate::clock_time::ClockTime; @@ -321,6 +323,7 @@ pub mod prelude { pub use crate::plugin_feature::PluginFeatureExtManual; pub use crate::proxy_pad::ProxyPadExtManual; pub use crate::tag_setter::TagSetterExtManual; + pub use crate::task_pool::{TaskHandle, TaskPoolExtManual}; pub use crate::typefind::TypeFindImpl; pub use crate::value::GstValueExt; diff --git a/gstreamer/src/subclass/mod.rs b/gstreamer/src/subclass/mod.rs index f491c9af9..789f033d3 100644 --- a/gstreamer/src/subclass/mod.rs +++ b/gstreamer/src/subclass/mod.rs @@ -24,6 +24,7 @@ mod object; mod pad; mod pipeline; mod proxy_pad; +mod task_pool; mod tracer; mod device; @@ -61,6 +62,7 @@ pub mod prelude { pub use super::proxy_pad::ProxyPadImpl; pub use super::system_clock::SystemClockImpl; pub use super::tag_setter::TagSetterImpl; + pub use super::task_pool::TaskPoolImpl; pub use super::tracer::{TracerHook, TracerImpl, TracerImplExt}; pub use super::uri_handler::{URIHandlerImpl, URIHandlerImplExt}; } diff --git a/gstreamer/src/subclass/task_pool.rs b/gstreamer/src/subclass/task_pool.rs new file mode 100644 index 000000000..9e3f402c7 --- /dev/null +++ b/gstreamer/src/subclass/task_pool.rs @@ -0,0 +1,347 @@ +// Take a look at the license at the top of the repository in the LICENSE file. + +use super::prelude::*; +use crate::{TaskHandle, TaskPool}; + +use std::hash::{Hash, Hasher}; +use std::ptr; +use std::sync::{Arc, Mutex}; + +use glib::ffi::gpointer; +use glib::prelude::*; +use glib::subclass::prelude::*; +use glib::translate::*; + +pub trait TaskPoolImpl: GstObjectImpl + Send + Sync { + // rustdoc-stripper-ignore-next + /// Handle to be returned from the `push` function to allow the caller to wait for the task's + /// completion. + /// + /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle + /// that does nothing on `join` or drop. + type Handle: TaskHandle; + + // rustdoc-stripper-ignore-next + /// Prepare the task pool to accept tasks. + /// + /// This defaults to doing nothing. + fn prepare(&self, _task_pool: &Self::Type) -> Result<(), glib::Error> { + Ok(()) + } + + // rustdoc-stripper-ignore-next + /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped. + /// + /// This is mainly used internally to ensure proper cleanup of internal data structures in test + /// suites. + fn cleanup(&self, _task_pool: &Self::Type) {} + + // rustdoc-stripper-ignore-next + /// Deliver a task to the pool. + /// + /// If returning `Ok`, you need to call the `func` eventually. + /// + /// If returning `Err`, the `func` must be dropped without calling it. + fn push( + &self, + task_pool: &Self::Type, + func: TaskPoolFunction, + ) -> Result, glib::Error>; +} + +unsafe impl IsSubclassable for TaskPool { + fn class_init(klass: &mut glib::Class) { + Self::parent_class_init::(klass); + let klass = klass.as_mut(); + klass.prepare = Some(task_pool_prepare::); + klass.cleanup = Some(task_pool_cleanup::); + klass.push = Some(task_pool_push::); + klass.join = Some(task_pool_join::); + + #[cfg(any(feature = "v1_20", feature = "dox"))] + { + klass.dispose_handle = Some(task_pool_dispose_handle::); + } + } +} + +unsafe extern "C" fn task_pool_prepare( + ptr: *mut ffi::GstTaskPool, + error: *mut *mut glib::ffi::GError, +) { + let instance = &*(ptr as *mut T::Instance); + let imp = instance.imp(); + let wrap: Borrowed = from_glib_borrow(ptr); + + match imp.prepare(wrap.unsafe_cast_ref()) { + Ok(()) => {} + Err(err) => { + if !error.is_null() { + *error = err.into_raw(); + } + } + } +} + +unsafe extern "C" fn task_pool_cleanup(ptr: *mut ffi::GstTaskPool) { + let instance = &*(ptr as *mut T::Instance); + let imp = instance.imp(); + let wrap: Borrowed = from_glib_borrow(ptr); + + imp.cleanup(wrap.unsafe_cast_ref()); +} + +unsafe extern "C" fn task_pool_push( + ptr: *mut ffi::GstTaskPool, + func: ffi::GstTaskPoolFunction, + user_data: gpointer, + error: *mut *mut glib::ffi::GError, +) -> gpointer { + let instance = &*(ptr as *mut T::Instance); + let imp = instance.imp(); + let wrap: Borrowed = from_glib_borrow(ptr); + + let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data); + + match imp.push(wrap.unsafe_cast_ref(), func.clone()) { + Ok(None) => ptr::null_mut(), + Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer, + Err(err) => { + func.prevent_call(); + if !error.is_null() { + *error = err.into_raw(); + } + ptr::null_mut() + } + } +} + +unsafe extern "C" fn task_pool_join(ptr: *mut ffi::GstTaskPool, id: gpointer) { + let wrap: Borrowed = from_glib_borrow(ptr); + + if id.is_null() { + crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to join null handle"); + return; + } + + let handle = Box::from_raw(id as *mut T::Handle); + handle.join(); +} + +#[cfg(any(feature = "v1_20", feature = "dox"))] +#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))] +unsafe extern "C" fn task_pool_dispose_handle( + ptr: *mut ffi::GstTaskPool, + id: gpointer, +) { + let wrap: Borrowed = from_glib_borrow(ptr); + + if id.is_null() { + crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to dispose null handle"); + return; + } + + let handle = Box::from_raw(id as *mut T::Handle); + drop(handle); +} + +// rustdoc-stripper-ignore-next +/// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push). +#[derive(Debug)] +pub struct TaskPoolFunction(Arc>>); + +// `Arc>>` is required so that we can enforce that the function +// has not been called and will never be called after `push` returns `Err`. + +#[derive(Debug)] +struct TaskPoolFunctionInner { + func: unsafe extern "C" fn(gpointer), + user_data: gpointer, + warn_on_drop: bool, +} + +unsafe impl Send for TaskPoolFunctionInner {} + +impl TaskPoolFunction { + fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self { + let inner = TaskPoolFunctionInner { + func, + user_data, + warn_on_drop: true, + }; + Self(Arc::new(Mutex::new(Some(inner)))) + } + + fn clone(&self) -> Self { + Self(self.0.clone()) + } + + // rustdoc-stripper-ignore-next + /// Consume and execute the function. + pub fn call(self) { + let mut inner = self + .0 + .lock() + .unwrap() + .take() + .expect("TaskPoolFunction has already been dropped"); + inner.warn_on_drop = false; + unsafe { (inner.func)(inner.user_data) } + } + + fn prevent_call(self) { + let mut inner = self + .0 + .lock() + .unwrap() + .take() + .expect("TaskPoolFunction has already been called"); + inner.warn_on_drop = false; + drop(inner); + } + + fn as_ptr(&self) -> *const Mutex> { + Arc::as_ptr(&self.0) + } +} + +impl Drop for TaskPoolFunctionInner { + fn drop(&mut self) { + if self.warn_on_drop { + crate::warning!(crate::CAT_RUST, "Leaked task function"); + } + } +} + +impl PartialEq for TaskPoolFunction { + fn eq(&self, other: &Self) -> bool { + self.as_ptr() == other.as_ptr() + } +} + +impl Eq for TaskPoolFunction {} + +impl PartialOrd for TaskPoolFunction { + fn partial_cmp(&self, other: &Self) -> Option { + self.as_ptr().partial_cmp(&other.as_ptr()) + } +} + +impl Ord for TaskPoolFunction { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.as_ptr().cmp(&other.as_ptr()) + } +} + +impl Hash for TaskPoolFunction { + fn hash(&self, state: &mut H) { + self.as_ptr().hash(state) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::prelude::*; + use std::sync::atomic; + use std::sync::mpsc::{channel, TryRecvError}; + use std::thread; + + pub mod imp { + use super::*; + + #[derive(Default)] + pub struct TestPool { + pub(super) prepared: atomic::AtomicBool, + pub(super) cleaned_up: atomic::AtomicBool, + } + + #[glib::object_subclass] + impl ObjectSubclass for TestPool { + const NAME: &'static str = "TestPool"; + type Type = super::TestPool; + type ParentType = TaskPool; + } + + impl ObjectImpl for TestPool {} + + impl GstObjectImpl for TestPool {} + + impl TaskPoolImpl for TestPool { + type Handle = TestHandle; + + fn prepare(&self, _task_pool: &Self::Type) -> Result<(), glib::Error> { + self.prepared.store(true, atomic::Ordering::SeqCst); + Ok(()) + } + + fn cleanup(&self, _task_pool: &Self::Type) { + self.cleaned_up.store(true, atomic::Ordering::SeqCst); + } + + fn push( + &self, + _task_pool: &Self::Type, + func: TaskPoolFunction, + ) -> Result, glib::Error> { + let handle = thread::spawn(move || func.call()); + Ok(Some(TestHandle(handle))) + } + } + + pub struct TestHandle(thread::JoinHandle<()>); + + impl TaskHandle for TestHandle { + fn join(self) { + self.0.join().unwrap(); + } + } + } + + glib::wrapper! { + pub struct TestPool(ObjectSubclass) @extends TaskPool, crate::Object; + } + + unsafe impl Send for TestPool {} + unsafe impl Sync for TestPool {} + + impl TestPool { + pub fn new() -> Self { + Self::default() + } + } + + impl Default for TestPool { + fn default() -> Self { + glib::Object::new(&[]).unwrap() + } + } + + #[test] + fn test_simple_subclass() { + crate::init().unwrap(); + + let pool = TestPool::new(); + pool.prepare().unwrap(); + + let (sender, receiver) = channel(); + + let handle = pool + .push(move || { + sender.send(()).unwrap(); + }) + .unwrap(); + let handle = handle.unwrap(); + + assert_eq!(receiver.recv(), Ok(())); + + handle.join(); + assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); + + pool.cleanup(); + + let imp = imp::TestPool::from_instance(&pool); + assert!(imp.prepared.load(atomic::Ordering::SeqCst)); + assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst)); + } +} diff --git a/gstreamer/src/task_pool.rs b/gstreamer/src/task_pool.rs new file mode 100644 index 000000000..094158372 --- /dev/null +++ b/gstreamer/src/task_pool.rs @@ -0,0 +1,158 @@ +// Take a look at the license at the top of the repository in the LICENSE file. + +use crate::TaskPool; + +use std::ptr; + +use glib::ffi::gpointer; +use glib::prelude::*; +use glib::translate::*; + +unsafe extern "C" fn task_pool_trampoline(data: gpointer) { + let func = Box::from_raw(data as *mut P); + func() +} + +pub trait TaskPoolExtManual: 'static { + #[doc(alias = "gst_task_pool_push")] + fn push( + &self, + func: P, + ) -> Result, glib::Error>; +} + +impl> TaskPoolExtManual for O { + fn push( + &self, + func: P, + ) -> Result, glib::Error> { + unsafe { + let mut error = ptr::null_mut(); + let func: Box

= Box::new(func); + let func = Box::into_raw(func); + + let handle = ffi::gst_task_pool_push( + self.as_ref().to_glib_none().0, + Some(task_pool_trampoline::

), + func as gpointer, + &mut error, + ); + + if !error.is_null() { + assert!(handle.is_null()); + + // Assume that task_pool_trampoline was + // not called and will not be called + drop(Box::from_raw(func)); + + return Err(from_glib_full(error)); + } + + let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle { + handle, + task_pool: Some(self.as_ref().clone()), + }); + + Ok(handle) + } + } +} + +impl TaskPool { + unsafe fn join(&self, id: ptr::NonNull) { + ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr()) + } + + #[cfg(any(feature = "v1_20", feature = "dox"))] + #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))] + unsafe fn dispose_handle(&self, id: ptr::NonNull) { + ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr()) + } +} + +// rustdoc-stripper-ignore-next +/// A handle for a task which was pushed to a task pool. +pub trait TaskHandle { + // rustdoc-stripper-ignore-next + /// Wait for the task to complete. + fn join(self); +} + +impl TaskHandle for () { + fn join(self) {} +} + +impl TaskHandle for std::convert::Infallible { + fn join(self) {} +} + +// rustdoc-stripper-ignore-next +/// An opaque handle for a task associated with a particular task pool. +/// +/// Keeps a reference to the pool alive. +/// +/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is +/// dropped. Otherwise, needs to be `join`ed to avoid a leak. +#[cfg_attr(not(any(feature = "v1_20", feature = "dox")), must_use)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TaskPoolTaskHandle { + handle: ptr::NonNull, + task_pool: Option, +} + +impl TaskHandle for TaskPoolTaskHandle { + #[doc(alias = "gst_task_pool_join")] + fn join(mut self) { + let task_pool = self.task_pool.take().unwrap(); + unsafe { task_pool.join(self.handle) } + } +} + +impl Drop for TaskPoolTaskHandle { + #[doc(alias = "gst_task_pool_dispose_handle")] + fn drop(&mut self) { + if let Some(task_pool) = self.task_pool.take() { + cfg_if::cfg_if! { + if #[cfg(any(feature = "v1_20", feature = "dox"))] { + unsafe { task_pool.dispose_handle(self.handle) } + } else { + crate::warning!(crate::CAT_RUST, obj: &task_pool, "Leaked task handle"); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::prelude::*; + use std::sync::mpsc::{channel, RecvError}; + + #[test] + fn test_simple() { + crate::init().unwrap(); + let pool = TaskPool::new(); + pool.prepare().unwrap(); + + let (sender, receiver) = channel(); + + let handle = pool + .push(move || { + sender.send(()).unwrap(); + }) + .unwrap(); + + assert_eq!(receiver.recv(), Ok(())); + + if let Some(handle) = handle { + handle.join(); + } + + // Can't test try_recv here as the default task pool produces no + // handles and thus no way to wait for channel destruction + assert_eq!(receiver.recv(), Err(RecvError)); + + pool.cleanup(); + } +}