rusoto: s3sink: Bring back bucket, key and region properties

We don't want to drop these entirely while introducing the URI handler,
as that would break backwards compatibility.
This commit is contained in:
Arun Raghavan 2021-06-21 06:20:32 -04:00
parent 0592e46e65
commit 3a2d16f00c

View file

@ -9,11 +9,12 @@
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_error, gst_info, gst_trace};
use gst::{gst_debug, gst_error, gst_info, gst_trace};
use gst_base::subclass::prelude::*;
use futures::future;
use rusoto_core::region::Region;
use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
@ -22,6 +23,7 @@ use rusoto_s3::{
use once_cell::sync::Lazy;
use std::convert::From;
use std::str::FromStr;
use std::sync::Mutex;
use crate::s3url::*;
@ -79,10 +81,36 @@ impl Default for State {
const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
struct Settings {
region: Region,
bucket: Option<String>,
key: Option<String>,
content_type: Option<String>,
buffer_size: u64,
}
impl Settings {
fn to_uri(&self) -> String {
format!(
"s3://{}/{}/{}",
self.region.name(),
self.bucket.as_ref().unwrap(),
self.key.as_ref().unwrap()
)
}
}
impl Default for Settings {
fn default() -> Self {
Settings {
region: Region::default(),
bucket: None,
key: None,
content_type: None,
buffer_size: DEFAULT_BUFFER_SIZE,
}
}
}
#[derive(Default)]
pub struct S3Sink {
url: Mutex<Option<GstS3Url>>,
@ -99,15 +127,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
impl Default for Settings {
fn default() -> Self {
Settings {
content_type: None,
buffer_size: DEFAULT_BUFFER_SIZE,
}
}
}
impl S3Sink {
fn flush_current_buffer(
&self,
@ -329,7 +348,11 @@ impl S3Sink {
};
}
fn set_uri(self: &S3Sink, _: &super::S3Sink, url_str: Option<&str>) -> Result<(), glib::Error> {
fn set_uri(
self: &S3Sink,
object: &super::S3Sink,
url_str: Option<&str>,
) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
@ -346,6 +369,8 @@ impl S3Sink {
return Ok(());
}
gst_debug!(CAT, obj: object, "Setting uri to {:?}", url_str);
let url_str = url_str.unwrap();
match parse_s3_url(url_str) {
Ok(s3url) => {
@ -372,6 +397,27 @@ impl ObjectImpl for S3Sink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpec::new_string(
"bucket",
"S3 Bucket",
"The bucket of the file to write",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"key",
"S3 Key",
"The key of the file to write",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"region",
"AWS Region",
"An AWS region (e.g. eu-west-2).",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_uint64(
"part-size",
"Part size",
@ -404,6 +450,30 @@ impl ObjectImpl for S3Sink {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"bucket" => {
settings.bucket = value
.get::<Option<String>>()
.expect("type checked upstream");
if settings.key.is_some() {
let _ = self.set_uri(obj, Some(&settings.to_uri()));
}
}
"key" => {
settings.key = value
.get::<Option<String>>()
.expect("type checked upstream");
if settings.bucket.is_some() {
let _ = self.set_uri(obj, Some(&settings.to_uri()));
}
}
"region" => {
settings.region =
Region::from_str(&value.get::<String>().expect("type checked upstream"))
.unwrap();
if settings.key.is_some() && settings.bucket.is_some() {
let _ = self.set_uri(obj, Some(&settings.to_uri()));
}
}
"part-size" => {
settings.buffer_size = value.get::<u64>().expect("type checked upstream");
}
@ -418,6 +488,9 @@ impl ObjectImpl for S3Sink {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"key" => settings.key.to_value(),
"bucket" => settings.bucket.to_value(),
"region" => settings.region.name().to_value(),
"part-size" => settings.buffer_size.to_value(),
"uri" => {
let url = match *self.url.lock().unwrap() {