diff --git a/net/rtsp/README.md b/net/rtsp/README.md index d0cedfad..26d72e52 100644 --- a/net/rtsp/README.md +++ b/net/rtsp/README.md @@ -41,7 +41,6 @@ Roughly in order of priority: - Or TCP reconnection if UDP has not timed out * Parse SDP rtcp-fb attributes * Parse SDP ssrc attributes -* Don't require Transport header in SETUP response, it is optional * Clock sync support, such as RFC7273 * PAUSE support with VOD * Seeking support with VOD diff --git a/net/rtsp/src/rtspsrc/imp.rs b/net/rtsp/src/rtspsrc/imp.rs index d312e38c..2933ceaa 100644 --- a/net/rtsp/src/rtspsrc/imp.rs +++ b/net/rtsp/src/rtspsrc/imp.rs @@ -16,7 +16,7 @@ use std::convert::TryFrom; use std::fmt; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use std::time::Duration; use anyhow::Result; @@ -51,6 +51,7 @@ const DEFAULT_LOCATION: Option = None; const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(2); const DEFAULT_PORT_START: u16 = 0; +// Priority list has multicast first, because we want to prefer multicast if it's available const DEFAULT_PROTOCOLS: &str = "udp-mcast,udp,tcp"; const MAX_MESSAGE_SIZE: usize = 1024 * 1024; const MAX_BIND_PORT_RETRY: u16 = 100; @@ -545,7 +546,7 @@ impl RtspSrc { Err(err) => { gst::element_imp_error!( task_src, - gst::CoreError::Failed, + gst::ResourceError::OpenRead, ["Failed to connect to RTSP server: {err:#?}"] ); return; @@ -852,30 +853,23 @@ impl RtspSrc { let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; p.rtp_appsrc = Some(rtp_appsrc.clone()); - // Spawn RTP udpsrc task + // Spawn RTP udp receive task state.handles.push(RUNTIME.spawn(async move { - udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await + udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await })); - // Spawn RTCP udpsrc task + // Spawn RTCP udp send/recv task if let Some(rtcp_socket) = rtcp_socket { let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; - let socket = Arc::new(rtcp_socket); - let sock = socket.clone(); - state.handles.push( - RUNTIME - .spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }), - ); - // Spawn RTCP RR udpsink task self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; - state - .handles - .push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await })); + state.handles.push(RUNTIME.spawn(async move { + udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await + })); } } RtspTransportInfo::Udp { source, - server_port: (server_rtp_port, server_rtcp_port), + server_port, client_port: _, sockets, } => { @@ -887,41 +881,34 @@ impl RtspSrc { ); continue; }; + if let Some((server_rtp_port, server_rtcp_port)) = server_port { + let _ = rtp_socket + .connect(&format!( + "{}:{server_rtp_port}", + source.as_ref().expect("Must have source address") + )) + .await; + if let (Some(source), Some(port), Some(s)) = + (source, server_rtcp_port, rtcp_socket.as_ref()) + { + let _ = s.connect(&format!("{source}:{port}")).await; + } + }; - let _ = rtp_socket - .connect(&format!( - "{}:{server_rtp_port}", - source.as_ref().expect("Must have source address") - )) - .await; - - if let (Some(source), Some(port), Some(s)) = - (source, server_rtcp_port, rtcp_socket.as_ref()) - { - let _ = s.connect(&format!("{source}:{port}")).await; - } - - // Spawn RTP udpsrc task + // Spawn RTP udp receive task let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; p.rtp_appsrc = Some(rtp_appsrc.clone()); state.handles.push(RUNTIME.spawn(async move { - udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await + udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await })); + // Spawn RTCP udp send/recv task if let Some(rtcp_socket) = rtcp_socket { - // Spawn RTCP SR udpsrc task let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; - let socket = Arc::new(rtcp_socket); - let sock = socket.clone(); - state.handles.push( - RUNTIME - .spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }), - ); - // Spawn RTCP RR udpsink task self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; - state - .handles - .push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await })); + state.handles.push(RUNTIME.spawn(async move { + udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await + })); } } RtspTransportInfo::Tcp { @@ -1415,7 +1402,7 @@ impl RtspTaskState { } fn parse_setup_transports( - transports: Transports, + transports: &Transports, s: &mut gst::Structure, protocols: &[RtspProtocol], mode: &TransportMode, @@ -1664,10 +1651,11 @@ impl RtspTaskState { } self.cseq += 1; + let transports: Transports = transports.as_slice().into(); let req = Request::builder(Method::Setup, self.version) .typed_header::(&self.cseq.into()) .header(USER_AGENT, DEFAULT_USER_AGENT) - .typed_header::(&transports.as_slice().into()) + .typed_header::(&transports) .request_uri(control_url.clone()); let req = if let Some(s) = session { req.typed_header::(s) @@ -1700,13 +1688,17 @@ impl RtspTaskState { // Manually strip timeout field: https://github.com/sdroege/rtsp-types/issues/24 session.replace(Session(new_session.0, None)); let mut parsed_transport = if let Some(transports) = rsp.typed_header::()? { - Self::parse_setup_transports(transports, &mut s, &protocols, &mode) + Self::parse_setup_transports(&transports, &mut s, &protocols, &mode) } else { - // FIXME: Transport header in response is optional + // Transport header in response is optional if only one transport was offered // https://datatracker.ietf.org/doc/html/rfc2326#section-12.39 - Err(RtspError::InvalidMessage( - "No transport header in SETUP response", - )) + if transports.len() == 1 { + Self::parse_setup_transports(&transports, &mut s, &protocols, &mode) + } else { + Err(RtspError::InvalidMessage( + "No transport header in SETUP response", + )) + } }?; match &mut parsed_transport { RtspTransportInfo::UdpMulticast { .. } => {} @@ -1953,11 +1945,33 @@ fn on_rtcp_tcp( } } -async fn udpsrc_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: Option) { +async fn udp_rtp_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: gst::ClockTime) { // TODO: this should allocate a buffer pool to avoid a copy let mut buf = vec![0; UDP_PACKET_MAX_SIZE]; - let t = Duration::from_secs(timeout.unwrap_or(gst::ClockTime::MAX).into()); - loop { + let t = Duration::from_secs(timeout.into()); + // We would not be connected if the server didn't give us a Transport header or its Transport + // header didn't specify the server port, so we don't know the sender port from which we will + // get data till we get the first packet here. + if !socket.peer_addr().is_ok() { + let ret = match time::timeout(t, socket.peek_sender()).await { + Ok(Ok(addr)) => { + let _ = socket.connect(addr).await; + Ok(()) + } + Ok(Err(_elapsed)) => Err(format!("No data after {DEFAULT_TIMEOUT} seconds, exiting")), + Err(err) => Err(format!("UDP socket was closed: {err:?}")), + }; + if let Err(s) = ret { + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("{}", s), + ["{:#?}", socket] + ); + return; + } + } + let error = loop { match time::timeout(t, socket.recv(&mut buf)).await { Ok(Ok(len)) => { let t = appsrc.current_running_time(); @@ -1965,56 +1979,76 @@ async fn udpsrc_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: Optio let bufref = buffer.make_mut(); bufref.set_dts(t); if let Err(err) = appsrc.push_buffer(buffer) { - gst::element_error!( - appsrc, - gst::ResourceError::Failed, - ("UDP buffer push failed: {:?}", err), - ["{:#?}", socket] - ); - break; + break format!("UDP buffer push failed: {err:?}"); } } - Ok(Err(_elapsed)) => { - gst::element_error!( - appsrc, - gst::ResourceError::Failed, - ["No data received after {DEFAULT_TIMEOUT} seconds, exiting"] - ); - break; - } - Err(err) => { - gst::element_error!( - appsrc, - gst::ResourceError::Close, - ("UDP socket was closed: {:?}", err), - ["{:#?}", socket] - ); - break; - } + Ok(Err(_elapsed)) => break format!("No data after {DEFAULT_TIMEOUT} seconds, exiting"), + Err(err) => break format!("UDP socket was closed: {err:?}"), }; - } + }; + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("{}", error), + ["{:#?}", socket] + ); } -async fn udpsink_task(socket: &UdpSocket, mut rx: mpsc::Receiver>) { - loop { - match rx.recv().await { - Some(data) => match socket.send(data.as_ref()).await { - Ok(_) => { - gst::debug!(CAT, "Sent RTCP RR"); - } - Err(err) => { - gst::error!(CAT, "UDP socket send error: {err:?}, quitting loop"); +async fn udp_rtcp_task( + socket: &UdpSocket, + appsrc: gst_app::AppSrc, + mut rx: mpsc::Receiver>, +) { + // The socket might not be connected if the server either didn't specify a server_port for + // RTCP, or if the server didn't send a Transport header in the SETUP response at all. + // In that case, we will connect when we get an RTCP packet. + let mut is_connected = socket.peer_addr().is_ok(); + let mut buf = vec![0; UDP_PACKET_MAX_SIZE]; + let error = loop { + tokio::select! { + send_rtcp = rx.recv() => match send_rtcp { + Some(data) => match socket.send(data.as_ref()).await { + Ok(_) => gst::debug!(CAT, "Sent RTCP RR"), + Err(err) => { + if !is_connected { + gst::warning!(CAT, "Can't send RTCP yet: don't have dest addr"); + } else { + rx.close(); + break format!("RTCP send error: {err:?}, stopping task"); + } + } + }, + None => { rx.close(); - break; + break format!("UDP socket {socket:?} closed, no more RTCP will be sent"); } }, - None => { - gst::info!(CAT, "UDP socket {socket:?} closed, quitting loop"); - rx.close(); - break; - } - }; - } + recv_rtcp = socket.recv_from(&mut buf) => match recv_rtcp { + Ok((len, addr)) => { + gst::debug!(CAT, "Received RTCP SR"); + if !is_connected { + gst::info!(CAT, "Delayed RTCP UDP connect to {addr:?}"); + let _ = socket.connect(addr).await; + is_connected = true; + }; + let t = appsrc.current_running_time(); + let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned()); + let bufref = buffer.make_mut(); + bufref.set_dts(t); + if let Err(err) = appsrc.push_buffer(buffer) { + break format!("UDP buffer push failed: {err:?}"); + } + } + Err(err) => break format!("UDP socket was closed: {err:?}"), + }, + } + }; + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("{}", error), + ["{:#?}", socket] + ); } #[glib::object_subclass] diff --git a/net/rtsp/src/rtspsrc/transport.rs b/net/rtsp/src/rtspsrc/transport.rs index f4573902..d1b84119 100644 --- a/net/rtsp/src/rtspsrc/transport.rs +++ b/net/rtsp/src/rtspsrc/transport.rs @@ -22,7 +22,7 @@ pub enum RtspTransportInfo { }, Udp { source: Option, - server_port: (u16, Option), + server_port: Option<(u16, Option)>, client_port: Option<(u16, Option)>, sockets: Option<(UdpSocket, Option)>, }, @@ -73,15 +73,9 @@ impl TryFrom<&RtpTransport> for RtspTransportInfo { ttl: t.params.ttl, }) } else { - let Some(server_port) = t.params.server_port else { - return Err(RtspError::Fatal(format!( - "Need server unicast UDP port(s): {:#?}", - t.params, - ))); - }; Ok(RtspTransportInfo::Udp { source: t.params.source.clone(), - server_port, + server_port: t.params.server_port, client_port: t.params.client_port, sockets: None, })