livesync: Clean up state handling

- Separate resetting state more cleanly, introducing `set_flushing`,
  `sink_reset` and `src_reset`.
- Clear the queue early when we flush, in order to unblock waits on
  query responses.
- Return an error when we fail to start, pause or stop the task.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1369>
This commit is contained in:
Jan Alexander Steffens (heftig) 2023-10-24 23:53:17 +02:00
parent d663f708ef
commit 62791bfb47

View file

@ -428,9 +428,21 @@ impl ElementImpl for LiveSync {
let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::PlayingToPaused {
let mut state = self.state.lock();
state.playing = false;
match transition {
gst::StateChange::PlayingToPaused => {
let mut state = self.state.lock();
state.playing = false;
}
gst::StateChange::PausedToReady => {
let mut state = self.state.lock();
state.num_in = 0;
state.num_drop = 0;
state.num_out = 0;
state.num_duplicate = 0;
}
_ => {}
}
match (transition, success) {
@ -506,36 +518,11 @@ impl LiveSync {
return Err(gst::loggable_error!(CAT, "Wrong scheduling mode"));
}
if active {
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.eos = false;
state.in_timestamp = None;
state.num_in = 0;
state.num_drop = 0;
state.in_segment = None;
} else {
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
state.pending_caps = None;
state.out_audio_info = None;
state.out_buffer = None;
self.cond.notify_all();
}
if !active {
self.set_flushing(&mut self.state.lock());
let lock = pad.stream_lock();
{
let mut state = self.state.lock();
state.in_caps = None;
state.in_audio_info = None;
state.queue.clear();
state.buffer_queued = false;
state.update_fallback_duration();
}
self.sink_reset(&mut self.state.lock());
drop(lock);
}
@ -553,37 +540,49 @@ impl LiveSync {
}
if active {
let ret;
{
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.pending_segment = None;
state.out_segment = None;
state.out_timestamp = None;
state.num_out = 0;
state.num_duplicate = 0;
ret = self.start_src_task().map_err(Into::into);
}
ret
self.start_src_task(&mut self.state.lock())
.map_err(|e| gst::LoggableError::new(*CAT, e))?;
} else {
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
state.pending_caps = None;
state.out_audio_info = None;
state.out_buffer = None;
self.cond.notify_all();
}
let mut state = self.state.lock();
self.set_flushing(&mut state);
self.src_reset(&mut state);
drop(state);
pad.stop_task().map_err(Into::into)
pad.stop_task()?;
}
Ok(())
}
fn set_flushing(&self, state: &mut State) {
state.srcresult = Err(gst::FlowError::Flushing);
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
// Ensure we drop any query response sender to unblock the sinkpad
state.queue.clear();
state.buffer_queued = false;
self.cond.notify_all();
}
fn sink_reset(&self, state: &mut State) {
state.eos = false;
state.in_segment = None;
state.in_caps = None;
state.in_audio_info = None;
state.in_timestamp = None;
state.update_fallback_duration();
}
fn src_reset(&self, state: &mut State) {
state.pending_segment = None;
state.out_segment = None;
state.pending_caps = None;
state.out_audio_info = None;
state.out_buffer = None;
state.out_timestamp = None;
}
fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
@ -603,16 +602,13 @@ impl LiveSync {
gst::EventView::FlushStart(_) => {
let ret = self.srcpad.push_event(event);
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
self.cond.notify_all();
self.set_flushing(&mut self.state.lock());
if let Err(e) = self.srcpad.pause_task() {
gst::error!(CAT, imp: self, "Failed to pause task: {e}");
return false;
}
let _ = self.srcpad.pause_task();
return ret;
}
@ -620,21 +616,14 @@ impl LiveSync {
let ret = self.srcpad.push_event(event);
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.eos = false;
state.in_segment = None;
state.pending_segment = None;
state.out_segment = None;
state.in_caps = None;
state.pending_caps = None;
state.in_audio_info = None;
state.out_audio_info = None;
state.queue.clear();
state.buffer_queued = false;
state.out_buffer = None;
state.update_fallback_duration();
self.sink_reset(&mut state);
self.src_reset(&mut state);
if let Err(e) = self.start_src_task(&mut state) {
gst::error!(CAT, imp: self, "Failed to start task: {e}");
return false;
}
let _ = self.start_src_task();
return ret;
}
@ -689,12 +678,14 @@ impl LiveSync {
let mut state = self.state.lock();
if is_restart {
if state.srcresult == Err(gst::FlowError::Eos) {
state.srcresult = Ok(gst::FlowSuccess::Ok);
}
state.eos = false;
let _ = self.start_src_task();
if state.srcresult == Err(gst::FlowError::Eos) {
if let Err(e) = self.start_src_task(&mut state) {
gst::error!(CAT, imp: self, "Failed to start task: {e}");
return false;
}
}
}
if state.eos {
@ -739,10 +730,12 @@ impl LiveSync {
{
let mut state = self.state.lock();
if state.srcresult == Err(gst::FlowError::NotLinked) {
state.srcresult = Ok(gst::FlowSuccess::Ok);
let _ = self.start_src_task();
if let Err(e) = self.start_src_task(&mut state) {
gst::error!(CAT, imp: self, "Failed to start task: {e}");
}
}
}
self.sinkpad.push_event(event)
}
@ -766,6 +759,7 @@ impl LiveSync {
self.cond.notify_all();
drop(state);
// If the sender gets dropped, we will also unblock
receiver.recv().unwrap_or(false)
} else {
gst::Pad::query_default(pad, Some(&*self.obj()), query)
@ -977,8 +971,16 @@ impl LiveSync {
}
fn start_src_task(&self, state: &mut State) -> Result<(), glib::BoolError> {
state.srcresult = Ok(gst::FlowSuccess::Ok);
let imp = self.ref_counted();
self.srcpad.start_task(move || imp.src_loop())
let ret = self.srcpad.start_task(move || imp.src_loop());
if ret.is_err() {
state.srcresult = Err(gst::FlowError::Error);
}
ret
}
fn src_loop(&self) {
@ -1278,15 +1280,16 @@ impl LiveSync {
None => return false,
};
let slack = state
.out_buffer
.as_deref()
.map_or(gst::ClockTime::ZERO, |b| b.duration().unwrap());
// When out_timestamp is set, we also have an out_buffer
let slack = state.out_buffer.as_deref().unwrap().duration().unwrap();
if timestamp.start < out_timestamp.end + slack {
return false;
}
// This buffer would start beyond another buffer duration after our
// last emitted buffer ended
gst::debug!(
CAT,
imp: self,