Merge branch 'ts-udpsrc-fixes' into 'main'

threadshare: add properties for multicast-iface and buffer-size to udpsrc

See merge request gstreamer/gst-plugins-rs!1420
This commit is contained in:
Taruntej Kanakamalla 2024-04-27 00:55:50 +00:00
commit ee5b83a3a3
4 changed files with 227 additions and 15 deletions

92
Cargo.lock generated
View file

@ -1472,6 +1472,23 @@ dependencies = [
"system-deps",
]
[[package]]
name = "default-net"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85dc7576d8346d3c86ad64dc64d26d0f6c970ba4795b850f15ee94467d8e53eb"
dependencies = [
"dlopen2",
"libc",
"memalloc",
"netlink-packet-core",
"netlink-packet-route",
"netlink-sys",
"once_cell",
"system-configuration",
"windows",
]
[[package]]
name = "der"
version = "0.6.1"
@ -1518,6 +1535,17 @@ dependencies = [
"subtle",
]
[[package]]
name = "dlopen2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09b4f5f101177ff01b8ec4ecc81eead416a8aa42819a2869311b3420fa114ffa"
dependencies = [
"libc",
"once_cell",
"winapi",
]
[[package]]
name = "dssim-core"
version = "3.2.8"
@ -2808,6 +2836,7 @@ dependencies = [
"cfg-if",
"clap",
"concurrent-queue",
"default-net",
"flume",
"futures",
"gio",
@ -4507,6 +4536,12 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "memalloc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df39d232f5c40b0891c10216992c2f250c054105cb1e56f0fc9032db6203ecc1"
[[package]]
name = "memchr"
version = "2.7.2"
@ -4655,6 +4690,54 @@ dependencies = [
"tempfile",
]
[[package]]
name = "netlink-packet-core"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72724faf704479d67b388da142b186f916188505e7e0b26719019c525882eda4"
dependencies = [
"anyhow",
"byteorder",
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-route"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053998cea5a306971f88580d0829e90f270f940befd7cf928da179d4187a5a66"
dependencies = [
"anyhow",
"bitflags 1.3.2",
"byteorder",
"libc",
"netlink-packet-core",
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-utils"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ede8a08c71ad5a95cdd0e4e52facd37190977039a4704eb82a283f713747d34"
dependencies = [
"anyhow",
"byteorder",
"paste",
"thiserror",
]
[[package]]
name = "netlink-sys"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6471bf08e7ac0135876a9581bf3217ef0333c191c128d34878079f42ee150411"
dependencies = [
"bytes",
"libc",
"log",
]
[[package]]
name = "new_debug_unreachable"
version = "1.0.6"
@ -7099,6 +7182,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-core"
version = "0.52.0"

View file

@ -10722,6 +10722,20 @@
"type": "gchararray",
"writable": true
},
"buffer-size": {
"blurb": "Size of the kernel receive buffer in bytes, 0=default",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"caps": {
"blurb": "Caps to use",
"conditionally-available": false,
@ -10773,6 +10787,18 @@
"type": "guint",
"writable": true
},
"multicast-iface": {
"blurb": "The network interface on which to join the multicast group. This allows multiple interfaces separated by comma. ( e.g. eth0,eth1,wlan0)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"port": {
"blurb": "Port to listen on",
"conditionally-available": false,

View file

@ -27,6 +27,7 @@ rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"
slab = "0.4.7"
socket2 = {features = ["all"], version = "0.5"}
waker-fn = "1.1"
default-net = "0.21.0"
# Used by examples
clap = { version = "4", features = ["derive"], optional = true }

View file

@ -49,6 +49,8 @@ const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
const DEFAULT_MULTICAST_IFACE: Option<&str> = None;
const DEFAULT_BUFFER_SIZE: u32 = 0;
#[derive(Debug, Clone)]
struct Settings {
@ -62,6 +64,8 @@ struct Settings {
context: String,
context_wait: Duration,
retrieve_sender_address: bool,
multicast_iface: Option<String>,
buffer_size: u32,
}
impl Default for Settings {
@ -77,6 +81,8 @@ impl Default for Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS,
multicast_iface: DEFAULT_MULTICAST_IFACE.map(Into::into),
buffer_size: DEFAULT_BUFFER_SIZE,
}
}
}
@ -302,6 +308,17 @@ impl TaskImpl for UdpSrcTask {
)
})?;
gst::debug!(CAT, obj: self.element, "socket recv buffer size is {:?}", socket.recv_buffer_size());
if settings.buffer_size != 0 {
gst::debug!(CAT, obj: self.element, "changing the socket recv buffer size to {}", settings.buffer_size);
socket.set_recv_buffer_size(settings.buffer_size as usize).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to set buffer_size: {}", err]
)
})?;
}
#[cfg(unix)]
{
socket.set_reuse_port(settings.reuse).map_err(|err| {
@ -327,26 +344,83 @@ impl TaskImpl for UdpSrcTask {
})?;
if addr.is_multicast() {
// TODO: Multicast interface configuration, going to be tricky
match addr {
IpAddr::V4(addr) => {
socket
.as_ref()
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
// If the 'multicast-iface' property is specified,
// fetch all the interfaces available and filter the available
// interfaces among the ones specified in the `multicast-iface`
if let Some(iface_name) = &settings.multicast_iface {
let preferred_ifaces: Vec<&str> = iface_name.split(",").collect();
let all_ifaces = default_net::get_interfaces();
let ifaces = all_ifaces.iter().filter(|iface| {
preferred_ifaces.iter().position(|&p| {
iface.name == p.to_string() ||
iface.friendly_name == Some(p.to_string())
}).is_some()
});
for iface in ifaces {
match addr {
IpAddr::V4(addr) => {
let ip_addr = if iface.ipv4.is_empty() {
// No Ipv4 address on this interface, use 0.0.0.0
// as the default address to join multicast
Ipv4Addr::new(0,0,0,0)
} else {
// Use the first ipv4 address in the list
gst::debug!(CAT, obj:self.element, "Joining {} ({:?}) to multicast", iface.name, iface.friendly_name);
iface.ipv4.first().unwrap().addr
};
//TODO: Change this to use interface index instead of the IP address
// Would be ideal if we could rewrite/re-use `join_multicast_v4`
// in gstreamer/libs/gst/helpers/ptp/net.rs
// We could use `socket2::join_multicast_v4` but it is
// not supported for all the OS.
socket
.as_ref()
.join_multicast_v4(&addr, &ip_addr)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["{} {:?} Failed to join multicast group: {}", iface.name, iface.friendly_name, err]
)
})?;
}
IpAddr::V6(addr) => {
let idx = iface.index;
gst::debug!(CAT, obj:self.element, "Joining {} ({:?}) to multicast", iface.name, iface.friendly_name);
socket.as_ref().join_multicast_v6(&addr, idx).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
}
}
} else {
match addr {
IpAddr::V4(addr) => {
socket
.as_ref()
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
IpAddr::V6(addr) => {
socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
IpAddr::V6(addr) => {
socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
}
}
}
@ -679,6 +753,17 @@ impl ObjectImpl for UdpSrc {
.blurb("Whether to retrieve the sender address and add it to buffers as meta. Disabling this might result in minor performance improvements in certain scenarios")
.default_value(DEFAULT_RETRIEVE_SENDER_ADDRESS)
.build(),
glib::ParamSpecString::builder("multicast-iface")
.nick("Multicast Interface")
.blurb("The network interface on which to join the multicast group. This allows multiple interfaces separated by comma. ( e.g. eth0,eth1,wlan0)")
.default_value(DEFAULT_MULTICAST_IFACE)
.build(),
glib::ParamSpecUInt::builder("buffer-size")
.nick("Buffer Size")
.blurb("Size of the kernel receive buffer in bytes, 0=default")
.maximum(u32::MAX)
.default_value(DEFAULT_BUFFER_SIZE)
.build(),
];
#[cfg(not(windows))]
@ -745,6 +830,12 @@ impl ObjectImpl for UdpSrc {
"retrieve-sender-address" => {
settings.retrieve_sender_address = value.get().expect("type checked upstream");
}
"multicast-iface" => {
settings.multicast_iface = value.get().expect("type checked upstream");
}
"buffer-size" => {
settings.buffer_size = value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -770,6 +861,8 @@ impl ObjectImpl for UdpSrc {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"retrieve-sender-address" => settings.retrieve_sender_address.to_value(),
"multicast-iface" => settings.multicast_iface.to_value(),
"buffer-size" => settings.buffer_size.to_value(),
_ => unimplemented!(),
}
}