From 7835d78b3dc1f25d2641832ac25e05176e46ecd6 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Fri, 1 Sep 2023 00:20:08 +0900 Subject: [PATCH] hlssink3: Add hlscmafsink element Adding cmafmux based hls sink element Part-of: --- net/hlssink3/Cargo.toml | 1 + net/hlssink3/src/imp.rs | 504 ++++++++++++++++++++++++++++++++++- net/hlssink3/src/lib.rs | 11 + net/hlssink3/src/playlist.rs | 21 +- 4 files changed, 527 insertions(+), 10 deletions(-) diff --git a/net/hlssink3/Cargo.toml b/net/hlssink3/Cargo.toml index 7fa520d7..8a55104b 100644 --- a/net/hlssink3/Cargo.toml +++ b/net/hlssink3/Cargo.toml @@ -10,6 +10,7 @@ rust-version = "1.70" [dependencies] gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst_app = { package = "gstreamer-app", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } once_cell = "1.7.2" m3u8-rs = "5.0" diff --git a/net/hlssink3/src/imp.rs b/net/hlssink3/src/imp.rs index 57107e82..86b12d4a 100644 --- a/net/hlssink3/src/imp.rs +++ b/net/hlssink3/src/imp.rs @@ -1,4 +1,5 @@ // Copyright (C) 2021 Rafael Caricio +// Copyright (C) 2023 Seungha Yang // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at @@ -20,7 +21,9 @@ use std::io::Write; use std::path; use std::sync::Mutex; -const DEFAULT_LOCATION: &str = "segment%05d.ts"; +const DEFAULT_TS_LOCATION: &str = "segment%05d.ts"; +const DEFAULT_INIT_LOCATION: &str = "init%05d.mp4"; +const DEFAULT_CMAF_LOCATION: &str = "segment%05d.m4s"; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; const DEFAULT_MAX_NUM_SEGMENT_FILES: u32 = 10; const DEFAULT_TARGET_DURATION: u32 = 15; @@ -30,8 +33,12 @@ const DEFAULT_I_FRAMES_ONLY_PLAYLIST: bool = false; const DEFAULT_PROGRAM_DATE_TIME_TAG: bool = false; const DEFAULT_CLOCK_TRACKING_FOR_PDT: bool = true; const DEFAULT_SEND_KEYFRAME_REQUESTS: bool = true; +const DEFAULT_SYNC: bool = true; +const DEFAULT_LATENCY: gst::ClockTime = + gst::ClockTime::from_mseconds((DEFAULT_TARGET_DURATION * 500) as u64); const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream"; +const SIGNAL_GET_INIT_STREAM: &str = "get-init-stream"; const SIGNAL_GET_FRAGMENT_STREAM: &str = "get-fragment-stream"; const SIGNAL_DELETE_FRAGMENT: &str = "delete-fragment"; @@ -537,7 +544,7 @@ impl HlsBaseSink { Ok(gst::FlowSuccess::Ok) } - fn new_file_stream

(&self, location: &P) -> Result + pub fn new_file_stream

(&self, location: &P) -> Result where P: AsRef, { @@ -607,7 +614,7 @@ impl Default for HlsSink3Settings { .build() .expect("Could not make element splitmuxsink"); Self { - location: String::from(DEFAULT_LOCATION), + location: String::from(DEFAULT_TS_LOCATION), target_duration: DEFAULT_TARGET_DURATION, playlist_type: None, send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS, @@ -648,7 +655,7 @@ impl ObjectImpl for HlsSink3 { glib::ParamSpecString::builder("location") .nick("File Location") .blurb("Location of the file to write") - .default_value(Some(DEFAULT_LOCATION)) + .default_value(Some(DEFAULT_TS_LOCATION)) .build(), glib::ParamSpecUInt::builder("target-duration") .nick("Target duration") @@ -682,7 +689,7 @@ impl ObjectImpl for HlsSink3 { settings.location = value .get::>() .expect("type checked upstream") - .unwrap_or_else(|| DEFAULT_LOCATION.into()); + .unwrap_or_else(|| DEFAULT_TS_LOCATION.into()); settings .splitmuxsink .set_property("location", &settings.location); @@ -1007,7 +1014,7 @@ impl HlsSink3 { ..Default::default() }; - Playlist::new(playlist, turn_vod) + Playlist::new(playlist, turn_vod, false) } fn on_format_location( @@ -1081,3 +1088,488 @@ impl HlsSink3 { ); } } + +struct HlsCmafSinkSettings { + init_location: String, + location: String, + target_duration: u32, + playlist_type: Option, + sync: bool, + latency: gst::ClockTime, + + cmafmux: gst::Element, + appsink: gst_app::AppSink, +} + +impl Default for HlsCmafSinkSettings { + fn default() -> Self { + let cmafmux = gst::ElementFactory::make("cmafmux") + .name("muxer") + .property( + "fragment-duration", + gst::ClockTime::from_seconds(DEFAULT_TARGET_DURATION as u64), + ) + .property("latency", DEFAULT_LATENCY) + .build() + .expect("Could not make element cmafmux"); + let appsink = gst_app::AppSink::builder() + .buffer_list(true) + .sync(DEFAULT_SYNC) + .name("sink") + .build(); + + Self { + init_location: String::from(DEFAULT_INIT_LOCATION), + location: String::from(DEFAULT_CMAF_LOCATION), + target_duration: DEFAULT_TARGET_DURATION, + playlist_type: None, + sync: DEFAULT_SYNC, + latency: DEFAULT_LATENCY, + cmafmux, + appsink, + } + } +} + +#[derive(Default)] +struct HlsCmafSinkState { + init_idx: u32, + segment_idx: u32, + init_segment: Option, + new_header: bool, +} + +#[derive(Default)] +pub struct HlsCmafSink { + settings: Mutex, + state: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for HlsCmafSink { + const NAME: &'static str = "GstHlsCmafSink"; + type Type = super::HlsCmafSink; + type ParentType = super::HlsBaseSink; +} + +impl ObjectImpl for HlsCmafSink { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("init-location") + .nick("Init Location") + .blurb("Location of the init fragment file to write") + .default_value(Some(DEFAULT_INIT_LOCATION)) + .build(), + glib::ParamSpecString::builder("location") + .nick("Location") + .blurb("Location of the fragment file to write") + .default_value(Some(DEFAULT_CMAF_LOCATION)) + .build(), + glib::ParamSpecUInt::builder("target-duration") + .nick("Target duration") + .blurb("The target duration in seconds of a segment/file. (0 - disabled, useful for management of segment duration by the streaming server)") + .default_value(DEFAULT_TARGET_DURATION) + .mutable_ready() + .build(), + glib::ParamSpecEnum::builder_with_default("playlist-type", DEFAULT_PLAYLIST_TYPE) + .nick("Playlist Type") + .blurb("The type of the playlist to use. When VOD type is set, the playlist will be live until the pipeline ends execution.") + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("sync") + .nick("Sync") + .blurb("Sync on the clock") + .default_value(DEFAULT_SYNC) + .build(), + glib::ParamSpecUInt64::builder("latency") + .nick("Latency") + .blurb( + "Additional latency to allow upstream to take longer to \ + produce buffers for the current position (in nanoseconds)", + ) + .maximum(i64::MAX as u64) + .default_value(DEFAULT_LATENCY.nseconds()) + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { + "init-location" => { + settings.init_location = value + .get::>() + .expect("type checked upstream") + .unwrap_or_else(|| DEFAULT_INIT_LOCATION.into()); + } + "location" => { + settings.location = value + .get::>() + .expect("type checked upstream") + .unwrap_or_else(|| DEFAULT_CMAF_LOCATION.into()); + } + "target-duration" => { + settings.target_duration = value.get().expect("type checked upstream"); + settings.cmafmux.set_property( + "fragment-duration", + gst::ClockTime::from_seconds(settings.target_duration as u64), + ); + } + "playlist-type" => { + settings.playlist_type = value + .get::() + .expect("type checked upstream") + .into(); + } + "sync" => { + settings.sync = value.get().expect("type checked upstream"); + settings.appsink.set_property("sync", settings.sync); + } + "latency" => { + settings.latency = value.get().expect("type checked upstream"); + settings.cmafmux.set_property("latency", settings.latency); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "init-location" => settings.init_location.to_value(), + "location" => settings.location.to_value(), + "target-duration" => settings.target_duration.to_value(), + "playlist-type" => { + let playlist_type: HlsSink3PlaylistType = settings.playlist_type.as_ref().into(); + playlist_type.to_value() + } + "sync" => settings.sync.to_value(), + "latency" => settings.latency.to_value(), + _ => unimplemented!(), + } + } + + fn signals() -> &'static [glib::subclass::Signal] { + static SIGNALS: Lazy> = Lazy::new(|| { + vec![glib::subclass::Signal::builder(SIGNAL_GET_INIT_STREAM) + .param_types([String::static_type()]) + .return_type::>() + .class_handler(|_, args| { + let elem = args[0].get::().expect("signal arg"); + let init_location = args[1].get::().expect("signal arg"); + let imp = elem.imp(); + + Some(imp.new_file_stream(&init_location).ok().to_value()) + }) + .accumulator(|_hint, ret, value| { + // First signal handler wins + *ret = value.clone(); + false + }) + .build()] + }); + + SIGNALS.as_ref() + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + let settings = self.settings.lock().unwrap(); + + obj.add_many([&settings.cmafmux, settings.appsink.upcast_ref()]) + .unwrap(); + settings.cmafmux.link(&settings.appsink).unwrap(); + + let sinkpad = settings.cmafmux.static_pad("sink").unwrap(); + let gpad = gst::GhostPad::with_target(&sinkpad).unwrap(); + + obj.add_pad(&gpad).unwrap(); + + let self_weak = self.downgrade(); + settings.appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let imp = match self_weak.upgrade() { + Some(imp) => imp, + _ => return Err(gst::FlowError::Eos), + }; + + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + imp.on_new_sample(sample) + }) + .build(), + ); + } +} + +impl GstObjectImpl for HlsCmafSink {} + +impl ElementImpl for HlsCmafSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "HTTP Live Streaming CMAF Sink", + "Sink/Muxer", + "HTTP Live Streaming CMAF Sink", + "Seungha Yang ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &[ + gst::Structure::builder("video/x-h264") + .field("stream-format", gst::List::new(["avc", "avc3"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("video/x-h265") + .field("stream-format", gst::List::new(["hvc1", "hev1"])) + .field("alignment", "au") + .field("width", gst::IntRange::new(1, u16::MAX as i32)) + .field("height", gst::IntRange::new(1, u16::MAX as i32)) + .build(), + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("channels", gst::IntRange::new(1, u16::MAX as i32)) + .field("rate", gst::IntRange::new(1, i32::MAX)) + .build(), + ] + .into_iter() + .collect::(), + ) + .unwrap(); + + vec![pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + if transition == gst::StateChange::ReadyToPaused { + let (target_duration, playlist_type, segment_template) = { + let settings = self.settings.lock().unwrap(); + ( + settings.target_duration, + settings.playlist_type.clone(), + settings.location.clone(), + ) + }; + + let playlist = self.start(target_duration, playlist_type); + base_imp!(self).open_playlist(playlist, segment_template); + } + + self.parent_change_state(transition) + } +} + +impl BinImpl for HlsCmafSink {} + +impl HlsBaseSinkImpl for HlsCmafSink {} + +impl HlsCmafSink { + fn start(&self, target_duration: u32, playlist_type: Option) -> Playlist { + gst::info!(CAT, imp: self, "Starting"); + + let mut state = self.state.lock().unwrap(); + *state = HlsCmafSinkState::default(); + + let (turn_vod, playlist_type) = if playlist_type == Some(MediaPlaylistType::Vod) { + (true, Some(MediaPlaylistType::Event)) + } else { + (false, playlist_type) + }; + + let playlist = MediaPlaylist { + version: Some(6), + target_duration: target_duration as f32, + playlist_type, + independent_segments: true, + ..Default::default() + }; + + Playlist::new(playlist, turn_vod, true) + } + + fn on_init_segment(&self) -> Result, String> { + let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + let location = match sprintf::sprintf!(&settings.init_location, state.init_idx) { + Ok(location) => location, + Err(err) => { + gst::error!( + CAT, + imp: self, + "Couldn't build file name, err: {:?}", err, + ); + return Err(String::from("Invalid init segment file pattern")); + } + }; + + let stream = self + .obj() + .emit_by_name::>(SIGNAL_GET_INIT_STREAM, &[&location]) + .ok_or_else(|| String::from("Error while getting fragment stream"))? + .into_write(); + + let uri = base_imp!(self).get_segment_uri(&location); + + state.init_segment = Some(m3u8_rs::Map { + uri, + ..Default::default() + }); + state.new_header = true; + state.init_idx += 1; + + Ok(stream) + } + + fn on_new_fragment( + &self, + ) -> Result<(gio::OutputStreamWrite, String), String> { + let mut state = self.state.lock().unwrap(); + let (stream, location) = base_imp!(self) + .get_fragment_stream(state.segment_idx) + .ok_or_else(|| String::from("Error while getting fragment stream"))?; + + state.segment_idx += 1; + + Ok((stream.into_write(), location)) + } + + fn add_segment( + &self, + duration: f32, + running_time: Option, + location: String, + ) -> Result { + let uri = base_imp!(self).get_segment_uri(&location); + let mut state = self.state.lock().unwrap(); + + let map = if state.new_header { + state.new_header = false; + state.init_segment.clone() + } else { + None + }; + + base_imp!(self).add_segment( + &location, + running_time, + MediaSegment { + uri, + duration, + map, + ..Default::default() + }, + ) + } + + fn on_new_sample(&self, sample: gst::Sample) -> Result { + let mut buffer_list = sample.buffer_list_owned().unwrap(); + let mut first = buffer_list.get(0).unwrap(); + + if first + .flags() + .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) + { + let mut stream = self.on_init_segment().map_err(|err| { + gst::error!( + CAT, + imp: self, + "Couldn't get output stream for init segment, {err}", + ); + gst::FlowError::Error + })?; + + let map = first.map_readable().unwrap(); + stream.write(&map).map_err(|_| { + gst::error!( + CAT, + imp: self, + "Couldn't write init segment to output stream", + ); + gst::FlowError::Error + })?; + + stream.flush().map_err(|_| { + gst::error!( + CAT, + imp: self, + "Couldn't flush output stream", + ); + gst::FlowError::Error + })?; + + drop(map); + + buffer_list.make_mut().remove(0, 1); + if buffer_list.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + first = buffer_list.get(0).unwrap(); + } + + let segment = sample + .segment() + .unwrap() + .downcast_ref::() + .unwrap(); + let running_time = segment.to_running_time(first.pts().unwrap()); + let dur = first.duration().unwrap(); + + let (mut stream, location) = self.on_new_fragment().map_err(|err| { + gst::error!( + CAT, + imp: self, + "Couldn't get output stream for segment, {err}", + ); + gst::FlowError::Error + })?; + + for buffer in &*buffer_list { + let map = buffer.map_readable().unwrap(); + + stream.write(&map).map_err(|_| { + gst::error!( + CAT, + imp: self, + "Couldn't write segment to output stream", + ); + gst::FlowError::Error + })?; + } + + stream.flush().map_err(|_| { + gst::error!( + CAT, + imp: self, + "Couldn't flush output stream", + ); + gst::FlowError::Error + })?; + + self.add_segment(dur.mseconds() as f32 / 1_000f32, running_time, location) + } +} diff --git a/net/hlssink3/src/lib.rs b/net/hlssink3/src/lib.rs index 91cc3006..295fe45b 100644 --- a/net/hlssink3/src/lib.rs +++ b/net/hlssink3/src/lib.rs @@ -50,6 +50,10 @@ glib::wrapper! { pub struct HlsSink3(ObjectSubclass) @extends HlsBaseSink, gst::Bin, gst::Element, gst::Object; } +glib::wrapper! { + pub struct HlsCmafSink(ObjectSubclass) @extends HlsBaseSink, gst::Bin, gst::Element, gst::Object; +} + pub fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { #[cfg(feature = "doc")] { @@ -64,6 +68,13 @@ pub fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { HlsSink3::static_type(), )?; + gst::Element::register( + Some(plugin), + "hlscmafsink", + gst::Rank::None, + HlsCmafSink::static_type(), + )?; + Ok(()) } diff --git a/net/hlssink3/src/playlist.rs b/net/hlssink3/src/playlist.rs index a8e39cfe..2c750523 100644 --- a/net/hlssink3/src/playlist.rs +++ b/net/hlssink3/src/playlist.rs @@ -20,15 +20,17 @@ pub struct Playlist { playlist_index: u64, status: PlaylistRenderState, turn_vod: bool, + is_cmaf: bool, } impl Playlist { - pub fn new(playlist: MediaPlaylist, turn_vod: bool) -> Self { + pub fn new(playlist: MediaPlaylist, turn_vod: bool, is_cmaf: bool) -> Self { Self { inner: playlist, playlist_index: 0, status: PlaylistRenderState::Init, turn_vod, + is_cmaf, } } @@ -50,9 +52,20 @@ impl Playlist { } // Remove oldest segments if playlist is at maximum expected capacity - if max_playlist_length > 0 && self.inner.segments.len() > max_playlist_length { - let remove_len = self.inner.segments.len() - max_playlist_length; - self.inner.segments.drain(0..remove_len); + if max_playlist_length > 0 { + if self.is_cmaf { + // init segment uri will be specified only if it's updated + // or in case of the very first segment. + while self.inner.segments.len() > max_playlist_length { + let to_remove = self.inner.segments.remove(0); + if self.inner.segments[0].map.is_none() { + self.inner.segments[0].map = to_remove.map.clone() + } + } + } else if self.inner.segments.len() > max_playlist_length { + let remove_len = self.inner.segments.len() - max_playlist_length; + self.inner.segments.drain(0..remove_len); + } } self.playlist_index += 1;