From 957aac94aeb255b7cab1b02fb92d1f5b4d6814ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 27 Apr 2020 11:22:26 +0200 Subject: [PATCH] threadshare: align some properties with their C counterparts Some ts-elements properties don't match the name, type or default value of the C elements counterparts. --- generic/threadshare/examples/benchmark.rs | 8 +-- generic/threadshare/src/tcpclientsrc.rs | 65 ++++++++++++----------- generic/threadshare/src/udpsink.rs | 38 ++++++------- generic/threadshare/src/udpsrc.rs | 19 +++---- generic/threadshare/tests/pipeline.rs | 4 +- generic/threadshare/tests/tcpclientsrc.rs | 2 +- generic/threadshare/tests/udpsink.rs | 32 ++++++----- generic/threadshare/tests/udpsrc.rs | 6 +-- 8 files changed, 90 insertions(+), 84 deletions(-) diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs index beed48a1..e393491e 100644 --- a/generic/threadshare/examples/benchmark.rs +++ b/generic/threadshare/examples/benchmark.rs @@ -97,7 +97,7 @@ fn main() { gst::ElementFactory::make("ts-udpsrc", Some(format!("source-{}", i).as_str())) .unwrap(); source - .set_property("port", &(40000u32 + (i as u32))) + .set_property("port", &(40000i32 + (i as i32))) .unwrap(); source .set_property("context", &format!("context-{}", (i as u32) % n_groups)) @@ -113,7 +113,7 @@ fn main() { ) .unwrap(); source.set_property("host", &"127.0.0.1").unwrap(); - source.set_property("port", &(40000i32)).unwrap(); + source.set_property("port", &40000i32).unwrap(); source } @@ -123,8 +123,8 @@ fn main() { Some(format!("source-{}", i).as_str()), ) .unwrap(); - source.set_property("address", &"127.0.0.1").unwrap(); - source.set_property("port", &(40000u32)).unwrap(); + source.set_property("host", &"127.0.0.1").unwrap(); + source.set_property("port", &40000i32).unwrap(); source .set_property("context", &format!("context-{}", (i as u32) % n_groups)) .unwrap(); diff --git a/generic/threadshare/src/tcpclientsrc.rs b/generic/threadshare/src/tcpclientsrc.rs index 26c1dd08..8068a59f 100644 --- a/generic/threadshare/src/tcpclientsrc.rs +++ b/generic/threadshare/src/tcpclientsrc.rs @@ -36,6 +36,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::u16; +use std::u32; use tokio::io::AsyncReadExt; @@ -44,19 +45,19 @@ use crate::runtime::{Context, PadSrc, PadSrcRef, Task}; use super::socket::{Socket, SocketError, SocketRead, SocketState}; -const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); -const DEFAULT_PORT: u32 = 5000; +const DEFAULT_HOST: Option<&str> = Some("127.0.0.1"); +const DEFAULT_PORT: i32 = 4953; const DEFAULT_CAPS: Option = None; -const DEFAULT_CHUNK_SIZE: u32 = 4096; +const DEFAULT_BLOCKSIZE: u32 = 4096; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; #[derive(Debug, Clone)] struct Settings { - address: Option, - port: u32, + host: Option, + port: i32, caps: Option, - chunk_size: u32, + blocksize: u32, context: String, context_wait: u32, } @@ -64,10 +65,10 @@ struct Settings { impl Default for Settings { fn default() -> Self { Settings { - address: DEFAULT_ADDRESS.map(Into::into), + host: DEFAULT_HOST.map(Into::into), port: DEFAULT_PORT, caps: DEFAULT_CAPS, - chunk_size: DEFAULT_CHUNK_SIZE, + blocksize: DEFAULT_BLOCKSIZE, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, } @@ -75,22 +76,22 @@ impl Default for Settings { } static PROPERTIES: [subclass::Property; 6] = [ - subclass::Property("address", |name| { + subclass::Property("host", |name| { glib::ParamSpec::string( name, - "Address", - "Address to receive packets from", - DEFAULT_ADDRESS, + "Host", + "The host IP address to receive packets from", + DEFAULT_HOST, glib::ParamFlags::READWRITE, ) }), subclass::Property("port", |name| { - glib::ParamSpec::uint( + glib::ParamSpec::int( name, "Port", "Port to receive packets from", 0, - u16::MAX as u32, + u16::MAX as i32, DEFAULT_PORT, glib::ParamFlags::READWRITE, ) @@ -104,14 +105,14 @@ static PROPERTIES: [subclass::Property; 6] = [ glib::ParamFlags::READWRITE, ) }), - subclass::Property("chunk-size", |name| { + subclass::Property("blocksize", |name| { glib::ParamSpec::uint( name, - "Chunk Size", - "Chunk Size", + "Blocksize", + "Size in bytes to read per buffer (-1 = default)", 0, - u16::MAX as u32, - DEFAULT_CHUNK_SIZE, + u32::MAX, + DEFAULT_BLOCKSIZE, glib::ParamFlags::READWRITE, ) }), @@ -384,28 +385,28 @@ impl TcpClientSrc { ) })?; - let addr: IpAddr = match settings.address { + let host: IpAddr = match settings.host { None => { return Err(gst_error_msg!( gst::ResourceError::Settings, - ["No address set"] + ["No host set"] )); } - Some(ref addr) => match addr.parse() { + Some(ref host) => match host.parse() { Err(err) => { return Err(gst_error_msg!( gst::ResourceError::Settings, - ["Invalid address '{}' set: {}", addr, err] + ["Invalid host '{}' set: {}", host, err] )); } - Ok(addr) => addr, + Ok(host) => host, }, }; let port = settings.port; let buffer_pool = gst::BufferPool::new(); let mut config = buffer_pool.get_config(); - config.set_params(None, settings.chunk_size, 0, 0); + config.set_params(None, settings.blocksize, 0, 0); buffer_pool.set_config(config).map_err(|_| { gst_error_msg!( gst::ResourceError::Settings, @@ -413,7 +414,7 @@ impl TcpClientSrc { ) })?; - let saddr = SocketAddr::new(addr, port as u16); + let saddr = SocketAddr::new(host, port as u16); let element_clone = element.clone(); let socket = Socket::new(element.upcast_ref(), buffer_pool, async move { gst_debug!(CAT, obj: &element_clone, "Connecting to {:?}", saddr); @@ -658,8 +659,8 @@ impl ObjectImpl for TcpClientSrc { let mut settings = self.settings.lock().unwrap(); match *prop { - subclass::Property("address", ..) => { - settings.address = value.get().expect("type checked upstream"); + subclass::Property("host", ..) => { + settings.host = value.get().expect("type checked upstream"); } subclass::Property("port", ..) => { settings.port = value.get_some().expect("type checked upstream"); @@ -667,8 +668,8 @@ impl ObjectImpl for TcpClientSrc { subclass::Property("caps", ..) => { settings.caps = value.get().expect("type checked upstream"); } - subclass::Property("chunk-size", ..) => { - settings.chunk_size = value.get_some().expect("type checked upstream"); + subclass::Property("blocksize", ..) => { + settings.blocksize = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { settings.context = value @@ -688,10 +689,10 @@ impl ObjectImpl for TcpClientSrc { let settings = self.settings.lock().unwrap(); match *prop { - subclass::Property("address", ..) => Ok(settings.address.to_value()), + subclass::Property("host", ..) => Ok(settings.host.to_value()), subclass::Property("port", ..) => Ok(settings.port.to_value()), subclass::Property("caps", ..) => Ok(settings.caps.to_value()), - subclass::Property("chunk-size", ..) => Ok(settings.chunk_size.to_value()), + subclass::Property("blocksize", ..) => Ok(settings.blocksize.to_value()), subclass::Property("context", ..) => Ok(settings.context.to_value()), subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()), _ => unimplemented!(), diff --git a/generic/threadshare/src/udpsink.rs b/generic/threadshare/src/udpsink.rs index cc0b941b..8b96b539 100644 --- a/generic/threadshare/src/udpsink.rs +++ b/generic/threadshare/src/udpsink.rs @@ -50,12 +50,12 @@ use std::u16; use std::u8; const DEFAULT_HOST: Option<&str> = Some("127.0.0.1"); -const DEFAULT_PORT: u32 = 5000; +const DEFAULT_PORT: i32 = 5004; const DEFAULT_SYNC: bool = true; const DEFAULT_BIND_ADDRESS: &str = "0.0.0.0"; -const DEFAULT_BIND_PORT: u32 = 0; +const DEFAULT_BIND_PORT: i32 = 0; const DEFAULT_BIND_ADDRESS_V6: &str = "::"; -const DEFAULT_BIND_PORT_V6: u32 = 0; +const DEFAULT_BIND_PORT_V6: i32 = 0; const DEFAULT_SOCKET: Option = None; const DEFAULT_USED_SOCKET: Option = None; const DEFAULT_SOCKET_V6: Option = None; @@ -72,12 +72,12 @@ const DEFAULT_CONTEXT_WAIT: u32 = 0; #[derive(Debug, Clone)] struct Settings { host: Option, - port: u32, + port: i32, sync: bool, bind_address: String, - bind_port: u32, + bind_port: i32, bind_address_v6: String, - bind_port_v6: u32, + bind_port_v6: i32, socket: Option, used_socket: Option, socket_v6: Option, @@ -135,12 +135,12 @@ static PROPERTIES: [subclass::Property; 19] = [ ) }), subclass::Property("port", |name| { - glib::ParamSpec::uint( + glib::ParamSpec::int( name, "Port", "The port to send the packets to", 0, - u16::MAX as u32, + u16::MAX as i32, DEFAULT_PORT, glib::ParamFlags::READWRITE, ) @@ -164,12 +164,12 @@ static PROPERTIES: [subclass::Property; 19] = [ ) }), subclass::Property("bind-port", |name| { - glib::ParamSpec::uint( + glib::ParamSpec::int( name, "Bind Port", "Port to bind the socket to", 0, - u16::MAX as u32, + u16::MAX as i32, DEFAULT_BIND_PORT, glib::ParamFlags::READWRITE, ) @@ -184,12 +184,12 @@ static PROPERTIES: [subclass::Property; 19] = [ ) }), subclass::Property("bind-port-v6", |name| { - glib::ParamSpec::uint( + glib::ParamSpec::int( name, "Bind Port", "Port to bind the V6 socket to", 0, - u16::MAX as u32, + u16::MAX as i32, DEFAULT_BIND_PORT_V6, glib::ParamFlags::READWRITE, ) @@ -779,7 +779,7 @@ impl PadSinkHandler for UdpSinkPadHandler { async move { if let Some(sender) = sender.lock().await.as_mut() { - if let Err(_) = sender.send(TaskItem::Buffer(buffer)).await { + if sender.send(TaskItem::Buffer(buffer)).await.is_err() { gst_debug!(CAT, obj: &element, "Flushing"); return Err(gst::FlowError::Flushing); } @@ -802,7 +802,7 @@ impl PadSinkHandler for UdpSinkPadHandler { async move { if let Some(sender) = sender.lock().await.as_mut() { for buffer in list.iter_owned() { - if let Err(_) = sender.send(TaskItem::Buffer(buffer)).await { + if sender.send(TaskItem::Buffer(buffer)).await.is_err() { gst_debug!(CAT, obj: &element, "Flushing"); return Err(gst::FlowError::Flushing); } @@ -829,7 +829,7 @@ impl PadSinkHandler for UdpSinkPadHandler { let udpsink = UdpSink::from_instance(&element); let _ = udpsink.start(&element); } else if let Some(sender) = sender.lock().await.as_mut() { - if let Err(_) = sender.send(TaskItem::Event(event)).await { + if sender.send(TaskItem::Event(event)).await.is_err() { gst_debug!(CAT, obj: &element, "Flushing"); } } @@ -1123,7 +1123,7 @@ impl UdpSink { } } -fn try_into_socket_addr(element: &gst::Element, host: &str, port: u32) -> Result { +fn try_into_socket_addr(element: &gst::Element, host: &str, port: i32) -> Result { let addr: IpAddr = match host.parse() { Err(err) => { gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); @@ -1186,7 +1186,7 @@ impl ObjectSubclass for UdpSink { let port = args[2] .get::() .expect("signal arg") - .expect("missing signal arg") as u32; + .expect("missing signal arg"); if let Ok(addr) = try_into_socket_addr(&element, &host, port) { let udpsink = Self::from_instance(&element); @@ -1214,7 +1214,7 @@ impl ObjectSubclass for UdpSink { let port = args[2] .get::() .expect("signal arg") - .expect("missing signal arg") as u32; + .expect("missing signal arg"); let udpsink = Self::from_instance(&element); let settings = udpsink.settings.lock().unwrap(); @@ -1382,7 +1382,7 @@ impl ObjectImpl for UdpSink { if rsplit.len() == 2 { rsplit[0] - .parse::() + .parse::() .map_err(|err| { gst_error!( CAT, diff --git a/generic/threadshare/src/udpsrc.rs b/generic/threadshare/src/udpsrc.rs index bb530010..2dd20955 100644 --- a/generic/threadshare/src/udpsrc.rs +++ b/generic/threadshare/src/udpsrc.rs @@ -31,6 +31,7 @@ use gst_net::*; use lazy_static::lazy_static; +use std::i32; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; @@ -42,11 +43,11 @@ use crate::runtime::{Context, PadSrc, PadSrcRef, Task}; use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState}; -const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); -const DEFAULT_PORT: u32 = 5000; +const DEFAULT_ADDRESS: Option<&str> = Some("0.0.0.0"); +const DEFAULT_PORT: i32 = 5000; const DEFAULT_REUSE: bool = true; const DEFAULT_CAPS: Option = None; -const DEFAULT_MTU: u32 = 1500; +const DEFAULT_MTU: u32 = 1492; const DEFAULT_SOCKET: Option = None; const DEFAULT_USED_SOCKET: Option = None; const DEFAULT_CONTEXT: &str = ""; @@ -56,7 +57,7 @@ const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; #[derive(Debug, Clone)] struct Settings { address: Option, - port: u32, + port: i32, // for conformity with C based udpsrc reuse: bool, caps: Option, mtu: u32, @@ -95,12 +96,12 @@ static PROPERTIES: [subclass::Property; 10] = [ ) }), subclass::Property("port", |name| { - glib::ParamSpec::uint( + glib::ParamSpec::int( name, "Port", "Port to listen on", 0, - u16::MAX as u32, + u16::MAX as i32, DEFAULT_PORT, glib::ParamFlags::READWRITE, ) @@ -127,9 +128,9 @@ static PROPERTIES: [subclass::Property; 10] = [ glib::ParamSpec::uint( name, "MTU", - "MTU", + "Maximum expected packet size. This directly defines the allocation size of the receive buffer pool", 0, - u16::MAX as u32, + i32::MAX as u32, DEFAULT_MTU, glib::ParamFlags::READWRITE, ) @@ -907,7 +908,7 @@ impl ObjectImpl for UdpSrc { let settings = self.settings.lock().unwrap(); match *prop { subclass::Property("address", ..) => Ok(settings.address.to_value()), - subclass::Property("port", ..) => Ok(settings.port.to_value()), + subclass::Property("port", ..) => Ok((settings.port).to_value()), subclass::Property("reuse", ..) => Ok(settings.reuse.to_value()), subclass::Property("caps", ..) => Ok(settings.caps.to_value()), subclass::Property("mtu", ..) => Ok(settings.mtu.to_value()), diff --git a/generic/threadshare/tests/pipeline.rs b/generic/threadshare/tests/pipeline.rs index e65b50ca..e7e73912 100644 --- a/generic/threadshare/tests/pipeline.rs +++ b/generic/threadshare/tests/pipeline.rs @@ -65,7 +65,7 @@ fn multiple_contexts_queue() { src.set_property("context", &format!("context-{}", (i as u32) % CONTEXT_NB)) .unwrap(); src.set_property("context-wait", &CONTEXT_WAIT).unwrap(); - src.set_property("port", &((FIRST_PORT + i) as u32)) + src.set_property("port", &((FIRST_PORT + i) as i32)) .unwrap(); let queue = @@ -204,7 +204,7 @@ fn multiple_contexts_proxy() { src.set_property("context", &format!("context-{}", (i as u32) % CONTEXT_NB)) .unwrap(); src.set_property("context-wait", &CONTEXT_WAIT).unwrap(); - src.set_property("port", &((FIRST_PORT + i) as u32)) + src.set_property("port", &((FIRST_PORT + i) as i32)) .unwrap(); let proxysink = gst::ElementFactory::make( diff --git a/generic/threadshare/tests/tcpclientsrc.rs b/generic/threadshare/tests/tcpclientsrc.rs index 41a506fd..f545861c 100644 --- a/generic/threadshare/tests/tcpclientsrc.rs +++ b/generic/threadshare/tests/tcpclientsrc.rs @@ -62,7 +62,7 @@ fn test_push() { let caps = gst::Caps::new_simple("foo/bar", &[]); tcpclientsrc.set_property("caps", &caps).unwrap(); - tcpclientsrc.set_property("port", &(5000u32)).unwrap(); + tcpclientsrc.set_property("port", &5000i32).unwrap(); appsink.set_property("emit-signals", &true).unwrap(); diff --git a/generic/threadshare/tests/udpsink.rs b/generic/threadshare/tests/udpsink.rs index 5be3f23f..3638f538 100644 --- a/generic/threadshare/tests/udpsink.rs +++ b/generic/threadshare/tests/udpsink.rs @@ -43,56 +43,56 @@ fn test_client_management() { .unwrap() .unwrap(); - assert_eq!(clients, "127.0.0.1:5000"); + assert_eq!(clients, "127.0.0.1:5004"); - udpsink.emit("add", &[&"192.168.1.1", &57]).unwrap(); + udpsink.emit("add", &[&"192.168.1.1", &57i32]).unwrap(); let clients = udpsink .get_property("clients") .unwrap() .get::() .unwrap() .unwrap(); - assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57"); + assert_eq!(clients, "127.0.0.1:5004,192.168.1.1:57"); /* Adding a client twice is not supported */ - udpsink.emit("add", &[&"192.168.1.1", &57]).unwrap(); + udpsink.emit("add", &[&"192.168.1.1", &57i32]).unwrap(); let clients = udpsink .get_property("clients") .unwrap() .get::() .unwrap() .unwrap(); - assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57"); + assert_eq!(clients, "127.0.0.1:5004,192.168.1.1:57"); - udpsink.emit("remove", &[&"192.168.1.1", &57]).unwrap(); + udpsink.emit("remove", &[&"192.168.1.1", &57i32]).unwrap(); let clients = udpsink .get_property("clients") .unwrap() .get::() .unwrap() .unwrap(); - assert_eq!(clients, "127.0.0.1:5000"); + assert_eq!(clients, "127.0.0.1:5004"); /* Removing a non-existing client should not be a problem */ - udpsink.emit("remove", &[&"192.168.1.1", &57]).unwrap(); + udpsink.emit("remove", &[&"192.168.1.1", &57i32]).unwrap(); let clients = udpsink .get_property("clients") .unwrap() .get::() .unwrap() .unwrap(); - assert_eq!(clients, "127.0.0.1:5000"); + assert_eq!(clients, "127.0.0.1:5004"); /* While the default host:address client is listed in clients, * it can't be removed with the remove signal */ - udpsink.emit("remove", &[&"127.0.0.1", &5000]).unwrap(); + udpsink.emit("remove", &[&"127.0.0.1", &5004i32]).unwrap(); let clients = udpsink .get_property("clients") .unwrap() .get::() .unwrap() .unwrap(); - assert_eq!(clients, "127.0.0.1:5000"); + assert_eq!(clients, "127.0.0.1:5004"); /* It is however possible to remove the default client by setting * host to None */ @@ -108,7 +108,7 @@ fn test_client_management() { /* The client properties is writable too */ udpsink - .set_property("clients", &"127.0.0.1:5000,192.168.1.1:57") + .set_property("clients", &"127.0.0.1:5004,192.168.1.1:57") .unwrap(); let clients = udpsink .get_property("clients") @@ -116,7 +116,7 @@ fn test_client_management() { .get::() .unwrap() .unwrap(); - assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57"); + assert_eq!(clients, "127.0.0.1:5004,192.168.1.1:57"); udpsink.emit("clear", &[]).unwrap(); let clients = udpsink @@ -134,6 +134,10 @@ fn test_chain() { let mut h = gst_check::Harness::new("ts-udpsink"); h.set_src_caps_str(&"foo/bar"); + { + let udpsink = h.get_element().unwrap(); + udpsink.set_property("port", &5005i32).unwrap(); + } thread::spawn(move || { use std::net; @@ -141,7 +145,7 @@ fn test_chain() { thread::sleep(time::Duration::from_millis(50)); - let socket = net::UdpSocket::bind("127.0.0.1:5000").unwrap(); + let socket = net::UdpSocket::bind("127.0.0.1:5005").unwrap(); let mut buf = [0; 5]; let (amt, _) = socket.recv_from(&mut buf).unwrap(); diff --git a/generic/threadshare/tests/udpsrc.rs b/generic/threadshare/tests/udpsrc.rs index 90d0d51e..44d7b96c 100644 --- a/generic/threadshare/tests/udpsrc.rs +++ b/generic/threadshare/tests/udpsrc.rs @@ -39,7 +39,7 @@ fn test_push() { { let udpsrc = h.get_element().unwrap(); udpsrc.set_property("caps", &caps).unwrap(); - udpsrc.set_property("port", &(5000 as u32)).unwrap(); + udpsrc.set_property("port", &5000i32).unwrap(); udpsrc.set_property("context", &"test-push").unwrap(); } @@ -105,7 +105,7 @@ fn test_socket_reuse() { { let udpsrc = ts_src_h.get_element().unwrap(); - udpsrc.set_property("port", &(6000 as u32)).unwrap(); + udpsrc.set_property("port", &6000i32).unwrap(); udpsrc .set_property("context", &"test-socket-reuse") .unwrap(); @@ -130,7 +130,7 @@ fn test_socket_reuse() { { let udpsrc = ts_src_h2.get_element().unwrap(); - udpsrc.set_property("port", &(6001 as u32)).unwrap(); + udpsrc.set_property("port", &6001i32).unwrap(); udpsrc .set_property("context", &"test-socket-reuse") .unwrap();