threadshare: Initial version of a UDP sink

This commit is contained in:
Mathieu Duponchelle 2019-12-23 16:23:38 +01:00 committed by Sebastian Dröge
parent 4c27d560a9
commit 382b9f118c
6 changed files with 1696 additions and 169 deletions

View file

@ -43,6 +43,7 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;
use crate::get_current_running_time;
use crate::runtime::prelude::*;
use crate::runtime::{
self, Context, JoinHandle, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak,
@ -433,18 +434,6 @@ lazy_static! {
}
impl JitterBuffer {
fn get_current_running_time(&self, element: &gst::Element) -> gst::ClockTime {
if let Some(clock) = element.get_clock() {
if clock.get_time() > element.get_base_time() {
clock.get_time() - element.get_base_time()
} else {
gst::ClockTime(Some(0))
}
} else {
gst::CLOCK_TIME_NONE
}
}
fn parse_caps(
&self,
state: &mut MutexGuard<State>,
@ -645,7 +634,7 @@ impl JitterBuffer {
}
if dts == gst::CLOCK_TIME_NONE {
dts = self.get_current_running_time(element);
dts = get_current_running_time(element);
pts = dts;
estimated_dts = state.clock_rate != -1;
@ -947,7 +936,7 @@ impl JitterBuffer {
)
};
let now = self.get_current_running_time(element);
let now = get_current_running_time(element);
gst_debug!(
CAT,
@ -1018,7 +1007,7 @@ impl JitterBuffer {
None => return,
};
let now = jb.get_current_running_time(&element);
let now = get_current_running_time(&element);
gst_debug!(
CAT,

View file

@ -31,6 +31,7 @@ pub mod runtime;
pub mod socket;
mod tcpclientsrc;
mod udpsink;
mod udpsrc;
mod appsrc;
@ -44,10 +45,12 @@ use glib_sys as glib_ffi;
use gst;
use gst::gst_plugin_define;
use gst::prelude::*;
use gstreamer_sys as gst_ffi;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
udpsrc::register(plugin)?;
udpsink::register(plugin)?;
tcpclientsrc::register(plugin)?;
queue::register(plugin)?;
proxy::register(plugin)?;
@ -98,3 +101,15 @@ impl<'a> Drop for MutexGuard<'a> {
}
}
}
pub fn get_current_running_time(element: &gst::Element) -> gst::ClockTime {
if let Some(clock) = element.get_clock() {
if clock.get_time() > element.get_base_time() {
clock.get_time() - element.get_base_time()
} else {
0.into()
}
} else {
gst::CLOCK_TIME_NONE
}
}

View file

@ -23,13 +23,24 @@ use futures::lock::Mutex;
use gst;
use gst::prelude::*;
use gst::{gst_debug, gst_error};
use gst::{gst_debug, gst_error, gst_error_msg};
use lazy_static::lazy_static;
use std::io;
use std::sync::Arc;
use gio;
use gio::prelude::*;
use gio_sys as gio_ffi;
use gobject_sys as gobject_ffi;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
lazy_static! {
static ref SOCKET_CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-socket",
@ -246,3 +257,169 @@ impl<T: SocketRead + 'static> SocketStream<T> {
}
}
}
// Send/Sync struct for passing around a gio::Socket
// and getting the raw fd from it
//
// gio::Socket is not Send/Sync as it's generally unsafe
// to access it from multiple threads. Getting the underlying raw
// fd is safe though, as is receiving/sending from two different threads
#[derive(Debug)]
pub struct GioSocketWrapper {
socket: *mut gio_ffi::GSocket,
}
unsafe impl Send for GioSocketWrapper {}
unsafe impl Sync for GioSocketWrapper {}
impl GioSocketWrapper {
pub fn new(socket: &gio::Socket) -> Self {
use glib::translate::*;
Self {
socket: socket.to_glib_full(),
}
}
pub fn as_socket(&self) -> gio::Socket {
unsafe {
use glib::translate::*;
from_glib_none(self.socket)
}
}
#[cfg(unix)]
pub fn set_tos(&self, qos_dscp: i32) -> Result<(), glib::error::Error> {
use libc::{IPPROTO_IP, IPPROTO_IPV6, IPV6_TCLASS, IP_TOS};
let tos = (qos_dscp & 0x3f) << 2;
let socket = self.as_socket();
socket.set_option(IPPROTO_IP, IP_TOS, tos)?;
if socket.get_family() == gio::SocketFamily::Ipv6 {
socket.set_option(IPPROTO_IPV6, IPV6_TCLASS, tos)?;
}
Ok(())
}
#[cfg(not(unix))]
pub fn set_tos(&self, qos_dscp: i32) -> Result<(), Error> {
Ok(())
}
#[cfg(unix)]
pub fn get<T: FromRawFd>(&self) -> T {
unsafe { FromRawFd::from_raw_fd(libc::dup(gio_ffi::g_socket_get_fd(self.socket))) }
}
#[cfg(windows)]
pub fn get<T: FromRawSocket>(&self) -> T {
unsafe {
FromRawSocket::from_raw_socket(
dup_socket(gio_ffi::g_socket_get_fd(self.socket) as _) as _
)
}
}
}
impl Clone for GioSocketWrapper {
fn clone(&self) -> Self {
Self {
socket: unsafe { gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _ },
}
}
}
impl Drop for GioSocketWrapper {
fn drop(&mut self) {
unsafe {
gobject_ffi::g_object_unref(self.socket as *mut _);
}
}
}
#[cfg(windows)]
unsafe fn dup_socket(socket: usize) -> usize {
use std::mem;
use winapi::shared::ws2def;
use winapi::um::processthreadsapi;
use winapi::um::winsock2;
let mut proto_info = mem::MaybeUninit::uninit();
let ret = winsock2::WSADuplicateSocketA(
socket,
processthreadsapi::GetCurrentProcessId(),
proto_info.as_mut_ptr(),
);
assert_eq!(ret, 0);
let mut proto_info = proto_info.assume_init();
let socket = winsock2::WSASocketA(
ws2def::AF_INET,
ws2def::SOCK_DGRAM,
ws2def::IPPROTO_UDP as i32,
&mut proto_info,
0,
0,
);
assert_ne!(socket, winsock2::INVALID_SOCKET);
socket
}
pub fn wrap_socket(socket: &tokio::net::UdpSocket) -> Result<GioSocketWrapper, gst::ErrorMessage> {
#[cfg(unix)]
unsafe {
let fd = libc::dup(socket.as_raw_fd());
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
struct FdConverter(RawFd);
impl IntoRawFd for FdConverter {
fn into_raw_fd(self) -> RawFd {
self.0
}
}
let fd = FdConverter(fd);
let gio_socket = gio::Socket::new_from_fd(fd).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to create wrapped GIO socket: {}", err]
)
})?;
Ok(GioSocketWrapper::new(&gio_socket))
}
#[cfg(windows)]
unsafe {
// FIXME: Needs https://github.com/tokio-rs/tokio/pull/806
// and https://github.com/carllerche/mio/pull/859
let fd = unreachable!(); //dup_socket(socket.as_raw_socket() as _) as _;
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
struct SocketConverter(RawSocket);
impl IntoRawSocket for SocketConverter {
fn into_raw_socket(self) -> RawSocket {
self.0
}
}
let fd = SocketConverter(fd);
let gio_socket = gio::Socket::new_from_socket(fd).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to create wrapped GIO socket: {}", err]
)
})?;
Ok(GioSocketWrapper::new(&gio_socket))
}
}

File diff suppressed because it is too large Load diff

View file

@ -22,7 +22,6 @@ use futures::lock::Mutex;
use futures::prelude::*;
use gio;
use gio_sys as gio_ffi;
use glib;
use glib::prelude::*;
@ -30,8 +29,6 @@ use glib::subclass;
use glib::subclass::prelude::*;
use glib::{glib_object_impl, glib_object_subclass};
use gobject_sys as gobject_ffi;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -47,16 +44,10 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::{self, Arc};
use std::u16;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef};
use super::socket::{Socket, SocketRead, SocketStream};
use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketRead, SocketStream};
const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: u32 = 5000;
@ -69,98 +60,6 @@ const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
// Send/Sync struct for passing around a gio::Socket
// and getting the raw fd from it
//
// gio::Socket is not Send/Sync as it's generally unsafe
// to access it from multiple threads. Getting the underlying raw
// fd is safe though, as is receiving/sending from two different threads
#[derive(Debug)]
struct GioSocketWrapper {
socket: *mut gio_ffi::GSocket,
}
unsafe impl Send for GioSocketWrapper {}
unsafe impl Sync for GioSocketWrapper {}
impl GioSocketWrapper {
fn new(socket: &gio::Socket) -> Self {
use glib::translate::*;
Self {
socket: socket.to_glib_full(),
}
}
fn as_socket(&self) -> gio::Socket {
unsafe {
use glib::translate::*;
from_glib_none(self.socket)
}
}
#[cfg(unix)]
fn get<T: FromRawFd>(&self) -> T {
unsafe { FromRawFd::from_raw_fd(libc::dup(gio_ffi::g_socket_get_fd(self.socket))) }
}
#[cfg(windows)]
fn get<T: FromRawSocket>(&self) -> T {
unsafe {
FromRawSocket::from_raw_socket(
dup_socket(gio_ffi::g_socket_get_fd(self.socket) as _) as _
)
}
}
}
impl Clone for GioSocketWrapper {
fn clone(&self) -> Self {
Self {
socket: unsafe { gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _ },
}
}
}
impl Drop for GioSocketWrapper {
fn drop(&mut self) {
unsafe {
gobject_ffi::g_object_unref(self.socket as *mut _);
}
}
}
#[cfg(windows)]
unsafe fn dup_socket(socket: usize) -> usize {
use std::mem;
use winapi::shared::ws2def;
use winapi::um::processthreadsapi;
use winapi::um::winsock2;
let mut proto_info = mem::MaybeUninit::uninit();
let ret = winsock2::WSADuplicateSocketA(
socket,
processthreadsapi::GetCurrentProcessId(),
proto_info.as_mut_ptr(),
);
assert_eq!(ret, 0);
let mut proto_info = proto_info.assume_init();
let socket = winsock2::WSASocketA(
ws2def::AF_INET,
ws2def::SOCK_DGRAM,
ws2def::IPPROTO_UDP as i32,
&mut proto_info,
0,
0,
);
assert_ne!(socket, winsock2::INVALID_SOCKET);
socket
}
#[derive(Debug, Clone)]
struct Settings {
address: Option<String>,
@ -811,57 +710,7 @@ impl UdpSrc {
}
}
// Store the socket as used-socket in the settings
#[cfg(unix)]
unsafe {
let fd = libc::dup(socket.as_raw_fd());
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
struct FdConverter(RawFd);
impl IntoRawFd for FdConverter {
fn into_raw_fd(self) -> RawFd {
self.0
}
}
let fd = FdConverter(fd);
let gio_socket = gio::Socket::new_from_fd(fd).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create wrapped GIO socket: {}", err]
)
})?;
let wrapper = GioSocketWrapper::new(&gio_socket);
settings.used_socket = Some(wrapper);
}
#[cfg(windows)]
unsafe {
// FIXME: Needs https://github.com/tokio-rs/tokio/pull/806
// and https://github.com/carllerche/mio/pull/859
let fd = unreachable!(); //dup_socket(socket.as_raw_socket() as _) as _;
// This is unsafe because it allows us to share the fd between the socket and the
// GIO socket below, but safety of this is the job of the application
struct SocketConverter(RawSocket);
impl IntoRawSocket for SocketConverter {
fn into_raw_socket(self) -> RawSocket {
self.0
}
}
let fd = SocketConverter(fd);
let gio_socket = gio::Socket::new_from_socket(fd).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create wrapped GIO socket: {}", err]
)
})?;
let wrapper = GioSocketWrapper::new(&gio_socket);
settings.used_socket = Some(wrapper);
}
settings.used_socket = Some(wrap_socket(&socket)?);
socket
};

View file

@ -0,0 +1,160 @@
// Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use std::thread;
use glib;
use glib::prelude::*;
use gst;
use gst_check;
use gstthreadshare;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstthreadshare::plugin_register_static().expect("gstthreadshare udpsrc test");
});
}
#[test]
fn test_client_management() {
init();
let h = gst_check::Harness::new("ts-udpsink");
let udpsink = h.get_element().unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "127.0.0.1:5000");
udpsink.emit("add", &[&"192.168.1.1", &57]).unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57");
/* Adding a client twice is not supported */
udpsink.emit("add", &[&"192.168.1.1", &57]).unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57");
udpsink.emit("remove", &[&"192.168.1.1", &57]).unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "127.0.0.1:5000");
/* Removing a non-existing client should not be a problem */
udpsink.emit("remove", &[&"192.168.1.1", &57]).unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "127.0.0.1:5000");
/* While the default host:address client is listed in clients,
* it can't be removed with the remove signal */
udpsink.emit("remove", &[&"127.0.0.1", &5000]).unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "127.0.0.1:5000");
/* It is however possible to remove the default client by setting
* host to None */
let host: Option<String> = None;
udpsink.set_property("host", &host).unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "");
/* The client properties is writable too */
udpsink
.set_property("clients", &"127.0.0.1:5000,192.168.1.1:57")
.unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57");
udpsink.emit("clear", &[]).unwrap();
let clients = udpsink
.get_property("clients")
.unwrap()
.get::<String>()
.unwrap()
.unwrap();
assert_eq!(clients, "");
}
#[test]
fn test_chain() {
init();
let mut h = gst_check::Harness::new("ts-udpsink");
h.set_src_caps_str(&"foo/bar");
thread::spawn(move || {
use std::net;
use std::time;
thread::sleep(time::Duration::from_millis(50));
let socket = net::UdpSocket::bind("127.0.0.1:5000").unwrap();
let mut buf = [0; 5];
let (amt, _) = socket.recv_from(&mut buf).unwrap();
assert!(amt == 4);
assert!(buf == [42, 43, 44, 45, 0]);
});
let buf = gst::Buffer::from_slice(&[42, 43, 44, 45]);
assert!(h.push(buf) == Ok(gst::FlowSuccess::Ok));
}