Improve error reporting/handling

And minor refactoring.
This commit is contained in:
Sebastian Dröge 2019-07-19 12:51:06 +03:00
parent d352a0c20d
commit 98c290602c
4 changed files with 228 additions and 181 deletions

View file

@ -96,7 +96,7 @@ impl FindInstance {
let mut sources = vec![];
for i in 0..no_sources {
sources.push(Source::Borrowed(
ptr::NonNull::new(sources_ptr.add(i as usize) as *mut _).unwrap(),
ptr::NonNull::new_unchecked(sources_ptr.add(i as usize) as *mut _),
self,
));
}

View file

@ -516,14 +516,26 @@ impl BaseSrcImpl for NdiAudioSrc {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().unwrap();
let caps = info.to_caps().ok_or_else(|| {
gst_element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.info = Some(info.clone());
state.current_latency = buffer.get_duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element
.set_caps(&caps)
.map_err(|_| gst::FlowError::NotNegotiated)?;
element.set_caps(&caps).map_err(|_| {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
);
gst::FlowError::NotNegotiated
})?;
let _ = element
.post_message(&gst::Message::new_latency().src(Some(element)).build());

View file

@ -556,14 +556,26 @@ impl BaseSrcImpl for NdiVideoSrc {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().unwrap();
let caps = info.to_caps().ok_or_else(|| {
gst_element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.info = Some(info.clone());
state.current_latency = buffer.get_duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element
.set_caps(&caps)
.map_err(|_| gst::FlowError::NotNegotiated)?;
element.set_caps(&caps).map_err(|_| {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
);
gst::FlowError::NotNegotiated
})?;
let _ = element
.post_message(&gst::Message::new_latency().src(Some(element)).build());

View file

@ -264,47 +264,53 @@ impl Observations {
current_mapping.b.into(),
current_mapping.num.into(),
current_mapping.den.into(),
)
.unwrap();
);
let new_calculated = gst::Clock::adjust_with_calibration(
time.0.into(),
next_mapping.xbase.into(),
next_mapping.b.into(),
next_mapping.num.into(),
next_mapping.den.into(),
)
.unwrap();
let diff = if new_calculated > expected {
new_calculated - expected
} else {
expected - new_calculated
};
// Allow at most 5% frame duration or 2ms difference per frame
let max_diff = cmp::max(
(duration / 10).unwrap_or(2 * gst::MSECOND_VAL),
2 * gst::MSECOND_VAL,
);
if diff > max_diff {
gst_debug!(
cat,
obj: element,
"New time mapping causes difference {} but only {} allowed",
gst::ClockTime::from(diff),
gst::ClockTime::from(max_diff),
if let (Some(expected), Some(new_calculated)) = (*expected, *new_calculated) {
let diff = if new_calculated > expected {
new_calculated - expected
} else {
expected - new_calculated
};
// Allow at most 5% frame duration or 2ms difference per frame
let max_diff = cmp::max(
(duration / 10).unwrap_or(2 * gst::MSECOND_VAL),
2 * gst::MSECOND_VAL,
);
if new_calculated > expected {
current_mapping.b = expected + max_diff;
current_mapping.xbase = time.0;
if diff > max_diff {
gst_debug!(
cat,
obj: element,
"New time mapping causes difference {} but only {} allowed",
gst::ClockTime::from(diff),
gst::ClockTime::from(max_diff),
);
if new_calculated > expected {
current_mapping.b = expected + max_diff;
current_mapping.xbase = time.0;
} else {
current_mapping.b = expected - max_diff;
current_mapping.xbase = time.0;
}
} else {
current_mapping.b = expected - max_diff;
current_mapping.xbase = time.0;
*current_mapping = *next_mapping;
}
} else {
*current_mapping = *next_mapping;
gst_warning!(
cat,
obj: element,
"Failed to calculate timestamps based on new mapping",
);
}
}
@ -597,13 +603,15 @@ where
if val_ip_address == ip_address || val_ndi_name == ndi_name {
if (val_video.is_some() || !T::IS_VIDEO) && (val_audio.is_some() || T::IS_VIDEO) {
gst_error!(
cat,
obj: element,
"Source with ndi-name '{:?}' and ip-address '{:?}' already in use for {}",
val_ndi_name,
val_ip_address,
if T::IS_VIDEO { "video" } else { "audio" },
gst_element_error!(
element,
gst::ResourceError::OpenRead,
[
"Source with ndi-name '{:?}' and ip-address '{:?}' already in use for {}",
val_ndi_name,
val_ip_address,
if T::IS_VIDEO { "video" } else { "audio" }
]
);
return None;
@ -744,9 +752,7 @@ fn connect_ndi_async(
source.ip_address(),
);
}
}
{
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let info = match receivers.get(&id_receiver) {
None => return Err(None),
@ -804,7 +810,7 @@ fn connect_ndi_async(
None => {
return Err(Some(gst_error_msg!(
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_recv_create_v3 error"]
["Failed to connect to source"]
)));
}
Some(recv) => recv,
@ -873,7 +879,10 @@ where
Some(receiver) => receiver,
};
let element = receiver.0.element.upgrade().unwrap();
let element = match receiver.0.element.upgrade() {
None => return,
Some(element) => element,
};
let mut recv = receiver.0.recv.lock().unwrap();
loop {
@ -911,7 +920,10 @@ where
Some(receiver) => receiver,
};
let element = receiver.0.element.upgrade().unwrap();
let element = match receiver.0.element.upgrade() {
None => return,
Some(element) => element,
};
{
let queue = (receiver.0.queue.0).0.lock().unwrap();
@ -947,7 +959,10 @@ where
Some(receiver) => receiver,
};
let element = receiver.0.element.upgrade().unwrap();
let element = match receiver.0.element.upgrade() {
None => return,
Some(element) => element,
};
{
let queue = (receiver.0.queue.0).0.lock().unwrap();
@ -1032,6 +1047,78 @@ impl ReceiverCapture<VideoReceiver> for Receiver<VideoReceiver> {
}
}
impl<T: ReceiverType> Receiver<T> {
fn calculate_timestamp(
&self,
element: &gst_base::BaseSrc,
timestamp: i64,
timecode: i64,
duration: gst::ClockTime,
) -> (gst::ClockTime, gst::ClockTime) {
let clock = element.get_clock().unwrap();
// For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available
let now = clock.get_time();
let base_time = element.get_base_time();
let receive_time = now - base_time;
let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000);
let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(timestamp as u64 * 100)
};
let timecode = gst::ClockTime::from(timecode as u64 * 100);
gst_log!(
self.0.cat,
obj: element,
"Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
timecode,
timestamp,
duration,
receive_time,
real_time_now,
);
let (pts, duration) = match self.0.timestamp_mode {
TimestampMode::ReceiveTime => self.0.observations.process(
self.0.cat,
element,
(timestamp, receive_time),
duration,
),
TimestampMode::Timecode => (timecode, duration),
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
if real_time_now > timestamp {
let diff = real_time_now - timestamp;
if diff > receive_time {
(0.into(), duration)
} else {
(receive_time - diff, duration)
}
} else {
let diff = timestamp - real_time_now;
(receive_time + diff, duration)
}
}
};
gst_log!(
self.0.cat,
obj: element,
"Calculated PTS {}, duration {}",
pts,
duration,
);
(pts, duration)
}
}
impl ReceiverCapture<AudioReceiver> for Receiver<AudioReceiver> {
fn capture_internal(
&self,
@ -1081,10 +1168,15 @@ impl Receiver<VideoReceiver> {
let video_frame = match res {
Err(_) => {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]);
gst_element_error!(
element,
gst::ResourceError::Read,
["Error receiving frame"]
);
return Err(gst::FlowError::Error);
}
Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => {
gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",);
return Err(gst::FlowError::Eos);
}
Ok(None) => {
@ -1101,6 +1193,13 @@ impl Receiver<VideoReceiver> {
break video_frame;
};
gst_debug!(
self.0.cat,
obj: element,
"Received video frame {:?}",
video_frame,
);
let (pts, duration) = self.calculate_video_timestamp(element, &video_frame);
// Simply read all video frames while flushing but don't copy them or anything to
@ -1124,22 +1223,6 @@ impl Receiver<VideoReceiver> {
element: &gst_base::BaseSrc,
video_frame: &VideoFrame,
) -> (gst::ClockTime, gst::ClockTime) {
let clock = element.get_clock().unwrap();
// For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available
let now = clock.get_time();
let base_time = element.get_base_time();
let receive_time = now - base_time;
let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000);
let timestamp = if video_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(video_frame.timestamp() as u64 * 100)
};
let timecode = gst::ClockTime::from(video_frame.timecode() as u64 * 100);
let duration = gst::SECOND
.mul_div_floor(
video_frame.frame_rate().1 as u64,
@ -1147,57 +1230,17 @@ impl Receiver<VideoReceiver> {
)
.unwrap_or(gst::CLOCK_TIME_NONE);
gst_log!(
self.0.cat,
obj: element,
"NDI video frame received: {:?} with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
video_frame,
timecode,
timestamp,
self.calculate_timestamp(
element,
video_frame.timestamp(),
video_frame.timecode(),
duration,
receive_time,
real_time_now,
);
let (pts, duration) = match self.0.timestamp_mode {
TimestampMode::ReceiveTime => self.0.observations.process(
self.0.cat,
element,
(timestamp, receive_time),
duration,
),
TimestampMode::Timecode => (timecode, duration),
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
if real_time_now > timestamp {
let diff = real_time_now - timestamp;
if diff > receive_time {
(0.into(), duration)
} else {
(receive_time - diff, duration)
}
} else {
let diff = timestamp - real_time_now;
(receive_time + diff, duration)
}
}
};
gst_log!(
self.0.cat,
obj: element,
"Calculated PTS for video frame {}, duration {}",
pts,
duration,
);
(pts, duration)
)
}
fn create_video_info(
&self,
_element: &gst_base::BaseSrc,
element: &gst_base::BaseSrc,
video_frame: &VideoFrame,
) -> Result<gst_video::VideoInfo, gst::FlowError> {
// YV12 and I420 are swapped in the NDI SDK compared to GStreamer
@ -1213,7 +1256,8 @@ impl Receiver<VideoReceiver> {
ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_UYVA => gst_video::VideoFormat::Uyvy,
};
let par = gst::Fraction::approximate_f32(video_frame.picture_aspect_ratio()).unwrap()
let par = gst::Fraction::approximate_f32(video_frame.picture_aspect_ratio())
.unwrap_or(gst::Fraction::new(1, 1))
* gst::Fraction::new(video_frame.yres(), video_frame.xres());
#[cfg(feature = "interlaced-fields")]
@ -1241,7 +1285,15 @@ impl Receiver<VideoReceiver> {
builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
}
Ok(builder.build().unwrap())
builder.build().ok_or_else(|| {
gst_element_error!(
element,
gst::StreamError::Format,
["Invalid video format configuration"]
);
gst::FlowError::NotNegotiated
})
}
#[cfg(not(feature = "interlaced-fields"))]
@ -1282,7 +1334,15 @@ impl Receiver<VideoReceiver> {
builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
}
Ok(builder.build().unwrap());
builder.build().ok_or_else(|| {
gst_element_error!(
element,
gst::StreamError::Format,
["Invalid video format configuration"]
);
gst::FlowError::NotNegotiated
})
}
}
@ -1365,7 +1425,6 @@ impl Receiver<VideoReceiver> {
buffer: gst::Buffer,
video_frame: &VideoFrame,
) -> Result<gst::Buffer, gst::FlowError> {
// FIXME: Error handling if frame dimensions don't match
let mut vframe = gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
match info.format() {
@ -1514,10 +1573,15 @@ impl Receiver<AudioReceiver> {
let audio_frame = match res {
Err(_) => {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]);
gst_element_error!(
element,
gst::ResourceError::Read,
["Error receiving frame"]
);
return Err(gst::FlowError::Error);
}
Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => {
gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",);
return Err(gst::FlowError::Eos);
}
Ok(None) => {
@ -1534,6 +1598,13 @@ impl Receiver<AudioReceiver> {
break audio_frame;
};
gst_debug!(
self.0.cat,
obj: element,
"Received audio frame {:?}",
audio_frame,
);
let (pts, duration) = self.calculate_audio_timestamp(element, &audio_frame);
// Simply read all video frames while flushing but don't copy them or anything to
@ -1557,22 +1628,6 @@ impl Receiver<AudioReceiver> {
element: &gst_base::BaseSrc,
audio_frame: &AudioFrame,
) -> (gst::ClockTime, gst::ClockTime) {
let clock = element.get_clock().unwrap();
// For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available
let now = clock.get_time();
let base_time = element.get_base_time();
let receive_time = now - base_time;
let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000);
let timestamp = if audio_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(audio_frame.timestamp() as u64 * 100)
};
let timecode = gst::ClockTime::from(audio_frame.timecode() as u64 * 100);
let duration = gst::SECOND
.mul_div_floor(
audio_frame.no_samples() as u64,
@ -1580,57 +1635,17 @@ impl Receiver<AudioReceiver> {
)
.unwrap_or(gst::CLOCK_TIME_NONE);
gst_log!(
self.0.cat,
obj: element,
"NDI audio frame received: {:?} with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
audio_frame,
timecode,
timestamp,
self.calculate_timestamp(
element,
audio_frame.timestamp(),
audio_frame.timecode(),
duration,
receive_time,
real_time_now,
);
let (pts, duration) = match self.0.timestamp_mode {
TimestampMode::ReceiveTime => self.0.observations.process(
self.0.cat,
element,
(timestamp, receive_time),
duration,
),
TimestampMode::Timecode => (timecode, duration),
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
if real_time_now > timestamp {
let diff = real_time_now - timestamp;
if diff > receive_time {
(0.into(), duration)
} else {
(receive_time - diff, duration)
}
} else {
let diff = timestamp - real_time_now;
(receive_time + diff, duration)
}
}
};
gst_log!(
self.0.cat,
obj: element,
"Calculated PTS for audio frame {}, duration {}",
pts,
duration,
);
(pts, duration)
)
}
fn create_audio_info(
&self,
_element: &gst_base::BaseSrc,
element: &gst_base::BaseSrc,
audio_frame: &AudioFrame,
) -> Result<gst_audio::AudioInfo, gst::FlowError> {
let builder = gst_audio::AudioInfo::new(
@ -1639,7 +1654,15 @@ impl Receiver<AudioReceiver> {
audio_frame.no_channels() as u32,
);
Ok(builder.build().unwrap())
builder.build().ok_or_else(|| {
gst_element_error!(
element,
gst::StreamError::Format,
["Invalid audio format configuration"]
);
gst::FlowError::NotNegotiated
})
}
fn create_audio_buffer(