diff --git a/Cargo.lock b/Cargo.lock index b57c7e0f..18990562 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,18 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "ahash" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy 0.7.32", +] + [[package]] name = "aho-corasick" version = "1.1.2" @@ -69,6 +81,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -2633,7 +2651,9 @@ dependencies = [ "gst-plugin-version-helper", "gstreamer", "gstreamer-app", + "gstreamer-net", "gstreamer-pbutils", + "lru", "once_cell", "rtsp-types", "sdp-types", @@ -3409,6 +3429,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "headers" @@ -4104,7 +4128,7 @@ dependencies = [ "shell-words", "thiserror", "tokio", - "zerocopy", + "zerocopy 0.6.6", ] [[package]] @@ -4204,6 +4228,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "m3u8-rs" version = "5.0.5" @@ -6933,7 +6966,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.6.6", +] + +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive 0.7.32", ] [[package]] @@ -6947,6 +6989,17 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/net/rtsp/Cargo.toml b/net/rtsp/Cargo.toml index aabc5b83..af1a63e4 100644 --- a/net/rtsp/Cargo.toml +++ b/net/rtsp/Cargo.toml @@ -15,8 +15,10 @@ data-encoding = "2.4" futures = "0.3" gst = { workspace = true, features = ["v1_20"] } gst-app = { workspace = true, features = ["v1_20"] } +gst-net = { workspace = true, features = ["v1_20"] } gst-pbutils = { workspace = true, features = ["v1_20"] } once_cell.workspace = true +lru = "0.12" rtsp-types = "0.1" sdp-types = "0.1" socket2 = "0.5" @@ -50,4 +52,4 @@ versioning = false import_library = false [package.metadata.capi.pkg_config] -requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0" +requires_private = "gstreamer-1.0, gstreamer-net-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/net/rtsp/src/rtspsrc/imp.rs b/net/rtsp/src/rtspsrc/imp.rs index 2fc3ba9c..34563b41 100644 --- a/net/rtsp/src/rtspsrc/imp.rs +++ b/net/rtsp/src/rtspsrc/imp.rs @@ -15,6 +15,7 @@ use std::collections::{btree_set::BTreeSet, HashMap}; use std::convert::TryFrom; use std::fmt; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Mutex; use std::time::Duration; @@ -37,12 +38,14 @@ use rtsp_types::headers::{ }; use rtsp_types::{Message, Method, Request, Response, StatusCode, Version}; +use lru::LruCache; use url::Url; use gst::buffer::{MappedBuffer, Readable}; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; +use gst_net::gio; use super::body::Body; use super::sdp; @@ -60,6 +63,7 @@ const DEFAULT_RECEIVE_MTU: u32 = 1500 + 8; const MAX_MESSAGE_SIZE: usize = 1024 * 1024; const MAX_BIND_PORT_RETRY: u16 = 100; const UDP_PACKET_MAX_SIZE: u32 = 65535 - 8; +const RTCP_ADDR_CACHE_SIZE: usize = 100; static RTCP_CAPS: Lazy = Lazy::new(|| gst::Caps::from(gst::Structure::new_empty("application/x-rtcp"))); @@ -1858,28 +1862,40 @@ async fn udp_rtp_task( receive_mtu: u32, ) { let t = Duration::from_secs(timeout.into()); - // We would not be connected if the server didn't give us a Transport header or its Transport - // header didn't specify the server port, so we don't know the sender port from which we will - // get data till we get the first packet here. - if socket.peer_addr().is_err() { - let ret = match time::timeout(t, socket.peek_sender()).await { - Ok(Ok(addr)) => { - let _ = socket.connect(addr).await; - Ok(()) + let addr = match socket.peer_addr() { + Ok(addr) => addr, + // We would not be connected if the server didn't give us a Transport header or its + // Transport header didn't specify the server port, so we don't know the sender port from + // which we will get data till we get the first packet here. + Err(_) => { + let ret = match time::timeout(t, socket.peek_sender()).await { + Ok(Ok(addr)) => { + let _ = socket.connect(addr).await; + Ok(addr) + } + Ok(Err(_elapsed)) => { + Err(format!("No data after {DEFAULT_TIMEOUT} seconds, exiting")) + } + Err(err) => Err(format!("UDP socket was closed: {err:?}")), + }; + match ret { + Ok(addr) => addr, + Err(err) => { + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("{}", err), + ["{:#?}", socket] + ); + return; + } } - Ok(Err(_elapsed)) => Err(format!("No data after {DEFAULT_TIMEOUT} seconds, exiting")), - Err(err) => Err(format!("UDP socket was closed: {err:?}")), - }; - if let Err(s) = ret { - gst::element_error!( - appsrc, - gst::ResourceError::Failed, - ("{}", s), - ["{:#?}", socket] - ); - return; } - } + }; + let gio_addr = { + let inet_addr: gio::InetAddress = addr.ip().into(); + gio::InetSocketAddress::new(&inet_addr, addr.port()) + }; let mut size = receive_mtu; let caps = appsrc.caps(); let mut pool = gst::BufferPool::new(); @@ -1918,6 +1934,7 @@ async fn udp_rtp_task( let bufref = buffer.make_mut(); bufref.set_size(len); bufref.set_dts(t); + gst_net::NetAddressMeta::add(bufref, &gio_addr); if let Err(err) = appsrc.push_buffer(buffer) { break format!("UDP buffer push failed: {err:?}"); } @@ -1944,6 +1961,7 @@ async fn udp_rtcp_task( // In that case, we will connect when we get an RTCP packet. let mut is_connected = socket.peer_addr().is_ok(); let mut buf = vec![0; UDP_PACKET_MAX_SIZE as usize]; + let mut cache: LruCache<_, _> = LruCache::new(NonZeroUsize::new(RTCP_ADDR_CACHE_SIZE).unwrap()); let error = loop { tokio::select! { send_rtcp = rx.recv() => match send_rtcp { @@ -1975,6 +1993,11 @@ async fn udp_rtcp_task( let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned()); let bufref = buffer.make_mut(); bufref.set_dts(t); + let gio_addr = cache.get_or_insert(addr, || { + let inet_addr: gio::InetAddress = addr.ip().into(); + gio::InetSocketAddress::new(&inet_addr, addr.port()) + }); + gst_net::NetAddressMeta::add(bufref, gio_addr); if let Err(err) = appsrc.push_buffer(buffer) { break format!("UDP buffer push failed: {err:?}"); }