mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-05-20 17:28:49 +00:00
aws: s3putobjectsink: Add a flush-on-error property
Makes sure we can send out data even if the pipeline shutdown in error. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1337>
This commit is contained in:
parent
12dbf50ddc
commit
410d104ad6
|
@ -34,6 +34,7 @@ const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
|
||||||
const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1;
|
const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1;
|
||||||
const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
|
const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
|
||||||
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
|
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
|
||||||
|
const DEFAULT_FLUSH_ON_ERROR: bool = false;
|
||||||
|
|
||||||
// General setting for create / abort requests
|
// General setting for create / abort requests
|
||||||
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
||||||
|
@ -43,6 +44,7 @@ struct Started {
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
start_pts: Option<gst::ClockTime>,
|
start_pts: Option<gst::ClockTime>,
|
||||||
num_buffers: u64,
|
num_buffers: u64,
|
||||||
|
need_flush: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Started {
|
impl Started {
|
||||||
|
@ -52,6 +54,7 @@ impl Started {
|
||||||
buffer,
|
buffer,
|
||||||
start_pts: gst::ClockTime::NONE,
|
start_pts: gst::ClockTime::NONE,
|
||||||
num_buffers: 0,
|
num_buffers: 0,
|
||||||
|
need_flush: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,6 +82,7 @@ struct Settings {
|
||||||
flush_interval_buffers: u64,
|
flush_interval_buffers: u64,
|
||||||
flush_interval_bytes: u64,
|
flush_interval_bytes: u64,
|
||||||
flush_interval_time: Option<gst::ClockTime>,
|
flush_interval_time: Option<gst::ClockTime>,
|
||||||
|
flush_on_error: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Settings {
|
impl Settings {
|
||||||
|
@ -134,6 +138,7 @@ impl Default for Settings {
|
||||||
flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS,
|
flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS,
|
||||||
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
|
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
|
||||||
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
|
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
|
||||||
|
flush_on_error: DEFAULT_FLUSH_ON_ERROR,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -435,6 +440,11 @@ impl ObjectImpl for S3PutObjectSink {
|
||||||
.blurb("Total duration of buffers to accumulate before doing a write (0 => disable)")
|
.blurb("Total duration of buffers to accumulate before doing a write (0 => disable)")
|
||||||
.default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds())
|
.default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds())
|
||||||
.build(),
|
.build(),
|
||||||
|
glib::ParamSpecBoolean::builder("flush-on-error")
|
||||||
|
.nick("Flush on error")
|
||||||
|
.blurb("Whether to write out the data on error (like stopping without an EOS)")
|
||||||
|
.default_value(DEFAULT_FLUSH_ON_ERROR)
|
||||||
|
.build(),
|
||||||
]
|
]
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -528,6 +538,9 @@ impl ObjectImpl for S3PutObjectSink {
|
||||||
.get::<Option<gst::ClockTime>>()
|
.get::<Option<gst::ClockTime>>()
|
||||||
.expect("type checked upstream");
|
.expect("type checked upstream");
|
||||||
}
|
}
|
||||||
|
"flush-on-error" => {
|
||||||
|
settings.flush_on_error = value.get::<bool>().expect("type checked upstream");
|
||||||
|
}
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -560,6 +573,7 @@ impl ObjectImpl for S3PutObjectSink {
|
||||||
"flush-interval-buffers" => settings.flush_interval_buffers.to_value(),
|
"flush-interval-buffers" => settings.flush_interval_buffers.to_value(),
|
||||||
"flush-interval-bytes" => settings.flush_interval_bytes.to_value(),
|
"flush-interval-bytes" => settings.flush_interval_bytes.to_value(),
|
||||||
"flush-interval-time" => settings.flush_interval_time.to_value(),
|
"flush-interval-time" => settings.flush_interval_time.to_value(),
|
||||||
|
"flush-on-error" => settings.flush_on_error.to_value(),
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -606,6 +620,26 @@ impl BaseSinkImpl for S3PutObjectSink {
|
||||||
|
|
||||||
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
if let State::Started(ref started_state) = *state {
|
||||||
|
if settings.flush_on_error && started_state.need_flush {
|
||||||
|
drop(settings);
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
gst::warning!(CAT, imp: self, "Stopped without EOS, but flushing");
|
||||||
|
if let Err(error_message) = self.flush_buffer() {
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Failed to finalize the upload: {:?}",
|
||||||
|
error_message
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
state = self.state.lock().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
*state = State::Stopped;
|
*state = State::Stopped;
|
||||||
gst::info!(CAT, imp: self, "Stopped");
|
gst::info!(CAT, imp: self, "Stopped");
|
||||||
|
@ -629,6 +663,7 @@ impl BaseSinkImpl for S3PutObjectSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
started_state.num_buffers += 1;
|
started_state.num_buffers += 1;
|
||||||
|
started_state.need_flush = true;
|
||||||
|
|
||||||
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
|
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
|
||||||
let map = buffer.map_readable().map_err(|_| {
|
let map = buffer.map_readable().map_err(|_| {
|
||||||
|
@ -668,6 +703,14 @@ impl BaseSinkImpl for S3PutObjectSink {
|
||||||
|
|
||||||
fn event(&self, event: gst::Event) -> bool {
|
fn event(&self, event: gst::Event) -> bool {
|
||||||
if let gst::EventView::Eos(_) = event.view() {
|
if let gst::EventView::Eos(_) = event.view() {
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let State::Started(ref mut started_state) = *state {
|
||||||
|
started_state.need_flush = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(state);
|
||||||
|
|
||||||
if let Err(error_message) = self.flush_buffer() {
|
if let Err(error_message) = self.flush_buffer() {
|
||||||
gst::error!(
|
gst::error!(
|
||||||
CAT,
|
CAT,
|
||||||
|
|
|
@ -102,6 +102,7 @@ mod tests {
|
||||||
buffers: Option<u64>,
|
buffers: Option<u64>,
|
||||||
bytes: Option<u64>,
|
bytes: Option<u64>,
|
||||||
time: Option<gst::ClockTime>,
|
time: Option<gst::ClockTime>,
|
||||||
|
do_eos: bool,
|
||||||
) {
|
) {
|
||||||
init();
|
init();
|
||||||
|
|
||||||
|
@ -139,6 +140,9 @@ mod tests {
|
||||||
if time.is_some() {
|
if time.is_some() {
|
||||||
h1el.set_property("flush-interval-time", time)
|
h1el.set_property("flush-interval-time", time)
|
||||||
};
|
};
|
||||||
|
if !do_eos {
|
||||||
|
h1el.set_property("flush-on-error", true)
|
||||||
|
}
|
||||||
|
|
||||||
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||||
h1.play();
|
h1.play();
|
||||||
|
@ -148,7 +152,13 @@ mod tests {
|
||||||
h1.push(make_buffer(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(make_buffer(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(make_buffer(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push_event(gst::event::Eos::new());
|
|
||||||
|
if do_eos {
|
||||||
|
h1.push_event(gst::event::Eos::new());
|
||||||
|
} else {
|
||||||
|
// teardown to trigger end
|
||||||
|
drop(h1);
|
||||||
|
}
|
||||||
|
|
||||||
let mut h2 = gst_check::Harness::new("awss3src");
|
let mut h2 = gst_check::Harness::new("awss3src");
|
||||||
h2.element().unwrap().set_property("uri", uri.clone());
|
h2.element().unwrap().set_property("uri", uri.clone());
|
||||||
|
@ -180,29 +190,29 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_simple() {
|
async fn test_s3_put_object_simple() {
|
||||||
do_s3_putobject_test("s3-put-object-test", None, None, None).await;
|
do_s3_putobject_test("s3-put-object-test", None, None, None, true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_whitespace() {
|
async fn test_s3_put_object_whitespace() {
|
||||||
do_s3_putobject_test("s3 put object test", None, None, None).await;
|
do_s3_putobject_test("s3 put object test", None, None, None, true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_unicode() {
|
async fn test_s3_put_object_unicode() {
|
||||||
do_s3_putobject_test("s3 put object 🧪 😱", None, None, None).await;
|
do_s3_putobject_test("s3 put object 🧪 😱", None, None, None, true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_flush_buffers() {
|
async fn test_s3_put_object_flush_buffers() {
|
||||||
// Awkward threshold as we push 5 buffers
|
// Awkward threshold as we push 5 buffers
|
||||||
do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None).await;
|
do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None, true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_flush_bytes() {
|
async fn test_s3_put_object_flush_bytes() {
|
||||||
// Awkward threshold as we push 14 bytes per buffer
|
// Awkward threshold as we push 14 bytes per buffer
|
||||||
do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None).await;
|
do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None, true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -213,6 +223,33 @@ mod tests {
|
||||||
None,
|
None,
|
||||||
// Awkward threshold as we push each buffer with 200ms
|
// Awkward threshold as we push each buffer with 200ms
|
||||||
Some(gst::ClockTime::from_mseconds(300)),
|
Some(gst::ClockTime::from_mseconds(300)),
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_s3_put_object_on_eos() {
|
||||||
|
// Disable all flush thresholds, so only EOS causes a flush
|
||||||
|
do_s3_putobject_test(
|
||||||
|
"s3-put-object-test eos",
|
||||||
|
Some(0),
|
||||||
|
Some(0),
|
||||||
|
Some(gst::ClockTime::from_nseconds(0)),
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_s3_put_object_without_eos() {
|
||||||
|
// Disable all flush thresholds, skip EOS, and cause a flush on error
|
||||||
|
do_s3_putobject_test(
|
||||||
|
"s3-put-object-test !eos",
|
||||||
|
Some(0),
|
||||||
|
Some(0),
|
||||||
|
Some(gst::ClockTime::from_nseconds(0)),
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue