aws: s3sink: Fix handling of special characters in key

Properly URL-encode the string if needed, and add some tests for a
couple of cases.

Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/431
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1333>
This commit is contained in:
Arun Raghavan 2023-09-20 18:17:41 -04:00
parent 829469d0fe
commit 51129febeb
2 changed files with 80 additions and 61 deletions

View file

@ -116,12 +116,13 @@ struct Settings {
impl Settings { impl Settings {
fn to_uri(&self) -> String { fn to_uri(&self) -> String {
format!( GstS3Url {
"s3://{}/{}/{}", region: self.region.clone(),
self.region, bucket: self.bucket.clone().unwrap(),
self.bucket.as_ref().unwrap(), object: self.key.clone().unwrap(),
self.key.as_ref().unwrap() version: None,
) }
.to_string()
} }
fn to_metadata(&self, imp: &S3Sink) -> Option<HashMap<String, String>> { fn to_metadata(&self, imp: &S3Sink) -> Option<HashMap<String, String>> {

View file

@ -8,72 +8,90 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
// //
use gst::prelude::*;
const DEFAULT_S3_REGION: &str = "us-west-2";
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstaws::plugin_register_static().unwrap();
});
}
// The test times out on Windows for some reason, skip until we figure out why // The test times out on Windows for some reason, skip until we figure out why
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
#[test_with::env(AWS_ACCESS_KEY_ID)] #[test_with::env(AWS_ACCESS_KEY_ID)]
#[test_with::env(AWS_SECRET_ACCESS_KEY)] #[test_with::env(AWS_SECRET_ACCESS_KEY)]
#[tokio::test] #[cfg(test)]
async fn test_s3() { mod tests {
init(); use gst::prelude::*;
// Makes it easier to get AWS SDK logs if needed
env_logger::init();
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string()); const DEFAULT_S3_REGION: &str = "us-west-2";
let bucket =
std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
let key = format!("s3-test-{:?}.txt", chrono::Utc::now());
let uri = format!("s3://{region}/{bucket}/{key}");
let content = "Hello, world!\n".as_bytes();
// Manually add the element so we can configure it before it goes to PLAYING fn init() {
let mut h1 = gst_check::Harness::new_empty(); use std::sync::Once;
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and static INIT: Once = Once::new();
// adding an element manually
h1.add_parse(format!("awss3sink uri={uri}").as_str());
h1.set_src_caps(gst::Caps::builder("text/plain").build()); INIT.call_once(|| {
h1.play(); gst::init().unwrap();
gstaws::plugin_register_static().unwrap();
// Makes it easier to get AWS SDK logs if needed
env_logger::init();
});
}
h1.push(gst::Buffer::from_slice(content)).unwrap(); // Common helper
h1.push_event(gst::event::Eos::new()); async fn do_s3_test(key_prefix: &str) {
init();
let mut h2 = gst_check::Harness::new("awss3src"); let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
h2.element().unwrap().set_property("uri", uri.clone()); let bucket =
h2.play(); std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
let uri = format!("s3://{region}/{bucket}/{key}");
let content = "Hello, world!\n".as_bytes();
let buf = h2.pull_until_eos().unwrap().unwrap(); // Manually add the element so we can configure it before it goes to PLAYING
assert_eq!( let mut h1 = gst_check::Harness::new_empty();
content, // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
buf.into_mapped_buffer_readable().unwrap().as_slice() // adding an element manually
); h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str());
let region_provider = aws_config::meta::region::RegionProviderChain::first_try( h1.set_src_caps(gst::Caps::builder("text/plain").build());
aws_sdk_s3::config::Region::new(region.clone()), h1.play();
)
.or_default_provider();
let config = aws_config::from_env().region(region_provider).load().await; h1.push(gst::Buffer::from_slice(content)).unwrap();
let client = aws_sdk_s3::Client::new(&config); h1.push_event(gst::event::Eos::new());
client let mut h2 = gst_check::Harness::new("awss3src");
.delete_object() h2.element().unwrap().set_property("uri", uri.clone());
.bucket(bucket) h2.play();
.key(key)
.send() let buf = h2.pull_until_eos().unwrap().unwrap();
.await assert_eq!(
.unwrap(); content,
buf.into_mapped_buffer_readable().unwrap().as_slice()
);
let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
aws_sdk_s3::config::Region::new(region.clone()),
)
.or_default_provider();
let config = aws_config::from_env().region(region_provider).load().await;
let client = aws_sdk_s3::Client::new(&config);
client
.delete_object()
.bucket(bucket)
.key(key)
.send()
.await
.unwrap();
}
#[tokio::test]
async fn test_s3_simple() {
do_s3_test("s3-test").await;
}
#[tokio::test]
async fn test_s3_whitespace() {
do_s3_test("s3 test").await;
}
#[tokio::test]
async fn test_s3_unicode() {
do_s3_test("s3 🧪 😱").await;
}
} }