diff --git a/generic/threadshare/src/runtime/executor/async_wrapper.rs b/generic/threadshare/src/runtime/executor/async_wrapper.rs index 0e23691a..47205028 100644 --- a/generic/threadshare/src/runtime/executor/async_wrapper.rs +++ b/generic/threadshare/src/runtime/executor/async_wrapper.rs @@ -94,10 +94,10 @@ pub struct Async { pub(super) source: Arc, /// The inner I/O handle. - io: Option, + pub(super) io: Option, // The [`Handle`] on the [`Scheduler`] on which this Async wrapper is registered. - sched: scheduler::HandleWeak, + pub(super) sched: scheduler::HandleWeak, } impl Unpin for Async {} diff --git a/generic/threadshare/src/runtime/executor/reactor.rs b/generic/threadshare/src/runtime/executor/reactor.rs index 4e297385..e1f2221c 100644 --- a/generic/threadshare/src/runtime/executor/reactor.rs +++ b/generic/threadshare/src/runtime/executor/reactor.rs @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 // This is based on https://github.com/smol-rs/async-io // with adaptations by: // diff --git a/generic/threadshare/src/runtime/executor/reactor/kqueue.rs b/generic/threadshare/src/runtime/executor/reactor/kqueue.rs index 9f31af98..2ca28bcc 100644 --- a/generic/threadshare/src/runtime/executor/reactor/kqueue.rs +++ b/generic/threadshare/src/runtime/executor/reactor/kqueue.rs @@ -1,20 +1,15 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 +// This is based on https://github.com/smol-rs/async-io -use crate::os::kqueue::Signal; - -use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal}; -use polling::{Event, PollMode, Poller}; +use polling::{Event, Poller}; use std::fmt; use std::io::Result; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; -use std::process::Child; /// The raw registration into the reactor. -/// -/// This needs to be public, since it is technically exposed through the `QueueableSealed` trait. #[doc(hidden)] -pub enum Registration { +pub struct Registration { /// Raw file descriptor for readability/writability. /// /// @@ -22,22 +17,12 @@ pub enum Registration { /// /// This describes a valid file descriptor that has not been `close`d. It will not be /// closed while this object is alive. - Fd(RawFd), - - /// Raw signal number for signal delivery. - Signal(Signal), - - /// Process for process termination. - Process(Child), + raw: RawFd, } impl fmt::Debug for Registration { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Fd(raw) => fmt::Debug::fmt(raw, f), - Self::Signal(signal) => fmt::Debug::fmt(signal, f), - Self::Process(process) => fmt::Debug::fmt(process, f), - } + fmt::Debug::fmt(&self.raw, f) } } @@ -48,61 +33,31 @@ impl Registration { /// /// The provided file descriptor must be valid and not be closed while this object is alive. pub(crate) unsafe fn new(f: impl AsFd) -> Self { - Self::Fd(f.as_fd().as_raw_fd()) + Self { + raw: f.as_fd().as_raw_fd(), + } } /// Registers the object into the reactor. #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { - match self { - Self::Fd(raw) => { - // SAFETY: This object's existence validates the invariants of Poller::add - unsafe { poller.add(*raw, Event::none(token)) } - } - Self::Signal(signal) => { - poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) - } - Self::Process(process) => poller.add_filter( - unsafe { Process::new(process, ProcessOps::Exit) }, - token, - PollMode::Oneshot, - ), - } + // SAFETY: This object's existence validates the invariants of Poller::add + unsafe { poller.add(self.raw, Event::none(token)) } } /// Re-registers the object into the reactor. #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { - match self { - Self::Fd(raw) => { - // SAFETY: self.raw is a valid file descriptor - let fd = unsafe { BorrowedFd::borrow_raw(*raw) }; - poller.modify(fd, interest) - } - Self::Signal(signal) => { - poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) - } - Self::Process(process) => poller.modify_filter( - unsafe { Process::new(process, ProcessOps::Exit) }, - interest.key, - PollMode::Oneshot, - ), - } + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(self.raw) }; + poller.modify(fd, interest) } /// Deregisters the object from the reactor. #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { - match self { - Self::Fd(raw) => { - // SAFETY: self.raw is a valid file descriptor - let fd = unsafe { BorrowedFd::borrow_raw(*raw) }; - poller.delete(fd) - } - Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), - Self::Process(process) => { - poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) }) - } - } + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(self.raw) }; + poller.delete(fd) } } diff --git a/generic/threadshare/src/runtime/executor/reactor/unix.rs b/generic/threadshare/src/runtime/executor/reactor/unix.rs index b2f9b1b2..75ac60ac 100644 --- a/generic/threadshare/src/runtime/executor/reactor/unix.rs +++ b/generic/threadshare/src/runtime/executor/reactor/unix.rs @@ -1,4 +1,5 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 +// This is based on https://github.com/smol-rs/async-io use polling::{Event, Poller}; diff --git a/generic/threadshare/src/runtime/executor/reactor/windows.rs b/generic/threadshare/src/runtime/executor/reactor/windows.rs index 1c92f00a..e2905604 100644 --- a/generic/threadshare/src/runtime/executor/reactor/windows.rs +++ b/generic/threadshare/src/runtime/executor/reactor/windows.rs @@ -1,4 +1,5 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 +// This is based on https://github.com/smol-rs/async-io use polling::{Event, Poller}; use std::fmt; diff --git a/generic/threadshare/tests/tcpclientsrc.rs b/generic/threadshare/tests/tcpclientsrc.rs index a450452e..c6274b79 100644 --- a/generic/threadshare/tests/tcpclientsrc.rs +++ b/generic/threadshare/tests/tcpclientsrc.rs @@ -43,7 +43,7 @@ fn test_push() { let handler = thread::spawn(move || { use std::net; - let listener = net::TcpListener::bind("0.0.0.0:5000").unwrap(); + let listener = net::TcpListener::bind("0.0.0.0:5010").unwrap(); listening_tx.send(()).unwrap(); let stream = listener.incoming().next().unwrap(); let buffer = [0; 160]; @@ -59,7 +59,7 @@ fn test_push() { let caps = gst::Caps::builder("foo/bar").build(); let tcpclientsrc = gst::ElementFactory::make("ts-tcpclientsrc") .property("caps", &caps) - .property("port", 5000i32) + .property("port", 5010i32) .build() .unwrap(); let appsink = gst_app::AppSink::builder()