Add hlssink3 plugin

This commit is contained in:
Rafael Caricio 2021-05-21 19:44:03 +02:00 committed by Sebastian Dröge
parent b738d5933d
commit e87a7afe3e
10 changed files with 1478 additions and 0 deletions

View file

@ -11,6 +11,7 @@ members = [
"generic/file",
"generic/sodium",
"generic/threadshare",
"net/hlssink3",
"net/reqwest",
"net/rusoto",
"utils/fallbackswitch",

View file

@ -44,6 +44,7 @@ plugins = {
'gst-plugin-lewton': 'libgstlewton',
'gst-plugin-rav1e': 'libgstrav1e',
'gst-plugin-reqwest': 'libgstreqwest',
'gst-plugin-hlssink3': 'libgsthlssink3',
'gst-plugin-rspng': 'libgstrspng',
'gst-plugin-rusoto': 'libgstrusoto',
'gst-plugin-textwrap': 'libgstrstextwrap',

49
net/hlssink3/Cargo.toml Normal file
View file

@ -0,0 +1,49 @@
[package]
name = "gst-plugin-hlssink3"
description = "HLS (HTTP Live Streaming) Plugin"
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
version = "0.1.0"
authors = ["Rafael Caricio <rafael@caricio.com>"]
edition = "2021"
license = "MPL-2.0"
rust-version = "1.56"
[dependencies]
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
gio = { git = "https://github.com/gtk-rs/gtk-rs-core" }
once_cell = "1.7.2"
m3u8-rs = "2.0.0"
regex = "1"
[dev-dependencies]
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
[build-dependencies]
gst-plugin-version-helper = { git = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }
[lib]
name = "gsthlssink3"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[features]
# GStreamer 1.14 is required for static linking
static = ["gst/v1_14"]
capi = []
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gobject-2.0, glib-2.0, gmodule-2.0"

View file

@ -0,0 +1 @@
LICENSE-MPL-2.0

18
net/hlssink3/README.md Normal file
View file

@ -0,0 +1,18 @@
# GStreamer HTTP Live Streaming Plugin
A GStreamer HLS sink plugin. Based on the
["hlssink2"](https://gstreamer.freedesktop.org/documentation/hls/hlssink2.html?gi-language=c)
element.
The "hlssink3" is feature-equivalent to the "hlssink2" element. Any pipeline that uses "hlssink2" can use the
"hlssink3" element and the result should be the same.
The "hlssink3" element has a `playlist-type` property used to control the behavior of the HLS playlist file. The
available values for this property are:
- `null` (default): The tag `#EXT-X-PLAYLIST-TYPE` won't be present in the playlist during the pipeline processing. The
playlist will be updated in sync as new segments are available, old segments are removed, keeping N segments as
defined in the property `playlist-length`. This is the default behavior, and is compatible with how "hlssink2" works;
- `"event"`: The playlist is updated as new segments are available, and the tag `#EXT-X-PLAYLIST-TYPE:EVENT` is present
during processing. No segments will be removed from the playlist. At the end of the processing, the tag
`#EXT-X-ENDLIST` is added to the playlist;
- `"vod"`: The playlist behaves like the `event` option (a live event), but at the end of the processing, the playlist
will be set to `#EXT-X-PLAYLIST-TYPE:VOD`.

12
net/hlssink3/build.rs Normal file
View file

@ -0,0 +1,12 @@
//
// Copyright (C) 2021 Rafael Caricio <rafael@caricio.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
fn main() {
gst_plugin_version_helper::info()
}

841
net/hlssink3/src/imp.rs Normal file
View file

@ -0,0 +1,841 @@
//
// Copyright (C) 2021 Rafael Caricio <rafael@caricio.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use crate::playlist::{Playlist, SegmentFormatter};
use gio::prelude::*;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_info, gst_trace, gst_warning};
use m3u8_rs::playlist::MediaPlaylistType;
use once_cell::sync::Lazy;
use std::fs;
use std::io::Write;
use std::path;
use std::sync::{Arc, Mutex};
const DEFAULT_LOCATION: &str = "segment%05d.ts";
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
const DEFAULT_MAX_NUM_SEGMENT_FILES: u32 = 10;
const DEFAULT_TARGET_DURATION: u32 = 15;
const DEFAULT_PLAYLIST_LENGTH: u32 = 5;
const DEFAULT_SEND_KEYFRAME_REQUESTS: bool = true;
const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream";
const SIGNAL_GET_FRAGMENT_STREAM: &str = "get-fragment-stream";
const SIGNAL_DELETE_FRAGMENT: &str = "delete-fragment";
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("hlssink3", gst::DebugColorFlags::empty(), Some("HLS sink"))
});
struct Settings {
location: String,
segment_formatter: SegmentFormatter,
playlist_location: String,
playlist_root: Option<String>,
playlist_length: u32,
playlist_type: Option<MediaPlaylistType>,
max_num_segment_files: usize,
target_duration: u32,
send_keyframe_requests: bool,
splitmuxsink: gst::Element,
giostreamsink: gst::Element,
video_sink: bool,
audio_sink: bool,
}
impl Default for Settings {
fn default() -> Self {
let splitmuxsink = gst::ElementFactory::make("splitmuxsink", Some("split_mux_sink"))
.expect("Could not make element splitmuxsink");
let giostreamsink = gst::ElementFactory::make("giostreamsink", Some("giostream_sink"))
.expect("Could not make element giostreamsink");
Self {
location: String::from(DEFAULT_LOCATION),
segment_formatter: SegmentFormatter::new(DEFAULT_LOCATION).unwrap(),
playlist_location: String::from(DEFAULT_PLAYLIST_LOCATION),
playlist_root: None,
playlist_length: DEFAULT_PLAYLIST_LENGTH,
playlist_type: None,
max_num_segment_files: DEFAULT_MAX_NUM_SEGMENT_FILES as usize,
target_duration: DEFAULT_TARGET_DURATION,
send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS,
splitmuxsink,
giostreamsink,
video_sink: false,
audio_sink: false,
}
}
}
pub(crate) struct StartedState {
playlist: Playlist,
fragment_opened_at: Option<gst::ClockTime>,
current_segment_location: Option<String>,
old_segment_locations: Vec<String>,
}
impl StartedState {
fn new(target_duration: f32, playlist_type: Option<MediaPlaylistType>) -> Self {
Self {
playlist: Playlist::new(target_duration, playlist_type),
current_segment_location: None,
fragment_opened_at: None,
old_segment_locations: Vec::new(),
}
}
fn fragment_duration_since(&self, fragment_closed: gst::ClockTime) -> f32 {
assert!(self.fragment_opened_at.is_some());
let fragment_opened_at = &self.fragment_opened_at.unwrap();
let segment_duration = fragment_closed - fragment_opened_at;
segment_duration.mseconds() as f32 / 1_000f32
}
}
enum State {
Stopped,
Started(StartedState),
}
impl Default for State {
fn default() -> Self {
Self::Stopped
}
}
#[derive(Default, Clone)]
pub struct HlsSink3 {
settings: Arc<Mutex<Settings>>,
state: Arc<Mutex<State>>,
}
impl HlsSink3 {
fn start(&self, element: &super::HlsSink3) {
gst_info!(CAT, obj: element, "Starting");
let (target_duration, playlist_type) = {
let settings = self.settings.lock().unwrap();
(
settings.target_duration as f32,
settings.playlist_type.clone(),
)
};
let mut state = self.state.lock().unwrap();
if let State::Stopped = *state {
*state = State::Started(StartedState::new(target_duration, playlist_type));
}
}
fn on_format_location(
&self,
element: &super::HlsSink3,
fragment_id: u32,
) -> Result<String, String> {
gst_info!(
CAT,
"Starting the formatting of the fragment-id: {}",
fragment_id
);
// TODO: Create method in state to simplify this boilerplate: `let state = self.state.started()?`
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
State::Stopped => return Err("Not in Started state".to_string()),
State::Started(s) => s,
};
let settings = self.settings.lock().unwrap();
let segment_file_location = settings.segment_formatter.segment(fragment_id);
gst_trace!(CAT, "Segment location formatted: {}", segment_file_location);
state.current_segment_location = Some(segment_file_location.clone());
let fragment_stream = element
.emit_by_name(SIGNAL_GET_FRAGMENT_STREAM, &[&segment_file_location])
.expect("Error while getting fragment stream")
.get::<gio::OutputStream>()
.map_err(|err| err.to_string())?;
settings
.giostreamsink
.set_property("stream", &fragment_stream);
gst_info!(
CAT,
"New segment location: {:?}",
state.current_segment_location.as_ref()
);
Ok(segment_file_location)
}
fn new_file_stream<P>(
&self,
element: &super::HlsSink3,
location: &P,
) -> Result<gio::OutputStream, String>
where
P: AsRef<path::Path>,
{
let file = fs::File::create(location).map_err(move |err| {
let error_msg = gst::error_msg!(
gst::ResourceError::OpenWrite,
[
"Could not open file {} for writing: {}",
location.as_ref().to_str().unwrap(),
err.to_string(),
]
);
element.post_error_message(error_msg);
err.to_string()
})?;
Ok(gio::WriteOutputStream::new(file).upcast())
}
fn delete_fragment<P>(&self, location: &P)
where
P: AsRef<path::Path>,
{
let _ = fs::remove_file(location).map_err(|err| {
gst_warning!(CAT, "Could not delete segment file: {}", err.to_string());
});
}
fn write_playlist(
&self,
element: &super::HlsSink3,
fragment_closed_at: Option<gst::ClockTime>,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_info!(CAT, obj: element, "Preparing to write new playlist");
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
State::Stopped => return Err(gst::StateChangeError),
State::Started(s) => s,
};
gst_info!(CAT, "COUNT {}", state.playlist.len());
// Only add fragment if it's complete.
if let Some(fragment_closed) = fragment_closed_at {
let segment_filename = self.segment_filename(state);
state.playlist.add_segment(
segment_filename.clone(),
state.fragment_duration_since(fragment_closed),
);
state.old_segment_locations.push(segment_filename);
}
let (playlist_location, max_num_segments, max_playlist_length) = {
let settings = self.settings.lock().unwrap();
(
settings.playlist_location.clone(),
settings.max_num_segment_files,
settings.playlist_length as usize,
)
};
state.playlist.update_playlist_state(max_playlist_length);
// Acquires the playlist file handle so we can update it with new content. By default, this
// is expected to be the same file every time.
let mut playlist_stream = element
.emit_by_name(SIGNAL_GET_PLAYLIST_STREAM, &[&playlist_location])
.expect("Error while getting playlist stream")
.get::<gio::OutputStream>()
.map_err(|err| {
gst_error!(
CAT,
"Could not get stream to write playlist content: {}",
err.to_string()
);
gst::StateChangeError
})?
.into_write();
state
.playlist
.write_to(&mut playlist_stream)
.map_err(|err| {
gst_error!(CAT, "Could not write new playlist: {}", err.to_string());
gst::StateChangeError
})?;
playlist_stream.flush().map_err(|err| {
gst_error!(CAT, "Could not flush playlist: {}", err.to_string());
gst::StateChangeError
})?;
if state.playlist.is_type_undefined() {
// Cleanup old segments from filesystem
if state.old_segment_locations.len() > max_num_segments {
for _ in 0..state.old_segment_locations.len() - max_num_segments {
let old_segment_location = state.old_segment_locations.remove(0);
let _ = element
.emit_by_name(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location])
.expect("Error while processing signal handler");
}
}
}
gst_debug!(CAT, obj: element, "Wrote new playlist file!");
Ok(gst::StateChangeSuccess::Success)
}
fn segment_filename(&self, state: &mut StartedState) -> String {
assert!(state.current_segment_location.is_some());
let segment_filename = state.current_segment_location.take().unwrap();
let settings = self.settings.lock().unwrap();
if let Some(playlist_root) = &settings.playlist_root {
format!(
"{}{}{}",
playlist_root,
std::path::MAIN_SEPARATOR,
segment_filename
)
} else {
segment_filename
}
}
fn write_final_playlist(
&self,
element: &super::HlsSink3,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Preparing to write final playlist");
self.write_playlist(element, None)
}
fn stop(&self, element: &super::HlsSink3) {
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
if let State::Started(_) = *state {
*state = State::Stopped;
}
gst_debug!(CAT, obj: element, "Stopped");
}
}
#[glib::object_subclass]
impl ObjectSubclass for HlsSink3 {
const NAME: &'static str = "GstHlsSink3";
type Type = super::HlsSink3;
type ParentType = gst::Bin;
}
impl BinImpl for HlsSink3 {
#[allow(clippy::single_match)]
fn handle_message(&self, element: &Self::Type, msg: gst::Message) {
use gst::MessageView;
match msg.view() {
MessageView::Element(ref msg) => {
let event_is_from_splitmuxsink = {
let settings = self.settings.lock().unwrap();
msg.src().as_ref() == Some(settings.splitmuxsink.upcast_ref())
};
if !event_is_from_splitmuxsink {
return;
}
let s = msg.structure().unwrap();
match s.name() {
"splitmuxsink-fragment-opened" => {
if let Ok(new_fragment_opened_at) = s.get::<gst::ClockTime>("running-time")
{
let mut state = self.state.lock().unwrap();
match &mut *state {
State::Stopped => {}
State::Started(state) => {
state.fragment_opened_at = Some(new_fragment_opened_at)
}
};
}
}
"splitmuxsink-fragment-closed" => {
let s = msg.structure().unwrap();
if let Ok(fragment_closed_at) = s.get::<gst::ClockTime>("running-time") {
self.write_playlist(element, Some(fragment_closed_at))
.unwrap();
}
}
_ => {}
}
}
_ => self.parent_handle_message(element, msg),
}
}
}
impl ObjectImpl for HlsSink3 {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpec::new_string(
"location",
"File Location",
"Location of the file to write",
Some(DEFAULT_LOCATION),
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"playlist-location",
"Playlist Location",
"Location of the playlist to write.",
Some(DEFAULT_PLAYLIST_LOCATION),
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"playlist-root",
"Playlist Root",
"Location of the playlist to write.",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"max-files",
"Max files",
"Maximum number of files to keep on disk. Once the maximum is reached, old files start to be deleted to make room for new ones.",
0,
u32::MAX,
DEFAULT_MAX_NUM_SEGMENT_FILES,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"target-duration",
"Target duration",
"The target duration in seconds of a segment/file. (0 - disabled, useful for management of segment duration by the streaming server)",
0,
u32::MAX,
DEFAULT_TARGET_DURATION,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"playlist-length",
"Playlist length",
"Length of HLS playlist. To allow players to conform to section 6.3.3 of the HLS specification, this should be at least 3. If set to 0, the playlist will be infinite.",
0,
u32::MAX,
DEFAULT_PLAYLIST_LENGTH,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"playlist-type",
"Playlist Type",
"The type of the playlist to use. When VOD type is set, the playlist will be live until the pipeline ends execution.",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_boolean(
"send-keyframe-requests",
"Send Keyframe Requests",
"Send keyframe requests to ensure correct fragmentation. If this is disabled then the input must have keyframes in regular intervals.",
DEFAULT_SEND_KEYFRAME_REQUESTS,
glib::ParamFlags::READWRITE,
),
]
});
PROPERTIES.as_ref()
}
fn set_property(
&self,
_obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"location" => {
settings.location = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_LOCATION.into());
settings.segment_formatter = SegmentFormatter::new(&settings.location).unwrap();
settings
.splitmuxsink
.set_property("location", &settings.location);
}
"playlist-location" => {
settings.playlist_location = value
.get::<Option<String>>()
.expect("type checked upstream")
.unwrap_or_else(|| String::from(DEFAULT_PLAYLIST_LOCATION));
}
"playlist-root" => {
settings.playlist_root = value
.get::<Option<String>>()
.expect("type checked upstream");
}
"max-files" => {
let max_files: u32 = value.get().expect("type checked upstream");
settings.max_num_segment_files = max_files as usize;
}
"target-duration" => {
settings.target_duration = value.get().expect("type checked upstream");
settings.splitmuxsink.set_property(
"max-size-time",
&(gst::ClockTime::from_seconds(settings.target_duration as u64)),
);
}
"playlist-length" => {
settings.playlist_length = value.get().expect("type checked upstream");
}
"playlist-type" => {
settings.playlist_type = value
.get::<Option<String>>()
.expect("type checked upstream")
.map(|chosen_type| {
if chosen_type.to_lowercase() == "vod" {
MediaPlaylistType::Vod
} else {
MediaPlaylistType::Event
}
})
}
"send-keyframe-requests" => {
settings.send_keyframe_requests = value.get().expect("type checked upstream");
settings
.splitmuxsink
.set_property("send-keyframe-requests", &settings.send_keyframe_requests);
}
_ => unimplemented!(),
};
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"location" => settings.location.to_value(),
"playlist-location" => settings.playlist_location.to_value(),
"playlist-root" => settings.playlist_root.to_value(),
"max-files" => {
let max_files = settings.max_num_segment_files as u32;
max_files.to_value()
}
"target-duration" => settings.target_duration.to_value(),
"playlist-length" => settings.playlist_length.to_value(),
"playlist-type" => settings
.playlist_type
.as_ref()
.map(|ty| ty.to_string())
.to_value(),
"send-keyframe-requests" => settings.send_keyframe_requests.to_value(),
_ => unimplemented!(),
}
}
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![
glib::subclass::Signal::builder(
SIGNAL_GET_PLAYLIST_STREAM,
&[String::static_type().into()],
gio::OutputStream::static_type().into(),
)
.class_handler(|_, args| {
let element = args[0]
.get::<super::HlsSink3>()
.expect("playlist-stream signal arg");
let playlist_location =
args[1].get::<String>().expect("playlist-stream signal arg");
let hlssink3 = HlsSink3::from_instance(&element);
Some(
hlssink3
.new_file_stream(&element, &playlist_location)
.ok()?
.to_value(),
)
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
glib::subclass::Signal::builder(
SIGNAL_GET_FRAGMENT_STREAM,
&[String::static_type().into()],
gio::OutputStream::static_type().into(),
)
.class_handler(|_, args| {
let element = args[0]
.get::<super::HlsSink3>()
.expect("fragment-stream signal arg");
let fragment_location =
args[1].get::<String>().expect("fragment-stream signal arg");
let hlssink3 = HlsSink3::from_instance(&element);
Some(
hlssink3
.new_file_stream(&element, &fragment_location)
.ok()?
.to_value(),
)
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
glib::subclass::Signal::builder(
SIGNAL_DELETE_FRAGMENT,
&[String::static_type().into()],
glib::types::Type::BOOL.into(),
)
.class_handler(|_, args| {
let element = args[0].get::<super::HlsSink3>().expect("signal arg");
let fragment_location = args[1].get::<String>().expect("signal arg");
let hlssink3 = HlsSink3::from_instance(&element);
hlssink3.delete_fragment(&fragment_location);
Some(true.to_value())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
]
});
SIGNALS.as_ref()
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
obj.set_element_flags(gst::ElementFlags::SINK);
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
let settings = self.settings.lock().unwrap();
let mux = gst::ElementFactory::make("mpegtsmux", Some("mpeg-ts_mux"))
.expect("Could not make element mpegtsmux");
let location: Option<String> = None;
settings.splitmuxsink.set_properties(&[
("location", &location),
(
"max-size-time",
&(gst::ClockTime::from_seconds(settings.target_duration as u64)),
),
("send-keyframe-requests", &settings.send_keyframe_requests),
("muxer", &mux),
("sink", &settings.giostreamsink),
("reset-muxer", &false),
]);
obj.add(&settings.splitmuxsink).unwrap();
settings.splitmuxsink.connect("format-location", false, {
let element_weak = obj.downgrade();
move |args| {
let fragment_id = args[1].get::<u32>().unwrap();
gst_info!(CAT, "Got fragment-id: {}", fragment_id);
let element: super::HlsSink3 = element_weak.upgrade()? as _;
let hlssink3 = HlsSink3::from_instance(&element);
match hlssink3.on_format_location(&element, fragment_id) {
Ok(segment_location) => Some(segment_location.to_value()),
Err(err) => {
gst_error!(CAT, "on format-location handler: {}", err);
Some("unknown_segment".to_value())
}
}
}
});
}
}
impl GstObjectImpl for HlsSink3 {}
impl ElementImpl for HlsSink3 {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"HTTP Live Streaming sink",
"Sink/Muxer",
"HTTP Live Streaming sink",
"Alessandro Decina <alessandro.d@gmail.com>, \
Sebastian Dröge <sebastian@centricular.com>, \
Rafael Caricio <rafael@caricio.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_any();
let video_pad_template = gst::PadTemplate::new(
"video",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps,
)
.unwrap();
let caps = gst::Caps::new_any();
let audio_pad_template = gst::PadTemplate::new(
"audio",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps,
)
.unwrap();
vec![video_pad_template, audio_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if let gst::StateChange::NullToReady = transition {
self.start(element);
}
let ret = self.parent_change_state(element, transition)?;
match transition {
gst::StateChange::PausedToReady => {
let write_final = {
let mut state = self.state.lock().unwrap();
match &mut *state {
State::Stopped => false,
State::Started(state) => {
if state.playlist.is_rendering() {
state.playlist.stop();
true
} else {
false
}
}
}
};
if write_final {
self.write_final_playlist(element)?;
}
}
gst::StateChange::ReadyToNull => {
self.stop(element);
}
_ => (),
}
Ok(ret)
}
fn request_new_pad(
&self,
element: &Self::Type,
templ: &gst::PadTemplate,
_name: Option<String>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let mut settings = self.settings.lock().unwrap();
match templ.name_template().as_ref().map(|val| val.as_str()) {
Some("audio") => {
if settings.audio_sink {
gst_debug!(
CAT,
obj: element,
"requested_new_pad: audio pad is already set"
);
return None;
}
let peer_pad = settings.splitmuxsink.request_pad_simple("audio_0").unwrap();
let sink_pad =
gst::GhostPad::from_template_with_target(templ, Some("audio"), &peer_pad)
.unwrap();
element.add_pad(&sink_pad).unwrap();
sink_pad.set_active(true).unwrap();
settings.audio_sink = true;
Some(sink_pad.upcast())
}
Some("video") => {
if settings.video_sink {
gst_debug!(
CAT,
obj: element,
"requested_new_pad: video pad is already set"
);
return None;
}
let peer_pad = settings.splitmuxsink.request_pad_simple("video").unwrap();
let sink_pad =
gst::GhostPad::from_template_with_target(templ, Some("video"), &peer_pad)
.unwrap();
element.add_pad(&sink_pad).unwrap();
sink_pad.set_active(true).unwrap();
settings.video_sink = true;
Some(sink_pad.upcast())
}
None => {
gst_debug!(CAT, obj: element, "template name returned `None`",);
None
}
Some(other_name) => {
gst_debug!(
CAT,
obj: element,
"requested_new_pad: name \"{}\" is not audio or video",
other_name
);
None
}
}
}
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
let mut settings = self.settings.lock().unwrap();
if !settings.audio_sink && !settings.video_sink {
return;
}
let ghost_pad = pad.downcast_ref::<gst::GhostPad>().unwrap();
if let Some(peer) = ghost_pad.target() {
settings.splitmuxsink.release_request_pad(&peer);
}
pad.set_active(false).unwrap();
element.remove_pad(pad).unwrap();
if "audio" == ghost_pad.name() {
settings.audio_sink = false;
} else {
settings.video_sink = false;
}
}
}

43
net/hlssink3/src/lib.rs Normal file
View file

@ -0,0 +1,43 @@
//
// Copyright (C) 2021 Rafael Caricio <rafael@caricio.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use glib::prelude::*;
mod imp;
mod playlist;
glib::wrapper! {
pub struct HlsSink3(ObjectSubclass<imp::HlsSink3>) @extends gst::Bin, gst::Element, gst::Object;
}
unsafe impl Send for HlsSink3 {}
unsafe impl Sync for HlsSink3 {}
pub fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"hlssink3",
gst::Rank::None,
HlsSink3::static_type(),
)?;
Ok(())
}
gst::plugin_define!(
hlssink3,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"MIT/X11",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);

View file

@ -0,0 +1,265 @@
//
// Copyright (C) 2021 Rafael Caricio <rafael@caricio.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use m3u8_rs::playlist::{MediaPlaylist, MediaPlaylistType, MediaSegment};
use once_cell::sync::Lazy;
use regex::Regex;
use std::io::Write;
const GST_M3U8_PLAYLIST_VERSION: usize = 3;
static SEGMENT_IDX_PATTERN: Lazy<regex::Regex> = Lazy::new(|| Regex::new(r"(%0(\d+)d)").unwrap());
/// An HLS playlist.
///
/// Controls the changes that needs to happen in the playlist as new segments are added. This
/// includes maintaining the number of segments present in the playlist and setting the final
/// state of the playlist.
#[derive(Debug, Clone)]
pub struct Playlist {
inner: MediaPlaylist,
playlist_index: i32,
status: PlaylistRenderState,
turn_vod: bool,
}
impl Playlist {
pub fn new(target_duration: f32, playlist_type: Option<MediaPlaylistType>) -> Self {
let mut turn_vod = false;
let playlist_type = if playlist_type == Some(MediaPlaylistType::Vod) {
turn_vod = true;
Some(MediaPlaylistType::Event)
} else {
playlist_type
};
Self {
inner: MediaPlaylist {
version: GST_M3U8_PLAYLIST_VERSION,
target_duration,
media_sequence: 0,
segments: vec![],
discontinuity_sequence: 0,
end_list: false,
playlist_type,
i_frames_only: false,
start: None,
independent_segments: false,
},
playlist_index: 0,
status: PlaylistRenderState::Init,
turn_vod,
}
}
/// Adds a new segment to the playlist.
pub fn add_segment(&mut self, uri: String, duration: f32) {
self.inner.segments.push(MediaSegment {
uri,
duration,
title: None,
byte_range: None,
discontinuity: false,
key: None,
map: None,
program_date_time: None,
daterange: None,
unknown_tags: vec![],
});
}
/// Updates the playlist based on current state.
///
/// The playlist will be updated based on it's type. The playlist status is set to started.
/// When a playlist type is defined, the number of segments is updated to match the max
/// playlist length value. The playlist index and current media sequence is also kept up
/// to date.
pub fn update_playlist_state(&mut self, max_playlist_length: usize) {
self.start();
if !self.is_type_undefined() {
return;
}
// Remove oldest segments if playlist is at maximum expected capacity
if self.inner.segments.len() > max_playlist_length {
for _ in 0..self.inner.segments.len() - max_playlist_length {
let _ = self.inner.segments.remove(0);
}
}
self.playlist_index += 1;
self.inner.media_sequence = self.playlist_index as i32 - self.inner.segments.len() as i32;
}
/// Sets the playlist to started state.
fn start(&mut self) {
self.status = PlaylistRenderState::Started;
self.inner.end_list = false;
}
/// Sets the playlist to stopped state.
pub fn stop(&mut self) {
match &self.inner.playlist_type {
None => self.inner.end_list = false,
Some(defined) => match defined {
MediaPlaylistType::Event => {
if self.turn_vod {
self.inner.playlist_type = Some(MediaPlaylistType::Vod);
self.inner.end_list = false
} else {
self.inner.end_list = true
}
}
MediaPlaylistType::Vod => self.inner.end_list = false,
},
}
}
/// Returns true if the playlist type is not specified.
pub fn is_type_undefined(&self) -> bool {
self.inner.playlist_type.is_none()
}
/// Returns true if the playlist internal status started.
pub fn is_rendering(&self) -> bool {
self.status == PlaylistRenderState::Started
}
/// Returns the number of segments in the playlist.
pub fn len(&self) -> usize {
self.inner.segments.len()
}
/// Writes the playlist in textual format to the provided `Write` reference.
pub fn write_to<T: Write>(&self, w: &mut T) -> std::io::Result<()> {
self.inner.write_to(w)
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum PlaylistRenderState {
Init,
Started,
}
/// A formatter for segment locations.
///
/// The formatting is based on a string that must contain the placeholder `%0Xd` where `X` is a
/// the number of zero prefixes you want to have in the segment name. The placeholder is only
/// replaced once in the string, other placements are not going to be processed.
///
/// # Examples
///
/// In this example we want to have segment files with the following names:
/// ```text
/// part001.ts
/// part002.ts
/// part003.ts
/// part004.ts
/// ```
/// Then we can use the segment pattern value as `"part%03.ts"`:
///
/// ```rust,ignore
/// let formatter = SegmentFormatter::new("part%03.ts").unwrap();
/// assert_eq!(formatter.segment(1), "part001.ts");
/// assert_eq!(formatter.segment(2), "part002.ts");
/// assert_eq!(formatter.segment(3), "part003.ts");
/// assert_eq!(formatter.segment(4), "part004.ts");
/// ```
pub struct SegmentFormatter {
prefix: String,
suffix: String,
padding_len: u32,
}
impl SegmentFormatter {
/// Processes the segment name containing a placeholder. It can be used
/// repeatedly to format segment names.
///
/// If an invalid placeholder is provided, then `None` is returned.
pub fn new<S: AsRef<str>>(segment_pattern: S) -> Option<Self> {
let segment_pattern = segment_pattern.as_ref();
let caps = SEGMENT_IDX_PATTERN.captures(segment_pattern)?;
let number_placement_match = caps.get(1)?;
let zero_pad_match = caps.get(2)?;
let padding_len = zero_pad_match
.as_str()
.parse::<u32>()
.expect("valid number matched by regex");
let prefix = segment_pattern[..number_placement_match.start()].to_string();
let suffix = segment_pattern[number_placement_match.end()..].to_string();
Some(Self {
prefix,
suffix,
padding_len,
})
}
/// Returns the segment location formatted for the provided id.
#[inline]
pub fn segment(&self, id: u32) -> String {
let padded_number = left_pad_zeroes(self.padding_len, id);
format!("{}{}{}", self.prefix, padded_number, self.suffix)
}
}
/// Transforms a number to a zero padded string representation.
///
/// The zero padding is added to the left of the number which is converted to a string. For the
/// case that the length number converted to string is larger than the requested padding, the
/// number representation is returned and no padding is added. The length of the returned string is
/// the maximum value between the desired padding and the length of the number.
///
/// # Examples
///
/// ```rust,ignore
/// let padded_number = left_pad_zeroes(4, 10);
/// assert_eq!(padded_number, "0010");
/// ```
#[inline]
pub(crate) fn left_pad_zeroes(padding: u32, number: u32) -> String {
let numerical_repr = number.to_string();
let mut padded = String::with_capacity(padding.max(numerical_repr.len() as u32) as usize);
let pad_zeroes = padding as i32 - numerical_repr.len() as i32;
if pad_zeroes > 0 {
for _ in 0..pad_zeroes {
padded.push('0');
}
}
padded.push_str(&numerical_repr);
padded
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn segment_is_correctly_formatted() {
let formatter = SegmentFormatter::new("segment%05d.ts").unwrap();
assert_eq!("segment00001.ts", formatter.segment(1));
assert_eq!("segment00016.ts", formatter.segment(16));
assert_eq!("segment01827.ts", formatter.segment(1827));
assert_eq!("segment98765.ts", formatter.segment(98765));
let formatter = SegmentFormatter::new("part-%03d.ts").unwrap();
assert_eq!("part-010.ts", formatter.segment(10));
assert_eq!("part-9999.ts", formatter.segment(9999));
}
#[test]
fn padding_numbers() {
assert_eq!("001", left_pad_zeroes(3, 1));
assert_eq!("010", left_pad_zeroes(3, 10));
assert_eq!("100", left_pad_zeroes(3, 100));
assert_eq!("1000", left_pad_zeroes(3, 1000));
assert_eq!("987654321", left_pad_zeroes(3, 987654321));
}
}

View file

@ -0,0 +1,247 @@
//
// Copyright (C) 2021 Rafael Caricio <rafael@caricio.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gio::prelude::*;
use gst::gst_info;
use gst::prelude::*;
use once_cell::sync::Lazy;
use std::sync::mpsc;
use std::time::Duration;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"hlssink3-test",
gst::DebugColorFlags::empty(),
Some("Flex HLS sink test"),
)
});
macro_rules! try_or_pause {
($l:expr) => {
match $l {
Ok(v) => v,
Err(err) => {
eprintln!("Skipping Test: {:?}", err);
return Ok(());
}
}
};
}
macro_rules! try_create_element {
($l:expr, $n:expr) => {
match gst::ElementFactory::find($l) {
Some(factory) => factory.create(Some($n)).unwrap(),
None => {
eprintln!("Could not find {} ({}) plugin, skipping test", $l, $n);
return Ok(());
}
}
};
($l:expr) => {{
let alias: String = format!("test_{}", $l);
try_create_element!($l, <std::string::String as AsRef<str>>::as_ref(&alias))
}};
}
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gsthlssink3::plugin_register_static().expect("hlssink3 test");
});
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum HlsSinkEvent {
GetPlaylistStream(String),
GetFragmentStream(String),
DeleteFragment(String),
}
#[test]
fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
init();
const BUFFER_NB: i32 = 250;
let pipeline = gst::Pipeline::new(Some("video_pipeline"));
let video_src = try_create_element!("videotestsrc");
video_src.set_property("is-live", &true);
video_src.set_property("num-buffers", &BUFFER_NB);
let x264enc = try_create_element!("x264enc");
let h264parse = try_create_element!("h264parse");
let hlssink3 = gst::ElementFactory::make("hlssink3", Some("test_hlssink3"))
.expect("Must be able to instantiate hlssink3");
hlssink3.set_property("target-duration", &2u32);
hlssink3.set_property("playlist-length", &2u32);
hlssink3.set_property("max-files", &2u32);
let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20);
hlssink3.connect("get-playlist-stream", false, {
let hls_events_sender = hls_events_sender.clone();
move |args| {
let location = args[1].get::<String>().expect("No location given");
hls_events_sender
.try_send(HlsSinkEvent::GetPlaylistStream(location))
.expect("Send playlist event");
let stream = gio::MemoryOutputStream::new_resizable();
Some(stream.to_value())
}
});
hlssink3.connect("get-fragment-stream", false, {
let hls_events_sender = hls_events_sender.clone();
move |args| {
let location = args[1].get::<String>().expect("No location given");
hls_events_sender
.try_send(HlsSinkEvent::GetFragmentStream(location))
.expect("Send fragment event");
let stream = gio::MemoryOutputStream::new_resizable();
Some(stream.to_value())
}
});
hlssink3.connect("delete-fragment", false, move |args| {
let location = args[1].get::<String>().expect("No location given");
hls_events_sender
.try_send(HlsSinkEvent::DeleteFragment(location))
.expect("Send delete fragment event");
Some(true.to_value())
});
try_or_pause!(pipeline.add_many(&[&video_src, &x264enc, &h264parse, &hlssink3,]));
try_or_pause!(gst::Element::link_many(&[
&video_src, &x264enc, &h264parse, &hlssink3
]));
pipeline.set_state(gst::State::Playing).unwrap();
gst_info!(
CAT,
"hlssink3_video_pipeline: waiting for {} buffers",
BUFFER_NB
);
let mut eos = false;
let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
eos = true;
break;
}
MessageView::Error(..) => unreachable!(),
_ => (),
}
}
pipeline.set_state(gst::State::Null).unwrap();
assert!(eos);
// Collect all events triggered during execution of the pipeline
let mut actual_events = Vec::new();
while let Ok(event) = hls_events_receiver.recv_timeout(Duration::from_millis(1)) {
actual_events.push(event);
}
let expected_ordering_of_events = {
use self::HlsSinkEvent::*;
vec![
GetFragmentStream("segment00000.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()),
GetFragmentStream("segment00001.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()),
GetFragmentStream("segment00002.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()),
DeleteFragment("segment00000.ts".to_string()),
GetFragmentStream("segment00003.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()),
DeleteFragment("segment00001.ts".into()),
GetFragmentStream("segment00004.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()),
DeleteFragment("segment00002.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()),
]
};
assert_eq!(expected_ordering_of_events, actual_events);
Ok(())
}
#[test]
fn test_hlssink3_element_with_audio_content() -> Result<(), ()> {
init();
const BUFFER_NB: i32 = 100;
let pipeline = gst::Pipeline::new(Some("audio_pipeline"));
let audio_src = try_create_element!("audiotestsrc");
audio_src.set_property("is-live", &true);
audio_src.set_property("num-buffers", &BUFFER_NB);
let hls_avenc_aac = try_or_pause!(gst::ElementFactory::make(
"avenc_aac",
Some("hls_avenc_aac")
));
let hlssink3 = gst::ElementFactory::make("hlssink3", Some("hlssink3"))
.expect("Must be able to instantiate hlssink3");
hlssink3.set_property("target-duration", &6u32);
hlssink3.connect("get-playlist-stream", false, move |_args| {
let stream = gio::MemoryOutputStream::new_resizable();
Some(stream.to_value())
});
hlssink3.connect("get-fragment-stream", false, move |_args| {
let stream = gio::MemoryOutputStream::new_resizable();
Some(stream.to_value())
});
hlssink3.connect("delete-fragment", false, move |_| Some(true.to_value()));
try_or_pause!(pipeline.add_many(&[&audio_src, &hls_avenc_aac, &hlssink3,]));
try_or_pause!(gst::Element::link_many(&[
&audio_src,
&hls_avenc_aac,
&hlssink3
]));
pipeline.set_state(gst::State::Playing).unwrap();
gst_info!(CAT, "audio_pipeline: waiting for {} buffers", BUFFER_NB);
let mut eos = false;
let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
eos = true;
break;
}
MessageView::Error(..) => unreachable!(),
_ => (),
}
}
pipeline.set_state(gst::State::Null).unwrap();
assert!(eos);
Ok(())
}