diff --git a/net/aws/src/s3sink/imp.rs b/net/aws/src/s3sink/imp.rs index 4241b890..5b4ae85b 100644 --- a/net/aws/src/s3sink/imp.rs +++ b/net/aws/src/s3sink/imp.rs @@ -88,6 +88,7 @@ impl Started { enum State { Stopped, + Completed, Started(Started), } @@ -185,12 +186,68 @@ static CAT: Lazy = Lazy::new(|| { }); impl S3Sink { + fn flush_multipart_upload(&self, state: &mut Started) { + let settings = self.settings.lock().unwrap(); + match settings.multipart_upload_on_error { + OnError::Abort => { + gst::log!( + CAT, + imp: self, + "Aborting multipart upload request with id: {}", + state.upload_id + ); + match self.abort_multipart_upload_request(state) { + Ok(()) => { + gst::log!( + CAT, + imp: self, + "Aborting multipart upload request succeeded." + ); + } + Err(err) => gst::error!( + CAT, + imp: self, + "Aborting multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::Complete => { + gst::log!( + CAT, + imp: self, + "Completing multipart upload request with id: {}", + state.upload_id + ); + match self.complete_multipart_upload_request(state) { + Ok(()) => { + gst::log!( + CAT, + imp: self, + "Complete multipart upload request succeeded." + ); + } + Err(err) => gst::error!( + CAT, + imp: self, + "Completing multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::DoNothing => (), + } + } + fn flush_current_buffer(&self) -> Result<(), Option> { let upload_part_req: UploadPart = self.create_upload_part_request()?; let mut state = self.state.lock().unwrap(); let state = match *state { State::Started(ref mut started_state) => started_state, + State::Completed => { + unreachable!("Upload should not be completed yet"); + } State::Stopped => { unreachable!("Element should be started"); } @@ -202,56 +259,7 @@ impl S3Sink { let output = s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { WaitError::FutureError(err) => { - let settings = self.settings.lock().unwrap(); - match settings.multipart_upload_on_error { - OnError::Abort => { - gst::log!( - CAT, - imp: self, - "Aborting multipart upload request with id: {}", - state.upload_id - ); - match self.abort_multipart_upload_request(state) { - Ok(()) => { - gst::log!( - CAT, - imp: self, - "Aborting multipart upload request succeeded." - ); - } - Err(err) => gst::error!( - CAT, - imp: self, - "Aborting multipart upload failed: {}", - err.to_string() - ), - } - } - OnError::Complete => { - gst::log!( - CAT, - imp: self, - "Completing multipart upload request with id: {}", - state.upload_id - ); - match self.complete_multipart_upload_request(state) { - Ok(()) => { - gst::log!( - CAT, - imp: self, - "Complete multipart upload request succeeded." - ); - } - Err(err) => gst::error!( - CAT, - imp: self, - "Completing multipart upload failed: {}", - err.to_string() - ), - } - } - OnError::DoNothing => (), - } + self.flush_multipart_upload(state); Some(gst::error_msg!( gst::ResourceError::OpenWrite, ["Failed to upload part: {}", err] @@ -277,6 +285,9 @@ impl S3Sink { let mut state = self.state.lock().unwrap(); let state = match *state { State::Started(ref mut started_state) => started_state, + State::Completed => { + unreachable!("Upload should not be completed yet"); + } State::Stopped => { unreachable!("Element should be started"); } @@ -437,12 +448,21 @@ impl S3Sink { let mut state = self.state.lock().unwrap(); let started_state = match *state { State::Started(ref mut started_state) => started_state, + State::Completed => { + unreachable!("Upload should not be completed yet"); + } State::Stopped => { unreachable!("Element should be started"); } }; - self.complete_multipart_upload_request(started_state) + let res = self.complete_multipart_upload_request(started_state); + + if res.is_ok() { + *state = State::Completed; + } + + res } fn start(&self) -> Result<(), gst::ErrorMessage> { @@ -562,6 +582,9 @@ impl S3Sink { let mut state = self.state.lock().unwrap(); let started_state = match *state { State::Started(ref mut started_state) => started_state, + State::Completed => { + unreachable!("Upload should not be completed yet"); + } State::Stopped => { unreachable!("Element should be started already"); } @@ -953,6 +976,17 @@ impl BaseSinkImpl for S3Sink { fn stop(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut state) = *state { + gst::warning!(CAT, imp: self, "Stopped without EOS"); + + // We're stopping without an EOS -- treat this as an error and deal with the open + // multipart upload accordingly _if_ we managed to upload any parts + if !state.completed_parts.is_empty() { + self.flush_multipart_upload(state); + } + } + *state = State::Stopped; gst::info!(CAT, imp: self, "Stopped"); @@ -965,6 +999,15 @@ impl BaseSinkImpl for S3Sink { return Err(gst::FlowError::Error); } + if let State::Completed = *self.state.lock().unwrap() { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Trying to render after upload complete"] + ); + return Err(gst::FlowError::Error); + } + gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); let map = buffer.map_readable().map_err(|_| { gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);