Implement a generic Element implementation infrastructure

And implement the Demuxer base class on top of that, with no unsafe code
left whatsoever
This commit is contained in:
Sebastian Dröge 2017-09-19 14:43:33 +03:00
parent 2781e0b3d8
commit 62a237ff0c
8 changed files with 633 additions and 497 deletions

View file

@ -490,7 +490,7 @@ pub struct FlvDemux {
}
impl FlvDemux {
pub fn new(_demuxer: &RsDemuxerWrapper) -> FlvDemux {
pub fn new(_demuxer: &RsDemuxer) -> FlvDemux {
FlvDemux {
cat: gst::DebugCategory::new(
"rsflvdemux",
@ -503,13 +503,13 @@ impl FlvDemux {
}
}
pub fn new_boxed(demuxer: &RsDemuxerWrapper) -> Box<Demuxer> {
pub fn new_boxed(demuxer: &RsDemuxer) -> Box<DemuxerImpl> {
Box::new(Self::new(demuxer))
}
fn handle_script_tag(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
tag_header: &flavors::TagHeader,
) -> Result<HandleBufferResult, FlowError> {
if self.adapter.get_available() < (15 + tag_header.data_size) as usize {
@ -577,7 +577,7 @@ impl FlvDemux {
fn update_audio_stream(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
data_header: &flavors::AudioDataHeader,
) -> Result<HandleBufferResult, FlowError> {
gst_trace!(
@ -631,7 +631,7 @@ impl FlvDemux {
fn handle_audio_tag(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
tag_header: &flavors::TagHeader,
data_header: &flavors::AudioDataHeader,
) -> Result<HandleBufferResult, FlowError> {
@ -754,7 +754,7 @@ impl FlvDemux {
fn update_video_stream(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
data_header: &flavors::VideoDataHeader,
) -> Result<HandleBufferResult, FlowError> {
gst_trace!(
@ -809,7 +809,7 @@ impl FlvDemux {
fn handle_video_tag(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
tag_header: &flavors::TagHeader,
data_header: &flavors::VideoDataHeader,
) -> Result<HandleBufferResult, FlowError> {
@ -954,10 +954,7 @@ impl FlvDemux {
Ok(HandleBufferResult::BufferForStream(VIDEO_STREAM_ID, buffer))
}
fn update_state(
&mut self,
demuxer: &RsDemuxerWrapper,
) -> Result<HandleBufferResult, FlowError> {
fn update_state(&mut self, demuxer: &RsDemuxer) -> Result<HandleBufferResult, FlowError> {
match self.state {
State::Stopped => unreachable!(),
State::NeedHeader => {
@ -1100,10 +1097,10 @@ impl FlvDemux {
}
}
impl Demuxer for FlvDemux {
impl DemuxerImpl for FlvDemux {
fn start(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
_upstream_size: Option<u64>,
_random_access: bool,
) -> Result<(), ErrorMessage> {
@ -1112,7 +1109,7 @@ impl Demuxer for FlvDemux {
Ok(())
}
fn stop(&mut self, demuxer: &RsDemuxerWrapper) -> Result<(), ErrorMessage> {
fn stop(&mut self, demuxer: &RsDemuxer) -> Result<(), ErrorMessage> {
self.state = State::Stopped;
self.adapter.clear();
self.streaming_state = None;
@ -1122,7 +1119,7 @@ impl Demuxer for FlvDemux {
fn seek(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
start: u64,
stop: Option<u64>,
) -> Result<SeekResult, ErrorMessage> {
@ -1131,7 +1128,7 @@ impl Demuxer for FlvDemux {
fn handle_buffer(
&mut self,
demuxer: &RsDemuxerWrapper,
demuxer: &RsDemuxer,
buffer: Option<gst::Buffer>,
) -> Result<HandleBufferResult, FlowError> {
if let Some(buffer) = buffer {
@ -1141,16 +1138,16 @@ impl Demuxer for FlvDemux {
self.update_state(demuxer)
}
fn end_of_stream(&mut self, demuxer: &RsDemuxerWrapper) -> Result<(), ErrorMessage> {
fn end_of_stream(&mut self, demuxer: &RsDemuxer) -> Result<(), ErrorMessage> {
// nothing to do here, all data we have left is incomplete
Ok(())
}
fn is_seekable(&self, demuxer: &RsDemuxerWrapper) -> bool {
fn is_seekable(&self, demuxer: &RsDemuxer) -> bool {
false
}
fn get_position(&self, demuxer: &RsDemuxerWrapper) -> Option<u64> {
fn get_position(&self, demuxer: &RsDemuxer) -> Option<u64> {
if let Some(StreamingState { last_position, .. }) = self.streaming_state {
return last_position;
}
@ -1158,7 +1155,7 @@ impl Demuxer for FlvDemux {
None
}
fn get_duration(&self, demuxer: &RsDemuxerWrapper) -> Option<u64> {
fn get_duration(&self, demuxer: &RsDemuxer) -> Option<u64> {
if let Some(StreamingState {
metadata: Some(Metadata { duration, .. }),
..

View file

@ -10,6 +10,7 @@ libc = "0.2"
url = "1.1"
lazy_static = "0.2"
byteorder = "1.0"
mopa = "0.2"
glib-sys = { git = "https://github.com/gtk-rs/sys" }
gobject-sys = { git = "https://github.com/gtk-rs/sys" }
gstreamer-sys = { git = "https://github.com/sdroege/gstreamer-sys", features = ["v1_10"] }

File diff suppressed because it is too large Load diff

305
gst-plugin/src/element.rs Normal file
View file

@ -0,0 +1,305 @@
// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::ffi::CString;
use std::ptr;
use std::mem;
use std::sync::atomic::AtomicBool;
use mopa;
use glib_ffi;
use gobject_ffi;
use gst_ffi;
use glib;
use glib::translate::*;
use gst;
use gst::prelude::*;
pub trait ElementImpl: mopa::Any + Send + Sync + 'static {
fn change_state(
&self,
element: &RsElement,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
element.parent_change_state(transition)
}
}
mopafy!(ElementImpl);
pub unsafe trait Element: IsA<gst::Element> {
fn parent_change_state(&self, transition: gst::StateChange) -> gst::StateChangeReturn {
unsafe {
// Our class
let klass = *(self.to_glib_none().0 as *const glib_ffi::gpointer);
// The parent class, RsElement or any other first-level Rust implementation
let parent_klass = gobject_ffi::g_type_class_peek_parent(klass);
// The actual parent class as defined in C
let parent_klass = &*(gobject_ffi::g_type_class_peek_parent(parent_klass) as
*const gst_ffi::GstElementClass);
parent_klass
.change_state
.map(|f| {
from_glib(f(self.to_glib_none().0, transition.to_glib()))
})
.unwrap_or(gst::StateChangeReturn::Success)
}
}
}
pub unsafe trait ElementClass {
fn add_pad_template(&mut self, pad_template: gst::PadTemplate) {
unsafe {
gst_ffi::gst_element_class_add_pad_template(
self as *const Self as *mut gst_ffi::GstElementClass,
pad_template.to_glib_full(),
);
}
}
fn set_metadata(
&mut self,
long_name: &str,
classification: &str,
description: &str,
author: &str,
) {
unsafe {
gst_ffi::gst_element_class_set_metadata(
self as *const Self as *mut gst_ffi::GstElementClass,
long_name.to_glib_none().0,
classification.to_glib_none().0,
description.to_glib_none().0,
author.to_glib_none().0,
);
}
}
}
glib_wrapper! {
pub struct RsElement(Object<ffi::RsElement>): [gst::Element => gst_ffi::GstElement,
gst::Object => gst_ffi::GstObject];
match fn {
get_type => || ffi::rs_element_get_type(),
}
}
impl RsElement {
pub fn get_impl(&self) -> &ElementImpl {
unsafe {
let stash = self.to_glib_none();
let ptr: *const ffi::RsElement = stash.0;
(*ptr).get_impl()
}
}
}
unsafe impl<T: IsA<gst::Element>> Element for T {}
unsafe impl ElementClass for RsElementClass {}
struct ElementData {
class_init: Box<Fn(&mut RsElementClass) + Send + 'static>,
init: Box<Fn(&RsElement) -> Box<ElementImpl> + Send + Sync + 'static>,
}
pub mod ffi {
use super::*;
use super::RsElement as RsElementWrapper;
#[repr(C)]
pub struct RsElement {
parent: gst_ffi::GstElement,
imp: *const Box<ElementImpl>,
panicked: AtomicBool,
}
impl RsElement {
pub fn get_impl(&self) -> &ElementImpl {
unsafe {
assert!(!self.imp.is_null());
&*(*self.imp)
}
}
}
#[repr(C)]
pub struct RsElementClass {
parent_class: gst_ffi::GstElementClass,
element_data: *const ElementData,
}
pub unsafe fn rs_element_get_type() -> glib_ffi::GType {
use std::sync::{Once, ONCE_INIT};
static mut TYPE: glib_ffi::GType = gobject_ffi::G_TYPE_INVALID;
static ONCE: Once = ONCE_INIT;
ONCE.call_once(|| {
let type_info = gobject_ffi::GTypeInfo {
class_size: mem::size_of::<RsElementClass>() as u16,
base_init: None,
base_finalize: None,
class_init: Some(element_class_init),
class_finalize: None,
class_data: ptr::null_mut(),
instance_size: mem::size_of::<RsElement>() as u16,
n_preallocs: 0,
instance_init: None,
value_table: ptr::null(),
};
let type_name = {
let mut idx = 0;
loop {
let type_name = CString::new(format!("RsElement-{}", idx)).unwrap();
if gobject_ffi::g_type_from_name(type_name.as_ptr()) ==
gobject_ffi::G_TYPE_INVALID
{
break type_name;
}
idx += 1;
}
};
TYPE = gobject_ffi::g_type_register_static(
gst_ffi::gst_element_get_type(),
type_name.as_ptr(),
&type_info,
gobject_ffi::GTypeFlags::empty(),
);
});
TYPE
}
unsafe extern "C" fn element_finalize(obj: *mut gobject_ffi::GObject) {
callback_guard!();
let element = &mut *(obj as *mut RsElement);
drop(Box::from_raw(element.imp as *mut Box<ElementImpl>));
element.imp = ptr::null_mut();
let klass = *(obj as *const glib_ffi::gpointer);
let parent_klass = gobject_ffi::g_type_class_peek_parent(klass);
let parent_klass = &*(gobject_ffi::g_type_class_peek_parent(parent_klass) as
*const gobject_ffi::GObjectClass);
parent_klass.finalize.map(|f| f(obj));
}
unsafe extern "C" fn element_sub_class_init(
klass: glib_ffi::gpointer,
klass_data: glib_ffi::gpointer,
) {
callback_guard!();
let element_data = &*(klass_data as *const ElementData);
{
let klass = &mut *(klass as *mut RsElementClass);
klass.element_data = element_data;
(element_data.class_init)(klass);
}
}
unsafe extern "C" fn element_class_init(
klass: glib_ffi::gpointer,
_klass_data: glib_ffi::gpointer,
) {
callback_guard!();
{
let gobject_klass = &mut *(klass as *mut gobject_ffi::GObjectClass);
gobject_klass.finalize = Some(element_finalize);
}
{
let element_klass = &mut *(klass as *mut gst_ffi::GstElementClass);
element_klass.change_state = Some(element_change_state);
}
}
unsafe extern "C" fn element_change_state(
ptr: *mut gst_ffi::GstElement,
transition: gst_ffi::GstStateChange,
) -> gst_ffi::GstStateChangeReturn {
callback_guard!();
let element = &*(ptr as *mut RsElement);
let wrap: RsElementWrapper = from_glib_borrow(ptr as *mut RsElement);
let imp = &*element.imp;
panic_to_error2!(&wrap, &element.panicked, gst::StateChangeReturn::Failure, {
imp.change_state(&wrap, from_glib(transition))
}).to_glib()
}
unsafe extern "C" fn element_sub_init(
instance: *mut gobject_ffi::GTypeInstance,
klass: glib_ffi::gpointer,
) {
callback_guard!();
let element = &mut *(instance as *mut RsElement);
let wrap: RsElementWrapper = from_glib_borrow(instance as *mut RsElement);
let klass = &*(klass as *const RsElementClass);
let element_data = &*klass.element_data;
let imp = (element_data.init)(&wrap);
element.imp = Box::into_raw(Box::new(imp));
}
pub fn element_register<F, G>(
plugin: &gst::Plugin,
name: &str,
rank: u32,
class_init: F,
init: G,
) where
F: Fn(&mut RsElementClass) + Send + 'static,
G: Fn(&RsElementWrapper) -> Box<ElementImpl> + Send + Sync + 'static,
{
unsafe {
let parent_type = rs_element_get_type();
let type_name = format!("RsElement-{}", name);
let element_data = ElementData {
class_init: Box::new(class_init),
init: Box::new(init),
};
let element_data = Box::into_raw(Box::new(element_data)) as glib_ffi::gpointer;
let type_info = gobject_ffi::GTypeInfo {
class_size: mem::size_of::<RsElementClass>() as u16,
base_init: None,
base_finalize: None,
class_init: Some(element_sub_class_init),
class_finalize: None,
class_data: element_data,
instance_size: mem::size_of::<RsElement>() as u16,
n_preallocs: 0,
instance_init: Some(element_sub_init),
value_table: ptr::null(),
};
let type_ = gobject_ffi::g_type_register_static(
parent_type,
type_name.to_glib_none().0,
&type_info,
gobject_ffi::GTypeFlags::empty(),
);
gst::Element::register(plugin, &name, rank, from_glib(type_));
}
}
}
pub use self::ffi::RsElementClass;
pub use self::ffi::element_register;

View file

@ -243,3 +243,34 @@ macro_rules! panic_to_error(
}
}};
);
#[macro_export]
macro_rules! panic_to_error2(
($element:expr, $panicked:expr, $ret:expr, $code:block) => {{
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use $crate::error::ErrorMessage;
if $panicked.load(Ordering::Relaxed) {
error_msg!(gst::LibraryError::Failed, ["Panicked"]).post($element);
$ret
} else {
let result = panic::catch_unwind(AssertUnwindSafe(|| $code));
match result {
Ok(result) => result,
Err(err) => {
$panicked.store(true, Ordering::Relaxed);
if let Some(cause) = err.downcast_ref::<&str>() {
error_msg!(gst::LibraryError::Failed, ["Panicked: {}", cause]).post($element);
} else if let Some(cause) = err.downcast_ref::<String>() {
error_msg!(gst::LibraryError::Failed, ["Panicked: {}", cause]).post($element);
} else {
error_msg!(gst::LibraryError::Failed, ["Panicked"]).post($element);
}
$ret
}
}
}
}};
);

View file

@ -12,6 +12,8 @@ extern crate gstreamer_base_sys as gst_base_ffi;
#[macro_use]
extern crate lazy_static;
extern crate libc;
#[macro_use]
extern crate mopa;
extern crate url;
pub extern crate glib_sys as glib_ffi;
pub extern crate gobject_sys as gobject_ffi;
@ -23,6 +25,12 @@ pub extern crate glib;
#[macro_use]
pub extern crate gstreamer as gst;
macro_rules! callback_guard {
() => (
let _guard = ::glib::CallbackGuard::new();
)
}
#[macro_use]
pub mod utils;
#[macro_use]
@ -34,3 +42,5 @@ pub mod source;
pub mod sink;
pub mod demuxer;
pub mod bytes;
pub mod element;

View file

@ -40,7 +40,7 @@ pub struct SinkWrapper {
panicked: AtomicBool,
}
pub trait Sink {
pub trait Sink: Send + 'static {
fn uri_validator(&self) -> Box<UriValidator>;
fn start(&mut self, sink: &RsSinkWrapper, uri: Url) -> Result<(), ErrorMessage>;

View file

@ -40,7 +40,7 @@ pub struct SourceWrapper {
panicked: AtomicBool,
}
pub trait Source {
pub trait Source: Send + 'static {
fn uri_validator(&self) -> Box<UriValidator>;
fn is_seekable(&self, src: &RsSrcWrapper) -> bool;