Combine audio/video source into a single element and add a demuxer for that

This simplifies the code considerably.
This commit is contained in:
Sebastian Dröge 2021-09-29 13:45:04 +03:00
parent 0f88b3df68
commit b3184b45bc
13 changed files with 920 additions and 1478 deletions

View file

@ -12,16 +12,14 @@ Some examples of how to use these elements from the command line:
```
#Information about the elements
gst-inspect-1.0 ndi
gst-inspect-1.0 ndivideosrc
gst-inspect-1.0 ndiaudiosrc
gst-inspect-1.0 ndisrc
gst-inspect-1.0 ndisink
#Video pipeline
gst-launch-1.0 ndivideosrc ndi-name="GC-DEV2 (OBS)" ! autovideosink
#Audio pipeline
gst-launch-1.0 ndiaudiosrc ndi-name="GC-DEV2 (OBS)" ! autoaudiosink
# Audio/Video source pipeline
gst-launch-1.0 ndisrc ndi-name="GC-DEV2 (OBS)" ! ndisrcdemux name=demux demux.video ! queue ! videoconvert ! autovideosink demux.audio ! queue ! audioconvert ! autoaudiosink
#Video and audio pipeline
gst-launch-1.0 ndivideosrc ndi-name="GC-DEV2 (OBS)" ! autovideosink ndiaudiosrc ndi-name="GC-DEV2 (OBS)" ! autoaudiosink
# Audio/Video sink pipeline
gst-launch-1.0 videotestsrc is-live=true ! video/x-raw,format=UYVY ! ndisinkcombiner name=combiner ! ndisink ndi-name="My NDI source" audiotestsrc is-live=true ! combiner.audio
```
Feel free to contribute to this project. Some ways you can contribute are:

View file

@ -12,9 +12,16 @@ use once_cell::sync::Lazy;
use crate::ndi;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ndideviceprovider",
gst::DebugColorFlags::empty(),
Some("NewTek NDI Device Provider"),
)
});
#[derive(Debug)]
pub struct DeviceProvider {
cat: gst::DebugCategory,
thread: Mutex<Option<thread::JoinHandle<()>>>,
current_devices: Mutex<Vec<super::Device>>,
find: Mutex<Option<ndi::FindInstance>>,
@ -29,11 +36,6 @@ impl ObjectSubclass for DeviceProvider {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"ndideviceprovider",
gst::DebugColorFlags::empty(),
Some("NewTek NDI Device Provider"),
),
thread: Mutex::new(None),
current_devices: Mutex::new(vec![]),
find: Mutex::new(None),
@ -68,11 +70,7 @@ impl DeviceProviderImpl for DeviceProvider {
fn start(&self, device_provider: &Self::Type) -> Result<(), gst::LoggableError> {
let mut thread_guard = self.thread.lock().unwrap();
if thread_guard.is_some() {
gst_log!(
self.cat,
obj: device_provider,
"Device provider already started"
);
gst_log!(CAT, obj: device_provider, "Device provider already started");
return Ok(());
}
@ -90,17 +88,13 @@ impl DeviceProviderImpl for DeviceProvider {
{
let mut find_guard = imp.find.lock().unwrap();
if find_guard.is_some() {
gst_log!(imp.cat, obj: &device_provider, "Already started");
gst_log!(CAT, obj: &device_provider, "Already started");
return;
}
let find = match ndi::FindInstance::builder().build() {
None => {
gst_error!(
imp.cat,
obj: &device_provider,
"Failed to create Find instance"
);
gst_error!(CAT, obj: &device_provider, "Failed to create Find instance");
return;
}
Some(find) => find,
@ -144,7 +138,7 @@ impl DeviceProvider {
};
if !find.wait_for_sources(if first { 1000 } else { 5000 }) {
gst_trace!(self.cat, obj: device_provider, "No new sources found");
gst_trace!(CAT, obj: device_provider, "No new sources found");
return;
}
@ -160,9 +154,9 @@ impl DeviceProvider {
let old_device_imp = Device::from_instance(old_device);
let old_source = old_device_imp.source.get().unwrap();
if !sources.contains(&old_source.0) {
if !sources.contains(&*old_source) {
gst_log!(
self.cat,
CAT,
obj: device_provider,
"Source {:?} disappeared",
old_source
@ -171,7 +165,7 @@ impl DeviceProvider {
} else {
// Otherwise remember that we had it before already and don't have to announce it
// again. After the loop we're going to remove these all from the sources vec.
remaining_sources.push(old_source.0.to_owned());
remaining_sources.push(old_source.to_owned());
}
}
@ -188,18 +182,8 @@ impl DeviceProvider {
// Now go through all new devices and announce them
for source in sources {
gst_log!(
self.cat,
obj: device_provider,
"Source {:?} appeared",
source
);
// Add once for audio, another time for video
let device = super::Device::new(&source, true);
device_provider.device_add(&device);
current_devices_guard.push(device);
let device = super::Device::new(&source, false);
gst_log!(CAT, obj: device_provider, "Source {:?} appeared", source);
let device = super::Device::new(&source);
device_provider.device_add(&device);
current_devices_guard.push(device);
}
@ -208,8 +192,7 @@ impl DeviceProvider {
#[derive(Debug)]
pub struct Device {
cat: gst::DebugCategory,
source: OnceCell<(ndi::Source<'static>, glib::Type)>,
source: OnceCell<ndi::Source<'static>>,
}
#[glib::object_subclass]
@ -220,11 +203,6 @@ impl ObjectSubclass for Device {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"ndidevice",
gst::DebugColorFlags::empty(),
Some("NewTek NDI Device"),
),
source: OnceCell::new(),
}
}
@ -240,11 +218,11 @@ impl DeviceImpl for Device {
) -> Result<gst::Element, gst::LoggableError> {
let source_info = self.source.get().unwrap();
let element = glib::Object::with_type(
source_info.1,
crate::ndisrc::NdiSrc::static_type(),
&[
("name", &name),
("ndi-name", &source_info.0.ndi_name()),
("url-address", &source_info.0.url_address()),
("ndi-name", &source_info.ndi_name()),
("url-address", &source_info.url_address()),
],
)
.unwrap()
@ -256,24 +234,12 @@ impl DeviceImpl for Device {
}
impl super::Device {
fn new(source: &ndi::Source<'_>, is_audio: bool) -> super::Device {
let display_name = format!(
"{} ({})",
source.ndi_name(),
if is_audio { "Audio" } else { "Video" }
);
let device_class = format!(
"Source/{}/Network",
if is_audio { "Audio" } else { "Video" }
);
fn new(source: &ndi::Source<'_>) -> super::Device {
let display_name = source.ndi_name();
let device_class = "Source/Audio/Video/Network";
// Get the caps from the template caps of the corresponding source element
let element_type = if is_audio {
crate::ndiaudiosrc::NdiAudioSrc::static_type()
} else {
crate::ndivideosrc::NdiVideoSrc::static_type()
};
let element_class = glib::Class::<gst::Element>::from_type(element_type).unwrap();
let element_class =
glib::Class::<gst::Element>::from_type(crate::ndisrc::NdiSrc::static_type()).unwrap();
let templ = element_class.pad_template("src").unwrap();
let caps = templ.caps();
@ -292,10 +258,7 @@ impl super::Device {
.unwrap();
let device_impl = Device::from_instance(&device);
device_impl
.source
.set((source.to_owned(), element_type))
.unwrap();
device_impl.source.set(source.to_owned()).unwrap();
device
}

View file

@ -1,21 +1,21 @@
mod device_provider;
pub mod ndi;
mod ndiaudiosrc;
#[cfg(feature = "sink")]
mod ndisink;
#[cfg(feature = "sink")]
mod ndisinkcombiner;
#[cfg(feature = "sink")]
pub mod ndisinkmeta;
mod ndisrc;
mod ndisrcdemux;
pub mod ndisrcmeta;
pub mod ndisys;
mod ndivideosrc;
pub mod receiver;
use crate::ndi::*;
use crate::ndisys::*;
use crate::receiver::*;
use std::collections::HashMap;
use std::time;
use once_cell::sync::Lazy;
@ -41,8 +41,9 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
device_provider::register(plugin)?;
ndivideosrc::register(plugin)?;
ndiaudiosrc::register(plugin)?;
ndisrc::register(plugin)?;
ndisrcdemux::register(plugin)?;
#[cfg(feature = "sink")]
{
ndisinkcombiner::register(plugin)?;

View file

@ -3,7 +3,6 @@ use crate::ndisys::*;
use std::ffi;
use std::mem;
use std::ptr;
use std::sync::{Arc, Mutex};
use byte_slice_cast::*;
@ -242,27 +241,16 @@ impl<'a> RecvBuilder<'a> {
if ptr.is_null() {
None
} else {
Some(RecvInstance(Arc::new((
RecvInstanceInner(ptr::NonNull::new_unchecked(ptr)),
Mutex::new(()),
))))
Some(RecvInstance(ptr::NonNull::new_unchecked(ptr)))
}
}
}
}
// Any access to the RecvInstanceInner apart from calling the capture function must be protected by
// the mutex
#[derive(Debug, Clone)]
pub struct RecvInstance(Arc<(RecvInstanceInner, Mutex<()>)>);
pub struct RecvInstance(ptr::NonNull<::std::os::raw::c_void>);
#[derive(Debug)]
struct RecvInstanceInner(ptr::NonNull<::std::os::raw::c_void>);
unsafe impl Send for RecvInstanceInner {}
// Not 100% true but we ensure safety with the mutex. The documentation says that only the
// capturing itself can be performed from multiple threads at once safely.
unsafe impl Sync for RecvInstanceInner {}
unsafe impl Send for RecvInstance {}
impl RecvInstance {
pub fn builder<'a>(
@ -281,38 +269,24 @@ impl RecvInstance {
}
pub fn set_tally(&self, tally: &Tally) -> bool {
unsafe {
let _lock = (self.0).1.lock().unwrap();
NDIlib_recv_set_tally(((self.0).0).0.as_ptr(), &tally.0)
}
unsafe { NDIlib_recv_set_tally(self.0.as_ptr(), &tally.0) }
}
pub fn send_metadata(&self, metadata: &MetadataFrame) -> bool {
unsafe {
let _lock = (self.0).1.lock().unwrap();
NDIlib_recv_send_metadata(((self.0).0).0.as_ptr(), metadata.as_ptr())
}
unsafe { NDIlib_recv_send_metadata(self.0.as_ptr(), metadata.as_ptr()) }
}
pub fn get_queue(&self) -> Queue {
unsafe {
let _lock = (self.0).1.lock().unwrap();
let mut queue = mem::MaybeUninit::uninit();
NDIlib_recv_get_queue(((self.0).0).0.as_ptr(), queue.as_mut_ptr());
NDIlib_recv_get_queue(self.0.as_ptr(), queue.as_mut_ptr());
Queue(queue.assume_init())
}
}
pub fn capture(
&self,
video: bool,
audio: bool,
metadata: bool,
timeout_in_ms: u32,
) -> Result<Option<Frame>, ()> {
pub fn capture(&self, timeout_in_ms: u32) -> Result<Option<Frame>, ()> {
unsafe {
// Capturing from multiple threads at once is safe according to the documentation
let ptr = ((self.0).0).0.as_ptr();
let ptr = self.0.as_ptr();
let mut video_frame = mem::zeroed();
let mut audio_frame = mem::zeroed();
@ -320,46 +294,22 @@ impl RecvInstance {
let res = NDIlib_recv_capture_v2(
ptr,
if video {
&mut video_frame
} else {
ptr::null_mut()
},
if audio {
&mut audio_frame
} else {
ptr::null_mut()
},
if metadata {
&mut metadata_frame
} else {
ptr::null_mut()
},
&mut video_frame,
&mut audio_frame,
&mut metadata_frame,
timeout_in_ms,
);
match res {
NDIlib_frame_type_e::NDIlib_frame_type_audio => {
assert!(audio);
Ok(Some(Frame::Audio(AudioFrame::BorrowedRecv(
audio_frame,
self,
))))
}
NDIlib_frame_type_e::NDIlib_frame_type_video => {
assert!(video);
Ok(Some(Frame::Video(VideoFrame::BorrowedRecv(
video_frame,
self,
))))
}
NDIlib_frame_type_e::NDIlib_frame_type_metadata => {
assert!(metadata);
Ok(Some(Frame::Metadata(MetadataFrame::Borrowed(
metadata_frame,
self,
))))
}
NDIlib_frame_type_e::NDIlib_frame_type_audio => Ok(Some(Frame::Audio(
AudioFrame::BorrowedRecv(audio_frame, self),
))),
NDIlib_frame_type_e::NDIlib_frame_type_video => Ok(Some(Frame::Video(
VideoFrame::BorrowedRecv(video_frame, self),
))),
NDIlib_frame_type_e::NDIlib_frame_type_metadata => Ok(Some(Frame::Metadata(
MetadataFrame::Borrowed(metadata_frame, self),
))),
NDIlib_frame_type_e::NDIlib_frame_type_error => Err(()),
_ => Ok(None),
}
@ -367,7 +317,7 @@ impl RecvInstance {
}
}
impl Drop for RecvInstanceInner {
impl Drop for RecvInstance {
fn drop(&mut self) {
unsafe { NDIlib_recv_destroy(self.0.as_ptr() as *mut _) }
}
@ -751,7 +701,7 @@ impl<'a> Drop for VideoFrame<'a> {
fn drop(&mut self) {
if let VideoFrame::BorrowedRecv(ref mut frame, recv) = *self {
unsafe {
NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
NDIlib_recv_free_video_v2(recv.0.as_ptr() as *mut _, frame);
}
}
}
@ -920,7 +870,7 @@ impl<'a> Drop for AudioFrame<'a> {
fn drop(&mut self) {
if let AudioFrame::BorrowedRecv(ref mut frame, recv) = *self {
unsafe {
NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
NDIlib_recv_free_audio_v2(recv.0.as_ptr() as *mut _, frame);
}
}
}
@ -1012,7 +962,7 @@ impl<'a> Drop for MetadataFrame<'a> {
fn drop(&mut self) {
if let MetadataFrame::Borrowed(ref mut frame, recv) = *self {
unsafe {
NDIlib_recv_free_metadata(((recv.0).0).0.as_ptr() as *mut _, frame);
NDIlib_recv_free_metadata(recv.0.as_ptr() as *mut _, frame);
}
}
}

View file

@ -1,19 +0,0 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiAudioSrc(ObjectSubclass<imp::NdiAudioSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for NdiAudioSrc {}
unsafe impl Sync for NdiAudioSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndiaudiosrc",
gst::Rank::None,
NdiAudioSrc::static_type(),
)
}

View file

@ -10,16 +10,24 @@ use std::{i32, u32};
use once_cell::sync::Lazy;
use crate::connect_ndi;
use crate::ndisys;
use crate::AudioReceiver;
use crate::ndisrcmeta;
use crate::Buffer;
use crate::Receiver;
use crate::ReceiverControlHandle;
use crate::ReceiverItem;
use crate::TimestampMode;
use crate::DEFAULT_RECEIVER_NDI_NAME;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ndisrc",
gst::DebugColorFlags::empty(),
Some("NewTek NDI Source"),
)
});
#[derive(Debug, Clone)]
struct Settings {
ndi_name: Option<String>,
@ -40,7 +48,7 @@ impl Default for Settings {
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000,
timeout: 5000,
max_queue_length: 5,
max_queue_length: 10,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
timestamp_mode: TimestampMode::ReceiveTimeTimecode,
}
@ -48,41 +56,41 @@ impl Default for Settings {
}
struct State {
info: Option<gst_audio::AudioInfo>,
receiver: Option<Receiver<AudioReceiver>>,
video_info: Option<gst_video::VideoInfo>,
video_caps: Option<gst::Caps>,
audio_info: Option<gst_audio::AudioInfo>,
audio_caps: Option<gst::Caps>,
current_latency: Option<gst::ClockTime>,
receiver: Option<Receiver>,
}
impl Default for State {
fn default() -> State {
State {
info: None,
receiver: None,
video_info: None,
video_caps: None,
audio_info: None,
audio_caps: None,
current_latency: gst::ClockTime::NONE,
receiver: None,
}
}
}
pub struct NdiAudioSrc {
cat: gst::DebugCategory,
pub struct NdiSrc {
settings: Mutex<Settings>,
state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle<AudioReceiver>>>,
receiver_controller: Mutex<Option<ReceiverControlHandle>>,
}
#[glib::object_subclass]
impl ObjectSubclass for NdiAudioSrc {
const NAME: &'static str = "NdiAudioSrc";
type Type = super::NdiAudioSrc;
impl ObjectSubclass for NdiSrc {
const NAME: &'static str = "NdiSrc";
type Type = super::NdiSrc;
type ParentType = gst_base::BaseSrc;
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"ndiaudiosrc",
gst::DebugColorFlags::empty(),
Some("NewTek NDI Audio Source"),
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
receiver_controller: Mutex::new(None),
@ -90,7 +98,7 @@ impl ObjectSubclass for NdiAudioSrc {
}
}
impl ObjectImpl for NdiAudioSrc {
impl ObjectImpl for NdiSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
@ -139,7 +147,7 @@ impl ObjectImpl for NdiAudioSrc {
"Maximum receive queue length",
0,
u32::MAX,
5,
10,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_int(
@ -186,7 +194,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
@ -198,7 +206,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing url-address from {:?} to {:?}",
settings.url_address,
@ -210,7 +218,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get::<Option<String>>().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
@ -223,7 +231,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
@ -235,7 +243,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing timeout from {} to {}",
settings.timeout,
@ -247,7 +255,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
@ -259,7 +267,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing bandwidth from {} to {}",
settings.bandwidth,
@ -271,7 +279,7 @@ impl ObjectImpl for NdiAudioSrc {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get().unwrap();
gst_debug!(
self.cat,
CAT,
obj: obj,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
@ -325,13 +333,13 @@ impl ObjectImpl for NdiAudioSrc {
}
}
impl ElementImpl for NdiAudioSrc {
impl ElementImpl for NdiSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NewTek NDI Audio Source",
"Source",
"NewTek NDI audio source",
"NewTek NDI Source",
"Source/Audio/Video/Network",
"NewTek NDI source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
)
});
@ -341,28 +349,15 @@ impl ElementImpl for NdiAudioSrc {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_simple(
"audio/x-raw",
&[
(
"format",
&gst::List::new(&[&gst_audio::AUDIO_FORMAT_S16.to_string()]),
),
("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
("layout", &"interleaved"),
],
);
let audio_src_pad_template = gst::PadTemplate::new(
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&caps,
gst::PadPresence::Always,
&gst::Caps::builder("application/x-ndi").build(),
)
.unwrap();
vec![audio_src_pad_template]
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
@ -396,15 +391,15 @@ impl ElementImpl for NdiAudioSrc {
}
}
impl BaseSrcImpl for NdiAudioSrc {
fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> {
// Always succeed here without doing anything: we will set the caps once we received a
// buffer, there's nothing we can negotiate
Ok(())
impl BaseSrcImpl for NdiSrc {
fn negotiate(&self, element: &Self::Type) -> Result<(), gst::LoggableError> {
element
.set_caps(&gst::Caps::builder("application/x-ndi").build())
.map_err(|_| gst::loggable_error!(CAT, "Failed to negotiate caps",))
}
fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",);
gst_debug!(CAT, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
}
@ -412,7 +407,7 @@ impl BaseSrcImpl for NdiAudioSrc {
}
fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
gst_debug!(CAT, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
}
@ -430,8 +425,7 @@ impl BaseSrcImpl for NdiAudioSrc {
));
}
let receiver = connect_ndi(
self.cat,
let receiver = Receiver::connect(
element.upcast_ref(),
settings.ndi_name.as_deref(),
settings.url_address.as_deref(),
@ -443,7 +437,6 @@ impl BaseSrcImpl for NdiAudioSrc {
settings.max_queue_length as usize,
);
// settings.id_receiver exists
match receiver {
None => Err(gst::error_msg!(
gst::ResourceError::NotFound,
@ -488,10 +481,10 @@ impl BaseSrcImpl for NdiAudioSrc {
gst::ClockTime::ZERO
};
let max = 5 * latency;
let max = settings.max_queue_length as u64 * latency;
gst_debug!(
self.cat,
CAT,
obj: element,
"Returning latency min {} max {}",
min,
@ -507,18 +500,6 @@ impl BaseSrcImpl for NdiAudioSrc {
}
}
fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps {
caps.truncate();
{
let caps = caps.make_mut();
let s = caps.structure_mut(0).unwrap();
s.fixate_field_nearest_int("rate", 48_000);
s.fixate_field_nearest_int("channels", 2);
}
self.parent_fixate(element, caps)
}
fn create(
&self,
element: &Self::Type,
@ -531,46 +512,87 @@ impl BaseSrcImpl for NdiAudioSrc {
match state.receiver.take() {
Some(recv) => recv,
None => {
gst_error!(self.cat, obj: element, "Have no receiver");
gst_error!(CAT, obj: element, "Have no receiver");
return Err(gst::FlowError::Error);
}
}
};
match recv.capture() {
ReceiverItem::Buffer(buffer, info) => {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| {
gst::element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.info = Some(info);
state.current_latency = buffer.duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| {
gst::element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
);
gst::FlowError::NotNegotiated
})?;
let res = recv.capture();
let _ =
element.post_message(gst::message::Latency::builder().src(element).build());
}
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
match res {
ReceiverItem::Buffer(buffer) => {
let buffer = match buffer {
Buffer::Audio(mut buffer, info) => {
if state.audio_info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| {
gst::element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.audio_info = Some(info);
state.audio_caps = Some(caps);
}
{
let buffer = buffer.get_mut().unwrap();
ndisrcmeta::NdiSrcMeta::add(
buffer,
ndisrcmeta::StreamType::Audio,
state.audio_caps.as_ref().unwrap(),
);
}
buffer
}
Buffer::Video(mut buffer, info) => {
let mut latency_changed = false;
if state.video_info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| {
gst::element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.video_info = Some(info);
state.video_caps = Some(caps);
latency_changed = state.current_latency != buffer.duration();
state.current_latency = buffer.duration();
}
{
let buffer = buffer.get_mut().unwrap();
ndisrcmeta::NdiSrcMeta::add(
buffer,
ndisrcmeta::StreamType::Video,
state.video_caps.as_ref().unwrap(),
);
}
drop(state);
if latency_changed {
let _ = element.post_message(
gst::message::Latency::builder().src(element).build(),
);
}
buffer
}
};
Ok(CreateSuccess::NewBuffer(buffer))
}
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Error(err) => Err(err),
}
}

19
src/ndisrc/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiSrc(ObjectSubclass<imp::NdiSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for NdiSrc {}
unsafe impl Sync for NdiSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisrc",
gst::Rank::None,
NdiSrc::static_type(),
)
}

280
src/ndisrcdemux/imp.rs Normal file
View file

@ -0,0 +1,280 @@
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_log};
use std::sync::Mutex;
use once_cell::sync::Lazy;
use crate::ndisrcmeta;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ndisrcdemux",
gst::DebugColorFlags::empty(),
Some("NewTek NDI Source Demuxer"),
)
});
#[derive(Default)]
struct State {
combiner: gst_base::UniqueFlowCombiner,
video_pad: Option<gst::Pad>,
video_caps: Option<gst::Caps>,
audio_pad: Option<gst::Pad>,
audio_caps: Option<gst::Caps>,
}
pub struct NdiSrcDemux {
sinkpad: gst::Pad,
state: Mutex<State>,
}
#[glib::object_subclass]
impl ObjectSubclass for NdiSrcDemux {
const NAME: &'static str = "NdiSrcDemux";
type Type = super::NdiSrcDemux;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.flags(gst::PadFlags::FIXED_CAPS)
.chain_function(|pad, parent, buffer| {
NdiSrcDemux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|self_, element| self_.sink_chain(pad, element, buffer),
)
})
.build();
Self {
sinkpad,
state: Mutex::new(State::default()),
}
}
}
impl ObjectImpl for NdiSrcDemux {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
obj.add_pad(&self.sinkpad).unwrap();
}
}
impl ElementImpl for NdiSrcDemux {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NewTek NDI Source Demuxer",
"Demuxer/Audio/Video",
"NewTek NDI source demuxer",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder("application/x-ndi").build(),
)
.unwrap();
let audio_src_pad_template = gst::PadTemplate::new(
"audio",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::builder("audio/x-raw").build(),
)
.unwrap();
let video_src_pad_template = gst::PadTemplate::new(
"video",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::builder("video/x-raw").build(),
)
.unwrap();
vec![
sink_pad_template,
audio_src_pad_template,
video_src_pad_template,
]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let res = self.parent_change_state(element, transition)?;
match transition {
gst::StateChange::PausedToReady => {
let mut state = self.state.lock().unwrap();
for pad in [state.audio_pad.take(), state.video_pad.take()]
.iter()
.flatten()
{
element.remove_pad(pad).unwrap();
}
*state = State::default();
}
_ => (),
}
Ok(res)
}
}
impl NdiSrcDemux {
fn sink_chain(
&self,
pad: &gst::Pad,
element: &super::NdiSrcDemux,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
let meta = buffer.meta::<ndisrcmeta::NdiSrcMeta>().ok_or_else(|| {
gst_error!(CAT, obj: element, "Buffer without NDI source meta");
gst::FlowError::Error
})?;
let mut events = vec![];
let srcpad;
let mut add_pad = false;
let mut state = self.state.lock().unwrap();
let caps = meta.caps();
match meta.stream_type() {
ndisrcmeta::StreamType::Audio => {
if let Some(ref pad) = state.audio_pad {
srcpad = pad.clone();
} else {
gst_debug!(CAT, obj: element, "Adding audio pad with caps {}", caps);
let klass = element.element_class();
let templ = klass.pad_template("audio").unwrap();
let pad = gst::Pad::builder_with_template(&templ, Some("audio"))
.flags(gst::PadFlags::FIXED_CAPS)
.build();
let mut caps_event = Some(gst::event::Caps::new(&caps));
self.sinkpad.sticky_events_foreach(|ev| {
if ev.type_() < gst::EventType::Caps {
events.push(ev.clone());
} else {
if let Some(ev) = caps_event.take() {
events.push(ev);
}
if ev.type_() != gst::EventType::Caps {
events.push(ev.clone());
}
}
Ok(Some(ev))
});
state.audio_caps = Some(caps.clone());
state.audio_pad = Some(pad.clone());
let _ = pad.set_active(true);
for ev in events.drain(..) {
let _ = pad.store_sticky_event(&ev);
}
state.combiner.add_pad(&pad);
add_pad = true;
srcpad = pad;
}
if state.audio_caps.as_ref() != Some(&caps) {
gst_debug!(CAT, obj: element, "Audio caps changed to {}", caps);
events.push(gst::event::Caps::new(&caps));
state.audio_caps = Some(caps);
}
}
ndisrcmeta::StreamType::Video => {
if let Some(ref pad) = state.video_pad {
srcpad = pad.clone();
} else {
gst_debug!(CAT, obj: element, "Adding video pad with caps {}", caps);
let klass = element.element_class();
let templ = klass.pad_template("video").unwrap();
let pad = gst::Pad::builder_with_template(&templ, Some("video"))
.flags(gst::PadFlags::FIXED_CAPS)
.build();
let mut caps_event = Some(gst::event::Caps::new(&caps));
self.sinkpad.sticky_events_foreach(|ev| {
if ev.type_() < gst::EventType::Caps {
events.push(ev.clone());
} else {
if let Some(ev) = caps_event.take() {
events.push(ev);
}
if ev.type_() != gst::EventType::Caps {
events.push(ev.clone());
}
}
Ok(Some(ev))
});
state.video_caps = Some(caps.clone());
state.video_pad = Some(pad.clone());
let _ = pad.set_active(true);
for ev in events.drain(..) {
let _ = pad.store_sticky_event(&ev);
}
state.combiner.add_pad(&pad);
add_pad = true;
srcpad = pad;
}
if state.video_caps.as_ref() != Some(&caps) {
gst_debug!(CAT, obj: element, "Video caps changed to {}", caps);
events.push(gst::event::Caps::new(&caps));
state.video_caps = Some(caps);
}
}
}
drop(state);
if add_pad {
element.add_pad(&srcpad).unwrap();
}
for ev in events {
srcpad.push_event(ev);
}
let res = srcpad.push(buffer);
let mut state = self.state.lock().unwrap();
state.combiner.update_pad_flow(&srcpad, res)
}
}

19
src/ndisrcdemux/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiSrcDemux(ObjectSubclass<imp::NdiSrcDemux>) @extends gst::Element, gst::Object;
}
unsafe impl Send for NdiSrcDemux {}
unsafe impl Sync for NdiSrcDemux {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisrcdemux",
gst::Rank::Primary,
NdiSrcDemux::static_type(),
)
}

158
src/ndisrcmeta.rs Normal file
View file

@ -0,0 +1,158 @@
use gst::prelude::*;
use std::fmt;
use std::mem;
#[repr(transparent)]
pub struct NdiSrcMeta(imp::NdiSrcMeta);
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StreamType {
Audio,
Video,
}
unsafe impl Send for NdiSrcMeta {}
unsafe impl Sync for NdiSrcMeta {}
impl NdiSrcMeta {
pub fn add<'a>(
buffer: &'a mut gst::BufferRef,
stream_type: StreamType,
caps: &gst::Caps,
) -> gst::MetaRefMut<'a, Self, gst::meta::Standalone> {
unsafe {
// Manually dropping because gst_buffer_add_meta() takes ownership of the
// content of the struct
let mut params = mem::ManuallyDrop::new(imp::NdiSrcMetaParams {
caps: caps.clone(),
stream_type,
});
let meta = gst::ffi::gst_buffer_add_meta(
buffer.as_mut_ptr(),
imp::ndi_src_meta_get_info(),
&mut *params as *mut imp::NdiSrcMetaParams as glib::ffi::gpointer,
) as *mut imp::NdiSrcMeta;
Self::from_mut_ptr(buffer, meta)
}
}
pub fn stream_type(&self) -> StreamType {
self.0.stream_type
}
pub fn caps(&self) -> gst::Caps {
self.0.caps.clone()
}
}
unsafe impl MetaAPI for NdiSrcMeta {
type GstType = imp::NdiSrcMeta;
fn meta_api() -> glib::Type {
imp::ndi_src_meta_api_get_type()
}
}
impl fmt::Debug for NdiSrcMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("NdiSrcMeta")
.field("stream_type", &self.stream_type())
.field("caps", &self.caps())
.finish()
}
}
mod imp {
use super::StreamType;
use glib::translate::*;
use once_cell::sync::Lazy;
use std::mem;
use std::ptr;
pub(super) struct NdiSrcMetaParams {
pub caps: gst::Caps,
pub stream_type: StreamType,
}
#[repr(C)]
pub struct NdiSrcMeta {
parent: gst::ffi::GstMeta,
pub(super) caps: gst::Caps,
pub(super) stream_type: StreamType,
}
pub(super) fn ndi_src_meta_api_get_type() -> glib::Type {
static TYPE: Lazy<glib::Type> = Lazy::new(|| unsafe {
let t = from_glib(gst::ffi::gst_meta_api_type_register(
b"GstNdiSrcMetaAPI\0".as_ptr() as *const _,
[ptr::null::<std::os::raw::c_char>()].as_ptr() as *mut *const _,
));
assert_ne!(t, glib::Type::INVALID);
t
});
*TYPE
}
unsafe extern "C" fn ndi_src_meta_init(
meta: *mut gst::ffi::GstMeta,
params: glib::ffi::gpointer,
_buffer: *mut gst::ffi::GstBuffer,
) -> glib::ffi::gboolean {
assert!(!params.is_null());
let meta = &mut *(meta as *mut NdiSrcMeta);
let params = ptr::read(params as *const NdiSrcMetaParams);
ptr::write(&mut meta.stream_type, params.stream_type);
ptr::write(&mut meta.caps, params.caps);
true.into_glib()
}
unsafe extern "C" fn ndi_src_meta_free(
meta: *mut gst::ffi::GstMeta,
_buffer: *mut gst::ffi::GstBuffer,
) {
let meta = &mut *(meta as *mut NdiSrcMeta);
ptr::drop_in_place(&mut meta.stream_type);
ptr::drop_in_place(&mut meta.caps);
}
unsafe extern "C" fn ndi_src_meta_transform(
_dest: *mut gst::ffi::GstBuffer,
_meta: *mut gst::ffi::GstMeta,
_buffer: *mut gst::ffi::GstBuffer,
_type_: glib::ffi::GQuark,
_data: glib::ffi::gpointer,
) -> glib::ffi::gboolean {
false.into_glib()
}
pub(super) fn ndi_src_meta_get_info() -> *const gst::ffi::GstMetaInfo {
struct MetaInfo(ptr::NonNull<gst::ffi::GstMetaInfo>);
unsafe impl Send for MetaInfo {}
unsafe impl Sync for MetaInfo {}
static META_INFO: Lazy<MetaInfo> = Lazy::new(|| unsafe {
MetaInfo(
ptr::NonNull::new(gst::ffi::gst_meta_register(
ndi_src_meta_api_get_type().into_glib(),
b"GstNdiSrcMeta\0".as_ptr() as *const _,
mem::size_of::<NdiSrcMeta>(),
Some(ndi_src_meta_init),
Some(ndi_src_meta_free),
Some(ndi_src_meta_transform),
) as *mut gst::ffi::GstMetaInfo)
.expect("Failed to register meta API"),
)
});
META_INFO.0.as_ptr()
}
}

View file

@ -1,618 +0,0 @@
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error};
use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use std::{i32, u32};
use once_cell::sync::Lazy;
use crate::ndisys;
use crate::connect_ndi;
use crate::Receiver;
use crate::ReceiverControlHandle;
use crate::ReceiverItem;
use crate::TimestampMode;
use crate::VideoReceiver;
use crate::DEFAULT_RECEIVER_NDI_NAME;
#[derive(Debug, Clone)]
struct Settings {
ndi_name: Option<String>,
url_address: Option<String>,
connect_timeout: u32,
timeout: u32,
max_queue_length: u32,
receiver_ndi_name: String,
bandwidth: ndisys::NDIlib_recv_bandwidth_e,
timestamp_mode: TimestampMode,
}
impl Default for Settings {
fn default() -> Self {
Settings {
ndi_name: None,
url_address: None,
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000,
timeout: 5000,
max_queue_length: 5,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
timestamp_mode: TimestampMode::ReceiveTimeTimecode,
}
}
}
struct State {
info: Option<gst_video::VideoInfo>,
current_latency: Option<gst::ClockTime>,
receiver: Option<Receiver<VideoReceiver>>,
}
impl Default for State {
fn default() -> State {
State {
info: None,
current_latency: gst::ClockTime::NONE,
receiver: None,
}
}
}
pub struct NdiVideoSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle<VideoReceiver>>>,
}
#[glib::object_subclass]
impl ObjectSubclass for NdiVideoSrc {
const NAME: &'static str = "NdiVideoSrc";
type Type = super::NdiVideoSrc;
type ParentType = gst_base::BaseSrc;
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"ndivideosrc",
gst::DebugColorFlags::empty(),
Some("NewTek NDI Video Source"),
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
receiver_controller: Mutex::new(None),
}
}
}
impl ObjectImpl for NdiVideoSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpec::new_string(
"ndi-name",
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"url-address",
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"receiver-ndi-name",
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"connect-timeout",
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"timeout",
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"max-queue-length",
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_int(
"bandwidth",
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_enum(
"timestamp-mode",
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
),
]
});
PROPERTIES.as_ref()
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
obj.set_live(true);
obj.set_format(gst::Format::Time);
}
fn set_property(
&self,
obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"ndi-name" => {
let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
ndi_name,
);
settings.ndi_name = ndi_name;
}
"url-address" => {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing url-address from {:?} to {:?}",
settings.url_address,
url_address,
);
settings.url_address = url_address;
}
"receiver-ndi-name" => {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get::<Option<String>>().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
receiver_ndi_name,
);
settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
}
"connect-timeout" => {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
connect_timeout,
);
settings.connect_timeout = connect_timeout;
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing timeout from {} to {}",
settings.timeout,
timeout,
);
settings.timeout = timeout;
}
"max-queue-length" => {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
max_queue_length,
);
settings.max_queue_length = max_queue_length;
}
"bandwidth" => {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing bandwidth from {} to {}",
settings.bandwidth,
bandwidth,
);
settings.bandwidth = bandwidth;
}
"timestamp-mode" => {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
timestamp_mode
);
if settings.timestamp_mode != timestamp_mode {
let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
}
settings.timestamp_mode = timestamp_mode;
}
_ => unimplemented!(),
}
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"ndi-name" => {
let settings = self.settings.lock().unwrap();
settings.ndi_name.to_value()
}
"url-address" => {
let settings = self.settings.lock().unwrap();
settings.url_address.to_value()
}
"receiver-ndi-name" => {
let settings = self.settings.lock().unwrap();
settings.receiver_ndi_name.to_value()
}
"connect-timeout" => {
let settings = self.settings.lock().unwrap();
settings.connect_timeout.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
"max-queue-length" => {
let settings = self.settings.lock().unwrap();
settings.max_queue_length.to_value()
}
"bandwidth" => {
let settings = self.settings.lock().unwrap();
settings.bandwidth.to_value()
}
"timestamp-mode" => {
let settings = self.settings.lock().unwrap();
settings.timestamp_mode.to_value()
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiVideoSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NewTek NDI Video Source",
"Source",
"NewTek NDI video source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
// On the src pad, we can produce F32/F64 with any sample rate
// and any number of channels
let caps = gst::Caps::new_simple(
"video/x-raw",
&[
(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_string(),
&gst_video::VideoFormat::Yv12.to_string(),
&gst_video::VideoFormat::Nv12.to_string(),
&gst_video::VideoFormat::I420.to_string(),
&gst_video::VideoFormat::Bgra.to_string(),
&gst_video::VideoFormat::Bgrx.to_string(),
&gst_video::VideoFormat::Rgba.to_string(),
&gst_video::VideoFormat::Rgbx.to_string(),
]),
),
("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
("height", &gst::IntRange::<i32>::new(0, i32::MAX)),
(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(0, 1),
gst::Fraction::new(i32::MAX, 1),
),
),
],
);
#[cfg(feature = "interlaced-fields")]
let caps = {
let mut tmp = caps.copy();
{
let tmp = tmp.get_mut().unwrap();
tmp.set_features_simple(Some(gst::CapsFeatures::new(&["format:Interlaced"])));
}
let mut caps = caps;
{
let caps = caps.get_mut().unwrap();
caps.append(tmp);
}
caps
};
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::PausedToPlaying => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(true);
}
}
gst::StateChange::PlayingToPaused => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(false);
}
}
gst::StateChange::PausedToReady => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.shutdown();
}
}
_ => (),
}
self.parent_change_state(element, transition)
}
}
impl BaseSrcImpl for NdiVideoSrc {
fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> {
// Always succeed here without doing anything: we will set the caps once we received a
// buffer, there's nothing we can negotiate
Ok(())
}
fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
}
Ok(())
}
fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
}
Ok(())
}
fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
if settings.ndi_name.is_none() && settings.url_address.is_none() {
return Err(gst::error_msg!(
gst::LibraryError::Settings,
["No NDI name or URL/address given"]
));
}
let receiver = connect_ndi(
self.cat,
element.upcast_ref(),
settings.ndi_name.as_deref(),
settings.url_address.as_deref(),
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,
settings.timestamp_mode,
settings.timeout,
settings.max_queue_length as usize,
);
// settings.id_receiver exists
match receiver {
None => Err(gst::error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
Some(receiver) => {
*self.receiver_controller.lock().unwrap() =
Some(receiver.receiver_control_handle());
let mut state = self.state.lock().unwrap();
state.receiver = Some(receiver);
Ok(())
}
}
}
fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown();
}
*self.state.lock().unwrap() = State::default();
Ok(())
}
fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
QueryView::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
true
}
QueryView::Latency(ref mut q) => {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if let Some(latency) = state.current_latency {
let min = if settings.timestamp_mode != TimestampMode::Timecode {
latency
} else {
gst::ClockTime::ZERO
};
let max = 5 * latency;
println!("Returning latency min {} max {}", min, max,);
gst_debug!(
self.cat,
obj: element,
"Returning latency min {} max {}",
min,
max
);
q.set(true, min, max);
true
} else {
false
}
}
_ => BaseSrcImplExt::parent_query(self, element, query),
}
}
fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps {
caps.truncate();
{
let caps = caps.make_mut();
let s = caps.structure_mut(0).unwrap();
s.fixate_field_nearest_int("width", 1920);
s.fixate_field_nearest_int("height", 1080);
if s.has_field("pixel-aspect-ratio") {
s.fixate_field_nearest_fraction("pixel-aspect-ratio", gst::Fraction::new(1, 1));
}
}
self.parent_fixate(element, caps)
}
//Creates the video buffers
fn create(
&self,
element: &Self::Type,
_offset: u64,
_buffer: Option<&mut gst::BufferRef>,
_length: u32,
) -> Result<CreateSuccess, gst::FlowError> {
let recv = {
let mut state = self.state.lock().unwrap();
match state.receiver.take() {
Some(recv) => recv,
None => {
gst_error!(self.cat, obj: element, "Have no receiver");
return Err(gst::FlowError::Error);
}
}
};
match recv.capture() {
ReceiverItem::Buffer(buffer, info) => {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| {
gst::element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.info = Some(info);
state.current_latency = buffer.duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| {
gst::element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
);
gst::FlowError::NotNegotiated
})?;
let _ =
element.post_message(gst::message::Latency::builder().src(element).build());
}
Ok(CreateSuccess::NewBuffer(buffer))
}
ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Error(err) => Err(err),
}
}
}

View file

@ -1,19 +0,0 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiVideoSrc(ObjectSubclass<imp::NdiVideoSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for NdiVideoSrc {}
unsafe impl Sync for NdiVideoSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndivideosrc",
gst::Rank::None,
NdiVideoSrc::static_type(),
)
}

File diff suppressed because it is too large Load diff