diff --git a/net/rtsp/src/rtspsrc/imp.rs b/net/rtsp/src/rtspsrc/imp.rs index 3306188a..35cb460e 100644 --- a/net/rtsp/src/rtspsrc/imp.rs +++ b/net/rtsp/src/rtspsrc/imp.rs @@ -849,6 +849,7 @@ impl RtspSrc { if let Some(ttl) = ttl { let _ = rtp_socket.set_multicast_ttl_v4(*ttl as u32); } + let _ = rtp_socket.set_multicast_loop_v4(false); if let Some(rtcp_socket) = &rtcp_socket { if let Err(err) = rtcp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED) @@ -857,20 +858,25 @@ impl RtspSrc { CAT, "Failed to join RTCP multicast address {addr}: {err:?}" ); - } - if let Some(ttl) = ttl { - let _ = rtcp_socket.set_multicast_ttl_v4(*ttl as u32); + } else { + if let Some(ttl) = ttl { + let _ = rtcp_socket.set_multicast_ttl_v4(*ttl as u32); + } + let _ = rtcp_socket.set_multicast_loop_v4(false); } } } IpAddr::V6(addr) => { rtp_socket.join_multicast_v6(addr, 0)?; + let _ = rtp_socket.set_multicast_loop_v6(false); if let Some(rtcp_socket) = &rtcp_socket { if let Err(err) = rtcp_socket.join_multicast_v6(addr, 0) { gst::warning!( CAT, "Failed to join RTCP multicast address {addr}: {err:?}" ); + } else { + let _ = rtcp_socket.set_multicast_loop_v6(false); } } } @@ -885,16 +891,18 @@ impl RtspSrc { rtp_appsrc, settings.timeout, settings.receive_mtu, + None, ) .await })); // Spawn RTCP udp send/recv task if let Some(rtcp_socket) = rtcp_socket { + let rtcp_dest = rtcp_port.and_then(|p| Some(SocketAddr::new(*dest, p))); let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; state.handles.push(RUNTIME.spawn(async move { - udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await + udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rtcp_dest, true, rx).await })); } } @@ -912,18 +920,18 @@ impl RtspSrc { ); continue; }; - if let Some((server_rtp_port, server_rtcp_port)) = server_port { - let _ = rtp_socket - .connect(&format!( - "{}:{server_rtp_port}", - source.as_ref().expect("Must have source address") - )) - .await; - if let (Some(source), Some(port), Some(s)) = - (source, server_rtcp_port, rtcp_socket.as_ref()) - { - let _ = s.connect(&format!("{source}:{port}")).await; + let (rtp_sender_addr, rtcp_sender_addr) = match (source, server_port) { + (Some(ip), Some((rtp_port, Some(rtcp_port)))) => { + let ip = ip.parse().unwrap(); + ( + Some(SocketAddr::new(ip, *rtp_port)), + Some(SocketAddr::new(ip, *rtcp_port)), + ) } + (Some(ip), Some((rtp_port, None))) => { + (Some(SocketAddr::new(ip.parse().unwrap(), *rtp_port)), None) + } + _ => (None, None), }; // Spawn RTP udp receive task @@ -935,6 +943,7 @@ impl RtspSrc { rtp_appsrc, settings.timeout, settings.receive_mtu, + rtp_sender_addr, ) .await })); @@ -944,7 +953,8 @@ impl RtspSrc { let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; state.handles.push(RUNTIME.spawn(async move { - udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await + udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rtcp_sender_addr, false, rx) + .await })); } } @@ -1860,19 +1870,17 @@ async fn udp_rtp_task( appsrc: gst_app::AppSrc, timeout: gst::ClockTime, receive_mtu: u32, + sender_addr: Option, ) { let t = Duration::from_secs(timeout.into()); - let addr = match socket.peer_addr() { - Ok(addr) => addr, - // We would not be connected if the server didn't give us a Transport header or its - // Transport header didn't specify the server port, so we don't know the sender port from - // which we will get data till we get the first packet here. - Err(_) => { + let sender_addr = match sender_addr { + Some(addr) => addr, + // Server didn't give us a Transport header or its Transport header didn't specify the + // server port, so we don't know the sender port from which we will get data till we get + // the first packet here. + None => { let ret = match time::timeout(t, socket.peek_sender()).await { - Ok(Ok(addr)) => { - let _ = socket.connect(addr).await; - Ok(addr) - } + Ok(Ok(addr)) => Ok(addr), Ok(Err(_elapsed)) => Err(format!( "No data after {} seconds, exiting", timeout.seconds() @@ -1893,9 +1901,10 @@ async fn udp_rtp_task( } } }; + gst::info!(CAT, "Receiving from address {sender_addr:?}"); let gio_addr = { - let inet_addr: gio::InetAddress = addr.ip().into(); - gio::InetSocketAddress::new(&inet_addr, addr.port()) + let inet_addr: gio::InetAddress = sender_addr.ip().into(); + gio::InetSocketAddress::new(&inet_addr, sender_addr.port()) }; let mut size = receive_mtu; let caps = appsrc.caps(); @@ -1911,8 +1920,12 @@ async fn udp_rtp_task( let Ok(mut map) = buffer.into_mapped_buffer_writable() else { break "Failed to map buffer writable".to_string(); }; - match time::timeout(t, socket.recv(map.as_mut_slice())).await { - Ok(Ok(len)) => { + match time::timeout(t, socket.recv_from(map.as_mut_slice())).await { + Ok(Ok((len, addr))) => { + // Ignore packets from the wrong sender + if addr != sender_addr { + continue; + } if size < UDP_PACKET_MAX_SIZE && len == size as usize { gst::warning!( CAT, @@ -1936,6 +1949,7 @@ async fn udp_rtp_task( bufref.set_size(len); bufref.set_dts(t); gst_net::NetAddressMeta::add(bufref, &gio_addr); + gst::trace!(CAT, "received RTP packet from {addr:?}"); if let Err(err) = appsrc.push_buffer(buffer) { break format!("UDP buffer push failed: {err:?}"); } @@ -1957,27 +1971,27 @@ async fn udp_rtp_task( async fn udp_rtcp_task( socket: &UdpSocket, appsrc: gst_app::AppSrc, + mut sender_addr: Option, + is_multicast: bool, mut rx: mpsc::Receiver>, ) { - // The socket might not be connected if the server either didn't specify a server_port for - // RTCP, or if the server didn't send a Transport header in the SETUP response at all. - // In that case, we will connect when we get an RTCP packet. - let mut is_connected = socket.peer_addr().is_ok(); let mut buf = vec![0; UDP_PACKET_MAX_SIZE as usize]; let mut cache: LruCache<_, _> = LruCache::new(NonZeroUsize::new(RTCP_ADDR_CACHE_SIZE).unwrap()); let error = loop { tokio::select! { send_rtcp = rx.recv() => match send_rtcp { - Some(data) => match socket.send(data.as_ref()).await { - Ok(_) => gst::debug!(CAT, "Sent RTCP RR"), - Err(err) => { - if !is_connected { - gst::warning!(CAT, "Can't send RTCP yet: don't have dest addr"); - } else { + // The server either didn't specify a server_port for RTCP, or if the server didn't + // send a Transport header in the SETUP response at all. + Some(data) => if let Some(addr) = sender_addr.as_ref() { + match socket.send_to(data.as_ref(), addr).await { + Ok(_) => gst::debug!(CAT, "Sent RTCP RR packet"), + Err(err) => { rx.close(); break format!("RTCP send error: {err:?}, stopping task"); } } + } else { + gst::warning!(CAT, "Can't send RTCP yet: don't have dest addr"); }, None => { rx.close(); @@ -1986,11 +2000,15 @@ async fn udp_rtcp_task( }, recv_rtcp = socket.recv_from(&mut buf) => match recv_rtcp { Ok((len, addr)) => { - gst::debug!(CAT, "Received RTCP SR"); - if !is_connected { - gst::info!(CAT, "Delayed RTCP UDP connect to {addr:?}"); - let _ = socket.connect(addr).await; - is_connected = true; + gst::debug!(CAT, "Received RTCP packet"); + if let Some(sender_addr) = sender_addr { + // Ignore RTCP from the wrong sender + if !is_multicast && addr != sender_addr { + continue; + } + } else { + sender_addr.replace(addr); + gst::info!(CAT, "Delayed RTCP UDP send address: {addr:?}"); }; let t = appsrc.current_running_time(); let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned());