aws: s3sink: Treat stopping without EOS as an error for multipart upload

This allows us to try to clean up based on configuration (abort /
complete / do nothing) if the pipeline is shut down without an EOS.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/970>
This commit is contained in:
Arun Raghavan 2022-11-07 14:53:15 -05:00 committed by Arun Raghavan
parent bf9f7a747e
commit 3abd13e57b

View file

@ -88,6 +88,7 @@ impl Started {
enum State {
Stopped,
Completed,
Started(Started),
}
@ -185,12 +186,68 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl S3Sink {
fn flush_multipart_upload(&self, state: &mut Started) {
let settings = self.settings.lock().unwrap();
match settings.multipart_upload_on_error {
OnError::Abort => {
gst::log!(
CAT,
imp: self,
"Aborting multipart upload request with id: {}",
state.upload_id
);
match self.abort_multipart_upload_request(state) {
Ok(()) => {
gst::log!(
CAT,
imp: self,
"Aborting multipart upload request succeeded."
);
}
Err(err) => gst::error!(
CAT,
imp: self,
"Aborting multipart upload failed: {}",
err.to_string()
),
}
}
OnError::Complete => {
gst::log!(
CAT,
imp: self,
"Completing multipart upload request with id: {}",
state.upload_id
);
match self.complete_multipart_upload_request(state) {
Ok(()) => {
gst::log!(
CAT,
imp: self,
"Complete multipart upload request succeeded."
);
}
Err(err) => gst::error!(
CAT,
imp: self,
"Completing multipart upload failed: {}",
err.to_string()
),
}
}
OnError::DoNothing => (),
}
}
fn flush_current_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
let upload_part_req: UploadPart = self.create_upload_part_request()?;
let mut state = self.state.lock().unwrap();
let state = match *state {
State::Started(ref mut started_state) => started_state,
State::Completed => {
unreachable!("Upload should not be completed yet");
}
State::Stopped => {
unreachable!("Element should be started");
}
@ -202,56 +259,7 @@ impl S3Sink {
let output =
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
WaitError::FutureError(err) => {
let settings = self.settings.lock().unwrap();
match settings.multipart_upload_on_error {
OnError::Abort => {
gst::log!(
CAT,
imp: self,
"Aborting multipart upload request with id: {}",
state.upload_id
);
match self.abort_multipart_upload_request(state) {
Ok(()) => {
gst::log!(
CAT,
imp: self,
"Aborting multipart upload request succeeded."
);
}
Err(err) => gst::error!(
CAT,
imp: self,
"Aborting multipart upload failed: {}",
err.to_string()
),
}
}
OnError::Complete => {
gst::log!(
CAT,
imp: self,
"Completing multipart upload request with id: {}",
state.upload_id
);
match self.complete_multipart_upload_request(state) {
Ok(()) => {
gst::log!(
CAT,
imp: self,
"Complete multipart upload request succeeded."
);
}
Err(err) => gst::error!(
CAT,
imp: self,
"Completing multipart upload failed: {}",
err.to_string()
),
}
}
OnError::DoNothing => (),
}
self.flush_multipart_upload(state);
Some(gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
@ -277,6 +285,9 @@ impl S3Sink {
let mut state = self.state.lock().unwrap();
let state = match *state {
State::Started(ref mut started_state) => started_state,
State::Completed => {
unreachable!("Upload should not be completed yet");
}
State::Stopped => {
unreachable!("Element should be started");
}
@ -437,12 +448,21 @@ impl S3Sink {
let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Completed => {
unreachable!("Upload should not be completed yet");
}
State::Stopped => {
unreachable!("Element should be started");
}
};
self.complete_multipart_upload_request(started_state)
let res = self.complete_multipart_upload_request(started_state);
if res.is_ok() {
*state = State::Completed;
}
res
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
@ -562,6 +582,9 @@ impl S3Sink {
let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Completed => {
unreachable!("Upload should not be completed yet");
}
State::Stopped => {
unreachable!("Element should be started already");
}
@ -953,6 +976,17 @@ impl BaseSinkImpl for S3Sink {
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Started(ref mut state) = *state {
gst::warning!(CAT, imp: self, "Stopped without EOS");
// We're stopping without an EOS -- treat this as an error and deal with the open
// multipart upload accordingly _if_ we managed to upload any parts
if !state.completed_parts.is_empty() {
self.flush_multipart_upload(state);
}
}
*state = State::Stopped;
gst::info!(CAT, imp: self, "Stopped");
@ -965,6 +999,15 @@ impl BaseSinkImpl for S3Sink {
return Err(gst::FlowError::Error);
}
if let State::Completed = *self.state.lock().unwrap() {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Trying to render after upload complete"]
);
return Err(gst::FlowError::Error);
}
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);