From 5f19639d0f38227c1b3fef275304d6284514eac5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 13 Oct 2022 11:51:55 +0300 Subject: [PATCH] ndi: Various code cleanup --- net/ndi/src/device_provider/imp.rs | 53 ++++++-------- net/ndi/src/ndisink/imp.rs | 38 +++++----- net/ndi/src/ndisinkcombiner/imp.rs | 88 +++++++++++------------ net/ndi/src/ndisrc/imp.rs | 75 ++++++++++--------- net/ndi/src/ndisrc/receiver.rs | 111 ++++++++++++++--------------- net/ndi/src/ndisrcdemux/imp.rs | 55 +++++++------- 6 files changed, 205 insertions(+), 215 deletions(-) diff --git a/net/ndi/src/device_provider/imp.rs b/net/ndi/src/device_provider/imp.rs index 7cc4bd69..87aa2bb1 100644 --- a/net/ndi/src/device_provider/imp.rs +++ b/net/ndi/src/device_provider/imp.rs @@ -2,7 +2,6 @@ use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{error, log, trace}; use once_cell::sync::OnceCell; @@ -77,33 +76,31 @@ impl DeviceProviderImpl for DeviceProvider { } let mut thread_guard = self.thread.lock().unwrap(); - let device_provider = self.instance(); if thread_guard.is_some() { - log!(CAT, obj: device_provider, "Device provider already started"); + gst::log!(CAT, imp: self, "Device provider already started"); return Ok(()); } self.is_running.store(true, atomic::Ordering::SeqCst); - let device_provider_weak = device_provider.downgrade(); + let imp_weak = self.downgrade(); let mut first = true; *thread_guard = Some(thread::spawn(move || { - let device_provider = match device_provider_weak.upgrade() { - None => return, - Some(device_provider) => device_provider, - }; - - let imp = DeviceProvider::from_instance(&device_provider); { + let imp = match imp_weak.upgrade() { + None => return, + Some(imp) => imp, + }; + let mut find_guard = imp.find.lock().unwrap(); if find_guard.is_some() { - log!(CAT, obj: &device_provider, "Already started"); + gst::log!(CAT, imp: imp, "Already started"); return; } let find = match ndi::FindInstance::builder().build() { None => { - error!(CAT, obj: &device_provider, "Failed to create Find instance"); + gst::error!(CAT, imp: imp, "Failed to create Find instance"); return; } Some(find) => find, @@ -112,17 +109,16 @@ impl DeviceProviderImpl for DeviceProvider { } loop { - let device_provider = match device_provider_weak.upgrade() { - None => break, - Some(device_provider) => device_provider, + let imp = match imp_weak.upgrade() { + None => return, + Some(imp) => imp, }; - let imp = DeviceProvider::from_instance(&device_provider); if !imp.is_running.load(atomic::Ordering::SeqCst) { break; } - imp.poll(&device_provider, first); + imp.poll(first); first = false; } })); @@ -139,7 +135,7 @@ impl DeviceProviderImpl for DeviceProvider { } impl DeviceProvider { - fn poll(&self, device_provider: &super::DeviceProvider, first: bool) { + fn poll(&self, first: bool) { let mut find_guard = self.find.lock().unwrap(); let find = match *find_guard { None => return, @@ -147,7 +143,7 @@ impl DeviceProvider { }; if !find.wait_for_sources(if first { 1000 } else { 5000 }) { - trace!(CAT, obj: device_provider, "No new sources found"); + gst::trace!(CAT, imp: self, "No new sources found"); return; } @@ -160,16 +156,11 @@ impl DeviceProvider { // First check for each device we previously knew if it's still available for old_device in &*current_devices_guard { - let old_device_imp = Device::from_instance(old_device); + let old_device_imp = old_device.imp(); let old_source = old_device_imp.source.get().unwrap(); if !sources.contains(old_source) { - log!( - CAT, - obj: device_provider, - "Source {:?} disappeared", - old_source - ); + gst::log!(CAT, imp: self, "Source {:?} disappeared", old_source); expired_devices.push(old_device.clone()); } else { // Otherwise remember that we had it before already and don't have to announce it @@ -186,14 +177,14 @@ impl DeviceProvider { current_devices_guard.retain(|d| !expired_devices.contains(d)); // And also notify the device provider of them having disappeared for old_device in expired_devices { - device_provider.device_remove(&old_device); + self.instance().device_remove(&old_device); } // Now go through all new devices and announce them for source in sources { - log!(CAT, obj: device_provider, "Source {:?} appeared", source); + gst::log!(CAT, imp: self, "Source {:?} appeared", source); let device = super::Device::new(&source); - device_provider.device_add(&device); + self.instance().device_add(&device); current_devices_guard.push(device); } } @@ -261,9 +252,9 @@ impl super::Device { ("device-class", &device_class), ("properties", &extra_properties), ]); - let device_impl = Device::from_instance(&device); - device_impl.source.set(source.to_owned()).unwrap(); + let imp = device.imp(); + imp.source.set(source.to_owned()).unwrap(); device } diff --git a/net/ndi/src/ndisink/imp.rs b/net/ndi/src/ndisink/imp.rs index 5dd95800..f2ec7492 100644 --- a/net/ndi/src/ndisink/imp.rs +++ b/net/ndi/src/ndisink/imp.rs @@ -3,7 +3,6 @@ use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{debug, error, info, trace}; use gst_base::prelude::*; use gst_base::subclass::prelude::*; @@ -207,7 +206,7 @@ impl BaseSinkImpl for NdiSink { audio_info: None, }; *state_storage = Some(state); - info!(CAT, obj: self.instance(), "Started"); + gst::info!(CAT, imp: self, "Started"); Ok(()) } @@ -216,7 +215,7 @@ impl BaseSinkImpl for NdiSink { let mut state_storage = self.state.lock().unwrap(); *state_storage = None; - info!(CAT, obj: self.instance(), "Stopped"); + gst::info!(CAT, imp: self, "Stopped"); Ok(()) } @@ -230,7 +229,7 @@ impl BaseSinkImpl for NdiSink { } fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> { - debug!(CAT, obj: self.instance(), "Setting caps {}", caps); + gst::debug!(CAT, imp: self, "Setting caps {}", caps); let mut state_storage = self.state.lock().unwrap(); let state = match &mut *state_storage { @@ -257,7 +256,6 @@ impl BaseSinkImpl for NdiSink { } fn render(&self, buffer: &gst::Buffer) -> Result { - let element = self.instance(); let mut state_storage = self.state.lock().unwrap(); let state = match &mut *state_storage { None => return Err(gst::FlowError::Error), @@ -269,13 +267,13 @@ impl BaseSinkImpl for NdiSink { for (buffer, info, timecode) in audio_meta.buffers() { let frame = crate::ndi::AudioFrame::try_from_buffer(info, buffer, *timecode) .map_err(|_| { - error!(CAT, obj: element, "Unsupported audio frame"); + gst::error!(CAT, imp: self, "Unsupported audio frame"); gst::FlowError::NotNegotiated })?; - trace!( + gst::trace!( CAT, - obj: element, + imp: self, "Sending audio buffer {:?} with timecode {} and format {:?}", buffer, if *timecode < 0 { @@ -291,14 +289,15 @@ impl BaseSinkImpl for NdiSink { // Skip empty/gap buffers from ndisinkcombiner if buffer.size() != 0 { - let timecode = element + let timecode = self + .instance() .segment() .downcast::() .ok() .and_then(|segment| { segment .to_running_time(buffer.pts()) - .zip(element.base_time()) + .zip(self.instance().base_time()) }) .and_then(|(running_time, base_time)| running_time.checked_add(base_time)) .map(|time| (time.nseconds() / 100) as i64) @@ -306,19 +305,19 @@ impl BaseSinkImpl for NdiSink { let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info) .map_err(|_| { - error!(CAT, obj: element, "Failed to map buffer"); + gst::error!(CAT, imp: self, "Failed to map buffer"); gst::FlowError::Error })?; let frame = crate::ndi::VideoFrame::try_from_video_frame(&frame, timecode) .map_err(|_| { - error!(CAT, obj: element, "Unsupported video frame"); + gst::error!(CAT, imp: self, "Unsupported video frame"); gst::FlowError::NotNegotiated })?; - trace!( + gst::trace!( CAT, - obj: element, + imp: self, "Sending video buffer {:?} with timecode {} and format {:?}", buffer, if timecode < 0 { @@ -331,14 +330,15 @@ impl BaseSinkImpl for NdiSink { state.send.send_video(&frame); } } else if let Some(ref info) = state.audio_info { - let timecode = element + let timecode = self + .instance() .segment() .downcast::() .ok() .and_then(|segment| { segment .to_running_time(buffer.pts()) - .zip(element.base_time()) + .zip(self.instance().base_time()) }) .and_then(|(running_time, base_time)| running_time.checked_add(base_time)) .map(|time| (time.nseconds() / 100) as i64) @@ -346,13 +346,13 @@ impl BaseSinkImpl for NdiSink { let frame = crate::ndi::AudioFrame::try_from_buffer(info, buffer, timecode).map_err(|_| { - error!(CAT, obj: element, "Unsupported audio frame"); + gst::error!(CAT, imp: self, "Unsupported audio frame"); gst::FlowError::NotNegotiated })?; - trace!( + gst::trace!( CAT, - obj: element, + imp: self, "Sending audio buffer {:?} with timecode {} and format {:?}", buffer, if timecode < 0 { diff --git a/net/ndi/src/ndisinkcombiner/imp.rs b/net/ndi/src/ndisinkcombiner/imp.rs index eb6a8263..040d8a1a 100644 --- a/net/ndi/src/ndisinkcombiner/imp.rs +++ b/net/ndi/src/ndisinkcombiner/imp.rs @@ -4,7 +4,6 @@ use glib::prelude::*; use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{debug, error, trace, warning}; use gst_base::prelude::*; use gst_base::subclass::prelude::*; @@ -58,10 +57,10 @@ impl ObjectSubclass for NdiSinkCombiner { impl ObjectImpl for NdiSinkCombiner { fn constructed(&self) { + self.parent_constructed(); + let obj = self.instance(); obj.add_pad(&self.video_pad).unwrap(); - - self.parent_constructed(); } } @@ -141,7 +140,7 @@ impl ElementImpl for NdiSinkCombiner { let mut audio_pad_storage = self.audio_pad.lock().unwrap(); if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) { - debug!(CAT, obj: self.instance(), "Release audio pad"); + gst::debug!(CAT, obj: pad, "Release audio pad"); self.parent_release_pad(pad); *audio_pad_storage = None; } @@ -155,17 +154,16 @@ impl AggregatorImpl for NdiSinkCombiner { _req_name: Option<&str>, _caps: Option<&gst::Caps>, ) -> Option { - let agg = self.instance(); let mut audio_pad_storage = self.audio_pad.lock().unwrap(); if audio_pad_storage.is_some() { - error!(CAT, obj: agg, "Audio pad already requested"); + gst::error!(CAT, imp: self, "Audio pad already requested"); return None; } - let sink_templ = agg.pad_template("audio").unwrap(); + let sink_templ = self.instance().pad_template("audio").unwrap(); if templ != &sink_templ { - error!(CAT, obj: agg, "Wrong pad template"); + gst::error!(CAT, imp: self, "Wrong pad template"); return None; } @@ -173,7 +171,7 @@ impl AggregatorImpl for NdiSinkCombiner { gst::PadBuilder::::from_template(templ, Some("audio")).build(); *audio_pad_storage = Some(pad.clone()); - debug!(CAT, obj: agg, "Requested audio pad"); + gst::debug!(CAT, imp: self, "Requested audio pad"); Some(pad) } @@ -187,7 +185,7 @@ impl AggregatorImpl for NdiSinkCombiner { current_audio_buffers: Vec::new(), }); - debug!(CAT, obj: self.instance(), "Started"); + gst::debug!(CAT, imp: self, "Started"); Ok(()) } @@ -196,7 +194,7 @@ impl AggregatorImpl for NdiSinkCombiner { // Drop our state now let _ = self.state.lock().unwrap().take(); - debug!(CAT, obj: self.instance(), "Stopped"); + gst::debug!(CAT, imp: self, "Stopped"); Ok(()) } @@ -211,24 +209,23 @@ impl AggregatorImpl for NdiSinkCombiner { agg_pad: &gst_base::AggregatorPad, mut buffer: gst::Buffer, ) -> Option { - let agg = self.instance(); let segment = match agg_pad.segment().downcast::() { Ok(segment) => segment, Err(_) => { - error!(CAT, obj: agg, "Only TIME segments supported"); + gst::error!(CAT, obj: agg_pad, "Only TIME segments supported"); return Some(buffer); } }; let pts = buffer.pts(); if pts.is_none() { - error!(CAT, obj: agg, "Only buffers with PTS supported"); + gst::error!(CAT, obj: agg_pad, "Only buffers with PTS supported"); return Some(buffer); } let duration = buffer.duration(); - trace!( + gst::trace!( CAT, obj: agg_pad, "Clipping buffer {:?} with PTS {} and duration {}", @@ -263,7 +260,7 @@ impl AggregatorImpl for NdiSinkCombiner { unreachable!() }; - debug!( + gst::debug!( CAT, obj: agg_pad, "Clipping buffer {:?} with PTS {} and duration {}", @@ -307,7 +304,6 @@ impl AggregatorImpl for NdiSinkCombiner { // should be improved! assert!(!timeout); - let agg = self.instance(); // Because peek_buffer() can call into clip() and that would take the state lock again, // first try getting buffers from both pads here let video_buffer_and_segment = match self.video_pad.peek_buffer() { @@ -316,9 +312,9 @@ impl AggregatorImpl for NdiSinkCombiner { let video_segment = match video_segment.downcast::() { Ok(video_segment) => video_segment, Err(video_segment) => { - error!( + gst::error!( CAT, - obj: agg, + imp: self, "Video segment of wrong format {:?}", video_segment.format() ); @@ -329,7 +325,7 @@ impl AggregatorImpl for NdiSinkCombiner { Some((video_buffer, video_segment)) } None if !self.video_pad.is_eos() => { - trace!(CAT, obj: agg, "Waiting for video buffer"); + gst::trace!(CAT, imp: self, "Waiting for video buffer"); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } None => None, @@ -341,7 +337,7 @@ impl AggregatorImpl for NdiSinkCombiner { Some(audio_buffer) if audio_buffer.size() == 0 => { // Skip empty/gap audio buffer audio_pad.drop_buffer(); - trace!(CAT, obj: agg, "Empty audio buffer, waiting for next"); + gst::trace!(CAT, imp: self, "Empty audio buffer, waiting for next"); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } Some(audio_buffer) => { @@ -349,9 +345,9 @@ impl AggregatorImpl for NdiSinkCombiner { let audio_segment = match audio_segment.downcast::() { Ok(audio_segment) => audio_segment, Err(audio_segment) => { - error!( + gst::error!( CAT, - obj: agg, + imp: self, "Audio segment of wrong format {:?}", audio_segment.format() ); @@ -362,7 +358,7 @@ impl AggregatorImpl for NdiSinkCombiner { Some((audio_buffer, audio_segment, audio_pad)) } None if !audio_pad.is_eos() => { - trace!(CAT, obj: agg, "Waiting for audio buffer"); + gst::trace!(CAT, imp: self, "Waiting for audio buffer"); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } None => None, @@ -383,7 +379,7 @@ impl AggregatorImpl for NdiSinkCombiner { match state.current_video_buffer { None => { - trace!(CAT, obj: agg, "First video buffer, waiting for second"); + gst::trace!(CAT, imp: self, "First video buffer, waiting for second"); state.current_video_buffer = Some((video_buffer, video_running_time)); drop(state_storage); self.video_pad.drop_buffer(); @@ -398,9 +394,9 @@ impl AggregatorImpl for NdiSinkCombiner { } else { match (&state.current_video_buffer, &audio_buffer_segment_and_pad) { (None, None) => { - trace!( + gst::trace!( CAT, - obj: agg, + imp: self, "All pads are EOS and no buffers are queued, finishing" ); return Err(gst::FlowError::Eos); @@ -415,9 +411,9 @@ impl AggregatorImpl for NdiSinkCombiner { let video_segment = match video_segment.downcast::() { Ok(video_segment) => video_segment, Err(video_segment) => { - error!( + gst::error!( CAT, - obj: agg, + imp: self, "Video segment of wrong format {:?}", video_segment.format() ); @@ -427,7 +423,11 @@ impl AggregatorImpl for NdiSinkCombiner { let video_pts = video_segment.position_from_running_time(audio_running_time); if video_pts.is_none() { - warning!(CAT, obj: agg, "Can't output more audio after video EOS"); + gst::warning!( + CAT, + imp: self, + "Can't output more audio after video EOS" + ); return Err(gst::FlowError::Eos); } @@ -447,7 +447,7 @@ impl AggregatorImpl for NdiSinkCombiner { let audio_info = match state.audio_info { Some(ref audio_info) => audio_info, None => { - error!(CAT, obj: agg, "Have no audio caps"); + gst::error!(CAT, imp: self, "Have no audio caps"); return Err(gst::FlowError::NotNegotiated); } }; @@ -466,7 +466,8 @@ impl AggregatorImpl for NdiSinkCombiner { .map(|(audio, video)| audio <= video) .unwrap_or(true) { - let timecode = agg + let timecode = self + .instance() .base_time() .zip(audio_running_time) .map(|(base_time, audio_running_time)| { @@ -474,9 +475,9 @@ impl AggregatorImpl for NdiSinkCombiner { }) .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); - trace!( + gst::trace!( CAT, - obj: agg, + imp: self, "Including audio buffer {:?} with timecode {}: {} <= {}", audio_buffer, timecode, @@ -515,18 +516,17 @@ impl AggregatorImpl for NdiSinkCombiner { drop(state_storage); } - trace!( + gst::trace!( CAT, - obj: agg, + imp: self, "Finishing video buffer {:?}", current_video_buffer ); - agg.finish_buffer(current_video_buffer) + self.instance().finish_buffer(current_video_buffer) } fn sink_event(&self, pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { use gst::EventView; - let agg = self.instance(); match event.view() { EventView::Caps(caps) => { @@ -542,7 +542,7 @@ impl AggregatorImpl for NdiSinkCombiner { let info = match gst_video::VideoInfo::from_caps(&caps) { Ok(info) => info, Err(_) => { - error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); + gst::error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); return false; } }; @@ -562,15 +562,15 @@ impl AggregatorImpl for NdiSinkCombiner { drop(state_storage); - agg.set_latency(latency, gst::ClockTime::NONE); + self.instance().set_latency(latency, gst::ClockTime::NONE); // The video caps are passed through as the audio is included only in a meta - agg.set_src_caps(&caps); + self.instance().set_src_caps(&caps); } else { let info = match gst_audio::AudioInfo::from_caps(&caps) { Ok(info) => info, Err(_) => { - error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); + gst::error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); return false; } }; @@ -581,8 +581,8 @@ impl AggregatorImpl for NdiSinkCombiner { // The video segment is passed through as-is and the video timestamps are preserved EventView::Segment(segment) if pad == &self.video_pad => { let segment = segment.segment(); - debug!(CAT, obj: agg, "Updating segment {:?}", segment); - agg.update_segment(segment); + gst::debug!(CAT, obj: pad, "Updating segment {:?}", segment); + self.instance().update_segment(segment); } _ => (), } diff --git a/net/ndi/src/ndisrc/imp.rs b/net/ndi/src/ndisrc/imp.rs index ef7f2fd2..027e50b7 100644 --- a/net/ndi/src/ndisrc/imp.rs +++ b/net/ndi/src/ndisrc/imp.rs @@ -2,7 +2,6 @@ use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{debug, error}; use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; @@ -168,23 +167,22 @@ impl ObjectImpl for NdiSrc { fn constructed(&self) { self.parent_constructed(); - let obj = self.instance(); // Initialize live-ness and notify the base class that // we'd like to operate in Time format + let obj = self.instance(); obj.set_live(true); obj.set_format(gst::Format::Time); } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - let obj = self.instance(); match pspec.name() { "ndi-name" => { let mut settings = self.settings.lock().unwrap(); let ndi_name = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing ndi-name from {:?} to {:?}", settings.ndi_name, ndi_name, @@ -194,9 +192,9 @@ impl ObjectImpl for NdiSrc { "url-address" => { let mut settings = self.settings.lock().unwrap(); let url_address = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing url-address from {:?} to {:?}", settings.url_address, url_address, @@ -206,9 +204,9 @@ impl ObjectImpl for NdiSrc { "receiver-ndi-name" => { let mut settings = self.settings.lock().unwrap(); let receiver_ndi_name = value.get::>().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing receiver-ndi-name from {:?} to {:?}", settings.receiver_ndi_name, receiver_ndi_name, @@ -219,9 +217,9 @@ impl ObjectImpl for NdiSrc { "connect-timeout" => { let mut settings = self.settings.lock().unwrap(); let connect_timeout = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing connect-timeout from {} to {}", settings.connect_timeout, connect_timeout, @@ -231,9 +229,9 @@ impl ObjectImpl for NdiSrc { "timeout" => { let mut settings = self.settings.lock().unwrap(); let timeout = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing timeout from {} to {}", settings.timeout, timeout, @@ -243,9 +241,9 @@ impl ObjectImpl for NdiSrc { "max-queue-length" => { let mut settings = self.settings.lock().unwrap(); let max_queue_length = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing max-queue-length from {} to {}", settings.max_queue_length, max_queue_length, @@ -255,9 +253,9 @@ impl ObjectImpl for NdiSrc { "bandwidth" => { let mut settings = self.settings.lock().unwrap(); let bandwidth = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing bandwidth from {} to {}", settings.bandwidth, bandwidth, @@ -267,9 +265,9 @@ impl ObjectImpl for NdiSrc { "color-format" => { let mut settings = self.settings.lock().unwrap(); let color_format = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing color format from {:?} to {:?}", settings.color_format, color_format, @@ -279,15 +277,19 @@ impl ObjectImpl for NdiSrc { "timestamp-mode" => { let mut settings = self.settings.lock().unwrap(); let timestamp_mode = value.get().unwrap(); - debug!( + gst::debug!( CAT, - obj: obj, + imp: self, "Changing timestamp mode from {:?} to {:?}", settings.timestamp_mode, timestamp_mode ); if settings.timestamp_mode != timestamp_mode { - let _ = obj.post_message(gst::message::Latency::builder().src(&*obj).build()); + let _ = self.instance().post_message( + gst::message::Latency::builder() + .src(&*self.instance()) + .build(), + ); } settings.timestamp_mode = timestamp_mode; } @@ -411,7 +413,7 @@ impl BaseSrcImpl for NdiSrc { } fn unlock(&self) -> Result<(), gst::ErrorMessage> { - debug!(CAT, obj: self.instance(), "Unlocking",); + gst::debug!(CAT, imp: self, "Unlocking",); if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { controller.set_flushing(true); } @@ -419,7 +421,7 @@ impl BaseSrcImpl for NdiSrc { } fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> { - debug!(CAT, obj: self.instance(), "Stop unlocking",); + gst::debug!(CAT, imp: self, "Stop unlocking",); if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { controller.set_flushing(false); } @@ -501,13 +503,7 @@ impl BaseSrcImpl for NdiSrc { let max = settings.max_queue_length as u64 * latency; - debug!( - CAT, - obj: self.instance(), - "Returning latency min {} max {}", - min, - max - ); + gst::debug!(CAT, imp: self, "Returning latency min {} max {}", min, max); q.set(true, min, max); true } else { @@ -524,13 +520,12 @@ impl BaseSrcImpl for NdiSrc { _buffer: Option<&mut gst::BufferRef>, _length: u32, ) -> Result { - let element = self.instance(); let recv = { let mut state = self.state.lock().unwrap(); match state.receiver.take() { Some(recv) => recv, None => { - error!(CAT, obj: element, "Have no receiver"); + gst::error!(CAT, imp: self, "Have no receiver"); return Err(gst::FlowError::Error); } } @@ -547,8 +542,8 @@ impl BaseSrcImpl for NdiSrc { Buffer::Audio(mut buffer, info) => { if state.audio_info.as_ref() != Some(&info) { let caps = info.to_caps().map_err(|_| { - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::ResourceError::Settings, ["Invalid audio info received: {:?}", info] ); @@ -574,8 +569,8 @@ impl BaseSrcImpl for NdiSrc { if state.video_info.as_ref() != Some(&info) { let caps = info.to_caps().map_err(|_| { - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::ResourceError::Settings, ["Invalid video info received: {:?}", info] ); @@ -598,8 +593,10 @@ impl BaseSrcImpl for NdiSrc { drop(state); if latency_changed { - let _ = element.post_message( - gst::message::Latency::builder().src(&*element).build(), + let _ = self.instance().post_message( + gst::message::Latency::builder() + .src(&*self.instance()) + .build(), ); } diff --git a/net/ndi/src/ndisrc/receiver.rs b/net/ndi/src/ndisrc/receiver.rs index f5430ac5..90484bac 100644 --- a/net/ndi/src/ndisrc/receiver.rs +++ b/net/ndi/src/ndisrc/receiver.rs @@ -2,7 +2,6 @@ use glib::prelude::*; use gst::prelude::*; -use gst::{debug, error, log, trace, warning}; use gst_video::prelude::*; use byte_slice_cast::*; @@ -203,7 +202,7 @@ pub struct ReceiverInner { observations_timestamp: [Observations; 2], observations_timecode: [Observations; 2], - element: glib::WeakRef, + element: glib::WeakRef, timestamp_mode: TimestampMode, timeout: u32, @@ -290,7 +289,7 @@ impl Observations { // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 fn process( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, remote_time: Option, local_time: gst::ClockTime, duration: Option, @@ -300,7 +299,7 @@ impl Observations { let mut inner = self.0.borrow_mut(); - trace!( + gst::trace!( CAT, obj: element, "Local time {}, remote time {}, slope correct {}/{}", @@ -334,7 +333,7 @@ impl Observations { match (inner.base_remote_time, inner.base_local_time) { (Some(remote), Some(local)) => (remote, local), _ => { - debug!( + gst::debug!( CAT, obj: element, "Initializing base time: local {}, remote {}", @@ -382,7 +381,7 @@ impl Observations { // Check for some obviously wrong slopes and try to correct for that if !(0.5..1.5).contains(&scaled_slope) { - warning!( + gst::warning!( CAT, obj: element, "Too small/big slope {}, resetting", @@ -423,7 +422,7 @@ impl Observations { .unwrap() .0 .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; - debug!( + gst::debug!( CAT, obj: element, "Initializing base time: local {}, remote {}, slope correction {}/{}", @@ -444,7 +443,7 @@ impl Observations { let local_diff = local_time.saturating_sub(base_local_time); let delta = (local_diff as i64) - (remote_diff as i64); - trace!( + gst::trace!( CAT, obj: element, "Local diff {}, remote diff {}, delta {}", @@ -456,7 +455,7 @@ impl Observations { if (delta > inner.skew && delta - inner.skew > 1_000_000_000) || (delta < inner.skew && inner.skew - delta > 1_000_000_000) { - warning!( + gst::warning!( CAT, obj: element, "Delta {} too far from skew {}, resetting", @@ -466,7 +465,7 @@ impl Observations { let discont = !inner.deltas.is_empty(); - debug!( + gst::debug!( CAT, obj: element, "Initializing base time: local {}, remote {}", @@ -520,14 +519,14 @@ impl Observations { out_time + (inner.skew as u64) }; - trace!( + gst::trace!( CAT, obj: element, "Skew {}, min delta {}", inner.skew, inner.min_delta ); - trace!( + gst::trace!( CAT, obj: element, "Outputting {}", @@ -572,7 +571,7 @@ impl Drop for ReceiverInner { let element = self.element.upgrade(); if let Some(ref element) = element { - debug!(CAT, obj: element, "Closed NDI connection"); + gst::debug!(CAT, obj: element, "Closed NDI connection"); } } } @@ -584,7 +583,7 @@ impl Receiver { timeout: u32, connect_timeout: u32, max_queue_length: usize, - element: &gst_base::BaseSrc, + element: &gst::Element, ) -> Self { let receiver = Receiver(Arc::new(ReceiverInner { queue: ReceiverQueue(Arc::new(( @@ -685,7 +684,7 @@ impl Receiver { #[allow(clippy::too_many_arguments)] pub fn connect( - element: &gst_base::BaseSrc, + element: &gst::Element, ndi_name: Option<&str>, url_address: Option<&str>, receiver_ndi_name: &str, @@ -696,11 +695,11 @@ impl Receiver { timeout: u32, max_queue_length: usize, ) -> Option { - debug!(CAT, obj: element, "Starting NDI connection..."); + gst::debug!(CAT, obj: element, "Starting NDI connection..."); assert!(ndi_name.is_some() || url_address.is_some()); - debug!( + gst::debug!( CAT, obj: element, "Connecting to NDI source with NDI name '{:?}' and URL/Address {:?}", @@ -766,13 +765,13 @@ impl Receiver { let flushing = { let queue = (receiver.0.queue.0).0.lock().unwrap(); if queue.shutdown { - debug!(CAT, obj: &element, "Shutting down"); + gst::debug!(CAT, obj: &element, "Shutting down"); break; } // If an error happened in the meantime, just go out of here if queue.error.is_some() { - error!(CAT, obj: &element, "Error while waiting for connection"); + gst::error!(CAT, obj: &element, "Error while waiting for connection"); return; } @@ -787,7 +786,7 @@ impl Receiver { let res = match recv.capture(50) { _ if flushing => { - debug!(CAT, obj: &element, "Flushing"); + gst::debug!(CAT, obj: &element, "Flushing"); Err(gst::FlowError::Flushing) } Err(_) => { @@ -799,11 +798,11 @@ impl Receiver { Err(gst::FlowError::Error) } Ok(None) if timeout > 0 && timer.elapsed().as_millis() >= timeout as u128 => { - debug!(CAT, obj: &element, "Timed out -- assuming EOS",); + gst::debug!(CAT, obj: &element, "Timed out -- assuming EOS",); Err(gst::FlowError::Eos) } Ok(None) => { - debug!(CAT, obj: &element, "No frame received yet, retry"); + gst::debug!(CAT, obj: &element, "No frame received yet, retry"); continue; } Ok(Some(Frame::Video(frame))) => { @@ -836,7 +835,7 @@ impl Receiver { } Ok(Some(Frame::Metadata(frame))) => { if let Some(metadata) = frame.metadata() { - debug!( + gst::debug!( CAT, obj: &element, "Received metadata at timecode {}: {}", @@ -853,7 +852,7 @@ impl Receiver { Ok(item) => { let mut queue = (receiver.0.queue.0).0.lock().unwrap(); while queue.buffer_queue.len() > receiver.0.max_queue_length { - warning!( + gst::warning!( CAT, obj: &element, "Dropping old buffer -- queue has {} items", @@ -866,7 +865,7 @@ impl Receiver { timer = time::Instant::now(); } Err(gst::FlowError::Eos) => { - debug!(CAT, obj: &element, "Signalling EOS"); + gst::debug!(CAT, obj: &element, "Signalling EOS"); let mut queue = (receiver.0.queue.0).0.lock().unwrap(); queue.timeout = true; (receiver.0.queue.0).1.notify_one(); @@ -880,7 +879,7 @@ impl Receiver { timer = time::Instant::now(); } Err(err) => { - error!(CAT, obj: &element, "Signalling error"); + gst::error!(CAT, obj: &element, "Signalling error"); let mut queue = (receiver.0.queue.0).0.lock().unwrap(); if queue.error.is_none() { queue.error = Some(err); @@ -894,7 +893,7 @@ impl Receiver { fn calculate_timestamp( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, is_audio: bool, timestamp: i64, timecode: i64, @@ -910,7 +909,7 @@ impl Receiver { }; let timecode = gst::ClockTime::from_nseconds(timecode as u64 * 100); - log!( + gst::log!( CAT, obj: element, "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}", @@ -939,7 +938,7 @@ impl Receiver { TimestampMode::ReceiveTimeTimecode => match res_timecode { Some((pts, duration, discont)) => (pts, duration, discont), None => { - warning!(CAT, obj: element, "Can't calculate timestamp"); + gst::warning!(CAT, obj: element, "Can't calculate timestamp"); (receive_time, duration, false) } }, @@ -947,7 +946,7 @@ impl Receiver { Some((pts, duration, discont)) => (pts, duration, discont), None => { if timestamp.is_some() { - warning!(CAT, obj: element, "Can't calculate timestamp"); + gst::warning!(CAT, obj: element, "Can't calculate timestamp"); } (receive_time, duration, false) @@ -978,7 +977,7 @@ impl Receiver { } }; - log!( + gst::log!( CAT, obj: element, "Calculated PTS {}, duration {}", @@ -991,15 +990,15 @@ impl Receiver { fn create_video_buffer_and_info( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, video_frame: VideoFrame, ) -> Result { - debug!(CAT, obj: element, "Received video frame {:?}", video_frame); + gst::debug!(CAT, obj: element, "Received video frame {:?}", video_frame); let (pts, duration, discont) = self .calculate_video_timestamp(element, &video_frame) .ok_or_else(|| { - debug!(CAT, obj: element, "Flushing, dropping buffer"); + gst::debug!(CAT, obj: element, "Flushing, dropping buffer"); gst::FlowError::Flushing })?; @@ -1013,14 +1012,14 @@ impl Receiver { .set_flags(gst::BufferFlags::RESYNC); } - log!(CAT, obj: element, "Produced video buffer {:?}", buffer); + gst::log!(CAT, obj: element, "Produced video buffer {:?}", buffer); Ok(Buffer::Video(buffer, info)) } fn calculate_video_timestamp( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, video_frame: &VideoFrame, ) -> Option<(gst::ClockTime, Option, bool)> { let duration = gst::ClockTime::SECOND.mul_div_floor( @@ -1039,7 +1038,7 @@ impl Receiver { fn create_video_info( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, video_frame: &VideoFrame, ) -> Result { let fourcc = video_frame.fourcc(); @@ -1215,7 +1214,7 @@ impl Receiver { .contains(&fourcc) { let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { - error!( + gst::error!( CAT, obj: element, "Video packet doesn't have compressed packet start" @@ -1226,7 +1225,7 @@ impl Receiver { })?; if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_H264 { - error!(CAT, obj: element, "Non-H264 video packet"); + gst::error!(CAT, obj: element, "Non-H264 video packet"); gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]); return Err(gst::FlowError::Error); @@ -1253,7 +1252,7 @@ impl Receiver { .contains(&fourcc) { let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { - error!( + gst::error!( CAT, obj: element, "Video packet doesn't have compressed packet start" @@ -1264,7 +1263,7 @@ impl Receiver { })?; if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_HEVC { - error!(CAT, obj: element, "Non-H265 video packet"); + gst::error!(CAT, obj: element, "Non-H265 video packet"); gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]); return Err(gst::FlowError::Error); @@ -1291,7 +1290,7 @@ impl Receiver { fn create_video_buffer( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, pts: gst::ClockTime, duration: Option, info: &VideoInfo, @@ -1360,7 +1359,7 @@ impl Receiver { fn copy_video_frame( &self, - #[allow(unused_variables)] element: &gst_base::BaseSrc, + #[allow(unused_variables)] element: &gst::Element, info: &VideoInfo, video_frame: &VideoFrame, ) -> Result { @@ -1492,7 +1491,7 @@ impl Receiver { #[cfg(feature = "advanced-sdk")] VideoInfo::SpeedHQInfo { .. } => { let data = video_frame.data().ok_or_else(|| { - error!(CAT, obj: element, "Video packet has no data"); + gst::error!(CAT, obj: element, "Video packet has no data"); gst::element_error!( element, gst::StreamError::Format, @@ -1507,7 +1506,7 @@ impl Receiver { #[cfg(feature = "advanced-sdk")] VideoInfo::H264 { .. } | VideoInfo::H265 { .. } => { let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { - error!( + gst::error!( CAT, obj: element, "Video packet doesn't have compressed packet start" @@ -1539,15 +1538,15 @@ impl Receiver { fn create_audio_buffer_and_info( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, audio_frame: AudioFrame, ) -> Result { - debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame); + gst::debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame); let (pts, duration, discont) = self .calculate_audio_timestamp(element, &audio_frame) .ok_or_else(|| { - debug!(CAT, obj: element, "Flushing, dropping buffer"); + gst::debug!(CAT, obj: element, "Flushing, dropping buffer"); gst::FlowError::Flushing })?; @@ -1561,14 +1560,14 @@ impl Receiver { .set_flags(gst::BufferFlags::RESYNC); } - log!(CAT, obj: element, "Produced audio buffer {:?}", buffer); + gst::log!(CAT, obj: element, "Produced audio buffer {:?}", buffer); Ok(Buffer::Audio(buffer, info)) } fn calculate_audio_timestamp( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, audio_frame: &AudioFrame, ) -> Option<(gst::ClockTime, Option, bool)> { let duration = gst::ClockTime::SECOND.mul_div_floor( @@ -1587,7 +1586,7 @@ impl Receiver { fn create_audio_info( &self, - element: &gst_base::BaseSrc, + element: &gst::Element, audio_frame: &AudioFrame, ) -> Result { let fourcc = audio_frame.fourcc(); @@ -1615,7 +1614,7 @@ impl Receiver { #[cfg(feature = "advanced-sdk")] if [NDIlib_FourCC_audio_type_AAC].contains(&fourcc) { let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| { - error!( + gst::error!( CAT, obj: element, "Audio packet doesn't have compressed packet start" @@ -1626,7 +1625,7 @@ impl Receiver { })?; if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_AAC { - error!(CAT, obj: element, "Non-AAC audio packet"); + gst::error!(CAT, obj: element, "Non-AAC audio packet"); gst::element_error!(element, gst::StreamError::Format, ["Invalid audio packet"]); return Err(gst::FlowError::Error); @@ -1656,7 +1655,7 @@ impl Receiver { fn create_audio_buffer( &self, - #[allow(unused_variables)] element: &gst_base::BaseSrc, + #[allow(unused_variables)] element: &gst::Element, pts: gst::ClockTime, duration: Option, info: &AudioInfo, @@ -1721,7 +1720,7 @@ impl Receiver { #[cfg(feature = "advanced-sdk")] AudioInfo::Opus { .. } => { let data = audio_frame.data().ok_or_else(|| { - error!(CAT, obj: element, "Audio packet has no data"); + gst::error!(CAT, obj: element, "Audio packet has no data"); gst::element_error!( element, gst::StreamError::Format, @@ -1736,7 +1735,7 @@ impl Receiver { #[cfg(feature = "advanced-sdk")] AudioInfo::Aac { .. } => { let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| { - error!( + gst::error!( CAT, obj: element, "Audio packet doesn't have compressed packet start" diff --git a/net/ndi/src/ndisrcdemux/imp.rs b/net/ndi/src/ndisrcdemux/imp.rs index 472930f6..d221e4c8 100644 --- a/net/ndi/src/ndisrcdemux/imp.rs +++ b/net/ndi/src/ndisrcdemux/imp.rs @@ -2,7 +2,6 @@ use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{debug, error, log}; use std::sync::Mutex; @@ -47,14 +46,14 @@ impl ObjectSubclass for NdiSrcDemux { NdiSrcDemux::catch_panic_pad_function( parent, || Err(gst::FlowError::Error), - |self_| self_.sink_chain(pad, &self_.instance(), buffer), + |self_| self_.sink_chain(pad, buffer), ) }) .event_function(|pad, parent, event| { NdiSrcDemux::catch_panic_pad_function( parent, || false, - |self_| self_.sink_event(pad, &self_.instance(), event), + |self_| self_.sink_event(pad, event), ) }) .build(); @@ -131,7 +130,6 @@ impl ElementImpl for NdiSrcDemux { &self, transition: gst::StateChange, ) -> Result { - let element = self.instance(); let res = self.parent_change_state(transition)?; match transition { @@ -141,7 +139,7 @@ impl ElementImpl for NdiSrcDemux { .iter() .flatten() { - element.remove_pad(pad).unwrap(); + self.instance().remove_pad(pad).unwrap(); } *state = State::default(); } @@ -155,17 +153,16 @@ impl ElementImpl for NdiSrcDemux { impl NdiSrcDemux { fn sink_chain( &self, - pad: &gst::Pad, - element: &super::NdiSrcDemux, + _pad: &gst::Pad, mut buffer: gst::Buffer, ) -> Result { - log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + gst::log!(CAT, imp: self, "Handling buffer {:?}", buffer); let meta = buffer .make_mut() .meta_mut::() .ok_or_else(|| { - error!(CAT, obj: element, "Buffer without NDI source meta"); + gst::error!(CAT, imp: self, "Buffer without NDI source meta"); gst::FlowError::Error })?; @@ -180,10 +177,13 @@ impl NdiSrcDemux { if let Some(ref pad) = state.audio_pad { srcpad = pad.clone(); } else { - debug!(CAT, obj: element, "Adding audio pad with caps {}", caps); + gst::debug!(CAT, imp: self, "Adding audio pad with caps {}", caps); - let klass = element.element_class(); - let templ = klass.pad_template("audio").unwrap(); + let templ = self + .instance() + .element_class() + .pad_template("audio") + .unwrap(); let pad = gst::Pad::builder_with_template(&templ, Some("audio")) .flags(gst::PadFlags::FIXED_CAPS) .build(); @@ -221,7 +221,7 @@ impl NdiSrcDemux { } if state.audio_caps.as_ref() != Some(&caps) { - debug!(CAT, obj: element, "Audio caps changed to {}", caps); + gst::debug!(CAT, imp: self, "Audio caps changed to {}", caps); events.push(gst::event::Caps::new(&caps)); state.audio_caps = Some(caps); } @@ -230,10 +230,13 @@ impl NdiSrcDemux { if let Some(ref pad) = state.video_pad { srcpad = pad.clone(); } else { - debug!(CAT, obj: element, "Adding video pad with caps {}", caps); + gst::debug!(CAT, imp: self, "Adding video pad with caps {}", caps); - let klass = element.element_class(); - let templ = klass.pad_template("video").unwrap(); + let templ = self + .instance() + .element_class() + .pad_template("video") + .unwrap(); let pad = gst::Pad::builder_with_template(&templ, Some("video")) .flags(gst::PadFlags::FIXED_CAPS) .build(); @@ -271,7 +274,7 @@ impl NdiSrcDemux { } if state.video_caps.as_ref() != Some(&caps) { - debug!(CAT, obj: element, "Video caps changed to {}", caps); + gst::debug!(CAT, imp: self, "Video caps changed to {}", caps); events.push(gst::event::Caps::new(&caps)); state.video_caps = Some(caps); } @@ -281,9 +284,9 @@ impl NdiSrcDemux { meta.remove().unwrap(); if add_pad { - element.add_pad(&srcpad).unwrap(); - if element.num_src_pads() == 2 { - element.no_more_pads(); + self.instance().add_pad(&srcpad).unwrap(); + if self.instance().num_src_pads() == 2 { + self.instance().no_more_pads(); } } @@ -297,20 +300,20 @@ impl NdiSrcDemux { state.combiner.update_pad_flow(&srcpad, res) } - fn sink_event(&self, pad: &gst::Pad, element: &super::NdiSrcDemux, event: gst::Event) -> bool { + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { use gst::EventView; - log!(CAT, obj: pad, "Handling event {:?}", event); + gst::log!(CAT, imp: self, "Handling event {:?}", event); if let EventView::Eos(_) = event.view() { - if element.num_src_pads() == 0 { + if self.instance().num_src_pads() == 0 { // error out on EOS if no src pad are available - gst::element_error!( - element, + gst::element_imp_error!( + self, gst::StreamError::Demux, ["EOS without available srcpad(s)"] ); } } - gst::Pad::event_default(pad, Some(element), event) + gst::Pad::event_default(pad, Some(&*self.instance()), event) } }