From a54b2dd39e9dcfef058763665d1a74342b65abe1 Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Wed, 27 Sep 2023 19:08:10 +0200 Subject: [PATCH] aws: s3: Add a new awss3putobjectsink When streaming small amounts of data, using awss3sink might not be a good idea, as we need to accumulate at least 5 MB of data for a multipart upload (or we flush on EOS). The alternative, while inefficient, is to do a complete PutObject of _all_ the data periodically so as to not lose data in case of a pipeline failure. This element makes a start on this idea by doing a PutObject for every buffer. Part-of: --- net/aws/src/s3sink/mod.rs | 18 +- .../src/s3sink/{imp.rs => multipartsink.rs} | 0 net/aws/src/s3sink/putobjectsink.rs | 593 ++++++++++++++++++ net/aws/tests/s3.rs | 152 +++-- 4 files changed, 718 insertions(+), 45 deletions(-) rename net/aws/src/s3sink/{imp.rs => multipartsink.rs} (100%) create mode 100644 net/aws/src/s3sink/putobjectsink.rs diff --git a/net/aws/src/s3sink/mod.rs b/net/aws/src/s3sink/mod.rs index 9198f025..78308966 100644 --- a/net/aws/src/s3sink/mod.rs +++ b/net/aws/src/s3sink/mod.rs @@ -1,4 +1,6 @@ // Copyright (C) 2019 Amazon.com, Inc. or its affiliates +// Copyright (C) 2023 Asymptotic Inc +// Author: Arun Raghavan // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at @@ -9,7 +11,8 @@ use gst::glib; use gst::prelude::*; -mod imp; +mod multipartsink; +mod putobjectsink; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[repr(u32)] @@ -27,7 +30,11 @@ pub(crate) enum OnError { } glib::wrapper! { - pub struct S3Sink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; + pub struct S3Sink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; +} + +glib::wrapper! { + pub struct S3PutObjectSink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { @@ -43,5 +50,12 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { "awss3sink", gst::Rank::PRIMARY, S3Sink::static_type(), + )?; + gst::Element::register( + Some(plugin), + "awss3putobjectsink", + // This element should not be autoplugged as it is only useful for specific use cases + gst::Rank::NONE, + S3PutObjectSink::static_type(), ) } diff --git a/net/aws/src/s3sink/imp.rs b/net/aws/src/s3sink/multipartsink.rs similarity index 100% rename from net/aws/src/s3sink/imp.rs rename to net/aws/src/s3sink/multipartsink.rs diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs new file mode 100644 index 00000000..ae044bd1 --- /dev/null +++ b/net/aws/src/s3sink/putobjectsink.rs @@ -0,0 +1,593 @@ +// Copyright (C) 2019 Amazon.com, Inc. or its affiliates +// Copyright (C) 2023 Asymptotic Inc +// Author: Arun Raghavan +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_base::subclass::prelude::*; + +use aws_sdk_s3::{ + config::{self, retry::RetryConfig, Credentials, Region}, + operation::put_object::builders::PutObjectFluentBuilder, + primitives::ByteStream, + Client, +}; + +use futures::future; +use gst::glib::once_cell::sync::Lazy; +use std::collections::HashMap; +use std::convert::From; +use std::sync::Mutex; +use std::time::Duration; + +use crate::s3url::*; +use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError}; + +const DEFAULT_RETRY_ATTEMPTS: u32 = 5; + +// General setting for create / abort requests +const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; + +struct Started { + client: Client, + buffer: Vec, +} + +impl Started { + pub fn new(client: Client, buffer: Vec) -> Started { + Started { client, buffer } + } +} + +#[derive(Default)] +enum State { + #[default] + Stopped, + Started(Started), +} + +struct Settings { + region: Region, + bucket: Option, + key: Option, + content_type: Option, + content_disposition: Option, + access_key: Option, + secret_access_key: Option, + session_token: Option, + metadata: Option, + retry_attempts: u32, + request_timeout: Duration, + endpoint_uri: Option, +} + +impl Settings { + fn to_uri(&self) -> String { + GstS3Url { + region: self.region.clone(), + bucket: self.bucket.clone().unwrap(), + object: self.key.clone().unwrap(), + version: None, + } + .to_string() + } + + fn to_metadata(&self, imp: &S3PutObjectSink) -> Option> { + self.metadata.as_ref().map(|structure| { + let mut hash = HashMap::new(); + + for (key, value) in structure.iter() { + if let Ok(Ok(value_str)) = value.transform::().map(|v| v.get()) { + gst::log!(CAT, imp: imp, "metadata '{}' -> '{}'", key, value_str); + hash.insert(key.to_string(), value_str); + } else { + gst::warning!( + CAT, + imp: imp, + "Failed to convert metadata '{}' to string ('{:?}')", + key, + value + ); + } + } + + hash + }) + } +} + +impl Default for Settings { + fn default() -> Self { + Settings { + region: Region::new("us-west-2"), + bucket: None, + key: None, + content_type: None, + content_disposition: None, + access_key: None, + secret_access_key: None, + session_token: None, + metadata: None, + retry_attempts: DEFAULT_RETRY_ATTEMPTS, + request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC), + endpoint_uri: None, + } + } +} + +#[derive(Default)] +pub struct S3PutObjectSink { + url: Mutex>, + settings: Mutex, + state: Mutex, + canceller: Mutex>, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "awss3putobjectsink", + gst::DebugColorFlags::empty(), + Some("Amazon S3 PutObject Sink"), + ) +}); + +impl S3PutObjectSink { + fn flush_buffer(&self) -> Result<(), Option> { + let put_object_req = self.create_put_object_request(); + + let put_object_req_future = put_object_req.send(); + let _output = + s3utils::wait(&self.canceller, put_object_req_future).map_err(|err| match err { + WaitError::FutureError(err) => Some(gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to upload part: {}", err] + )), + WaitError::Cancelled => None, + })?; + + gst::debug!(CAT, imp: self, "Uploaded complete"); + + Ok(()) + } + + fn create_put_object_request(&self) -> PutObjectFluentBuilder { + let url = self.url.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + let state = match *state { + State::Started(ref started_state) => started_state, + State::Stopped => { + unreachable!("Element should be started"); + } + }; + + let body = Some(ByteStream::from(state.buffer.clone())); + + let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); + let key = Some(url.as_ref().unwrap().object.to_owned()); + let metadata = settings.to_metadata(self); + + let client = &state.client; + + client + .put_object() + .set_body(body) + .set_bucket(bucket) + .set_key(key) + .set_metadata(metadata) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if let State::Started { .. } = *state { + unreachable!("Element should be started"); + } + + let s3url = { + let url = self.url.lock().unwrap(); + match *url { + Some(ref url) => url.clone(), + None => { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["Cannot start without a URL being set"] + )); + } + } + }; + + let timeout_config = s3utils::timeout_config(settings.request_timeout); + + let cred = match ( + settings.access_key.as_ref(), + settings.secret_access_key.as_ref(), + ) { + (Some(access_key), Some(secret_access_key)) => Some(Credentials::new( + access_key.clone(), + secret_access_key.clone(), + settings.session_token.clone(), + None, + "aws-s3-putobject-sink", + )), + _ => None, + }; + + let sdk_config = + s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred) + .map_err(|err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to create SDK config: {}", err] + ), + WaitError::Cancelled => { + gst::error_msg!( + gst::LibraryError::Failed, + ["SDK config request interrupted during start"] + ) + } + })?; + + let config_builder = config::Builder::from(&sdk_config) + .retry_config(RetryConfig::standard().with_max_attempts(settings.retry_attempts)); + + let config = if let Some(ref uri) = settings.endpoint_uri { + config_builder.endpoint_url(uri).build() + } else { + config_builder.build() + }; + + let client = Client::from_conf(config); + + *state = State::Started(Started::new(client, Vec::new())); + + Ok(()) + } + + fn cancel(&self) { + let mut canceller = self.canceller.lock().unwrap(); + + if let Some(c) = canceller.take() { + c.abort() + }; + } + + fn set_uri(self: &S3PutObjectSink, url_str: Option<&str>) -> Result<(), glib::Error> { + let state = self.state.lock().unwrap(); + + if let State::Started { .. } = *state { + return Err(glib::Error::new( + gst::URIError::BadState, + "Cannot set URI on a started s3sink", + )); + } + + let mut url = self.url.lock().unwrap(); + + if url_str.is_none() { + *url = None; + return Ok(()); + } + + gst::debug!(CAT, imp: self, "Setting uri to {:?}", url_str); + + let url_str = url_str.unwrap(); + match parse_s3_url(url_str) { + Ok(s3url) => { + *url = Some(s3url); + Ok(()) + } + Err(_) => Err(glib::Error::new( + gst::URIError::BadUri, + "Could not parse URI", + )), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for S3PutObjectSink { + const NAME: &'static str = "GstAwsS3PutObjectSink"; + type Type = super::S3PutObjectSink; + type ParentType = gst_base::BaseSink; +} + +impl ObjectImpl for S3PutObjectSink { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("bucket") + .nick("S3 Bucket") + .blurb("The bucket of the file to write") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("key") + .nick("S3 Key") + .blurb("The key of the file to write") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("region") + .nick("AWS Region") + .blurb("An AWS region (e.g. eu-west-2).") + .default_value(Some("us-west-2")) + .mutable_ready() + .build(), + glib::ParamSpecString::builder("uri") + .nick("URI") + .blurb("The S3 object URI") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("access-key") + .nick("Access Key") + .blurb("AWS Access Key") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("secret-access-key") + .nick("Secret Access Key") + .blurb("AWS Secret Access Key") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("session-token") + .nick("Session Token") + .blurb("AWS temporary Session Token from STS") + .mutable_ready() + .build(), + glib::ParamSpecBoxed::builder::("metadata") + .nick("Metadata") + .blurb("A map of metadata to store with the object in S3; field values need to be convertible to strings.") + .mutable_ready() + .build(), + glib::ParamSpecUInt::builder("retry-attempts") + .nick("Retry attempts") + .blurb("Number of times AWS SDK attempts a request before abandoning the request") + .minimum(1) + .maximum(10) + .default_value(DEFAULT_RETRY_ATTEMPTS) + .build(), + glib::ParamSpecInt64::builder("request-timeout") + .nick("Request timeout") + .blurb("Timeout for general S3 requests (in ms, set to -1 for infinity)") + .minimum(-1) + .default_value(DEFAULT_REQUEST_TIMEOUT_MSEC as i64) + .build(), + glib::ParamSpecString::builder("endpoint-uri") + .nick("S3 endpoint URI") + .blurb("The S3 endpoint URI to use") + .build(), + glib::ParamSpecString::builder("content-type") + .nick("content-type") + .blurb("Content-Type header to set for uploaded object") + .build(), + glib::ParamSpecString::builder("content-disposition") + .nick("content-disposition") + .blurb("Content-Disposition header to set for uploaded object") + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + + gst::debug!( + CAT, + imp: self, + "Setting property '{}' to '{:?}'", + pspec.name(), + value + ); + + match pspec.name() { + "bucket" => { + settings.bucket = value + .get::>() + .expect("type checked upstream"); + if settings.key.is_some() { + let _ = self.set_uri(Some(&settings.to_uri())); + } + } + "key" => { + settings.key = value + .get::>() + .expect("type checked upstream"); + if settings.bucket.is_some() { + let _ = self.set_uri(Some(&settings.to_uri())); + } + } + "region" => { + let region = value.get::().expect("type checked upstream"); + settings.region = Region::new(region); + if settings.key.is_some() && settings.bucket.is_some() { + let _ = self.set_uri(Some(&settings.to_uri())); + } + } + "uri" => { + let _ = self.set_uri(value.get().expect("type checked upstream")); + } + "access-key" => { + settings.access_key = value.get().expect("type checked upstream"); + } + "secret-access-key" => { + settings.secret_access_key = value.get().expect("type checked upstream"); + } + "session-token" => { + settings.session_token = value.get().expect("type checked upstream"); + } + "metadata" => { + settings.metadata = value.get().expect("type checked upstream"); + } + "retry-attempts" => { + settings.retry_attempts = value.get::().expect("type checked upstream"); + } + "request-timeout" => { + settings.request_timeout = + duration_from_millis(value.get::().expect("type checked upstream")); + } + "endpoint-uri" => { + settings.endpoint_uri = value + .get::>() + .expect("type checked upstream"); + if settings.key.is_some() && settings.bucket.is_some() { + let _ = self.set_uri(Some(&settings.to_uri())); + } + } + "content-type" => { + settings.content_type = value + .get::>() + .expect("type checked upstream"); + } + "content-disposition" => { + settings.content_disposition = value + .get::>() + .expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + + match pspec.name() { + "key" => settings.key.to_value(), + "bucket" => settings.bucket.to_value(), + "region" => settings.region.to_string().to_value(), + "uri" => { + let url = self.url.lock().unwrap(); + let url = match *url { + Some(ref url) => url.to_string(), + None => "".to_string(), + }; + + url.to_value() + } + "access-key" => settings.access_key.to_value(), + "secret-access-key" => settings.secret_access_key.to_value(), + "session-token" => settings.session_token.to_value(), + "metadata" => settings.metadata.to_value(), + "retry-attempts" => settings.retry_attempts.to_value(), + "request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(), + "endpoint-uri" => settings.endpoint_uri.to_value(), + "content-type" => settings.content_type.to_value(), + "content-disposition" => settings.content_disposition.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for S3PutObjectSink {} + +impl ElementImpl for S3PutObjectSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Amazon S3 PutObject sink", + "Source/Network", + "Writes an object to Amazon S3 using PutObject (mostly useful for small files)", + "Arun Raghavan ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl BaseSinkImpl for S3PutObjectSink { + fn start(&self) -> Result<(), gst::ErrorMessage> { + self.start() + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + + *state = State::Stopped; + gst::info!(CAT, imp: self, "Stopped"); + + Ok(()) + } + + fn render(&self, buffer: &gst::Buffer) -> Result { + let mut state = self.state.lock().unwrap(); + + let started_state = match *state { + State::Started(ref mut s) => s, + State::Stopped => { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); + return Err(gst::FlowError::Error); + } + }; + + gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; + + started_state.buffer.extend_from_slice(map.as_slice()); + drop(state); + + match self.flush_buffer() { + Ok(_) => Ok(gst::FlowSuccess::Ok), + Err(err) => match err { + Some(error_message) => { + gst::error!(CAT, imp: self, "Upload failed: {}", error_message); + self.post_error_message(error_message); + Err(gst::FlowError::Error) + } + _ => { + gst::info!(CAT, imp: self, "Upload interrupted. Flushing..."); + Err(gst::FlowError::Flushing) + } + }, + } + } + + fn unlock(&self) -> Result<(), gst::ErrorMessage> { + self.cancel(); + + Ok(()) + } + + fn event(&self, event: gst::Event) -> bool { + if let gst::EventView::Eos(_) = event.view() { + if let Err(error_message) = self.flush_buffer() { + gst::error!( + CAT, + imp: self, + "Failed to finalize the upload: {:?}", + error_message + ); + return false; + } + } + + BaseSinkImplExt::parent_event(self, event) + } +} diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs index f4ca8a96..c9d141e6 100644 --- a/net/aws/tests/s3.rs +++ b/net/aws/tests/s3.rs @@ -10,8 +10,6 @@ // The test times out on Windows for some reason, skip until we figure out why #[cfg(not(target_os = "windows"))] -#[test_with::env(AWS_ACCESS_KEY_ID)] -#[test_with::env(AWS_SECRET_ACCESS_KEY)] #[cfg(test)] mod tests { use gst::prelude::*; @@ -30,41 +28,9 @@ mod tests { }); } - // Common helper - async fn do_s3_test(key_prefix: &str) { - init(); - - let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string()); - let bucket = - std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string()); - let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now()); - let uri = format!("s3://{region}/{bucket}/{key}"); - let content = "Hello, world!\n".as_bytes(); - - // Manually add the element so we can configure it before it goes to PLAYING - let mut h1 = gst_check::Harness::new_empty(); - // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and - // adding an element manually - h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str()); - - h1.set_src_caps(gst::Caps::builder("text/plain").build()); - h1.play(); - - h1.push(gst::Buffer::from_slice(content)).unwrap(); - h1.push_event(gst::event::Eos::new()); - - let mut h2 = gst_check::Harness::new("awss3src"); - h2.element().unwrap().set_property("uri", uri.clone()); - h2.play(); - - let buf = h2.pull_until_eos().unwrap().unwrap(); - assert_eq!( - content, - buf.into_mapped_buffer_readable().unwrap().as_slice() - ); - + async fn delete_object(region: String, bucket: &str, key: &str) { let region_provider = aws_config::meta::region::RegionProviderChain::first_try( - aws_sdk_s3::config::Region::new(region.clone()), + aws_sdk_s3::config::Region::new(region), ) .or_default_provider(); @@ -83,18 +49,118 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn test_s3_simple() { - do_s3_test("s3-test").await; + // Common helper + async fn do_s3_multipart_test(key_prefix: &str) { + init(); + + let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string()); + let bucket = + std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string()); + let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now()); + let uri = format!("s3://{region}/{bucket}/{key}"); + let content = "Hello, world!\n".as_bytes(); + + // Manually add the element so we can configure it before it goes to PLAYING + let mut h1 = gst_check::Harness::new_empty(); + // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and + // adding an element manually + + h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str()); + + h1.set_src_caps(gst::Caps::builder("text/plain").build()); + h1.play(); + + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push_event(gst::event::Eos::new()); + + let mut h2 = gst_check::Harness::new("awss3src"); + h2.element().unwrap().set_property("uri", uri.clone()); + h2.play(); + + let buf = h2.pull_until_eos().unwrap().unwrap(); + assert_eq!( + content.repeat(5), + buf.into_mapped_buffer_readable().unwrap().as_slice() + ); + + delete_object(region.clone(), &bucket, &key).await; + } + + // Common helper + async fn do_s3_putobject_test(key_prefix: &str) { + init(); + + let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string()); + let bucket = + std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string()); + let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now()); + let uri = format!("s3://{region}/{bucket}/{key}"); + let content = "Hello, world!\n".as_bytes(); + + // Manually add the element so we can configure it before it goes to PLAYING + let mut h1 = gst_check::Harness::new_empty(); + // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and + // adding an element manually + + h1.add_parse( + format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\"") + .as_str(), + ); + + h1.set_src_caps(gst::Caps::builder("text/plain").build()); + h1.play(); + + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push(gst::Buffer::from_slice(content)).unwrap(); + h1.push_event(gst::event::Eos::new()); + + let mut h2 = gst_check::Harness::new("awss3src"); + h2.element().unwrap().set_property("uri", uri.clone()); + h2.play(); + + let buf = h2.pull_until_eos().unwrap().unwrap(); + assert_eq!( + content.repeat(5), + buf.into_mapped_buffer_readable().unwrap().as_slice() + ); + + delete_object(region.clone(), &bucket, &key).await; } #[tokio::test] - async fn test_s3_whitespace() { - do_s3_test("s3 test").await; + async fn test_s3_multipart_simple() { + do_s3_multipart_test("s3-test").await; } #[tokio::test] - async fn test_s3_unicode() { - do_s3_test("s3 🧪 😱").await; + async fn test_s3_multipart_whitespace() { + do_s3_multipart_test("s3 test").await; + } + + #[tokio::test] + async fn test_s3_multipart_unicode() { + do_s3_multipart_test("s3 🧪 😱").await; + } + + #[tokio::test] + async fn test_s3_put_object_simple() { + do_s3_putobject_test("s3-put-object-test").await; + } + + #[tokio::test] + async fn test_s3_put_object_whitespace() { + do_s3_putobject_test("s3 put object test").await; + } + + #[tokio::test] + async fn test_s3_put_object_unicode() { + do_s3_putobject_test("s3 put object 🧪 😱").await; } }