Add debug logging everywhere

This commit is contained in:
Sebastian Dröge 2016-12-27 16:47:39 +01:00
parent c9aad84898
commit 137a14360f
13 changed files with 366 additions and 28 deletions

View file

@ -80,18 +80,24 @@ impl Sink for FileSink {
let location = try!(uri.to_file_path()
.or_else(|_| {
error!(self.logger, "Unsupported file URI '{}'", uri.as_str());
Err(error_msg!(SinkError::Failure,
["Unsupported file URI '{}'", uri.as_str()]))
}));
let file = try!(File::create(location.as_path()).or_else(|err| {
error!(self.logger,
"Could not open file for writing: {}",
err.to_string());
Err(error_msg!(SinkError::OpenFailed,
["Could not open file for writing '{}': {}",
location.to_str().unwrap_or("Non-UTF8 path"),
err.to_string()]))
}));
debug!(self.logger, "Opened file {:?}", file);
self.streaming_state = StreamingState::Started {
file: file,
position: 0,
@ -107,6 +113,11 @@ impl Sink for FileSink {
}
fn render(&mut self, buffer: &Buffer) -> Result<(), FlowError> {
// FIXME: Because we borrow streaming state mutably below
let logger = self.logger.clone();
trace!(logger, "Rendering {:?}", buffer);
let (file, position) = match self.streaming_state {
StreamingState::Started { ref mut file, ref mut position } => (file, position),
StreamingState::Stopped => {
@ -124,6 +135,7 @@ impl Sink for FileSink {
let data = map.as_slice();
try!(file.write_all(data).or_else(|err| {
error!(logger, "Failed to write: {}", err);
Err(FlowError::Error(error_msg!(SinkError::WriteFailed, ["Failed to write: {}", err])))
}));

View file

@ -92,17 +92,23 @@ impl Source for FileSrc {
let location = try!(uri.to_file_path()
.or_else(|_| {
error!(self.logger, "Unsupported file URI '{}'", uri.as_str());
Err(error_msg!(SourceError::Failure,
["Unsupported file URI '{}'", uri.as_str()]))
}));
let file = try!(File::open(location.as_path()).or_else(|err| {
error!(self.logger,
"Could not open file for reading: {}",
err.to_string());
Err(error_msg!(SourceError::OpenFailed,
["Could not open file for reading '{}': {}",
location.to_str().unwrap_or("Non-UTF8 path"),
err.to_string()]))
}));
debug!(self.logger, "Opened file {:?}", file);
self.streaming_state = StreamingState::Started {
file: file,
position: 0,
@ -118,6 +124,9 @@ impl Source for FileSrc {
}
fn fill(&mut self, offset: u64, _: u32, buffer: &mut Buffer) -> Result<(), FlowError> {
// FIXME: Because we borrow streaming state mutably below
let logger = self.logger.clone();
let (file, position) = match self.streaming_state {
StreamingState::Started { ref mut file, ref mut position } => (file, position),
StreamingState::Stopped => {
@ -127,6 +136,7 @@ impl Source for FileSrc {
if *position != offset {
try!(file.seek(SeekFrom::Start(offset)).or_else(|err| {
error!(logger, "Failed to seek to {}: {:?}", offset, err);
Err(FlowError::Error(error_msg!(SourceError::SeekFailed,
["Failed to seek to {}: {}",
offset,
@ -147,6 +157,7 @@ impl Source for FileSrc {
let data = map.as_mut_slice();
try!(file.read(data).or_else(|err| {
error!(logger, "Failed to read: {:?}", err);
Err(FlowError::Error(error_msg!(SourceError::ReadFailed,
["Failed to read at {}: {}",
offset,

View file

@ -83,12 +83,16 @@ impl HttpSrc {
}
}
debug!(self.logger, "Doing new request {:?}", req);
let response = try!(req.send().or_else(|err| {
error!(self.logger, "Request failed: {:?}", err);
Err(error_msg!(SourceError::ReadFailed,
["Failed to fetch {}: {}", uri, err.to_string()]))
}));
if !response.status().is_success() {
error!(self.logger, "Request status failed: {:?}", response);
return Err(error_msg!(SourceError::ReadFailed,
["Failed to fetch {}: {}", uri, response.status()]));
}
@ -119,6 +123,8 @@ impl HttpSrc {
["Failed to seek to {}: Got {}", start, position]));
}
debug!(self.logger, "Request successful: {:?}", response);
Ok(StreamingState::Started {
uri: uri,
response: response,
@ -193,6 +199,8 @@ impl Source for HttpSrc {
}
fn fill(&mut self, offset: u64, _: u32, buffer: &mut Buffer) -> Result<(), FlowError> {
let logger = self.logger.clone();
let (response, position) = match self.streaming_state {
StreamingState::Started { ref mut response, ref mut position, .. } => {
(response, position)
@ -221,6 +229,7 @@ impl Source for HttpSrc {
let data = map.as_mut_slice();
try!(response.read(data).or_else(|err| {
error!(logger, "Failed to read: {:?}", err);
Err(FlowError::Error(error_msg!(SourceError::ReadFailed,
["Failed to read at {}: {}",
offset,

View file

@ -10,7 +10,8 @@ license = "LGPL-2.1+"
libc = "0.2"
url = "1.1"
bitflags = "0.7"
slog = "1.3"
slog = { version = "1.3", features = ["max_level_trace"] }
lazy_static = "0.2"
[build-dependencies]
gcc = "0.3"

View file

@ -17,8 +17,20 @@
//
use buffer::*;
use log::*;
use std::collections::VecDeque;
use std::cmp;
use slog::*;
lazy_static! {
static ref LOGGER: Logger = {
Logger::root(GstDebugDrain::new(None,
"rsadapter",
0,
"Rust buffer adapter"),
None)
};
}
#[derive(Debug)]
pub struct Adapter {
@ -47,6 +59,11 @@ impl Adapter {
let size = buffer.get_size();
self.size += size;
trace!(LOGGER,
"Storing {:?} of size {}, now have size {}",
buffer,
size,
self.size);
self.deque.push_back(buffer.into_read_mapped_buffer().unwrap());
}
@ -55,6 +72,7 @@ impl Adapter {
self.size = 0;
self.skip = 0;
self.scratch.clear();
trace!(LOGGER, "Cleared adapter");
}
pub fn get_available(&self) -> usize {
@ -66,10 +84,18 @@ impl Adapter {
let mut left = size;
let mut idx = 0;
trace!(LOGGER, "Copying {} bytes", size);
for item in deque {
let data_item = item.as_slice();
let to_copy = cmp::min(left, data_item.len() - skip);
trace!(LOGGER,
"Copying {} bytes from {:?}, {} more to go",
to_copy,
item,
left - to_copy);
data[idx..idx + to_copy].copy_from_slice(&data_item[skip..skip + to_copy]);
skip = 0;
idx += to_copy;
@ -85,9 +111,14 @@ impl Adapter {
let size = data.len();
if self.size < size {
debug!(LOGGER,
"Peeking {} bytes into, not enough data: have {}",
size,
self.size);
return Err(AdapterError::NotEnoughData);
}
trace!(LOGGER, "Peeking {} bytes into", size);
if size == 0 {
return Ok(());
}
@ -98,6 +129,10 @@ impl Adapter {
pub fn peek(&mut self, size: usize) -> Result<&[u8], AdapterError> {
if self.size < size {
debug!(LOGGER,
"Peeking {} bytes, not enough data: have {}",
size,
self.size);
return Err(AdapterError::NotEnoughData);
}
@ -106,11 +141,14 @@ impl Adapter {
}
if let Some(front) = self.deque.front() {
trace!(LOGGER, "Peeking {} bytes, subbuffer of first", size);
if front.get_size() - self.skip >= size {
return Ok(&front.as_slice()[self.skip..self.skip + size]);
}
}
trace!(LOGGER, "Peeking {} bytes, copy to scratch", size);
self.scratch.truncate(0);
self.scratch.reserve(size);
{
@ -123,6 +161,10 @@ impl Adapter {
pub fn get_buffer(&mut self, size: usize) -> Result<Buffer, AdapterError> {
if self.size < size {
debug!(LOGGER,
"Get buffer of {} bytes, not enough data: have {}",
size,
self.size);
return Err(AdapterError::NotEnoughData);
}
@ -132,6 +174,7 @@ impl Adapter {
let sub = self.deque.front().and_then(|front| {
if front.get_size() - self.skip >= size {
trace!(LOGGER, "Get buffer of {} bytes, subbuffer of first", size);
let new = front.get_buffer().copy_region(self.skip, Some(size)).unwrap();
Some(new)
} else {
@ -144,6 +187,7 @@ impl Adapter {
return Ok(s);
}
trace!(LOGGER, "Get buffer of {} bytes, copy into new buffer", size);
let mut new = Buffer::new_with_size(size).unwrap();
{
let mut map = new.map_readwrite().unwrap();
@ -156,6 +200,10 @@ impl Adapter {
pub fn flush(&mut self, size: usize) -> Result<(), AdapterError> {
if self.size < size {
debug!(LOGGER,
"Flush {} bytes, not enough data: have {}",
size,
self.size);
return Err(AdapterError::NotEnoughData);
}
@ -163,16 +211,26 @@ impl Adapter {
return Ok(());
}
trace!(LOGGER, "Flushing {} bytes, have {}", size, self.size);
let mut left = size;
while left > 0 {
let front_size = self.deque.front().unwrap().get_size() - self.skip;
if front_size <= left {
trace!(LOGGER,
"Flushing whole {:?}, {} more to go",
self.deque.front(),
left - front_size);
self.deque.pop_front();
self.size -= front_size;
self.skip = 0;
left -= front_size;
} else {
trace!(LOGGER,
"Flushing partial {:?}, {} more left",
self.deque.front(),
front_size - left);
self.skip += left;
self.size -= left;
left = 0;

View file

@ -581,7 +581,7 @@ impl Drop for Buffer {
fn gst_mini_object_unref(obj: *mut c_void);
}
if self.owned {
if self.owned && !self.raw.is_null() {
unsafe { gst_mini_object_unref(self.raw) }
}
}
@ -673,8 +673,10 @@ impl Drop for ReadMappedBuffer {
fn gst_buffer_unmap(buffer: *mut c_void, map: *mut GstMapInfo);
};
unsafe {
gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo);
if !self.buffer.raw.is_null() {
unsafe {
gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo);
}
}
}
}
@ -706,8 +708,10 @@ impl Drop for ReadWriteMappedBuffer {
fn gst_buffer_unmap(buffer: *mut c_void, map: *mut GstMapInfo);
};
unsafe {
gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo);
if !self.buffer.raw.is_null() {
unsafe {
gst_buffer_unmap(self.buffer.raw, &mut self.map_info as *mut GstMapInfo);
}
}
}
}

View file

@ -132,6 +132,8 @@ gst_rs_demuxer_init (GstRsDemuxer * demuxer, GstRsDemuxerClass * klass)
gst_element_add_pad (GST_ELEMENT (demuxer), demuxer->sinkpad);
demuxer->flow_combiner = gst_flow_combiner_new ();
GST_DEBUG_OBJECT (demuxer, "Instantiating");
}
static void
@ -139,6 +141,7 @@ gst_rs_demuxer_finalize (GObject * object)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (object);
GST_DEBUG_OBJECT (demuxer, "Finalizing");
gst_flow_combiner_free (demuxer->flow_combiner);
demuxer_drop (demuxer->instance);
@ -158,8 +161,12 @@ gst_rs_demuxer_sink_activate (GstPad * pad, GstObject * parent)
return FALSE;
}
// TODO
//if (gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE))
//if (gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE)) {
// GST_DEBUG_OBJECT (demuxer, "Activating in PULL mode");
// mode = GST_PAD_MODE_PULL;
//} else {
//GST_DEBUG_OBJECT (demuxer, "Activating in PUSH mode");
//}
gst_query_unref (query);
demuxer->upstream_size = -1;
@ -179,7 +186,11 @@ gst_rs_demuxer_sink_activate_mode (GstPad * pad,
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
gboolean res = TRUE;
GST_DEBUG_OBJECT (demuxer, "%s pad in %s mode",
(active ? "Activating" : "Deactivating"), gst_pad_mode_get_name (mode));
if (active) {
GST_DEBUG_OBJECT (demuxer, "Starting");
if (!demuxer_start (demuxer->instance, demuxer->upstream_size,
mode == GST_PAD_MODE_PULL ? TRUE : FALSE)) {
res = FALSE;
@ -202,8 +213,16 @@ static GstFlowReturn
gst_rs_demuxer_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
GstFlowReturn res;
return demuxer_handle_buffer (demuxer->instance, buf);
GST_TRACE_OBJECT (demuxer, "Handling buffer %p", buf);
res = demuxer_handle_buffer (demuxer->instance, buf);
GST_TRACE_OBJECT (demuxer, "Handling buffer returned %s",
gst_flow_get_name (res));
return res;
}
static gboolean
@ -220,6 +239,7 @@ gst_rs_demuxer_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
break;
}
case GST_EVENT_EOS:
GST_DEBUG_OBJECT (demuxer, "Got EOS");
demuxer_end_of_stream (demuxer->instance);
res = gst_pad_event_default (pad, parent, event);
break;
@ -245,6 +265,8 @@ gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
gint64 position;
if (demuxer_get_position (demuxer->instance, &position)) {
GST_DEBUG_OBJECT (demuxer, "Returning position %" GST_TIME_FORMAT,
GST_TIME_ARGS (position));
gst_query_set_position (query, format, position);
res = TRUE;
} else {
@ -261,6 +283,8 @@ gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
gint64 duration;
if (demuxer_get_duration (demuxer->instance, &duration)) {
GST_DEBUG_OBJECT (demuxer, "Returning duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration));
gst_query_set_duration (query, format, duration);
res = TRUE;
} else {
@ -302,6 +326,10 @@ gst_rs_demuxer_change_state (GstElement * element, GstStateChange transition)
GstRsDemuxer *demuxer = GST_RS_DEMUXER (element);
GstStateChangeReturn result;
GST_DEBUG_OBJECT (demuxer, "Change state %s to %s",
gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
demuxer->offset = 0;
@ -325,6 +353,7 @@ gst_rs_demuxer_change_state (GstElement * element, GstStateChange transition)
guint i;
/* Ignore stop failures */
GST_DEBUG_OBJECT (demuxer, "Stopping");
demuxer_stop (demuxer->instance);
gst_flow_combiner_clear (demuxer->flow_combiner);
@ -362,6 +391,9 @@ gst_rs_demuxer_add_stream (GstRsDemuxer * demuxer, guint32 index,
g_assert (demuxer->srcpads[index] == NULL);
GST_DEBUG_OBJECT (demuxer, "Adding stream %u with format %s and stream id %s",
index, format, stream_id);
templ =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (demuxer),
"src_%u");
@ -398,6 +430,8 @@ gst_rs_demuxer_add_stream (GstRsDemuxer * demuxer, guint32 index,
void
gst_rs_demuxer_added_all_streams (GstRsDemuxer * demuxer)
{
GST_DEBUG_OBJECT (demuxer, "No more pads");
gst_element_no_more_pads (GST_ELEMENT (demuxer));
demuxer->group_id = gst_util_group_id_next ();
}
@ -411,6 +445,8 @@ gst_rs_demuxer_stream_format_changed (GstRsDemuxer * demuxer, guint32 index,
g_assert (demuxer->srcpads[index] != NULL);
GST_DEBUG_OBJECT (demuxer, "Format changed for stream %u: %s", index, format);
caps = gst_caps_from_string (format);
event = gst_event_new_caps (caps);
gst_caps_unref (caps);
@ -426,6 +462,8 @@ gst_rs_demuxer_stream_eos (GstRsDemuxer * demuxer, guint32 index)
g_assert (index == -1 || demuxer->srcpads[index] != NULL);
GST_DEBUG_OBJECT (demuxer, "EOS for stream %u", index);
event = gst_event_new_eos ();
if (index == -1) {
gint i;
@ -450,8 +488,12 @@ gst_rs_demuxer_stream_push_buffer (GstRsDemuxer * demuxer, guint32 index,
g_assert (demuxer->srcpads[index] != NULL);
GST_DEBUG_OBJECT (demuxer, "Pushing buffer %p for pad %u", buffer, index);
res = gst_pad_push (demuxer->srcpads[index], buffer);
GST_DEBUG_OBJECT (demuxer, "Pushed buffer returned: %s",
gst_flow_get_name (res));
res = gst_flow_combiner_update_flow (demuxer->flow_combiner, res);
GST_DEBUG_OBJECT (demuxer, "Combined return: %s", gst_flow_get_name (res));
return res;
}
@ -461,6 +503,7 @@ gst_rs_demuxer_remove_all_streams (GstRsDemuxer * demuxer)
{
guint i;
GST_DEBUG_OBJECT (demuxer, "Removing all streams");
gst_flow_combiner_clear (demuxer->flow_combiner);
for (i = 0; i < G_N_ELEMENTS (demuxer->srcpads); i++) {
@ -475,7 +518,7 @@ gst_rs_demuxer_init_class (gpointer data)
{
demuxers = g_hash_table_new (g_direct_hash, g_direct_equal);
GST_DEBUG_CATEGORY_INIT (gst_rs_demuxer_debug, "rsdemux", 0,
"rsdemux element");
"Rust demuxer base class");
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
}
@ -505,6 +548,15 @@ gst_rs_demuxer_register (GstPlugin * plugin, const gchar * name,
g_once (&gonce, gst_rs_demuxer_init_class, NULL);
GST_DEBUG ("Registering for %" GST_PTR_FORMAT ": %s", plugin, name);
GST_DEBUG (" long name: %s", long_name);
GST_DEBUG (" description: %s", description);
GST_DEBUG (" classification: %s", classification);
GST_DEBUG (" author: %s", author);
GST_DEBUG (" rank: %d", rank);
GST_DEBUG (" input formats: %s", input_format);
GST_DEBUG (" output formats: %s", output_formats);
data = g_new0 (ElementData, 1);
data->long_name = g_strdup (long_name);
data->description = g_strdup (description);

View file

@ -27,9 +27,12 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::u32;
use std::u64;
use slog::*;
use utils::*;
use error::*;
use buffer::*;
use log::*;
use plugin::Plugin;
pub type StreamIndex = u32;
@ -92,6 +95,7 @@ impl Stream {
pub struct DemuxerWrapper {
raw: *mut c_void,
logger: Logger,
demuxer: Mutex<Box<Demuxer>>,
panicked: AtomicBool,
}
@ -100,6 +104,11 @@ impl DemuxerWrapper {
fn new(raw: *mut c_void, demuxer: Box<Demuxer>) -> DemuxerWrapper {
DemuxerWrapper {
raw: raw,
logger: Logger::root(GstDebugDrain::new(Some(unsafe { &Element::new(raw) }),
"rsdemux",
0,
"Rust demuxer base class"),
None),
demuxer: Mutex::new(demuxer),
panicked: AtomicBool::new(false),
}
@ -108,6 +117,11 @@ impl DemuxerWrapper {
fn start(&self, upstream_size: u64, random_access: bool) -> bool {
let demuxer = &mut self.demuxer.lock().unwrap();
debug!(self.logger,
"Starting with upstream size {} and random access {}",
upstream_size,
random_access);
let upstream_size = if upstream_size == u64::MAX {
None
} else {
@ -115,8 +129,12 @@ impl DemuxerWrapper {
};
match demuxer.start(upstream_size, random_access) {
Ok(..) => true,
Ok(..) => {
trace!(self.logger, "Successfully started");
true
}
Err(ref msg) => {
error!(self.logger, "Failed to start: {:?}", msg);
self.post_message(msg);
false
}
@ -126,9 +144,15 @@ impl DemuxerWrapper {
fn stop(&self) -> bool {
let demuxer = &mut self.demuxer.lock().unwrap();
debug!(self.logger, "Stopping");
match demuxer.stop() {
Ok(..) => true,
Ok(..) => {
trace!(self.logger, "Successfully stop");
true
}
Err(ref msg) => {
error!(self.logger, "Failed to stop: {:?}", msg);
self.post_message(msg);
false
}
@ -138,7 +162,10 @@ impl DemuxerWrapper {
fn is_seekable(&self) -> bool {
let demuxer = &self.demuxer.lock().unwrap();
demuxer.is_seekable()
let seekable = demuxer.is_seekable();
debug!(self.logger, "Seekable {}", seekable);
seekable
}
@ -147,10 +174,12 @@ impl DemuxerWrapper {
match demuxer.get_position() {
None => {
trace!(self.logger, "Got no position");
*position = u64::MAX;
GBoolean::False
}
Some(pos) => {
trace!(self.logger, "Returning position {}", pos);
*position = pos;
GBoolean::True
}
@ -158,16 +187,18 @@ impl DemuxerWrapper {
}
fn get_duration(&self, position: &mut u64) -> GBoolean {
fn get_duration(&self, duration: &mut u64) -> GBoolean {
let demuxer = &self.demuxer.lock().unwrap();
match demuxer.get_duration() {
None => {
*position = u64::MAX;
trace!(self.logger, "Got no duration");
*duration = u64::MAX;
GBoolean::False
}
Some(pos) => {
*position = pos;
Some(dur) => {
trace!(self.logger, "Returning duration {}", dur);
*duration = dur;
GBoolean::True
}
}
@ -180,12 +211,15 @@ impl DemuxerWrapper {
let stop = if stop == u64::MAX { None } else { Some(stop) };
debug!(self.logger, "Seeking to {:?}-{:?}", start, stop);
let res = {
let mut demuxer = &mut self.demuxer.lock().unwrap();
match demuxer.seek(start, stop) {
Ok(res) => res,
Err(ref msg) => {
error!(self.logger, "Failed to seek: {:?}", msg);
self.post_message(msg);
return false;
}
@ -193,12 +227,17 @@ impl DemuxerWrapper {
};
match res {
SeekResult::TooEarly => false,
SeekResult::TooEarly => {
debug!(self.logger, "Seeked too early");
false
}
SeekResult::Ok(off) => {
trace!(self.logger, "Seeked successfully");
*offset = off;
true
}
SeekResult::Eos => {
debug!(self.logger, "Seeked after EOS");
*offset = u64::MAX;
unsafe {
@ -231,9 +270,12 @@ impl DemuxerWrapper {
let mut res = {
let mut demuxer = &mut self.demuxer.lock().unwrap();
trace!(self.logger, "Handling buffer {:?}", buffer);
match demuxer.handle_buffer(Some(buffer)) {
Ok(res) => res,
Err(flow_error) => {
error!(self.logger, "Failed handling buffer: {:?}", flow_error);
match flow_error {
FlowError::NotNegotiated(ref msg) |
FlowError::Error(ref msg) => self.post_message(msg),
@ -246,6 +288,8 @@ impl DemuxerWrapper {
// Loop until AllEos, NeedMoreData or error when pushing downstream
loop {
trace!(self.logger, "Handled {:?}", res);
match res {
HandleBufferResult::NeedMoreData => {
return GstFlowReturn::Ok;
@ -306,11 +350,14 @@ impl DemuxerWrapper {
}
};
trace!(self.logger, "Calling again");
res = {
let mut demuxer = &mut self.demuxer.lock().unwrap();
match demuxer.handle_buffer(None) {
Ok(res) => res,
Err(flow_error) => {
error!(self.logger, "Failed calling again: {:?}", flow_error);
match flow_error {
FlowError::NotNegotiated(ref msg) |
FlowError::Error(ref msg) => self.post_message(msg),
@ -326,9 +373,11 @@ impl DemuxerWrapper {
fn end_of_stream(&self) {
let mut demuxer = &mut self.demuxer.lock().unwrap();
debug!(self.logger, "End of stream");
match demuxer.end_of_stream() {
Ok(_) => (),
Err(ref msg) => {
error!(self.logger, "Failed end of stream: {:?}", msg);
self.post_message(msg);
}
}

View file

@ -22,6 +22,8 @@ extern crate url;
extern crate bitflags;
#[macro_use]
extern crate slog;
#[macro_use]
extern crate lazy_static;
#[macro_use]
pub mod utils;

View file

@ -119,6 +119,8 @@ gst_rs_sink_init (GstRsSink * sink, GstRsSinkClass * klass)
gst_base_sink_set_sync (GST_BASE_SINK (sink), FALSE);
GST_DEBUG_OBJECT (sink, "Instantiating");
sink->instance = sink_new (sink, data->create_instance);
}
@ -127,6 +129,8 @@ gst_rs_sink_finalize (GObject * object)
{
GstRsSink *sink = GST_RS_SINK (object);
GST_DEBUG_OBJECT (sink, "Finalizing");
sink_drop (sink->instance);
G_OBJECT_CLASS (parent_class)->finalize (object);
@ -174,8 +178,12 @@ gst_rs_sink_render (GstBaseSink * basesink, GstBuffer * buffer)
GstRsSink *sink = GST_RS_SINK (basesink);
GstFlowReturn ret;
GST_TRACE_OBJECT (sink, "Rendering buffer %p", buffer);
ret = sink_render (sink->instance, buffer);
GST_TRACE_OBJECT (sink, "Rendered buffer: %s", gst_flow_get_name (ret));
return ret;
}
@ -185,6 +193,8 @@ gst_rs_sink_start (GstBaseSink * basesink)
{
GstRsSink *sink = GST_RS_SINK (basesink);
GST_DEBUG_OBJECT (sink, "Starting");
return sink_start (sink->instance);
}
@ -194,6 +204,8 @@ gst_rs_sink_stop (GstBaseSink * basesink)
{
GstRsSink *sink = GST_RS_SINK (basesink);
GST_DEBUG_OBJECT (sink, "Stopping");
/* Ignore stop failures */
sink_stop (sink->instance);
@ -219,8 +231,13 @@ static gchar *
gst_rs_sink_uri_get_uri (GstURIHandler * handler)
{
GstRsSink *sink = GST_RS_SINK (handler);
gchar *res;
return sink_get_uri (sink->instance);
res = sink_get_uri (sink->instance);
GST_DEBUG_OBJECT (sink, "Returning URI %s", res);
return res;
}
static gboolean
@ -229,8 +246,12 @@ gst_rs_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
{
GstRsSink *sink = GST_RS_SINK (handler);
if (!sink_set_uri (sink->instance, uri, err))
GST_DEBUG_OBJECT (sink, "Setting URI %s", uri);
if (!sink_set_uri (sink->instance, uri, err)) {
GST_ERROR_OBJECT (sink, "Failed to set URI: %s", (*err)->message);
return FALSE;
}
return TRUE;
}
@ -250,7 +271,8 @@ static gpointer
gst_rs_sink_init_class (gpointer data)
{
sinks = g_hash_table_new (g_direct_hash, g_direct_equal);
GST_DEBUG_CATEGORY_INIT (gst_rs_sink_debug, "rssink", 0, "rssink element");
GST_DEBUG_CATEGORY_INIT (gst_rs_sink_debug, "rssink", 0,
"Rust sink base class");
parent_class = g_type_class_ref (GST_TYPE_BASE_SINK);
}
@ -284,6 +306,14 @@ gst_rs_sink_register (GstPlugin * plugin, const gchar * name,
g_once (&gonce, gst_rs_sink_init_class, NULL);
GST_DEBUG ("Registering for %" GST_PTR_FORMAT ": %s", plugin, name);
GST_DEBUG (" long name: %s", long_name);
GST_DEBUG (" description: %s", description);
GST_DEBUG (" classification: %s", classification);
GST_DEBUG (" author: %s", author);
GST_DEBUG (" rank: %d", rank);
GST_DEBUG (" protocols: %s", protocols);
data = g_new0 (ElementData, 1);
data->name = g_strdup (name);
data->long_name = g_strdup (long_name);

View file

@ -28,9 +28,12 @@ use std::sync::atomic::{AtomicBool, Ordering};
use url::Url;
use slog::*;
use utils::*;
use error::*;
use buffer::*;
use log::*;
use plugin::Plugin;
#[derive(Debug)]
@ -56,6 +59,7 @@ impl ToGError for SinkError {
pub struct SinkWrapper {
raw: *mut c_void,
logger: Logger,
uri: Mutex<(Option<Url>, bool)>,
uri_validator: Box<UriValidator>,
sink: Mutex<Box<Sink>>,
@ -75,6 +79,11 @@ impl SinkWrapper {
fn new(raw: *mut c_void, sink: Box<Sink>) -> SinkWrapper {
SinkWrapper {
raw: raw,
logger: Logger::root(GstDebugDrain::new(Some(unsafe { &Element::new(raw) }),
"rssink",
0,
"Rust sink base class"),
None),
uri: Mutex::new((None, false)),
uri_validator: sink.uri_validator(),
sink: Mutex::new(sink),
@ -85,6 +94,8 @@ impl SinkWrapper {
fn set_uri(&self, uri_str: Option<&str>) -> Result<(), UriError> {
let uri_storage = &mut self.uri.lock().unwrap();
debug!(self.logger, "Setting URI {:?}", uri_str);
if uri_storage.1 {
return Err(UriError::new(UriErrorKind::BadState, Some("Already started".to_string())));
}
@ -114,6 +125,8 @@ impl SinkWrapper {
}
fn start(&self) -> bool {
debug!(self.logger, "Starting");
// Don't keep the URI locked while we call start later
let uri = match *self.uri.lock().unwrap() {
(Some(ref uri), ref mut started) => {
@ -121,6 +134,7 @@ impl SinkWrapper {
uri.clone()
}
(None, _) => {
error!(self.logger, "No URI given");
self.post_message(&error_msg!(SinkError::OpenFailed, ["No URI given"]));
return false;
}
@ -128,8 +142,13 @@ impl SinkWrapper {
let sink = &mut self.sink.lock().unwrap();
match sink.start(uri) {
Ok(..) => true,
Ok(..) => {
trace!(self.logger, "Started successfully");
true
}
Err(ref msg) => {
error!(self.logger, "Failed to start: {:?}", msg);
self.uri.lock().unwrap().1 = false;
self.post_message(msg);
false
@ -140,12 +159,17 @@ impl SinkWrapper {
fn stop(&self) -> bool {
let sink = &mut self.sink.lock().unwrap();
debug!(self.logger, "Stopping");
match sink.stop() {
Ok(..) => {
trace!(self.logger, "Stopped successfully");
self.uri.lock().unwrap().1 = false;
true
}
Err(ref msg) => {
error!(self.logger, "Failed to stop: {:?}", msg);
self.post_message(msg);
false
}
@ -155,9 +179,12 @@ impl SinkWrapper {
fn render(&self, buffer: &Buffer) -> GstFlowReturn {
let sink = &mut self.sink.lock().unwrap();
trace!(self.logger, "Rendering buffer {:?}", buffer);
match sink.render(buffer) {
Ok(..) => GstFlowReturn::Ok,
Err(flow_error) => {
error!(self.logger, "Failed to render: {:?}", flow_error);
match flow_error {
FlowError::NotNegotiated(ref msg) |
FlowError::Error(ref msg) => self.post_message(msg),
@ -204,6 +231,7 @@ pub unsafe extern "C" fn sink_set_uri(ptr: *const SinkWrapper,
match wrap.set_uri(uri_str) {
Err(err) => {
error!(wrap.logger, "Failed to set URI {:?}", err);
err.into_gerror(cerr);
GBoolean::False
}

View file

@ -128,6 +128,8 @@ gst_rs_src_init (GstRsSrc * src, GstRsSrcClass * klass)
gst_base_src_set_blocksize (GST_BASE_SRC (src), 4096);
GST_DEBUG_OBJECT (src, "Instantiating");
src->instance = source_new (src, data->create_instance);
}
@ -136,6 +138,8 @@ gst_rs_src_finalize (GObject * object)
{
GstRsSrc *src = GST_RS_SRC (object);
GST_DEBUG_OBJECT (src, "Finalizing");
source_drop (src->instance);
G_OBJECT_CLASS (parent_class)->finalize (object);
@ -182,16 +186,28 @@ gst_rs_src_fill (GstBaseSrc * basesrc, guint64 offset, guint length,
GstBuffer * buf)
{
GstRsSrc *src = GST_RS_SRC (basesrc);
GstFlowReturn ret;
return source_fill (src->instance, offset, length, buf);
GST_TRACE_OBJECT (src,
"Filling buffer %p, offset %" G_GUINT64_FORMAT " and length %"
G_GUINT64_FORMAT, *buf, offset, length);
ret = source_fill (src->instance, offset, length, buf);
GST_TRACE_OBJECT (src, "Filled buffer: %s", gst_flow_get_name (ret));
return ret;
}
static gboolean
gst_rs_src_is_seekable (GstBaseSrc * basesrc)
{
GstRsSrc *src = GST_RS_SRC (basesrc);
gboolean res;
return source_is_seekable (src->instance);
res = source_is_seekable (src->instance);
GST_DEBUG_OBJECT (src, "Returning seekable %d", res);
}
static gboolean
@ -201,6 +217,8 @@ gst_rs_src_get_size (GstBaseSrc * basesrc, guint64 * size)
*size = source_get_size (src->instance);
GST_DEBUG_OBJECT (src, "Returning size %" G_GUINT64_FORMAT, *size);
return TRUE;
}
@ -210,6 +228,8 @@ gst_rs_src_start (GstBaseSrc * basesrc)
{
GstRsSrc *src = GST_RS_SRC (basesrc);
GST_DEBUG_OBJECT (src, "Starting");
return source_start (src->instance);
}
@ -218,6 +238,8 @@ gst_rs_src_stop (GstBaseSrc * basesrc)
{
GstRsSrc *src = GST_RS_SRC (basesrc);
GST_DEBUG_OBJECT (src, "Stopping");
/* Ignore stop failures */
source_stop (src->instance);
@ -230,9 +252,14 @@ gst_rs_src_do_seek (GstBaseSrc * basesrc, GstSegment * segment)
GstRsSrc *src = GST_RS_SRC (basesrc);
gboolean ret;
GST_DEBUG_OBJECT (src, "Seeking to %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
GST_TIME_ARGS (segment->start), GST_TIME_ARGS (segment->stop));
ret = source_seek (src->instance, segment->start, segment->stop);
if (!ret)
if (!ret) {
GST_DEBUG_OBJECT (src, "Failed to seek");
return FALSE;
}
return GST_BASE_SRC_CLASS (parent_class)->do_seek (basesrc, segment);
}
@ -256,8 +283,13 @@ static gchar *
gst_rs_src_uri_get_uri (GstURIHandler * handler)
{
GstRsSrc *src = GST_RS_SRC (handler);
gchar *res;
return source_get_uri (src->instance);
res = source_get_uri (src->instance);
GST_DEBUG_OBJECT (src, "Returning URI %s", res);
return res;
}
static gboolean
@ -266,8 +298,12 @@ gst_rs_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
{
GstRsSrc *src = GST_RS_SRC (handler);
if (!source_set_uri (src->instance, uri, err))
GST_DEBUG_OBJECT (src, "Setting URI %s", uri);
if (!source_set_uri (src->instance, uri, err)) {
GST_ERROR_OBJECT (src, "Failed to set URI: %s", (*err)->message);
return FALSE;
}
return TRUE;
}
@ -287,7 +323,8 @@ static gpointer
gst_rs_source_init_class (gpointer data)
{
sources = g_hash_table_new (g_direct_hash, g_direct_equal);
GST_DEBUG_CATEGORY_INIT (gst_rs_src_debug, "rssrc", 0, "rssrc element");
GST_DEBUG_CATEGORY_INIT (gst_rs_src_debug, "rssrc", 0,
"Rust source base class");
parent_class = g_type_class_ref (GST_TYPE_BASE_SRC);
@ -323,6 +360,15 @@ gst_rs_source_register (GstPlugin * plugin, const gchar * name,
g_once (&gonce, gst_rs_source_init_class, NULL);
GST_DEBUG ("Registering for %" GST_PTR_FORMAT ": %s", plugin, name);
GST_DEBUG (" long name: %s", long_name);
GST_DEBUG (" description: %s", description);
GST_DEBUG (" classification: %s", classification);
GST_DEBUG (" author: %s", author);
GST_DEBUG (" rank: %d", rank);
GST_DEBUG (" protocols: %s", protocols);
GST_DEBUG (" push only: %d", push_only);
data = g_new0 (ElementData, 1);
data->long_name = g_strdup (long_name);
data->description = g_strdup (description);

View file

@ -28,10 +28,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
use url::Url;
use slog::*;
use plugin::Plugin;
use utils::*;
use error::*;
use buffer::*;
use log::*;
#[derive(Debug)]
pub enum SourceError {
@ -56,6 +59,7 @@ impl ToGError for SourceError {
pub struct SourceWrapper {
raw: *mut c_void,
logger: Logger,
uri: Mutex<(Option<Url>, bool)>,
uri_validator: Box<UriValidator>,
source: Mutex<Box<Source>>,
@ -78,6 +82,11 @@ impl SourceWrapper {
fn new(raw: *mut c_void, source: Box<Source>) -> SourceWrapper {
SourceWrapper {
raw: raw,
logger: Logger::root(GstDebugDrain::new(Some(unsafe { &Element::new(raw) }),
"rssrc",
0,
"Rust source base class"),
None),
uri: Mutex::new((None, false)),
uri_validator: source.uri_validator(),
source: Mutex::new(source),
@ -88,6 +97,8 @@ impl SourceWrapper {
fn set_uri(&self, uri_str: Option<&str>) -> Result<(), UriError> {
let uri_storage = &mut self.uri.lock().unwrap();
debug!(self.logger, "Setting URI {:?}", uri_str);
if uri_storage.1 {
return Err(UriError::new(UriErrorKind::BadState, Some("Already started".to_string())));
}
@ -127,6 +138,8 @@ impl SourceWrapper {
}
fn start(&self) -> bool {
debug!(self.logger, "Starting");
// Don't keep the URI locked while we call start later
let uri = match *self.uri.lock().unwrap() {
(Some(ref uri), ref mut started) => {
@ -134,6 +147,7 @@ impl SourceWrapper {
uri.clone()
}
(None, _) => {
error!(self.logger, "No URI given");
self.post_message(&error_msg!(SourceError::OpenFailed, ["No URI given"]));
return false;
}
@ -141,8 +155,13 @@ impl SourceWrapper {
let source = &mut self.source.lock().unwrap();
match source.start(uri) {
Ok(..) => true,
Ok(..) => {
trace!(self.logger, "Started successfully");
true
}
Err(ref msg) => {
error!(self.logger, "Failed to start: {:?}", msg);
self.uri.lock().unwrap().1 = false;
self.post_message(msg);
false
@ -153,12 +172,17 @@ impl SourceWrapper {
fn stop(&self) -> bool {
let source = &mut self.source.lock().unwrap();
debug!(self.logger, "Stopping");
match source.stop() {
Ok(..) => {
trace!(self.logger, "Stopped successfully");
self.uri.lock().unwrap().1 = false;
true
}
Err(ref msg) => {
error!(self.logger, "Failed to stop: {:?}", msg);
self.post_message(msg);
false
}
@ -167,9 +191,17 @@ impl SourceWrapper {
fn fill(&self, offset: u64, length: u32, buffer: &mut Buffer) -> GstFlowReturn {
let source = &mut self.source.lock().unwrap();
trace!(self.logger,
"Filling buffer {:?} with offset {} and length {}",
buffer,
offset,
length);
match source.fill(offset, length, buffer) {
Ok(()) => GstFlowReturn::Ok,
Err(flow_error) => {
error!(self.logger, "Failed to fill: {:?}", flow_error);
match flow_error {
FlowError::NotNegotiated(ref msg) |
FlowError::Error(ref msg) => self.post_message(msg),
@ -183,9 +215,12 @@ impl SourceWrapper {
fn seek(&self, start: u64, stop: Option<u64>) -> bool {
let source = &mut self.source.lock().unwrap();
debug!(self.logger, "Seeking to {:?}-{:?}", start, stop);
match source.seek(start, stop) {
Ok(..) => true,
Err(ref msg) => {
error!(self.logger, "Failed to seek {:?}", msg);
self.post_message(msg);
false
}
@ -229,6 +264,7 @@ pub unsafe extern "C" fn source_set_uri(ptr: *const SourceWrapper,
match wrap.set_uri(uri_str) {
Err(err) => {
error!(wrap.logger, "Failed to set URI {:?}", err);
err.into_gerror(cerr);
GBoolean::False
}