Move connecting and capturing to separate threads

This ensures that we can capture every frame even if the pipeline
downstream of the source blocks for a moment, and it also allows us to
make all operations cancellable.
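
A minimal sketch of the pattern this commit introduces (hypothetical names, not the actual receiver.rs API added here): capture runs on its own thread and feeds a bounded channel, while a small control handle lets unlock()/stop() interrupt any wait. A bounded channel keeps memory use capped while still absorbing short downstream stalls.

// Hypothetical sketch of the threading/cancellation pattern, not the
// crate's actual receiver.rs.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

enum Item {
    Frame(Vec<u8>), // stand-in for a captured NDI frame
    Flushing,       // handed over when unlock() asked us to give up
}

#[derive(Clone)]
struct ControlHandle {
    flushing: Arc<AtomicBool>,
}

impl ControlHandle {
    fn set_flushing(&self, flushing: bool) {
        self.flushing.store(flushing, Ordering::SeqCst);
    }
}

fn spawn_receiver() -> (ControlHandle, mpsc::Receiver<Item>) {
    let flushing = Arc::new(AtomicBool::new(false));
    let handle = ControlHandle { flushing: flushing.clone() };
    // Bounded queue: the capture thread keeps draining the source while a
    // briefly blocked downstream catches up, instead of losing frames.
    let (tx, rx) = mpsc::sync_channel(10);
    thread::spawn(move || loop {
        if flushing.load(Ordering::SeqCst) {
            let _ = tx.send(Item::Flushing);
            break;
        }
        // Stand-in for NDIlib_recv_capture_v2() with a short timeout, so
        // the flushing flag is re-checked at least every 50 ms.
        thread::sleep(Duration::from_millis(50));
        if tx.send(Item::Frame(vec![0u8; 16])).is_err() {
            break; // consumer dropped the channel: shut down
        }
    });
    (handle, rx)
}

fn main() {
    let (handle, rx) = spawn_receiver();
    // create() equivalent: block until the capture thread hands over an item.
    if let Ok(Item::Frame(data)) = rx.recv() {
        println!("got a frame of {} bytes", data.len());
    }
    handle.set_flushing(true); // unlock() equivalent: interrupts the loop
}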
Sebastian Dröge 2019-07-17 19:10:20 +03:00
parent 19d25d20a7
commit 66d4fd1d90
6 changed files with 1567 additions and 874 deletions

src/lib.rs

@ -16,13 +16,13 @@ pub mod ndi;
mod ndiaudiosrc;
pub mod ndisys;
mod ndivideosrc;
pub mod receiver;
use ndi::*;
use ndisys::*;
use receiver::*;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::sync::Mutex;
use std::time;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
@ -43,21 +43,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
Ok(())
}
struct ReceiverInfo {
id: usize,
ndi_name: String,
ip_address: String,
video: bool,
audio: bool,
ndi_instance: RecvInstance,
}
lazy_static! {
static ref HASHMAP_RECEIVERS: Mutex<HashMap<usize, ReceiverInfo>> = {
let m = HashMap::new();
Mutex::new(m)
};
static ref DEFAULT_RECEIVER_NDI_NAME: String = {
format!("GStreamer NDI Source {}-{}", env!("CARGO_PKG_VERSION"), env!("COMMIT_ID"))
};
@ -73,157 +59,6 @@ lazy_static! {
};
}
static ID_RECEIVER: AtomicUsize = AtomicUsize::new(0);
fn connect_ndi(
cat: gst::DebugCategory,
element: &gst_base::BaseSrc,
ip_address: Option<&str>,
ndi_name: Option<&str>,
receiver_ndi_name: &str,
connect_timeout: u32,
bandwidth: NDIlib_recv_bandwidth_e,
cancel: &AtomicBool,
) -> Option<usize> {
gst_debug!(cat, obj: element, "Starting NDI connection...");
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
let video = element.get_type() == ndivideosrc::NdiVideoSrc::get_type();
for val in receivers.values_mut() {
if Some(val.ip_address.as_str()) == ip_address || Some(val.ndi_name.as_str()) == ndi_name {
if (val.video || !video) && (val.audio || video) {
continue;
} else {
if video {
val.video = true;
} else {
val.audio = true;
}
return Some(val.id);
}
}
}
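// Note: the loop above reuses an already-open receiver for the same source
// when its audio or video slot is still free, so a single NDI connection
// can feed one audio src and one video src element at the same time.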
let mut find = match FindInstance::builder().build() {
None => {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_find_create_v2 error"]
);
return None;
}
Some(find) => find,
};
let timeout = time::Instant::now();
let source = loop {
if cancel.load(Ordering::SeqCst) {
gst_debug!(cat, obj: element, "Cancelled");
return None;
}
find.wait_for_sources(50);
if cancel.load(Ordering::SeqCst) {
gst_debug!(cat, obj: element, "Cancelled");
return None;
}
let sources = find.get_current_sources();
gst_debug!(
cat,
obj: element,
"Total sources found in network {}",
sources.len(),
);
let source = sources
.iter()
.find(|s| Some(s.ndi_name()) == ndi_name || Some(s.ip_address()) == ip_address);
if let Some(source) = source {
break source.to_owned();
}
if timeout.elapsed().as_millis() >= connect_timeout as u128 {
gst_element_error!(element, gst::ResourceError::NotFound, ["Stream not found"]);
return None;
}
};
gst_debug!(
cat,
obj: element,
"Connecting to NDI source with ndi-name '{}' and ip-address '{}'",
source.ndi_name(),
source.ip_address(),
);
// FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be
// broken with interlaced content currently
let recv = RecvInstance::builder(&source, receiver_ndi_name)
.bandwidth(bandwidth)
.color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA)
.allow_video_fields(true)
.build();
let recv = match recv {
None => {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_recv_create_v3 error"]
);
return None;
}
Some(recv) => recv,
};
recv.set_tally(&Tally::default());
let enable_hw_accel = MetadataFrame::new(0, Some("<ndi_hwaccel enabled=\"true\"/>"));
recv.send_metadata(&enable_hw_accel);
let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst);
receivers.insert(
id_receiver,
ReceiverInfo {
id: id_receiver,
ndi_name: source.ndi_name().to_owned(),
ip_address: source.ip_address().to_owned(),
video,
audio: !video,
ndi_instance: recv,
},
);
gst_debug!(cat, obj: element, "Started NDI connection");
Some(id_receiver)
}
fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: usize) -> bool {
gst_debug!(cat, obj: element, "Closing NDI connection...");
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
{
let val = receivers.get_mut(&id).unwrap();
if val.video && val.audio {
let video = element.get_type() == ndivideosrc::NdiVideoSrc::get_type();
if video {
val.video = false;
} else {
val.audio = false;
}
return true;
}
}
receivers.remove(&id);
gst_debug!(cat, obj: element, "Closed NDI connection");
true
}
impl glib::translate::ToGlib for TimestampMode {
type GlibType = i32;

src/ndi.rs

@ -281,6 +281,15 @@ impl RecvInstance {
}
}
pub fn get_queue(&self) -> Queue {
unsafe {
let _lock = (self.0).1.lock().unwrap();
let mut queue = mem::MaybeUninit::uninit();
NDIlib_recv_get_queue(((self.0).0).0.as_ptr(), queue.as_mut_ptr());
Queue(queue.assume_init())
}
}
pub fn capture(
&self,
video: bool,
@ -490,7 +499,7 @@ impl<'a> VideoFrame<'a> {
impl<'a> Drop for VideoFrame<'a> {
#[allow(irrefutable_let_patterns)]
fn drop(&mut self) {
if let VideoFrame::Borrowed(ref frame, ref recv) = *self {
if let VideoFrame::Borrowed(ref mut frame, ref recv) = *self {
unsafe {
NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
}
@ -597,7 +606,7 @@ impl<'a> AudioFrame<'a> {
impl<'a> Drop for AudioFrame<'a> {
#[allow(irrefutable_let_patterns)]
fn drop(&mut self) {
if let AudioFrame::Borrowed(ref frame, ref recv) = *self {
if let AudioFrame::Borrowed(ref mut frame, ref recv) = *self {
unsafe {
NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
}
@ -689,10 +698,25 @@ impl<'a> Default for MetadataFrame<'a> {
impl<'a> Drop for MetadataFrame<'a> {
fn drop(&mut self) {
if let MetadataFrame::Borrowed(ref frame, ref recv) = *self {
if let MetadataFrame::Borrowed(ref mut frame, ref recv) = *self {
unsafe {
NDIlib_recv_free_metadata(((recv.0).0).0.as_ptr() as *mut _, frame);
}
}
}
}
#[derive(Debug, Clone)]
pub struct Queue(NDIlib_recv_queue_t);
impl Queue {
pub fn audio_frames(&self) -> i32 {
self.0.audio_frames
}
pub fn video_frames(&self) -> i32 {
self.0.video_frames
}
pub fn metadata_frames(&self) -> i32 {
self.0.metadata_frames
}
}
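
The new get_queue() wrapper exposes the SDK's internal queue depths. A hypothetical use (the helper name and threshold below are invented): a caller could poll it to detect backlog before deciding whether to capture, log, or skip ahead.

// Hypothetical helper, assuming `recv` is a RecvInstance from this crate.
fn log_backlog(recv: &RecvInstance) {
    let queue = recv.get_queue();
    // More than one video frame buffered inside the SDK means we are
    // falling behind; a caller could log this or decide to skip ahead.
    if queue.video_frames() > 1 {
        println!("video backlog: {} frames", queue.video_frames());
    }
}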

src/ndiaudiosrc.rs

@ -10,24 +10,16 @@ use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time;
use std::{i32, u32};
use connect_ndi;
use ndi::*;
use ndisys;
use stop_ndi;
use Receiver;
use ReceiverControlHandle;
use ReceiverItem;
use TimestampMode;
use DEFAULT_RECEIVER_NDI_NAME;
use HASHMAP_RECEIVERS;
#[cfg(feature = "reference-timestamps")]
use TIMECODE_CAPS;
#[cfg(feature = "reference-timestamps")]
use TIMESTAMP_CAPS;
use byte_slice_cast::AsMutSliceOf;
#[derive(Debug, Clone)]
struct Settings {
@ -129,7 +121,7 @@ static PROPERTIES: [subclass::Property; 7] = [
struct State {
info: Option<gst_audio::AudioInfo>,
id_receiver: Option<usize>,
receiver: Option<Receiver>,
current_latency: gst::ClockTime,
}
@ -137,7 +129,7 @@ impl Default for State {
fn default() -> State {
State {
info: None,
id_receiver: None,
receiver: None,
current_latency: gst::CLOCK_TIME_NONE,
}
}
@ -147,7 +139,7 @@ pub(crate) struct NdiAudioSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
unlock: AtomicBool,
receiver_controller: Mutex<Option<ReceiverControlHandle>>,
}
impl ObjectSubclass for NdiAudioSrc {
@ -167,7 +159,7 @@ impl ObjectSubclass for NdiAudioSrc {
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
unlock: AtomicBool::new(false),
receiver_controller: Mutex::new(None),
}
}
@ -353,26 +345,52 @@ impl ObjectImpl for NdiAudioSrc {
}
}
impl ElementImpl for NdiAudioSrc {}
impl ElementImpl for NdiAudioSrc {
fn change_state(
&self,
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::PausedToPlaying => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(true);
}
}
gst::StateChange::PlayingToPaused => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(false);
}
}
gst::StateChange::PausedToReady => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.shutdown();
}
}
_ => (),
}
self.parent_change_state(element, transition)
}
}
impl BaseSrcImpl for NdiAudioSrc {
fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Unlocking",
);
self.unlock.store(true, Ordering::SeqCst);
gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
}
Ok(())
}
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Stop unlocking",
);
self.unlock.store(false, Ordering::SeqCst);
fn unlock_stop(
&self,
element: &gst_base::BaseSrc,
) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
}
Ok(())
}
@ -387,7 +405,7 @@ impl BaseSrcImpl for NdiAudioSrc {
));
}
let id_receiver = connect_ndi(
let receiver = connect_ndi(
self.cat,
element,
settings.ip_address.as_ref().map(String::as_str),
@ -395,33 +413,32 @@ impl BaseSrcImpl for NdiAudioSrc {
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,
&self.unlock,
settings.timestamp_mode,
settings.timeout,
);
// settings.id_receiver exists
match id_receiver {
None if self.unlock.load(Ordering::SeqCst) => Ok(()),
match receiver {
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
Some(id_receiver) => {
Some(receiver) => {
*self.receiver_controller.lock().unwrap() =
Some(receiver.receiver_control_handle());
let mut state = self.state.lock().unwrap();
state.id_receiver = Some(id_receiver);
state.receiver = Some(receiver);
Ok(())
}
}
}
fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let mut state = self.state.lock().unwrap();
if let Some(id_receiver) = state.id_receiver.take() {
stop_ndi(self.cat, element, id_receiver);
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown();
}
*state = State::default();
*self.state.lock().unwrap() = State::default();
Ok(())
}
@ -474,210 +491,42 @@ impl BaseSrcImpl for NdiAudioSrc {
_offset: u64,
_length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
let recv = {
let mut state = self.state.lock().unwrap();
match state.receiver.take() {
Some(recv) => recv,
None => {
gst_error!(self.cat, obj: element, "Have no receiver");
return Err(gst::FlowError::Error);
}
}
};
match recv.capture() {
ReceiverItem::AudioBuffer(buffer, info) => {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().unwrap();
state.info = Some(info.clone());
state.current_latency = buffer.get_duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element
.set_caps(&caps)
.map_err(|_| gst::FlowError::NotNegotiated)?;
let _ = element
.post_message(&gst::Message::new_latency().src(Some(element)).build());
}
Ok(buffer)
}
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Error(err) => Err(err),
ReceiverItem::VideoBuffer(..) => unreachable!(),
}
}
self.capture(element)
}
}
impl NdiAudioSrc {
fn capture(&self, element: &gst_base::BaseSrc) -> Result<gst::Buffer, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
let recv = {
let state = self.state.lock().unwrap();
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap();
receiver.ndi_instance.clone()
};
let timeout = time::Instant::now();
let audio_frame = loop {
// FIXME: make interruptable
let res = loop {
match recv.capture(false, true, false, 50) {
Err(_) => break Err(()),
Ok(None) => break Ok(None),
Ok(Some(Frame::Audio(frame))) => break Ok(Some(frame)),
_ => unreachable!(),
}
};
let audio_frame = match res {
Err(_) => {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]);
return Err(gst::FlowError::Error);
}
Ok(None) if timeout.elapsed().as_millis() >= settings.timeout as u128 => {
return Err(gst::FlowError::Eos);
}
Ok(None) => {
gst_debug!(self.cat, obj: element, "No audio frame received yet, retry");
continue;
}
Ok(Some(frame)) => frame,
};
break audio_frame;
};
let pts = self.calculate_timestamp(element, &settings, &audio_frame);
let info = self.create_audio_info(element, &audio_frame)?;
{
let mut state = self.state.lock().unwrap();
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().unwrap();
state.info = Some(info.clone());
state.current_latency = gst::SECOND
.mul_div_ceil(
audio_frame.no_samples() as u64,
audio_frame.sample_rate() as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element
.set_caps(&caps)
.map_err(|_| gst::FlowError::NotNegotiated)?;
let _ = element
.post_message(&gst::Message::new_latency().src(Some(element)).build());
}
}
let buffer = self.create_buffer(element, pts, &info, &audio_frame)?;
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
}
fn calculate_timestamp(
&self,
element: &gst_base::BaseSrc,
settings: &Settings,
audio_frame: &AudioFrame,
) -> gst::ClockTime {
let clock = element.get_clock().unwrap();
// For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available
let now = clock.get_time();
let base_time = element.get_base_time();
let receive_time = now - base_time;
let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000);
let timestamp = if audio_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(audio_frame.timestamp() as u64 * 100)
};
let timecode = gst::ClockTime::from(audio_frame.timecode() as u64 * 100);
gst_log!(
self.cat,
obj: element,
"NDI audio frame received: {:?} with timecode {} and timestamp {}, receive time {}, local time now {}",
audio_frame,
timecode,
timestamp,
receive_time,
real_time_now,
);
let pts = match settings.timestamp_mode {
TimestampMode::ReceiveTime => receive_time,
TimestampMode::Timecode => timecode,
TimestampMode::Timestamp if timestamp.is_none() => receive_time,
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
if real_time_now > timestamp {
let diff = real_time_now - timestamp;
if diff > receive_time {
0.into()
} else {
receive_time - diff
}
} else {
let diff = timestamp - real_time_now;
receive_time + diff
}
}
};
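// Worked example with invented numbers: if the NDI timestamp converts to a
// UNIX time 20 ms older than real_time_now and receive_time is 100 ms, then
// diff = 20 ms <= receive_time, so pts = 100 ms - 20 ms = 80 ms; the buffer
// is stamped as if captured 20 ms before we received it.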
gst_log!(
self.cat,
obj: element,
"Calculated pts for audio frame: {:?}",
pts
);
pts
}
fn create_audio_info(
&self,
_element: &gst_base::BaseSrc,
audio_frame: &AudioFrame,
) -> Result<gst_audio::AudioInfo, gst::FlowError> {
let builder = gst_audio::AudioInfo::new(
gst_audio::AUDIO_FORMAT_S16,
audio_frame.sample_rate() as u32,
audio_frame.no_channels() as u32,
);
Ok(builder.build().unwrap())
}
fn create_buffer(
&self,
_element: &gst_base::BaseSrc,
pts: gst::ClockTime,
info: &gst_audio::AudioInfo,
audio_frame: &AudioFrame,
) -> Result<gst::Buffer, gst::FlowError> {
// info.bpf() is the bytes per audio frame: 2 bytes per i16 sample times the channel count
let buff_size = (audio_frame.no_samples() as u32 * info.bpf()) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{
let duration = gst::SECOND
.mul_div_floor(
audio_frame.no_samples() as u64,
audio_frame.sample_rate() as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(pts);
buffer.set_duration(duration);
#[cfg(feature = "reference-timestamps")]
{
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMECODE_CAPS,
gst::ClockTime::from(audio_frame.timecode() as u64 * 100),
gst::CLOCK_TIME_NONE,
);
if audio_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMESTAMP_CAPS,
gst::ClockTime::from(audio_frame.timestamp() as u64 * 100),
gst::CLOCK_TIME_NONE,
);
}
}
audio_frame.copy_to_interleaved_16s(
buffer
.map_writable()
.unwrap()
.as_mut_slice_of::<i16>()
.unwrap(),
);
}
Ok(buffer)
}
}

src/ndisys.rs

@ -41,22 +41,26 @@ extern "C" {
) -> bool;
pub fn NDIlib_recv_capture_v2(
p_instance: NDIlib_recv_instance_t,
p_video_data: *const NDIlib_video_frame_v2_t,
p_audio_data: *const NDIlib_audio_frame_v2_t,
p_metadata: *const NDIlib_metadata_frame_t,
p_video_data: *mut NDIlib_video_frame_v2_t,
p_audio_data: *mut NDIlib_audio_frame_v2_t,
p_metadata: *mut NDIlib_metadata_frame_t,
timeout_in_ms: u32,
) -> NDIlib_frame_type_e;
pub fn NDIlib_recv_free_video_v2(
p_instance: NDIlib_recv_instance_t,
p_video_data: *const NDIlib_video_frame_v2_t,
p_video_data: *mut NDIlib_video_frame_v2_t,
);
pub fn NDIlib_recv_free_audio_v2(
p_instance: NDIlib_recv_instance_t,
p_audio_data: *const NDIlib_audio_frame_v2_t,
p_audio_data: *mut NDIlib_audio_frame_v2_t,
);
pub fn NDIlib_recv_free_metadata(
p_instance: NDIlib_recv_instance_t,
p_metadata: *const NDIlib_metadata_frame_t,
p_metadata: *mut NDIlib_metadata_frame_t,
);
pub fn NDIlib_recv_get_queue(
p_instance: NDIlib_recv_instance_t,
p_total: *mut NDIlib_recv_queue_t,
);
}
@ -151,6 +155,14 @@ pub struct NDIlib_tally_t {
pub on_preview: bool,
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_recv_queue_t {
pub video_frames: i32,
pub audio_frames: i32,
pub metadata_frames: i32,
}
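
For reference, a sketch of the out-parameter convention this binding uses, mirroring the safe get_queue() wrapper in ndi.rs above (the helper name is invented; `instance` is assumed to be a valid receiver handle):

// Hypothetical helper around the raw binding.
unsafe fn query_queue(instance: NDIlib_recv_instance_t) -> NDIlib_recv_queue_t {
    let mut queue = std::mem::MaybeUninit::<NDIlib_recv_queue_t>::uninit();
    // The SDK fills the struct through the pointer; it is only safe to
    // assume_init() after the call returns.
    NDIlib_recv_get_queue(instance, queue.as_mut_ptr());
    queue.assume_init()
}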
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_metadata_frame_t {

src/ndivideosrc.rs

@ -9,26 +9,19 @@ use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use gst_video;
use gst_video::prelude::*;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time;
use std::{i32, u32};
use ndi::*;
use ndisys;
use connect_ndi;
use stop_ndi;
use Receiver;
use ReceiverControlHandle;
use ReceiverItem;
use TimestampMode;
use DEFAULT_RECEIVER_NDI_NAME;
use HASHMAP_RECEIVERS;
#[cfg(feature = "reference-timestamps")]
use TIMECODE_CAPS;
#[cfg(feature = "reference-timestamps")]
use TIMESTAMP_CAPS;
#[derive(Debug, Clone)]
struct Settings {
@ -130,16 +123,16 @@ static PROPERTIES: [subclass::Property; 7] = [
struct State {
info: Option<gst_video::VideoInfo>,
id_receiver: Option<usize>,
current_latency: gst::ClockTime,
receiver: Option<Receiver>,
}
impl Default for State {
fn default() -> State {
State {
info: None,
id_receiver: None,
current_latency: gst::CLOCK_TIME_NONE,
receiver: None,
}
}
}
@ -148,7 +141,7 @@ pub(crate) struct NdiVideoSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
unlock: AtomicBool,
receiver_controller: Mutex<Option<ReceiverControlHandle>>,
}
impl ObjectSubclass for NdiVideoSrc {
@ -168,7 +161,7 @@ impl ObjectSubclass for NdiVideoSrc {
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
unlock: AtomicBool::new(false),
receiver_controller: Mutex::new(None),
}
}
@ -389,10 +382,27 @@ impl ObjectImpl for NdiVideoSrc {
}
impl ElementImpl for NdiVideoSrc {
fn change_state(&self, element: &gst::Element, transition: gst::StateChange) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
fn change_state(
&self,
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::ReadyToPaused => self.unlock.store(true, Ordering::SeqCst),
gst::StateChange::PausedToReady => self.unlock.store(false, Ordering::SeqCst),
gst::StateChange::PausedToPlaying => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(true);
}
}
gst::StateChange::PlayingToPaused => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(false);
}
}
gst::StateChange::PausedToReady => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.shutdown();
}
}
_ => (),
}
@ -402,28 +412,25 @@ impl ElementImpl for NdiVideoSrc {
impl BaseSrcImpl for NdiVideoSrc {
fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Unlocking",
);
self.unlock.store(true, Ordering::SeqCst);
gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
}
Ok(())
}
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Stop unlocking",
);
self.unlock.store(false, Ordering::SeqCst);
fn unlock_stop(
&self,
element: &gst_base::BaseSrc,
) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
}
Ok(())
}
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
self.unlock.store(false, Ordering::SeqCst);
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
@ -434,7 +441,7 @@ impl BaseSrcImpl for NdiVideoSrc {
));
}
let id_receiver = connect_ndi(
let receiver = connect_ndi(
self.cat,
element,
settings.ip_address.as_ref().map(String::as_str),
@ -442,35 +449,32 @@ impl BaseSrcImpl for NdiVideoSrc {
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,
&self.unlock,
settings.timestamp_mode,
settings.timeout,
);
// settings.id_receiver exists
match id_receiver {
None if self.unlock.load(Ordering::SeqCst) => Ok(()),
match receiver {
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
Some(id_receiver) => {
Some(receiver) => {
*self.receiver_controller.lock().unwrap() =
Some(receiver.receiver_control_handle());
let mut state = self.state.lock().unwrap();
state.id_receiver = Some(id_receiver);
state.receiver = Some(receiver);
Ok(())
}
}
}
fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
self.unlock.store(true, Ordering::SeqCst);
*self.state.lock().unwrap() = Default::default();
let mut state = self.state.lock().unwrap();
if let Some(id_receiver) = state.id_receiver.take() {
stop_ndi(self.cat, element, id_receiver);
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown();
}
*state = State::default();
*self.state.lock().unwrap() = State::default();
Ok(())
}
@ -527,431 +531,42 @@ impl BaseSrcImpl for NdiVideoSrc {
_offset: u64,
_length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
let recv = {
let mut state = self.state.lock().unwrap();
match state.receiver.take() {
Some(recv) => recv,
None => {
gst_error!(self.cat, obj: element, "Have no receiver");
return Err(gst::FlowError::Error);
}
}
};
match recv.capture() {
ReceiverItem::VideoBuffer(buffer, info) => {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().unwrap();
state.info = Some(info.clone());
state.current_latency = buffer.get_duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element
.set_caps(&caps)
.map_err(|_| gst::FlowError::NotNegotiated)?;
let _ = element
.post_message(&gst::Message::new_latency().src(Some(element)).build());
}
Ok(buffer)
}
ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Error(err) => Err(err),
ReceiverItem::AudioBuffer(..) => unreachable!(),
}
}
self.capture(element)
}
}
impl NdiVideoSrc {
fn capture(&self, element: &gst_base::BaseSrc) -> Result<gst::Buffer, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
let recv = {
let state = self.state.lock().unwrap();
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap();
receiver.ndi_instance.clone()
};
let timeout = time::Instant::now();
let video_frame = loop {
// FIXME: make interruptable
let res = loop {
match recv.capture(true, false, false, 50) {
Err(_) => break Err(()),
Ok(None) => break Ok(None),
Ok(Some(Frame::Video(frame))) => break Ok(Some(frame)),
_ => unreachable!(),
}
};
let video_frame = match res {
Err(_) => {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]);
return Err(gst::FlowError::Error);
}
Ok(None) if timeout.elapsed().as_millis() >= settings.timeout as u128 => {
return Err(gst::FlowError::Eos);
}
Ok(None) => {
gst_debug!(self.cat, obj: element, "No video frame received yet, retry");
continue;
}
Ok(Some(frame)) => frame,
};
break video_frame;
};
let pts = self.calculate_timestamp(element, &settings, &video_frame);
let info = self.create_video_info(element, &video_frame)?;
{
let mut state = self.state.lock().unwrap();
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().unwrap();
state.info = Some(info.clone());
state.current_latency = gst::SECOND
.mul_div_ceil(
video_frame.frame_rate().1 as u64,
video_frame.frame_rate().0 as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element
.set_caps(&caps)
.map_err(|_| gst::FlowError::NotNegotiated)?;
let _ =
element.post_message(&gst::Message::new_latency().src(Some(element)).build());
}
}
let buffer = self.create_buffer(element, pts, &info, &video_frame)?;
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
}
fn calculate_timestamp(
&self,
element: &gst_base::BaseSrc,
settings: &Settings,
video_frame: &VideoFrame,
) -> gst::ClockTime {
let clock = element.get_clock().unwrap();
// For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available
let now = clock.get_time();
let base_time = element.get_base_time();
let receive_time = now - base_time;
let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000);
let timestamp = if video_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(video_frame.timestamp() as u64 * 100)
};
let timecode = gst::ClockTime::from(video_frame.timecode() as u64 * 100);
gst_log!(
self.cat,
obj: element,
"NDI video frame received: {:?} with timecode {} and timestamp {}, receive time {}, local time now {}",
video_frame,
timecode,
timestamp,
receive_time,
real_time_now,
);
let pts = match settings.timestamp_mode {
TimestampMode::ReceiveTime => receive_time,
TimestampMode::Timecode => timecode,
TimestampMode::Timestamp if timestamp.is_none() => receive_time,
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
if real_time_now > timestamp {
let diff = real_time_now - timestamp;
if diff > receive_time {
0.into()
} else {
receive_time - diff
}
} else {
let diff = timestamp - real_time_now;
receive_time + diff
}
}
};
gst_log!(
self.cat,
obj: element,
"Calculated pts for video frame: {:?}",
pts
);
pts
}
fn create_video_info(
&self,
_element: &gst_base::BaseSrc,
video_frame: &VideoFrame,
) -> Result<gst_video::VideoInfo, gst::FlowError> {
// YV12 and I420 are swapped in the NDI SDK compared to GStreamer
let format = match video_frame.fourcc() {
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_UYVY => gst_video::VideoFormat::Uyvy,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_YV12 => gst_video::VideoFormat::I420,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_NV12 => gst_video::VideoFormat::Nv12,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_I420 => gst_video::VideoFormat::Yv12,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_BGRA => gst_video::VideoFormat::Bgra,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_BGRX => gst_video::VideoFormat::Bgrx,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_RGBA => gst_video::VideoFormat::Rgba,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_RGBX => gst_video::VideoFormat::Rgbx,
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_UYVA => gst_video::VideoFormat::Uyvy,
};
let par = gst::Fraction::approximate_f32(video_frame.picture_aspect_ratio()).unwrap()
* gst::Fraction::new(video_frame.yres(), video_frame.xres());
#[cfg(feature = "interlaced-fields")]
{
let mut builder = gst_video::VideoInfo::new(
format,
video_frame.xres() as u32,
video_frame.yres() as u32,
)
.fps(gst::Fraction::from(video_frame.frame_rate()))
.par(par)
.interlace_mode(match video_frame.frame_format_type() {
ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive => {
gst_video::VideoInterlaceMode::Progressive
}
ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
gst_video::VideoInterlaceMode::Interleaved
}
_ => gst_video::VideoInterlaceMode::Alternate,
});
/* Requires GStreamer 1.12 at least */
if video_frame.frame_format_type()
== ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
{
builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
}
Ok(builder.build().unwrap())
}
#[cfg(not(feature = "interlaced-fields"))]
{
if video_frame.frame_format_type()
!= ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive
&& video_frame.frame_format_type()
!= ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
{
gst_element_error!(
element,
gst::StreamError::Format,
["Separate field interlacing not supported"]
);
return Err(gst::FlowError::NotNegotiated);
}
let builder = gst_video::VideoInfo::new(
format,
video_frame.xres() as u32,
video_frame.yres() as u32,
)
.fps(gst::Fraction::from(video_frame.frame_rate()))
.par(par)
.interlace_mode(
if video_frame.frame_format_type()
== ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive
{
gst_video::VideoInterlaceMode::Progressive
} else {
gst_video::VideoInterlaceMode::Interleaved
},
);
Ok(builder.build().unwrap())
}
}
fn create_buffer(
&self,
element: &gst_base::BaseSrc,
pts: gst::ClockTime,
info: &gst_video::VideoInfo,
video_frame: &VideoFrame,
) -> Result<gst::Buffer, gst::FlowError> {
let mut buffer = gst::Buffer::with_size(info.size()).unwrap();
{
let duration = gst::SECOND
.mul_div_floor(
video_frame.frame_rate().1 as u64,
video_frame.frame_rate().0 as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(pts);
buffer.set_duration(duration);
#[cfg(feature = "reference-timestamps")]
{
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMECODE_CAPS,
gst::ClockTime::from(video_frame.timecode() as u64 * 100),
gst::CLOCK_TIME_NONE,
);
if video_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMESTAMP_CAPS,
gst::ClockTime::from(video_frame.timestamp() as u64 * 100),
gst::CLOCK_TIME_NONE,
);
}
}
#[cfg(feature = "interlaced-fields")]
{
match video_frame.frame_format_type() {
ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
buffer.set_video_flags(
gst_video::VideoBufferFlags::INTERLACED
| gst_video::VideoBufferFlags::TFF,
);
}
ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0 => {
buffer.set_video_flags(
gst_video::VideoBufferFlags::INTERLACED
| gst_video::VideoBufferFlags::TOP_FIELD,
);
}
ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1 => {
buffer.set_video_flags(
gst_video::VideoBufferFlags::INTERLACED
| gst_video::VideoBufferFlags::BOTTOM_FIELD,
);
}
_ => (),
};
}
#[cfg(not(feature = "interlaced-fields"))]
{
if video_frame.frame_format_type()
== ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
{
buffer.set_video_flags(
gst_video::VideoBufferFlags::INTERLACED | gst_video::VideoBufferFlags::TFF,
);
}
}
}
self.copy_frame(element, info, buffer, video_frame)
}
fn copy_frame(
&self,
_element: &gst_base::BaseSrc,
info: &gst_video::VideoInfo,
buffer: gst::Buffer,
video_frame: &VideoFrame,
) -> Result<gst::Buffer, gst::FlowError> {
// FIXME: Error handling if frame dimensions don't match
let mut vframe = gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
match info.format() {
gst_video::VideoFormat::Uyvy
| gst_video::VideoFormat::Bgra
| gst_video::VideoFormat::Bgrx
| gst_video::VideoFormat::Rgba
| gst_video::VideoFormat::Rgbx => {
let line_bytes = if info.format() == gst_video::VideoFormat::Uyvy {
2 * vframe.width() as usize
} else {
4 * vframe.width() as usize
};
let dest_stride = vframe.plane_stride()[0] as usize;
let dest = vframe.plane_data_mut(0).unwrap();
let src_stride = video_frame.line_stride_in_bytes() as usize;
let src = video_frame.data();
for (dest, src) in dest
.chunks_exact_mut(dest_stride)
.zip(src.chunks_exact(src_stride))
{
dest.copy_from_slice(&src[..line_bytes]);
}
}
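// (Per-line copying above and below is needed because the NDI line stride
// may include padding beyond the visible line_bytes, while the GStreamer
// buffer uses its own plane strides; copying line_bytes per row skips the
// padding on both sides.)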
gst_video::VideoFormat::Nv12 => {
// First plane
{
let line_bytes = vframe.width() as usize;
let dest_stride = vframe.plane_stride()[0] as usize;
let dest = vframe.plane_data_mut(0).unwrap();
let src_stride = video_frame.line_stride_in_bytes() as usize;
let src = video_frame.data();
for (dest, src) in dest
.chunks_exact_mut(dest_stride)
.zip(src.chunks_exact(src_stride))
{
dest.copy_from_slice(&src[..line_bytes]);
}
}
// Second plane
{
let line_bytes = vframe.width() as usize;
let dest_stride = vframe.plane_stride()[1] as usize;
let dest = vframe.plane_data_mut(1).unwrap();
let src_stride = video_frame.line_stride_in_bytes() as usize;
let src = &video_frame.data()[(video_frame.yres() as usize * src_stride)..];
for (dest, src) in dest
.chunks_exact_mut(dest_stride)
.zip(src.chunks_exact(src_stride))
{
dest.copy_from_slice(&src[..line_bytes]);
}
}
}
gst_video::VideoFormat::Yv12 | gst_video::VideoFormat::I420 => {
// First plane
{
let line_bytes = vframe.width() as usize;
let dest_stride = vframe.plane_stride()[0] as usize;
let dest = vframe.plane_data_mut(0).unwrap();
let src_stride = video_frame.line_stride_in_bytes() as usize;
let src = video_frame.data();
for (dest, src) in dest
.chunks_exact_mut(dest_stride)
.zip(src.chunks_exact(src_stride))
{
dest.copy_from_slice(&src[..line_bytes]);
}
}
// Second plane
{
let line_bytes = (vframe.width() as usize + 1) / 2;
let dest_stride = vframe.plane_stride()[1] as usize;
let dest = vframe.plane_data_mut(1).unwrap();
let src_stride = video_frame.line_stride_in_bytes() as usize;
let src_stride1 = video_frame.line_stride_in_bytes() as usize / 2;
let src = &video_frame.data()[(video_frame.yres() as usize * src_stride)..];
for (dest, src) in dest
.chunks_exact_mut(dest_stride)
.zip(src.chunks_exact(src_stride1))
{
dest.copy_from_slice(&src[..line_bytes]);
}
}
// Third plane
{
let line_bytes = (vframe.width() as usize + 1) / 2;
let dest_stride = vframe.plane_stride()[2] as usize;
let dest = vframe.plane_data_mut(2).unwrap();
let src_stride = video_frame.line_stride_in_bytes() as usize;
let src_stride1 = video_frame.line_stride_in_bytes() as usize / 2;
let src = &video_frame.data()[(video_frame.yres() as usize * src_stride
+ (video_frame.yres() as usize + 1) / 2 * src_stride1)..];
for (dest, src) in dest
.chunks_exact_mut(dest_stride)
.zip(src.chunks_exact(src_stride1))
{
dest.copy_from_slice(&src[..line_bytes]);
}
}
}
_ => unreachable!(),
}
Ok(vframe.into_buffer())
}
}

src/receiver.rs (new file, 1358 lines)

File diff suppressed because it is too large.