From 12dbf50ddc0a8b38e16928d839c047e07aec5995 Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Thu, 28 Sep 2023 17:36:04 +0200 Subject: [PATCH] aws: s3putobjectsink: Add some thresholds for flushing Lets us control when we perform a flush Part-of: --- net/aws/src/s3sink/putobjectsink.rs | 93 ++++++++++++++++++++++++++++- net/aws/tests/s3.rs | 83 ++++++++++++++++++++----- 2 files changed, 160 insertions(+), 16 deletions(-) diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs index ae044bd1..2275a5d6 100644 --- a/net/aws/src/s3sink/putobjectsink.rs +++ b/net/aws/src/s3sink/putobjectsink.rs @@ -31,6 +31,9 @@ use crate::s3url::*; use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError}; const DEFAULT_RETRY_ATTEMPTS: u32 = 5; +const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1; +const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0; +const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0); // General setting for create / abort requests const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; @@ -38,11 +41,18 @@ const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; struct Started { client: Client, buffer: Vec, + start_pts: Option, + num_buffers: u64, } impl Started { pub fn new(client: Client, buffer: Vec) -> Started { - Started { client, buffer } + Started { + client, + buffer, + start_pts: gst::ClockTime::NONE, + num_buffers: 0, + } } } @@ -66,6 +76,9 @@ struct Settings { retry_attempts: u32, request_timeout: Duration, endpoint_uri: Option, + flush_interval_buffers: u64, + flush_interval_bytes: u64, + flush_interval_time: Option, } impl Settings { @@ -118,6 +131,9 @@ impl Default for Settings { retry_attempts: DEFAULT_RETRY_ATTEMPTS, request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC), endpoint_uri: None, + flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS, + flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES, + flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME), } } } @@ -139,6 
+155,40 @@ static CAT: Lazy = Lazy::new(|| { }); impl S3PutObjectSink { + fn check_thresholds( + &self, + state: &Started, + pts: Option, + duration: Option, + ) -> bool { + let settings = self.settings.lock().unwrap(); + + #[allow(clippy::if_same_then_else)] + #[allow(clippy::needless_bool)] + // Verbose if/else form for readability + if settings.flush_interval_buffers > 0 + && (state.num_buffers % settings.flush_interval_buffers) == 0 + { + true + } else if settings.flush_interval_bytes > 0 + && (state.buffer.len() as u64 % settings.flush_interval_bytes) == 0 + { + true + } else if settings.flush_interval_time.is_some() + && settings.flush_interval_time.unwrap() != DEFAULT_FLUSH_INTERVAL_TIME + && state.start_pts.is_some() + && pts.is_some() + && duration.is_some() + && (pts.unwrap() - state.start_pts.unwrap() + duration.unwrap()) + % settings.flush_interval_time.unwrap() + == gst::ClockTime::from_nseconds(0) + { + true + } else { + false + } + } + fn flush_buffer(&self) -> Result<(), Option> { let put_object_req = self.create_put_object_request(); @@ -370,6 +420,21 @@ impl ObjectImpl for S3PutObjectSink { .nick("content-disposition") .blurb("Content-Disposition header to set for uploaded object") .build(), + glib::ParamSpecUInt64::builder("flush-interval-buffers") + .nick("Flush interval in buffers") + .blurb("Number of buffers to accumulate before doing a write (0 => disable)") + .default_value(DEFAULT_FLUSH_INTERVAL_BUFFERS) + .build(), + glib::ParamSpecUInt64::builder("flush-interval-bytes") + .nick("Flush interval in bytes") + .blurb("Number of bytes to accumulate before doing a write (0 => disable)") + .default_value(DEFAULT_FLUSH_INTERVAL_BYTES) + .build(), + glib::ParamSpecUInt64::builder("flush-interval-time") + .nick("Flush interval in duration") + .blurb("Total duration of buffers to accumulate before doing a write (0 => disable)") + .default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds()) + .build(), ] }); @@ -451,6 +516,18 @@ impl ObjectImpl for 
S3PutObjectSink { .get::>() .expect("type checked upstream"); } + "flush-interval-buffers" => { + settings.flush_interval_buffers = + value.get::().expect("type checked upstream"); + } + "flush-interval-bytes" => { + settings.flush_interval_bytes = value.get::().expect("type checked upstream"); + } + "flush-interval-time" => { + settings.flush_interval_time = value + .get::>() + .expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -480,6 +557,9 @@ impl ObjectImpl for S3PutObjectSink { "endpoint-uri" => settings.endpoint_uri.to_value(), "content-type" => settings.content_type.to_value(), "content-disposition" => settings.content_disposition.to_value(), + "flush-interval-buffers" => settings.flush_interval_buffers.to_value(), + "flush-interval-bytes" => settings.flush_interval_bytes.to_value(), + "flush-interval-time" => settings.flush_interval_time.to_value(), _ => unimplemented!(), } } @@ -544,6 +624,12 @@ impl BaseSinkImpl for S3PutObjectSink { } }; + if started_state.start_pts.is_none() { + started_state.start_pts = buffer.pts(); + } + + started_state.num_buffers += 1; + gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); let map = buffer.map_readable().map_err(|_| { gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); @@ -551,6 +637,11 @@ impl BaseSinkImpl for S3PutObjectSink { })?; started_state.buffer.extend_from_slice(map.as_slice()); + + if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) { + return Ok(gst::FlowSuccess::Ok); + } + drop(state); match self.flush_buffer() { diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs index c9d141e6..4fff59d6 100644 --- a/net/aws/tests/s3.rs +++ b/net/aws/tests/s3.rs @@ -28,6 +28,12 @@ mod tests { }); } + fn make_buffer(content: &[u8]) -> gst::Buffer { + let mut buf = gst::Buffer::from_slice(content.to_owned()); + buf.make_mut().set_pts(gst::ClockTime::from_mseconds(200)); + buf + } + async fn delete_object(region: String, bucket: &str, key: &str) { 
let region_provider = aws_config::meta::region::RegionProviderChain::first_try( aws_sdk_s3::config::Region::new(region), @@ -70,11 +76,11 @@ mod tests { h1.set_src_caps(gst::Caps::builder("text/plain").build()); h1.play(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); h1.push_event(gst::event::Eos::new()); let mut h2 = gst_check::Harness::new("awss3src"); @@ -91,7 +97,12 @@ mod tests { } // Common helper - async fn do_s3_putobject_test(key_prefix: &str) { + async fn do_s3_putobject_test( + key_prefix: &str, + buffers: Option, + bytes: Option, + time: Option, + ) { init(); let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string()); @@ -103,22 +114,40 @@ mod tests { // Manually add the element so we can configure it before it goes to PLAYING let mut h1 = gst_check::Harness::new_empty(); + // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and // adding an element manually h1.add_parse( - format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\"") + format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\" name=\"sink\"") .as_str(), ); + let h1el = h1 + .element() + .unwrap() + .dynamic_cast::() + .unwrap() + .by_name("sink") + .unwrap(); + if let Some(b) = buffers { + h1el.set_property("flush-interval-buffers", b) + }; + if let Some(b) = bytes { + h1el.set_property("flush-interval-bytes", b) + }; + if time.is_some() { + h1el.set_property("flush-interval-time", time) + }; + 
h1.set_src_caps(gst::Caps::builder("text/plain").build()); h1.play(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); + h1.push(make_buffer(content)).unwrap(); h1.push_event(gst::event::Eos::new()); let mut h2 = gst_check::Harness::new("awss3src"); @@ -151,16 +180,40 @@ mod tests { #[tokio::test] async fn test_s3_put_object_simple() { - do_s3_putobject_test("s3-put-object-test").await; + do_s3_putobject_test("s3-put-object-test", None, None, None).await; } #[tokio::test] async fn test_s3_put_object_whitespace() { - do_s3_putobject_test("s3 put object test").await; + do_s3_putobject_test("s3 put object test", None, None, None).await; } #[tokio::test] async fn test_s3_put_object_unicode() { - do_s3_putobject_test("s3 put object 🧪 😱").await; + do_s3_putobject_test("s3 put object 🧪 😱", None, None, None).await; + } + + #[tokio::test] + async fn test_s3_put_object_flush_buffers() { + // Awkward threshold as we push 5 buffers + do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None).await; + } + + #[tokio::test] + async fn test_s3_put_object_flush_bytes() { + // Awkward threshold as we push 14 bytes per buffer + do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None).await; + } + + #[tokio::test] + async fn test_s3_put_object_flush_time() { + do_s3_putobject_test( + "s3-put-object-test ftime", + None, + None, + // Awkward threshold as we push each buffer with 200ms + Some(gst::ClockTime::from_mseconds(300)), + ) + .await; } }