ts-queue: split up enqueue_item

This is far from a full refactoring, but it makes the current
implementation slightly easier to understand
This commit is contained in:
Mathieu Duponchelle 2019-08-09 04:29:10 +02:00 committed by Sebastian Dröge
parent f2dca12de6
commit 892c812669

View file

@ -172,6 +172,126 @@ impl Queue {
}
}
/* Try transfering all the items from the pending queue to the DataQueue, then
* the current item. Errors out if the DataQueue was full, or the pending queue
* is already scheduled, in which case the current item should be added to the
* pending queue */
fn queue_until_full(
&self,
queue: &DataQueue,
pending_queue: &mut Option<(Option<task::Task>, bool, VecDeque<DataQueueItem>)>,
item: DataQueueItem,
) -> Result<(), DataQueueItem> {
match pending_queue {
None => queue.push(item),
Some((_, false, ref mut items)) => {
let mut failed_item = None;
while let Some(item) = items.pop_front() {
if let Err(item) = queue.push(item) {
failed_item = Some(item);
}
}
if let Some(failed_item) = failed_item {
items.push_front(failed_item);
Err(item)
} else {
queue.push(item)
}
}
_ => Err(item),
}
}
/* Schedules emptying of the pending queue. If there is an upstream
* io context, the new pending future is added to it, it is otherwise
* returned, for the caller to block on */
fn schedule_pending_queue(
&self,
element: &gst::Element,
state: &mut State,
) -> Option<impl Future<Item = (), Error = ()>> {
gst_log!(self.cat, obj: element, "Scheduling pending queue now");
let State {
queue: _,
ref mut pending_queue,
ref io_context_in,
pending_future_id_in,
..
} = *state;
pending_queue.as_mut().unwrap().1 = true;
let element_clone = element.clone();
let future = future::poll_fn(move || {
let queue = Self::from_instance(&element_clone);
let mut state = queue.state.lock().unwrap();
let State {
queue: ref dq,
ref mut pending_queue,
..
} = *state;
if dq.is_none() {
return Ok(Async::Ready(()));
}
gst_log!(
queue.cat,
obj: &element_clone,
"Trying to empty pending queue"
);
let res = if let Some((ref mut task, _, ref mut items)) = *pending_queue {
let mut failed_item = None;
while let Some(item) = items.pop_front() {
if let Err(item) = dq.as_ref().unwrap().push(item) {
failed_item = Some(item);
}
}
if let Some(failed_item) = failed_item {
items.push_front(failed_item);
*task = Some(task::current());
gst_log!(
queue.cat,
obj: &element_clone,
"Waiting for more queue space"
);
Ok(Async::NotReady)
} else {
gst_log!(queue.cat, obj: &element_clone, "Pending queue is empty now");
Ok(Async::Ready(()))
}
} else {
gst_log!(
queue.cat,
obj: &element_clone,
"Flushing, dropping pending queue"
);
Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) {
*pending_queue = None;
}
res
});
if let (Some(io_context_in), Some(pending_future_id_in)) =
(io_context_in.as_ref(), pending_future_id_in.as_ref())
{
io_context_in.add_pending_future(*pending_future_id_in, future);
None
} else {
Some(future)
}
}
fn enqueue_item(
&self,
_pad: &gst::Pad,
@ -184,34 +304,12 @@ impl Queue {
ref queue,
ref mut pending_queue,
ref io_context_in,
pending_future_id_in,
..
} = *state;
let queue = queue.as_ref().ok_or(gst::FlowError::Error)?;
let item = match pending_queue {
None => queue.push(item),
Some((_, false, ref mut items)) => {
let mut failed_item = None;
while let Some(item) = items.pop_front() {
if let Err(item) = queue.push(item) {
failed_item = Some(item);
}
}
if let Some(failed_item) = failed_item {
items.push_front(failed_item);
Err(item)
} else {
queue.push(item)
}
}
_ => Err(item),
};
if let Err(item) = item {
if let Err(item) = self.queue_until_full(queue, pending_queue, item) {
if pending_queue
.as_ref()
.map(|(_, scheduled, _)| !scheduled)
@ -237,80 +335,7 @@ impl Queue {
);
if schedule_now {
gst_log!(self.cat, obj: element, "Scheduling pending queue now");
pending_queue.as_mut().unwrap().1 = true;
let element_clone = element.clone();
let future = future::poll_fn(move || {
let queue = Self::from_instance(&element_clone);
let mut state = queue.state.lock().unwrap();
let State {
queue: ref dq,
ref mut pending_queue,
..
} = *state;
if dq.is_none() {
return Ok(Async::Ready(()));
}
gst_log!(
queue.cat,
obj: &element_clone,
"Trying to empty pending queue"
);
let res = if let Some((ref mut task, _, ref mut items)) = *pending_queue
{
let mut failed_item = None;
while let Some(item) = items.pop_front() {
if let Err(item) = dq.as_ref().unwrap().push(item) {
failed_item = Some(item);
}
}
if let Some(failed_item) = failed_item {
items.push_front(failed_item);
*task = Some(task::current());
gst_log!(
queue.cat,
obj: &element_clone,
"Waiting for more queue space"
);
Ok(Async::NotReady)
} else {
gst_log!(
queue.cat,
obj: &element_clone,
"Pending queue is empty now"
);
Ok(Async::Ready(()))
}
} else {
gst_log!(
queue.cat,
obj: &element_clone,
"Flushing, dropping pending queue"
);
Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) {
*pending_queue = None;
}
res
});
if let (Some(io_context_in), Some(pending_future_id_in)) =
(io_context_in.as_ref(), pending_future_id_in.as_ref())
{
io_context_in.add_pending_future(*pending_future_id_in, future);
None
} else {
Some(future)
}
self.schedule_pending_queue(element, &mut state)
} else {
gst_log!(self.cat, obj: element, "Scheduling pending queue later");
@ -328,12 +353,16 @@ impl Queue {
};
if let Some(wait_future) = wait_future {
gst_log!(self.cat, obj: element, "Blocking until queue becomes empty");
gst_log!(
self.cat,
obj: element,
"Blocking until queue has space again"
);
executor::current_thread::block_on_all(wait_future).map_err(|_| {
gst_element_error!(
element,
gst::StreamError::Failed,
["failed to wait for queue to become empty again"]
["failed to wait for queue to have space again"]
);
gst::FlowError::Error
})?;