gst-plugins-rs/net/rtsp/src/rtspsrc/imp.rs
Nirbheek Chauhan 975556c06b rtspsrc2: Allow a SETUP response without a Transports header
If we only send a single Transport in the Transports header, then the
server is allowed to omit it in the response. This has some strange
consequences for UDP transport: specifically, we have no idea what
addr/port we will get the packets from.

In those cases, we connect() on the socket when we receive the first
packet, so we can send RTCP RRs, and also so we can ensure that we
ignore data from other addresses.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1425>
2024-02-07 20:29:23 +05:30

2061 lines
76 KiB
Rust

// GStreamer RTSP Source 2
//
// Copyright (C) 2023 Tim-Philipp Müller <tim centricular com>
// Copyright (C) 2023-2024 Nirbheek Chauhan <nirbheek centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
//
// https://www.rfc-editor.org/rfc/rfc2326.html
use std::collections::{btree_set::BTreeSet, HashMap};
use std::convert::TryFrom;
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::pin::Pin;
use std::sync::Mutex;
use std::time::Duration;
use anyhow::Result;
use once_cell::sync::Lazy;
use futures::{Sink, SinkExt, Stream, StreamExt};
use socket2::Socket;
use tokio::net::{TcpStream, UdpSocket};
use tokio::runtime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time;
use rtsp_types::headers::{
CSeq, NptRange, NptTime, Public, Range, RtpInfos, RtpLowerTransport, RtpProfile, RtpTransport,
RtpTransportParameters, Session, Transport, TransportMode, Transports, ACCEPT, CONTENT_BASE,
CONTENT_LOCATION, USER_AGENT,
};
use rtsp_types::{Message, Method, Request, Response, StatusCode, Version};
use url::Url;
use gst::buffer::{MappedBuffer, Readable};
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use super::body::Body;
use super::transport::RtspTransportInfo;
// Default for the `location` property: no URL configured.
const DEFAULT_LOCATION: Option<Url> = None;
// Default for the `timeout` property: 2 seconds.
const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(2);
// Default for the `port-start` property; the property blurb documents 0 as "automatic selection".
const DEFAULT_PORT_START: u16 = 0;
// Default for the `protocols` property.
// Priority list has multicast first, because we want to prefer multicast if it's available
const DEFAULT_PROTOCOLS: &str = "udp-mcast,udp,tcp";
// Size limit (1 MiB) passed to the TCP message reader created in `start()`.
const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
// NOTE(review): presumably the number of attempts when binding UDP ports; the use site is not
// visible in this part of the file -- confirm.
const MAX_BIND_PORT_RETRY: u16 = 100;
// NOTE(review): looks like the 16-bit datagram length limit minus the 8-byte UDP header;
// the use site is not visible in this part of the file -- confirm.
const UDP_PACKET_MAX_SIZE: usize = 65535 - 8;
// Caps set on the RTCP appsrc elements (see `make_rtcp_appsrc`).
static RTCP_CAPS: Lazy<gst::Caps> =
Lazy::new(|| gst::Caps::from(gst::Structure::new_empty("application/x-rtcp")));
// Value of the User-Agent header sent with OPTIONS and DESCRIBE requests.
// Built at compile time from the CARGO_PKG_VERSION and COMMIT_ID environment variables.
// Hardcoded for now
const DEFAULT_USER_AGENT: &str = concat!(
"GStreamer rtspsrc2 ",
env!("CARGO_PKG_VERSION"),
"-",
env!("COMMIT_ID")
);
/// Lower transport protocols that can be listed in the `protocols` property.
///
/// The derived ordering follows declaration order, so
/// `UdpMulticast < Udp < Tcp`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum RtspProtocol {
    UdpMulticast,
    Udp,
    Tcp,
}

/// Renders the protocol using the same token accepted by the `protocols`
/// property parser (`udp-mcast`, `udp`, `tcp`).
impl fmt::Display for RtspProtocol {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let token = match self {
            Self::UdpMulticast => "udp-mcast",
            Self::Udp => "udp",
            Self::Tcp => "tcp",
        };
        f.write_str(token)
    }
}
// User-configurable element settings, backing the GObject properties of the same names.
// Stored behind a Mutex in `RtspSrc` and cloned by `rtsp_task` when it starts.
#[derive(Debug, Clone)]
struct Settings {
// `location` property: parsed RTSP URL, or None if unset.
location: Option<Url>,
// `port-start` property: first client port to try for RTP/RTCP (0 = automatic selection).
port_start: u16,
// `protocols` property: allowed lower transports, in order of preference.
protocols: Vec<RtspProtocol>,
// `timeout` property: timeout for network activity.
timeout: gst::ClockTime,
}
impl Default for Settings {
    /// Builds the settings from the `DEFAULT_*` constants.
    ///
    /// Parsing `DEFAULT_PROTOCOLS` is unwrapped: the constant is expected to
    /// always be a valid protocol list.
    fn default() -> Self {
        let protocols = parse_protocols_str(DEFAULT_PROTOCOLS).unwrap();
        Self {
            location: DEFAULT_LOCATION,
            port_start: DEFAULT_PORT_START,
            protocols,
            timeout: DEFAULT_TIMEOUT,
        }
    }
}
// Commands sent to the running `rtsp_task` through the element's command queue.
#[derive(Debug)]
enum Commands {
// Send a PLAY request; queued on the PAUSED->PLAYING state change.
Play,
//Pause,
// Send a TEARDOWN request and leave the task loop. If a sender is supplied (as `stop()` does),
// it is signalled once the teardown request has been attempted.
Teardown(Option<oneshot::Sender<()>>),
// Interleaved data message to write to the RTSP TCP connection.
Data(rtsp_types::Data<Body>),
}
// Private implementation state of the `rtspsrc2` element.
#[derive(Debug, Default)]
pub struct RtspSrc {
// Property values; see `Settings`.
settings: Mutex<Settings>,
// Join handle of the task spawned by `start()`; taken and awaited by `stop()`.
task_handle: Mutex<Option<JoinHandle<()>>>,
// Sending half of the command channel to `rtsp_task`; set by `start()`, cleared by `stop()`.
command_queue: Mutex<Option<mpsc::Sender<Commands>>>,
}
#[derive(thiserror::Error, Debug)]
pub enum RtspError {
#[error("Generic I/O error")]
IOGeneric(#[from] std::io::Error),
#[error("Read I/O error")]
Read(#[from] super::tcp_message::ReadError),
#[error("RTSP header parse error")]
HeaderParser(#[from] rtsp_types::headers::HeaderParseError),
#[error("SDP parse error")]
SDPParser(#[from] sdp_types::ParserError),
#[error("Unexpected RTSP message: expected, received")]
UnexpectedMessage(&'static str, rtsp_types::Message<Body>),
#[error("Invalid RTSP message")]
InvalidMessage(&'static str),
#[error("Fatal error")]
Fatal(String),
}
// GStreamer debug category "rtspsrc2" used by every log statement in this file.
pub(crate) static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtspsrc2",
gst::DebugColorFlags::empty(),
Some("RTSP source"),
)
});
// Shared tokio runtime on which the RTSP task and the per-stream tasks are spawned.
// Built lazily as a multi-thread runtime with a single worker thread and all drivers enabled;
// failure to build it panics (unwrap).
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.build()
.unwrap()
});
fn parse_protocols_str(s: &str) -> Result<Vec<RtspProtocol>, glib::Error> {
let mut acc = Vec::new();
if s.is_empty() {
return Err(glib::Error::new(
gst::CoreError::Failed,
"Protocols list is empty",
));
}
for each in s.split(',') {
match each {
"udp-mcast" => acc.push(RtspProtocol::UdpMulticast),
"udp" => acc.push(RtspProtocol::Udp),
"tcp" => acc.push(RtspProtocol::Tcp),
_ => {
return Err(glib::Error::new(
gst::CoreError::Failed,
&format!("Unsupported RTSP protocol: {each}"),
))
}
}
}
Ok(acc)
}
impl RtspSrc {
fn set_location(&self, uri: Option<&str>) -> Result<(), glib::Error> {
if self.obj().current_state() > gst::State::Ready {
return Err(glib::Error::new(
gst::URIError::BadState,
"Changing the 'location' property on a started 'rtspsrc2' is not supported",
));
}
let mut settings = self.settings.lock().unwrap();
let Some(uri) = uri else {
settings.location = DEFAULT_LOCATION;
return Ok(());
};
let uri = Url::parse(uri).map_err(|err| {
glib::Error::new(
gst::URIError::BadUri,
&format!("Failed to parse URI '{uri}': {err:?}"),
)
})?;
if uri.password().is_some() || !uri.username().is_empty() {
// TODO
gst::fixme!(CAT, "URI credentials are currently ignored");
}
match (uri.host_str(), uri.port()) {
(Some(_), Some(_)) | (Some(_), None) => Ok(()),
_ => Err(glib::Error::new(gst::URIError::BadUri, "Invalid host")),
}?;
let protocols: &[RtspProtocol] = match uri.scheme() {
"rtspu" => &[RtspProtocol::UdpMulticast, RtspProtocol::Udp],
"rtspt" => &[RtspProtocol::Tcp],
"rtsp" => &settings.protocols,
scheme => {
return Err(glib::Error::new(
gst::URIError::UnsupportedProtocol,
&format!("Unsupported URI scheme '{}'", scheme),
));
}
};
if !settings.protocols.iter().any(|p| protocols.contains(p)) {
return Err(glib::Error::new(
gst::URIError::UnsupportedProtocol,
&format!(
"URI scheme '{}' does not match allowed protocols: {:?}",
uri.scheme(),
settings.protocols,
),
));
}
settings.protocols = protocols.to_vec();
settings.location = Some(uri);
Ok(())
}
fn set_protocols(&self, protocol_s: Option<&str>) -> Result<(), glib::Error> {
if self.obj().current_state() > gst::State::Ready {
return Err(glib::Error::new(
gst::CoreError::Failed,
"Changing the 'protocols' property on a started 'rtspsrc2' is not supported",
));
}
let mut settings = self.settings.lock().unwrap();
settings.protocols = match protocol_s {
Some(s) => parse_protocols_str(s)?,
None => parse_protocols_str(DEFAULT_PROTOCOLS).unwrap(),
};
Ok(())
}
}
impl ObjectImpl for RtspSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("location")
.nick("Location")
.blurb("RTSP server, credentials and media path, e.g. rtsp://user:p4ssw0rd@camera-5.local:8554/h264_1080p30")
.mutable_ready()
.build(),
// We purposely use port-start instead of port-range (like in rtspsrc), because
// there is no way for the user to know how many ports we actually need. It depends
// on how many streams the media contains, and whether the server wants RTCP or
// RTCP-mux, or no RTCP. This property can be used to specify the start of the
// valid range, and if the user wants to know how many ports were used, we can
// add API for that later.
glib::ParamSpecUInt::builder("port-start")
.nick("Port start")
.blurb("Port number to start allocating client ports for receiving RTP and RTCP data, eg. 3000 (0 = automatic selection)")
.default_value(DEFAULT_PORT_START.into())
.mutable_ready()
.build(),
glib::ParamSpecString::builder("protocols")
.nick("Protocols")
.blurb("Allowed lower transport protocols, in order of preference")
.default_value("udp-mcast,udp,tcp")
.mutable_ready()
.build(),
glib::ParamSpecUInt64::builder("timeout")
.nick("Timeout")
.blurb("Timeout for network activity, in nanoseconds")
.maximum(gst::ClockTime::MAX.into())
.default_value(DEFAULT_TIMEOUT.into())
.mutable_ready()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let res = match pspec.name() {
"location" => {
let location = value.get::<Option<&str>>().expect("type checked upstream");
self.set_location(location)
}
"port-start" => {
let mut settings = self.settings.lock().unwrap();
let start = value.get::<u32>().expect("type checked upstream");
match u16::try_from(start) {
Ok(start) => {
settings.port_start = start;
Ok(())
}
Err(err) => Err(glib::Error::new(
gst::CoreError::Failed,
&format!("Failed to set port start: {err:?}"),
)),
}
}
"protocols" => {
let protocols = value.get::<Option<&str>>().expect("type checked upstream");
self.set_protocols(protocols)
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().expect("type checked upstream");
settings.timeout = timeout;
Ok(())
}
name => unimplemented!("Property '{name}'"),
};
if let Err(err) = res {
gst::error!(
CAT,
imp: self,
"Failed to set property `{}`: {:?}",
pspec.name(),
err
);
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"location" => {
let settings = self.settings.lock().unwrap();
let location = settings.location.as_ref().map(Url::to_string);
location.to_value()
}
"port-start" => {
let settings = self.settings.lock().unwrap();
(settings.port_start as u32).to_value()
}
"protocols" => {
let settings = self.settings.lock().unwrap();
(settings
.protocols
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(","))
.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
name => unimplemented!("Property '{name}'"),
}
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
// No GstObject virtual methods are overridden; the trait defaults are used.
impl GstObjectImpl for RtspSrc {}
impl ElementImpl for RtspSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTSP Source",
"Source/Network",
"Receive audio or video from a network device via the Real Time Streaming Protocol (RTSP) (RFC 2326, 7826)",
"Nirbheek Chauhan <nirbheek centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let src_pad_template = gst::PadTemplate::new(
"stream_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_empty_simple("application/x-rtp"),
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::NullToReady => {
self.start().map_err(|err_msg| {
self.post_error_message(err_msg);
gst::StateChangeError
})?;
}
gst::StateChange::PausedToPlaying => {
let cmd_queue = self.cmd_queue();
//self.async_start().map_err(|_| gst::StateChangeError)?;
RUNTIME.spawn(async move { cmd_queue.send(Commands::Play).await });
}
_ => {}
}
let mut ret = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
ret = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
match tokio::runtime::Handle::try_current() {
Ok(_) => {
// If the app does set_state(NULL) from a block_on() inside its own tokio
// runtime, calling block_on() on our own runtime will cause a panic
// because of nested blocking calls. So, shutdown the task from another
// thread.
// The app's usage is also incorrect since they are blocking the runtime
// on I/O, so emit a warning.
gst::warning!(
CAT,
"Blocking I/O: state change to NULL called from an async \
tokio context, redirecting to another thread to prevent \
the tokio panic, but you should refactor your code to \
make use of gst::Element::call_async and set the state \
to NULL from there, without blocking the runtime"
);
let (tx, rx) = std::sync::mpsc::channel();
self.obj().call_async(move |element| {
tx.send(element.imp().stop()).unwrap();
});
rx.recv().unwrap()
}
Err(_) => self.stop(),
}
.map_err(|err_msg| {
self.post_error_message(err_msg);
gst::StateChangeError
})?;
}
_ => (),
}
Ok(ret)
}
}
// No Bin virtual methods are overridden; child elements are added and removed directly
// via `obj.add()` / `obj.remove()` elsewhere in this file.
impl BinImpl for RtspSrc {}
/// URI handler interface: lets the element be selected for `rtsp://`,
/// `rtspu://` and `rtspt://` URIs.
impl URIHandlerImpl for RtspSrc {
    const URI_TYPE: gst::URIType = gst::URIType::Src;

    /// URI schemes handled by this element.
    fn protocols() -> &'static [&'static str] {
        &["rtsp", "rtspu", "rtspt"]
    }

    /// Returns the configured location as a string, if one is set.
    fn uri(&self) -> Option<String> {
        let settings = self.settings.lock().unwrap();
        let location = settings.location.as_ref()?;
        Some(location.to_string())
    }

    /// Same as setting the `location` property.
    fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
        let location = Some(uri);
        self.set_location(location)
    }
}
// Boxed stream of incoming RTSP messages (or read errors), as produced by
// `super::tcp_message::async_read` in `start()`.
type RtspStream =
Pin<Box<dyn Stream<Item = Result<Message<Body>, super::tcp_message::ReadError>> + Send>>;
// Boxed sink for outgoing RTSP messages, as produced by `super::tcp_message::async_write`
// in `start()`.
type RtspSink = Pin<Box<dyn Sink<Message<Body>, Error = std::io::Error> + Send>>;
impl RtspSrc {
/// Returns a clone of the command sender for the running RTSP task.
///
/// Panics if no command queue is currently installed, i.e. outside the
/// window between `start()` and the end of `stop()`.
#[track_caller]
fn cmd_queue(&self) -> mpsc::Sender<Commands> {
    let guard = self.command_queue.lock().unwrap();
    let sender = guard.as_ref().unwrap();
    sender.clone()
}
// Creates the command channel and spawns the RTSP connection task on RUNTIME.
// Returns an error only if no location is set; connection failures happen inside the spawned
// task and are reported there as element errors.
fn start(&self) -> Result<(), gst::ErrorMessage> {
let Some(url) = self.settings.lock().unwrap().location.clone() else {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["No location set"]
));
};
gst::info!(
CAT,
imp: self,
"Location: {url}",
);
gst::info!(
CAT,
imp: self,
"Starting RTSP connection thread.. "
);
// Reference to the implementation object that is moved into the spawned task.
let task_src = self.ref_counted();
let mut task_handle = self.task_handle.lock().unwrap();
// Command channel with capacity 1: senders wait until the task picks up the previous command.
let (tx, rx) = mpsc::channel(1);
{
let mut cmd_queue_opt = self.command_queue.lock().unwrap();
debug_assert!(cmd_queue_opt.is_none());
cmd_queue_opt.replace(tx);
}
let join_handle = RUNTIME.spawn(async move {
gst::info!(CAT, "Connecting to {url} ..");
// `set_location` rejects URLs without a host, so the unwrap is expected to hold.
// Port 554 is used when the URL has no explicit port.
let hostname_port =
format!("{}:{}", url.host_str().unwrap(), url.port().unwrap_or(554));
// TODO: Add TLS support
let s = match TcpStream::connect(hostname_port).await {
Ok(s) => s,
Err(err) => {
gst::element_imp_error!(
task_src,
gst::ResourceError::OpenRead,
["Failed to connect to RTSP server: {err:#?}"]
);
return;
}
};
// Best effort: a failure to set TCP_NODELAY is ignored.
let _ = s.set_nodelay(true);
gst::info!(CAT, "Connected!");
let (read, write) = s.into_split();
let stream = Box::pin(super::tcp_message::async_read(read, MAX_MESSAGE_SIZE).fuse());
let sink = Box::pin(super::tcp_message::async_write(write));
let mut state = RtspTaskState::new(url, stream, sink);
// Runs until teardown or error; the result is reported only after cleanup below.
let task_ret = task_src.rtsp_task(&mut state, rx).await;
gst::info!(CAT, "Exited rtsp_task");
// Cleanup after stopping
// Abort every per-stream task first, then wait for each of them to finish.
for h in &state.handles {
h.abort();
}
for h in state.handles {
let _ = h.await;
}
let obj = task_src.obj();
// Shut down all child elements before removing pads and children from the bin.
for e in obj.iterate_sorted() {
let Ok(e) = e else {
continue;
};
if let Err(err) = e.set_state(gst::State::Null) {
gst::warning!(CAT, "{} failed to go to Null state: {err:?}", e.name());
}
}
for pad in obj.src_pads() {
if let Err(err) = obj.remove_pad(&pad) {
gst::warning!(CAT, "Failed to remove pad {}: {err:?}", pad.name());
}
}
for e in obj.iterate_sorted() {
let Ok(e) = e else {
continue;
};
if let Err(err) = obj.remove(&e) {
gst::warning!(CAT, "Failed to remove element {}: {err:?}", e.name());
}
}
// Post the element error after cleanup
if let Err(err) = task_ret {
gst::element_imp_error!(
task_src,
gst::CoreError::Failed,
["RTSP task exited: {err:#?}"]
);
}
gst::info!(CAT, "Cleanup complete");
});
debug_assert!(task_handle.is_none());
task_handle.replace(join_handle);
gst::info!(CAT, imp: self, "Started");
Ok(())
}
/// Asks the RTSP task to tear the session down and waits for it to exit.
///
/// A `Teardown` command is sent and its acknowledgement is awaited for up to
/// 500 ms; afterwards the task's join handle (if any) is awaited and the
/// command queue is dropped. Blocks the calling thread on `RUNTIME`.
///
/// Calling this when nothing is running (e.g. a repeated PAUSED->READY
/// transition) is a no-op instead of a panic.
fn stop(&self) -> Result<(), gst::ErrorMessage> {
    gst::info!(CAT, "Stopping...");
    // Clone the sender without unwrapping: the queue may already be gone if the
    // element has been stopped before. The queue itself stays installed until
    // the task has exited, because the task may still ask for clones of it.
    let cmd_queue = self.command_queue.lock().unwrap().clone();
    let task_handle = self.task_handle.lock().unwrap().take();
    if let Some(cmd_queue) = cmd_queue {
        RUNTIME.block_on(async {
            let (tx, rx) = oneshot::channel();
            // A send error means the task has already exited; nothing to wait for.
            if let Ok(()) = cmd_queue.send(Commands::Teardown(Some(tx))).await {
                if let Err(_elapsed) = time::timeout(Duration::from_millis(500), rx).await {
                    gst::warning!(
                        CAT,
                        "Timeout waiting for Teardown, going to NULL asynchronously"
                    );
                }
            }
        });
    }
    if let Some(join_handle) = task_handle {
        gst::debug!(CAT, "Waiting for RTSP connection thread to shut down..");
        let _ = RUNTIME.block_on(join_handle);
    }
    self.command_queue.lock().unwrap().take();
    gst::info!(CAT, imp: self, "Stopped");
    Ok(())
}
// Creates the appsrc that feeds received RTP packets for session `rtpsession_n` into the
// manager, links it to the manager's RTP receive sink pad, and adds the (still untargeted)
// `stream_<n>` ghost source pad to the element.
// Returns the appsrc; fails if adding, linking or syncing state fails.
fn make_rtp_appsrc(
&self,
rtpsession_n: usize,
caps: &gst::Caps,
manager: &RtspManager,
) -> Result<gst_app::AppSrc> {
// Only logs when the appsrc reports it has enough data; the appsrc is configured as leaky below.
let callbacks = gst_app::AppSrcCallbacks::builder()
.enough_data(|appsrc| {
gst::warning!(CAT, "appsrc {} is overrunning: enough data!", appsrc.name());
})
.build();
// Live, time-format source; byte and buffer limits are set to 0 and the queue is limited to
// 2 seconds, leaking downstream.
let appsrc = gst_app::AppSrc::builder()
.name(format!("rtp_appsrc_{rtpsession_n}"))
.format(gst::Format::Time)
.handle_segment_change(true)
.caps(caps)
.stream_type(gst_app::AppStreamType::Stream)
.max_bytes(0)
.max_buffers(0)
.max_time(Some(gst::ClockTime::from_seconds(2)))
.leaky_type(gst_app::AppLeakyType::Downstream)
.callbacks(callbacks)
.is_live(true)
.build();
let obj = self.obj();
obj.add(&appsrc)?;
appsrc
.static_pad("src")
.unwrap()
.link(&manager.rtp_recv_sinkpad(rtpsession_n).unwrap())?;
// The ghost pad gets its target later, in the manager's pad-added handler set up in `rtsp_task`.
let templ = obj.pad_template("stream_%u").unwrap();
let ghostpad = gst::GhostPad::builder_from_template(&templ)
.name(format!("stream_{}", rtpsession_n))
.build();
gst::info!(CAT, "Adding ghost srcpad {}", ghostpad.name());
obj.add_pad(&ghostpad)
.expect("Adding a ghostpad should never fail");
appsrc.sync_state_with_parent()?;
Ok(appsrc)
}
// Creates the appsrc that feeds received RTCP packets for session `rtpsession_n` into the
// manager, and links it to the manager's RTCP receive sink pad.
// Returns the appsrc; fails if adding, linking or syncing state fails.
fn make_rtcp_appsrc(
&self,
rtpsession_n: usize,
manager: &RtspManager,
) -> Result<gst_app::AppSrc> {
let appsrc = gst_app::AppSrc::builder()
.name(format!("rtcp_appsrc_{rtpsession_n}"))
.format(gst::Format::Time)
.handle_segment_change(true)
.caps(&RTCP_CAPS)
.stream_type(gst_app::AppStreamType::Stream)
.is_live(true)
.build();
self.obj().add(&appsrc)?;
appsrc
.static_pad("src")
.unwrap()
.link(&manager.rtcp_recv_sinkpad(rtpsession_n).unwrap())?;
appsrc.sync_state_with_parent()?;
Ok(appsrc)
}
// Creates the appsink that receives outgoing RTCP from the manager for session `rtpsession_n`
// and links it to the manager's RTCP send source pad.
// `on_rtcp` is installed as the new-sample callback; on EOS a `Teardown(None)` command is
// queued for the RTSP task.
// NOTE(review): unlike the appsrc helpers, this does not call `sync_state_with_parent()` on
// the new element -- confirm that is intentional.
fn make_rtcp_appsink<
F: FnMut(&gst_app::AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
>(
&self,
rtpsession_n: usize,
manager: &RtspManager,
on_rtcp: F,
) -> Result<()> {
// Panics if no command queue is installed (see `cmd_queue`).
let cmd_tx_eos = self.cmd_queue();
let cbs = gst_app::app_sink::AppSinkCallbacks::builder()
.eos(move |_appsink| {
// The callback is not async, so the send is done from a task spawned on RUNTIME.
let cmd_tx = cmd_tx_eos.clone();
RUNTIME.spawn(async move {
let _ = cmd_tx.send(Commands::Teardown(None)).await;
});
})
.new_sample(on_rtcp)
.build();
let rtcp_appsink = gst_app::AppSink::builder()
.name(format!("rtcp_appsink_{rtpsession_n}"))
.sync(false)
.async_(false)
.callbacks(cbs)
.build();
self.obj().add(&rtcp_appsink)?;
manager
.rtcp_send_srcpad(rtpsession_n)
.unwrap()
.link(&rtcp_appsink.static_pad("sink").unwrap())?;
Ok(())
}
/// Posts a progress message of the given kind on the element's bus.
/// Failure to post is ignored.
fn post_progress_message(&self, kind: gst::ProgressType, code: &str, text: &str) {
    let obj = self.obj();
    let msg = gst::message::Progress::builder(kind, code, text)
        .src(&*obj)
        .build();
    let _ = obj.post_message(msg);
}

/// Posts a `Start` progress message with the given code and text.
fn post_start(&self, code: &str, text: &str) {
    self.post_progress_message(gst::ProgressType::Start, code, text);
}

/// Posts a `Complete` progress message with the given code and text.
fn post_complete(&self, code: &str, text: &str) {
    self.post_progress_message(gst::ProgressType::Complete, code, text);
}

/// Posts a `Canceled` progress message with the given code and text.
fn post_cancelled(&self, code: &str, text: &str) {
    self.post_progress_message(gst::ProgressType::Canceled, code, text);
}
// Main RTSP control task, awaited by the task spawned in `start()`.
// Performs OPTIONS, DESCRIBE and SETUP, creates the RTP manager and the per-stream
// appsrc/appsink elements and receive tasks, then loops handling messages from the server
// and commands from `cmd_rx`.
// Returns Ok(()) after a Teardown command (or when no select branch can make progress);
// any error is returned to `start()`, which posts it as an element error after cleanup.
async fn rtsp_task(
&self,
state: &mut RtspTaskState,
mut cmd_rx: mpsc::Receiver<Commands>,
) -> Result<()> {
let cmd_tx = self.cmd_queue();
// Snapshot of the settings; the lock is released immediately.
let settings = { self.settings.lock().unwrap().clone() };
// OPTIONS
state.options().await?;
// DESCRIBE
state.describe().await?;
let mut session: Option<Session> = None;
// SETUP streams, using the configured port start and protocol list
state.setup_params = {
state
.setup(
&mut session,
settings.port_start,
&settings.protocols,
TransportMode::Play,
)
.await?
};
// rtpbin2 is used instead of rtpbin only when the USE_RTPBIN2 environment variable is "1".
let manager = RtspManager::new(std::env::var("USE_RTPBIN2").is_ok_and(|s| s == "1"));
let obj = self.obj();
obj.add(&manager.inner)
.expect("Adding the manager cannot fail");
manager.inner.sync_state_with_parent().unwrap();
// Maps interleaved channel ids to the appsrc that receives that channel's data.
let mut tcp_interleave_appsrcs = HashMap::new();
// The index into `setup_params` doubles as the RTP session number.
for (rtpsession_n, p) in state.setup_params.iter_mut().enumerate() {
// Channel handed to the RTCP appsink callback (`on_rtcp_udp`) and the UDP RTCP task.
// Not used by the TCP transport branch.
let (tx, rx) = mpsc::channel(1);
let on_rtcp = move |appsink: &_| on_rtcp_udp(appsink, tx.clone());
match &mut p.transport {
RtspTransportInfo::UdpMulticast {
dest,
port: (rtp_port, rtcp_port),
ttl,
} => {
// The RTP socket is mandatory; failing to bind the RTCP socket only logs a warning.
let rtp_socket = bind_port(*rtp_port, dest.is_ipv4())?;
let rtcp_socket = rtcp_port.and_then(|p| {
bind_port(p, dest.is_ipv4())
.map_err(|err| {
gst::warning!(CAT, "Could not bind to RTCP port: {err:?}");
err
})
.ok()
});
// Join the multicast group: errors are fatal for RTP, warnings for RTCP.
// A TTL, when given, is applied best-effort and only for IPv4.
match &dest {
IpAddr::V4(addr) => {
rtp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED)?;
if let Some(ttl) = ttl {
let _ = rtp_socket.set_multicast_ttl_v4(*ttl as u32);
}
if let Some(rtcp_socket) = &rtcp_socket {
if let Err(err) =
rtcp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED)
{
gst::warning!(
CAT,
"Failed to join RTCP multicast address {addr}: {err:?}"
);
}
if let Some(ttl) = ttl {
let _ = rtcp_socket.set_multicast_ttl_v4(*ttl as u32);
}
}
}
IpAddr::V6(addr) => {
rtp_socket.join_multicast_v6(addr, 0)?;
if let Some(rtcp_socket) = &rtcp_socket {
if let Err(err) = rtcp_socket.join_multicast_v6(addr, 0) {
gst::warning!(
CAT,
"Failed to join RTCP multicast address {addr}: {err:?}"
);
}
}
}
};
let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?;
p.rtp_appsrc = Some(rtp_appsrc.clone());
// Spawn RTP udp receive task
state.handles.push(RUNTIME.spawn(async move {
udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await
}));
// Spawn RTCP udp send/recv task
if let Some(rtcp_socket) = rtcp_socket {
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?;
state.handles.push(RUNTIME.spawn(async move {
udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await
}));
}
}
RtspTransportInfo::Udp {
source,
server_port,
client_port: _,
sockets,
} => {
// The sockets are taken out of the transport info, so they are consumed here.
let Some((rtp_socket, rtcp_socket)) = sockets.take() else {
gst::warning!(
CAT,
"Skipping: no UDP sockets for {rtpsession_n}: {:#?}",
p.transport
);
continue;
};
// When the server ports are known, connect() the sockets to them; connect errors are
// ignored.
// NOTE(review): the expect() panics if server_port is set but source is None --
// confirm `setup()` guarantees a source address in that case.
if let Some((server_rtp_port, server_rtcp_port)) = server_port {
let _ = rtp_socket
.connect(&format!(
"{}:{server_rtp_port}",
source.as_ref().expect("Must have source address")
))
.await;
if let (Some(source), Some(port), Some(s)) =
(source, server_rtcp_port, rtcp_socket.as_ref())
{
let _ = s.connect(&format!("{source}:{port}")).await;
}
};
// Spawn RTP udp receive task
let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?;
p.rtp_appsrc = Some(rtp_appsrc.clone());
state.handles.push(RUNTIME.spawn(async move {
udp_rtp_task(&rtp_socket, rtp_appsrc, settings.timeout).await
}));
// Spawn RTCP udp send/recv task
if let Some(rtcp_socket) = rtcp_socket {
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?;
state.handles.push(RUNTIME.spawn(async move {
udp_rtcp_task(&rtcp_socket, rtcp_appsrc, rx).await
}));
}
}
RtspTransportInfo::Tcp {
channels: (rtp_channel, rtcp_channel),
} => {
// Interleaved transport: no sockets or tasks; data arrives on the RTSP connection and is
// dispatched by channel id in the loop below.
let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?;
p.rtp_appsrc = Some(rtp_appsrc.clone());
tcp_interleave_appsrcs.insert(*rtp_channel, rtp_appsrc);
if let Some(rtcp_channel) = rtcp_channel {
// RTCP SR
let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?;
tcp_interleave_appsrcs.insert(*rtcp_channel, rtcp_appsrc.clone());
// RTCP RR
let rtcp_channel = *rtcp_channel;
let cmd_tx = cmd_tx.clone();
self.make_rtcp_appsink(rtpsession_n, &manager, move |appsink| {
on_rtcp_tcp(appsink, cmd_tx.clone(), rtcp_channel)
})?;
}
}
}
}
obj.no_more_pads();
// Expose RTP srcpads
// When the manager adds an RTP source pad, it becomes the target of the matching
// `stream_<id>` ghost pad created in `make_rtp_appsrc`.
manager.inner.connect_pad_added(|manager, pad| {
if pad.direction() != gst::PadDirection::Src {
return;
}
let Some(obj) = manager
.parent()
.and_then(|o| o.downcast::<gst::Element>().ok())
else {
return;
};
let name = pad.name();
// Pad names are matched on their `_`-separated parts; both naming orders are accepted.
match *name.split('_').collect::<Vec<_>>() {
// rtpbin and rtpbin2
["recv", "rtp", "src", stream_id, ssrc, pt]
| ["rtp", "recv", "src", stream_id, ssrc, pt] => {
if stream_id.parse::<u32>().is_err() {
gst::info!(CAT, "Ignoring srcpad with invalid stream id: {name}");
return;
};
gst::info!(CAT, "Setting rtpbin pad {} as ghostpad target", name);
let srcpad = obj
.static_pad(&format!("stream_{}", stream_id))
.expect("ghostpad should've been available already");
let ghostpad = srcpad
.downcast::<gst::GhostPad>()
.expect("rtspsrc src pads are ghost pads");
if let Err(err) = ghostpad.set_target(Some(pad)) {
gst::element_error!(
obj,
gst::ResourceError::Failed,
(
"Failed to set ghostpad {} target {}: {err:?}",
ghostpad.name(),
name
),
["pt: {pt}, ssrc: {ssrc}"]
);
}
}
_ => {
gst::info!(CAT, "Ignoring unknown srcpad: {name}");
}
}
});
// Request whose response is awaited, with its CSeq.
// NOTE(review): within this function it is only ever set to `(Method::Play, cseq)` and never
// cleared, so the `Method::Teardown` arm below looks unreachable from here and every later
// response is validated as a PLAY response -- confirm this is intended.
let mut expected_response: Option<(Method, u32)> = None;
loop {
tokio::select! {
// Messages from the server over the RTSP TCP connection.
msg = state.stream.next() => match msg {
Some(Ok(rtsp_types::Message::Data(data))) => {
let Some(appsrc) = tcp_interleave_appsrcs.get(&data.channel_id()) else {
gst::warning!(CAT,
"ignored data of size {}: unknown channel {}",
data.len(),
data.channel_id()
);
continue;
};
// The buffer's DTS is set to the appsrc's current running time.
let t = appsrc.current_running_time();
let channel_id = data.channel_id();
gst::trace!(CAT, "Received data on channel {channel_id}");
// TODO: this should be from_mut_slice() after making the necessary
// modifications to Body
let mut buffer = gst::Buffer::from_slice(data.into_body());
let bufref = buffer.make_mut();
bufref.set_dts(t);
// TODO: Allow unlinked source pads
if let Err(err) = appsrc.push_buffer(buffer) {
gst::error!(CAT, "Failed to push buffer on pad {} for channel {}", appsrc.name(), channel_id);
return Err(err.into());
}
}
Some(Ok(rtsp_types::Message::Request(req))) => {
// TODO: implement incoming GET_PARAMETER requests
gst::debug!(CAT, "<-- {req:#?}");
}
Some(Ok(rtsp_types::Message::Response(rsp))) => {
gst::debug!(CAT, "<-- {rsp:#?}");
// Responses that arrive while nothing is awaited are only logged.
let Some((expected, cseq)) = &expected_response else {
continue;
};
let Some(s) = &session else {
return Err(RtspError::Fatal(format!("Can't handle {:?} response, no SETUP", expected)).into());
};
match expected {
Method::Play => {
state.play_response(&rsp, *cseq, s).await?;
self.post_complete("request", "PLAY response received");
}
Method::Teardown => state.teardown_response(&rsp, *cseq, s).await?,
m => unreachable!("BUG: unexpected response method: {m:?}"),
};
}
Some(Err(e)) => {
// TODO: reconnect or ignore if UDP sockets are still receiving data
gst::error!(CAT, "I/O error: {e:?}, quitting");
return Err(gst::FlowError::Error.into());
}
None => {
// TODO: reconnect or ignore if UDP sockets are still receiving data
gst::error!(CAT, "TCP connection EOF, quitting");
return Err(gst::FlowError::Eos.into());
}
},
// Commands from the element (state changes, RTCP appsink callbacks).
Some(cmd) = cmd_rx.recv() => match cmd {
Commands::Play => {
let Some(s) = &session else {
return Err(RtspError::InvalidMessage("Can't PLAY, no SETUP").into());
};
self.post_start("request", "PLAY request sent");
let cseq = state.play(s).await.map_err(|err| {
self.post_cancelled("request", "PLAY request cancelled");
err
})?;
expected_response = Some((Method::Play, cseq));
},
Commands::Teardown(tx) => {
gst::info!(CAT, "Received Teardown command");
let Some(s) = &session else {
return Err(RtspError::InvalidMessage("Can't TEARDOWN, no SETUP").into());
};
// The result of the teardown request is ignored and its response is not awaited:
// the requester is notified and the loop exits right away.
let _ = state.teardown(s).await;
if let Some(tx) = tx {
let _ = tx.send(());
}
break;
}
Commands::Data(data) => {
// We currently only send RTCP RR as data messages, this will change when
// we support TCP ONVIF backchannels
state.sink.send(Message::Data(data)).await?;
gst::debug!(CAT, "Sent RTCP RR over TCP");
}
},
else => {
gst::error!(CAT, "No select statement matched, breaking loop");
break;
}
}
}
Ok(())
}
}
// Wrapper around the RTP manager element (`rtpbin` or `rtpbin2`) that hides the different
// pad naming schemes of the two.
struct RtspManager {
// The manager element itself.
inner: gst::Element,
// True when `inner` is an `rtpbin2`; selects which pad names are requested.
using_rtpbin2: bool,
}
impl RtspManager {
    /// Creates the manager element: `rtpbin2` when `rtpbin2` is true,
    /// `rtpbin` otherwise.
    ///
    /// Panics if the element factory is not available.
    fn new(rtpbin2: bool) -> Self {
        let name = match rtpbin2 {
            true => "rtpbin2",
            false => "rtpbin",
        };
        let inner = gst::ElementFactory::make_with_name(name, None)
            .unwrap_or_else(|_| panic!("{name} not found"));
        Self {
            inner,
            using_rtpbin2: rtpbin2,
        }
    }

    /// Requests the pad `<prefix>_<rtpsession>` from the manager, picking the
    /// prefix that matches the manager element in use.
    fn request_session_pad(
        &self,
        rtpbin2_prefix: &str,
        rtpbin_prefix: &str,
        rtpsession: usize,
    ) -> Option<gst::Pad> {
        let prefix = if self.using_rtpbin2 {
            rtpbin2_prefix
        } else {
            rtpbin_prefix
        };
        let pad_name = format!("{}_{}", prefix, rtpsession);
        self.inner.request_pad_simple(&pad_name)
    }

    /// Requests the sink pad that receives RTP for the given session.
    fn rtp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
        self.request_session_pad("rtp_recv_sink", "recv_rtp_sink", rtpsession)
    }

    /// Requests the sink pad that receives RTCP for the given session.
    fn rtcp_recv_sinkpad(&self, rtpsession: usize) -> Option<gst::Pad> {
        self.request_session_pad("rtcp_recv_sink", "recv_rtcp_sink", rtpsession)
    }

    /// Requests the source pad that emits outgoing RTCP for the given session.
    fn rtcp_send_srcpad(&self, rtpsession: usize) -> Option<gst::Pad> {
        self.request_session_pad("rtcp_send_src", "send_rtcp_src", rtpsession)
    }
}
// State owned by one RTSP connection task; created in `start()` and passed to `rtsp_task`.
struct RtspTaskState {
// Sequence number of the most recent request; incremented before each request is built.
cseq: u32,
// URL the connection was opened for; used as the request URI for OPTIONS and DESCRIBE.
url: Url,
// RTSP version used when building requests (initialised to 1.0).
version: Version,
// Value of the Content-Base header of the DESCRIBE response, or Content-Location if absent.
content_base_or_location: Option<String>,
// NOTE(review): presumably the session-level control URL from the SDP; it is not assigned
// in the visible part of the file -- confirm.
aggregate_control: Option<Url>,
// Session description parsed from the DESCRIBE response body.
sdp: Option<sdp_types::Session>,
// Incoming messages from the server.
stream:
Pin<Box<dyn Stream<Item = Result<Message<Body>, super::tcp_message::ReadError>> + Send>>,
// Outgoing messages to the server.
sink: Pin<Box<dyn Sink<Message<Body>, Error = std::io::Error> + Send>>,
// One entry per stream that was set up; the index is used as the RTP session number.
setup_params: Vec<RtspSetupParams>,
// Per-stream receive tasks; aborted and awaited during cleanup in `start()`.
handles: Vec<JoinHandle<()>>,
}
// Result of setting up one media stream.
struct RtspSetupParams {
// NOTE(review): presumably the control URL the SETUP request was sent to; it is assigned
// outside the visible part of the file -- confirm.
control_url: Url,
// Negotiated transport (UDP multicast, UDP unicast, or TCP interleaved) and its parameters.
transport: RtspTransportInfo,
// The RTP appsrc for this stream, filled in by `rtsp_task` once it has been created.
rtp_appsrc: Option<gst_app::AppSrc>,
// Caps set on the stream's RTP appsrc.
caps: gst::Caps,
}
impl RtspTaskState {
fn new(url: Url, stream: RtspStream, sink: RtspSink) -> Self {
RtspTaskState {
cseq: 0u32,
url,
version: Version::V1_0,
content_base_or_location: None,
aggregate_control: None,
sdp: None,
stream,
sink,
setup_params: Vec::new(),
handles: Vec::new(),
}
}
/// Validates a response against the request it answers.
///
/// * The status must be `200 OK`.
/// * A parseable `CSeq` header must equal `cseq`; a missing or unparseable
///   one is only logged.
/// * If `session` is given, a `Session` header in the response must carry the
///   same session id; a missing header is only logged.
///
/// `req_name` is only used for the error message.
///
/// # Errors
///
/// `RtspError::Fatal` on a non-OK status or a session mismatch,
/// `RtspError::InvalidMessage` on a `CSeq` mismatch, and a header parse error
/// if the `Session` header is present but malformed.
fn check_response(
    rsp: &Response<Body>,
    cseq: u32,
    req_name: Method,
    session: Option<&Session>,
) -> Result<(), RtspError> {
    if rsp.status() != StatusCode::Ok {
        return Err(RtspError::Fatal(format!(
            "{req_name:?} request failed: {}",
            rsp.reason_phrase()
        )));
    }
    match rsp.typed_header::<CSeq>() {
        Ok(Some(v)) => {
            if *v != cseq {
                return Err(RtspError::InvalidMessage("cseq does not match"));
            }
        }
        Ok(None) => {
            gst::warning!(
                CAT,
                "No cseq in response, continuing... {:#?}",
                rsp.headers().collect::<Vec<_>>()
            );
        }
        Err(_) => {
            gst::warning!(
                CAT,
                "Invalid cseq in response, continuing... {:#?}",
                rsp.headers().collect::<Vec<_>>()
            );
        }
    };
    if let Some(s) = session {
        if let Some(have_s) = rsp.typed_header::<Session>()? {
            if s.0 != have_s.0 {
                // `have_s` is the id from the response header, `s` is ours: the
                // arguments must follow the order the message names them in.
                return Err(RtspError::Fatal(format!(
                    "Session in header {} does not match our session {}",
                    have_s.0, s.0
                )));
            }
        } else {
            gst::warning!(
                CAT,
                "No Session header in response, continuing... {:#?}",
                rsp.headers().collect::<Vec<_>>()
            );
        }
    }
    Ok(())
}
/// Sends an OPTIONS request and checks that the server advertises every
/// method this client needs (DESCRIBE, SETUP, PLAY, TEARDOWN).
///
/// # Errors
///
/// Fails on I/O errors, on anything other than a valid response, on a
/// missing or invalid `Public` header, or when a required method is absent.
async fn options(&mut self) -> Result<(), RtspError> {
    self.cseq += 1;
    let req = Request::builder(Method::Options, self.version)
        .typed_header::<CSeq>(&self.cseq.into())
        .request_uri(self.url.clone())
        .header(USER_AGENT, DEFAULT_USER_AGENT)
        .build(Body::default());
    gst::debug!(CAT, "-->> {req:#?}");
    self.sink.send(req.into()).await?;

    // The very next message on the connection must be the response.
    let rsp = match self.stream.next().await {
        Some(Ok(rtsp_types::Message::Response(rsp))) => rsp,
        Some(Ok(m)) => return Err(RtspError::UnexpectedMessage("OPTIONS response", m)),
        Some(Err(e)) => return Err(e.into()),
        None => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "options response",
            )
            .into());
        }
    };
    gst::debug!(CAT, "<<-- {rsp:#?}");
    Self::check_response(&rsp, self.cseq, Method::Options, None)?;

    let methods = match rsp.typed_header::<Public>() {
        Ok(Some(methods)) => methods,
        _ => {
            return Err(RtspError::InvalidMessage(
                "OPTIONS response does not contain a valid Public header",
            ));
        }
    };

    let required = [
        Method::Describe,
        Method::Setup,
        Method::Play,
        Method::Teardown,
    ];
    let missing: Vec<String> = required
        .iter()
        .filter(|&method| !methods.contains(method))
        .map(|method| format!("{method:?}"))
        .collect();

    if missing.is_empty() {
        return Ok(());
    }
    let plural = if missing.len() == 1 { "" } else { "s:" };
    Err(RtspError::Fatal(format!(
        "Server doesn't support the required method{} {}",
        plural,
        missing.join(",")
    )))
}
/// Sends a DESCRIBE request asking for `application/sdp`, then stores the parsed SDP in
/// `self.sdp` and the `Content-Base` (or, failing that, `Content-Location`) header value in
/// `self.content_base_or_location`.
///
/// Fails if the request cannot be sent, the response is missing or not a response message,
/// `check_response` rejects it, or the body cannot be parsed as SDP.
async fn describe(&mut self) -> Result<(), RtspError> {
    self.cseq += 1;
    let req = Request::builder(Method::Describe, self.version)
        .typed_header::<CSeq>(&self.cseq.into())
        .header(USER_AGENT, DEFAULT_USER_AGENT)
        .header(ACCEPT, "application/sdp")
        .request_uri(self.url.clone())
        .build(Body::default());
    gst::debug!(CAT, "-->> {req:#?}");
    self.sink.send(req.into()).await?;
    let rsp = match self.stream.next().await {
        Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp),
        Some(Ok(m)) => Err(RtspError::UnexpectedMessage("DESCRIBE response", m)),
        Some(Err(e)) => Err(e.into()),
        None => Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "describe response",
        )
        .into()),
    }?;
    // Only the headers are logged here, the body is logged separately below.
    gst::debug!(
        CAT,
        "<<-- Response {:#?}",
        rsp.headers().collect::<Vec<_>>()
    );
    Self::check_response(&rsp, self.cseq, Method::Describe, None)?;
    self.content_base_or_location = rsp
        .header(&CONTENT_BASE)
        .or_else(|| rsp.header(&CONTENT_LOCATION))
        .map(|v| v.to_string());
    // The body comes from the network and is only logged here, so never panic on invalid
    // UTF-8; the SDP parser below decides whether the body is acceptable.
    gst::info!(CAT, "{}", String::from_utf8_lossy(rsp.body()));
    // TODO: read range attribute from SDP for VOD use-cases
    let sdp = sdp_types::Session::parse(rsp.body())?;
    gst::debug!(CAT, "{sdp:#?}");
    self.sdp.replace(sdp);
    Ok(())
}
/// Copies the parameters of an SDP `fmtp` attribute value into the caps structure `s`.
///
/// `fmtp` is expected to look like `<payload type> <key>=<value>;<key>=<value>...`. Keys are
/// trimmed and lower-cased. For H264, `profile-level-id` is translated into `profile` and
/// `level` fields instead of being copied verbatim; if it cannot be decoded it is dropped
/// with a warning. Parsing stops at the first parameter that has no `=`.
fn parse_fmtp(fmtp: &str, s: &mut gst::structure::Structure) {
    // Non-compliant RTSP servers will incorrectly set these here, ignore them
    let ignore_fields = [
        "media",
        "payload",
        "clock-rate",
        "encoding-name",
        "encoding-params",
    ];
    // A missing encoding-name simply disables the H264-specific handling below.
    let encoding_name = s.get::<String>("encoding-name").ok();
    let is_h264 = encoding_name.as_deref() == Some("H264");
    let Some((_, fmtp)) = fmtp.split_once(' ') else {
        gst::warning!(CAT, "Could not parse fmtp: {fmtp}");
        return;
    };
    let iter = fmtp.split(';').map_while(|x| x.split_once('='));
    for (k, v) in iter {
        let k = k.trim().to_ascii_lowercase();
        if ignore_fields.contains(&k.as_str()) {
            continue;
        }
        if is_h264 && k == "profile-level-id" {
            // The value is server-provided: use checked slicing so that a short or non-ASCII
            // value is reported as unparsable instead of panicking.
            let hex_byte = |range: std::ops::Range<usize>| {
                v.get(range).and_then(|digits| u8::from_str_radix(digits, 16).ok())
            };
            if let (Some(p), Some(c), Some(l)) = (hex_byte(0..2), hex_byte(2..4), hex_byte(4..6))
            {
                // profile_idc, constraint flags and level_idc, as in the start of an SPS
                let sps = &[p, c, l];
                let profile = gst_pbutils::codec_utils_h264_get_profile(sps);
                let level = gst_pbutils::codec_utils_h264_get_level(sps);
                if let (Ok(profile), Ok(level)) = (profile, level) {
                    s.set("profile", profile);
                    s.set("level", level);
                    continue;
                }
            }
            gst::warning!(CAT, "Failed to parse profile-level-id {v}, ignoring...");
            continue;
        }
        s.set(k, v);
    }
}
fn parse_rtpmap(rtpmap: &str, s: &mut gst::structure::Structure) -> Result<(), RtspError> {
let Some((_, rtpmap)) = rtpmap.split_once(' ') else {
return Err(RtspError::InvalidMessage(
"Could not parse rtpmap: {rtpmap}",
));
};
let mut iter = rtpmap.split('/');
let Some(encoding_name) = iter.next() else {
return Err(RtspError::InvalidMessage(
"Could not parse encoding-name from rtpmap: {rtpmap}",
));
};
s.set("encoding-name", encoding_name);
let Some(v) = iter.next() else {
return Err(RtspError::InvalidMessage(
"Could not parse clock-rate from rtpmap: {rtpmap}",
));
};
let Ok(clock_rate) = v.parse::<i32>() else {
return Err(RtspError::InvalidMessage(
"Could not parse clock-rate from rtpmap: {rtpmap}",
));
};
s.set("clock-rate", clock_rate);
if let Some(v) = iter.next() {
s.set("encoding-params", v);
}
debug_assert!(iter.next().is_none());
Ok(())
}
// https://datatracker.ietf.org/doc/html/rfc2326#appendix-C.1.1
/// Resolves an SDP `control` attribute value to a URL.
///
/// A value that parses as a URL on its own is returned unchanged, `*` maps to `base` itself,
/// and any other relative value is joined onto `base`. Returns `None` when the value cannot
/// be parsed or joined.
fn parse_control_path(path: &str, base: &Url) -> Option<Url> {
    let parsed = Url::parse(path);
    if let Err(url::ParseError::RelativeUrlWithoutBase) = parsed {
        return if path == "*" {
            Some(base.clone())
        } else {
            base.join(path).ok()
        };
    }
    parsed.ok()
}
fn parse_setup_transports(
transports: &Transports,
s: &mut gst::Structure,
protocols: &[RtspProtocol],
mode: &TransportMode,
) -> Result<RtspTransportInfo, RtspError> {
let mut last_error =
RtspError::Fatal("No matching transport found matching selected protocols".to_string());
let mut parsed_transports = Vec::new();
for transport in transports.iter() {
let Transport::Rtp(t) = transport else {
last_error =
RtspError::Fatal(format!("Expected RTP transport, got {:#?}", transports));
continue;
};
// RTSP 2 specifies that we can have multiple SSRCs in the response
// Transport header, but it's not clear why, so we don't support it
if let Some(ssrc) = t.params.ssrc.first() {
s.set("ssrc", ssrc)
}
if !t.params.mode.is_empty() && !t.params.mode.contains(mode) {
last_error = RtspError::Fatal(format!(
"Requested mode {:?} doesn't match server modes: {:?}",
mode, t.params.mode
));
continue;
}
let parsed = match RtspTransportInfo::try_from(t) {
Ok(p) => p,
Err(err) => {
last_error = err;
continue;
}
};
parsed_transports.push(parsed);
}
for protocol in protocols {
for n in 0..parsed_transports.len() {
if parsed_transports[n].to_protocol() == *protocol {
let t = parsed_transports.swap_remove(n);
return Ok(t);
}
}
}
Err(last_error)
}
/// Sends one SETUP request for every `audio`/`video` media section of the stored SDP and
/// returns the negotiated parameters (control URL, transport, caps) of each stream that was
/// set up.
///
/// * `session` - session to send along with each request, if any. It is replaced with the
///   session returned in every SETUP response (without the timeout field).
/// * `port_start` - first local UDP port to try when binding the RTP/RTCP socket pair.
/// * `protocols` - lower transports the caller allows.
/// * `mode` - transport mode that is requested and that the selected transport must allow.
///
/// Media sections without a usable control URL, payload type, rtpmap or protocol are skipped.
/// Any request/response failure aborts the whole function with an error.
///
/// Panics if `self.sdp` is `None`, or if the SDP has no usable session-level connection
/// address and the base URL has no host.
async fn setup(
&mut self,
session: &mut Option<Session>,
port_start: u16,
protocols: &[RtspProtocol],
mode: TransportMode,
) -> Result<Vec<RtspSetupParams>, RtspError> {
let sdp = self.sdp.as_ref().expect("Must have SDP by now");
// Control URLs are resolved against Content-Base/Content-Location if that was stored and
// parses as a URL, otherwise against the request URL.
let base = self
.content_base_or_location
.as_ref()
.and_then(|s| Url::parse(s).ok())
.unwrap_or_else(|| self.url.clone());
// Session-level control URL; it is also what play() and teardown() use as request URI.
self.aggregate_control = sdp
.get_first_attribute_value("control")
// No attribute and no value have the same meaning for us
.ok()
.flatten()
.and_then(|v| Self::parse_control_path(v, &base));
// Session-level attributes become `a-<name>` fields shared by the caps of every stream.
let mut b = gst::Structure::builder("application/x-rtp");
let skip_attrs = ["control", "range"];
for sdp_types::Attribute { attribute, value } in &sdp.attributes {
if skip_attrs.contains(&attribute.as_str()) {
continue;
}
b = b.field(format!("a-{attribute}"), value);
}
let message_structure = b.build();
// Fallback source address for UDP transports that don't specify one: the session-level
// connection address if non-empty, otherwise the host of the base URL.
let conn_source = sdp
.connection
.as_ref()
.map(|c| c.connection_address.as_str())
.filter(|c| !c.is_empty())
.unwrap_or_else(|| base.host_str().unwrap());
// Next local UDP port to try, and next interleaved channel number to offer for TCP.
let mut port_next = port_start;
let mut stream_num = 0;
let mut setup_params: Vec<RtspSetupParams> = Vec::new();
let skip_attrs = ["control", "rtpmap", "fmtp"];
for m in &sdp.medias {
if !["audio", "video"].contains(&m.media.as_str()) {
gst::info!(CAT, "Ignoring unsupported media {}", m.media);
continue;
}
let media_control = m
.get_first_attribute_value("control")
// No attribute and no value have the same meaning for us
.ok()
.flatten()
.and_then(|v| Self::parse_control_path(v, &base));
// The media-level control URL takes precedence over the session-level one.
let Some(control_url) = media_control.as_ref().or(self.aggregate_control.as_ref())
else {
gst::warning!(
CAT,
"No session control or media control for {} fmt {}, ignoring",
m.media,
m.fmt
);
continue;
};
// RTP caps
// FIXME: move SDP -> Caps parsing to a separate file
// NOTE(review): debug builds panic here when the server's SDP advertises a non-zero
// media port; confirm that is intended for server-provided data.
debug_assert_eq!(m.port, 0); // TCP
let Ok(pt) = m.fmt.parse::<i32>() else {
gst::error!(CAT, "Could not parse pt: {}, ignoring media", m.fmt);
continue;
};
let mut s = message_structure.clone();
s.set("media", &m.media);
s.set("payload", pt);
// A malformed rtpmap aborts the whole setup, a missing one only skips this media.
if let Ok(Some(rtpmap)) = m.get_first_attribute_value("rtpmap") {
Self::parse_rtpmap(rtpmap, &mut s)?;
} else {
gst::warning!(CAT, "No rtpmap for {} {}, skipping", m.media, m.fmt);
continue;
}
if let Ok(Some(fmtp)) = m.get_first_attribute_value("fmtp") {
Self::parse_fmtp(fmtp, &mut s);
}
// Remaining media-level attributes are copied into the caps as `a-<name>` fields.
for sdp_types::Attribute { attribute, value } in &m.attributes {
if skip_attrs.contains(&attribute.as_str()) {
continue;
}
// https://github.com/sdroege/sdp-types/issues/17
if attribute == "ssrc" {
continue;
}
s.set(format!("a-{attribute}"), value);
}
// TODO: rtcp-fb: fields
// For H264, `level` is kept only when `level-asymmetry-allowed` is exactly "0", and the
// `level-asymmetry-allowed` field itself is always removed from the caps.
if s.get_optional("encoding-name") == Ok(Some("H264")) {
if s.get_optional("level-asymmetry-allowed") != Ok(Some("0"))
&& s.has_field("level")
{
s.remove_field("level");
}
if s.has_field("level-asymmetry-allowed") {
s.remove_field("level-asymmetry-allowed");
};
}
// SETUP
let mut rtp_socket: Option<UdpSocket> = None;
let mut rtcp_socket: Option<UdpSocket> = None;
let mut transports = Vec::new();
let mut is_ipv4 = true;
// Protocols implied by the media-level connection addresses, if there are any.
let mut conn_protocols = BTreeSet::new();
for conn in &m.connections {
if conn.nettype != "IN" {
continue;
}
// XXX: For now, assume that all connections use the same addrtype
match conn.addrtype.as_str() {
"IP4" => is_ipv4 = true,
"IP6" => is_ipv4 = false,
_ => continue,
};
// Strip subnet mask, if any
let addr = if let Some((first, _)) = conn.connection_address.split_once('/') {
first
} else {
conn.connection_address.as_str()
};
let Ok(addr) = addr.parse::<IpAddr>() else {
continue;
};
// If this is an instance of gst-rtsp-server that only supports
// udp-multicast, it will put the multicast address in the media
// connections field.
if addr.is_multicast() {
conn_protocols.insert(RtspProtocol::UdpMulticast);
} else {
conn_protocols.insert(RtspProtocol::Tcp);
conn_protocols.insert(RtspProtocol::Udp);
}
}
// Restrict the caller's protocols to those implied by the connection addresses. Note that
// the intersection goes through a BTreeSet, so in that case the result is in set order
// rather than in the order of the `protocols` argument.
let protocols = if !conn_protocols.is_empty() {
let p = protocols.iter().cloned().collect::<BTreeSet<_>>();
p.intersection(&conn_protocols).cloned().collect::<Vec<_>>()
} else {
protocols.to_owned()
};
if protocols.is_empty() {
gst::error!(CAT, "No available protocols left, skipping media");
continue;
}
// Build one transport offer per allowed protocol: UDP multicast, UDP unicast, then TCP.
if protocols.contains(&RtspProtocol::UdpMulticast) {
let params = RtpTransportParameters {
mode: vec![mode.clone()],
multicast: true,
..Default::default()
};
transports.push(Transport::Rtp(RtpTransport {
profile: RtpProfile::Avp,
lower_transport: Some(RtpLowerTransport::Udp),
params,
}));
}
if protocols.contains(&RtspProtocol::Udp) {
// The sockets are bound before sending the request so that real ports can be offered.
let (sock1, rtp_port) = bind_start_port(port_next, is_ipv4).await;
// Get the actual port that was successfully bound
port_next = rtp_port;
// NOTE(review): only the RTP port is remembered in `port_next`. The response handling
// below assumes the RTCP socket is bound to `port_next + 1`, which presumably does not
// hold if bind_start_port() had to skip ports for RTCP. Verify.
let (sock2, rtcp_port) = bind_start_port(rtp_port + 1, is_ipv4).await;
rtp_socket = Some(sock1);
rtcp_socket = Some(sock2);
let params = RtpTransportParameters {
mode: vec![mode.clone()],
unicast: true,
client_port: Some((rtp_port, Some(rtcp_port))),
..Default::default()
};
transports.push(Transport::Rtp(RtpTransport {
profile: RtpProfile::Avp,
lower_transport: Some(RtpLowerTransport::Udp),
params,
}));
}
if protocols.contains(&RtspProtocol::Tcp) {
let params = RtpTransportParameters {
mode: vec![mode.clone()],
interleaved: Some((stream_num, Some(stream_num + 1))),
..Default::default()
};
transports.push(Transport::Rtp(RtpTransport {
// RTSP 2.0 adds AVPF and more
profile: RtpProfile::Avp,
lower_transport: Some(RtpLowerTransport::Tcp),
params,
}));
}
self.cseq += 1;
let transports: Transports = transports.as_slice().into();
let req = Request::builder(Method::Setup, self.version)
.typed_header::<CSeq>(&self.cseq.into())
.header(USER_AGENT, DEFAULT_USER_AGENT)
.typed_header::<Transports>(&transports)
.request_uri(control_url.clone());
// The Session header is only sent once a session is known, i.e. normally from the second
// SETUP request onwards.
let req = if let Some(s) = session {
req.typed_header::<Session>(s)
} else {
req
};
let req = req.build(Body::default());
let cseq = self.cseq;
gst::debug!(CAT, "-->> {req:#?}");
self.sink.send(req.into()).await?;
// RTSP 2 supports pipelining of SETUP requests, so this ping-pong would have to be
// reworked if we want to support it.
let rsp = match self.stream.next().await {
Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp),
Some(Ok(m)) => Err(RtspError::UnexpectedMessage("SETUP response", m)),
Some(Err(e)) => Err(e.into()),
None => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"setup response",
)
.into()),
}?;
gst::debug!(CAT, "<<-- {rsp:#?}");
Self::check_response(&rsp, cseq, Method::Setup, session.as_ref())?;
// Unlike in check_response(), a missing Session header is an error for SETUP.
let new_session = rsp
.typed_header::<Session>()?
.ok_or(RtspError::InvalidMessage("No session in SETUP response"))?;
// Manually strip timeout field: https://github.com/sdroege/rtsp-types/issues/24
session.replace(Session(new_session.0, None));
let mut parsed_transport = if let Some(transports) = rsp.typed_header::<Transports>()? {
Self::parse_setup_transports(&transports, &mut s, &protocols, &mode)
} else {
// Transport header in response is optional if only one transport was offered
// https://datatracker.ietf.org/doc/html/rfc2326#section-12.39
// In that case the single transport we offered ourselves is treated as the answer.
if transports.len() == 1 {
Self::parse_setup_transports(&transports, &mut s, &protocols, &mode)
} else {
Err(RtspError::InvalidMessage(
"No transport header in SETUP response",
))
}
}?;
// Fill in what the parsed transport lacks and advance the port / channel counters for the
// next media.
match &mut parsed_transport {
RtspTransportInfo::UdpMulticast { .. } => {}
RtspTransportInfo::Udp {
source,
server_port: _,
client_port,
sockets,
} => {
if source.is_none() {
*source = Some(conn_source.to_string());
}
if let Some((rtp_port, rtcp_port)) = client_port {
// There is no reason for the server to reject the client ports WE
// selected, so if it does, just ignore it.
if *rtp_port != port_next {
gst::warning!(
CAT,
"RTP port changed: {port_next} -> {rtp_port}, ignoring"
);
*rtp_port = port_next;
}
port_next += 1;
// The sockets bound above are handed over to the transport info here. The unwrap()
// presumably cannot fail: parse_setup_transports() only returns transports matching
// `protocols`, and the sockets are bound whenever that contains Udp. Verify against
// RtspTransportInfo::to_protocol().
*sockets = if let Some(rtcp_port) = rtcp_port {
if *rtcp_port != port_next {
gst::warning!(
CAT,
"RTCP port changed: {port_next} -> {rtcp_port}, ignoring"
);
*rtcp_port = port_next;
}
port_next += 1;
Some((rtp_socket.unwrap(), rtcp_socket))
} else {
Some((rtp_socket.unwrap(), None))
}
};
}
RtspTransportInfo::Tcp {
channels: (rtp_ch, rtcp_ch),
} => {
// Channel numbers chosen by the server are accepted; a difference is only logged.
if *rtp_ch != stream_num {
gst::info!(CAT, "RTP channel changed: {stream_num} -> {rtp_ch}");
}
stream_num += 1;
if let Some(rtcp_ch) = rtcp_ch {
if *rtcp_ch != stream_num {
gst::info!(CAT, "RTCP channel changed: {stream_num} -> {rtcp_ch}");
}
stream_num += 1;
}
}
};
let caps = gst::Caps::from(s);
setup_params.push(RtspSetupParams {
control_url: control_url.clone(),
transport: parsed_transport,
rtp_appsrc: None,
caps,
});
}
Ok(setup_params)
}
/// Sends a PLAY request for `session`, starting at NPT `now`, and returns the `CSeq` that was
/// used so that the caller can match the response. The response is not awaited here.
///
/// The request URI is the aggregate control URL if there is one, otherwise the source URL.
async fn play(&mut self, session: &Session) -> Result<u32, RtspError> {
    self.cseq += 1;
    let cseq = self.cseq;
    let request_uri = match self.aggregate_control.as_ref() {
        Some(control) => control.clone(),
        None => self.url.clone(),
    };
    let play_from_now = Range::Npt(NptRange::From(NptTime::Now));
    let req = Request::builder(Method::Play, self.version)
        .typed_header::<CSeq>(&cseq.into())
        .typed_header::<Range>(&play_from_now)
        .header(USER_AGENT, DEFAULT_USER_AGENT)
        .request_uri(request_uri)
        .typed_header::<Session>(session)
        .build(Body::default());
    gst::debug!(CAT, "-->> {req:#?}");
    self.sink.send(req.into()).await?;
    Ok(cseq)
}
/// Validates a PLAY response and applies its `RTP-Info` header: for every stream whose control
/// URL matches an entry, `seqnum-base` and `clock-base` are set on the caps of its RTP appsrc.
///
/// A response without an RTP-Info (V1) header is accepted with a warning.
/// Panics if a matching stream has no RTP appsrc yet, or if that appsrc has no caps.
async fn play_response(
    &mut self,
    rsp: &Response<Body>,
    cseq: u32,
    session: &Session,
) -> Result<(), RtspError> {
    Self::check_response(rsp, cseq, Method::Play, Some(session))?;
    let Some(RtpInfos::V1(rtpinfos)) = rsp.typed_header::<RtpInfos>()? else {
        gst::warning!(CAT, "No RTPInfos V1 header in PLAY response");
        return Ok(());
    };
    for rtpinfo in rtpinfos {
        for params in self.setup_params.iter_mut() {
            if params.control_url != rtpinfo.uri {
                continue;
            }
            let appsrc = params.rtp_appsrc.as_ref().unwrap();
            let mut caps = appsrc.caps().unwrap();
            let capsref = caps.make_mut();
            let mut changed = false;
            if let Some(seq) = rtpinfo.seq {
                capsref.set("seqnum-base", seq as u32);
                changed = true;
            }
            if let Some(rtptime) = rtpinfo.rtptime {
                capsref.set("clock-base", rtptime);
                changed = true;
            }
            // Only touch the appsrc when the header actually carried something
            if changed {
                appsrc.set_caps(Some(&caps));
            }
        }
    }
    Ok(())
}
/// Sends a TEARDOWN request for `session` and returns the `CSeq` that was used so that the
/// caller can match the response. The response is not awaited here.
///
/// The request URI is the aggregate control URL if there is one, otherwise the source URL.
async fn teardown(&mut self, session: &Session) -> Result<u32, RtspError> {
    self.cseq += 1;
    let cseq = self.cseq;
    let request_uri = match self.aggregate_control.as_ref() {
        Some(control) => control.clone(),
        None => self.url.clone(),
    };
    let req = Request::builder(Method::Teardown, self.version)
        .typed_header::<CSeq>(&cseq.into())
        .header(USER_AGENT, DEFAULT_USER_AGENT)
        .request_uri(request_uri)
        .typed_header::<Session>(session)
        .build(Body::default());
    gst::debug!(CAT, "-->> {req:#?}");
    self.sink.send(req.into()).await?;
    Ok(cseq)
}
/// Validates a TEARDOWN response against the `CSeq` of the request and the current session.
/// There is nothing else to extract from it.
async fn teardown_response(
    &mut self,
    rsp: &Response<Body>,
    cseq: u32,
    session: &Session,
) -> Result<(), RtspError> {
    Self::check_response(rsp, cseq, Method::Teardown, Some(session))
}
}
/// Creates a non-blocking UDP socket bound to `port` on the unspecified IPv4 or IPv6 address
/// and wraps it in a tokio `UdpSocket`.
///
/// Address (and, on Unix, port) reuse is requested on a best-effort basis: failures to set
/// those options are ignored. Every other failure is returned to the caller.
fn bind_port(port: u16, is_ipv4: bool) -> Result<UdpSocket, std::io::Error> {
    let (domain, addr) = if is_ipv4 {
        (
            socket2::Domain::IPV4,
            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)),
        )
    } else {
        (
            socket2::Domain::IPV6,
            SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0)),
        )
    };
    let sock = Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?;
    let _ = sock.set_reuse_address(true);
    #[cfg(unix)]
    let _ = sock.set_reuse_port(true);
    sock.set_nonblocking(true)?;
    sock.bind(&addr.into())?;
    // Ask the socket which port it ended up on, this matters when `port` is 0
    let local = sock.local_addr()?;
    let bound_port = if is_ipv4 {
        local.as_socket_ipv4().unwrap().port()
    } else {
        local.as_socket_ipv6().unwrap().port()
    };
    gst::debug!(CAT, "Bound to UDP port {bound_port}");
    UdpSocket::from_std(sock.into())
}
/// Binds a UDP socket to `port`, or to the first following port that can be bound, and returns
/// the socket together with the port it is bound to. With `port == 0` the port chosen by the
/// OS is returned.
///
/// # Panics
///
/// Panics when more than `MAX_BIND_PORT_RETRY` consecutive ports fail to bind, or when the
/// search would run past the end of the port range.
async fn bind_start_port(port: u16, is_ipv4: bool) -> (UdpSocket, u16) {
    let mut next_port = port;
    loop {
        match bind_port(next_port, is_ipv4) {
            Ok(socket) => {
                if next_port != 0 {
                    return (socket, next_port);
                }
                let addr = socket
                    .local_addr()
                    .expect("Newly-bound port should not fail");
                return (socket, addr.port());
            }
            Err(err) => {
                gst::debug!(CAT, "Failed to bind to {next_port}: {err:?}, trying next");
                // Port 65535 was the last candidate, don't overflow the u16
                let Some(candidate) = next_port.checked_add(1) else {
                    panic!("Failed to allocate any ports from {port} to {next_port}");
                };
                next_port = candidate;
                // If we fail too much, panic instead of forever doing a hot-loop.
                // `next_port > port` always holds here, so this cannot underflow, unlike
                // subtracting the retry limit from `next_port`.
                if next_port - port > MAX_BIND_PORT_RETRY {
                    panic!("Failed to allocate any ports from {port} to {next_port}");
                }
            }
        };
    }
}
fn on_rtcp_udp(
appsink: &gst_app::AppSink,
tx: mpsc::Sender<MappedBuffer<Readable>>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let Ok(sample) = appsink.pull_sample() else {
return Err(gst::FlowError::Error);
};
let Some(buffer) = sample.buffer_owned() else {
return Ok(gst::FlowSuccess::Ok);
};
let map = buffer.into_mapped_buffer_readable();
match map {
Ok(map) => match tx.try_send(map) {
Ok(_) => Ok(gst::FlowSuccess::Ok),
Err(mpsc::error::TrySendError::Full(_)) => {
gst::error!(CAT, "Could not send RTCP, channel is full");
Err(gst::FlowError::Error)
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(gst::FlowError::Eos),
},
Err(err) => {
gst::error!(CAT, "Failed to map buffer: {err:?}");
Err(gst::FlowError::Error)
}
}
}
fn on_rtcp_tcp(
appsink: &gst_app::AppSink,
cmd_tx: mpsc::Sender<Commands>,
rtcp_channel: u8,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let Ok(sample) = appsink.pull_sample() else {
return Err(gst::FlowError::Error);
};
let Some(buffer) = sample.buffer_owned() else {
return Ok(gst::FlowSuccess::Ok);
};
let map = buffer.into_mapped_buffer_readable();
match map {
Ok(map) => {
let data: rtsp_types::Data<Body> =
rtsp_types::Data::new(rtcp_channel, Body::mapped(map));
let cmd_tx = cmd_tx.clone();
RUNTIME.spawn(async move { cmd_tx.send(Commands::Data(data)).await });
Ok(gst::FlowSuccess::Ok)
}
Err(err) => {
gst::error!(CAT, "Failed to map buffer: {err:?}");
Err(gst::FlowError::Error)
}
}
}
/// Receives RTP packets from `socket` and pushes them into `appsrc`, stamping each buffer's DTS
/// with the appsrc's current running time.
///
/// If the socket is not connected yet, the task first waits for a packet and connects the
/// socket to its sender. The task only ends on failure (receive timeout, socket error or a
/// failed buffer push), which is posted as an element error on `appsrc`.
async fn udp_rtp_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: gst::ClockTime) {
    // TODO: this should allocate a buffer pool to avoid a copy
    let mut buf = vec![0; UDP_PACKET_MAX_SIZE];
    // NOTE(review): this feeds the integer value of the ClockTime to from_secs(); confirm the
    // unit of that conversion. The timeout messages below report DEFAULT_TIMEOUT rather than
    // this value.
    let t = Duration::from_secs(timeout.into());
    // We would not be connected if the server didn't give us a Transport header or its Transport
    // header didn't specify the server port, so we don't know the sender port from which we will
    // get data till we get the first packet here.
    if socket.peer_addr().is_err() {
        // time::timeout() yields Err(Elapsed) when the timeout expires; the future's own
        // result (here: the I/O result) is the Ok() payload.
        let ret = match time::timeout(t, socket.peek_sender()).await {
            Ok(Ok(addr)) => {
                if let Err(err) = socket.connect(addr).await {
                    gst::warning!(CAT, "Failed to connect RTP socket to {addr:?}: {err:?}");
                }
                Ok(())
            }
            Ok(Err(err)) => Err(format!("UDP socket was closed: {err:?}")),
            Err(_elapsed) => Err(format!("No data after {DEFAULT_TIMEOUT} seconds, exiting")),
        };
        if let Err(s) = ret {
            gst::element_error!(
                appsrc,
                gst::ResourceError::Failed,
                ("{}", s),
                ["{:#?}", socket]
            );
            return;
        }
    }
    let error = loop {
        match time::timeout(t, socket.recv(&mut buf)).await {
            Ok(Ok(len)) => {
                let running_time = appsrc.current_running_time();
                let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned());
                let bufref = buffer.make_mut();
                bufref.set_dts(running_time);
                if let Err(err) = appsrc.push_buffer(buffer) {
                    break format!("UDP buffer push failed: {err:?}");
                }
            }
            Ok(Err(err)) => break format!("UDP socket was closed: {err:?}"),
            Err(_elapsed) => break format!("No data after {DEFAULT_TIMEOUT} seconds, exiting"),
        };
    };
    gst::element_error!(
        appsrc,
        gst::ResourceError::Failed,
        ("{}", error),
        ["{:#?}", socket]
    );
}
/// Handles RTCP over UDP in both directions: packets received on `rx` are sent out through
/// `socket`, and packets received on `socket` are pushed into `appsrc` with their DTS set to
/// the appsrc's current running time.
///
/// The task only ends on failure (channel closed, send error on a connected socket, receive
/// error or a failed buffer push), which is posted as an element error on `appsrc`.
async fn udp_rtcp_task(
    socket: &UdpSocket,
    appsrc: gst_app::AppSrc,
    mut rx: mpsc::Receiver<MappedBuffer<Readable>>,
) {
    // The socket might not be connected if the server either didn't specify a server_port for
    // RTCP, or if the server didn't send a Transport header in the SETUP response at all.
    // In that case, we will connect when we get an RTCP packet.
    let mut is_connected = socket.peer_addr().is_ok();
    let mut buf = vec![0; UDP_PACKET_MAX_SIZE];
    let error = loop {
        tokio::select! {
            send_rtcp = rx.recv() => match send_rtcp {
                Some(data) => match socket.send(data.as_ref()).await {
                    Ok(_) => gst::debug!(CAT, "Sent RTCP RR"),
                    Err(err) => {
                        // Failing to send is expected as long as there is no destination
                        if !is_connected {
                            gst::warning!(CAT, "Can't send RTCP yet: don't have dest addr");
                        } else {
                            rx.close();
                            break format!("RTCP send error: {err:?}, stopping task");
                        }
                    }
                },
                None => {
                    rx.close();
                    break format!("UDP socket {socket:?} closed, no more RTCP will be sent");
                }
            },
            recv_rtcp = socket.recv_from(&mut buf) => match recv_rtcp {
                Ok((len, addr)) => {
                    gst::debug!(CAT, "Received RTCP SR");
                    if !is_connected {
                        gst::info!(CAT, "Delayed RTCP UDP connect to {addr:?}");
                        // Only count as connected if connect() worked. Otherwise send errors
                        // would be treated as fatal although there is still no destination,
                        // and the connect would never be retried.
                        match socket.connect(addr).await {
                            Ok(()) => {
                                is_connected = true;
                            }
                            Err(err) => {
                                gst::warning!(
                                    CAT,
                                    "Failed to connect RTCP socket to {addr:?}: {err:?}"
                                );
                            }
                        }
                    };
                    let running_time = appsrc.current_running_time();
                    let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned());
                    let bufref = buffer.make_mut();
                    bufref.set_dts(running_time);
                    if let Err(err) = appsrc.push_buffer(buffer) {
                        break format!("UDP buffer push failed: {err:?}");
                    }
                }
                Err(err) => break format!("UDP socket was closed: {err:?}"),
            },
        }
    };
    gst::element_error!(
        appsrc,
        gst::ResourceError::Failed,
        ("{}", error),
        ["{:#?}", socket]
    );
}
// GObject subclass declaration for `RtspSrc`, processed by the `glib::object_subclass`
// attribute macro.
#[glib::object_subclass]
impl ObjectSubclass for RtspSrc {
// Type name of the subclass
const NAME: &'static str = "GstRtspSrc2";
// Wrapper type declared in the parent module
type Type = super::RtspSrc;
// The parent class is `gst::Bin`
type ParentType = gst::Bin;
// Interfaces implemented by this type: only the URI handler
type Interfaces = (gst::URIHandler,);
}