Port threadshare plugin to new subclassing API

This commit is contained in:
Sebastian Dröge 2018-12-06 13:03:04 +02:00
parent 4d87c11293
commit e64a9b4a1a
7 changed files with 1088 additions and 990 deletions

View file

@ -12,10 +12,8 @@ gio-sys = { git = "https://github.com/gtk-rs/sys" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
glib = { git = "https://github.com/gtk-rs/glib", features = ["subclassing"] }
gio = { git = "https://github.com/gtk-rs/gio" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gobject-subclass = { git = "https://github.com/gtk-rs/gobject-subclass" }
gst-plugin = { path = "../gst-plugin" }
tokio = "0.1"
tokio-reactor = "0.1"
tokio-executor = "0.1"

View file

@ -17,11 +17,11 @@
use glib;
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gobject_subclass::object::*;
use gst_plugin::element::*;
use gst::subclass::prelude::*;
use std::sync::Mutex;
use std::u32;
@ -63,44 +63,56 @@ impl Default for Settings {
}
}
static PROPERTIES: [Property; 5] = [
Property::String(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
PropertyMutability::ReadWrite,
),
Property::UInt(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
(0, 1000),
DEFAULT_CONTEXT_WAIT,
PropertyMutability::ReadWrite,
),
Property::UInt(
"max-buffers",
"Max Buffers",
"Maximum number of buffers to queue up",
(1, u32::MAX),
DEFAULT_MAX_BUFFERS,
PropertyMutability::ReadWrite,
),
Property::Boxed(
"caps",
"Caps",
"Caps to use",
gst::Caps::static_type,
PropertyMutability::ReadWrite,
),
Property::Boolean(
"do-timestamp",
"Do Timestamp",
"Timestamp buffers with the current running time on arrival",
DEFAULT_DO_TIMESTAMP,
PropertyMutability::ReadWrite,
),
static PROPERTIES: [subclass::Property; 5] = [
subclass::Property("context", || {
glib::ParamSpec::string(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context-wait", || {
glib::ParamSpec::uint(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
0,
1000,
DEFAULT_CONTEXT_WAIT,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-buffers", || {
glib::ParamSpec::uint(
"max-buffers",
"Max Buffers",
"Maximum number of buffers to queue up",
1,
u32::MAX,
DEFAULT_MAX_BUFFERS,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("caps", || {
glib::ParamSpec::boxed(
"caps",
"Caps",
"Caps to use",
gst::Caps::static_type(),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("do-timestamp", || {
glib::ParamSpec::boolean(
"do-timestamp",
"Do Timestamp",
"Timestamp buffers with the current running time on arrival",
DEFAULT_DO_TIMESTAMP,
glib::ParamFlags::READWRITE,
)
}),
];
struct State {
@ -133,88 +145,6 @@ struct AppSrc {
}
impl AppSrc {
fn class_init(klass: &mut ElementClass) {
klass.set_metadata(
"Thread-sharing app source",
"Source/Generic",
"Thread-sharing app source",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
klass.add_action_signal(
"push-buffer",
&[gst::Buffer::static_type()],
bool::static_type(),
|args| {
let element = args[0]
.get::<gst::Element>()
.unwrap()
.downcast::<Element>()
.unwrap();
let buffer = args[1].get::<gst::Buffer>().unwrap();
let appsrc = element.get_impl().downcast_ref::<AppSrc>().unwrap();
Some(appsrc.push_buffer(&element, buffer).to_value())
},
);
klass.add_action_signal("end-of-stream", &[], bool::static_type(), |args| {
let element = args[0]
.get::<gst::Element>()
.unwrap()
.downcast::<Element>()
.unwrap();
let appsrc = element.get_impl().downcast_ref::<AppSrc>().unwrap();
Some(appsrc.end_of_stream(&element).to_value())
});
}
fn init(element: &Element) -> Box<ElementImpl<Element>> {
let templ = element.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
AppSrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
AppSrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_query(pad, element, query),
)
});
element.add_pad(&src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
Box::new(Self {
cat: gst::DebugCategory::new(
"ts-appsrc",
gst::DebugColorFlags::empty(),
"Thread-sharing app source",
),
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
})
}
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
@ -232,7 +162,7 @@ impl AppSrc {
}
}
fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@ -265,7 +195,12 @@ impl AppSrc {
ret
}
fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
fn src_query(
&self,
pad: &gst::Pad,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@ -306,7 +241,7 @@ impl AppSrc {
ret
}
fn push_buffer(&self, element: &Element, mut buffer: gst::Buffer) -> bool {
fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool {
let settings = self.settings.lock().unwrap().clone();
if settings.do_timestamp {
@ -337,7 +272,7 @@ impl AppSrc {
}
}
fn end_of_stream(&self, element: &Element) -> bool {
fn end_of_stream(&self, element: &gst::Element) -> bool {
let mut state = self.state.lock().unwrap();
if let Some(ref mut channel) = state.channel {
match channel.try_send(Either::Right(gst::Event::new_eos().build())) {
@ -354,7 +289,7 @@ impl AppSrc {
fn push_item(
&self,
element: &Element,
element: &gst::Element,
item: Either<gst::Buffer, gst::Event>,
) -> future::Either<
Box<Future<Item = (), Error = ()> + Send + 'static>,
@ -449,7 +384,7 @@ impl AppSrc {
}
}
fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -480,7 +415,7 @@ impl AppSrc {
Ok(())
}
fn unprepare(&self, element: &Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap();
@ -498,7 +433,7 @@ impl AppSrc {
Ok(())
}
fn start(&self, element: &Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.lock().unwrap();
@ -515,7 +450,7 @@ impl AppSrc {
let element_clone = element.clone();
let future = channel_receiver.for_each(move |item| {
let appsrc = element_clone.get_impl().downcast_ref::<AppSrc>().unwrap();
let appsrc = Self::from_instance(&element_clone);
appsrc.push_item(&element_clone, item)
});
io_context.spawn(future);
@ -526,7 +461,7 @@ impl AppSrc {
Ok(())
}
fn stop(&self, element: &Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@ -539,28 +474,114 @@ impl AppSrc {
}
}
impl ObjectImpl<Element> for AppSrc {
fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
impl ObjectSubclass for AppSrc {
const NAME: &'static str = "RsTsAppSrc";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"Thread-sharing app source",
"Source/Generic",
"Thread-sharing app source",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
klass.add_action_signal(
"push-buffer",
&[gst::Buffer::static_type()],
bool::static_type(),
|args| {
let element = args[0].get::<gst::Element>().unwrap();
let buffer = args[1].get::<gst::Buffer>().unwrap();
let appsrc = Self::from_instance(&element);
Some(appsrc.push_buffer(&element, buffer).to_value())
},
);
klass.add_action_signal("end-of-stream", &[], bool::static_type(), |args| {
let element = args[0].get::<gst::Element>().unwrap();
let appsrc = Self::from_instance(&element);
Some(appsrc.end_of_stream(&element).to_value())
});
}
fn new() -> Self {
unreachable!()
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
AppSrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
AppSrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_query(pad, element, query),
)
});
Self {
cat: gst::DebugCategory::new(
"ts-appsrc",
gst::DebugColorFlags::empty(),
"Thread-sharing app source",
),
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
}
}
}
impl ObjectImpl for AppSrc {
glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
Property::Boxed("caps", ..) => {
subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.caps = value.get();
}
Property::UInt("max-buffers", ..) => {
subclass::Property("max-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_buffers = value.get().unwrap();
}
Property::Boolean("do-timestamp", ..) => {
subclass::Property("do-timestamp", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.do_timestamp = value.get().unwrap();
}
@ -568,39 +589,48 @@ impl ObjectImpl<Element> for AppSrc {
}
}
fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
Property::Boxed("caps", ..) => {
subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.caps.to_value())
}
Property::UInt("max-buffers", ..) => {
subclass::Property("max-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_buffers.to_value())
}
Property::Boolean("do-timestamp", ..) => {
subclass::Property("do-timestamp", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.do_timestamp.to_value())
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl<Element> for AppSrc {
impl ElementImpl for AppSrc {
fn change_state(
&self,
element: &Element,
element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@ -624,7 +654,7 @@ impl ElementImpl<Element> for AppSrc {
_ => (),
}
let mut ret = element.parent_change_state(transition);
let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@ -648,23 +678,6 @@ impl ElementImpl<Element> for AppSrc {
}
}
struct AppSrcStatic;
impl ImplTypeStatic<Element> for AppSrcStatic {
fn get_name(&self) -> &str {
"AppSrc"
}
fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
AppSrc::init(element)
}
fn class_init(&self, klass: &mut ElementClass) {
AppSrc::class_init(klass);
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
let type_ = register_type(AppSrcStatic);
gst::Element::register(plugin, "ts-appsrc", 0, type_)
gst::Element::register(plugin, "ts-appsrc", 0, AppSrc::get_type())
}

View file

@ -27,9 +27,6 @@ extern crate gstreamer_sys as gst_ffi;
extern crate gio;
#[macro_use]
extern crate glib;
extern crate gobject_subclass;
#[macro_use]
extern crate gst_plugin;
#[macro_use]
extern crate gstreamer as gst;
@ -73,16 +70,16 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
Ok(())
}
plugin_define!(
b"threadshare\0",
b"Threadshare Plugin\0",
gst_plugin_define!(
"threadshare",
"Threadshare Plugin",
plugin_init,
b"0.1.0\0",
b"LGPL\0",
b"threadshare\0",
b"threadshare\0",
b"https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs\0",
b"2018-03-01\0"
"0.1.0",
"LGPL",
"threadshare",
"threadshare",
"https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs",
"2018-03-01"
);
pub fn set_element_flags<T: glib::IsA<gst::Object> + glib::IsA<gst::Element>>(

View file

@ -17,11 +17,11 @@
use glib;
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gobject_subclass::object::*;
use gst_plugin::element::*;
use gst::subclass::prelude::*;
use std::collections::HashMap;
use std::collections::VecDeque;
@ -87,62 +87,80 @@ impl Default for SettingsSrc {
}
}
static PROPERTIES_SRC: [Property; 6] = [
Property::UInt(
"max-size-buffers",
"Max Size Buffers",
"Maximum number of buffers to queue (0=unlimited)",
(0, u32::MAX),
DEFAULT_MAX_SIZE_BUFFERS,
PropertyMutability::ReadWrite,
),
Property::UInt(
"max-size-bytes",
"Max Size Bytes",
"Maximum number of bytes to queue (0=unlimited)",
(0, u32::MAX),
DEFAULT_MAX_SIZE_BYTES,
PropertyMutability::ReadWrite,
),
Property::UInt64(
"max-size-time",
"Max Size Time",
"Maximum number of nanoseconds to queue (0=unlimited)",
(0, u64::MAX - 1),
DEFAULT_MAX_SIZE_TIME,
PropertyMutability::ReadWrite,
),
Property::String(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
PropertyMutability::ReadWrite,
),
Property::UInt(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
(0, 1000),
DEFAULT_CONTEXT_WAIT,
PropertyMutability::ReadWrite,
),
Property::String(
static PROPERTIES_SRC: [subclass::Property; 6] = [
subclass::Property("max-size-buffers", || {
glib::ParamSpec::uint(
"max-size-buffers",
"Max Size Buffers",
"Maximum number of buffers to queue (0=unlimited)",
0,
u32::MAX,
DEFAULT_MAX_SIZE_BUFFERS,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-size-bytes", || {
glib::ParamSpec::uint(
"max-size-bytes",
"Max Size Bytes",
"Maximum number of bytes to queue (0=unlimited)",
0,
u32::MAX,
DEFAULT_MAX_SIZE_BYTES,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-size-time", || {
glib::ParamSpec::uint64(
"max-size-time",
"Max Size Time",
"Maximum number of nanoseconds to queue (0=unlimited)",
0,
u64::MAX - 1,
DEFAULT_MAX_SIZE_TIME,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context", || {
glib::ParamSpec::string(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context-wait", || {
glib::ParamSpec::uint(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
0,
1000,
DEFAULT_CONTEXT_WAIT,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("proxy-context", || {
glib::ParamSpec::string(
"proxy-context",
"Proxy Context",
"Context name of the proxy to share with",
Some(DEFAULT_PROXY_CONTEXT),
glib::ParamFlags::READWRITE,
)
}),
];
static PROPERTIES_SINK: [subclass::Property; 1] = [subclass::Property("proxy-context", || {
glib::ParamSpec::string(
"proxy-context",
"Proxy Context",
"Context name of the proxy to share with",
Some(DEFAULT_PROXY_CONTEXT),
PropertyMutability::ReadWrite,
),
];
static PROPERTIES_SINK: [Property; 1] = [Property::String(
"proxy-context",
"Proxy Context",
"Context name of the proxy to share with",
Some(DEFAULT_PROXY_CONTEXT),
PropertyMutability::ReadWrite,
)];
glib::ParamFlags::READWRITE,
)
})];
// TODO: Refactor into a Sender and Receiver instead of the have_ booleans
@ -273,80 +291,10 @@ struct ProxySink {
}
impl ProxySink {
fn class_init(klass: &mut ElementClass) {
klass.set_metadata(
"Thread-sharing proxy sink",
"Sink/Generic",
"Thread-sharing proxy sink",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(sink_pad_template);
klass.install_properties(&PROPERTIES_SINK);
}
fn init(element: &Element) -> Box<ElementImpl<Element>> {
let templ = element.get_pad_template("sink").unwrap();
let sink_pad = gst::Pad::new_from_template(&templ, "sink");
sink_pad.set_chain_function(|pad, parent, buffer| {
ProxySink::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain(pad, element, buffer),
)
});
sink_pad.set_chain_list_function(|pad, parent, list| {
ProxySink::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain_list(pad, element, list),
)
});
sink_pad.set_event_function(|pad, parent, event| {
ProxySink::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_event(pad, element, event),
)
});
sink_pad.set_query_function(|pad, parent, query| {
ProxySink::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_query(pad, element, query),
)
});
element.add_pad(&sink_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SINK);
Box::new(Self {
cat: gst::DebugCategory::new(
"ts-proxysink",
gst::DebugColorFlags::empty(),
"Thread-sharing proxy sink",
),
sink_pad: sink_pad,
state: Mutex::new(StateSink::default()),
settings: Mutex::new(SettingsSink::default()),
})
}
fn enqueue_item(
&self,
_pad: &gst::Pad,
element: &Element,
element: &gst::Element,
item: DataQueueItem,
) -> gst::FlowReturn {
let wait_future = {
@ -426,10 +374,7 @@ impl ProxySink {
let element_clone = element.clone();
let future = future::poll_fn(move || {
let sink = element_clone
.get_impl()
.downcast_ref::<ProxySink>()
.unwrap();
let sink = Self::from_instance(&element_clone);
let state = sink.state.lock().unwrap();
gst_log!(
@ -549,7 +494,7 @@ impl ProxySink {
fn sink_chain(
&self,
pad: &gst::Pad,
element: &Element,
element: &gst::Element,
buffer: gst::Buffer,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
@ -559,14 +504,14 @@ impl ProxySink {
fn sink_chain_list(
&self,
pad: &gst::Pad,
element: &Element,
element: &gst::Element,
list: gst::BufferList,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list);
self.enqueue_item(pad, element, DataQueueItem::BufferList(list))
}
fn sink_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@ -612,13 +557,18 @@ impl ProxySink {
true
}
fn sink_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool {
fn sink_query(
&self,
pad: &gst::Pad,
element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
pad.query_default(element, query)
}
fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -640,7 +590,7 @@ impl ProxySink {
Ok(())
}
fn unprepare(&self, element: &Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap();
@ -650,7 +600,7 @@ impl ProxySink {
Ok(())
}
fn start(&self, element: &Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
@ -662,7 +612,7 @@ impl ProxySink {
Ok(())
}
fn stop(&self, element: &Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@ -682,12 +632,93 @@ impl ProxySink {
}
}
impl ObjectImpl<Element> for ProxySink {
fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
let prop = &PROPERTIES_SINK[id as usize];
impl ObjectSubclass for ProxySink {
const NAME: &'static str = "RsTsProxySink";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"Thread-sharing proxy sink",
"Sink/Generic",
"Thread-sharing proxy sink",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(sink_pad_template);
klass.install_properties(&PROPERTIES_SINK);
}
fn new() -> Self {
unreachable!()
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sink_pad = gst::Pad::new_from_template(&templ, "sink");
sink_pad.set_chain_function(|pad, parent, buffer| {
ProxySink::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain(pad, element, buffer),
)
});
sink_pad.set_chain_list_function(|pad, parent, list| {
ProxySink::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain_list(pad, element, list),
)
});
sink_pad.set_event_function(|pad, parent, event| {
ProxySink::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_event(pad, element, event),
)
});
sink_pad.set_query_function(|pad, parent, query| {
ProxySink::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_query(pad, element, query),
)
});
Self {
cat: gst::DebugCategory::new(
"ts-proxysink",
gst::DebugColorFlags::empty(),
"Thread-sharing proxy sink",
),
sink_pad: sink_pad,
state: Mutex::new(StateSink::default()),
settings: Mutex::new(SettingsSink::default()),
}
}
}
impl ObjectImpl for ProxySink {
glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES_SINK[id];
match *prop {
Property::String("proxy-context", ..) => {
subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.proxy_context = value.get().unwrap_or_else(|| "".into());
}
@ -695,23 +726,32 @@ impl ObjectImpl<Element> for ProxySink {
}
}
fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
let prop = &PROPERTIES_SINK[id as usize];
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES_SINK[id];
match *prop {
Property::String("proxy-context", ..) => {
subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.proxy_context.to_value())
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.sink_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SINK);
}
}
impl ElementImpl<Element> for ProxySink {
impl ElementImpl for ProxySink {
fn change_state(
&self,
element: &Element,
element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@ -735,7 +775,7 @@ impl ElementImpl<Element> for ProxySink {
_ => (),
}
let ret = element.parent_change_state(transition);
let ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@ -760,71 +800,6 @@ struct ProxySrc {
}
impl ProxySrc {
fn class_init(klass: &mut ElementClass) {
klass.set_metadata(
"Thread-sharing proxy source",
"Source/Generic",
"Thread-sharing proxy source",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES_SRC);
}
fn init(element: &Element) -> Box<ElementImpl<Element>> {
let templ = element.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
ProxySrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
ProxySrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_query(pad, element, query),
)
});
element.add_pad(&src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
Box::new(Self {
cat: gst::DebugCategory::new(
"ts-proxysrc",
gst::DebugColorFlags::empty(),
"Thread-sharing proxy source",
),
src_pad: src_pad,
state: Mutex::new(StateSrc::default()),
settings: Mutex::new(SettingsSrc::default()),
})
}
fn catch_panic_pad_function<T, F: FnOnce(&Self, &Element) -> T, G: FnOnce() -> T>(
parent: &Option<gst::Object>,
fallback: G,
f: F,
) -> T {
let element = parent.as_ref().unwrap().downcast_ref::<Element>().unwrap();
let src = element.get_impl().downcast_ref::<ProxySrc>().unwrap();
element.catch_panic(fallback, |element| f(src, element))
}
fn create_io_context_event(state: &StateSrc) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
@ -842,7 +817,7 @@ impl ProxySrc {
}
}
fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@ -875,7 +850,12 @@ impl ProxySrc {
ret
}
fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
fn src_query(
&self,
pad: &gst::Pad,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@ -917,7 +897,7 @@ impl ProxySrc {
fn push_item(
&self,
element: &Element,
element: &gst::Element,
item: DataQueueItem,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@ -1045,7 +1025,7 @@ impl ProxySrc {
}
}
fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -1095,14 +1075,11 @@ impl ProxySrc {
.schedule(
&io_context,
move |item| {
let src = element_clone.get_impl().downcast_ref::<ProxySrc>().unwrap();
let src = Self::from_instance(&element_clone);
src.push_item(&element_clone, item)
},
move |err| {
let src = element_clone2
.get_impl()
.downcast_ref::<ProxySrc>()
.unwrap();
let src = Self::from_instance(&element_clone2);
gst_error!(src.cat, obj: &element_clone2, "Got error {}", err);
match err {
gst::FlowError::CustomError => (),
@ -1143,7 +1120,7 @@ impl ProxySrc {
Ok(())
}
fn unprepare(&self, element: &Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
@ -1181,7 +1158,7 @@ impl ProxySrc {
Ok(())
}
fn start(&self, element: &Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
let queue = state.queue.as_ref().unwrap().0.lock().unwrap();
@ -1195,7 +1172,7 @@ impl ProxySrc {
Ok(())
}
fn stop(&self, element: &Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let state = self.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
@ -1212,32 +1189,99 @@ impl ProxySrc {
}
}
impl ObjectImpl<Element> for ProxySrc {
fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
let prop = &PROPERTIES_SRC[id as usize];
impl ObjectSubclass for ProxySrc {
const NAME: &'static str = "RsTsProxySrc";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"Thread-sharing proxy source",
"Source/Generic",
"Thread-sharing proxy source",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES_SRC);
}
fn new() -> Self {
unreachable!()
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
ProxySrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
ProxySrc::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_query(pad, element, query),
)
});
Self {
cat: gst::DebugCategory::new(
"ts-proxysrc",
gst::DebugColorFlags::empty(),
"Thread-sharing proxy source",
),
src_pad: src_pad,
state: Mutex::new(StateSrc::default()),
settings: Mutex::new(SettingsSrc::default()),
}
}
}
impl ObjectImpl for ProxySrc {
glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES_SRC[id];
match *prop {
Property::UInt("max-size-buffers", ..) => {
subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_buffers = value.get().unwrap();
}
Property::UInt("max-size-bytes", ..) => {
subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_bytes = value.get().unwrap();
}
Property::UInt64("max-size-time", ..) => {
subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_time = value.get().unwrap();
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
Property::String("proxy-context", ..) => {
subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.proxy_context = value.get().unwrap_or_else(|| "".into());
}
@ -1245,43 +1289,52 @@ impl ObjectImpl<Element> for ProxySrc {
}
}
fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
let prop = &PROPERTIES_SRC[id as usize];
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES_SRC[id];
match *prop {
Property::UInt("max-size-buffers", ..) => {
subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_buffers.to_value())
}
Property::UInt("max-size-bytes", ..) => {
subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_bytes.to_value())
}
Property::UInt64("max-size-time", ..) => {
subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_time.to_value())
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
Property::String("proxy-context", ..) => {
subclass::Property("proxy-context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.proxy_context.to_value())
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl<Element> for ProxySrc {
impl ElementImpl for ProxySrc {
fn change_state(
&self,
element: &Element,
element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@ -1305,7 +1358,7 @@ impl ElementImpl<Element> for ProxySrc {
_ => (),
}
let mut ret = element.parent_change_state(transition);
let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@ -1325,42 +1378,7 @@ impl ElementImpl<Element> for ProxySrc {
}
}
struct ProxySinkStatic;
impl ImplTypeStatic<Element> for ProxySinkStatic {
fn get_name(&self) -> &str {
"ProxySink"
}
fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
ProxySink::init(element)
}
fn class_init(&self, klass: &mut ElementClass) {
ProxySink::class_init(klass);
}
}
struct ProxySrcStatic;
impl ImplTypeStatic<Element> for ProxySrcStatic {
fn get_name(&self) -> &str {
"ProxySrc"
}
fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
ProxySrc::init(element)
}
fn class_init(&self, klass: &mut ElementClass) {
ProxySrc::class_init(klass);
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
let type_ = register_type(ProxySinkStatic);
gst::Element::register(plugin, "ts-proxysink", 0, type_)?;
let type_ = register_type(ProxySrcStatic);
gst::Element::register(plugin, "ts-proxysrc", 0, type_)
gst::Element::register(plugin, "ts-proxysink", 0, ProxySink::get_type())?;
gst::Element::register(plugin, "ts-proxysrc", 0, ProxySrc::get_type())
}

View file

@ -17,11 +17,11 @@
use glib;
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gobject_subclass::object::*;
use gst_plugin::element::*;
use gst::subclass::prelude::*;
use std::collections::VecDeque;
use std::sync::Mutex;
@ -64,46 +64,60 @@ impl Default for Settings {
}
}
static PROPERTIES: [Property; 5] = [
Property::UInt(
"max-size-buffers",
"Max Size Buffers",
"Maximum number of buffers to queue (0=unlimited)",
(0, u32::MAX),
DEFAULT_MAX_SIZE_BUFFERS,
PropertyMutability::ReadWrite,
),
Property::UInt(
"max-size-bytes",
"Max Size Bytes",
"Maximum number of bytes to queue (0=unlimited)",
(0, u32::MAX),
DEFAULT_MAX_SIZE_BYTES,
PropertyMutability::ReadWrite,
),
Property::UInt64(
"max-size-time",
"Max Size Time",
"Maximum number of nanoseconds to queue (0=unlimited)",
(0, u64::MAX - 1),
DEFAULT_MAX_SIZE_TIME,
PropertyMutability::ReadWrite,
),
Property::String(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
PropertyMutability::ReadWrite,
),
Property::UInt(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
(0, 1000),
DEFAULT_CONTEXT_WAIT,
PropertyMutability::ReadWrite,
),
static PROPERTIES: [subclass::Property; 5] = [
subclass::Property("max-size-buffers", || {
glib::ParamSpec::uint(
"max-size-buffers",
"Max Size Buffers",
"Maximum number of buffers to queue (0=unlimited)",
0,
u32::MAX,
DEFAULT_MAX_SIZE_BUFFERS,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-size-bytes", || {
glib::ParamSpec::uint(
"max-size-bytes",
"Max Size Bytes",
"Maximum number of bytes to queue (0=unlimited)",
0,
u32::MAX,
DEFAULT_MAX_SIZE_BYTES,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-size-time", || {
glib::ParamSpec::uint64(
"max-size-time",
"Max Size Time",
"Maximum number of nanoseconds to queue (0=unlimited)",
0,
u64::MAX - 1,
DEFAULT_MAX_SIZE_TIME,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context", || {
glib::ParamSpec::string(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context-wait", || {
glib::ParamSpec::uint(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
0,
1000,
DEFAULT_CONTEXT_WAIT,
glib::ParamFlags::READWRITE,
)
}),
];
struct State {
@ -141,100 +155,6 @@ struct Queue {
}
impl Queue {
fn class_init(klass: &mut ElementClass) {
klass.set_metadata(
"Thread-sharing queue",
"Generic",
"Simple data queue",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(sink_pad_template);
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
fn init(element: &Element) -> Box<ElementImpl<Element>> {
let templ = element.get_pad_template("sink").unwrap();
let sink_pad = gst::Pad::new_from_template(&templ, "sink");
let templ = element.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
sink_pad.set_chain_function(|pad, parent, buffer| {
Queue::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain(pad, element, buffer),
)
});
sink_pad.set_chain_list_function(|pad, parent, list| {
Queue::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain_list(pad, element, list),
)
});
sink_pad.set_event_function(|pad, parent, event| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_event(pad, element, event),
)
});
sink_pad.set_query_function(|pad, parent, query| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_query(pad, element, query),
)
});
src_pad.set_event_function(|pad, parent, event| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_query(pad, element, query),
)
});
element.add_pad(&sink_pad).unwrap();
element.add_pad(&src_pad).unwrap();
Box::new(Self {
cat: gst::DebugCategory::new(
"ts-queue",
gst::DebugColorFlags::empty(),
"Thread-sharing queue",
),
sink_pad: sink_pad,
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
})
}
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
@ -255,7 +175,7 @@ impl Queue {
fn enqueue_item(
&self,
_pad: &gst::Pad,
element: &Element,
element: &gst::Element,
item: DataQueueItem,
) -> gst::FlowReturn {
let wait_future = {
@ -325,7 +245,7 @@ impl Queue {
let element_clone = element.clone();
let future = future::poll_fn(move || {
let queue = element_clone.get_impl().downcast_ref::<Queue>().unwrap();
let queue = Self::from_instance(&element_clone);
let mut state = queue.state.lock().unwrap();
let State {
@ -430,7 +350,7 @@ impl Queue {
fn sink_chain(
&self,
pad: &gst::Pad,
element: &Element,
element: &gst::Element,
buffer: gst::Buffer,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
@ -440,14 +360,14 @@ impl Queue {
fn sink_chain_list(
&self,
pad: &gst::Pad,
element: &Element,
element: &gst::Element,
list: gst::BufferList,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list);
self.enqueue_item(pad, element, DataQueueItem::BufferList(list))
}
fn sink_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool {
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@ -505,7 +425,12 @@ impl Queue {
}
}
fn sink_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
fn sink_query(
&self,
pad: &gst::Pad,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
if query.is_serialized() {
@ -518,7 +443,7 @@ impl Queue {
}
}
fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@ -542,7 +467,12 @@ impl Queue {
self.sink_pad.push_event(event)
}
fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
fn src_query(
&self,
pad: &gst::Pad,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@ -578,7 +508,7 @@ impl Queue {
fn push_item(
&self,
element: &Element,
element: &gst::Element,
item: DataQueueItem,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@ -679,7 +609,7 @@ impl Queue {
}
}
fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -719,11 +649,11 @@ impl Queue {
.schedule(
&io_context,
move |item| {
let queue = element_clone.get_impl().downcast_ref::<Queue>().unwrap();
let queue = Self::from_instance(&element_clone);
queue.push_item(&element_clone, item)
},
move |err| {
let queue = element_clone2.get_impl().downcast_ref::<Queue>().unwrap();
let queue = Self::from_instance(&element_clone2);
gst_error!(queue.cat, obj: &element_clone2, "Got error {}", err);
match err {
gst::FlowError::CustomError => (),
@ -762,7 +692,7 @@ impl Queue {
Ok(())
}
fn unprepare(&self, element: &Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
@ -792,7 +722,7 @@ impl Queue {
Ok(())
}
fn start(&self, element: &Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let mut state = self.state.lock().unwrap();
@ -806,7 +736,7 @@ impl Queue {
Ok(())
}
fn stop(&self, element: &Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@ -826,28 +756,135 @@ impl Queue {
}
}
impl ObjectImpl<Element> for Queue {
fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
impl ObjectSubclass for Queue {
const NAME: &'static str = "RsTsQueue";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"Thread-sharing queue",
"Generic",
"Simple data queue",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(sink_pad_template);
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
fn new() -> Self {
unreachable!()
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sink_pad = gst::Pad::new_from_template(&templ, "sink");
let templ = klass.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
sink_pad.set_chain_function(|pad, parent, buffer| {
Queue::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain(pad, element, buffer),
)
});
sink_pad.set_chain_list_function(|pad, parent, list| {
Queue::catch_panic_pad_function(
parent,
|| gst::FlowReturn::Error,
|queue, element| queue.sink_chain_list(pad, element, list),
)
});
sink_pad.set_event_function(|pad, parent, event| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_event(pad, element, event),
)
});
sink_pad.set_query_function(|pad, parent, query| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.sink_query(pad, element, query),
)
});
src_pad.set_event_function(|pad, parent, event| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
Queue::catch_panic_pad_function(
parent,
|| false,
|queue, element| queue.src_query(pad, element, query),
)
});
Self {
cat: gst::DebugCategory::new(
"ts-queue",
gst::DebugColorFlags::empty(),
"Thread-sharing queue",
),
sink_pad: sink_pad,
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
}
}
}
impl ObjectImpl for Queue {
glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
Property::UInt("max-size-buffers", ..) => {
subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_buffers = value.get().unwrap();
}
Property::UInt("max-size-bytes", ..) => {
subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_bytes = value.get().unwrap();
}
Property::UInt64("max-size-time", ..) => {
subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.max_size_time = value.get().unwrap();
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
@ -855,39 +892,47 @@ impl ObjectImpl<Element> for Queue {
}
}
fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
Property::UInt("max-size-buffers", ..) => {
subclass::Property("max-size-buffers", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_buffers.to_value())
}
Property::UInt("max-size-bytes", ..) => {
subclass::Property("max-size-bytes", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_bytes.to_value())
}
Property::UInt64("max-size-time", ..) => {
subclass::Property("max-size-time", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.max_size_time.to_value())
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.sink_pad).unwrap();
element.add_pad(&self.src_pad).unwrap();
}
}
impl ElementImpl<Element> for Queue {
impl ElementImpl for Queue {
fn change_state(
&self,
element: &Element,
element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@ -911,7 +956,7 @@ impl ElementImpl<Element> for Queue {
_ => (),
}
let ret = element.parent_change_state(transition);
let ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@ -928,24 +973,6 @@ impl ElementImpl<Element> for Queue {
}
}
struct QueueStatic;
impl ImplTypeStatic<Element> for QueueStatic {
fn get_name(&self) -> &str {
"Queue"
}
fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
Queue::init(element)
}
fn class_init(&self, klass: &mut ElementClass) {
Queue::class_init(klass);
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
let queue_static = QueueStatic;
let type_ = register_type(queue_static);
gst::Element::register(plugin, "ts-queue", 0, type_)
gst::Element::register(plugin, "ts-queue", 0, Queue::get_type())
}

View file

@ -18,11 +18,11 @@
use glib;
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gobject_subclass::object::*;
use gst_plugin::element::*;
use gst::subclass::prelude::*;
use std::io;
use std::sync::Mutex;
@ -71,52 +71,67 @@ impl Default for Settings {
}
}
static PROPERTIES: [Property; 6] = [
Property::String(
"address",
"Address",
"Address to receive packets from",
DEFAULT_ADDRESS,
PropertyMutability::ReadWrite,
),
Property::UInt(
"port",
"Port",
"Port to receive packets from",
(0, u16::MAX as u32),
DEFAULT_PORT,
PropertyMutability::ReadWrite,
),
Property::Boxed(
"caps",
"Caps",
"Caps to use",
gst::Caps::static_type,
PropertyMutability::ReadWrite,
),
Property::UInt(
"chunk-size",
"Chunk Size",
"Chunk Size",
(0, u16::MAX as u32),
DEFAULT_CHUNK_SIZE,
PropertyMutability::ReadWrite,
),
Property::String(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
PropertyMutability::ReadWrite,
),
Property::UInt(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
(0, 1000),
DEFAULT_CONTEXT_WAIT,
PropertyMutability::ReadWrite,
),
static PROPERTIES: [subclass::Property; 6] = [
subclass::Property("address", || {
glib::ParamSpec::string(
"address",
"Address",
"Address to receive packets from",
DEFAULT_ADDRESS,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("port", || {
glib::ParamSpec::uint(
"port",
"Port",
"Port to receive packets from",
0,
u16::MAX as u32,
DEFAULT_PORT,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("caps", || {
glib::ParamSpec::boxed(
"caps",
"Caps",
"Caps to use",
gst::Caps::static_type(),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("chunk-size", || {
glib::ParamSpec::uint(
"chunk-size",
"Chunk Size",
"Chunk Size",
0,
u16::MAX as u32,
DEFAULT_CHUNK_SIZE,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context", || {
glib::ParamSpec::string(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context-wait", || {
glib::ParamSpec::uint(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
0,
1000,
DEFAULT_CONTEXT_WAIT,
glib::ParamFlags::READWRITE,
)
}),
];
pub struct TcpClientReader {
@ -185,61 +200,7 @@ struct TcpClientSrc {
}
impl TcpClientSrc {
fn class_init(klass: &mut ElementClass) {
klass.set_metadata(
"Thread-sharing TCP client source",
"Source/Network",
"Receives data over the network via TCP",
"Sebastian Dröge <sebastian@centricular.com>, LEE Dongjun <redongjun@gmail.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
fn init(element: &Element) -> Box<ElementImpl<Element>> {
let templ = element.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
TcpClientSrc::catch_panic_pad_function(
parent,
|| false,
|tcpclientsrc, element| tcpclientsrc.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
TcpClientSrc::catch_panic_pad_function(
parent,
|| false,
|tcpclientsrc, element| tcpclientsrc.src_query(pad, element, query),
)
});
element.add_pad(&src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
Box::new(Self {
cat: gst::DebugCategory::new(
"ts-tcpclientsrc",
gst::DebugColorFlags::empty(),
"Thread-sharing TCP Client source",
),
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
})
}
fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@ -271,7 +232,12 @@ impl TcpClientSrc {
ret
}
fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
fn src_query(
&self,
pad: &gst::Pad,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@ -331,7 +297,7 @@ impl TcpClientSrc {
fn push_buffer(
&self,
element: &Element,
element: &gst::Element,
buffer: gst::Buffer,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@ -431,7 +397,7 @@ impl TcpClientSrc {
}
}
fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
use std::net::{IpAddr, SocketAddr};
gst_debug!(self.cat, obj: element, "Preparing");
@ -493,17 +459,11 @@ impl TcpClientSrc {
.schedule(
&io_context,
move |buffer| {
let tcpclientsrc = element_clone
.get_impl()
.downcast_ref::<TcpClientSrc>()
.unwrap();
let tcpclientsrc = Self::from_instance(&element_clone);
tcpclientsrc.push_buffer(&element_clone, buffer)
},
move |err| {
let tcpclientsrc = element_clone2
.get_impl()
.downcast_ref::<TcpClientSrc>()
.unwrap();
let tcpclientsrc = Self::from_instance(&element_clone2);
gst_error!(tcpclientsrc.cat, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
@ -547,7 +507,7 @@ impl TcpClientSrc {
Ok(())
}
fn unprepare(&self, element: &Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
@ -576,7 +536,7 @@ impl TcpClientSrc {
Ok(())
}
fn start(&self, element: &Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
@ -589,7 +549,7 @@ impl TcpClientSrc {
Ok(())
}
fn stop(&self, element: &Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@ -604,32 +564,98 @@ impl TcpClientSrc {
}
}
impl ObjectImpl<Element> for TcpClientSrc {
fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
impl ObjectSubclass for TcpClientSrc {
const NAME: &'static str = "RsTsTcpClientSrc";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"Thread-sharing TCP client source",
"Source/Network",
"Receives data over the network via TCP",
"Sebastian Dröge <sebastian@centricular.com>, LEE Dongjun <redongjun@gmail.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
fn new() -> Self {
unreachable!()
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
TcpClientSrc::catch_panic_pad_function(
parent,
|| false,
|tcpclientsrc, element| tcpclientsrc.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
TcpClientSrc::catch_panic_pad_function(
parent,
|| false,
|tcpclientsrc, element| tcpclientsrc.src_query(pad, element, query),
)
});
Self {
cat: gst::DebugCategory::new(
"ts-tcpclientsrc",
gst::DebugColorFlags::empty(),
"Thread-sharing TCP Client source",
),
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
}
}
}
impl ObjectImpl for TcpClientSrc {
glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
Property::String("address", ..) => {
subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.address = value.get();
}
Property::UInt("port", ..) => {
subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.port = value.get().unwrap();
}
Property::Boxed("caps", ..) => {
subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.caps = value.get();
}
Property::UInt("chunk-size", ..) => {
subclass::Property("chunk-size", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.chunk_size = value.get().unwrap();
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
@ -637,43 +663,52 @@ impl ObjectImpl<Element> for TcpClientSrc {
}
}
fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
Property::String("address", ..) => {
subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.address.to_value())
}
Property::UInt("port", ..) => {
subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.port.to_value())
}
Property::Boxed("caps", ..) => {
subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.caps.to_value())
}
Property::UInt("chunk-size", ..) => {
subclass::Property("chunk-size", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.chunk_size.to_value())
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl<Element> for TcpClientSrc {
impl ElementImpl for TcpClientSrc {
fn change_state(
&self,
element: &Element,
element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@ -699,7 +734,7 @@ impl ElementImpl<Element> for TcpClientSrc {
_ => (),
}
let mut ret = element.parent_change_state(transition);
let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@ -719,24 +754,6 @@ impl ElementImpl<Element> for TcpClientSrc {
}
}
struct TcpClientSrcStatic;
impl ImplTypeStatic<Element> for TcpClientSrcStatic {
fn get_name(&self) -> &str {
"TcpClientSrc"
}
fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
TcpClientSrc::init(element)
}
fn class_init(&self, klass: &mut ElementClass) {
TcpClientSrc::class_init(klass);
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
let tcpclientsrc_static = TcpClientSrcStatic;
let type_ = register_type(tcpclientsrc_static);
gst::Element::register(plugin, "ts-tcpclientsrc", 0, type_)
gst::Element::register(plugin, "ts-tcpclientsrc", 0, TcpClientSrc::get_type())
}

View file

@ -17,17 +17,17 @@
use glib;
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gio;
use gio_ffi;
use gobject_ffi;
use gobject_subclass::object::*;
use gst_plugin::element::*;
use std::io;
use std::sync::Mutex;
use std::u16;
@ -179,73 +179,94 @@ impl Default for Settings {
}
}
static PROPERTIES: [Property; 9] = [
Property::String(
"address",
"Address",
"Address/multicast group to listen on",
DEFAULT_ADDRESS,
PropertyMutability::ReadWrite,
),
Property::UInt(
"port",
"Port",
"Port to listen on",
(0, u16::MAX as u32),
DEFAULT_PORT,
PropertyMutability::ReadWrite,
),
Property::Boolean(
"reuse",
"Reuse",
"Allow reuse of the port",
DEFAULT_REUSE,
PropertyMutability::ReadWrite,
),
Property::Boxed(
"caps",
"Caps",
"Caps to use",
gst::Caps::static_type,
PropertyMutability::ReadWrite,
),
Property::UInt(
"mtu",
"MTU",
"MTU",
(0, u16::MAX as u32),
DEFAULT_MTU,
PropertyMutability::ReadWrite,
),
Property::Object(
"socket",
"Socket",
"Socket to use for UDP reception. (None == allocate)",
gio::Socket::static_type,
PropertyMutability::ReadWrite,
),
Property::Object(
"used-socket",
"Used Socket",
"Socket currently in use for UDP reception. (None = no socket)",
gio::Socket::static_type,
PropertyMutability::Readable,
),
Property::String(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
PropertyMutability::ReadWrite,
),
Property::UInt(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
(0, 1000),
DEFAULT_CONTEXT_WAIT,
PropertyMutability::ReadWrite,
),
static PROPERTIES: [subclass::Property; 9] = [
subclass::Property("address", || {
glib::ParamSpec::string(
"address",
"Address",
"Address/multicast group to listen on",
DEFAULT_ADDRESS,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("port", || {
glib::ParamSpec::uint(
"port",
"Port",
"Port to listen on",
0,
u16::MAX as u32,
DEFAULT_PORT,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("reuse", || {
glib::ParamSpec::boolean(
"reuse",
"Reuse",
"Allow reuse of the port",
DEFAULT_REUSE,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("caps", || {
glib::ParamSpec::boxed(
"caps",
"Caps",
"Caps to use",
gst::Caps::static_type(),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("mtu", || {
glib::ParamSpec::uint(
"mtu",
"MTU",
"MTU",
0,
u16::MAX as u32,
DEFAULT_MTU,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("socket", || {
glib::ParamSpec::object(
"socket",
"Socket",
"Socket to use for UDP reception. (None == allocate)",
gio::Socket::static_type(),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("used-socket", || {
glib::ParamSpec::object(
"used-socket",
"Used Socket",
"Socket currently in use for UDP reception. (None = no socket)",
gio::Socket::static_type(),
glib::ParamFlags::READABLE,
)
}),
subclass::Property("context", || {
glib::ParamSpec::string(
"context",
"Context",
"Context name to share threads with",
Some(DEFAULT_CONTEXT),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("context-wait", || {
glib::ParamSpec::uint(
"context-wait",
"Context Wait",
"Throttle poll loop to run at most once every this many ms",
0,
1000,
DEFAULT_CONTEXT_WAIT,
glib::ParamFlags::READWRITE,
)
}),
];
pub struct UdpReader {
@ -296,61 +317,7 @@ struct UdpSrc {
}
impl UdpSrc {
fn class_init(klass: &mut ElementClass) {
klass.set_metadata(
"Thread-sharing UDP source",
"Source/Network",
"Receives data over the network via UDP",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
fn init(element: &Element) -> Box<ElementImpl<Element>> {
let templ = element.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
UdpSrc::catch_panic_pad_function(
parent,
|| false,
|udpsrc, element| udpsrc.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
UdpSrc::catch_panic_pad_function(
parent,
|| false,
|udpsrc, element| udpsrc.src_query(pad, element, query),
)
});
element.add_pad(&src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
Box::new(Self {
cat: gst::DebugCategory::new(
"ts-udpsrc",
gst::DebugColorFlags::empty(),
"Thread-sharing UDP source",
),
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
})
}
fn src_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
@ -382,7 +349,12 @@ impl UdpSrc {
ret
}
fn src_query(&self, pad: &gst::Pad, _element: &Element, query: &mut gst::QueryRef) -> bool {
fn src_query(
&self,
pad: &gst::Pad,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
@ -442,7 +414,7 @@ impl UdpSrc {
fn push_buffer(
&self,
element: &Element,
element: &gst::Element,
buffer: gst::Buffer,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
@ -537,7 +509,7 @@ impl UdpSrc {
}
}
fn prepare(&self, element: &Element) -> Result<(), gst::ErrorMessage> {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
gst_debug!(self.cat, obj: element, "Preparing");
@ -768,11 +740,11 @@ impl UdpSrc {
.schedule(
&io_context,
move |buffer| {
let udpsrc = element_clone.get_impl().downcast_ref::<UdpSrc>().unwrap();
let udpsrc = Self::from_instance(&element_clone);
udpsrc.push_buffer(&element_clone, buffer)
},
move |err| {
let udpsrc = element_clone2.get_impl().downcast_ref::<UdpSrc>().unwrap();
let udpsrc = Self::from_instance(&element_clone2);
gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
@ -819,7 +791,7 @@ impl UdpSrc {
Ok(())
}
fn unprepare(&self, element: &Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
self.settings.lock().unwrap().used_socket = None;
@ -850,7 +822,7 @@ impl UdpSrc {
Ok(())
}
fn start(&self, element: &Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
let state = self.state.lock().unwrap();
@ -863,7 +835,7 @@ impl UdpSrc {
Ok(())
}
fn stop(&self, element: &Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
@ -878,45 +850,111 @@ impl UdpSrc {
}
}
impl ObjectImpl<Element> for UdpSrc {
fn set_property(&self, _obj: &glib::Object, id: u32, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
impl ObjectSubclass for UdpSrc {
const NAME: &'static str = "RsTsUdpSrc";
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"Thread-sharing UDP source",
"Source/Network",
"Receives data over the network via UDP",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
fn new() -> Self {
unreachable!()
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad = gst::Pad::new_from_template(&templ, "src");
src_pad.set_event_function(|pad, parent, event| {
UdpSrc::catch_panic_pad_function(
parent,
|| false,
|udpsrc, element| udpsrc.src_event(pad, element, event),
)
});
src_pad.set_query_function(|pad, parent, query| {
UdpSrc::catch_panic_pad_function(
parent,
|| false,
|udpsrc, element| udpsrc.src_query(pad, element, query),
)
});
Self {
cat: gst::DebugCategory::new(
"ts-udpsrc",
gst::DebugColorFlags::empty(),
"Thread-sharing UDP source",
),
src_pad: src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
}
}
}
impl ObjectImpl for UdpSrc {
glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
Property::String("address", ..) => {
subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.address = value.get();
}
Property::UInt("port", ..) => {
subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.port = value.get().unwrap();
}
Property::Boolean("reuse", ..) => {
subclass::Property("reuse", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.reuse = value.get().unwrap();
}
Property::Boxed("caps", ..) => {
subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.caps = value.get();
}
Property::UInt("mtu", ..) => {
subclass::Property("mtu", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.mtu = value.get().unwrap();
}
Property::Object("socket", ..) => {
subclass::Property("socket", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.socket = value
.get::<gio::Socket>()
.map(|socket| GioSocketWrapper::new(&socket));
}
Property::Object("used-socket", ..) => {
subclass::Property("used-socket", ..) => {
unreachable!();
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
@ -924,31 +962,31 @@ impl ObjectImpl<Element> for UdpSrc {
}
}
fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
Property::String("address", ..) => {
subclass::Property("address", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.address.to_value())
}
Property::UInt("port", ..) => {
subclass::Property("port", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.port.to_value())
}
Property::Boolean("reuse", ..) => {
subclass::Property("reuse", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.reuse.to_value())
}
Property::Boxed("caps", ..) => {
subclass::Property("caps", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.caps.to_value())
}
Property::UInt("mtu", ..) => {
subclass::Property("mtu", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.mtu.to_value())
}
Property::Object("socket", ..) => {
subclass::Property("socket", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings
.socket
@ -956,7 +994,7 @@ impl ObjectImpl<Element> for UdpSrc {
.map(GioSocketWrapper::as_socket)
.to_value())
}
Property::Object("used-socket", ..) => {
subclass::Property("used-socket", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings
.used_socket
@ -964,23 +1002,31 @@ impl ObjectImpl<Element> for UdpSrc {
.map(GioSocketWrapper::as_socket)
.to_value())
}
Property::String("context", ..) => {
subclass::Property("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())
}
Property::UInt("context-wait", ..) => {
subclass::Property("context-wait", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.src_pad).unwrap();
::set_element_flags(element, gst::ElementFlags::SOURCE);
}
}
impl ElementImpl<Element> for UdpSrc {
impl ElementImpl for UdpSrc {
fn change_state(
&self,
element: &Element,
element: &gst::Element,
transition: gst::StateChange,
) -> gst::StateChangeReturn {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
@ -1004,7 +1050,7 @@ impl ElementImpl<Element> for UdpSrc {
_ => (),
}
let mut ret = element.parent_change_state(transition);
let mut ret = self.parent_change_state(element, transition);
if ret == gst::StateChangeReturn::Failure {
return ret;
}
@ -1028,24 +1074,6 @@ impl ElementImpl<Element> for UdpSrc {
}
}
struct UdpSrcStatic;
impl ImplTypeStatic<Element> for UdpSrcStatic {
fn get_name(&self) -> &str {
"UdpSrc"
}
fn new(&self, element: &Element) -> Box<ElementImpl<Element>> {
UdpSrc::init(element)
}
fn class_init(&self, klass: &mut ElementClass) {
UdpSrc::class_init(klass);
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
let udpsrc_static = UdpSrcStatic;
let type_ = register_type(udpsrc_static);
gst::Element::register(plugin, "ts-udpsrc", 0, type_)
gst::Element::register(plugin, "ts-udpsrc", 0, UdpSrc::get_type())
}