webrtcsink: Propagate GstContext messages

Implement CustomBusStream so that NEED_CONTEXT and HAVE_CONTEXT
messages from session/discovery can be forwarded to parent
pipeline and also GstContext can be shared.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1322>
This commit is contained in:
Seungha Yang 2023-09-10 00:17:18 +09:00
parent 1de7754616
commit 225482f7ed

View file

@ -118,6 +118,66 @@ impl DiscoveryInfo {
}
}
// Same gst::bus::BusStream but hooking context message from the thread
// where the message is posted, so that GstContext can be shared
#[derive(Debug)]
struct CustomBusStream {
bus: glib::WeakRef<gst::Bus>,
receiver: futures::channel::mpsc::UnboundedReceiver<gst::Message>,
}
impl CustomBusStream {
fn new(bin: &super::BaseWebRTCSink, bus: &gst::Bus) -> Self {
let (sender, receiver) = futures::channel::mpsc::unbounded();
let bin_weak = bin.downgrade();
bus.set_sync_handler(move |_, msg| {
match msg.view() {
gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => {
if let Some(bin) = bin_weak.upgrade() {
let _ = bin.post_message(msg.to_owned());
}
}
_ => {
let _ = sender.unbounded_send(msg.to_owned());
}
}
gst::BusSyncReply::Drop
});
Self {
bus: bus.downgrade(),
receiver,
}
}
}
impl Drop for CustomBusStream {
fn drop(&mut self) {
if let Some(bus) = self.bus.upgrade() {
bus.unset_sync_handler();
}
}
}
impl futures::Stream for CustomBusStream {
type Item = gst::Message;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
context: &mut std::task::Context,
) -> std::task::Poll<Option<Self::Item>> {
self.receiver.poll_next_unpin(context)
}
}
impl futures::stream::FusedStream for CustomBusStream {
fn is_terminated(&self) -> bool {
self.receiver.is_terminated()
}
}
/// Wrapper around our sink pads
#[derive(Debug, Clone)]
struct InputStream {
@ -2241,7 +2301,8 @@ impl BaseWebRTCSink {
pipeline.set_start_time(gst::ClockTime::NONE);
pipeline.set_base_time(element.base_time().unwrap());
let mut bus_stream = pipeline.bus().unwrap().stream();
let bus = pipeline.bus().unwrap();
let mut bus_stream = CustomBusStream::new(&element, &bus);
let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade();
let session_id_clone = session_id.clone();
@ -2851,7 +2912,8 @@ impl BaseWebRTCSink {
.link(&sink)
.with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?;
let mut stream = pipe.0.bus().unwrap().stream();
let bus = pipe.0.bus().unwrap();
let mut stream = CustomBusStream::new(element, &bus);
pipe.0
.set_state(gst::State::Playing)