diff --git a/gst-plugin-flv/src/flvdemux.rs b/gst-plugin-flv/src/flvdemux.rs index 33400e67..3a5875e6 100644 --- a/gst-plugin-flv/src/flvdemux.rs +++ b/gst-plugin-flv/src/flvdemux.rs @@ -490,7 +490,7 @@ pub struct FlvDemux { } impl FlvDemux { - pub fn new(_demuxer: &RsDemuxerWrapper) -> FlvDemux { + pub fn new(_demuxer: &RsDemuxer) -> FlvDemux { FlvDemux { cat: gst::DebugCategory::new( "rsflvdemux", @@ -503,13 +503,13 @@ impl FlvDemux { } } - pub fn new_boxed(demuxer: &RsDemuxerWrapper) -> Box { + pub fn new_boxed(demuxer: &RsDemuxer) -> Box { Box::new(Self::new(demuxer)) } fn handle_script_tag( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, tag_header: &flavors::TagHeader, ) -> Result { if self.adapter.get_available() < (15 + tag_header.data_size) as usize { @@ -577,7 +577,7 @@ impl FlvDemux { fn update_audio_stream( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, data_header: &flavors::AudioDataHeader, ) -> Result { gst_trace!( @@ -631,7 +631,7 @@ impl FlvDemux { fn handle_audio_tag( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, tag_header: &flavors::TagHeader, data_header: &flavors::AudioDataHeader, ) -> Result { @@ -754,7 +754,7 @@ impl FlvDemux { fn update_video_stream( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, data_header: &flavors::VideoDataHeader, ) -> Result { gst_trace!( @@ -809,7 +809,7 @@ impl FlvDemux { fn handle_video_tag( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, tag_header: &flavors::TagHeader, data_header: &flavors::VideoDataHeader, ) -> Result { @@ -954,10 +954,7 @@ impl FlvDemux { Ok(HandleBufferResult::BufferForStream(VIDEO_STREAM_ID, buffer)) } - fn update_state( - &mut self, - demuxer: &RsDemuxerWrapper, - ) -> Result { + fn update_state(&mut self, demuxer: &RsDemuxer) -> Result { match self.state { State::Stopped => unreachable!(), State::NeedHeader => { @@ -1100,10 +1097,10 @@ impl FlvDemux { } } -impl Demuxer for FlvDemux { +impl DemuxerImpl for FlvDemux { fn start( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, _upstream_size: Option, _random_access: bool, ) -> Result<(), ErrorMessage> { @@ -1112,7 +1109,7 @@ impl Demuxer for FlvDemux { Ok(()) } - fn stop(&mut self, demuxer: &RsDemuxerWrapper) -> Result<(), ErrorMessage> { + fn stop(&mut self, demuxer: &RsDemuxer) -> Result<(), ErrorMessage> { self.state = State::Stopped; self.adapter.clear(); self.streaming_state = None; @@ -1122,7 +1119,7 @@ impl Demuxer for FlvDemux { fn seek( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, start: u64, stop: Option, ) -> Result { @@ -1131,7 +1128,7 @@ impl Demuxer for FlvDemux { fn handle_buffer( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, buffer: Option, ) -> Result { if let Some(buffer) = buffer { @@ -1141,16 +1138,16 @@ impl Demuxer for FlvDemux { self.update_state(demuxer) } - fn end_of_stream(&mut self, demuxer: &RsDemuxerWrapper) -> Result<(), ErrorMessage> { + fn end_of_stream(&mut self, demuxer: &RsDemuxer) -> Result<(), ErrorMessage> { // nothing to do here, all data we have left is incomplete Ok(()) } - fn is_seekable(&self, demuxer: &RsDemuxerWrapper) -> bool { + fn is_seekable(&self, demuxer: &RsDemuxer) -> bool { false } - fn get_position(&self, demuxer: &RsDemuxerWrapper) -> Option { + fn get_position(&self, demuxer: &RsDemuxer) -> Option { if let Some(StreamingState { last_position, .. }) = self.streaming_state { return last_position; } @@ -1158,7 +1155,7 @@ impl Demuxer for FlvDemux { None } - fn get_duration(&self, demuxer: &RsDemuxerWrapper) -> Option { + fn get_duration(&self, demuxer: &RsDemuxer) -> Option { if let Some(StreamingState { metadata: Some(Metadata { duration, .. }), .. diff --git a/gst-plugin/Cargo.toml b/gst-plugin/Cargo.toml index 45577b3d..28f66a79 100644 --- a/gst-plugin/Cargo.toml +++ b/gst-plugin/Cargo.toml @@ -10,6 +10,7 @@ libc = "0.2" url = "1.1" lazy_static = "0.2" byteorder = "1.0" +mopa = "0.2" glib-sys = { git = "https://github.com/gtk-rs/sys" } gobject-sys = { git = "https://github.com/gtk-rs/sys" } gstreamer-sys = { git = "https://github.com/sdroege/gstreamer-sys", features = ["v1_10"] } diff --git a/gst-plugin/src/demuxer.rs b/gst-plugin/src/demuxer.rs index 81d3e8f9..b71ef7ca 100644 --- a/gst-plugin/src/demuxer.rs +++ b/gst-plugin/src/demuxer.rs @@ -6,32 +6,16 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc::c_char; -use std::os::raw::c_void; -use std::ffi::CString; - -use std::panic::{self, AssertUnwindSafe}; - use std::sync::Mutex; -use std::sync::atomic::{AtomicBool, Ordering}; use std::u32; use std::u64; -use std::ptr; -use std::mem; -use std::cell::{Cell, RefCell}; use std::collections::BTreeMap; -use std::ops::Deref; -use std::cmp; use error::*; - -use glib_ffi; -use gobject_ffi; -use gst_ffi; +use element::*; use glib; -use glib::translate::*; use gst; use gst::prelude::*; use gst_base; @@ -62,31 +46,33 @@ pub enum HandleBufferResult { Eos(Option), } -pub trait Demuxer { +pub type RsDemuxer = RsElement; + +pub trait DemuxerImpl: Send + 'static { fn start( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, upstream_size: Option, random_access: bool, ) -> Result<(), ErrorMessage>; - fn stop(&mut self, demuxer: &RsDemuxerWrapper) -> Result<(), ErrorMessage>; + fn stop(&mut self, demuxer: &RsDemuxer) -> Result<(), ErrorMessage>; fn seek( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, start: u64, stop: Option, ) -> Result; fn handle_buffer( &mut self, - demuxer: &RsDemuxerWrapper, + demuxer: &RsDemuxer, buffer: Option, ) -> Result; - fn end_of_stream(&mut self, demuxer: &RsDemuxerWrapper) -> Result<(), ErrorMessage>; + fn end_of_stream(&mut self, demuxer: &RsDemuxer) -> Result<(), ErrorMessage>; - fn is_seekable(&self, demuxer: &RsDemuxerWrapper) -> bool; - fn get_position(&self, demuxer: &RsDemuxerWrapper) -> Option; - fn get_duration(&self, demuxer: &RsDemuxerWrapper) -> Option; + fn is_seekable(&self, demuxer: &RsDemuxer) -> bool; + fn get_position(&self, demuxer: &RsDemuxer) -> Option; + fn get_duration(&self, demuxer: &RsDemuxer) -> Option; } #[derive(Debug)] @@ -108,104 +94,98 @@ impl Stream { pub struct DemuxerWrapper { cat: gst::DebugCategory, - demuxer: Mutex>, - panicked: AtomicBool, + demuxer: Mutex>, } impl DemuxerWrapper { - fn new(demuxer: Box) -> DemuxerWrapper { + fn new(demuxer_impl: Box) -> DemuxerWrapper { DemuxerWrapper { cat: gst::DebugCategory::new( "rsdemux", gst::DebugColorFlags::empty(), "Rust demuxer base class", ), - demuxer: Mutex::new(demuxer), - panicked: AtomicBool::new(false), + demuxer: Mutex::new(demuxer_impl), } } - fn start( - &self, - demuxer: &RsDemuxerWrapper, - upstream_size: Option, - random_access: bool, - ) -> bool { + fn start(&self, element: &RsDemuxer, upstream_size: Option, random_access: bool) -> bool { let demuxer_impl = &mut self.demuxer.lock().unwrap(); gst_debug!( self.cat, - obj: demuxer, + obj: element, "Starting with upstream size {:?} and random access {}", upstream_size, random_access ); - match demuxer_impl.start(demuxer, upstream_size, random_access) { + match demuxer_impl.start(element, upstream_size, random_access) { Ok(..) => { - gst_trace!(self.cat, obj: demuxer, "Successfully started",); + gst_trace!(self.cat, obj: element, "Successfully started",); true } Err(ref msg) => { - gst_error!(self.cat, obj: demuxer, "Failed to start: {:?}", msg); - self.post_message(demuxer, msg); + gst_error!(self.cat, obj: element, "Failed to start: {:?}", msg); + self.post_message(element, msg); false } } } - fn stop(&self, demuxer: &RsDemuxerWrapper) -> bool { + fn stop(&self, element: &RsDemuxer) -> bool { let demuxer_impl = &mut self.demuxer.lock().unwrap(); - gst_debug!(self.cat, obj: demuxer, "Stopping"); + gst_debug!(self.cat, obj: element, "Stopping"); - match demuxer_impl.stop(demuxer) { + match demuxer_impl.stop(element) { Ok(..) => { - gst_trace!(self.cat, obj: demuxer, "Successfully stop"); + gst_trace!(self.cat, obj: element, "Successfully stop"); true } Err(ref msg) => { - gst_error!(self.cat, obj: demuxer, "Failed to stop: {:?}", msg); - self.post_message(demuxer, msg); + gst_error!(self.cat, obj: element, "Failed to stop: {:?}", msg); + self.post_message(element, msg); false } } } - fn is_seekable(&self, demuxer: &RsDemuxerWrapper) -> bool { + fn is_seekable(&self, element: &RsDemuxer) -> bool { let demuxer_impl = &self.demuxer.lock().unwrap(); - let seekable = demuxer_impl.is_seekable(demuxer); - gst_debug!(self.cat, obj: demuxer, "Seekable {}", seekable); + let seekable = demuxer_impl.is_seekable(element); + gst_debug!(self.cat, obj: element, "Seekable {}", seekable); seekable } - fn get_position(&self, demuxer: &RsDemuxerWrapper) -> Option { + fn get_position(&self, element: &RsDemuxer) -> Option { let demuxer_impl = &self.demuxer.lock().unwrap(); - demuxer_impl.get_position(demuxer) + demuxer_impl.get_position(element) } - fn get_duration(&self, demuxer: &RsDemuxerWrapper) -> Option { + fn get_duration(&self, element: &RsDemuxer) -> Option { let demuxer_impl = &self.demuxer.lock().unwrap(); - demuxer_impl.get_duration(demuxer) + demuxer_impl.get_duration(element) } - fn seek(&self, demuxer: &RsDemuxerWrapper, start: u64, stop: u64, offset: &mut u64) -> bool { + fn seek(&self, element: &RsDemuxer, start: u64, stop: u64, offset: &mut u64) -> bool { + let demuxer = element.get_impl().downcast_ref::().unwrap(); let stop = if stop == u64::MAX { None } else { Some(stop) }; - gst_debug!(self.cat, obj: demuxer, "Seeking to {:?}-{:?}", start, stop); + gst_debug!(self.cat, obj: element, "Seeking to {:?}-{:?}", start, stop); let res = { let mut demuxer_impl = &mut self.demuxer.lock().unwrap(); - match demuxer_impl.seek(demuxer, start, stop) { + match demuxer_impl.seek(element, start, stop) { Ok(res) => res, Err(ref msg) => { - gst_error!(self.cat, obj: demuxer, "Failed to seek: {:?}", msg); - self.post_message(demuxer, msg); + gst_error!(self.cat, obj: element, "Failed to seek: {:?}", msg); + self.post_message(element, msg); return false; } } @@ -213,43 +193,44 @@ impl DemuxerWrapper { match res { SeekResult::TooEarly => { - gst_debug!(self.cat, obj: demuxer, "Seeked too early"); + gst_debug!(self.cat, obj: element, "Seeked too early"); false } SeekResult::Ok(off) => { - gst_trace!(self.cat, obj: demuxer, "Seeked successfully"); + gst_trace!(self.cat, obj: element, "Seeked successfully"); *offset = off; true } SeekResult::Eos => { - gst_debug!(self.cat, obj: demuxer, "Seeked after EOS"); + gst_debug!(self.cat, obj: element, "Seeked after EOS"); *offset = u64::MAX; - demuxer.stream_eos(None); + demuxer.stream_eos(element, None); true } } } - fn handle_buffer(&self, demuxer: &RsDemuxerWrapper, buffer: gst::Buffer) -> gst::FlowReturn { + fn handle_buffer(&self, element: &RsDemuxer, buffer: gst::Buffer) -> gst::FlowReturn { + let demuxer = element.get_impl().downcast_ref::().unwrap(); let mut res = { let mut demuxer_impl = &mut self.demuxer.lock().unwrap(); - gst_trace!(self.cat, obj: demuxer, "Handling buffer {:?}", buffer); + gst_trace!(self.cat, obj: element, "Handling buffer {:?}", buffer); - match demuxer_impl.handle_buffer(demuxer, Some(buffer)) { + match demuxer_impl.handle_buffer(element, Some(buffer)) { Ok(res) => res, Err(flow_error) => { gst_error!( self.cat, - obj: demuxer, + obj: element, "Failed handling buffer: {:?}", flow_error ); match flow_error { FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => { - self.post_message(demuxer, msg) + self.post_message(element, msg) } _ => (), } @@ -260,33 +241,33 @@ impl DemuxerWrapper { // Loop until AllEos, NeedMoreData or error when pushing downstream loop { - gst_trace!(self.cat, obj: demuxer, "Handled {:?}", res); + gst_trace!(self.cat, obj: element, "Handled {:?}", res); match res { HandleBufferResult::NeedMoreData => { return gst::FlowReturn::Ok; } HandleBufferResult::StreamAdded(stream) => { - demuxer.add_stream(stream.index, stream.caps, &stream.stream_id); + demuxer.add_stream(element, stream.index, stream.caps, &stream.stream_id); } HandleBufferResult::HaveAllStreams => { - demuxer.added_all_streams(); + demuxer.added_all_streams(element); } HandleBufferResult::StreamChanged(stream) => { - demuxer.stream_format_changed(stream.index, stream.caps); + demuxer.stream_format_changed(element, stream.index, stream.caps); } HandleBufferResult::StreamsChanged(streams) => for stream in streams { - demuxer.stream_format_changed(stream.index, stream.caps); + demuxer.stream_format_changed(element, stream.index, stream.caps); }, HandleBufferResult::BufferForStream(index, buffer) => { - let flow_ret = demuxer.stream_push_buffer(index, buffer); + let flow_ret = demuxer.stream_push_buffer(element, index, buffer); if flow_ret != gst::FlowReturn::Ok { return flow_ret; } } HandleBufferResult::Eos(index) => { - demuxer.stream_eos(index); + demuxer.stream_eos(element, index); return gst::FlowReturn::Eos; } HandleBufferResult::Again => { @@ -294,22 +275,22 @@ impl DemuxerWrapper { } }; - gst_trace!(self.cat, obj: demuxer, "Calling again"); + gst_trace!(self.cat, obj: element, "Calling again"); res = { let mut demuxer_impl = &mut self.demuxer.lock().unwrap(); - match demuxer_impl.handle_buffer(demuxer, None) { + match demuxer_impl.handle_buffer(element, None) { Ok(res) => res, Err(flow_error) => { gst_error!( self.cat, - obj: demuxer, + obj: element, "Failed calling again: {:?}", flow_error ); match flow_error { FlowError::NotNegotiated(ref msg) | FlowError::Error(ref msg) => { - self.post_message(demuxer, msg) + self.post_message(element, msg) } _ => (), } @@ -320,21 +301,21 @@ impl DemuxerWrapper { } } - fn end_of_stream(&self, demuxer: &RsDemuxerWrapper) { + fn end_of_stream(&self, element: &RsDemuxer) { let mut demuxer_impl = &mut self.demuxer.lock().unwrap(); - gst_debug!(self.cat, obj: demuxer, "End of stream"); - match demuxer_impl.end_of_stream(demuxer) { + gst_debug!(self.cat, obj: element, "End of stream"); + match demuxer_impl.end_of_stream(element) { Ok(_) => (), Err(ref msg) => { - gst_error!(self.cat, obj: demuxer, "Failed end of stream: {:?}", msg); - self.post_message(demuxer, msg); + gst_error!(self.cat, obj: element, "Failed end of stream: {:?}", msg); + self.post_message(element, msg); } } } - fn post_message(&self, demuxer: &RsDemuxerWrapper, msg: &ErrorMessage) { - msg.post(demuxer); + fn post_message(&self, element: &RsDemuxer, msg: &ErrorMessage) { + msg.post(element); } } @@ -345,124 +326,111 @@ pub struct DemuxerInfo { pub classification: String, pub author: String, pub rank: u32, - pub create_instance: fn(&RsDemuxerWrapper) -> Box, + pub create_instance: fn(&RsDemuxer) -> Box, pub input_caps: gst::Caps, pub output_caps: gst::Caps, } -struct OrderedPad(pub gst::Pad); +pub struct Demuxer { + sinkpad: gst::Pad, + flow_combiner: Mutex, + group_id: Mutex, + srcpads: Mutex>, + wrap: Box, +} -impl Deref for OrderedPad { - type Target = gst::Pad; +#[derive(Default)] +pub struct UniqueFlowCombiner(gst_base::FlowCombiner); - fn deref(&self) -> &Self::Target { - &self.0 +impl UniqueFlowCombiner { + fn add_pad(&mut self, pad: &gst::Pad) { + self.0.add_pad(pad); + } + + fn clear(&mut self) { + self.0.clear(); + } + + fn update_flow(&mut self, flow_ret: gst::FlowReturn) -> gst::FlowReturn { + self.0.update_flow(flow_ret) } } -impl AsRef for OrderedPad { - fn as_ref(&self) -> &gst::Pad { - &self.0 - } -} +unsafe impl Send for UniqueFlowCombiner {} +unsafe impl Sync for UniqueFlowCombiner {} -impl PartialEq for OrderedPad { - fn eq(&self, other: &Self) -> bool { - self.get_name() == other.get_name() - } -} - -impl Eq for OrderedPad {} - -impl PartialOrd for OrderedPad { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for OrderedPad { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.get_name().cmp(&other.get_name()) - } -} - -glib_wrapper! { - pub struct RsDemuxerWrapper(Object): [gst::Element => gst_ffi::GstElement, - gst::Object => gst_ffi::GstObject]; - - match fn { - get_type => || rs_demuxer_get_type(), - } -} - -impl RsDemuxerWrapper { +impl Demuxer { fn get_wrap(&self) -> &DemuxerWrapper { - let stash = self.to_glib_none(); - let demuxer: *mut RsDemuxer = stash.0; - - unsafe { &*((*demuxer).wrap) } + &self.wrap } - fn get_private(&self) -> &RsDemuxerPrivate { - let stash = self.to_glib_none(); - let demuxer: *mut RsDemuxer = stash.0; - - unsafe { &*((*demuxer).private) } + fn new(demuxer: &RsDemuxer, sinkpad: gst::Pad, demuxer_info: &DemuxerInfo) -> Self { + Self { + sinkpad: sinkpad, + flow_combiner: Mutex::new(Default::default()), + group_id: Mutex::new(gst::util_group_id_next()), + srcpads: Mutex::new(BTreeMap::new()), + wrap: Box::new(DemuxerWrapper::new((demuxer_info.create_instance)(demuxer))), + } } - fn change_state(&self, transition: gst::StateChange) -> gst::StateChangeReturn { - let mut ret = gst::StateChangeReturn::Success; - let wrap = self.get_wrap(); - let private = self.get_private(); + fn class_init( + klass: &mut RsElementClass, + long_name: &str, + classification: &str, + description: &str, + author: &str, + input_caps: &gst::Caps, + output_caps: &gst::Caps, + ) { + klass.set_metadata(long_name, classification, description, author); - gst_trace!(wrap.cat, obj: self, "Changing state {:?}", transition); + let pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + input_caps, + ); + klass.add_pad_template(pad_template); - match transition { - gst::StateChange::ReadyToPaused => { - // TODO - private.group_id.set(gst::util_group_id_next()); - } - _ => (), - } - - ret = self.parent_change_state(transition); - if ret == gst::StateChangeReturn::Failure { - return ret; - } - - match transition { - gst::StateChange::PausedToReady => { - private.flow_combiner.borrow_mut().clear(); - let mut srcpads = private.srcpads.borrow_mut(); - for (_, pad) in srcpads.iter().by_ref() { - self.remove_pad(pad.as_ref()).unwrap(); - } - srcpads.clear(); - } - _ => (), - } - - ret + let pad_template = gst::PadTemplate::new( + "src_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + output_caps, + ); + klass.add_pad_template(pad_template); } - fn add_stream(&self, index: u32, caps: gst::Caps, stream_id: &str) { - let private = self.get_private(); + fn init(element: &RsElement, demuxer_info: &DemuxerInfo) -> Box { + let templ = element.get_pad_template("sink").unwrap(); + let sinkpad = gst::Pad::new_from_template(&templ, "sink"); + sinkpad.set_activate_function(Demuxer::sink_activate); + sinkpad.set_activatemode_function(Demuxer::sink_activatemode); + sinkpad.set_chain_function(Demuxer::sink_chain); + sinkpad.set_event_function(Demuxer::sink_event); + element.add_pad(&sinkpad).unwrap(); - let mut srcpads = private.srcpads.borrow_mut(); + let imp = Self::new(element, sinkpad, demuxer_info); + Box::new(imp) + } + + fn add_stream(&self, element: &RsElement, index: u32, caps: gst::Caps, stream_id: &str) { + let mut srcpads = self.srcpads.lock().unwrap(); assert!(!srcpads.contains_key(&index)); - let templ = self.get_pad_template("src_%u").unwrap(); + let templ = element.get_pad_template("src_%u").unwrap(); let name = format!("src_{}", index); let pad = gst::Pad::new_from_template(&templ, Some(name.as_str())); - pad.set_query_function(RsDemuxerWrapper::src_query); - pad.set_event_function(RsDemuxerWrapper::src_event); + pad.set_query_function(Demuxer::src_query); + pad.set_event_function(Demuxer::src_event); pad.set_active(true).unwrap(); - let full_stream_id = pad.create_stream_id(self, stream_id).unwrap(); + let full_stream_id = pad.create_stream_id(element, stream_id).unwrap(); pad.push_event( gst::Event::new_stream_start(&full_stream_id) - .group_id(private.group_id.get()) + .group_id(*self.group_id.lock().unwrap()) .build(), ); pad.push_event(gst::Event::new_caps(&caps).build()); @@ -471,31 +439,27 @@ impl RsDemuxerWrapper { segment.init(gst::Format::Time); pad.push_event(gst::Event::new_segment(&segment).build()); - private.flow_combiner.borrow_mut().add_pad(&pad); - self.add_pad(&pad).unwrap(); + self.flow_combiner.lock().unwrap().add_pad(&pad); + element.add_pad(&pad).unwrap(); - srcpads.insert(index, OrderedPad(pad)); + srcpads.insert(index, pad); } - fn added_all_streams(&self) { - let private = self.get_private(); - - self.no_more_pads(); - private.group_id.set(gst::util_group_id_next()); + fn added_all_streams(&self, element: &RsElement) { + element.no_more_pads(); + *self.group_id.lock().unwrap() = gst::util_group_id_next(); } - fn stream_format_changed(&self, index: u32, caps: gst::Caps) { - let private = self.get_private(); - let srcpads = private.srcpads.borrow(); + fn stream_format_changed(&self, _element: &RsElement, index: u32, caps: gst::Caps) { + let srcpads = self.srcpads.lock().unwrap(); if let Some(pad) = srcpads.get(&index) { pad.push_event(gst::Event::new_caps(&caps).build()); } } - fn stream_eos(&self, index: Option) { - let private = self.get_private(); - let srcpads = private.srcpads.borrow(); + fn stream_eos(&self, _element: &RsElement, index: Option) { + let srcpads = self.srcpads.lock().unwrap(); let event = gst::Event::new_eos().build(); match index { @@ -508,58 +472,45 @@ impl RsDemuxerWrapper { }; } - fn stream_push_buffer(&self, index: u32, buffer: gst::Buffer) -> gst::FlowReturn { - let private = self.get_private(); - let srcpads = private.srcpads.borrow(); + fn stream_push_buffer( + &self, + _element: &RsElement, + index: u32, + buffer: gst::Buffer, + ) -> gst::FlowReturn { + let srcpads = self.srcpads.lock().unwrap(); if let Some(pad) = srcpads.get(&index) { - private - .flow_combiner - .borrow_mut() + self.flow_combiner + .lock() + .unwrap() .update_flow(pad.push(buffer)) } else { gst::FlowReturn::Error } } - fn remove_all_streams(&self) { - let private = self.get_private(); - private.flow_combiner.borrow_mut().clear(); - let mut srcpads = private.srcpads.borrow_mut(); + fn remove_all_streams(&self, element: &RsElement) { + self.flow_combiner.lock().unwrap().clear(); + let mut srcpads = self.srcpads.lock().unwrap(); for (_, pad) in srcpads.iter().by_ref() { - self.remove_pad(pad.as_ref()).unwrap(); + element.remove_pad(pad).unwrap(); } srcpads.clear(); } - fn parent_change_state(&self, transition: gst::StateChange) -> gst::StateChangeReturn { - unsafe { - let stash = self.to_glib_none(); - let demuxer: *mut RsDemuxer = stash.0; - let demuxer_klass = &**(demuxer as *const *const RsDemuxerClass); - let parent_klass = &*(demuxer_klass.parent_vtable as *const gst_ffi::GstElementClass); - - parent_klass - .change_state - .map(|f| { - from_glib(f(self.to_glib_none().0, transition.to_glib())) - }) - .unwrap_or(gst::StateChangeReturn::Failure) - } - } - fn sink_activate(_pad: &gst::Pad, parent: &Option) -> bool { - let this = parent + let element = parent .as_ref() .map(|o| o.clone()) .unwrap() - .downcast::() + .downcast::() .unwrap(); - let private = this.get_private(); + let demuxer = element.get_impl().downcast_ref::().unwrap(); let mode = { let mut query = gst::Query::new_scheduling(); - if !private.sinkpad.peer_query(query.get_mut().unwrap()) { + if !demuxer.sinkpad.peer_query(query.get_mut().unwrap()) { return false; } @@ -573,20 +524,7 @@ impl RsDemuxerWrapper { gst::PadMode::Push }; - let mut query = gst::Query::new_duration(gst::Format::Bytes); - let upstream_size = if private.sinkpad.peer_query(query.get_mut().unwrap()) { - use gst::QueryView; - - match query.view() { - QueryView::Duration(ref d) => Some(d.get().1 as u64), - _ => unreachable!(), - } - } else { - None - }; - private.upstream_size.set(upstream_size); - - match private.sinkpad.activate_mode(mode, true) { + match demuxer.sinkpad.activate_mode(mode, true) { Ok(_) => true, Err(_) => false, } @@ -598,36 +536,44 @@ impl RsDemuxerWrapper { mode: gst::PadMode, active: bool, ) -> bool { - let this = parent + let element = parent .as_ref() .map(|o| o.clone()) .unwrap() - .downcast::() + .downcast::() .unwrap(); - let private = this.get_private(); - let wrap = this.get_wrap(); + let demuxer = element.get_impl().downcast_ref::().unwrap(); + let wrap = demuxer.get_wrap(); if active { - if !wrap.start( - &this, - private.upstream_size.get(), - mode == gst::PadMode::Pull, - ) { + let mut query = gst::Query::new_duration(gst::Format::Bytes); + let upstream_size = if demuxer.sinkpad.peer_query(query.get_mut().unwrap()) { + use gst::QueryView; + + match query.view() { + QueryView::Duration(ref d) => Some(d.get().1 as u64), + _ => unreachable!(), + } + } else { + None + }; + + if !wrap.start(&element, upstream_size, mode == gst::PadMode::Pull) { return false; } if mode == gst::PadMode::Pull { // TODO - // private.sinkpad.start_task(...) + // demuxer.sinkpad.start_task(...) } true } else { if mode == gst::PadMode::Pull { - let _ = private.sinkpad.stop_task(); + let _ = demuxer.sinkpad.stop_task(); } - wrap.stop(&this) + wrap.stop(&element) } } @@ -636,30 +582,32 @@ impl RsDemuxerWrapper { parent: &Option, buffer: gst::Buffer, ) -> gst::FlowReturn { - let this = parent + let element = parent .as_ref() .map(|o| o.clone()) .unwrap() - .downcast::() + .downcast::() .unwrap(); - let wrap = this.get_wrap(); - wrap.handle_buffer(&this, buffer) + let demuxer = element.get_impl().downcast_ref::().unwrap(); + let wrap = demuxer.get_wrap(); + wrap.handle_buffer(&element, buffer) } fn sink_event(pad: &gst::Pad, parent: &Option, event: gst::Event) -> bool { use gst::EventView; - let this = parent + let element = parent .as_ref() .map(|o| o.clone()) .unwrap() - .downcast::() + .downcast::() .unwrap(); - let wrap = this.get_wrap(); + let demuxer = element.get_impl().downcast_ref::().unwrap(); + let wrap = demuxer.get_wrap(); match event.view() { EventView::Eos(..) => { - wrap.end_of_stream(&this); + wrap.end_of_stream(&element); pad.event_default(parent.as_ref(), event) } EventView::Segment(..) => pad.event_default(parent.as_ref(), event), @@ -670,20 +618,21 @@ impl RsDemuxerWrapper { fn src_query(pad: &gst::Pad, parent: &Option, query: &mut gst::QueryRef) -> bool { use gst::QueryView; - let this = parent + let element = parent .as_ref() .map(|o| o.clone()) .unwrap() - .downcast::() + .downcast::() .unwrap(); - let wrap = this.get_wrap(); + let demuxer = element.get_impl().downcast_ref::().unwrap(); + let wrap = demuxer.get_wrap(); match query.view_mut() { QueryView::Position(ref mut q) => { let (fmt, _) = q.get(); if fmt == gst::Format::Time { - let position = wrap.get_position(&this); - gst_trace!(wrap.cat, obj: &this, "Returning position {:?}", position); + let position = wrap.get_position(&element); + gst_trace!(wrap.cat, obj: &element, "Returning position {:?}", position); match position { None => return false, @@ -699,8 +648,8 @@ impl RsDemuxerWrapper { QueryView::Duration(ref mut q) => { let (fmt, _) = q.get(); if fmt == gst::Format::Time { - let duration = wrap.get_duration(&this); - gst_trace!(wrap.cat, obj: &this, "Returning duration {:?}", duration); + let duration = wrap.get_duration(&element); + gst_trace!(wrap.cat, obj: &element, "Returning duration {:?}", duration); match duration { None => return false, @@ -734,229 +683,72 @@ impl RsDemuxerWrapper { } } -#[repr(C)] -pub struct RsDemuxer { - parent: gst_ffi::GstElement, - wrap: *mut DemuxerWrapper, - private: *mut RsDemuxerPrivate, -} +impl ElementImpl for Demuxer { + fn change_state( + &self, + element: &RsElement, + transition: gst::StateChange, + ) -> gst::StateChangeReturn { + let mut ret = gst::StateChangeReturn::Success; + let wrap = self.get_wrap(); -#[repr(C)] -pub struct RsDemuxerClass { - parent_class: gst_ffi::GstElementClass, - demuxer_info: *const DemuxerInfo, - parent_vtable: glib_ffi::gconstpointer, -} + gst_trace!(wrap.cat, obj: element, "Changing state {:?}", transition); -pub struct RsDemuxerPrivate { - sinkpad: gst::Pad, - flow_combiner: RefCell, - upstream_size: Cell>, - group_id: Cell, - srcpads: RefCell>, -} - -unsafe fn rs_demuxer_get_type() -> glib_ffi::GType { - use std::sync::{Once, ONCE_INIT}; - - static mut TYPE: glib_ffi::GType = gobject_ffi::G_TYPE_INVALID; - static ONCE: Once = ONCE_INIT; - - ONCE.call_once(|| { - let type_info = gobject_ffi::GTypeInfo { - class_size: mem::size_of::() as u16, - base_init: None, - base_finalize: None, - class_init: Some(demuxer_class_init), - class_finalize: None, - class_data: ptr::null_mut(), - instance_size: mem::size_of::() as u16, - n_preallocs: 0, - instance_init: None, - value_table: ptr::null(), - }; - - let type_name = { - let mut idx = 0; - - loop { - let type_name = CString::new(format!("RsDemuxer-{}", idx)).unwrap(); - if gobject_ffi::g_type_from_name(type_name.as_ptr()) == gobject_ffi::G_TYPE_INVALID - { - break type_name; - } - idx += 1; + match transition { + gst::StateChange::ReadyToPaused => { + // TODO + *self.group_id.lock().unwrap() = gst::util_group_id_next(); } - }; + _ => (), + } - TYPE = gobject_ffi::g_type_register_static( - gst_ffi::gst_element_get_type(), - type_name.as_ptr(), - &type_info, - gobject_ffi::GTypeFlags::empty(), - ); - }); + ret = element.parent_change_state(transition); + if ret == gst::StateChangeReturn::Failure { + return ret; + } - TYPE -} + match transition { + gst::StateChange::PausedToReady => { + self.flow_combiner.lock().unwrap().clear(); + let mut srcpads = self.srcpads.lock().unwrap(); + for (_, pad) in srcpads.iter().by_ref() { + element.remove_pad(pad).unwrap(); + } + srcpads.clear(); + } + _ => (), + } -unsafe extern "C" fn demuxer_finalize(obj: *mut gobject_ffi::GObject) { - let demuxer = &mut *(obj as *mut RsDemuxer); - - drop(Box::from_raw(demuxer.wrap)); - demuxer.wrap = ptr::null_mut(); - - drop(Box::from_raw(demuxer.private)); - demuxer.private = ptr::null_mut(); - - let demuxer_klass = &**(obj as *const *const RsDemuxerClass); - let parent_klass = &*(demuxer_klass.parent_vtable as *const gobject_ffi::GObjectClass); - parent_klass.finalize.map(|f| f(obj)); -} - -unsafe extern "C" fn demuxer_sub_class_init( - klass: glib_ffi::gpointer, - klass_data: glib_ffi::gpointer, -) { - let demuxer_info = &*(klass_data as *const DemuxerInfo); - - { - let element_klass = &mut *(klass as *mut gst_ffi::GstElementClass); - - gst_ffi::gst_element_class_set_metadata( - element_klass, - demuxer_info.long_name.to_glib_none().0, - demuxer_info.classification.to_glib_none().0, - demuxer_info.description.to_glib_none().0, - demuxer_info.author.to_glib_none().0, - ); - - let pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &demuxer_info.input_caps, - ); - gst_ffi::gst_element_class_add_pad_template(element_klass, pad_template.to_glib_full()); - - let pad_template = gst::PadTemplate::new( - "src_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &demuxer_info.output_caps, - ); - gst_ffi::gst_element_class_add_pad_template(element_klass, pad_template.to_glib_full()); - } - - { - let demuxer_klass = &mut *(klass as *mut RsDemuxerClass); - - demuxer_klass.demuxer_info = demuxer_info; + ret } } -unsafe extern "C" fn demuxer_class_init( - klass: glib_ffi::gpointer, - _klass_data: glib_ffi::gpointer, -) { - { - let gobject_klass = &mut *(klass as *mut gobject_ffi::GObjectClass); - - gobject_klass.finalize = Some(demuxer_finalize); - } - - { - let element_klass = &mut *(klass as *mut gst_ffi::GstElementClass); - - element_klass.change_state = Some(demuxer_change_state); - } - - { - let demuxer_klass = &mut *(klass as *mut RsDemuxerClass); - - demuxer_klass.parent_vtable = gobject_ffi::g_type_class_peek_parent(klass); - } -} - -unsafe extern "C" fn demuxer_change_state( - ptr: *mut gst_ffi::GstElement, - transition: gst_ffi::GstStateChange, -) -> gst_ffi::GstStateChangeReturn { - let demuxer: &RsDemuxerWrapper = &from_glib_borrow(ptr as *mut RsDemuxer); - let wrap = demuxer.get_wrap(); - - panic_to_error!(wrap, demuxer, gst::StateChangeReturn::Failure, { - demuxer.change_state(from_glib(transition)) - }).to_glib() -} - -unsafe extern "C" fn demuxer_sub_init( - instance: *mut gobject_ffi::GTypeInstance, - klass: glib_ffi::gpointer, -) { - let demuxer = &mut *(instance as *mut RsDemuxer); - let demuxer_klass = &*(klass as *const RsDemuxerClass); - let demuxer_info = &*demuxer_klass.demuxer_info; - - let private = &mut demuxer.private as *mut _; - - let wrap = Box::new(DemuxerWrapper::new((demuxer_info.create_instance)( - &RsDemuxerWrapper::from_glib_borrow(instance as *mut _), - ))); - demuxer.wrap = Box::into_raw(wrap); - - let demuxer = &RsDemuxerWrapper::from_glib_borrow(demuxer as *mut _); - - let templ = demuxer.get_pad_template("sink").unwrap(); - let sinkpad = gst::Pad::new_from_template(&templ, "sink"); - sinkpad.set_activate_function(RsDemuxerWrapper::sink_activate); - sinkpad.set_activatemode_function(RsDemuxerWrapper::sink_activatemode); - sinkpad.set_chain_function(RsDemuxerWrapper::sink_chain); - sinkpad.set_event_function(RsDemuxerWrapper::sink_event); - demuxer.add_pad(&sinkpad).unwrap(); - - let private_data = Box::new(RsDemuxerPrivate { - sinkpad: sinkpad, - flow_combiner: RefCell::new(Default::default()), - upstream_size: Cell::new(None), - group_id: Cell::new(gst::util_group_id_next()), - srcpads: RefCell::new(BTreeMap::new()), - }); - *private = Box::into_raw(private_data); -} - - pub fn demuxer_register(plugin: &gst::Plugin, demuxer_info: DemuxerInfo) { - unsafe { - let parent_type = rs_demuxer_get_type(); - let type_name = format!("RsDemuxer-{}", demuxer_info.name); + let name = demuxer_info.name.clone(); + let rank = demuxer_info.rank; - let name = demuxer_info.name.clone(); - let rank = demuxer_info.rank; + let long_name = demuxer_info.long_name.clone(); + let classification = demuxer_info.classification.clone(); + let description = demuxer_info.description.clone(); + let author = demuxer_info.author.clone(); + let input_caps = demuxer_info.input_caps.clone(); + let output_caps = demuxer_info.output_caps.clone(); - let demuxer_info = Box::new(demuxer_info); - let demuxer_info_ptr = Box::into_raw(demuxer_info) as glib_ffi::gpointer; - - let type_info = gobject_ffi::GTypeInfo { - class_size: mem::size_of::() as u16, - base_init: None, - base_finalize: None, - class_init: Some(demuxer_sub_class_init), - class_finalize: None, - class_data: demuxer_info_ptr, - instance_size: mem::size_of::() as u16, - n_preallocs: 0, - instance_init: Some(demuxer_sub_init), - value_table: ptr::null(), - }; - - let type_ = gobject_ffi::g_type_register_static( - parent_type, - type_name.to_glib_none().0, - &type_info, - gobject_ffi::GTypeFlags::empty(), - ); - - gst::Element::register(plugin, &name, rank, from_glib(type_)); - } + element_register( + plugin, + &name, + rank, + move |klass| { + Demuxer::class_init( + klass, + &long_name, + &classification, + &description, + &author, + &input_caps, + &output_caps, + ) + }, + move |element| Demuxer::init(element, &demuxer_info), + ); } diff --git a/gst-plugin/src/element.rs b/gst-plugin/src/element.rs new file mode 100644 index 00000000..3a7db486 --- /dev/null +++ b/gst-plugin/src/element.rs @@ -0,0 +1,305 @@ +// Copyright (C) 2017 Sebastian Dröge +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::ffi::CString; +use std::ptr; +use std::mem; +use std::sync::atomic::AtomicBool; +use mopa; + +use glib_ffi; +use gobject_ffi; +use gst_ffi; + +use glib; +use glib::translate::*; +use gst; +use gst::prelude::*; + +pub trait ElementImpl: mopa::Any + Send + Sync + 'static { + fn change_state( + &self, + element: &RsElement, + transition: gst::StateChange, + ) -> gst::StateChangeReturn { + element.parent_change_state(transition) + } +} + +mopafy!(ElementImpl); + +pub unsafe trait Element: IsA { + fn parent_change_state(&self, transition: gst::StateChange) -> gst::StateChangeReturn { + unsafe { + // Our class + let klass = *(self.to_glib_none().0 as *const glib_ffi::gpointer); + // The parent class, RsElement or any other first-level Rust implementation + let parent_klass = gobject_ffi::g_type_class_peek_parent(klass); + // The actual parent class as defined in C + let parent_klass = &*(gobject_ffi::g_type_class_peek_parent(parent_klass) as + *const gst_ffi::GstElementClass); + parent_klass + .change_state + .map(|f| { + from_glib(f(self.to_glib_none().0, transition.to_glib())) + }) + .unwrap_or(gst::StateChangeReturn::Success) + } + } +} + +pub unsafe trait ElementClass { + fn add_pad_template(&mut self, pad_template: gst::PadTemplate) { + unsafe { + gst_ffi::gst_element_class_add_pad_template( + self as *const Self as *mut gst_ffi::GstElementClass, + pad_template.to_glib_full(), + ); + } + } + + fn set_metadata( + &mut self, + long_name: &str, + classification: &str, + description: &str, + author: &str, + ) { + unsafe { + gst_ffi::gst_element_class_set_metadata( + self as *const Self as *mut gst_ffi::GstElementClass, + long_name.to_glib_none().0, + classification.to_glib_none().0, + description.to_glib_none().0, + author.to_glib_none().0, + ); + } + } +} + +glib_wrapper! { + pub struct RsElement(Object): [gst::Element => gst_ffi::GstElement, + gst::Object => gst_ffi::GstObject]; + + match fn { + get_type => || ffi::rs_element_get_type(), + } +} + +impl RsElement { + pub fn get_impl(&self) -> &ElementImpl { + unsafe { + let stash = self.to_glib_none(); + let ptr: *const ffi::RsElement = stash.0; + (*ptr).get_impl() + } + } +} + +unsafe impl> Element for T {} +unsafe impl ElementClass for RsElementClass {} + +struct ElementData { + class_init: Box, + init: Box Box + Send + Sync + 'static>, +} + +pub mod ffi { + use super::*; + use super::RsElement as RsElementWrapper; + + #[repr(C)] + pub struct RsElement { + parent: gst_ffi::GstElement, + imp: *const Box, + panicked: AtomicBool, + } + + impl RsElement { + pub fn get_impl(&self) -> &ElementImpl { + unsafe { + assert!(!self.imp.is_null()); + &*(*self.imp) + } + } + } + + #[repr(C)] + pub struct RsElementClass { + parent_class: gst_ffi::GstElementClass, + element_data: *const ElementData, + } + + pub unsafe fn rs_element_get_type() -> glib_ffi::GType { + use std::sync::{Once, ONCE_INIT}; + + static mut TYPE: glib_ffi::GType = gobject_ffi::G_TYPE_INVALID; + static ONCE: Once = ONCE_INIT; + + ONCE.call_once(|| { + let type_info = gobject_ffi::GTypeInfo { + class_size: mem::size_of::() as u16, + base_init: None, + base_finalize: None, + class_init: Some(element_class_init), + class_finalize: None, + class_data: ptr::null_mut(), + instance_size: mem::size_of::() as u16, + n_preallocs: 0, + instance_init: None, + value_table: ptr::null(), + }; + + let type_name = { + let mut idx = 0; + + loop { + let type_name = CString::new(format!("RsElement-{}", idx)).unwrap(); + if gobject_ffi::g_type_from_name(type_name.as_ptr()) == + gobject_ffi::G_TYPE_INVALID + { + break type_name; + } + idx += 1; + } + }; + + TYPE = gobject_ffi::g_type_register_static( + gst_ffi::gst_element_get_type(), + type_name.as_ptr(), + &type_info, + gobject_ffi::GTypeFlags::empty(), + ); + }); + + TYPE + } + + unsafe extern "C" fn element_finalize(obj: *mut gobject_ffi::GObject) { + callback_guard!(); + let element = &mut *(obj as *mut RsElement); + + drop(Box::from_raw(element.imp as *mut Box)); + element.imp = ptr::null_mut(); + + let klass = *(obj as *const glib_ffi::gpointer); + let parent_klass = gobject_ffi::g_type_class_peek_parent(klass); + let parent_klass = &*(gobject_ffi::g_type_class_peek_parent(parent_klass) as + *const gobject_ffi::GObjectClass); + parent_klass.finalize.map(|f| f(obj)); + } + + unsafe extern "C" fn element_sub_class_init( + klass: glib_ffi::gpointer, + klass_data: glib_ffi::gpointer, + ) { + callback_guard!(); + let element_data = &*(klass_data as *const ElementData); + + { + let klass = &mut *(klass as *mut RsElementClass); + + klass.element_data = element_data; + + (element_data.class_init)(klass); + } + } + + unsafe extern "C" fn element_class_init( + klass: glib_ffi::gpointer, + _klass_data: glib_ffi::gpointer, + ) { + callback_guard!(); + { + let gobject_klass = &mut *(klass as *mut gobject_ffi::GObjectClass); + + gobject_klass.finalize = Some(element_finalize); + } + + { + let element_klass = &mut *(klass as *mut gst_ffi::GstElementClass); + + element_klass.change_state = Some(element_change_state); + } + } + + unsafe extern "C" fn element_change_state( + ptr: *mut gst_ffi::GstElement, + transition: gst_ffi::GstStateChange, + ) -> gst_ffi::GstStateChangeReturn { + callback_guard!(); + let element = &*(ptr as *mut RsElement); + let wrap: RsElementWrapper = from_glib_borrow(ptr as *mut RsElement); + let imp = &*element.imp; + + panic_to_error2!(&wrap, &element.panicked, gst::StateChangeReturn::Failure, { + imp.change_state(&wrap, from_glib(transition)) + }).to_glib() + } + + unsafe extern "C" fn element_sub_init( + instance: *mut gobject_ffi::GTypeInstance, + klass: glib_ffi::gpointer, + ) { + callback_guard!(); + let element = &mut *(instance as *mut RsElement); + let wrap: RsElementWrapper = from_glib_borrow(instance as *mut RsElement); + let klass = &*(klass as *const RsElementClass); + let element_data = &*klass.element_data; + + let imp = (element_data.init)(&wrap); + element.imp = Box::into_raw(Box::new(imp)); + } + + pub fn element_register( + plugin: &gst::Plugin, + name: &str, + rank: u32, + class_init: F, + init: G, + ) where + F: Fn(&mut RsElementClass) + Send + 'static, + G: Fn(&RsElementWrapper) -> Box + Send + Sync + 'static, + { + unsafe { + let parent_type = rs_element_get_type(); + let type_name = format!("RsElement-{}", name); + + let element_data = ElementData { + class_init: Box::new(class_init), + init: Box::new(init), + }; + let element_data = Box::into_raw(Box::new(element_data)) as glib_ffi::gpointer; + + let type_info = gobject_ffi::GTypeInfo { + class_size: mem::size_of::() as u16, + base_init: None, + base_finalize: None, + class_init: Some(element_sub_class_init), + class_finalize: None, + class_data: element_data, + instance_size: mem::size_of::() as u16, + n_preallocs: 0, + instance_init: Some(element_sub_init), + value_table: ptr::null(), + }; + + let type_ = gobject_ffi::g_type_register_static( + parent_type, + type_name.to_glib_none().0, + &type_info, + gobject_ffi::GTypeFlags::empty(), + ); + + gst::Element::register(plugin, &name, rank, from_glib(type_)); + } + } +} + +pub use self::ffi::RsElementClass; +pub use self::ffi::element_register; diff --git a/gst-plugin/src/error.rs b/gst-plugin/src/error.rs index 053359b2..1f960647 100644 --- a/gst-plugin/src/error.rs +++ b/gst-plugin/src/error.rs @@ -243,3 +243,34 @@ macro_rules! panic_to_error( } }}; ); + +#[macro_export] +macro_rules! panic_to_error2( + ($element:expr, $panicked:expr, $ret:expr, $code:block) => {{ + use std::panic::{self, AssertUnwindSafe}; + use std::sync::atomic::Ordering; + use $crate::error::ErrorMessage; + + if $panicked.load(Ordering::Relaxed) { + error_msg!(gst::LibraryError::Failed, ["Panicked"]).post($element); + $ret + } else { + let result = panic::catch_unwind(AssertUnwindSafe(|| $code)); + + match result { + Ok(result) => result, + Err(err) => { + $panicked.store(true, Ordering::Relaxed); + if let Some(cause) = err.downcast_ref::<&str>() { + error_msg!(gst::LibraryError::Failed, ["Panicked: {}", cause]).post($element); + } else if let Some(cause) = err.downcast_ref::() { + error_msg!(gst::LibraryError::Failed, ["Panicked: {}", cause]).post($element); + } else { + error_msg!(gst::LibraryError::Failed, ["Panicked"]).post($element); + } + $ret + } + } + } + }}; +); diff --git a/gst-plugin/src/lib.rs b/gst-plugin/src/lib.rs index 847d66da..d0934eff 100644 --- a/gst-plugin/src/lib.rs +++ b/gst-plugin/src/lib.rs @@ -12,6 +12,8 @@ extern crate gstreamer_base_sys as gst_base_ffi; #[macro_use] extern crate lazy_static; extern crate libc; +#[macro_use] +extern crate mopa; extern crate url; pub extern crate glib_sys as glib_ffi; pub extern crate gobject_sys as gobject_ffi; @@ -23,6 +25,12 @@ pub extern crate glib; #[macro_use] pub extern crate gstreamer as gst; +macro_rules! callback_guard { + () => ( + let _guard = ::glib::CallbackGuard::new(); + ) +} + #[macro_use] pub mod utils; #[macro_use] @@ -34,3 +42,5 @@ pub mod source; pub mod sink; pub mod demuxer; pub mod bytes; + +pub mod element; diff --git a/gst-plugin/src/sink.rs b/gst-plugin/src/sink.rs index b52bd41e..cd988417 100644 --- a/gst-plugin/src/sink.rs +++ b/gst-plugin/src/sink.rs @@ -40,7 +40,7 @@ pub struct SinkWrapper { panicked: AtomicBool, } -pub trait Sink { +pub trait Sink: Send + 'static { fn uri_validator(&self) -> Box; fn start(&mut self, sink: &RsSinkWrapper, uri: Url) -> Result<(), ErrorMessage>; diff --git a/gst-plugin/src/source.rs b/gst-plugin/src/source.rs index 1c33f8d7..f8c170e4 100644 --- a/gst-plugin/src/source.rs +++ b/gst-plugin/src/source.rs @@ -40,7 +40,7 @@ pub struct SourceWrapper { panicked: AtomicBool, } -pub trait Source { +pub trait Source: Send + 'static { fn uri_validator(&self) -> Box; fn is_seekable(&self, src: &RsSrcWrapper) -> bool;