net: Update for subclassing API changes

This commit is contained in:
Sebastian Dröge 2020-11-14 19:24:01 +02:00
parent af0337c26c
commit d7404a7e1c
10 changed files with 213 additions and 138 deletions

View file

@ -230,7 +230,7 @@ lazy_static! {
impl ReqwestHttpSrc {
fn set_location(
&self,
_element: &gst_base::BaseSrc,
_element: &super::ReqwestHttpSrc,
uri: Option<&str>,
) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
@ -268,7 +268,10 @@ impl ReqwestHttpSrc {
Ok(())
}
fn ensure_client(&self, src: &gst_base::BaseSrc) -> Result<ClientContext, gst::ErrorMessage> {
fn ensure_client(
&self,
src: &super::ReqwestHttpSrc,
) -> Result<ClientContext, gst::ErrorMessage> {
let mut client_guard = self.client.lock().unwrap();
if let Some(ref client) = *client_guard {
gst_debug!(CAT, obj: src, "Using already configured client");
@ -334,7 +337,7 @@ impl ReqwestHttpSrc {
fn do_request(
&self,
src: &gst_base::BaseSrc,
src: &super::ReqwestHttpSrc,
uri: Url,
start: u64,
stop: Option<u64>,
@ -670,17 +673,15 @@ impl ReqwestHttpSrc {
}
impl ObjectImpl for ReqwestHttpSrc {
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
let location = value.get::<&str>().expect("type checked upstream");
if let Err(err) = self.set_location(element, location) {
if let Err(err) = self.set_location(obj, location) {
gst_error!(
CAT,
obj: element,
obj: obj,
"Failed to set property `location`: {:?}",
err
);
@ -695,9 +696,8 @@ impl ObjectImpl for ReqwestHttpSrc {
settings.user_agent = user_agent;
}
subclass::Property("is-live", ..) => {
let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
let is_live = value.get_some().expect("type checked upstream");
element.set_live(is_live);
obj.set_live(is_live);
}
subclass::Property("user-id", ..) => {
let mut settings = self.settings.lock().unwrap();
@ -743,7 +743,7 @@ impl ObjectImpl for ReqwestHttpSrc {
};
}
fn get_property(&self, obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
fn get_property(&self, obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
@ -756,10 +756,7 @@ impl ObjectImpl for ReqwestHttpSrc {
let settings = self.settings.lock().unwrap();
Ok(settings.user_agent.to_value())
}
subclass::Property("is-live", ..) => {
let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
Ok(element.is_live().to_value())
}
subclass::Property("is-live", ..) => Ok(obj.is_live().to_value()),
subclass::Property("user-id", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.user_id.to_value())
@ -796,16 +793,15 @@ impl ObjectImpl for ReqwestHttpSrc {
}
}
fn constructed(&self, obj: &glib::Object) {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
element.set_automatic_eos(false);
element.set_format(gst::Format::Bytes);
obj.set_automatic_eos(false);
obj.set_format(gst::Format::Bytes);
}
}
impl ElementImpl for ReqwestHttpSrc {
fn set_context(&self, element: &gst::Element, context: &gst::Context) {
fn set_context(&self, element: &Self::Type, context: &gst::Context) {
if context.get_context_type() == REQWEST_CLIENT_CONTEXT {
let mut external_client = self.external_client.lock().unwrap();
let s = context.get_structure();
@ -820,7 +816,7 @@ impl ElementImpl for ReqwestHttpSrc {
fn change_state(
&self,
element: &gst::Element,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if let gst::StateChange::ReadyToNull = transition {
@ -832,21 +828,21 @@ impl ElementImpl for ReqwestHttpSrc {
}
impl BaseSrcImpl for ReqwestHttpSrc {
fn is_seekable(&self, _src: &gst_base::BaseSrc) -> bool {
fn is_seekable(&self, _src: &Self::Type) -> bool {
match *self.state.lock().unwrap() {
State::Started { seekable, .. } => seekable,
_ => false,
}
}
fn get_size(&self, _src: &gst_base::BaseSrc) -> Option<u64> {
fn get_size(&self, _src: &Self::Type) -> Option<u64> {
match *self.state.lock().unwrap() {
State::Started { size, .. } => size,
_ => None,
}
}
fn unlock(&self, _src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn unlock(&self, _src: &Self::Type) -> Result<(), gst::ErrorMessage> {
let canceller = self.canceller.lock().unwrap();
if let Some(ref canceller) = *canceller {
canceller.abort();
@ -854,7 +850,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
Ok(())
}
fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
*state = State::Stopped;
@ -881,14 +877,14 @@ impl BaseSrcImpl for ReqwestHttpSrc {
Ok(())
}
fn stop(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn stop(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: src, "Stopping");
*self.state.lock().unwrap() = State::Stopped;
Ok(())
}
fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
@ -906,7 +902,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
}
fn do_seek(&self, src: &gst_base::BaseSrc, segment: &mut gst::Segment) -> bool {
fn do_seek(&self, src: &Self::Type, segment: &mut gst::Segment) -> bool {
let segment = segment.downcast_mut::<gst::format::Bytes>().unwrap();
let mut state = self.state.lock().unwrap();
@ -951,7 +947,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
impl PushSrcImpl for ReqwestHttpSrc {
fn create(&self, src: &gst_base::PushSrc) -> Result<gst::Buffer, gst::FlowError> {
fn create(&self, src: &Self::Type) -> Result<gst::Buffer, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let (response, position, caps, tags) = match *state {
@ -1073,15 +1069,13 @@ impl PushSrcImpl for ReqwestHttpSrc {
}
impl URIHandlerImpl for ReqwestHttpSrc {
fn get_uri(&self, _element: &gst::URIHandler) -> Option<String> {
fn get_uri(&self, _element: &Self::Type) -> Option<String> {
let settings = self.settings.lock().unwrap();
settings.location.as_ref().map(Url::to_string)
}
fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
let element = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
self.set_location(&element, Some(uri))
}
@ -1096,6 +1090,7 @@ impl URIHandlerImpl for ReqwestHttpSrc {
impl ObjectSubclass for ReqwestHttpSrc {
const NAME: &'static str = "ReqwestHttpSrc";
type Type = super::ReqwestHttpSrc;
type ParentType = gst_base::PushSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@ -1116,7 +1111,7 @@ impl ObjectSubclass for ReqwestHttpSrc {
type_.add_interface::<gst::URIHandler>();
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"HTTP Source",
"Source/Network/HTTP",
@ -1137,12 +1132,3 @@ impl ObjectSubclass for ReqwestHttpSrc {
klass.install_properties(&PROPERTIES);
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"reqwesthttpsrc",
gst::Rank::Marginal,
ReqwestHttpSrc::get_type(),
)
}

View file

@ -0,0 +1,27 @@
// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use glib::prelude::*;
mod imp;
glib_wrapper! {
pub struct ReqwestHttpSrc(ObjectSubclass<imp::ReqwestHttpSrc>) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object, @implements gst::URIHandler;
}
unsafe impl Send for ReqwestHttpSrc {}
unsafe impl Sync for ReqwestHttpSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"reqwesthttpsrc",
gst::Rank::Marginal,
ReqwestHttpSrc::static_type(),
)
}

View file

@ -40,7 +40,7 @@ use std::pin::Pin;
use std::sync::Mutex;
use std::time::Duration;
use crate::packet::*;
use super::packet::*;
use serde_derive::Deserialize;
@ -199,7 +199,7 @@ impl Default for State {
type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send>>;
struct Transcriber {
pub struct Transcriber {
srcpad: gst::Pad,
sinkpad: gst::Pad,
settings: Mutex<Settings>,
@ -230,7 +230,7 @@ fn build_packet(payload: &[u8]) -> Vec<u8> {
}
impl Transcriber {
fn dequeue(&self, element: &gst::Element) -> bool {
fn dequeue(&self, element: &super::Transcriber) -> bool {
/* First, check our pending buffers */
let mut items = vec![];
@ -340,7 +340,7 @@ impl Transcriber {
fn enqueue(
&self,
element: &gst::Element,
element: &super::Transcriber,
state: &mut State,
alternative: &TranscriptAlternative,
partial: bool,
@ -398,7 +398,7 @@ impl Transcriber {
fn loop_fn(
&self,
element: &gst::Element,
element: &super::Transcriber,
receiver: &mut mpsc::Receiver<Message>,
) -> Result<(), gst::ErrorMessage> {
let future = async move {
@ -567,7 +567,7 @@ impl Transcriber {
RUNTIME.enter(|| futures::executor::block_on(future))
}
fn start_task(&self, element: &gst::Element) -> Result<(), gst::LoggableError> {
fn start_task(&self, element: &super::Transcriber) -> Result<(), gst::LoggableError> {
let element_weak = element.downgrade();
let pad_weak = self.srcpad.downgrade();
let (sender, mut receiver) = mpsc::channel(1);
@ -610,7 +610,7 @@ impl Transcriber {
fn src_activatemode(
&self,
_pad: &gst::Pad,
element: &gst::Element,
element: &super::Transcriber,
_mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
@ -628,7 +628,12 @@ impl Transcriber {
Ok(())
}
fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
fn src_query(
&self,
pad: &gst::Pad,
element: &super::Transcriber,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
@ -664,7 +669,7 @@ impl Transcriber {
}
}
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
fn sink_event(&self, pad: &gst::Pad, element: &super::Transcriber, event: gst::Event) -> bool {
use gst::EventView;
gst_debug!(CAT, obj: pad, "Handling event {:?}", event);
@ -771,7 +776,7 @@ impl Transcriber {
async fn sync_and_send(
&self,
element: &gst::Element,
element: &super::Transcriber,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut delay = None;
@ -819,7 +824,7 @@ impl Transcriber {
fn handle_buffer(
&self,
_pad: &gst::Pad,
element: &gst::Element,
element: &super::Transcriber,
buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(CAT, obj: element, "Handling {:?}", buffer);
@ -846,13 +851,13 @@ impl Transcriber {
fn sink_chain(
&self,
pad: &gst::Pad,
element: &gst::Element,
element: &super::Transcriber,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.handle_buffer(pad, element, Some(buffer))
}
fn ensure_connection(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
fn ensure_connection(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if state.connected {
@ -959,7 +964,7 @@ impl Transcriber {
Ok(())
}
fn disconnect(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
fn disconnect(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
gst_info!(CAT, obj: element, "Unpreparing");
@ -987,13 +992,14 @@ impl Transcriber {
impl ObjectSubclass for Transcriber {
const NAME: &'static str = "RsAwsTranscriber";
type Type = super::Transcriber;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.chain_function(|pad, parent, buffer| {
@ -1047,7 +1053,7 @@ impl ObjectSubclass for Transcriber {
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Transcriber",
"Audio/Text/Filter",
@ -1085,17 +1091,15 @@ impl ObjectSubclass for Transcriber {
}
impl ObjectImpl for Transcriber {
fn constructed(&self, obj: &glib::Object) {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.sinkpad).unwrap();
element.add_pad(&self.srcpad).unwrap();
element
.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
}
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
@ -1115,7 +1119,7 @@ impl ObjectImpl for Transcriber {
}
}
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@ -1139,7 +1143,7 @@ impl ObjectImpl for Transcriber {
impl ElementImpl for Transcriber {
fn change_state(
&self,
element: &gst::Element,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_info!(CAT, obj: element, "Changing state {:?}", transition);
@ -1169,16 +1173,7 @@ impl ElementImpl for Transcriber {
Ok(success)
}
fn provide_clock(&self, _element: &gst::Element) -> Option<gst::Clock> {
fn provide_clock(&self, _element: &Self::Type) -> Option<gst::Clock> {
Some(gst::SystemClock::obtain())
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"awstranscriber",
gst::Rank::None,
Transcriber::get_type(),
)
}

View file

@ -0,0 +1,37 @@
// Copyright (C) 2020 Mathieu Duponchelle <mathieu@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib::prelude::*;
mod imp;
mod packet;
glib_wrapper! {
pub struct Transcriber(ObjectSubclass<imp::Transcriber>) @extends gst::Element, gst::Object;
}
unsafe impl Send for Transcriber {}
unsafe impl Sync for Transcriber {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"awstranscriber",
gst::Rank::None,
Transcriber::static_type(),
)
}

View file

@ -14,8 +14,7 @@ extern crate gstreamer_base as gst_base;
#[macro_use]
extern crate lazy_static;
mod aws_transcribe_parse;
mod packet;
mod aws_transcriber;
mod s3sink;
mod s3src;
mod s3url;
@ -24,7 +23,7 @@ mod s3utils;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
s3sink::register(plugin)?;
s3src::register(plugin)?;
aws_transcribe_parse::register(plugin)?;
aws_transcriber::register(plugin)?;
Ok(())
}

View file

@ -157,7 +157,7 @@ static PROPERTIES: [subclass::Property; 4] = [
impl S3Sink {
fn flush_current_buffer(
&self,
element: &gst_base::BaseSink,
element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> {
let upload_part_req = self.create_upload_part_request()?;
let part_number = upload_part_req.part_number;
@ -261,7 +261,7 @@ impl S3Sink {
})
}
fn finalize_upload(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
if self.flush_current_buffer(element).is_err() {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
@ -339,7 +339,7 @@ impl S3Sink {
fn update_buffer(
&self,
src: &[u8],
element: &gst_base::BaseSink,
element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> {
let mut state = self.state.lock().unwrap();
let started_state = match *state {
@ -381,6 +381,7 @@ impl S3Sink {
impl ObjectSubclass for S3Sink {
const NAME: &'static str = "RusotoS3Sink";
type Type = super::S3Sink;
type ParentType = gst_base::BaseSink;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@ -395,7 +396,7 @@ impl ObjectSubclass for S3Sink {
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Amazon S3 sink",
"Source/Network",
@ -418,7 +419,7 @@ impl ObjectSubclass for S3Sink {
}
impl ObjectImpl for S3Sink {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
let mut settings = self.settings.lock().unwrap();
@ -445,7 +446,7 @@ impl ObjectImpl for S3Sink {
}
}
fn get_property(&self, _: &glib::Object, id: usize) -> Result<glib::Value, ()> {
fn get_property(&self, _: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
let settings = self.settings.lock().unwrap();
@ -462,11 +463,11 @@ impl ObjectImpl for S3Sink {
impl ElementImpl for S3Sink {}
impl BaseSinkImpl for S3Sink {
fn start(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.start()
}
fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
*state = State::Stopped;
gst_info!(CAT, obj: element, "Stopped");
@ -476,7 +477,7 @@ impl BaseSinkImpl for S3Sink {
fn render(
&self,
element: &gst_base::BaseSink,
element: &Self::Type,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if let State::Stopped = *self.state.lock().unwrap() {
@ -511,13 +512,13 @@ impl BaseSinkImpl for S3Sink {
}
}
fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.cancel();
Ok(())
}
fn event(&self, element: &gst_base::BaseSink, event: gst::Event) -> bool {
fn event(&self, element: &Self::Type, event: gst::Event) -> bool {
if let gst::EventView::Eos(_) = event.view() {
if let Err(error_message) = self.finalize_upload(element) {
gst_error!(
@ -533,12 +534,3 @@ impl BaseSinkImpl for S3Sink {
BaseSinkImplExt::parent_event(self, element, event)
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rusotos3sink",
gst::Rank::Primary,
S3Sink::get_type(),
)
}

View file

@ -0,0 +1,27 @@
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use glib::prelude::*;
mod imp;
glib_wrapper! {
pub struct S3Sink(ObjectSubclass<imp::S3Sink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
}
unsafe impl Send for S3Sink {}
unsafe impl Sync for S3Sink {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rusotos3sink",
gst::Rank::Primary,
S3Sink::static_type(),
)
}

View file

@ -72,11 +72,7 @@ impl S3Src {
Ok(S3Client::new(url.region.clone()))
}
fn set_uri(
self: &S3Src,
_: &gst_base::BaseSrc,
url_str: Option<&str>,
) -> Result<(), glib::Error> {
fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
if let StreamingState::Started { .. } = *state {
@ -108,7 +104,7 @@ impl S3Src {
fn head(
self: &S3Src,
src: &gst_base::BaseSrc,
src: &super::S3Src,
client: &S3Client,
url: &GstS3Url,
) -> Result<u64, gst::ErrorMessage> {
@ -145,7 +141,7 @@ impl S3Src {
/* Returns the bytes, Some(error) if one occured, or a None error if interrupted */
fn get(
self: &S3Src,
src: &gst_base::BaseSrc,
src: &super::S3Src,
offset: u64,
length: u64,
) -> Result<Bytes, Option<gst::ErrorMessage>> {
@ -210,6 +206,7 @@ impl S3Src {
impl ObjectSubclass for S3Src {
const NAME: &'static str = "RusotoS3Src";
type Type = super::S3Src;
type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@ -228,7 +225,7 @@ impl ObjectSubclass for S3Src {
typ.add_interface::<gst::URIHandler>();
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Amazon S3 source",
"Source/Network",
@ -251,19 +248,18 @@ impl ObjectSubclass for S3Src {
}
impl ObjectImpl for S3Src {
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id as usize];
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop {
subclass::Property("uri", ..) => {
let _ = self.set_uri(basesrc, value.get().expect("type checked upstream"));
let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
}
_ => unimplemented!(),
}
}
fn get_property(&self, _: &glib::Object, id: usize) -> Result<glib::Value, ()> {
fn get_property(&self, _: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id as usize];
match *prop {
@ -279,13 +275,12 @@ impl ObjectImpl for S3Src {
}
}
fn constructed(&self, obj: &glib::Object) {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
basesrc.set_format(gst::Format::Bytes);
obj.set_format(gst::Format::Bytes);
/* Set a larger default blocksize to make read more efficient */
basesrc.set_blocksize(256 * 1024);
obj.set_blocksize(256 * 1024);
}
}
@ -294,13 +289,12 @@ impl ElementImpl for S3Src {
}
impl URIHandlerImpl for S3Src {
fn get_uri(&self, _: &gst::URIHandler) -> Option<String> {
fn get_uri(&self, _: &Self::Type) -> Option<String> {
self.url.lock().unwrap().as_ref().map(|s| s.to_string())
}
fn set_uri(&self, element: &gst::URIHandler, uri: &str) -> Result<(), glib::Error> {
let basesrc = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
self.set_uri(basesrc, Some(uri))
fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
self.set_uri(element, Some(uri))
}
fn get_uri_type() -> gst::URIType {
@ -313,18 +307,18 @@ impl URIHandlerImpl for S3Src {
}
impl BaseSrcImpl for S3Src {
fn is_seekable(&self, _: &gst_base::BaseSrc) -> bool {
fn is_seekable(&self, _: &Self::Type) -> bool {
true
}
fn get_size(&self, _: &gst_base::BaseSrc) -> Option<u64> {
fn get_size(&self, _: &Self::Type) -> Option<u64> {
match *self.state.lock().unwrap() {
StreamingState::Stopped => None,
StreamingState::Started { size, .. } => Some(size),
}
}
fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let StreamingState::Started { .. } = *state {
@ -353,7 +347,7 @@ impl BaseSrcImpl for S3Src {
Ok(())
}
fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn stop(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> {
// First, stop any asynchronous tasks if we're running, as they will have the state lock
self.cancel();
@ -368,7 +362,7 @@ impl BaseSrcImpl for S3Src {
Ok(())
}
fn query(&self, src: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
fn query(&self, src: &Self::Type, query: &mut gst::QueryRef) -> bool {
if let gst::QueryView::Scheduling(ref mut q) = query.view_mut() {
q.set(
gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
@ -385,7 +379,7 @@ impl BaseSrcImpl for S3Src {
fn create(
&self,
src: &gst_base::BaseSrc,
src: &Self::Type,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
length: u32,
@ -416,21 +410,12 @@ impl BaseSrcImpl for S3Src {
}
/* FIXME: implement */
fn do_seek(&self, _: &gst_base::BaseSrc, _: &mut gst::Segment) -> bool {
fn do_seek(&self, _: &Self::Type, _: &mut gst::Segment) -> bool {
true
}
fn unlock(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn unlock(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.cancel();
Ok(())
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rusotos3src",
gst::Rank::Primary,
S3Src::get_type(),
)
}

View file

@ -0,0 +1,27 @@
// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use glib::prelude::*;
mod imp;
glib_wrapper! {
pub struct S3Src(ObjectSubclass<imp::S3Src>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for S3Src {}
unsafe impl Sync for S3Src {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rusotos3src",
gst::Rank::Primary,
S3Src::static_type(),
)
}