aws: Add next-file support to putobjectsink

Add `next-file` support to `awss3putobjectsink`, along the lines of
the `next-file` support in `multifilesink`.
This commit is contained in:
Sanchayan Maity 2024-04-17 21:41:06 +05:30
parent 7573caa8e9
commit 108e782ed8
4 changed files with 507 additions and 27 deletions

15
Cargo.lock generated
View file

@ -2243,12 +2243,14 @@ dependencies = [
"gstreamer-audio",
"gstreamer-base",
"gstreamer-check",
"gstreamer-video",
"once_cell",
"percent-encoding",
"rand",
"serde",
"serde_derive",
"serde_json",
"sprintf 0.2.1",
"test-with",
"tokio",
"url",
@ -2481,7 +2483,7 @@ dependencies = [
"gstreamer-video",
"m3u8-rs",
"once_cell",
"sprintf",
"sprintf 0.1.4",
]
[[package]]
@ -5241,7 +5243,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1"
dependencies = [
"bytes",
"heck 0.4.1",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap 0.10.0",
@ -6206,6 +6208,15 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c0cdea5a20a06e7c57f627094e7b1618e5665592cd88f2d45fa4014e348db58"
[[package]]
name = "sprintf"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2819cb5194dfe9e6d102f4519a9fb9dc7106d2879b71b4fd4d4677f1175bd39"
dependencies = [
"thiserror",
]
[[package]]
name = "static_assertions"
version = "1.1.0"

View file

@ -32,6 +32,8 @@ serde_derive = "1"
serde_json = "1"
url = "2"
once_cell.workspace = true
gst-video = { workspace = true, features = ["v1_22"] }
sprintf = "0.2"
[dev-dependencies]
chrono = { version = "0.4", features = [ "alloc" ] }

View file

@ -14,6 +14,39 @@ use gst::prelude::*;
mod multipartsink;
mod putobjectsink;
/// Policy for when `awss3putobjectsink` finishes the current S3 object and
/// starts uploading to a new one, modelled on `multifilesink`'s `next-file`
/// property.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstS3PutObjectSinkNextFile")]
pub(crate) enum NextFile {
/// Upload a new object for every buffer.
#[enum_value(name = "NextBuffer: New file for each buffer.", nick = "next-buffer")]
Buffer,
/// Upload a new object whenever a DISCONT-flagged buffer arrives.
#[enum_value(
name = "NextDiscont: New file after each discontinuity.",
nick = "next-discont"
)]
Discont,
/// Upload a new object at key frames (rate-limited by the
/// `min-keyframe-distance` property).
#[enum_value(
name = "NextKeyFrame: New file at each key frame.",
nick = "next-key-frame"
)]
KeyFrame,
/// Upload a new object after a downstream force-key-unit event.
#[enum_value(
name = "NextKeyUnitEvent: New file after a force key unit event.",
nick = "next-key-unit-event"
)]
KeyUnitEvent,
/// Upload a new object when `max-file-size` would be exceeded by the
/// next buffer.
#[enum_value(
name = "NextMaxSize: New file when the configured maximum file size would be exceeded with the next buffer or buffer list.",
nick = "next-max-size"
)]
MaxSize,
/// Upload a new object when `max-file-duration` would be exceeded by the
/// next buffer.
#[enum_value(
name = "NextMaxDuration: New file when the configured maximum duration would be exceeded with the next buffer or buffer list.",
nick = "next-max-duration"
)]
MaxDuration,
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstS3SinkOnError")]

View file

@ -21,6 +21,7 @@ use aws_sdk_s3::{
Client,
};
use super::NextFile;
use futures::future;
use once_cell::sync::Lazy;
use std::collections::HashMap;
@ -37,6 +38,11 @@ const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
const DEFAULT_FLUSH_ON_ERROR: bool = false;
const DEFAULT_FORCE_PATH_STYLE: bool = false;
const DEFAULT_NEXT_FILE: NextFile = NextFile::Buffer;
const DEFAULT_MIN_KEYFRAME_DISTANCE: gst::ClockTime = gst::ClockTime::from_seconds(10);
const DEFAULT_MAX_SIZE: u64 = 2 * 1024 * 1024 * 1024;
const DEFAULT_MAX_FILE_DURATION: gst::ClockTime = gst::ClockTime::MAX;
const DEFAULT_LOCATION: &str = "%05d";
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
@ -47,6 +53,13 @@ struct Started {
start_pts: Option<gst::ClockTime>,
num_buffers: u64,
need_flush: bool,
index: u64,
next_segment: Option<gst::ClockTime>,
streamheaders: Option<Vec<u8>>,
streamheaders_size: u64,
force_key_unit_count: i32,
current_file_size: u64,
file_pts: Option<gst::ClockTime>,
}
impl Started {
@ -57,6 +70,13 @@ impl Started {
start_pts: gst::ClockTime::NONE,
num_buffers: 0,
need_flush: false,
index: 0,
next_segment: gst::ClockTime::NONE,
streamheaders: None,
streamheaders_size: 0,
force_key_unit_count: -1,
current_file_size: 0,
file_pts: gst::ClockTime::NONE,
}
}
}
@ -86,6 +106,11 @@ struct Settings {
flush_interval_bytes: u64,
flush_interval_time: Option<gst::ClockTime>,
flush_on_error: bool,
next_file: Option<NextFile>,
min_keyframe_distance: gst::ClockTime,
max_file_size: u64,
max_file_duration: gst::ClockTime,
file_name: String,
}
impl Settings {
@ -143,6 +168,11 @@ impl Default for Settings {
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
flush_on_error: DEFAULT_FLUSH_ON_ERROR,
next_file: None,
min_keyframe_distance: DEFAULT_MIN_KEYFRAME_DISTANCE,
max_file_size: DEFAULT_MAX_SIZE,
max_file_duration: DEFAULT_MAX_FILE_DURATION,
file_name: DEFAULT_LOCATION.to_string(),
}
}
}
@ -351,6 +381,231 @@ impl S3PutObjectSink {
)),
}
}
fn accumulate_buffer(
&self,
buffer: &gst::Buffer,
started_state: &mut Started,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if started_state.start_pts.is_none() {
started_state.start_pts = buffer.pts();
}
started_state.num_buffers += 1;
started_state.need_flush = true;
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
started_state.current_file_size += map.size() as u64;
started_state.buffer.extend_from_slice(map.as_slice());
Ok(gst::FlowSuccess::Ok)
}
/// Build the request body for one object, prepending the cached stream
/// headers in the modes where each object must be independently decodable
/// (key-frame, max-size and max-duration rotation).
///
/// Panics if `next_file` is `None`: callers only reach this point when a
/// next-file mode is configured.
fn create_body_with_streamheaders(
    &self,
    next_file: Option<NextFile>,
    streamheaders: &Option<Vec<u8>>,
    buffer: &[u8],
) -> ByteStream {
    match next_file {
        None => unreachable!(),
        Some(NextFile::KeyFrame | NextFile::MaxSize | NextFile::MaxDuration) => {
            match streamheaders {
                Some(headers) => {
                    // headers first, then the accumulated payload
                    let mut data = Vec::with_capacity(headers.len() + buffer.len());
                    data.extend_from_slice(headers);
                    data.extend_from_slice(buffer);
                    ByteStream::from(data)
                }
                None => ByteStream::from(buffer.to_vec()),
            }
        }
        Some(_) => ByteStream::from(buffer.to_vec()),
    }
}
fn create_put_object_request_with_next_file(
&self,
started_state: &mut Started,
) -> Result<Option<PutObjectFluentBuilder>, gst::FlowError> {
let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
let filename = settings.file_name.clone();
if started_state.buffer.is_empty() {
return Ok(None);
}
let body = Some(self.create_body_with_streamheaders(
settings.next_file,
&started_state.streamheaders,
&started_state.buffer,
));
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
let object = url.as_ref().unwrap().object.to_owned();
let object_with_index = format!("{}{}", object, filename);
let key = match sprintf::sprintf!(&object_with_index, started_state.index) {
Ok(k) => Some(k),
Err(e) => {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to format file name: {}", e]
);
return Err(gst::FlowError::Error);
}
};
let metadata = settings.to_metadata(self);
let client = &started_state.client;
/* Equivalent to opening a new file */
started_state.index += 1;
started_state.buffer = Vec::new();
started_state.current_file_size = 0;
Ok(Some(
client
.put_object()
.set_body(body)
.set_bucket(bucket)
.set_key(key)
.set_metadata(metadata),
))
}
/// Upload the accumulated bytes as one finished object, blocking until the
/// PutObject request completes (or is cancelled).
///
/// A cancelled wait is treated as success — the element is shutting down and
/// there is nothing useful to report. A transport/service failure posts an
/// element error and returns `FlowError::Error`.
fn write_put_object_request_with_next_file(
    &self,
    started_state: &mut Started,
) -> Result<gst::FlowSuccess, gst::FlowError> {
    let Some(request) = self.create_put_object_request_with_next_file(started_state)? else {
        // Empty accumulator: nothing to send.
        return Ok(gst::FlowSuccess::Ok);
    };

    match s3utils::wait(&self.canceller, request.send()) {
        Ok(_) | Err(WaitError::Cancelled) => Ok(gst::FlowSuccess::Ok),
        Err(WaitError::FutureError(e)) => {
            gst::element_imp_error!(self, gst::CoreError::Failed, ["{}", e]);
            Err(gst::FlowError::Error)
        }
    }
}
/// Render one buffer in next-file mode: decide — per the configured
/// [`NextFile`] policy — whether the bytes accumulated so far must first be
/// uploaded as a finished object, then append the buffer to the accumulator.
///
/// Fixes vs. the previous revision:
/// * the buffer is only mapped in the `MaxSize` branch (the only mode that
///   needs its size up front); every path still maps — and errors on an
///   unmappable buffer — inside `accumulate_buffer`;
/// * `MaxDuration`: `file_pts` now tracks the PTS of the *first* buffer of
///   the current object instead of being overwritten on every buffer — the
///   old code measured only the last inter-buffer gap, so max-duration
///   rotation effectively never fired;
/// * PTS arithmetic uses `saturating_sub` so slightly non-monotonic
///   timestamps cannot panic.
fn write_buffer_with_next_file(
    &self,
    buffer: &gst::Buffer,
    next_file: NextFile,
) -> Result<gst::FlowSuccess, gst::FlowError> {
    let settings = self.settings.lock().unwrap();
    let min_keyframe_distance = settings.min_keyframe_distance;
    let max_file_size = settings.max_file_size;
    let max_file_duration = settings.max_file_duration;
    drop(settings);

    let mut state = self.state.lock().unwrap();
    let started_state = match *state {
        State::Started(ref mut started_state) => started_state,
        State::Stopped => {
            unreachable!("Element should be started");
        }
    };

    match next_file {
        NextFile::Buffer => {
            // Every buffer becomes its own object.
            self.write_put_object_request_with_next_file(started_state)?;
            self.accumulate_buffer(buffer, started_state)
        }
        NextFile::Discont => {
            if buffer.flags().contains(gst::BufferFlags::DISCONT) {
                self.write_put_object_request_with_next_file(started_state)?;
            }
            self.accumulate_buffer(buffer, started_state)
        }
        NextFile::KeyFrame => {
            if let Some(buffer_ts) = buffer.pts() {
                // Initialise the rotation threshold from the first
                // timestamped buffer; afterwards it advances by
                // min-keyframe-distance each time we rotate.
                let next_segment = *started_state
                    .next_segment
                    .get_or_insert(buffer_ts + min_keyframe_distance);
                let is_key_frame = !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT);
                if buffer_ts >= next_segment && is_key_frame {
                    started_state.next_segment = Some(next_segment + min_keyframe_distance);
                    self.write_put_object_request_with_next_file(started_state)?;
                }
            }
            self.accumulate_buffer(buffer, started_state)
        }
        NextFile::KeyUnitEvent => {
            /*
             * We don't need to write stream headers here, they will
             * be inserted in the stream by upstream elements if key
             * unit events have all_headers=true set.
             */
            self.accumulate_buffer(buffer, started_state)
        }
        NextFile::MaxSize => {
            // The incoming buffer's size is needed up front to check whether
            // it would push the object (plus stream headers) past the limit.
            let map = buffer.map_readable().map_err(|_| {
                gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
                gst::FlowError::Error
            })?;
            if started_state.current_file_size
                + started_state.streamheaders_size
                + map.size() as u64
                > max_file_size
            {
                self.write_put_object_request_with_next_file(started_state)?;
            }
            self.accumulate_buffer(buffer, started_state)
        }
        NextFile::MaxDuration => {
            // file_pts holds the PTS of the first buffer of the current
            // object; only initialise it here, never advance it per buffer.
            if started_state.file_pts.is_none() {
                started_state.file_pts = buffer.pts();
            }

            let mut new_duration = gst::ClockTime::ZERO;
            if let (Some(buffer_ts), Some(file_pts)) = (buffer.pts(), started_state.file_pts) {
                new_duration = buffer_ts.saturating_sub(file_pts);
                if let Some(duration) = buffer.duration() {
                    new_duration += duration;
                }
            }

            if new_duration > max_file_duration {
                self.write_put_object_request_with_next_file(started_state)?;
                // The current buffer opens the next object.
                started_state.file_pts = buffer.pts();
            }

            self.accumulate_buffer(buffer, started_state)
        }
    }
}
}
#[glib::object_subclass]
@ -455,6 +710,31 @@ impl ObjectImpl for S3PutObjectSink {
.blurb("Force client to use path-style addressing for buckets")
.default_value(DEFAULT_FORCE_PATH_STYLE)
.build(),
glib::ParamSpecEnum::builder_with_default("next-file", DEFAULT_NEXT_FILE)
.nick("Next File")
.blurb("When to start new file")
.write_only()
.mutable_ready()
.build(),
glib::ParamSpecUInt64::builder("min-keyframe-distance")
.nick("Minimum keyframe distance")
.blurb("Minimum distance between keyframes to start a new file")
.default_value(DEFAULT_MIN_KEYFRAME_DISTANCE.into())
.build(),
glib::ParamSpecUInt64::builder("max-file-size")
.nick("Maximum file size")
.blurb("Maximum file size before starting a new file in max-size mode")
.default_value(DEFAULT_MAX_SIZE)
.build(),
glib::ParamSpecUInt64::builder("max-file-duration")
.nick("Maximum file duration")
.blurb("Maximum file duration before starting a new file in max-duration mode")
.default_value(DEFAULT_MAX_FILE_DURATION.into())
.build(),
glib::ParamSpecString::builder("location")
.nick("location")
.blurb("Pattern to create file names of input files. File names are created by calling sprintf() with the pattern and the current index.")
.build(),
]
});
@ -554,6 +834,28 @@ impl ObjectImpl for S3PutObjectSink {
"force-path-style" => {
settings.force_path_style = value.get::<bool>().expect("type checked upstream");
}
"next-file" => {
settings.next_file = value
.get::<NextFile>()
.expect("type checked upstream")
.into();
}
"min-keyframe-distance" => {
settings.min_keyframe_distance = value
.get::<gst::ClockTime>()
.expect("type checked upstream");
}
"max-file-size" => {
settings.max_file_size = value.get::<u64>().expect("type checked upstream");
}
"max-file-duration" => {
settings.max_file_duration = value
.get::<gst::ClockTime>()
.expect("type checked upstream");
}
"location" => {
settings.file_name = value.get::<String>().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -588,6 +890,10 @@ impl ObjectImpl for S3PutObjectSink {
"flush-interval-time" => settings.flush_interval_time.to_value(),
"flush-on-error" => settings.flush_on_error.to_value(),
"force-path-style" => settings.force_path_style.to_value(),
"min-keyframe-distance" => settings.min_keyframe_distance.to_value(),
"max-file-size" => settings.max_file_size.to_value(),
"max-file-duration" => settings.max_file_duration.to_value(),
"location" => settings.file_name.to_value(),
_ => unimplemented!(),
}
}
@ -635,33 +941,54 @@ impl BaseSinkImpl for S3PutObjectSink {
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
if let State::Started(ref started_state) = *state {
if let State::Started(ref mut started_state) = *state {
if settings.flush_on_error && started_state.need_flush {
drop(settings);
drop(state);
gst::warning!(CAT, imp: self, "Stopped without EOS, but flushing");
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
imp: self,
"Failed to finalize the upload: {:?}",
error_message
);
if next_file.is_some() {
if self
.write_put_object_request_with_next_file(started_state)
.is_err()
{
gst::error!(
CAT,
imp: self,
"Failed to finalize the next-file upload",
);
}
} else {
gst::warning!(CAT, imp: self, "Stopped without EOS, but flushing");
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
imp: self,
"Failed to finalize the upload: {:?}",
error_message
);
}
}
state = self.state.lock().unwrap();
}
}
*state = State::Stopped;
gst::info!(CAT, imp: self, "Stopped");
Ok(())
}
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
drop(settings);
/* Everything stays the same as before if next-file is not set */
if let Some(nf) = next_file {
return self.write_buffer_with_next_file(buffer, nf);
}
let mut state = self.state.lock().unwrap();
let started_state = match *state {
@ -716,26 +1043,133 @@ impl BaseSinkImpl for S3PutObjectSink {
}
fn event(&self, event: gst::Event) -> bool {
if let gst::EventView::Eos(_) = event.view() {
let mut state = self.state.lock().unwrap();
use gst::EventView;
if let State::Started(ref mut started_state) = *state {
started_state.need_flush = false;
match event.view() {
EventView::CustomDownstream(ev) => {
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
let is_next_key_unit_event =
next_file.map_or_else(|| false, |nf| nf == NextFile::KeyUnitEvent);
drop(settings);
/* Taken from gst_multi_file_sink_event */
if is_next_key_unit_event && gst_video::ForceKeyUnitEvent::is(ev) {
use gst_video::DownstreamForceKeyUnitEvent;
match DownstreamForceKeyUnitEvent::parse(ev) {
Ok(key_unit_event) => {
let mut state = self.state.lock().unwrap();
if let State::Started(ref mut started_state) = *state {
if started_state.force_key_unit_count != -1
&& started_state.force_key_unit_count as u32
== key_unit_event.count
{
return BaseSinkImplExt::parent_event(self, event);
}
started_state.force_key_unit_count = key_unit_event.count as i32;
let _ = self.write_put_object_request_with_next_file(started_state);
}
}
Err(e) => gst::error!(CAT, "Failed to parse key unit event: {}", e),
}
}
}
EventView::Eos(_) => {
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
drop(settings);
drop(state);
let mut state = self.state.lock().unwrap();
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
imp: self,
"Failed to finalize the upload: {:?}",
error_message
);
return false;
if let State::Started(ref mut started_state) = *state {
started_state.need_flush = false;
if next_file.is_some()
&& self
.write_put_object_request_with_next_file(started_state)
.is_err()
{
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to finalize the upload"]
);
}
}
if next_file.is_none() {
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
imp: self,
"Failed to finalize the upload: {:?}",
error_message
);
return false;
}
}
}
_ => (),
}
BaseSinkImplExt::parent_event(self, event)
}
/// Capture stream headers (codec initialisation data) from the negotiated
/// caps so they can be prepended to each uploaded object in the modes that
/// need self-contained objects (see `create_body_with_streamheaders`).
///
/// Fixes vs. the previous revision: an empty `streamheader` array used to
/// `return Ok(())` without chaining to `parent_set_caps`, silently skipping
/// the base class's caps handling — every path now falls through to the
/// parent. Also drops a redundant `streamheaders.take()` right before the
/// field was reassigned, and a redundant explicit `drop(state)`.
fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
    let s = caps
        .structure(0)
        .ok_or(gst::loggable_error!(CAT, "Missing caps in set_caps"))?;

    if let Ok(Some(streamheaders)) = s.get_optional::<gst::ArrayRef>("streamheader") {
        let streamheaders = streamheaders.as_slice();
        if !streamheaders.is_empty() {
            let mut headers: Vec<u8> = Vec::new();

            let mut state = self.state.lock().unwrap();
            let started_state = match *state {
                State::Started(ref mut started_state) => started_state,
                State::Stopped => {
                    return Err(gst::loggable_error!(CAT, "Element should be started"));
                }
            };

            // Recompute the header size from scratch for the new caps.
            started_state.streamheaders_size = 0;

            for header in streamheaders {
                // Non-buffer entries are skipped; unmappable buffers are an error.
                if let Ok(Some(buf)) = header.get::<Option<gst::Buffer>>() {
                    let map = buf.map_readable().map_err(|_| {
                        gst::element_imp_error!(
                            self,
                            gst::CoreError::Failed,
                            ["Failed to map streamheader buffer"]
                        );
                        gst::loggable_error!(CAT, "Failed to map streamheader buffer")
                    })?;
                    headers.extend_from_slice(map.as_slice());
                    started_state.streamheaders_size += map.size() as u64;
                }
            }

            if !headers.is_empty() {
                gst::info!(CAT, imp: self, "Got streamheaders");
                started_state.streamheaders = Some(headers);
            }
        }
    }

    self.parent_set_caps(caps)
}
}