diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 1fc15d78..f32d70b3 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6386,6 +6386,20 @@ "type": "gchararray", "writable": true }, + "receive-mtu": { + "blurb": "Initial size of buffers to allocate in the buffer pool, will be increased if too small", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1508", + "max": "-1", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, "timeout": { "blurb": "Timeout for network activity, in nanoseconds", "conditionally-available": false, diff --git a/net/rtsp/README.md b/net/rtsp/README.md index 26d72e52..a66ed12f 100644 --- a/net/rtsp/README.md +++ b/net/rtsp/README.md @@ -30,7 +30,6 @@ Roughly in order of priority: * Credentials support * TLS/TCP support * NAT hole punching -* Allocate a buffer pool for receiving + pushing UDP packets * Allow ignoring specific streams (SDP medias) - Currently all available source pads must be linked * SRTP support diff --git a/net/rtsp/src/rtspsrc/imp.rs b/net/rtsp/src/rtspsrc/imp.rs index 5108b497..c5e79d67 100644 --- a/net/rtsp/src/rtspsrc/imp.rs +++ b/net/rtsp/src/rtspsrc/imp.rs @@ -50,12 +50,15 @@ use super::transport::RtspTransportInfo; const DEFAULT_LOCATION: Option = None; const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(2); const DEFAULT_PORT_START: u16 = 0; - // Priority list has multicast first, because we want to prefer multicast if it's available const DEFAULT_PROTOCOLS: &str = "udp-mcast,udp,tcp"; +// Equal to MTU + 8 by default to avoid incorrectly detecting an MTU sized buffer as having +// possibly overflown our receive buffer, and triggering a doubling of the buffer sizes. +const DEFAULT_RECEIVE_MTU: u32 = 1500 + 8; + const MAX_MESSAGE_SIZE: usize = 1024 * 1024; const MAX_BIND_PORT_RETRY: u16 = 100; -const UDP_PACKET_MAX_SIZE: usize = 65535 - 8; +const UDP_PACKET_MAX_SIZE: u32 = 65535 - 8; static RTCP_CAPS: Lazy = Lazy::new(|| gst::Caps::from(gst::Structure::new_empty("application/x-rtcp"))); @@ -91,6 +94,7 @@ struct Settings { port_start: u16, protocols: Vec, timeout: gst::ClockTime, + receive_mtu: u32, } impl Default for Settings { @@ -100,6 +104,7 @@ impl Default for Settings { port_start: DEFAULT_PORT_START, timeout: DEFAULT_TIMEOUT, protocols: parse_protocols_str(DEFAULT_PROTOCOLS).unwrap(), + receive_mtu: DEFAULT_RECEIVE_MTU, } } } @@ -262,6 +267,12 @@ impl ObjectImpl for RtspSrc { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ + glib::ParamSpecUInt::builder("receive-mtu") + .nick("Receive packet size") + .blurb("Initial size of buffers to allocate in the buffer pool, will be increased if too small") + .default_value(DEFAULT_RECEIVE_MTU) + .mutable_ready() + .build(), glib::ParamSpecString::builder("location") .nick("Location") .blurb("RTSP server, credentials and media path, e.g. rtsp://user:p4ssw0rd@camera-5.local:8554/h264_1080p30") @@ -299,6 +310,11 @@ impl ObjectImpl for RtspSrc { } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let res = match pspec.name() { + "receive-mtu" => { + let mut settings = self.settings.lock().unwrap(); + settings.receive_mtu = value.get::().expect("type checked upstream"); + Ok(()) + } "location" => { let location = value.get::>().expect("type checked upstream"); self.set_location(location) @@ -343,6 +359,10 @@ impl ObjectImpl for RtspSrc { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { + "receive-mtu" => { + let settings = self.settings.lock().unwrap(); + settings.receive_mtu.to_value() + } "location" => { let settings = self.settings.lock().unwrap(); let location = settings.location.as_ref().map(Url::to_string); @@ -855,7 +875,13 @@ impl RtspSrc { p.rtp_appsrc = Some(rtp_appsrc.clone()); // Spawn RTP udp receive task state.handles.push(RUNTIME.spawn(async move { - udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await + udp_rtp_task( + &rtp_socket, + rtp_appsrc, + settings.timeout, + settings.receive_mtu, + ) + .await })); // Spawn RTCP udp send/recv task @@ -899,7 +925,13 @@ impl RtspSrc { let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; p.rtp_appsrc = Some(rtp_appsrc.clone()); state.handles.push(RUNTIME.spawn(async move { - udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await + udp_rtp_task( + &rtp_socket, + rtp_appsrc, + settings.timeout, + settings.receive_mtu, + ) + .await })); // Spawn RTCP udp send/recv task @@ -1965,9 +1997,12 @@ fn on_rtcp_tcp( } } -async fn udp_rtp_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: gst::ClockTime) { - // TODO: this should allocate a buffer pool to avoid a copy - let mut buf = vec![0; UDP_PACKET_MAX_SIZE]; +async fn udp_rtp_task( + socket: &UdpSocket, + appsrc: gst_app::AppSrc, + timeout: gst::ClockTime, + receive_mtu: u32, +) { let t = Duration::from_secs(timeout.into()); // We would not be connected if the server didn't give us a Transport header or its Transport // header didn't specify the server port, so we don't know the sender port from which we will @@ -1991,12 +2026,43 @@ async fn udp_rtp_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: gst: return; } } + let mut size = receive_mtu; + let caps = appsrc.caps(); + let mut pool = gst::BufferPool::new(); + let mut config = pool.config(); + config.set_params(caps.as_ref(), size, 2, 0); + pool.set_config(config).unwrap(); + pool.set_active(true).unwrap(); let error = loop { - match time::timeout(t, socket.recv(&mut buf)).await { + let Ok(buffer) = pool.acquire_buffer(None) else { + break format!("Failed to acquire buffer"); + }; + let Ok(mut map) = buffer.into_mapped_buffer_writable() else { + break format!("Failed to map buffer writable"); + }; + match time::timeout(t, socket.recv(map.as_mut_slice())).await { Ok(Ok(len)) => { + if size < UDP_PACKET_MAX_SIZE && len == size as usize { + gst::warning!( + CAT, + "Data maybe lost: UDP buffer size {size} filled, doubling" + ); + size = (size * 2).min(UDP_PACKET_MAX_SIZE); + if let Err(err) = pool.set_active(false) { + break format!("Failed to deactivate buffer pool: {err:?}"); + } + pool = gst::BufferPool::new(); + let mut config = pool.config(); + config.set_params(caps.as_ref(), size, 2, 0); + pool.set_config(config).unwrap(); + if let Err(err) = pool.set_active(true) { + break format!("Failed to reallocate buffer pool: {err:?}"); + } + } let t = appsrc.current_running_time(); - let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned()); + let mut buffer = map.into_buffer(); let bufref = buffer.make_mut(); + bufref.set_size(len); bufref.set_dts(t); if let Err(err) = appsrc.push_buffer(buffer) { break format!("UDP buffer push failed: {err:?}"); @@ -2023,7 +2089,7 @@ async fn udp_rtcp_task( // RTCP, or if the server didn't send a Transport header in the SETUP response at all. // In that case, we will connect when we get an RTCP packet. let mut is_connected = socket.peer_addr().is_ok(); - let mut buf = vec![0; UDP_PACKET_MAX_SIZE]; + let mut buf = vec![0; UDP_PACKET_MAX_SIZE as usize]; let error = loop { tokio::select! { send_rtcp = rx.recv() => match send_rtcp {