From 086ffd7affd4fd43418fc8af90d86e0e2f3d01e0 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Wed, 10 Jan 2024 15:40:19 +0530 Subject: [PATCH] New RTSP source plugin with live streaming support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GST_PLUGIN_FEATURE_RANK=rtspsrc2:1 gst-play-1.0 [URI] Features: * Live streaming N audio and N video - With RTCP-based A/V sync * Lower transports: TCP, UDP, UDP-Multicast * RTP, RTCP SR, RTCP RR * OPTIONS DESCRIBE SETUP PLAY TEARDOWN * Custom UDP socket management, does not use udpsrc/udpsink * Supports both rtpbin and the rtpbin2 rust rewrite - Set USE_RTPBIN2=1 to use rtpbin2 (needs other MRs) * Properties: - protocols selection and priority (NEW!) - location supports rtsp[ut]:// - port-start instead of port-range Co-Authored-by: Tim-Philipp Müller Part-of: --- Cargo.lock | 197 ++- Cargo.toml | 2 + ci/utils.py | 1 + dependencies.py | 1 + docs/plugins/gst_plugins_cache.json | 91 ++ meson.build | 7 +- meson_options.txt | 1 + net/rtsp/Cargo.toml | 53 + net/rtsp/LICENSE-MPL-2.0 | 373 +++++ net/rtsp/README.md | 68 + net/rtsp/build.rs | 3 + net/rtsp/src/lib.rs | 36 + net/rtsp/src/rtspsrc/body.rs | 103 ++ net/rtsp/src/rtspsrc/imp.rs | 2026 +++++++++++++++++++++++++++ net/rtsp/src/rtspsrc/mod.rs | 30 + net/rtsp/src/rtspsrc/tcp_message.rs | 202 +++ net/rtsp/src/rtspsrc/transport.rs | 105 ++ 17 files changed, 3232 insertions(+), 67 deletions(-) create mode 100644 net/rtsp/Cargo.toml create mode 100644 net/rtsp/LICENSE-MPL-2.0 create mode 100644 net/rtsp/README.md create mode 100644 net/rtsp/build.rs create mode 100644 net/rtsp/src/lib.rs create mode 100644 net/rtsp/src/rtspsrc/body.rs create mode 100644 net/rtsp/src/rtspsrc/imp.rs create mode 100644 net/rtsp/src/rtspsrc/mod.rs create mode 100644 net/rtsp/src/rtspsrc/tcp_message.rs create mode 100644 net/rtsp/src/rtspsrc/transport.rs diff --git a/Cargo.lock b/Cargo.lock index 13d46e82..b57c7e0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -854,6 +854,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bstr" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c48f0051a4b4c5e0b6d365cd04af53aeaa209e3cc15ec2cdb69e73cc87fbd0dc" +dependencies = [ + "memchr", + "regex-automata 0.4.5", + "serde", +] + [[package]] name = "build_const" version = "0.2.2" @@ -909,7 +920,7 @@ dependencies = [ [[package]] name = "cairo-rs" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "bitflags 2.4.2", "cairo-sys-rs", @@ -921,7 +932,7 @@ dependencies = [ [[package]] name = "cairo-sys-rs" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "glib-sys", "libc", @@ -1105,6 +1116,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "cookie_store" version = "0.20.0" @@ -1583,6 +1600,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fastrand" version = "2.0.1" @@ -1801,7 +1824,7 @@ dependencies = [ [[package]] name = "gdk-pixbuf" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "gdk-pixbuf-sys", "gio", @@ -1812,7 +1835,7 @@ dependencies = [ [[package]] name = "gdk-pixbuf-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "gio-sys", "glib-sys", @@ -1971,7 +1994,7 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "gio" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "futures-channel", "futures-core", @@ -1988,7 +2011,7 @@ dependencies = [ [[package]] name = "gio-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "glib-sys", "gobject-sys", @@ -2000,7 +2023,7 @@ dependencies = [ [[package]] name = "glib" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "bitflags 2.4.2", "futures-channel", @@ -2021,7 +2044,7 @@ dependencies = [ [[package]] name = "glib-macros" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "heck", "proc-macro-crate", @@ -2033,7 +2056,7 @@ dependencies = [ [[package]] name = "glib-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "libc", "system-deps", @@ -2048,7 +2071,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "gobject-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "glib-sys", "libc", @@ -2058,7 +2081,7 @@ dependencies = [ [[package]] name = "graphene-rs" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "glib", "graphene-sys", @@ -2068,7 +2091,7 @@ dependencies = [ [[package]] name = "graphene-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "glib-sys", "libc", @@ -2599,6 +2622,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "gst-plugin-rtsp" +version = "0.12.0-alpha.1" +dependencies = [ + "anyhow", + "atomic_refcell", + "data-encoding", + "futures", + "gst-plugin-version-helper", + "gstreamer", + "gstreamer-app", + "gstreamer-pbutils", + "once_cell", + "rtsp-types", + "sdp-types", + "socket2 0.5.5", + "thiserror", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "gst-plugin-sodium" version = "0.12.0-alpha.1" @@ -2890,7 +2935,7 @@ dependencies = [ [[package]] name = "gstreamer" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "cfg-if", "futures-channel", @@ -2916,7 +2961,7 @@ dependencies = [ [[package]] name = "gstreamer-app" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "futures-core", "futures-sink", @@ -2930,7 +2975,7 @@ dependencies = [ [[package]] name = "gstreamer-app-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gstreamer-base-sys", @@ -2942,7 +2987,7 @@ dependencies = [ [[package]] name = "gstreamer-audio" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "cfg-if", "glib", @@ -2957,7 +3002,7 @@ dependencies = [ [[package]] name = "gstreamer-audio-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gobject-sys", @@ -2970,7 +3015,7 @@ dependencies = [ [[package]] name = "gstreamer-base" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "atomic_refcell", "cfg-if", @@ -2983,7 +3028,7 @@ dependencies = [ [[package]] name = "gstreamer-base-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gobject-sys", @@ -2995,7 +3040,7 @@ dependencies = [ [[package]] name = "gstreamer-check" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3005,7 +3050,7 @@ dependencies = [ [[package]] name = "gstreamer-check-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gobject-sys", @@ -3017,7 +3062,7 @@ dependencies = [ [[package]] name = "gstreamer-gl" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3031,7 +3076,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-egl" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3043,7 +3088,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-egl-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gstreamer-gl-sys", @@ -3054,7 +3099,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gobject-sys", @@ -3068,7 +3113,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-wayland" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3080,7 +3125,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-wayland-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gstreamer-gl-sys", @@ -3091,7 +3136,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-x11" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3103,7 +3148,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-x11-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gstreamer-gl-sys", @@ -3114,7 +3159,7 @@ dependencies = [ [[package]] name = "gstreamer-net" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "gio", "glib", @@ -3125,7 +3170,7 @@ dependencies = [ [[package]] name = "gstreamer-net-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "gio-sys", "glib-sys", @@ -3137,7 +3182,7 @@ dependencies = [ [[package]] name = "gstreamer-pbutils" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3151,7 +3196,7 @@ dependencies = [ [[package]] name = "gstreamer-pbutils-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gobject-sys", @@ -3165,7 +3210,7 @@ dependencies = [ [[package]] name = "gstreamer-rtp" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3176,7 +3221,7 @@ dependencies = [ [[package]] name = "gstreamer-rtp-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gstreamer-base-sys", @@ -3188,7 +3233,7 @@ dependencies = [ [[package]] name = "gstreamer-sdp" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3198,7 +3243,7 @@ dependencies = [ [[package]] name = "gstreamer-sdp-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gstreamer-sys", @@ -3209,7 +3254,7 @@ dependencies = [ [[package]] name = "gstreamer-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gobject-sys", @@ -3220,7 +3265,7 @@ dependencies = [ [[package]] name = "gstreamer-utils" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "gstreamer", "gstreamer-app", @@ -3232,7 +3277,7 @@ dependencies = [ [[package]] name = "gstreamer-video" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "cfg-if", "futures-channel", @@ -3249,7 +3294,7 @@ dependencies = [ [[package]] name = "gstreamer-video-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gobject-sys", @@ -3262,7 +3307,7 @@ dependencies = [ [[package]] name = "gstreamer-webrtc" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib", "gstreamer", @@ -3274,7 +3319,7 @@ dependencies = [ [[package]] name = "gstreamer-webrtc-sys" version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#230c9066269b74a82cc8b2389d07984d78d97f71" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#36792404a98de996e2cf6a7485e667fe6f1cee19" dependencies = [ "glib-sys", "gstreamer-sdp-sys", @@ -3798,9 +3843,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -4591,7 +4636,7 @@ dependencies = [ [[package]] name = "pango" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "gio", "glib", @@ -4602,7 +4647,7 @@ dependencies = [ [[package]] name = "pango-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "glib-sys", "gobject-sys", @@ -4613,7 +4658,7 @@ dependencies = [ [[package]] name = "pangocairo" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "cairo-rs", "glib", @@ -4625,7 +4670,7 @@ dependencies = [ [[package]] name = "pangocairo-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#4fd5ecf98d4e9f1ec0c42cf0e9bec3db4c39f8d1" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#8e574d3a8e445ff9184b00bd490e6f403cd5aa9c" dependencies = [ "cairo-sys-rs", "glib-sys", @@ -5314,6 +5359,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "rtsp-types" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "991c14333b8f4fb7459bfaa9e029d0b4b7b1d5a83f56c0d017541e9e1ece21b9" +dependencies = [ + "cookie-factory", + "nom", + "tinyvec", + "url", +] + [[package]] name = "rubato" version = "0.14.1" @@ -5467,6 +5524,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sdp-types" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8db497829e222d081f7b50ac81aec4f69750071a0f76b97b950b0b62204da6e" +dependencies = [ + "bstr", + "fallible-iterator", +] + [[package]] name = "sec1" version = "0.3.0" @@ -6538,9 +6605,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -6548,9 +6615,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", @@ -6563,9 +6630,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ "cfg-if", "js-sys", @@ -6575,9 +6642,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -6585,9 +6652,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", @@ -6598,15 +6665,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "web-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" +checksum = "96565907687f7aceb35bc5fc03770a8a0471d82e479f25832f54a0e3f4b28446" dependencies = [ "js-sys", "wasm-bindgen", @@ -6804,9 +6871,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.37" +version = "0.5.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cad8365489051ae9f054164e459304af2e7e9bb407c958076c8bf4aef52da5" +checksum = "5389a154b01683d28c77f8f68f49dea75f0a4da32557a58f68ee51ebba472d29" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 154671eb..8215dc40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "net/raptorq", "net/reqwest", "net/rtp", + "net/rtsp", "net/webrtchttp", "net/webrtc", "net/webrtc/protocol", @@ -76,6 +77,7 @@ default-members = [ "net/raptorq", "net/reqwest", "net/rtp", + "net/rtsp", "net/webrtchttp", "net/webrtc", "net/webrtc/protocol", diff --git a/ci/utils.py b/ci/utils.py index c43a2843..1ae6be13 100644 --- a/ci/utils.py +++ b/ci/utils.py @@ -22,6 +22,7 @@ RS_PREFIXED = [ 'png', 'tracers', 'rtp', + 'rtsp', 'inter', ] diff --git a/dependencies.py b/dependencies.py index 0bf3c2e3..3f11e209 100755 --- a/dependencies.py +++ b/dependencies.py @@ -29,6 +29,7 @@ RENAMES = { 'rsfile': 'file', 'rsflv': 'flavors', 'rsrtp': 'rtp', + 'rsrtsp': 'rtsp', 'rswebp': 'webp', 'rsonvif': 'onvif', 'rstracers': 'tracers', diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 01f07bdd..1fc15d78 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6321,6 +6321,97 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "rsrtsp": { + "description": "GStreamer RTSP Client Plugin", + "elements": { + "rtspsrc2": { + "author": "Nirbheek Chauhan ", + "description": "Receive audio or video from a network device via the Real Time Streaming Protocol (RTSP) (RFC 2326, 7826)", + "hierarchy": [ + "GstRtspSrc2", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy", + "GstURIHandler" + ], + "klass": "Source/Network", + "pad-templates": { + "stream_%%u": { + "caps": "application/x-rtp:\n", + "direction": "src", + "presence": "sometimes" + } + }, + "properties": { + "location": { + "blurb": "RTSP server, credentials and media path, e.g. rtsp://user:p4ssw0rd@camera-5.local:8554/h264_1080p30", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "port-start": { + "blurb": "Port number to start allocating client ports for receiving RTP and RTCP data, eg. 3000 (0 = automatic selection)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, + "protocols": { + "blurb": "Allowed lower transport protocols, in order of preference", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "udp-mcast,udp,tcp", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "timeout": { + "blurb": "Timeout for network activity, in nanoseconds", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "2000000000", + "max": "18446744073709551614", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint64", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstrsrtsp", + "license": "MPL", + "other-types": {}, + "package": "gst-plugin-rtsp", + "source": "gst-plugin-rtsp", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "rstracers": { "description": "GStreamer Rust tracers plugin", "elements": {}, diff --git a/meson.build b/meson.build index cfc5437f..7e2e5777 100644 --- a/meson.build +++ b/meson.build @@ -66,7 +66,9 @@ deps = [ ] webrtc_option = get_option('webrtc') -rtp_option = get_option('rtp').enable_if(webrtc_option.enabled(), error_message: 'webrtc option is enabled') +rtp_option = get_option('rtp').enable_if(webrtc_option.enabled(), error_message: 'webrtc option needs rtp') + +rtsp_option = get_option('rtsp') if get_option('threadshare').allowed() \ or get_option('onvif').allowed() \ @@ -86,7 +88,7 @@ endif if get_option('gtk4').allowed() deps += [['gstreamer-gl-1.0', 'gst-plugins-base', 'gst_gl_dep', 'gstgl']] endif -if get_option('threadshare').allowed() +if get_option('threadshare').allowed() or get_option('rtsp').allowed() deps += [['gstreamer-net-1.0', 'gstreamer', 'gst_net_dep', 'gst_net']] endif @@ -150,6 +152,7 @@ plugins = { }, 'raptorq': {'library': 'libgstraptorq'}, 'reqwest': {'library': 'libgstreqwest'}, + 'rtsp': {'library': 'libgstrsrtsp'}, 'rtp': {'library': 'libgstrsrtp'}, 'webrtchttp': {'library': 'libgstwebrtchttp'}, 'webrtc': { diff --git a/meson_options.txt b/meson_options.txt index af24970f..0842a661 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -28,6 +28,7 @@ option('ndi', type: 'feature', value: 'auto', description: 'Build ndi plugin') option('onvif', type: 'feature', value: 'auto', description: 'Build onvif plugin') option('raptorq', type: 'feature', value: 'auto', description: 'Build raptorq plugin') option('reqwest', type: 'feature', value: 'auto', description: 'Build reqwest plugin') +option('rtsp', type: 'feature', value: 'auto', description: 'Build rtsp plugin') option('rtp', type: 'feature', value: 'auto', description: 'Build rtp plugin') option('webrtc', type: 'feature', value: 'auto', yield: true, description: 'Build webrtc plugin') option('webrtchttp', type: 'feature', value: 'auto', description: 'Build webrtchttp plugin') diff --git a/net/rtsp/Cargo.toml b/net/rtsp/Cargo.toml new file mode 100644 index 00000000..aabc5b83 --- /dev/null +++ b/net/rtsp/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "gst-plugin-rtsp" +version.workspace = true +authors = ["Nirbheek Chauhan "] +repository.workspace = true +license = "MPL-2.0" +description = "GStreamer RTSP Client Plugin" +edition.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = "1" +atomic_refcell = "0.1" +data-encoding = "2.4" +futures = "0.3" +gst = { workspace = true, features = ["v1_20"] } +gst-app = { workspace = true, features = ["v1_20"] } +gst-pbutils = { workspace = true, features = ["v1_20"] } +once_cell.workspace = true +rtsp-types = "0.1" +sdp-types = "0.1" +socket2 = "0.5" +thiserror = "1" +tokio = { version = "1.0", default-features = false, features = ["io-util", "macros", "net", "time", "rt-multi-thread", "sync"] } +tokio-stream = "0.1" +url = "2" + +[lib] +name = "gstrsrtsp" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper.workspace = true + +[features] +static = [] +capi = [] +doc = ["gst/v1_18"] + +[package.metadata.capi] +min_version = "0.9.21" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false +import_library = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/net/rtsp/LICENSE-MPL-2.0 b/net/rtsp/LICENSE-MPL-2.0 new file mode 100644 index 00000000..14e2f777 --- /dev/null +++ b/net/rtsp/LICENSE-MPL-2.0 @@ -0,0 +1,373 @@ +Mozilla Public License Version 2.0 +================================== + +1. Definitions +-------------- + +1.1. "Contributor" + means each individual or legal entity that creates, contributes to + the creation of, or owns Covered Software. + +1.2. "Contributor Version" + means the combination of the Contributions of others (if any) used + by a Contributor and that particular Contributor's Contribution. + +1.3. "Contribution" + means Covered Software of a particular Contributor. + +1.4. "Covered Software" + means Source Code Form to which the initial Contributor has attached + the notice in Exhibit A, the Executable Form of such Source Code + Form, and Modifications of such Source Code Form, in each case + including portions thereof. + +1.5. "Incompatible With Secondary Licenses" + means + + (a) that the initial Contributor has attached the notice described + in Exhibit B to the Covered Software; or + + (b) that the Covered Software was made available under the terms of + version 1.1 or earlier of the License, but not also under the + terms of a Secondary License. + +1.6. "Executable Form" + means any form of the work other than Source Code Form. + +1.7. "Larger Work" + means a work that combines Covered Software with other material, in + a separate file or files, that is not Covered Software. + +1.8. "License" + means this document. + +1.9. "Licensable" + means having the right to grant, to the maximum extent possible, + whether at the time of the initial grant or subsequently, any and + all of the rights conveyed by this License. + +1.10. "Modifications" + means any of the following: + + (a) any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered + Software; or + + (b) any new file in Source Code Form that contains any Covered + Software. + +1.11. "Patent Claims" of a Contributor + means any patent claim(s), including without limitation, method, + process, and apparatus claims, in any patent Licensable by such + Contributor that would be infringed, but for the grant of the + License, by the making, using, selling, offering for sale, having + made, import, or transfer of either its Contributions or its + Contributor Version. + +1.12. "Secondary License" + means either the GNU General Public License, Version 2.0, the GNU + Lesser General Public License, Version 2.1, the GNU Affero General + Public License, Version 3.0, or any later versions of those + licenses. + +1.13. "Source Code Form" + means the form of the work preferred for making modifications. + +1.14. "You" (or "Your") + means an individual or a legal entity exercising rights under this + License. For legal entities, "You" includes any entity that + controls, is controlled by, or is under common control with You. For + purposes of this definition, "control" means (a) the power, direct + or indirect, to cause the direction or management of such entity, + whether by contract or otherwise, or (b) ownership of more than + fifty percent (50%) of the outstanding shares or beneficial + ownership of such entity. + +2. License Grants and Conditions +-------------------------------- + +2.1. Grants + +Each Contributor hereby grants You a world-wide, royalty-free, +non-exclusive license: + +(a) under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or + as part of a Larger Work; and + +(b) under Patent Claims of such Contributor to make, use, sell, offer + for sale, have made, import, and otherwise transfer either its + Contributions or its Contributor Version. + +2.2. Effective Date + +The licenses granted in Section 2.1 with respect to any Contribution +become effective for each Contribution on the date the Contributor first +distributes such Contribution. + +2.3. Limitations on Grant Scope + +The licenses granted in this Section 2 are the only rights granted under +this License. No additional rights or licenses will be implied from the +distribution or licensing of Covered Software under this License. +Notwithstanding Section 2.1(b) above, no patent license is granted by a +Contributor: + +(a) for any code that a Contributor has removed from Covered Software; + or + +(b) for infringements caused by: (i) Your and any other third party's + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + +(c) under Patent Claims infringed by Covered Software in the absence of + its Contributions. + +This License does not grant any rights in the trademarks, service marks, +or logos of any Contributor (except as may be necessary to comply with +the notice requirements in Section 3.4). + +2.4. Subsequent Licenses + +No Contributor makes additional grants as a result of Your choice to +distribute the Covered Software under a subsequent version of this +License (see Section 10.2) or under the terms of a Secondary License (if +permitted under the terms of Section 3.3). + +2.5. Representation + +Each Contributor represents that the Contributor believes its +Contributions are its original creation(s) or it has sufficient rights +to grant the rights to its Contributions conveyed by this License. + +2.6. Fair Use + +This License is not intended to limit any rights You have under +applicable copyright doctrines of fair use, fair dealing, or other +equivalents. + +2.7. Conditions + +Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted +in Section 2.1. + +3. Responsibilities +------------------- + +3.1. Distribution of Source Form + +All distribution of Covered Software in Source Code Form, including any +Modifications that You create or to which You contribute, must be under +the terms of this License. You must inform recipients that the Source +Code Form of the Covered Software is governed by the terms of this +License, and how they can obtain a copy of this License. You may not +attempt to alter or restrict the recipients' rights in the Source Code +Form. + +3.2. Distribution of Executable Form + +If You distribute Covered Software in Executable Form then: + +(a) such Covered Software must also be made available in Source Code + Form, as described in Section 3.1, and You must inform recipients of + the Executable Form how they can obtain a copy of such Source Code + Form by reasonable means in a timely manner, at a charge no more + than the cost of distribution to the recipient; and + +(b) You may distribute such Executable Form under the terms of this + License, or sublicense it under different terms, provided that the + license for the Executable Form does not attempt to limit or alter + the recipients' rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + +You may create and distribute a Larger Work under terms of Your choice, +provided that You also comply with the requirements of this License for +the Covered Software. If the Larger Work is a combination of Covered +Software with a work governed by one or more Secondary Licenses, and the +Covered Software is not Incompatible With Secondary Licenses, this +License permits You to additionally distribute such Covered Software +under the terms of such Secondary License(s), so that the recipient of +the Larger Work may, at their option, further distribute the Covered +Software under the terms of either this License or such Secondary +License(s). + +3.4. Notices + +You may not remove or alter the substance of any license notices +(including copyright notices, patent notices, disclaimers of warranty, +or limitations of liability) contained within the Source Code Form of +the Covered Software, except that You may alter any license notices to +the extent required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + +You may choose to offer, and to charge a fee for, warranty, support, +indemnity or liability obligations to one or more recipients of Covered +Software. However, You may do so only on Your own behalf, and not on +behalf of any Contributor. You must make it absolutely clear that any +such warranty, support, indemnity, or liability obligation is offered by +You alone, and You hereby agree to indemnify every Contributor for any +liability incurred by such Contributor as a result of warranty, support, +indemnity or liability terms You offer. You may include additional +disclaimers of warranty and limitations of liability specific to any +jurisdiction. + +4. Inability to Comply Due to Statute or Regulation +--------------------------------------------------- + +If it is impossible for You to comply with any of the terms of this +License with respect to some or all of the Covered Software due to +statute, judicial order, or regulation then You must: (a) comply with +the terms of this License to the maximum extent possible; and (b) +describe the limitations and the code they affect. Such description must +be placed in a text file included with all distributions of the Covered +Software under this License. Except to the extent prohibited by statute +or regulation, such description must be sufficiently detailed for a +recipient of ordinary skill to be able to understand it. + +5. Termination +-------------- + +5.1. The rights granted under this License will terminate automatically +if You fail to comply with any of its terms. However, if You become +compliant, then the rights granted under this License from a particular +Contributor are reinstated (a) provisionally, unless and until such +Contributor explicitly and finally terminates Your grants, and (b) on an +ongoing basis, if such Contributor fails to notify You of the +non-compliance by some reasonable means prior to 60 days after You have +come back into compliance. Moreover, Your grants from a particular +Contributor are reinstated on an ongoing basis if such Contributor +notifies You of the non-compliance by some reasonable means, this is the +first time You have received notice of non-compliance with this License +from such Contributor, and You become compliant prior to 30 days after +Your receipt of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent +infringement claim (excluding declaratory judgment actions, +counter-claims, and cross-claims) alleging that a Contributor Version +directly or indirectly infringes any patent, then the rights granted to +You by any and all Contributors for the Covered Software under Section +2.1 of this License shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all +end user license agreements (excluding distributors and resellers) which +have been validly granted by You or Your distributors under this License +prior to termination shall survive termination. + +************************************************************************ +* * +* 6. Disclaimer of Warranty * +* ------------------------- * +* * +* Covered Software is provided under this License on an "as is" * +* basis, without warranty of any kind, either expressed, implied, or * +* statutory, including, without limitation, warranties that the * +* Covered Software is free of defects, merchantable, fit for a * +* particular purpose or non-infringing. The entire risk as to the * +* quality and performance of the Covered Software is with You. * +* Should any Covered Software prove defective in any respect, You * +* (not any Contributor) assume the cost of any necessary servicing, * +* repair, or correction. This disclaimer of warranty constitutes an * +* essential part of this License. No use of any Covered Software is * +* authorized under this License except under this disclaimer. * +* * +************************************************************************ + +************************************************************************ +* * +* 7. Limitation of Liability * +* -------------------------- * +* * +* Under no circumstances and under no legal theory, whether tort * +* (including negligence), contract, or otherwise, shall any * +* Contributor, or anyone who distributes Covered Software as * +* permitted above, be liable to You for any direct, indirect, * +* special, incidental, or consequential damages of any character * +* including, without limitation, damages for lost profits, loss of * +* goodwill, work stoppage, computer failure or malfunction, or any * +* and all other commercial damages or losses, even if such party * +* shall have been informed of the possibility of such damages. This * +* limitation of liability shall not apply to liability for death or * +* personal injury resulting from such party's negligence to the * +* extent applicable law prohibits such limitation. Some * +* jurisdictions do not allow the exclusion or limitation of * +* incidental or consequential damages, so this exclusion and * +* limitation may not apply to You. * +* * +************************************************************************ + +8. Litigation +------------- + +Any litigation relating to this License may be brought only in the +courts of a jurisdiction where the defendant maintains its principal +place of business and such litigation shall be governed by laws of that +jurisdiction, without reference to its conflict-of-law provisions. +Nothing in this Section shall prevent a party's ability to bring +cross-claims or counter-claims. + +9. Miscellaneous +---------------- + +This License represents the complete agreement concerning the subject +matter hereof. If any provision of this License is held to be +unenforceable, such provision shall be reformed only to the extent +necessary to make it enforceable. Any law or regulation which provides +that the language of a contract shall be construed against the drafter +shall not be used to construe this License against a Contributor. + +10. Versions of the License +--------------------------- + +10.1. New Versions + +Mozilla Foundation is the license steward. Except as provided in Section +10.3, no one other than the license steward has the right to modify or +publish new versions of this License. Each version will be given a +distinguishing version number. + +10.2. Effect of New Versions + +You may distribute the Covered Software under the terms of the version +of the License under which You originally received the Covered Software, +or under the terms of any subsequent version published by the license +steward. + +10.3. Modified Versions + +If you create software not governed by this License, and you want to +create a new license for such software, you may create and use a +modified version of this License if you rename the license and remove +any references to the name of the license steward (except to note that +such modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary +Licenses + +If You choose to distribute Source Code Form that is Incompatible With +Secondary Licenses under the terms of this version of the License, the +notice described in Exhibit B of this License must be attached. + +Exhibit A - Source Code Form License Notice +------------------------------------------- + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this + file, You can obtain one at http://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular +file, then You may include the notice in a location (such as a LICENSE +file in a relevant directory) where a recipient would be likely to look +for such a notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - "Incompatible With Secondary Licenses" Notice +--------------------------------------------------------- + + This Source Code Form is "Incompatible With Secondary Licenses", as + defined by the Mozilla Public License, v. 2.0. diff --git a/net/rtsp/README.md b/net/rtsp/README.md new file mode 100644 index 00000000..d0cedfad --- /dev/null +++ b/net/rtsp/README.md @@ -0,0 +1,68 @@ +# rtspsrc2 + +Rust rewrite of rtspsrc, with the purpose of fixing the fundamentally broken +architecture of rtspsrc. There are some major problems with rtspsrc: + +1. Element states are linked to RTSP states, which causes unfixable glitching + and issues, especially in shared RTSP media +2. The command loop is fundamentally broken and buggy, which can cause RTSP + commands such as `SET_PARAMETER` and `GET_PARAMETER` to be lost +3. The combination of the above two causes unfixable deadlocks when doing state + changes due to external factors such as server state, or when seeking +4. Parsing of untrusted RTSP messages from the network was done in C with the + `GstRTSPMessage` API. +5. Parsing of untrusted SDP from the network was done in C with the + `GstSDPMessage` API + +## Implemented features + +* RTSP 1.0 support +* Lower transports: TCP, UDP, UDP-Multicast +* RTCP SR and RTCP RR +* RTCP-based A/V sync +* Lower transport selection and priority (NEW!) + - Also supports different lower transports for each SETUP + +## Missing features + +Roughly in order of priority: + +* Credentials support +* TLS/TCP support +* NAT hole punching +* Allocate a buffer pool for receiving + pushing UDP packets +* Allow ignoring specific streams (SDP medias) + - Currently all available source pads must be linked +* SRTP support +* HTTP tunnelling +* Proxy support +* `GET_PARAMETER` / `SET_PARAMETER` +* Make TCP connection optional when using UDP transport + - Or TCP reconnection if UDP has not timed out +* Parse SDP rtcp-fb attributes +* Parse SDP ssrc attributes +* Don't require Transport header in SETUP response, it is optional +* Clock sync support, such as RFC7273 +* PAUSE support with VOD +* Seeking support with VOD +* ONVIF backchannel support +* ONVIF trick mode support +* RTSP 2 support (no servers exist at present) + +## Missing configuration properties + +These are some misc rtspsrc props that haven't been implemented in rtspsrc2 +yet: + +* latency +* do-rtx +* do-rtcp +* iface +* user-agent + +## Maintenance and future cleanup + +* Refactor SDP → Caps parsing into a module +* Test with market RTSP cameras + - Currently, only live555 and gst-rtsp-server have been tested +* Add tokio-console and tokio tracing support diff --git a/net/rtsp/build.rs b/net/rtsp/build.rs new file mode 100644 index 00000000..cda12e57 --- /dev/null +++ b/net/rtsp/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/net/rtsp/src/lib.rs b/net/rtsp/src/lib.rs new file mode 100644 index 00000000..db984c0c --- /dev/null +++ b/net/rtsp/src/lib.rs @@ -0,0 +1,36 @@ +// GStreamer RTSP plugin (in Rust) +// +// Copyright (C) 2023 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * plugin-rsrtsp: + * + * Since: plugins-rs-0.12.0 + */ +use gst::glib; + +mod rtspsrc; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + rtspsrc::register(plugin)?; + Ok(()) +} + +gst::plugin_define!( + rsrtsp, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + // FIXME: MPL-2.0 is only allowed since 1.18.3 (as unknown) and 1.20 (as known) + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/net/rtsp/src/rtspsrc/body.rs b/net/rtsp/src/rtspsrc/body.rs new file mode 100644 index 00000000..9bbaabfc --- /dev/null +++ b/net/rtsp/src/rtspsrc/body.rs @@ -0,0 +1,103 @@ +// Rust RTSP Server +// +// Copyright (C) 2020-2021 Sebastian Dröge +// Copyright (C) 2024 Nirbheek Chauhan +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::cmp; +use std::fmt; +use std::ops; + +use gst::buffer::{MappedBuffer, Readable}; + +/// Body used for RTSP messages in the server. +#[derive(Debug)] +pub struct Body(Inner); + +enum Inner { + Vec(Vec), + Custom(Box), + Buffer(MappedBuffer), +} + +trait Custom: AsRef<[u8]> + Send + Sync + 'static {} +impl + Send + Sync + 'static> Custom for T {} + +impl Default for Body { + fn default() -> Self { + Body(Inner::Vec(Vec::new())) + } +} + +impl Body { + /// Create a body from custom memory without copying. + #[allow(dead_code)] + pub fn custom + Send + Sync + 'static>(custom: T) -> Self { + Body(Inner::Custom(Box::new(custom))) + } + + pub fn mapped(mapped: MappedBuffer) -> Self { + Body(Inner::Buffer(mapped)) + } +} + +impl From> for Body { + fn from(v: Vec) -> Self { + Body(Inner::Vec(v)) + } +} + +impl<'a> From<&'a [u8]> for Body { + fn from(s: &'a [u8]) -> Self { + Body::from(Vec::from(s)) + } +} + +impl ops::Deref for Body { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl AsRef<[u8]> for Body { + fn as_ref(&self) -> &[u8] { + match self.0 { + Inner::Vec(ref vec) => vec.as_slice(), + Inner::Custom(ref custom) => (**custom).as_ref(), + Inner::Buffer(ref mapped) => mapped.as_ref(), + } + } +} + +impl cmp::PartialEq for Body { + fn eq(&self, other: &Self) -> bool { + self.as_ref().eq(other.as_ref()) + } +} + +impl cmp::Eq for Body {} + +impl Clone for Body { + fn clone(&self) -> Self { + Body::from(Vec::from(self.as_ref())) + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Inner::Vec(ref vec) => f.debug_tuple("Vec").field(&vec).finish(), + Inner::Custom(ref custom) => { + f.debug_tuple("Custom").field(&(**custom).as_ref()).finish() + } + Inner::Buffer(ref mapped) => mapped.fmt(f), + } + } +} diff --git a/net/rtsp/src/rtspsrc/imp.rs b/net/rtsp/src/rtspsrc/imp.rs new file mode 100644 index 00000000..d312e38c --- /dev/null +++ b/net/rtsp/src/rtspsrc/imp.rs @@ -0,0 +1,2026 @@ +// GStreamer RTSP Source 2 +// +// Copyright (C) 2023 Tim-Philipp Müller +// Copyright (C) 2023-2024 Nirbheek Chauhan +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +// +// https://www.rfc-editor.org/rfc/rfc2326.html + +use std::collections::{btree_set::BTreeSet, HashMap}; +use std::convert::TryFrom; +use std::fmt; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use anyhow::Result; +use once_cell::sync::Lazy; + +use futures::{Sink, SinkExt, Stream, StreamExt}; +use socket2::Socket; +use tokio::net::{TcpStream, UdpSocket}; +use tokio::runtime; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; +use tokio::time; + +use rtsp_types::headers::{ + CSeq, NptRange, NptTime, Public, Range, RtpInfos, RtpLowerTransport, RtpProfile, RtpTransport, + RtpTransportParameters, Session, Transport, TransportMode, Transports, ACCEPT, CONTENT_BASE, + CONTENT_LOCATION, USER_AGENT, +}; +use rtsp_types::{Message, Method, Request, Response, StatusCode, Version}; + +use url::Url; + +use gst::buffer::{MappedBuffer, Readable}; +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use super::body::Body; +use super::transport::RtspTransportInfo; + +const DEFAULT_LOCATION: Option = None; +const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(2); +const DEFAULT_PORT_START: u16 = 0; + +const DEFAULT_PROTOCOLS: &str = "udp-mcast,udp,tcp"; +const MAX_MESSAGE_SIZE: usize = 1024 * 1024; +const MAX_BIND_PORT_RETRY: u16 = 100; +const UDP_PACKET_MAX_SIZE: usize = 65535 - 8; + +static RTCP_CAPS: Lazy = + Lazy::new(|| gst::Caps::from(gst::Structure::new_empty("application/x-rtcp"))); + +// Hardcoded for now +const DEFAULT_USER_AGENT: &str = concat!( + "GStreamer rtspsrc2 ", + env!("CARGO_PKG_VERSION"), + "-", + env!("COMMIT_ID") +); + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum RtspProtocol { + UdpMulticast, + Udp, + Tcp, +} + +impl fmt::Display for RtspProtocol { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self { + RtspProtocol::Udp => write!(f, "udp"), + RtspProtocol::UdpMulticast => write!(f, "udp-mcast"), + RtspProtocol::Tcp => write!(f, "tcp"), + } + } +} + +#[derive(Debug, Clone)] +struct Settings { + location: Option, + port_start: u16, + protocols: Vec, + timeout: gst::ClockTime, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + location: DEFAULT_LOCATION, + port_start: DEFAULT_PORT_START, + timeout: DEFAULT_TIMEOUT, + protocols: parse_protocols_str(DEFAULT_PROTOCOLS).unwrap(), + } + } +} + +#[derive(Debug)] +enum Commands { + Play, + //Pause, + Teardown(Option>), + Data(rtsp_types::Data), +} + +#[derive(Debug, Default)] +pub struct RtspSrc { + settings: Mutex, + task_handle: Mutex>>, + command_queue: Mutex>>, +} + +#[derive(thiserror::Error, Debug)] +pub enum RtspError { + #[error("Generic I/O error")] + IOGeneric(#[from] std::io::Error), + #[error("Read I/O error")] + Read(#[from] super::tcp_message::ReadError), + #[error("RTSP header parse error")] + HeaderParser(#[from] rtsp_types::headers::HeaderParseError), + #[error("SDP parse error")] + SDPParser(#[from] sdp_types::ParserError), + #[error("Unexpected RTSP message: expected, received")] + UnexpectedMessage(&'static str, rtsp_types::Message), + #[error("Invalid RTSP message")] + InvalidMessage(&'static str), + #[error("Fatal error")] + Fatal(String), +} + +pub(crate) static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtspsrc2", + gst::DebugColorFlags::empty(), + Some("RTSP source"), + ) +}); + +static RUNTIME: Lazy = Lazy::new(|| { + runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .unwrap() +}); + +fn parse_protocols_str(s: &str) -> Result, glib::Error> { + let mut acc = Vec::new(); + if s.is_empty() { + return Err(glib::Error::new( + gst::CoreError::Failed, + "Protocols list is empty", + )); + } + for each in s.split(',') { + match each { + "udp-mcast" => acc.push(RtspProtocol::UdpMulticast), + "udp" => acc.push(RtspProtocol::Udp), + "tcp" => acc.push(RtspProtocol::Tcp), + _ => { + return Err(glib::Error::new( + gst::CoreError::Failed, + &format!("Unsupported RTSP protocol: {each}"), + )) + } + } + } + Ok(acc) +} + +impl RtspSrc { + fn set_location(&self, uri: Option<&str>) -> Result<(), glib::Error> { + if self.obj().current_state() > gst::State::Ready { + return Err(glib::Error::new( + gst::URIError::BadState, + "Changing the 'location' property on a started 'rtspsrc2' is not supported", + )); + } + + let mut settings = self.settings.lock().unwrap(); + + let Some(uri) = uri else { + settings.location = DEFAULT_LOCATION; + return Ok(()); + }; + + let uri = Url::parse(uri).map_err(|err| { + glib::Error::new( + gst::URIError::BadUri, + &format!("Failed to parse URI '{uri}': {err:?}"), + ) + })?; + + if uri.password().is_some() || !uri.username().is_empty() { + // TODO + gst::fixme!(CAT, "URI credentials are currently ignored"); + } + + match (uri.host_str(), uri.port()) { + (Some(_), Some(_)) | (Some(_), None) => Ok(()), + _ => Err(glib::Error::new(gst::URIError::BadUri, "Invalid host")), + }?; + + let protocols: &[RtspProtocol] = match uri.scheme() { + "rtspu" => &[RtspProtocol::UdpMulticast, RtspProtocol::Udp], + "rtspt" => &[RtspProtocol::Tcp], + "rtsp" => &settings.protocols, + scheme => { + return Err(glib::Error::new( + gst::URIError::UnsupportedProtocol, + &format!("Unsupported URI scheme '{}'", scheme), + )); + } + }; + + if !settings.protocols.iter().any(|p| protocols.contains(p)) { + return Err(glib::Error::new( + gst::URIError::UnsupportedProtocol, + &format!( + "URI scheme '{}' does not match allowed protocols: {:?}", + uri.scheme(), + settings.protocols, + ), + )); + } + + settings.protocols = protocols.to_vec(); + settings.location = Some(uri); + + Ok(()) + } + + fn set_protocols(&self, protocol_s: Option<&str>) -> Result<(), glib::Error> { + if self.obj().current_state() > gst::State::Ready { + return Err(glib::Error::new( + gst::CoreError::Failed, + "Changing the 'protocols' property on a started 'rtspsrc2' is not supported", + )); + } + + let mut settings = self.settings.lock().unwrap(); + + settings.protocols = match protocol_s { + Some(s) => parse_protocols_str(s)?, + None => parse_protocols_str(DEFAULT_PROTOCOLS).unwrap(), + }; + + Ok(()) + } +} + +impl ObjectImpl for RtspSrc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("location") + .nick("Location") + .blurb("RTSP server, credentials and media path, e.g. rtsp://user:p4ssw0rd@camera-5.local:8554/h264_1080p30") + .mutable_ready() + .build(), + // We purposely use port-start instead of port-range (like in rtspsrc), because + // there is no way for the user to know how many ports we actually need. It depends + // on how many streams the media contains, and whether the server wants RTCP or + // RTCP-mux, or no RTCP. This property can be used to specify the start of the + // valid range, and if the user wants to know how many ports were used, we can + // add API for that later. + glib::ParamSpecUInt::builder("port-start") + .nick("Port start") + .blurb("Port number to start allocating client ports for receiving RTP and RTCP data, eg. 3000 (0 = automatic selection)") + .default_value(DEFAULT_PORT_START.into()) + .mutable_ready() + .build(), + glib::ParamSpecString::builder("protocols") + .nick("Protocols") + .blurb("Allowed lower transport protocols, in order of preference") + .default_value("udp-mcast,udp,tcp") + .mutable_ready() + .build(), + glib::ParamSpecUInt64::builder("timeout") + .nick("Timeout") + .blurb("Timeout for network activity, in nanoseconds") + .maximum(gst::ClockTime::MAX.into()) + .default_value(DEFAULT_TIMEOUT.into()) + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let res = match pspec.name() { + "location" => { + let location = value.get::>().expect("type checked upstream"); + self.set_location(location) + } + "port-start" => { + let mut settings = self.settings.lock().unwrap(); + let start = value.get::().expect("type checked upstream"); + match u16::try_from(start) { + Ok(start) => { + settings.port_start = start; + Ok(()) + } + Err(err) => Err(glib::Error::new( + gst::CoreError::Failed, + &format!("Failed to set port start: {err:?}"), + )), + } + } + "protocols" => { + let protocols = value.get::>().expect("type checked upstream"); + self.set_protocols(protocols) + } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + let timeout = value.get().expect("type checked upstream"); + settings.timeout = timeout; + Ok(()) + } + name => unimplemented!("Property '{name}'"), + }; + + if let Err(err) = res { + gst::error!( + CAT, + imp: self, + "Failed to set property `{}`: {:?}", + pspec.name(), + err + ); + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "location" => { + let settings = self.settings.lock().unwrap(); + let location = settings.location.as_ref().map(Url::to_string); + + location.to_value() + } + "port-start" => { + let settings = self.settings.lock().unwrap(); + (settings.port_start as u32).to_value() + } + "protocols" => { + let settings = self.settings.lock().unwrap(); + (settings + .protocols + .iter() + .map(ToString::to_string) + .collect::>() + .join(",")) + .to_value() + } + "timeout" => { + let settings = self.settings.lock().unwrap(); + settings.timeout.to_value() + } + name => unimplemented!("Property '{name}'"), + } + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); + } +} + +impl GstObjectImpl for RtspSrc {} + +impl ElementImpl for RtspSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTSP Source", + "Source/Network", + "Receive audio or video from a network device via the Real Time Streaming Protocol (RTSP) (RFC 2326, 7826)", + "Nirbheek Chauhan ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "stream_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_empty_simple("application/x-rtp"), + ) + .unwrap(); + + vec![src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + match transition { + gst::StateChange::NullToReady => { + self.start().map_err(|err_msg| { + self.post_error_message(err_msg); + gst::StateChangeError + })?; + } + gst::StateChange::PausedToPlaying => { + let cmd_queue = self.cmd_queue(); + //self.async_start().map_err(|_| gst::StateChangeError)?; + RUNTIME.spawn(async move { cmd_queue.send(Commands::Play).await }); + } + _ => {} + } + + let mut ret = self.parent_change_state(transition)?; + + match transition { + gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => { + ret = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToReady => { + match tokio::runtime::Handle::try_current() { + Ok(_) => { + // If the app does set_state(NULL) from a block_on() inside its own tokio + // runtime, calling block_on() on our own runtime will cause a panic + // because of nested blocking calls. So, shutdown the task from another + // thread. + // The app's usage is also incorrect since they are blocking the runtime + // on I/O, so emit a warning. + gst::warning!( + CAT, + "Blocking I/O: state change to NULL called from an async \ + tokio context, redirecting to another thread to prevent \ + the tokio panic, but you should refactor your code to \ + make use of gst::Element::call_async and set the state \ + to NULL from there, without blocking the runtime" + ); + let (tx, rx) = std::sync::mpsc::channel(); + self.obj().call_async(move |element| { + tx.send(element.imp().stop()).unwrap(); + }); + rx.recv().unwrap() + } + Err(_) => self.stop(), + } + .map_err(|err_msg| { + self.post_error_message(err_msg); + gst::StateChangeError + })?; + } + _ => (), + } + + Ok(ret) + } +} + +impl BinImpl for RtspSrc {} + +impl URIHandlerImpl for RtspSrc { + const URI_TYPE: gst::URIType = gst::URIType::Src; + + fn protocols() -> &'static [&'static str] { + &["rtsp", "rtspu", "rtspt"] + } + + fn uri(&self) -> Option { + let settings = self.settings.lock().unwrap(); + + settings.location.as_ref().map(Url::to_string) + } + + fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { + self.set_location(Some(uri)) + } +} + +type RtspStream = + Pin, super::tcp_message::ReadError>> + Send>>; +type RtspSink = Pin, Error = std::io::Error> + Send>>; + +impl RtspSrc { + #[track_caller] + fn cmd_queue(&self) -> mpsc::Sender { + self.command_queue.lock().unwrap().as_ref().unwrap().clone() + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let Some(url) = self.settings.lock().unwrap().location.clone() else { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["No location set"] + )); + }; + + gst::info!( + CAT, + imp: self, + "Location: {url}", + ); + + gst::info!( + CAT, + imp: self, + "Starting RTSP connection thread.. " + ); + + let task_src = self.ref_counted(); + + let mut task_handle = self.task_handle.lock().unwrap(); + + let (tx, rx) = mpsc::channel(1); + { + let mut cmd_queue_opt = self.command_queue.lock().unwrap(); + debug_assert!(cmd_queue_opt.is_none()); + cmd_queue_opt.replace(tx); + } + + let join_handle = RUNTIME.spawn(async move { + gst::info!(CAT, "Connecting to {url} .."); + let hostname_port = + format!("{}:{}", url.host_str().unwrap(), url.port().unwrap_or(554)); + + // TODO: Add TLS support + let s = match TcpStream::connect(hostname_port).await { + Ok(s) => s, + Err(err) => { + gst::element_imp_error!( + task_src, + gst::CoreError::Failed, + ["Failed to connect to RTSP server: {err:#?}"] + ); + return; + } + }; + let _ = s.set_nodelay(true); + + gst::info!(CAT, "Connected!"); + + let (read, write) = s.into_split(); + + let stream = Box::pin(super::tcp_message::async_read(read, MAX_MESSAGE_SIZE).fuse()); + let sink = Box::pin(super::tcp_message::async_write(write)); + + let mut state = RtspTaskState::new(url, stream, sink); + + let task_ret = task_src.rtsp_task(&mut state, rx).await; + gst::info!(CAT, "Exited rtsp_task"); + + // Cleanup after stopping + for h in &state.handles { + h.abort(); + } + for h in state.handles { + let _ = h.await; + } + let obj = task_src.obj(); + for e in obj.iterate_sorted() { + let Ok(e) = e else { + continue; + }; + if let Err(err) = e.set_state(gst::State::Null) { + gst::warning!(CAT, "{} failed to go to Null state: {err:?}", e.name()); + } + } + for pad in obj.src_pads() { + if let Err(err) = obj.remove_pad(&pad) { + gst::warning!(CAT, "Failed to remove pad {}: {err:?}", pad.name()); + } + } + for e in obj.iterate_sorted() { + let Ok(e) = e else { + continue; + }; + if let Err(err) = obj.remove(&e) { + gst::warning!(CAT, "Failed to remove element {}: {err:?}", e.name()); + } + } + + // Post the element error after cleanup + if let Err(err) = task_ret { + gst::element_imp_error!( + task_src, + gst::CoreError::Failed, + ["RTSP task exited: {err:#?}"] + ); + } + gst::info!(CAT, "Cleanup complete"); + }); + + debug_assert!(task_handle.is_none()); + task_handle.replace(join_handle); + + gst::info!(CAT, imp: self, "Started"); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::info!(CAT, "Stopping..."); + let cmd_queue = self.cmd_queue(); + let task_handle = { self.task_handle.lock().unwrap().take() }; + + RUNTIME.block_on(async { + let (tx, rx) = oneshot::channel(); + if let Ok(()) = cmd_queue.send(Commands::Teardown(Some(tx))).await { + if let Err(_elapsed) = time::timeout(Duration::from_millis(500), rx).await { + gst::warning!( + CAT, + "Timeout waiting for Teardown, going to NULL asynchronously" + ); + } + } + }); + + if let Some(join_handle) = task_handle { + gst::debug!(CAT, "Waiting for RTSP connection thread to shut down.."); + let _ = RUNTIME.block_on(join_handle); + } + + self.command_queue.lock().unwrap().take(); + + gst::info!(CAT, imp: self, "Stopped"); + + Ok(()) + } + + fn make_rtp_appsrc( + &self, + rtpsession_n: usize, + caps: &gst::Caps, + manager: &RtspManager, + ) -> Result { + let callbacks = gst_app::AppSrcCallbacks::builder() + .enough_data(|appsrc| { + gst::warning!(CAT, "appsrc {} is overrunning: enough data!", appsrc.name()); + }) + .build(); + let appsrc = gst_app::AppSrc::builder() + .name(format!("rtp_appsrc_{rtpsession_n}")) + .format(gst::Format::Time) + .handle_segment_change(true) + .caps(caps) + .stream_type(gst_app::AppStreamType::Stream) + .max_bytes(0) + .max_buffers(0) + .max_time(Some(gst::ClockTime::from_seconds(2))) + .leaky_type(gst_app::AppLeakyType::Downstream) + .callbacks(callbacks) + .is_live(true) + .build(); + let obj = self.obj(); + obj.add(&appsrc)?; + appsrc + .static_pad("src") + .unwrap() + .link(&manager.rtp_recv_sinkpad(rtpsession_n).unwrap())?; + let templ = obj.pad_template("stream_%u").unwrap(); + let ghostpad = gst::GhostPad::builder_from_template(&templ) + .name(format!("stream_{}", rtpsession_n)) + .build(); + gst::info!(CAT, "Adding ghost srcpad {}", ghostpad.name()); + obj.add_pad(&ghostpad) + .expect("Adding a ghostpad should never fail"); + appsrc.sync_state_with_parent()?; + Ok(appsrc) + } + + fn make_rtcp_appsrc( + &self, + rtpsession_n: usize, + manager: &RtspManager, + ) -> Result { + let appsrc = gst_app::AppSrc::builder() + .name(format!("rtcp_appsrc_{rtpsession_n}")) + .format(gst::Format::Time) + .handle_segment_change(true) + .caps(&RTCP_CAPS) + .stream_type(gst_app::AppStreamType::Stream) + .is_live(true) + .build(); + self.obj().add(&appsrc)?; + appsrc + .static_pad("src") + .unwrap() + .link(&manager.rtcp_recv_sinkpad(rtpsession_n).unwrap())?; + appsrc.sync_state_with_parent()?; + Ok(appsrc) + } + + fn make_rtcp_appsink< + F: FnMut(&gst_app::AppSink) -> Result + Send + 'static, + >( + &self, + rtpsession_n: usize, + manager: &RtspManager, + on_rtcp: F, + ) -> Result<()> { + let cmd_tx_eos = self.cmd_queue(); + let cbs = gst_app::app_sink::AppSinkCallbacks::builder() + .eos(move |_appsink| { + let cmd_tx = cmd_tx_eos.clone(); + RUNTIME.spawn(async move { + let _ = cmd_tx.send(Commands::Teardown(None)).await; + }); + }) + .new_sample(on_rtcp) + .build(); + + let rtcp_appsink = gst_app::AppSink::builder() + .name(format!("rtcp_appsink_{rtpsession_n}")) + .sync(false) + .async_(false) + .callbacks(cbs) + .build(); + self.obj().add(&rtcp_appsink)?; + manager + .rtcp_send_srcpad(rtpsession_n) + .unwrap() + .link(&rtcp_appsink.static_pad("sink").unwrap())?; + Ok(()) + } + + fn post_start(&self, code: &str, text: &str) { + let obj = self.obj(); + let msg = gst::message::Progress::builder(gst::ProgressType::Start, code, text) + .src(&*obj) + .build(); + let _ = obj.post_message(msg); + } + + fn post_complete(&self, code: &str, text: &str) { + let obj = self.obj(); + let msg = gst::message::Progress::builder(gst::ProgressType::Complete, code, text) + .src(&*obj) + .build(); + let _ = obj.post_message(msg); + } + + fn post_cancelled(&self, code: &str, text: &str) { + let obj = self.obj(); + let msg = gst::message::Progress::builder(gst::ProgressType::Canceled, code, text) + .src(&*obj) + .build(); + let _ = obj.post_message(msg); + } + + async fn rtsp_task( + &self, + state: &mut RtspTaskState, + mut cmd_rx: mpsc::Receiver, + ) -> Result<()> { + let cmd_tx = self.cmd_queue(); + + let settings = { self.settings.lock().unwrap().clone() }; + + // OPTIONS + state.options().await?; + + // DESCRIBE + state.describe().await?; + + let mut session: Option = None; + // SETUP streams (TCP interleaved) + state.setup_params = { + state + .setup( + &mut session, + settings.port_start, + &settings.protocols, + TransportMode::Play, + ) + .await? + }; + let manager = RtspManager::new(std::env::var("USE_RTPBIN2").is_ok_and(|s| s == "1")); + + let obj = self.obj(); + obj.add(&manager.inner) + .expect("Adding the manager cannot fail"); + manager.inner.sync_state_with_parent().unwrap(); + + let mut tcp_interleave_appsrcs = HashMap::new(); + for (rtpsession_n, p) in state.setup_params.iter_mut().enumerate() { + let (tx, rx) = mpsc::channel(1); + let on_rtcp = move |appsink: &_| on_rtcp_udp(appsink, tx.clone()); + match &mut p.transport { + RtspTransportInfo::UdpMulticast { + dest, + port: (rtp_port, rtcp_port), + ttl, + } => { + let rtp_socket = bind_port(*rtp_port, dest.is_ipv4())?; + let rtcp_socket = rtcp_port.and_then(|p| { + bind_port(p, dest.is_ipv4()) + .map_err(|err| { + gst::warning!(CAT, "Could not bind to RTCP port: {err:?}"); + err + }) + .ok() + }); + + match &dest { + IpAddr::V4(addr) => { + rtp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED)?; + if let Some(ttl) = ttl { + let _ = rtp_socket.set_multicast_ttl_v4(*ttl as u32); + } + if let Some(rtcp_socket) = &rtcp_socket { + if let Err(err) = + rtcp_socket.join_multicast_v4(*addr, Ipv4Addr::UNSPECIFIED) + { + gst::warning!( + CAT, + "Failed to join RTCP multicast address {addr}: {err:?}" + ); + } + if let Some(ttl) = ttl { + let _ = rtcp_socket.set_multicast_ttl_v4(*ttl as u32); + } + } + } + IpAddr::V6(addr) => { + rtp_socket.join_multicast_v6(addr, 0)?; + if let Some(rtcp_socket) = &rtcp_socket { + if let Err(err) = rtcp_socket.join_multicast_v6(addr, 0) { + gst::warning!( + CAT, + "Failed to join RTCP multicast address {addr}: {err:?}" + ); + } + } + } + }; + + let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; + p.rtp_appsrc = Some(rtp_appsrc.clone()); + // Spawn RTP udpsrc task + state.handles.push(RUNTIME.spawn(async move { + udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await + })); + + // Spawn RTCP udpsrc task + if let Some(rtcp_socket) = rtcp_socket { + let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; + let socket = Arc::new(rtcp_socket); + let sock = socket.clone(); + state.handles.push( + RUNTIME + .spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }), + ); + // Spawn RTCP RR udpsink task + self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; + state + .handles + .push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await })); + } + } + RtspTransportInfo::Udp { + source, + server_port: (server_rtp_port, server_rtcp_port), + client_port: _, + sockets, + } => { + let Some((rtp_socket, rtcp_socket)) = sockets.take() else { + gst::warning!( + CAT, + "Skipping: no UDP sockets for {rtpsession_n}: {:#?}", + p.transport + ); + continue; + }; + + let _ = rtp_socket + .connect(&format!( + "{}:{server_rtp_port}", + source.as_ref().expect("Must have source address") + )) + .await; + + if let (Some(source), Some(port), Some(s)) = + (source, server_rtcp_port, rtcp_socket.as_ref()) + { + let _ = s.connect(&format!("{source}:{port}")).await; + } + + // Spawn RTP udpsrc task + let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; + p.rtp_appsrc = Some(rtp_appsrc.clone()); + state.handles.push(RUNTIME.spawn(async move { + udpsrc_task(&rtp_socket, rtp_appsrc, Some(settings.timeout)).await + })); + + if let Some(rtcp_socket) = rtcp_socket { + // Spawn RTCP SR udpsrc task + let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; + let socket = Arc::new(rtcp_socket); + let sock = socket.clone(); + state.handles.push( + RUNTIME + .spawn(async move { udpsrc_task(&sock, rtcp_appsrc, None).await }), + ); + // Spawn RTCP RR udpsink task + self.make_rtcp_appsink(rtpsession_n, &manager, on_rtcp)?; + state + .handles + .push(RUNTIME.spawn(async move { udpsink_task(&socket, rx).await })); + } + } + RtspTransportInfo::Tcp { + channels: (rtp_channel, rtcp_channel), + } => { + let rtp_appsrc = self.make_rtp_appsrc(rtpsession_n, &p.caps, &manager)?; + p.rtp_appsrc = Some(rtp_appsrc.clone()); + tcp_interleave_appsrcs.insert(*rtp_channel, rtp_appsrc); + + if let Some(rtcp_channel) = rtcp_channel { + // RTCP SR + let rtcp_appsrc = self.make_rtcp_appsrc(rtpsession_n, &manager)?; + tcp_interleave_appsrcs.insert(*rtcp_channel, rtcp_appsrc.clone()); + // RTCP RR + let rtcp_channel = *rtcp_channel; + let cmd_tx = cmd_tx.clone(); + self.make_rtcp_appsink(rtpsession_n, &manager, move |appsink| { + on_rtcp_tcp(appsink, cmd_tx.clone(), rtcp_channel) + })?; + } + } + } + } + + obj.no_more_pads(); + + // Expose RTP srcpads + manager.inner.connect_pad_added(|manager, pad| { + if pad.direction() != gst::PadDirection::Src { + return; + } + let Some(obj) = manager + .parent() + .and_then(|o| o.downcast::().ok()) + else { + return; + }; + let name = pad.name(); + match *name.split('_').collect::>() { + // rtpbin and rtpbin2 + ["recv", "rtp", "src", stream_id, ssrc, pt] + | ["rtp", "recv", "src", stream_id, ssrc, pt] => { + if stream_id.parse::().is_err() { + gst::info!(CAT, "Ignoring srcpad with invalid stream id: {name}"); + return; + }; + gst::info!(CAT, "Setting rtpbin pad {} as ghostpad target", name); + let srcpad = obj + .static_pad(&format!("stream_{}", stream_id)) + .expect("ghostpad should've been available already"); + let ghostpad = srcpad + .downcast::() + .expect("rtspsrc src pads are ghost pads"); + if let Err(err) = ghostpad.set_target(Some(pad)) { + gst::element_error!( + obj, + gst::ResourceError::Failed, + ( + "Failed to set ghostpad {} target {}: {err:?}", + ghostpad.name(), + name + ), + ["pt: {pt}, ssrc: {ssrc}"] + ); + } + } + _ => { + gst::info!(CAT, "Ignoring unknown srcpad: {name}"); + } + } + }); + + let mut expected_response: Option<(Method, u32)> = None; + loop { + tokio::select! { + msg = state.stream.next() => match msg { + Some(Ok(rtsp_types::Message::Data(data))) => { + let Some(appsrc) = tcp_interleave_appsrcs.get(&data.channel_id()) else { + gst::warning!(CAT, + "ignored data of size {}: unknown channel {}", + data.len(), + data.channel_id() + ); + continue; + }; + let t = appsrc.current_running_time(); + let channel_id = data.channel_id(); + gst::trace!(CAT, "Received data on channel {channel_id}"); + // TODO: this should be from_mut_slice() after making the necessary + // modifications to Body + let mut buffer = gst::Buffer::from_slice(data.into_body()); + let bufref = buffer.make_mut(); + bufref.set_dts(t); + // TODO: Allow unlinked source pads + if let Err(err) = appsrc.push_buffer(buffer) { + gst::error!(CAT, "Failed to push buffer on pad {} for channel {}", appsrc.name(), channel_id); + return Err(err.into()); + } + } + Some(Ok(rtsp_types::Message::Request(req))) => { + // TODO: implement incoming GET_PARAMETER requests + gst::debug!(CAT, "<-- {req:#?}"); + } + Some(Ok(rtsp_types::Message::Response(rsp))) => { + gst::debug!(CAT, "<-- {rsp:#?}"); + let Some((expected, cseq)) = &expected_response else { + continue; + }; + let Some(s) = &session else { + return Err(RtspError::Fatal(format!("Can't handle {:?} response, no SETUP", expected)).into()); + }; + match expected { + Method::Play => { + state.play_response(&rsp, *cseq, s).await?; + self.post_complete("request", "PLAY response received"); + } + Method::Teardown => state.teardown_response(&rsp, *cseq, s).await?, + m => unreachable!("BUG: unexpected response method: {m:?}"), + }; + } + Some(Err(e)) => { + // TODO: reconnect or ignore if UDP sockets are still receiving data + gst::error!(CAT, "I/O error: {e:?}, quitting"); + return Err(gst::FlowError::Error.into()); + } + None => { + // TODO: reconnect or ignore if UDP sockets are still receiving data + gst::error!(CAT, "TCP connection EOF, quitting"); + return Err(gst::FlowError::Eos.into()); + } + }, + Some(cmd) = cmd_rx.recv() => match cmd { + Commands::Play => { + let Some(s) = &session else { + return Err(RtspError::InvalidMessage("Can't PLAY, no SETUP").into()); + }; + self.post_start("request", "PLAY request sent"); + let cseq = state.play(s).await.map_err(|err| { + self.post_cancelled("request", "PLAY request cancelled"); + err + })?; + expected_response = Some((Method::Play, cseq)); + }, + Commands::Teardown(tx) => { + gst::info!(CAT, "Received Teardown command"); + let Some(s) = &session else { + return Err(RtspError::InvalidMessage("Can't TEARDOWN, no SETUP").into()); + }; + let _ = state.teardown(s).await; + if let Some(tx) = tx { + let _ = tx.send(()); + } + break; + } + Commands::Data(data) => { + // We currently only send RTCP RR as data messages, this will change when + // we support TCP ONVIF backchannels + state.sink.send(Message::Data(data)).await?; + gst::debug!(CAT, "Sent RTCP RR over TCP"); + } + }, + else => { + gst::error!(CAT, "No select statement matched, breaking loop"); + break; + } + } + } + Ok(()) + } +} + +struct RtspManager { + inner: gst::Element, + using_rtpbin2: bool, +} + +impl RtspManager { + fn new(rtpbin2: bool) -> Self { + let name = if rtpbin2 { "rtpbin2" } else { "rtpbin" }; + RtspManager { + inner: gst::ElementFactory::make_with_name(name, None) + .unwrap_or_else(|_| panic!("{name} not found")), + using_rtpbin2: rtpbin2, + } + } + + fn rtp_recv_sinkpad(&self, rtpsession: usize) -> Option { + let name = if self.using_rtpbin2 { + format!("rtp_recv_sink_{}", rtpsession) + } else { + format!("recv_rtp_sink_{}", rtpsession) + }; + self.inner.request_pad_simple(&name) + } + + fn rtcp_recv_sinkpad(&self, rtpsession: usize) -> Option { + let name = if self.using_rtpbin2 { + format!("rtcp_recv_sink_{}", rtpsession) + } else { + format!("recv_rtcp_sink_{}", rtpsession) + }; + self.inner.request_pad_simple(&name) + } + + fn rtcp_send_srcpad(&self, rtpsession: usize) -> Option { + let name = if self.using_rtpbin2 { + format!("rtcp_send_src_{}", rtpsession) + } else { + format!("send_rtcp_src_{}", rtpsession) + }; + self.inner.request_pad_simple(&name) + } +} + +struct RtspTaskState { + cseq: u32, + url: Url, + version: Version, + content_base_or_location: Option, + aggregate_control: Option, + sdp: Option, + + stream: + Pin, super::tcp_message::ReadError>> + Send>>, + sink: Pin, Error = std::io::Error> + Send>>, + + setup_params: Vec, + handles: Vec>, +} + +struct RtspSetupParams { + control_url: Url, + transport: RtspTransportInfo, + rtp_appsrc: Option, + caps: gst::Caps, +} + +impl RtspTaskState { + fn new(url: Url, stream: RtspStream, sink: RtspSink) -> Self { + RtspTaskState { + cseq: 0u32, + url, + version: Version::V1_0, + content_base_or_location: None, + aggregate_control: None, + sdp: None, + stream, + sink, + setup_params: Vec::new(), + handles: Vec::new(), + } + } + + fn check_response( + rsp: &Response, + cseq: u32, + req_name: Method, + session: Option<&Session>, + ) -> Result<(), RtspError> { + if rsp.status() != StatusCode::Ok { + return Err(RtspError::Fatal(format!( + "{req_name:?} request failed: {}", + rsp.reason_phrase() + ))); + } + match rsp.typed_header::() { + Ok(Some(v)) => { + if *v != cseq { + return Err(RtspError::InvalidMessage("cseq does not match")); + } + } + Ok(None) => { + gst::warning!( + CAT, + "No cseq in response, continuing... {:#?}", + rsp.headers().collect::>() + ); + } + Err(_) => { + gst::warning!( + CAT, + "Invalid cseq in response, continuing... {:#?}", + rsp.headers().collect::>() + ); + } + }; + if let Some(s) = session { + if let Some(have_s) = rsp.typed_header::()? { + if s.0 != have_s.0 { + return Err(RtspError::Fatal(format!( + "Session in header {} does not match our session {}", + s.0, have_s.0 + ))); + } + } else { + gst::warning!( + CAT, + "No Session header in response, continuing... {:#?}", + rsp.headers().collect::>() + ); + } + } + Ok(()) + } + + async fn options(&mut self) -> Result<(), RtspError> { + self.cseq += 1; + let req = Request::builder(Method::Options, self.version) + .typed_header::(&self.cseq.into()) + .request_uri(self.url.clone()) + .header(USER_AGENT, DEFAULT_USER_AGENT) + .build(Body::default()); + + gst::debug!(CAT, "-->> {req:#?}"); + self.sink.send(req.into()).await?; + + let rsp = match self.stream.next().await { + Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp), + Some(Ok(m)) => Err(RtspError::UnexpectedMessage("OPTIONS response", m)), + Some(Err(e)) => Err(e.into()), + None => Err( + std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "options response").into(), + ), + }?; + gst::debug!(CAT, "<<-- {rsp:#?}"); + Self::check_response(&rsp, self.cseq, Method::Options, None)?; + + let Ok(Some(methods)) = rsp.typed_header::() else { + return Err(RtspError::InvalidMessage( + "OPTIONS response does not contain a valid Public header", + )); + }; + + let needed = [ + Method::Describe, + Method::Setup, + Method::Play, + Method::Teardown, + ]; + let mut unsupported = Vec::new(); + for method in &needed { + if !methods.contains(method) { + unsupported.push(format!("{method:?}")); + } + } + if !unsupported.is_empty() { + Err(RtspError::Fatal(format!( + "Server doesn't support the required method{} {}", + if unsupported.len() == 1 { "" } else { "s:" }, + unsupported.join(",") + ))) + } else { + Ok(()) + } + } + + async fn describe(&mut self) -> Result<(), RtspError> { + self.cseq += 1; + let req = Request::builder(Method::Describe, self.version) + .typed_header::(&self.cseq.into()) + .header(USER_AGENT, DEFAULT_USER_AGENT) + .header(ACCEPT, "application/sdp") + .request_uri(self.url.clone()) + .build(Body::default()); + + gst::debug!(CAT, "-->> {req:#?}"); + self.sink.send(req.into()).await?; + + let rsp = match self.stream.next().await { + Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp), + Some(Ok(m)) => Err(RtspError::UnexpectedMessage("DESCRIBE response", m)), + Some(Err(e)) => Err(e.into()), + None => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "describe response", + ) + .into()), + }?; + gst::debug!( + CAT, + "<<-- Response {:#?}", + rsp.headers().collect::>() + ); + Self::check_response(&rsp, self.cseq, Method::Describe, None)?; + + self.content_base_or_location = rsp + .header(&CONTENT_BASE) + .or(rsp.header(&CONTENT_LOCATION)) + .map(|v| v.to_string()); + + gst::info!(CAT, "{}", std::str::from_utf8(rsp.body()).unwrap()); + // TODO: read range attribute from SDP for VOD use-cases + let sdp = sdp_types::Session::parse(rsp.body())?; + gst::debug!(CAT, "{sdp:#?}"); + + self.sdp.replace(sdp); + Ok(()) + } + + fn parse_fmtp(fmtp: &str, s: &mut gst::structure::Structure) { + // Non-compliant RTSP servers will incorrectly set these here, ignore them + let ignore_fields = [ + "media", + "payload", + "clock-rate", + "encoding-name", + "encoding-params", + ]; + let encoding_name = s.get::("encoding-name").unwrap(); + let Some((_, fmtp)) = fmtp.split_once(' ') else { + gst::warning!(CAT, "Could not parse fmtp: {fmtp}"); + return; + }; + let iter = fmtp.split(';').map_while(|x| x.split_once('=')); + for (k, v) in iter { + let k = k.trim().to_ascii_lowercase(); + if ignore_fields.contains(&k.as_str()) { + continue; + } + if encoding_name == "H264" && k == "profile-level-id" { + let profile_idc = u8::from_str_radix(&v[0..2], 16); + let csf_idc = u8::from_str_radix(&v[2..4], 16); + let level_idc = u8::from_str_radix(&v[4..6], 16); + if let (Ok(p), Ok(c), Ok(l)) = (profile_idc, csf_idc, level_idc) { + let sps = &[p, c, l]; + let profile = gst_pbutils::codec_utils_h264_get_profile(sps); + let level = gst_pbutils::codec_utils_h264_get_level(sps); + if let (Ok(profile), Ok(level)) = (profile, level) { + s.set("profile", profile); + s.set("level", level); + continue; + } + } + gst::warning!(CAT, "Failed to parse profile-level-id {v}, ignoring..."); + continue; + } + s.set(k, v); + } + } + + fn parse_rtpmap(rtpmap: &str, s: &mut gst::structure::Structure) -> Result<(), RtspError> { + let Some((_, rtpmap)) = rtpmap.split_once(' ') else { + return Err(RtspError::InvalidMessage( + "Could not parse rtpmap: {rtpmap}", + )); + }; + + let mut iter = rtpmap.split('/'); + let Some(encoding_name) = iter.next() else { + return Err(RtspError::InvalidMessage( + "Could not parse encoding-name from rtpmap: {rtpmap}", + )); + }; + s.set("encoding-name", encoding_name); + + let Some(v) = iter.next() else { + return Err(RtspError::InvalidMessage( + "Could not parse clock-rate from rtpmap: {rtpmap}", + )); + }; + + let Ok(clock_rate) = v.parse::() else { + return Err(RtspError::InvalidMessage( + "Could not parse clock-rate from rtpmap: {rtpmap}", + )); + }; + s.set("clock-rate", clock_rate); + + if let Some(v) = iter.next() { + s.set("encoding-params", v); + } + + debug_assert!(iter.next().is_none()); + + Ok(()) + } + + // https://datatracker.ietf.org/doc/html/rfc2326#appendix-C.1.1 + fn parse_control_path(path: &str, base: &Url) -> Option { + match Url::parse(path) { + Ok(v) => Some(v), + Err(url::ParseError::RelativeUrlWithoutBase) => { + if path == "*" { + Some(base.clone()) + } else { + base.join(path).ok() + } + } + Err(_) => None, + } + } + + fn parse_setup_transports( + transports: Transports, + s: &mut gst::Structure, + protocols: &[RtspProtocol], + mode: &TransportMode, + ) -> Result { + let mut last_error = + RtspError::Fatal("No matching transport found matching selected protocols".to_string()); + let mut parsed_transports = Vec::new(); + for transport in transports.iter() { + let Transport::Rtp(t) = transport else { + last_error = + RtspError::Fatal(format!("Expected RTP transport, got {:#?}", transports)); + continue; + }; + // RTSP 2 specifies that we can have multiple SSRCs in the response + // Transport header, but it's not clear why, so we don't support it + if let Some(ssrc) = t.params.ssrc.first() { + s.set("ssrc", ssrc) + } + if !t.params.mode.is_empty() && !t.params.mode.contains(mode) { + last_error = RtspError::Fatal(format!( + "Requested mode {:?} doesn't match server modes: {:?}", + mode, t.params.mode + )); + continue; + } + let parsed = match RtspTransportInfo::try_from(t) { + Ok(p) => p, + Err(err) => { + last_error = err; + continue; + } + }; + parsed_transports.push(parsed); + } + for protocol in protocols { + for n in 0..parsed_transports.len() { + if parsed_transports[n].to_protocol() == *protocol { + let t = parsed_transports.swap_remove(n); + return Ok(t); + } + } + } + Err(last_error) + } + + async fn setup( + &mut self, + session: &mut Option, + port_start: u16, + protocols: &[RtspProtocol], + mode: TransportMode, + ) -> Result, RtspError> { + let sdp = self.sdp.as_ref().expect("Must have SDP by now"); + let base = self + .content_base_or_location + .as_ref() + .and_then(|s| Url::parse(s).ok()) + .unwrap_or_else(|| self.url.clone()); + self.aggregate_control = sdp + .get_first_attribute_value("control") + // No attribute and no value have the same meaning for us + .ok() + .flatten() + .and_then(|v| Self::parse_control_path(v, &base)); + let mut b = gst::Structure::builder("application/x-rtp"); + + let skip_attrs = ["control", "range"]; + for sdp_types::Attribute { attribute, value } in &sdp.attributes { + if skip_attrs.contains(&attribute.as_str()) { + continue; + } + b = b.field(format!("a-{attribute}"), value); + } + let message_structure = b.build(); + + let conn_source = sdp + .connection + .as_ref() + .map(|c| c.connection_address.as_str()) + .filter(|c| !c.is_empty()) + .unwrap_or_else(|| base.host_str().unwrap()); + let mut port_next = port_start; + let mut stream_num = 0; + let mut setup_params: Vec = Vec::new(); + let skip_attrs = ["control", "rtpmap", "fmtp"]; + for m in &sdp.medias { + if !["audio", "video"].contains(&m.media.as_str()) { + gst::info!(CAT, "Ignoring unsupported media {}", m.media); + continue; + } + let media_control = m + .get_first_attribute_value("control") + // No attribute and no value have the same meaning for us + .ok() + .flatten() + .and_then(|v| Self::parse_control_path(v, &base)); + let Some(control_url) = media_control.as_ref().or(self.aggregate_control.as_ref()) + else { + gst::warning!( + CAT, + "No session control or media control for {} fmt {}, ignoring", + m.media, + m.fmt + ); + continue; + }; + + // RTP caps + // FIXME: move SDP -> Caps parsing to a separate file + debug_assert_eq!(m.port, 0); // TCP + let Ok(pt) = m.fmt.parse::() else { + gst::error!(CAT, "Could not parse pt: {}, ignoring media", m.fmt); + continue; + }; + + let mut s = message_structure.clone(); + s.set("media", &m.media); + s.set("payload", pt); + + if let Ok(Some(rtpmap)) = m.get_first_attribute_value("rtpmap") { + Self::parse_rtpmap(rtpmap, &mut s)?; + } else { + gst::warning!(CAT, "No rtpmap for {} {}, skipping", m.media, m.fmt); + continue; + } + + if let Ok(Some(fmtp)) = m.get_first_attribute_value("fmtp") { + Self::parse_fmtp(fmtp, &mut s); + } + + for sdp_types::Attribute { attribute, value } in &m.attributes { + if skip_attrs.contains(&attribute.as_str()) { + continue; + } + // https://github.com/sdroege/sdp-types/issues/17 + if attribute == "ssrc" { + continue; + } + s.set(format!("a-{attribute}"), value); + } + + // TODO: rtcp-fb: fields + + if s.get_optional("encoding-name") == Ok(Some("H264")) { + if s.get_optional("level-asymmetry-allowed") != Ok(Some("0")) + && s.has_field("level") + { + s.remove_field("level"); + } + if s.has_field("level-asymmetry-allowed") { + s.remove_field("level-asymmetry-allowed"); + }; + } + + // SETUP + let mut rtp_socket: Option = None; + let mut rtcp_socket: Option = None; + let mut transports = Vec::new(); + let mut is_ipv4 = true; + let mut conn_protocols = BTreeSet::new(); + for conn in &m.connections { + if conn.nettype != "IN" { + continue; + } + // XXX: For now, assume that all connections use the same addrtype + match conn.addrtype.as_str() { + "IP4" => is_ipv4 = true, + "IP6" => is_ipv4 = false, + _ => continue, + }; + // Strip subnet mask, if any + let addr = if let Some((first, _)) = conn.connection_address.split_once('/') { + first + } else { + conn.connection_address.as_str() + }; + let Ok(addr) = addr.parse::() else { + continue; + }; + // If this is an instance of gst-rtsp-server that only supports + // udp-multicast, it will put the multicast address in the media + // connections field. + if addr.is_multicast() { + conn_protocols.insert(RtspProtocol::UdpMulticast); + } else { + conn_protocols.insert(RtspProtocol::Tcp); + conn_protocols.insert(RtspProtocol::Udp); + } + } + + let protocols = if !conn_protocols.is_empty() { + let p = protocols.iter().cloned().collect::>(); + p.intersection(&conn_protocols).cloned().collect::>() + } else { + protocols.to_owned() + }; + + if protocols.is_empty() { + gst::error!(CAT, "No available protocols left, skipping media"); + continue; + } + + if protocols.contains(&RtspProtocol::UdpMulticast) { + let params = RtpTransportParameters { + mode: vec![mode.clone()], + multicast: true, + ..Default::default() + }; + transports.push(Transport::Rtp(RtpTransport { + profile: RtpProfile::Avp, + lower_transport: Some(RtpLowerTransport::Udp), + params, + })); + } + if protocols.contains(&RtspProtocol::Udp) { + let (sock1, rtp_port) = bind_start_port(port_next, is_ipv4).await; + // Get the actual port that was successfully bound + port_next = rtp_port; + let (sock2, rtcp_port) = bind_start_port(rtp_port + 1, is_ipv4).await; + rtp_socket = Some(sock1); + rtcp_socket = Some(sock2); + let params = RtpTransportParameters { + mode: vec![mode.clone()], + unicast: true, + client_port: Some((rtp_port, Some(rtcp_port))), + ..Default::default() + }; + transports.push(Transport::Rtp(RtpTransport { + profile: RtpProfile::Avp, + lower_transport: Some(RtpLowerTransport::Udp), + params, + })); + } + if protocols.contains(&RtspProtocol::Tcp) { + let params = RtpTransportParameters { + mode: vec![mode.clone()], + interleaved: Some((stream_num, Some(stream_num + 1))), + ..Default::default() + }; + transports.push(Transport::Rtp(RtpTransport { + // RTSP 2.0 adds AVPF and more + profile: RtpProfile::Avp, + lower_transport: Some(RtpLowerTransport::Tcp), + params, + })); + } + + self.cseq += 1; + let req = Request::builder(Method::Setup, self.version) + .typed_header::(&self.cseq.into()) + .header(USER_AGENT, DEFAULT_USER_AGENT) + .typed_header::(&transports.as_slice().into()) + .request_uri(control_url.clone()); + let req = if let Some(s) = session { + req.typed_header::(s) + } else { + req + }; + let req = req.build(Body::default()); + let cseq = self.cseq; + + gst::debug!(CAT, "-->> {req:#?}"); + self.sink.send(req.into()).await?; + + // RTSP 2 supports pipelining of SETUP requests, so this ping-pong would have to be + // reworked if we want to support it. + let rsp = match self.stream.next().await { + Some(Ok(rtsp_types::Message::Response(rsp))) => Ok(rsp), + Some(Ok(m)) => Err(RtspError::UnexpectedMessage("SETUP response", m)), + Some(Err(e)) => Err(e.into()), + None => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "setup response", + ) + .into()), + }?; + gst::debug!(CAT, "<<-- {rsp:#?}"); + Self::check_response(&rsp, cseq, Method::Setup, session.as_ref())?; + let new_session = rsp + .typed_header::()? + .ok_or(RtspError::InvalidMessage("No session in SETUP response"))?; + // Manually strip timeout field: https://github.com/sdroege/rtsp-types/issues/24 + session.replace(Session(new_session.0, None)); + let mut parsed_transport = if let Some(transports) = rsp.typed_header::()? { + Self::parse_setup_transports(transports, &mut s, &protocols, &mode) + } else { + // FIXME: Transport header in response is optional + // https://datatracker.ietf.org/doc/html/rfc2326#section-12.39 + Err(RtspError::InvalidMessage( + "No transport header in SETUP response", + )) + }?; + match &mut parsed_transport { + RtspTransportInfo::UdpMulticast { .. } => {} + RtspTransportInfo::Udp { + source, + server_port: _, + client_port, + sockets, + } => { + if source.is_none() { + *source = Some(conn_source.to_string()); + } + if let Some((rtp_port, rtcp_port)) = client_port { + // There is no reason for the server to reject the client ports WE + // selected, so if it does, just ignore it. + if *rtp_port != port_next { + gst::warning!( + CAT, + "RTP port changed: {port_next} -> {rtp_port}, ignoring" + ); + *rtp_port = port_next; + } + port_next += 1; + *sockets = if let Some(rtcp_port) = rtcp_port { + if *rtcp_port != port_next { + gst::warning!( + CAT, + "RTCP port changed: {port_next} -> {rtcp_port}, ignoring" + ); + *rtcp_port = port_next; + } + port_next += 1; + Some((rtp_socket.unwrap(), rtcp_socket)) + } else { + Some((rtp_socket.unwrap(), None)) + } + }; + } + RtspTransportInfo::Tcp { + channels: (rtp_ch, rtcp_ch), + } => { + if *rtp_ch != stream_num { + gst::info!(CAT, "RTP channel changed: {stream_num} -> {rtp_ch}"); + } + stream_num += 1; + if let Some(rtcp_ch) = rtcp_ch { + if *rtcp_ch != stream_num { + gst::info!(CAT, "RTCP channel changed: {stream_num} -> {rtcp_ch}"); + } + stream_num += 1; + } + } + }; + let caps = gst::Caps::from(s); + setup_params.push(RtspSetupParams { + control_url: control_url.clone(), + transport: parsed_transport, + rtp_appsrc: None, + caps, + }); + } + Ok(setup_params) + } + + async fn play(&mut self, session: &Session) -> Result { + self.cseq += 1; + let request_uri = self.aggregate_control.as_ref().unwrap_or(&self.url).clone(); + let req = Request::builder(Method::Play, self.version) + .typed_header::(&self.cseq.into()) + .typed_header::(&Range::Npt(NptRange::From(NptTime::Now))) + .header(USER_AGENT, DEFAULT_USER_AGENT) + .request_uri(request_uri) + .typed_header::(session); + + let req = req.build(Body::default()); + gst::debug!(CAT, "-->> {req:#?}"); + self.sink.send(req.into()).await?; + Ok(self.cseq) + } + + async fn play_response( + &mut self, + rsp: &Response, + cseq: u32, + session: &Session, + ) -> Result<(), RtspError> { + Self::check_response(rsp, cseq, Method::Play, Some(session))?; + if let Some(RtpInfos::V1(rtpinfos)) = rsp.typed_header::()? { + for rtpinfo in rtpinfos { + for params in self.setup_params.iter_mut() { + if params.control_url == rtpinfo.uri { + let mut changed = false; + let mut caps = params.rtp_appsrc.as_ref().unwrap().caps().unwrap(); + let capsref = caps.make_mut(); + if let Some(v) = rtpinfo.seq { + capsref.set("seqnum-base", v as u32); + changed = true; + } + if let Some(v) = rtpinfo.rtptime { + capsref.set("clock-base", v); + changed = true; + } + if changed { + params.rtp_appsrc.as_ref().unwrap().set_caps(Some(&caps)); + } + } + } + } + } else { + gst::warning!(CAT, "No RTPInfos V1 header in PLAY response"); + }; + Ok(()) + } + + async fn teardown(&mut self, session: &Session) -> Result { + self.cseq += 1; + let request_uri = self.aggregate_control.as_ref().unwrap_or(&self.url).clone(); + let req = Request::builder(Method::Teardown, self.version) + .typed_header::(&self.cseq.into()) + .header(USER_AGENT, DEFAULT_USER_AGENT) + .request_uri(request_uri) + .typed_header::(session); + + let req = req.build(Body::default()); + gst::debug!(CAT, "-->> {req:#?}"); + self.sink.send(req.into()).await?; + Ok(self.cseq) + } + + async fn teardown_response( + &mut self, + rsp: &Response, + cseq: u32, + session: &Session, + ) -> Result<(), RtspError> { + Self::check_response(rsp, cseq, Method::Teardown, Some(session))?; + Ok(()) + } +} + +fn bind_port(port: u16, is_ipv4: bool) -> Result { + let domain = if is_ipv4 { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + let sock = Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?; + let _ = sock.set_reuse_address(true); + #[cfg(unix)] + let _ = sock.set_reuse_port(true); + sock.set_nonblocking(true)?; + let addr: SocketAddr = if is_ipv4 { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)) + } else { + SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0)) + }; + sock.bind(&addr.into())?; + let bound_port = if is_ipv4 { + sock.local_addr()?.as_socket_ipv4().unwrap().port() + } else { + sock.local_addr()?.as_socket_ipv6().unwrap().port() + }; + gst::debug!(CAT, "Bound to UDP port {bound_port}"); + + UdpSocket::from_std(sock.into()) +} + +async fn bind_start_port(port: u16, is_ipv4: bool) -> (UdpSocket, u16) { + let mut next_port = port; + loop { + match bind_port(next_port, is_ipv4) { + Ok(socket) => { + if next_port != 0 { + return (socket, next_port); + } + let addr = socket + .local_addr() + .expect("Newly-bound port should not fail"); + return (socket, addr.port()); + } + Err(err) => { + gst::debug!(CAT, "Failed to bind to {next_port}: {err:?}, trying next"); + next_port += 1; + // If we fail too much, panic instead of forever doing a hot-loop + if (next_port - MAX_BIND_PORT_RETRY) > port { + panic!("Failed to allocate any ports from {port} to {next_port}"); + } + } + }; + } +} + +fn on_rtcp_udp( + appsink: &gst_app::AppSink, + tx: mpsc::Sender>, +) -> Result { + let Ok(sample) = appsink.pull_sample() else { + return Err(gst::FlowError::Error); + }; + let Some(buffer) = sample.buffer_owned() else { + return Ok(gst::FlowSuccess::Ok); + }; + let map = buffer.into_mapped_buffer_readable(); + match map { + Ok(map) => match tx.try_send(map) { + Ok(_) => Ok(gst::FlowSuccess::Ok), + Err(mpsc::error::TrySendError::Full(_)) => { + gst::error!(CAT, "Could not send RTCP, channel is full"); + Err(gst::FlowError::Error) + } + Err(mpsc::error::TrySendError::Closed(_)) => Err(gst::FlowError::Eos), + }, + Err(err) => { + gst::error!(CAT, "Failed to map buffer: {err:?}"); + Err(gst::FlowError::Error) + } + } +} + +fn on_rtcp_tcp( + appsink: &gst_app::AppSink, + cmd_tx: mpsc::Sender, + rtcp_channel: u8, +) -> Result { + let Ok(sample) = appsink.pull_sample() else { + return Err(gst::FlowError::Error); + }; + let Some(buffer) = sample.buffer_owned() else { + return Ok(gst::FlowSuccess::Ok); + }; + let map = buffer.into_mapped_buffer_readable(); + match map { + Ok(map) => { + let data: rtsp_types::Data = + rtsp_types::Data::new(rtcp_channel, Body::mapped(map)); + let cmd_tx = cmd_tx.clone(); + RUNTIME.spawn(async move { cmd_tx.send(Commands::Data(data)).await }); + Ok(gst::FlowSuccess::Ok) + } + Err(err) => { + gst::error!(CAT, "Failed to map buffer: {err:?}"); + Err(gst::FlowError::Error) + } + } +} + +async fn udpsrc_task(socket: &UdpSocket, appsrc: gst_app::AppSrc, timeout: Option) { + // TODO: this should allocate a buffer pool to avoid a copy + let mut buf = vec![0; UDP_PACKET_MAX_SIZE]; + let t = Duration::from_secs(timeout.unwrap_or(gst::ClockTime::MAX).into()); + loop { + match time::timeout(t, socket.recv(&mut buf)).await { + Ok(Ok(len)) => { + let t = appsrc.current_running_time(); + let mut buffer = gst::Buffer::from_slice(buf[..len].to_owned()); + let bufref = buffer.make_mut(); + bufref.set_dts(t); + if let Err(err) = appsrc.push_buffer(buffer) { + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ("UDP buffer push failed: {:?}", err), + ["{:#?}", socket] + ); + break; + } + } + Ok(Err(_elapsed)) => { + gst::element_error!( + appsrc, + gst::ResourceError::Failed, + ["No data received after {DEFAULT_TIMEOUT} seconds, exiting"] + ); + break; + } + Err(err) => { + gst::element_error!( + appsrc, + gst::ResourceError::Close, + ("UDP socket was closed: {:?}", err), + ["{:#?}", socket] + ); + break; + } + }; + } +} + +async fn udpsink_task(socket: &UdpSocket, mut rx: mpsc::Receiver>) { + loop { + match rx.recv().await { + Some(data) => match socket.send(data.as_ref()).await { + Ok(_) => { + gst::debug!(CAT, "Sent RTCP RR"); + } + Err(err) => { + gst::error!(CAT, "UDP socket send error: {err:?}, quitting loop"); + rx.close(); + break; + } + }, + None => { + gst::info!(CAT, "UDP socket {socket:?} closed, quitting loop"); + rx.close(); + break; + } + }; + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RtspSrc { + const NAME: &'static str = "GstRtspSrc2"; + type Type = super::RtspSrc; + type ParentType = gst::Bin; + type Interfaces = (gst::URIHandler,); +} diff --git a/net/rtsp/src/rtspsrc/mod.rs b/net/rtsp/src/rtspsrc/mod.rs new file mode 100644 index 00000000..dd2ed94a --- /dev/null +++ b/net/rtsp/src/rtspsrc/mod.rs @@ -0,0 +1,30 @@ +// GStreamer RTSP Source v2 +// +// Copyright (C) 2023 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod body; +mod imp; +mod tcp_message; +mod transport; + +glib::wrapper! { + pub struct RtspSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtspsrc2", + gst::Rank::NONE, + RtspSrc::static_type(), + ) +} diff --git a/net/rtsp/src/rtspsrc/tcp_message.rs b/net/rtsp/src/rtspsrc/tcp_message.rs new file mode 100644 index 00000000..314617e4 --- /dev/null +++ b/net/rtsp/src/rtspsrc/tcp_message.rs @@ -0,0 +1,202 @@ +// GStreamer RTSP Source 2 +// +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::fmt; +use std::marker::Unpin; + +use futures::{Sink, Stream}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use super::body::Body; +use rtsp_types::Message; + +#[derive(Debug)] +pub enum ReadError { + Io(std::io::Error), + TooBig, + ParseError, +} + +impl std::error::Error for ReadError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + ReadError::Io(ref io) => Some(io), + _ => None, + } + } +} + +impl From for ReadError { + fn from(err: std::io::Error) -> Self { + ReadError::Io(err) + } +} + +impl fmt::Display for ReadError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ReadError::Io(ref io) => fmt::Display::fmt(io, fmt), + ReadError::TooBig => write!(fmt, "Too big message"), + ReadError::ParseError => write!(fmt, "Parse error"), + } + } +} + +pub(crate) fn async_read( + read: R, + max_size: usize, +) -> impl Stream, ReadError>> + Send { + const INITIAL_BUF_SIZE: usize = 8192; + const MAX_EMPTY_BUF_SIZE: usize = 8 * INITIAL_BUF_SIZE; + + struct State { + read: R, + buf: Vec, + write_pos: usize, + // If > 0 then we first need to try parsing as there might be more messages + read_pos: usize, + } + + let state = State { + read, + buf: vec![0; INITIAL_BUF_SIZE], + write_pos: 0, + read_pos: 0, + }; + + futures::stream::unfold(Some(state), move |mut state| async move { + let State { + mut read, + mut buf, + mut write_pos, + mut read_pos, + } = state.take()?; + + let read_one = async { + loop { + assert!(read_pos <= write_pos); + + // First check if there are more messages left in the buffer + if read_pos != write_pos { + assert_ne!(read_pos, write_pos); + match Message::::parse(&buf[read_pos..write_pos]) { + Ok((msg, consumed)) => { + read_pos += consumed; + + // Need to first read more data on the next call + if read_pos == write_pos { + read_pos = 0; + write_pos = 0; + } + + gst::trace!(super::imp::CAT, "Read message {:?}", msg); + + return Ok((Some(msg), write_pos, read_pos)); + } + Err(rtsp_types::ParseError::Error) => return Err(ReadError::ParseError), + Err(rtsp_types::ParseError::Incomplete(_)) => { + if read_pos > 0 { + // Not a complete message left, copy to the beginning and read more + // data + buf.copy_within(read_pos..write_pos, 0); + write_pos -= read_pos; + read_pos = 0; + + // Shrink the buffer again if possible and needed + if buf.len() > MAX_EMPTY_BUF_SIZE && write_pos < MAX_EMPTY_BUF_SIZE + { + buf.resize(MAX_EMPTY_BUF_SIZE, 0); + } + } + } + } + } + + assert_eq!(read_pos, 0); + + if write_pos == max_size { + gst::error!(super::imp::CAT, "Message bigger than maximum {}", max_size); + return Err(ReadError::TooBig); + } + + // Grow the buffer if needed up to the maximum + let new_size = std::cmp::min( + max_size, + buf.len().checked_next_power_of_two().unwrap_or(usize::MAX), + ); + if buf.len() < new_size { + buf.resize(new_size, 0); + } + + let b = read.read(&mut buf[write_pos..]).await?; + + if b == 0 { + gst::debug!(super::imp::CAT, "Connection closed"); + return Ok((None, write_pos, read_pos)); + } + write_pos += b; + + // Try parsing on the next iteration + } + }; + + match read_one.await { + Ok((Some(msg), write_pos, read_pos)) => Some(( + Ok(msg), + Some(State { + read, + buf, + write_pos, + read_pos, + }), + )), + Ok((None, _, _)) => None, + Err(err) => { + gst::error!(super::imp::CAT, "Read error {}", err); + Some((Err(err), None)) + } + } + }) +} + +pub(crate) fn async_write( + write: W, +) -> impl Sink, Error = std::io::Error> + Send { + struct State { + write: W, + buffer: Vec, + } + + let state = State { + write, + buffer: Vec::with_capacity(8192), + }; + + futures::sink::unfold(state, |mut state, item: Message| { + async move { + gst::trace!(super::imp::CAT, "Writing message {:?}", item); + + // TODO: Write data messages more efficiently by writing header / body separately + state.buffer.clear(); + item.write(&mut state.buffer).expect("can't fail"); + + match state.write.write_all(&state.buffer).await { + Ok(_) => { + gst::trace!(super::imp::CAT, "Finished writing queued message"); + Ok(state) + } + Err(err) => { + gst::error!(super::imp::CAT, "Write error {}", err); + Err(err) + } + } + } + }) +} diff --git a/net/rtsp/src/rtspsrc/transport.rs b/net/rtsp/src/rtspsrc/transport.rs new file mode 100644 index 00000000..f4573902 --- /dev/null +++ b/net/rtsp/src/rtspsrc/transport.rs @@ -0,0 +1,105 @@ +// GStreamer RTSP Source 2 +// +// Copyright (C) 2023-2024 Nirbheek Chauhan +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +// +// https://www.rfc-editor.org/rfc/rfc2326.html + +use super::imp::{RtspError, RtspProtocol}; +use rtsp_types::headers::{transport::RtpTransport, RtpLowerTransport}; +use std::{convert::TryFrom, net::IpAddr}; +use tokio::net::UdpSocket; + +#[derive(Debug)] +pub enum RtspTransportInfo { + Tcp { + channels: (u8, Option), + }, + Udp { + source: Option, + server_port: (u16, Option), + client_port: Option<(u16, Option)>, + sockets: Option<(UdpSocket, Option)>, + }, + UdpMulticast { + dest: IpAddr, + port: (u16, Option), + ttl: Option, + }, +} + +impl TryFrom<&RtpTransport> for RtspTransportInfo { + type Error = RtspError; + + fn try_from(t: &RtpTransport) -> Result { + match &t.lower_transport { + Some(RtpLowerTransport::Tcp) => match t.params.interleaved { + Some(v) => Ok(RtspTransportInfo::Tcp { channels: v }), + None => Err(RtspError::Fatal(format!( + "Expected interleaved channels: {t:#?}", + ))), + }, + Some(RtpLowerTransport::Udp) | None => { + if t.params.multicast { + let dest = if let Some(d) = t.params.destination.as_ref() { + match d.parse::() { + Ok(d) => d, + Err(err) => { + return Err(RtspError::Fatal(format!( + "Failed to parse multicast dest addr: {err:?}" + ))); + } + } + } else { + return Err(RtspError::Fatal(format!( + "Need multicast dest addr: {:#?}", + t.params, + ))); + }; + let Some(port) = t.params.port else { + return Err(RtspError::Fatal(format!( + "Need multicast UDP port(s): {:#?}", + t.params, + ))); + }; + Ok(RtspTransportInfo::UdpMulticast { + dest, + port, + ttl: t.params.ttl, + }) + } else { + let Some(server_port) = t.params.server_port else { + return Err(RtspError::Fatal(format!( + "Need server unicast UDP port(s): {:#?}", + t.params, + ))); + }; + Ok(RtspTransportInfo::Udp { + source: t.params.source.clone(), + server_port, + client_port: t.params.client_port, + sockets: None, + }) + } + } + Some(RtpLowerTransport::Other(token)) => Err(RtspError::Fatal(format!( + "Unsupported RTP lower transport {token:?}" + ))), + } + } +} + +impl RtspTransportInfo { + pub fn to_protocol(&self) -> RtspProtocol { + match &self { + Self::Tcp { .. } => RtspProtocol::Tcp, + Self::Udp { .. } => RtspProtocol::Udp, + Self::UdpMulticast { .. } => RtspProtocol::UdpMulticast, + } + } +}