rtspsrc2: Allocate a buffer pool for UDP RTP data

Control the size with a receive-mtu property

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1425>
This commit is contained in:
Nirbheek Chauhan 2024-01-14 02:20:20 +05:30
parent 44e49a06a0
commit 437326ebfd
3 changed files with 90 additions and 11 deletions

View file

@ -6386,6 +6386,20 @@
"type": "gchararray",
"writable": true
},
"receive-mtu": {
"blurb": "Initial size of buffers to allocate in the buffer pool, will be increased if too small",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1508",
"max": "-1",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
},
"timeout": {
"blurb": "Timeout for network activity, in nanoseconds",
"conditionally-available": false,

View file

@ -30,7 +30,6 @@ Roughly in order of priority:
* Credentials support
* TLS/TCP support
* NAT hole punching
* Allocate a buffer pool for receiving + pushing UDP packets
* Allow ignoring specific streams (SDP medias)
- Currently all available source pads must be linked
* SRTP support

View file

@ -50,12 +50,15 @@ use super::transport::RtspTransportInfo;
const DEFAULT_LOCATION: Option<Url> = None;
const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(2);
const DEFAULT_PORT_START: u16 = 0;
// Priority list has multicast first, because we want to prefer multicast if it's available
const DEFAULT_PROTOCOLS: &str = "udp-mcast,udp,tcp";
// Equal to MTU + 8 by default to avoid incorrectly detecting an MTU sized buffer as having
// possibly overflown our receive buffer, and triggering a doubling of the buffer sizes.
const DEFAULT_RECEIVE_MTU: u32 = 1500 + 8;
const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
const MAX_BIND_PORT_RETRY: u16 = 100;
const UDP_PACKET_MAX_SIZE: usize = 65535 - 8;
const UDP_PACKET_MAX_SIZE: u32 = 65535 - 8;
static RTCP_CAPS: Lazy<gst::Caps> =
Lazy::new(|| gst::Caps::from(gst::Structure::new_empty("application/x-rtcp")));
@ -91,6 +94,7 @@ struct Settings {
port_start: u16,
protocols: Vec<RtspProtocol>,
timeout: gst::ClockTime,
receive_mtu: u32,
}
impl Default for Settings {
@ -100,6 +104,7 @@ impl Default for Settings {
port_start: DEFAULT_PORT_START,
timeout: DEFAULT_TIMEOUT,
protocols: parse_protocols_str(DEFAULT_PROTOCOLS).unwrap(),
receive_mtu: DEFAULT_RECEIVE_MTU,
}
}
}
@ -262,6 +267,12 @@ impl ObjectImpl for RtspSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecUInt::builder("receive-mtu")
.nick("Receive packet size")
.blurb("Initial size of buffers to allocate in the buffer pool, will be increased if too small")
.default_value(DEFAULT_RECEIVE_MTU)
.mutable_ready()
.build(),
glib::ParamSpecString::builder("location")
.nick("Location")
.blurb("RTSP server, credentials and media path, e.g. rtsp://user:p4ssw0rd@camera-5.local:8554/h264_1080p30")
@ -299,6 +310,11 @@ impl ObjectImpl for RtspSrc {
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let res = match pspec.name() {
"receive-mtu" => {
let mut settings = self.settings.lock().unwrap();
settings.receive_mtu = value.get::<u32>().expect("type checked upstream");
Ok(())
}
"location" => {
let location = value.get::<Option<&str>>().expect("type checked upstream");
self.set_location(location)
@ -343,6 +359,10 @@ impl ObjectImpl for RtspSrc {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"receive-mtu" => {
let settings = self.settings.lock().unwrap();
settings.receive_mtu.to_value()
}
"location" => {
let settings = self.settings.lock().unwrap();
let location = settings.location.as_ref().map(Url::to_string);
@ -855,7 +875,13 @@ impl RtspSrc {
p.rtp_appsrc = Some(rtp_appsrc.clone());
// Spawn RTP udp receive task
state.handles.push(RUNTIME.spawn(async move {
udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await
udp_rtp_task(
&rtp_socket,
rtp_appsrc,
settings.timeout,
settings.receive_mtu,
)
.await
}));
// Spawn RTCP udp send/recv task
@ -899,7 +925,13 @@ impl RtspSrc {
let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?;
p.rtp_appsrc = Some(rtp_appsrc.clone());
state.handles.push(RUNTIME.spawn(async move {
udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await
udp_rtp_task(
&rtp_socket,
rtp_appsrc,
settings.timeout,
settings.receive_mtu,
)
.await
}));
// Spawn RTCP udp send/recv task
@ -1965,9 +1997,12 @@ fn on_rtcp_tcp(
}
}
async fn udp_rtp_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: gst::ClockTime) {
// TODO: this should allocate a buffer pool to avoid a copy
let mut buf = vec![0; UDP_PACKET_MAX_SIZE];
async fn udp_rtp_task(
socket: &UdpSocket,
appsrc: gst_app::AppSrc,
timeout: gst::ClockTime,
receive_mtu: u32,
) {
let t = Duration::from_secs(timeout.into());
// We would not be connected if the server didn't give us a Transport header or its Transport
// header didn't specify the server port, so we don't know the sender port from which we will
@ -1991,12 +2026,43 @@ async fn udp_rtp_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: gst:
return;
}
}
let mut size = receive_mtu;
let caps = appsrc.caps();
let mut pool = gst::BufferPool::new();
let mut config = pool.config();
config.set_params(caps.as_ref(), size, 2, 0);
pool.set_config(config).unwrap();
pool.set_active(true).unwrap();
let error = loop {
match time::timeout(t, socket.recv(&mut buf)).await {
let Ok(buffer) = pool.acquire_buffer(None) else {
break format!("Failed to acquire buffer");
};
let Ok(mut map) = buffer.into_mapped_buffer_writable() else {
break format!("Failed to map buffer writable");
};
match time::timeout(t, socket.recv(map.as_mut_slice())).await {
Ok(Ok(len)) => {
if size < UDP_PACKET_MAX_SIZE && len == size as usize {
gst::warning!(
CAT,
"Data maybe lost: UDP buffer size {size} filled, doubling"
);
size = (size * 2).min(UDP_PACKET_MAX_SIZE);
if let Err(err) = pool.set_active(false) {
break format!("Failed to deactivate buffer pool: {err:?}");
}
pool = gst::BufferPool::new();
let mut config = pool.config();
config.set_params(caps.as_ref(), size, 2, 0);
pool.set_config(config).unwrap();
if let Err(err) = pool.set_active(true) {
break format!("Failed to reallocate buffer pool: {err:?}");
}
}
let t = appsrc.current_running_time();
let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned());
let mut buffer = map.into_buffer();
let bufref = buffer.make_mut();
bufref.set_size(len);
bufref.set_dts(t);
if let Err(err) = appsrc.push_buffer(buffer) {
break format!("UDP buffer push failed: {err:?}");
@ -2023,7 +2089,7 @@ async fn udp_rtcp_task(
// RTCP, or if the server didn't send a Transport header in the SETUP response at all.
// In that case, we will connect when we get an RTCP packet.
let mut is_connected = socket.peer_addr().is_ok();
let mut buf = vec![0; UDP_PACKET_MAX_SIZE];
let mut buf = vec![0; UDP_PACKET_MAX_SIZE as usize];
let error = loop {
tokio::select! {
send_rtcp = rx.recv() => match send_rtcp {