Move debug categories from an instance member to lazy_static

Simplifies the code a bit and less state to carry around.
This commit is contained in:
Sebastian Dröge 2019-10-31 23:34:21 +01:00
parent 93756c392f
commit 1ae57967ae
41 changed files with 632 additions and 675 deletions

View file

@ -54,8 +54,6 @@ struct State {
}
struct AudioEcho {
#[allow(dead_code)]
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<Option<State>>,
}
@ -135,11 +133,6 @@ impl ObjectSubclass for AudioEcho {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"rsaudioecho",
gst::DebugColorFlags::empty(),
Some("Rust audioecho effect"),
),
settings: Mutex::new(Default::default()),
state: Mutex::new(None),
}

View file

@ -17,6 +17,7 @@ cdg = "0.1"
cdg_renderer = "0.3"
image = "0.22"
muldiv = "0.2"
lazy_static = "1.0"
[lib]
name = "gstcdg"

View file

@ -23,11 +23,15 @@ use std::sync::Mutex;
use crate::constants::{CDG_HEIGHT, CDG_WIDTH};
struct CdgDec {
cat: gst::DebugCategory,
cdg_inter: Mutex<cdg_renderer::CdgInterpreter>,
output_info: Mutex<Option<gst_video::VideoInfo>>,
}
lazy_static! {
static ref CAT: gst::DebugCategory =
gst::DebugCategory::new("cdgdec", gst::DebugColorFlags::empty(), Some("CDG decoder"),);
}
impl ObjectSubclass for CdgDec {
const NAME: &'static str = "CdgDec";
type ParentType = gst_video::VideoDecoder;
@ -38,11 +42,6 @@ impl ObjectSubclass for CdgDec {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"cdgdec",
gst::DebugColorFlags::empty(),
Some("CDG decoder"),
),
cdg_inter: Mutex::new(cdg_renderer::CdgInterpreter::new()),
output_info: Mutex::new(None),
}
@ -185,12 +184,7 @@ impl VideoDecoderImpl for CdgDec {
}
}
gst_debug!(
self.cat,
obj: element,
"Finish frame pts={}",
frame.get_pts()
);
gst_debug!(CAT, obj: element, "Finish frame pts={}", frame.get_pts());
element.finish_frame(frame)
}

View file

@ -24,8 +24,14 @@ use crate::constants::{
const CDG_CMD_MEMORY_PRESET: u8 = 1;
struct CdgParse {
cat: gst::DebugCategory,
struct CdgParse;
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"cdgparse",
gst::DebugColorFlags::empty(),
Some("CDG parser"),
);
}
impl ObjectSubclass for CdgParse {
@ -37,13 +43,7 @@ impl ObjectSubclass for CdgParse {
glib_object_subclass!();
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"cdgparse",
gst::DebugColorFlags::empty(),
Some("CDG parser"),
),
}
Self
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
@ -196,7 +196,7 @@ impl BaseParseImpl for CdgParse {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
gst_debug!(self.cat, obj: element, "Found frame pts={}", pts);
gst_debug!(CAT, obj: element, "Found frame pts={}", pts);
element.finish_frame(frame, CDG_PACKET_SIZE as u32)?;

View file

@ -12,6 +12,8 @@
extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
#[macro_use]
extern crate lazy_static;
mod cdgdec;
mod cdgparse;

View file

@ -93,13 +93,20 @@ static PROPERTIES: [subclass::Property; 2] = [
];
struct MccEnc {
cat: gst::DebugCategory,
srcpad: gst::Pad,
sinkpad: gst::Pad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"mccenc",
gst::DebugColorFlags::empty(),
Some("Mcc Encoder Element"),
);
}
impl MccEnc {
fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) {
sinkpad.set_chain_function(|pad, parent, buffer| {
@ -390,7 +397,7 @@ impl MccEnc {
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
let mut state = self.state.lock().unwrap();
@ -414,7 +421,7 @@ impl MccEnc {
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Caps(ev) => {
@ -423,7 +430,7 @@ impl MccEnc {
let framerate = match s.get_some::<gst::Fraction>("framerate") {
Ok(framerate) => framerate,
Err(structure::GetError::FieldNotFound { .. }) => {
gst_error!(self.cat, obj: pad, "Caps without framerate");
gst_error!(CAT, obj: pad, "Caps without framerate");
return false;
}
err => panic!("MccEnc::sink_event caps: {:?}", err),
@ -457,10 +464,10 @@ impl MccEnc {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Seek(_) => {
gst_log!(self.cat, obj: pad, "Dropping seek event");
gst_log!(CAT, obj: pad, "Dropping seek event");
false
}
_ => pad.event_default(Some(element), event),
@ -470,7 +477,7 @@ impl MccEnc {
fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
match query.view_mut() {
QueryView::Seeking(mut q) => {
@ -505,11 +512,6 @@ impl ObjectSubclass for MccEnc {
MccEnc::set_pad_functions(&sinkpad, &srcpad);
Self {
cat: gst::DebugCategory::new(
"mccenc",
gst::DebugColorFlags::empty(),
Some("Mcc Encoder Element"),
),
srcpad,
sinkpad,
state: Mutex::new(State::default()),
@ -624,7 +626,7 @@ impl ElementImpl for MccEnc {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => {

View file

@ -19,6 +19,7 @@ gstreamer-audio = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] }
gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true }
lazy_static = "1.0"
[dev-dependencies]
gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]}

View file

@ -34,7 +34,6 @@ use gst_video;
use std::sync::{Mutex, RwLock};
struct FallbackSwitch {
cat: gst::DebugCategory,
sinkpad: gst_base::AggregatorPad,
fallback_sinkpad: RwLock<Option<gst_base::AggregatorPad>>,
active_sinkpad: Mutex<gst::Pad>,
@ -43,6 +42,14 @@ struct FallbackSwitch {
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"fallbackswitch",
gst::DebugColorFlags::empty(),
Some("Fallback switch Element"),
);
}
#[derive(Debug)]
struct OutputState {
last_sinkpad_time: gst::ClockTime,
@ -116,10 +123,10 @@ impl FallbackSwitch {
fallback_sinkpad: Option<&gst_base::AggregatorPad>,
) -> Result<Option<(gst::Buffer, gst::Caps, bool)>, gst::FlowError> {
// If we got a buffer on the sinkpad just handle it
gst_debug!(self.cat, obj: agg, "Got buffer on sinkpad {:?}", buffer);
gst_debug!(CAT, obj: agg, "Got buffer on sinkpad {:?}", buffer);
if buffer.get_pts().is_none() {
gst_error!(self.cat, obj: agg, "Only buffers with PTS supported");
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
return Err(gst::FlowError::Error);
}
@ -128,7 +135,7 @@ impl FallbackSwitch {
.get_segment()
.downcast::<gst::ClockTime>()
.map_err(|_| {
gst_error!(self.cat, obj: agg, "Only TIME segments supported");
gst_error!(CAT, obj: agg, "Only TIME segments supported");
gst::FlowError::Error
})?;
@ -144,7 +151,7 @@ impl FallbackSwitch {
if pad_change {
if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst_info!(
self.cat,
CAT,
obj: agg,
"Can't change back to sinkpad, waiting for keyframe"
);
@ -156,7 +163,7 @@ impl FallbackSwitch {
return Ok(None);
}
gst_info!(self.cat, obj: agg, "Active pad changed to sinkpad");
gst_info!(CAT, obj: agg, "Active pad changed to sinkpad");
*active_sinkpad = self.sinkpad.clone().upcast();
}
drop(active_sinkpad);
@ -170,7 +177,7 @@ impl FallbackSwitch {
.get_segment()
.downcast::<gst::ClockTime>()
.map_err(|_| {
gst_error!(self.cat, obj: agg, "Only TIME segments supported");
gst_error!(CAT, obj: agg, "Only TIME segments supported");
gst::FlowError::Error
})?;
@ -180,7 +187,7 @@ impl FallbackSwitch {
|| fallback_segment.to_running_time(fallback_pts) <= state.last_sinkpad_time
{
gst_debug!(
self.cat,
CAT,
obj: agg,
"Dropping fallback buffer {:?}",
fallback_buffer
@ -213,15 +220,10 @@ impl FallbackSwitch {
.pop_buffer()
.ok_or(gst_base::AGGREGATOR_FLOW_NEED_DATA)?;
gst_debug!(
self.cat,
obj: agg,
"Got buffer on fallback sinkpad {:?}",
buffer
);
gst_debug!(CAT, obj: agg, "Got buffer on fallback sinkpad {:?}", buffer);
if buffer.get_pts().is_none() {
gst_error!(self.cat, obj: agg, "Only buffers with PTS supported");
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
return Err(gst::FlowError::Error);
}
@ -229,7 +231,7 @@ impl FallbackSwitch {
.get_segment()
.downcast::<gst::ClockTime>()
.map_err(|_| {
gst_error!(self.cat, obj: agg, "Only TIME segments supported");
gst_error!(CAT, obj: agg, "Only TIME segments supported");
gst::FlowError::Error
})?;
let running_time = fallback_segment.to_running_time(buffer.get_dts_or_pts());
@ -250,7 +252,7 @@ impl FallbackSwitch {
// Get the next one if this one is before the timeout
if state.last_sinkpad_time + settings.timeout > running_time {
gst_debug!(
self.cat,
CAT,
obj: agg,
"Timeout not reached yet: {} + {} > {}",
state.last_sinkpad_time,
@ -262,7 +264,7 @@ impl FallbackSwitch {
}
gst_debug!(
self.cat,
CAT,
obj: agg,
"Timeout reached: {} + {} <= {}",
state.last_sinkpad_time,
@ -275,7 +277,7 @@ impl FallbackSwitch {
if pad_change {
if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst_info!(
self.cat,
CAT,
obj: agg,
"Can't change to fallback sinkpad yet, waiting for keyframe"
);
@ -287,7 +289,7 @@ impl FallbackSwitch {
continue;
}
gst_info!(self.cat, obj: agg, "Active pad changed to fallback sinkpad");
gst_info!(CAT, obj: agg, "Active pad changed to fallback sinkpad");
*active_sinkpad = fallback_sinkpad.clone().upcast();
}
drop(active_sinkpad);
@ -316,7 +318,7 @@ impl FallbackSwitch {
let mut state = self.output_state.lock().unwrap();
let fallback_sinkpad = self.fallback_sinkpad.read().unwrap();
gst_debug!(self.cat, obj: agg, "Aggregate called: timeout {}", timeout);
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
if let Some(buffer) = self.sinkpad.pop_buffer() {
if let Some(res) =
@ -325,16 +327,12 @@ impl FallbackSwitch {
return Ok(res);
}
} else if self.sinkpad.is_eos() {
gst_log!(self.cat, obj: agg, "Sinkpad is EOS");
gst_log!(CAT, obj: agg, "Sinkpad is EOS");
return Err(gst::FlowError::Eos);
}
if let (false, Some(_)) = (timeout, &*fallback_sinkpad) {
gst_debug!(
self.cat,
obj: agg,
"Have fallback sinkpad but no timeout yet"
);
gst_debug!(CAT, obj: agg, "Have fallback sinkpad but no timeout yet");
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA)
} else if let (true, Some(fallback_sinkpad)) = (timeout, &*fallback_sinkpad) {
@ -342,7 +340,7 @@ impl FallbackSwitch {
} else {
// Otherwise there's not much we can do at this point
gst_debug!(
self.cat,
CAT,
obj: agg,
"Got no buffer on sinkpad and have no fallback sinkpad"
);
@ -374,11 +372,6 @@ impl ObjectSubclass for FallbackSwitch {
.unwrap();
Self {
cat: gst::DebugCategory::new(
"fallbackswitch",
gst::DebugColorFlags::empty(),
Some("Fallback switch Element"),
),
sinkpad: sinkpad.clone(),
fallback_sinkpad: RwLock::new(None),
active_sinkpad: Mutex::new(sinkpad.upcast()),
@ -450,7 +443,7 @@ impl ObjectImpl for FallbackSwitch {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: agg,
"Changing timeout from {} to {}",
settings.timeout,
@ -493,13 +486,13 @@ impl ElementImpl for FallbackSwitch {
if templ != &fallback_sink_templ
|| (name.is_some() && name.as_ref().map(String::as_str) != Some("fallback_sink"))
{
gst_error!(self.cat, obj: agg, "Wrong pad template or name");
gst_error!(CAT, obj: agg, "Wrong pad template or name");
return None;
}
let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap();
if fallback_sinkpad.is_some() {
gst_error!(self.cat, obj: agg, "Already have a fallback sinkpad");
gst_error!(CAT, obj: agg, "Already have a fallback sinkpad");
return None;
}
@ -533,7 +526,7 @@ impl ElementImpl for FallbackSwitch {
drop(pad_states);
drop(fallback_sinkpad);
agg.remove_pad(pad).unwrap();
gst_debug!(self.cat, obj: agg, "Removed fallback sinkpad {:?}", pad);
gst_debug!(CAT, obj: agg, "Removed fallback sinkpad {:?}", pad);
}
}
}
@ -557,7 +550,7 @@ impl AggregatorImpl for FallbackSwitch {
match event.view() {
EventView::Gap(_) => {
gst_debug!(self.cat, obj: agg_pad, "Dropping gap event");
gst_debug!(CAT, obj: agg_pad, "Dropping gap event");
true
}
_ => self.parent_sink_event_pre_queue(agg, agg_pad, event),
@ -575,7 +568,7 @@ impl AggregatorImpl for FallbackSwitch {
match event.view() {
EventView::Caps(caps) => {
let caps = caps.get_caps_owned();
gst_debug!(self.cat, obj: agg_pad, "Received caps {}", caps);
gst_debug!(CAT, obj: agg_pad, "Received caps {}", caps);
let audio_info;
let video_info;
@ -618,14 +611,10 @@ impl AggregatorImpl for FallbackSwitch {
// to be its running time. We will then either output the buffer or drop it, depending on
// its distance from the last sinkpad time
if let Some(_) = self.sinkpad.peek_buffer() {
gst_debug!(
self.cat,
obj: agg,
"Have buffer on sinkpad, immediate timeout"
);
gst_debug!(CAT, obj: agg, "Have buffer on sinkpad, immediate timeout");
0.into()
} else if self.sinkpad.is_eos() {
gst_debug!(self.cat, obj: agg, "Sinkpad is EOS, immediate timeout");
gst_debug!(CAT, obj: agg, "Sinkpad is EOS, immediate timeout");
0.into()
} else if let Some((buffer, fallback_sinkpad)) = self
.fallback_sinkpad
@ -635,7 +624,7 @@ impl AggregatorImpl for FallbackSwitch {
.and_then(|p| p.peek_buffer().map(|buffer| (buffer, p)))
{
if buffer.get_pts().is_none() {
gst_error!(self.cat, obj: agg, "Only buffers with PTS supported");
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
// Trigger aggregate immediately to error out immediately
return 0.into();
}
@ -643,7 +632,7 @@ impl AggregatorImpl for FallbackSwitch {
let segment = match fallback_sinkpad.get_segment().downcast::<gst::ClockTime>() {
Ok(segment) => segment,
Err(_) => {
gst_error!(self.cat, obj: agg, "Only TIME segments supported");
gst_error!(CAT, obj: agg, "Only TIME segments supported");
// Trigger aggregate immediately to error out immediately
return 0.into();
}
@ -651,14 +640,14 @@ impl AggregatorImpl for FallbackSwitch {
let running_time = segment.to_running_time(buffer.get_dts_or_pts());
gst_debug!(
self.cat,
CAT,
obj: agg,
"Have buffer on fallback sinkpad, timeout at {}",
running_time
);
running_time
} else {
gst_debug!(self.cat, obj: agg, "Have no buffer at all yet");
gst_debug!(CAT, obj: agg, "Have no buffer at all yet");
gst::CLOCK_TIME_NONE
}
}
@ -674,14 +663,14 @@ impl AggregatorImpl for FallbackSwitch {
let segment = match agg_pad.get_segment().downcast::<gst::ClockTime>() {
Ok(segment) => segment,
Err(_) => {
gst_error!(self.cat, obj: agg, "Only TIME segments supported");
gst_error!(CAT, obj: agg, "Only TIME segments supported");
return Some(buffer);
}
};
let pts = buffer.get_pts();
if pts.is_none() {
gst_error!(self.cat, obj: agg, "Only buffers with PTS supported");
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
return Some(buffer);
}
@ -728,7 +717,7 @@ impl AggregatorImpl for FallbackSwitch {
};
gst_debug!(
self.cat,
CAT,
obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}",
buffer,
@ -765,14 +754,14 @@ impl AggregatorImpl for FallbackSwitch {
agg: &gst_base::Aggregator,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(self.cat, obj: agg, "Aggregate called: timeout {}", timeout);
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
let (mut buffer, active_caps, pad_change) = self.get_next_buffer(agg, timeout)?;
let current_src_caps = agg.get_static_pad("src").unwrap().get_current_caps();
if Some(&active_caps) != current_src_caps.as_ref() {
gst_info!(
self.cat,
CAT,
obj: agg,
"Caps change from {:?} to {:?}",
current_src_caps,
@ -786,7 +775,7 @@ impl AggregatorImpl for FallbackSwitch {
buffer.make_mut().set_flags(gst::BufferFlags::DISCONT);
}
gst_debug!(self.cat, obj: agg, "Finishing buffer {:?}", buffer);
gst_debug!(CAT, obj: agg, "Finishing buffer {:?}", buffer);
agg.finish_buffer(buffer)
}

View file

@ -39,6 +39,9 @@ mod gst_base {
pub use super::base::*;
}
#[macro_use]
extern crate lazy_static;
mod fallbackswitch;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {

View file

@ -11,6 +11,7 @@ url = "2"
glib = { git = "https://github.com/gtk-rs/glib" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
lazy_static = "1.0"
[lib]
name = "gstrsfile"

View file

@ -62,11 +62,18 @@ impl Default for State {
}
pub struct FileSink {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rsfilesink",
gst::DebugColorFlags::empty(),
Some("File Sink"),
);
}
impl FileSink {
fn set_location(
&self,
@ -87,7 +94,7 @@ impl FileSink {
match settings.location {
Some(ref location_cur) => {
gst_info!(
self.cat,
CAT,
obj: element,
"Changing `location` from {:?} to {}",
location_cur,
@ -95,13 +102,13 @@ impl FileSink {
);
}
None => {
gst_info!(self.cat, obj: element, "Setting `location` to {}", location,);
gst_info!(CAT, obj: element, "Setting `location` to {}", location,);
}
}
Some(location)
}
None => {
gst_info!(self.cat, obj: element, "Resetting `location` to None",);
gst_info!(CAT, obj: element, "Resetting `location` to None",);
None
}
};
@ -120,11 +127,6 @@ impl ObjectSubclass for FileSink {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"rsfilesink",
gst::DebugColorFlags::empty(),
Some("File Sink"),
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
}
@ -174,7 +176,7 @@ impl ObjectImpl for FileSink {
if let Err(err) = res {
gst_error!(
self.cat,
CAT,
obj: element,
"Failed to set property `location`: {}",
err
@ -229,10 +231,10 @@ impl BaseSinkImpl for FileSink {
]
)
})?;
gst_debug!(self.cat, obj: element, "Opened file {:?}", file);
gst_debug!(CAT, obj: element, "Opened file {:?}", file);
*state = State::Started { file, position: 0 };
gst_info!(self.cat, obj: element, "Started");
gst_info!(CAT, obj: element, "Started");
Ok(())
}
@ -247,7 +249,7 @@ impl BaseSinkImpl for FileSink {
}
*state = State::Stopped;
gst_info!(self.cat, obj: element, "Stopped");
gst_info!(CAT, obj: element, "Stopped");
Ok(())
}
@ -271,7 +273,7 @@ impl BaseSinkImpl for FileSink {
}
};
gst_trace!(self.cat, obj: element, "Rendering {:?}", buffer);
gst_trace!(CAT, obj: element, "Rendering {:?}", buffer);
let map = buffer.map_readable().ok_or_else(|| {
gst_element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error

View file

@ -62,11 +62,18 @@ impl Default for State {
}
pub struct FileSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rsfilesrc",
gst::DebugColorFlags::empty(),
Some("File Source"),
);
}
impl FileSrc {
fn set_location(
&self,
@ -101,7 +108,7 @@ impl FileSrc {
match settings.location {
Some(ref location_cur) => {
gst_info!(
self.cat,
CAT,
obj: element,
"Changing `location` from {:?} to {}",
location_cur,
@ -109,13 +116,13 @@ impl FileSrc {
);
}
None => {
gst_info!(self.cat, obj: element, "Setting `location to {}", location,);
gst_info!(CAT, obj: element, "Setting `location to {}", location,);
}
}
Some(location)
}
None => {
gst_info!(self.cat, obj: element, "Resetting `location` to None",);
gst_info!(CAT, obj: element, "Resetting `location` to None",);
None
}
};
@ -134,11 +141,6 @@ impl ObjectSubclass for FileSrc {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"rsfilesrc",
gst::DebugColorFlags::empty(),
Some("File Source"),
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
}
@ -188,7 +190,7 @@ impl ObjectImpl for FileSrc {
if let Err(err) = res {
gst_error!(
self.cat,
CAT,
obj: element,
"Failed to set property `location`: {}",
err
@ -264,11 +266,11 @@ impl BaseSrcImpl for FileSrc {
)
})?;
gst_debug!(self.cat, obj: element, "Opened file {:?}", file);
gst_debug!(CAT, obj: element, "Opened file {:?}", file);
*state = State::Started { file, position: 0 };
gst_info!(self.cat, obj: element, "Started");
gst_info!(CAT, obj: element, "Started");
Ok(())
}
@ -284,7 +286,7 @@ impl BaseSrcImpl for FileSrc {
*state = State::Stopped;
gst_info!(self.cat, obj: element, "Stopped");
gst_info!(CAT, obj: element, "Stopped");
Ok(())
}

View file

@ -14,6 +14,8 @@ extern crate glib;
extern crate gstreamer as gst;
extern crate gstreamer_base as gst_base;
extern crate url;
#[macro_use]
extern crate lazy_static;
mod file_location;
mod filesink;

View file

@ -15,6 +15,7 @@ gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
lewton = { version = "0.9", default-features = false }
byte-slice-cast = "0.3"
atomic_refcell = "0.1"
lazy_static = "1.0"
[lib]
name = "gstlewton"

View file

@ -32,10 +32,17 @@ struct State {
}
struct LewtonDec {
cat: gst::DebugCategory,
state: AtomicRefCell<Option<State>>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"lewtondec",
gst::DebugColorFlags::empty(),
Some("lewton Vorbis decoder"),
);
}
impl ObjectSubclass for LewtonDec {
const NAME: &'static str = "LewtonDec";
type ParentType = gst_audio::AudioDecoder;
@ -46,11 +53,6 @@ impl ObjectSubclass for LewtonDec {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"lewtondec",
gst::DebugColorFlags::empty(),
Some("lewton Vorbis decoder"),
),
state: AtomicRefCell::new(None),
}
}
@ -123,7 +125,7 @@ impl AudioDecoderImpl for LewtonDec {
element: &gst_audio::AudioDecoder,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
gst_debug!(self.cat, obj: element, "Setting format {:?}", caps);
gst_debug!(CAT, obj: element, "Setting format {:?}", caps);
// When the caps are changing we require new headers
let mut state_guard = self.state.borrow_mut();
@ -142,7 +144,7 @@ impl AudioDecoderImpl for LewtonDec {
let streamheaders = streamheaders.as_slice();
if streamheaders.len() < 3 {
gst_debug!(
self.cat,
CAT,
obj: element,
"Not enough streamheaders, trying in-band"
);
@ -155,7 +157,7 @@ impl AudioDecoderImpl for LewtonDec {
if let (Ok(Some(ident_buf)), Ok(Some(comment_buf)), Ok(Some(setup_buf))) =
(ident_buf, comment_buf, setup_buf)
{
gst_debug!(self.cat, obj: element, "Got streamheader buffers");
gst_debug!(CAT, obj: element, "Got streamheader buffers");
state.header_bufs = (Some(ident_buf), Some(comment_buf), Some(setup_buf));
}
}
@ -164,7 +166,7 @@ impl AudioDecoderImpl for LewtonDec {
}
fn flush(&self, element: &gst_audio::AudioDecoder, _hard: bool) {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(CAT, obj: element, "Flushing");
let mut state_guard = self.state.borrow_mut();
if let Some(ref mut state) = *state_guard {
@ -177,7 +179,7 @@ impl AudioDecoderImpl for LewtonDec {
element: &gst_audio::AudioDecoder,
inbuf: Option<&gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(self.cat, obj: element, "Handling buffer {:?}", inbuf);
gst_debug!(CAT, obj: element, "Handling buffer {:?}", inbuf);
let inbuf = match inbuf {
None => return Ok(gst::FlowSuccess::Ok),
@ -185,7 +187,7 @@ impl AudioDecoderImpl for LewtonDec {
};
let inmap = inbuf.map_readable().ok_or_else(|| {
gst_error!(self.cat, obj: element, "Failed to buffer readable");
gst_error!(CAT, obj: element, "Failed to buffer readable");
gst::FlowError::Error
})?;
@ -199,11 +201,7 @@ impl AudioDecoderImpl for LewtonDec {
if state.headerset.is_some() {
return Ok(gst::FlowSuccess::Ok);
} else {
gst_error!(
self.cat,
obj: element,
"Got empty packet before all headers"
);
gst_error!(CAT, obj: element, "Got empty packet before all headers");
return Err(gst::FlowError::Error);
}
}
@ -232,30 +230,26 @@ impl LewtonDec {
) -> Result<gst::FlowSuccess, gst::FlowError> {
// ident header
if indata[0] == 0x01 {
gst_debug!(self.cat, obj: element, "Got ident header buffer");
gst_debug!(CAT, obj: element, "Got ident header buffer");
state.header_bufs = (Some(inbuf.clone()), None, None);
} else if indata[0] == 0x03 {
// comment header
if state.header_bufs.0.is_none() {
gst_warning!(
self.cat,
obj: element,
"Got comment header before ident header"
);
gst_warning!(CAT, obj: element, "Got comment header before ident header");
} else {
gst_debug!(self.cat, obj: element, "Got comment header buffer");
gst_debug!(CAT, obj: element, "Got comment header buffer");
state.header_bufs.1 = Some(inbuf.clone());
}
} else if indata[0] == 0x05 {
// setup header
if state.header_bufs.0.is_none() || state.header_bufs.1.is_none() {
gst_warning!(
self.cat,
CAT,
obj: element,
"Got setup header before ident/comment header"
);
} else {
gst_debug!(self.cat, obj: element, "Got setup header buffer");
gst_debug!(CAT, obj: element, "Got setup header buffer");
state.header_bufs.2 = Some(inbuf.clone());
}
}
@ -284,11 +278,7 @@ impl LewtonDec {
// First try to parse the headers
let ident_map = ident_buf.map_readable().ok_or_else(|| {
gst_error!(
self.cat,
obj: element,
"Failed to map ident buffer readable"
);
gst_error!(CAT, obj: element, "Failed to map ident buffer readable");
gst::FlowError::Error
})?;
let ident = lewton::header::read_header_ident(ident_map.as_ref()).map_err(|err| {
@ -301,11 +291,7 @@ impl LewtonDec {
})?;
let comment_map = comment_buf.map_readable().ok_or_else(|| {
gst_error!(
self.cat,
obj: element,
"Failed to map comment buffer readable"
);
gst_error!(CAT, obj: element, "Failed to map comment buffer readable");
gst::FlowError::Error
})?;
let comment = lewton::header::read_header_comment(comment_map.as_ref()).map_err(|err| {
@ -318,11 +304,7 @@ impl LewtonDec {
})?;
let setup_map = setup_buf.map_readable().ok_or_else(|| {
gst_error!(
self.cat,
obj: element,
"Failed to map setup buffer readable"
);
gst_error!(CAT, obj: element, "Failed to map setup buffer readable");
gst::FlowError::Error
})?;
let setup = lewton::header::read_header_setup(
@ -359,7 +341,7 @@ impl LewtonDec {
let mut map = [0; 8];
if let Err(_) = gst_audio::get_channel_reorder_map(from, to, &mut map[..channels]) {
gst_error!(
self.cat,
CAT,
obj: element,
"Failed to generate channel reorder map from {:?} to {:?}",
from,
@ -375,7 +357,7 @@ impl LewtonDec {
let audio_info = audio_info.build().unwrap();
gst_debug!(
self.cat,
CAT,
obj: element,
"Successfully parsed headers: {:?}",
audio_info
@ -430,12 +412,7 @@ impl LewtonDec {
}
let sample_count = decoded.samples.len() / audio_info.channels() as usize;
gst_debug!(
self.cat,
obj: element,
"Got {} decoded samples",
sample_count
);
gst_debug!(CAT, obj: element, "Got {} decoded samples", sample_count);
if sample_count == 0 {
return element.finish_frame(None, 1);

View file

@ -14,6 +14,8 @@ extern crate glib;
extern crate gstreamer as gst;
#[macro_use]
extern crate gstreamer_audio as gst_audio;
#[macro_use]
extern crate lazy_static;
mod lewtondec;

View file

@ -14,6 +14,7 @@ gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
rav1e = { git = "https://github.com/xiph/rav1e.git", default-features=false }
atomic_refcell = "0.1"
lazy_static = "1.0"
[lib]
name = "gstrav1e"

View file

@ -12,6 +12,8 @@
extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
#[macro_use]
extern crate lazy_static;
mod rav1enc;

View file

@ -285,11 +285,18 @@ struct State {
}
struct Rav1Enc {
cat: gst::DebugCategory,
state: AtomicRefCell<Option<State>>,
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rav1enc",
gst::DebugColorFlags::empty(),
Some("rav1e AV1 encoder"),
);
}
impl ObjectSubclass for Rav1Enc {
const NAME: &'static str = "Rav1Enc";
type ParentType = gst_video::VideoEncoder;
@ -300,11 +307,6 @@ impl ObjectSubclass for Rav1Enc {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"rav1enc",
gst::DebugColorFlags::empty(),
Some("rav1e AV1 encoder"),
),
state: AtomicRefCell::new(None),
settings: Mutex::new(Default::default()),
}
@ -488,10 +490,10 @@ impl VideoEncoderImpl for Rav1Enc {
state: &gst_video::VideoCodecState<gst_video::video_codec_state::Readable>,
) -> Result<(), gst::LoggableError> {
self.finish(element)
.map_err(|_| gst_loggable_error!(self.cat, "Failed to drain"))?;
.map_err(|_| gst_loggable_error!(CAT, "Failed to drain"))?;
let video_info = state.get_info();
gst_debug!(self.cat, obj: element, "Setting format {:?}", video_info);
gst_debug!(CAT, obj: element, "Setting format {:?}", video_info);
let settings = self.settings.lock().unwrap();
@ -610,11 +612,11 @@ impl VideoEncoderImpl for Rav1Enc {
*self.state.borrow_mut() = Some(State {
context: if video_info.format_info().depth()[0] > 8 {
Context::Sixteen(cfg.new_context().map_err(|err| {
gst_loggable_error!(self.cat, "Failed to create context: {:?}", err)
gst_loggable_error!(CAT, "Failed to create context: {:?}", err)
})?)
} else {
Context::Eight(cfg.new_context().map_err(|err| {
gst_loggable_error!(self.cat, "Failed to create context: {:?}", err)
gst_loggable_error!(CAT, "Failed to create context: {:?}", err)
})?)
},
video_info: video_info.clone(),
@ -622,16 +624,16 @@ impl VideoEncoderImpl for Rav1Enc {
let output_state = element
.set_output_state(gst::Caps::new_simple("video/x-av1", &[]), Some(state))
.map_err(|_| gst_loggable_error!(self.cat, "Failed to set output state"))?;
.map_err(|_| gst_loggable_error!(CAT, "Failed to set output state"))?;
element
.negotiate(output_state)
.map_err(|_| gst_loggable_error!(self.cat, "Failed to negotiate"))?;
.map_err(|_| gst_loggable_error!(CAT, "Failed to negotiate"))?;
self.parent_set_format(element, state)
}
fn flush(&self, element: &gst_video::VideoEncoder) -> bool {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(CAT, obj: element, "Flushing");
let mut state_guard = self.state.borrow_mut();
if let Some(ref mut state) = *state_guard {
@ -639,7 +641,7 @@ impl VideoEncoderImpl for Rav1Enc {
loop {
match state.context.receive_packet() {
Ok(_) | Err(data::EncoderStatus::Encoded) => {
gst_debug!(self.cat, obj: element, "Dropping packet on flush",);
gst_debug!(CAT, obj: element, "Dropping packet on flush",);
}
_ => break,
}
@ -653,7 +655,7 @@ impl VideoEncoderImpl for Rav1Enc {
&self,
element: &gst_video::VideoEncoder,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(self.cat, obj: element, "Finishing");
gst_debug!(CAT, obj: element, "Finishing");
let mut state_guard = self.state.borrow_mut();
if let Some(ref mut state) = *state_guard {
@ -678,7 +680,7 @@ impl VideoEncoderImpl for Rav1Enc {
self.output_frames(element, state)?;
gst_debug!(
self.cat,
CAT,
obj: element,
"Sending frame {}",
frame.get_system_frame_number()
@ -707,7 +709,7 @@ impl VideoEncoderImpl for Rav1Enc {
) {
Ok(_) => {
gst_debug!(
self.cat,
CAT,
obj: element,
"Sent frame {}",
frame.get_system_frame_number()
@ -734,7 +736,7 @@ impl Rav1Enc {
match state.context.receive_packet() {
Ok((packet_type, packet_number, packet_data)) => {
gst_debug!(
self.cat,
CAT,
obj: element,
"Received packet {} of size {}, frame type {:?}",
packet_number,
@ -751,7 +753,7 @@ impl Rav1Enc {
element.finish_frame(Some(frame))?;
}
Err(data::EncoderStatus::Encoded) => {
gst_debug!(self.cat, obj: element, "Encoded but not output frame yet",);
gst_debug!(CAT, obj: element, "Encoded but not output frame yet",);
}
Err(data::EncoderStatus::Failure) => {
gst_element_error!(
@ -763,7 +765,7 @@ impl Rav1Enc {
}
Err(err) => {
gst_debug!(
self.cat,
CAT,
obj: element,
"Soft error when receiving frame: {:?}",
err

View file

@ -17,6 +17,7 @@ gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", fea
gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
tokio = "0.1"
bytes = "0.4"
lazy_static = "1.0"
[dev-dependencies]
hyper = "0.12"

View file

@ -19,6 +19,8 @@ extern crate hyperx;
extern crate reqwest;
extern crate tokio;
extern crate url;
#[macro_use]
extern crate lazy_static;
mod reqwesthttpsrc;

View file

@ -220,7 +220,6 @@ impl Default for State {
#[derive(Debug)]
pub struct ReqwestHttpSrc {
cat: gst::DebugCategory,
client: Mutex<Option<ClientContext>>,
external_client: Mutex<Option<ClientContext>>,
settings: Mutex<Settings>,
@ -229,6 +228,14 @@ pub struct ReqwestHttpSrc {
canceller: Mutex<Option<oneshot::Sender<Bytes>>>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"reqwesthttpsrc",
gst::DebugColorFlags::empty(),
Some("Rust HTTP source"),
);
}
impl ReqwestHttpSrc {
fn set_location(
&self,
@ -273,7 +280,7 @@ impl ReqwestHttpSrc {
fn ensure_client(&self, src: &gst_base::BaseSrc) -> Result<ClientContext, gst::ErrorMessage> {
let mut client_guard = self.client.lock().unwrap();
if let Some(ref client) = *client_guard {
gst_debug!(self.cat, obj: src, "Using already configured client");
gst_debug!(CAT, obj: src, "Using already configured client");
return Ok(client.clone());
}
@ -299,13 +306,13 @@ impl ReqwestHttpSrc {
drop(external_client);
client
} {
gst_debug!(self.cat, obj: src, "Using shared client");
gst_debug!(CAT, obj: src, "Using shared client");
*client_guard = Some(client.clone());
return Ok(client);
}
gst_debug!(self.cat, obj: src, "Creating new client");
gst_debug!(CAT, obj: src, "Creating new client");
let client = ClientContext(Arc::new(ClientContextInner {
client: Client::builder()
.cookie_store(true)
@ -319,7 +326,7 @@ impl ReqwestHttpSrc {
})?,
}));
gst_debug!(self.cat, obj: src, "Sharing new client with other elements");
gst_debug!(CAT, obj: src, "Sharing new client with other elements");
let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
{
let context = context.get_mut().unwrap();
@ -351,7 +358,7 @@ impl ReqwestHttpSrc {
RangeUnit, RawLike, UserAgent,
};
gst_debug!(self.cat, obj: src, "Creating new request for {}", uri);
gst_debug!(CAT, obj: src, "Creating new request for {}", uri);
let req = {
let client = self.ensure_client(src)?;
@ -391,7 +398,7 @@ impl ReqwestHttpSrc {
if let Some(value) = value.transform::<String>() {
let value = value.get::<&str>().unwrap().unwrap_or("");
gst_debug!(
self.cat,
CAT,
obj: src,
"Appending extra-header: {}: {}",
field,
@ -400,7 +407,7 @@ impl ReqwestHttpSrc {
headers.append_raw(String::from(field), value);
} else {
gst_warning!(
self.cat,
CAT,
obj: src,
"Failed to transform extra-header '{}' to string",
field
@ -412,7 +419,7 @@ impl ReqwestHttpSrc {
if let Some(value) = value.transform::<String>() {
let value = value.get::<&str>().unwrap().unwrap_or("");
gst_debug!(
self.cat,
CAT,
obj: src,
"Appending extra-header: {}: {}",
field,
@ -421,7 +428,7 @@ impl ReqwestHttpSrc {
headers.append_raw(String::from(field), value);
} else {
gst_warning!(
self.cat,
CAT,
obj: src,
"Failed to transform extra-header '{}' to string",
field
@ -431,7 +438,7 @@ impl ReqwestHttpSrc {
} else if let Some(value) = value.transform::<String>() {
let value = value.get::<&str>().unwrap().unwrap_or("");
gst_debug!(
self.cat,
CAT,
obj: src,
"Appending extra-header: {}: {}",
field,
@ -440,7 +447,7 @@ impl ReqwestHttpSrc {
headers.append_raw(String::from(field), value);
} else {
gst_warning!(
self.cat,
CAT,
obj: src,
"Failed to transform extra-header '{}' to string",
field
@ -476,7 +483,7 @@ impl ReqwestHttpSrc {
req
};
gst_debug!(self.cat, obj: src, "Sending new request: {:?}", req);
gst_debug!(CAT, obj: src, "Sending new request: {:?}", req);
let uri_clone = uri.clone();
let res = self.wait(req.send().map_err(move |err| {
@ -489,21 +496,21 @@ impl ReqwestHttpSrc {
let mut res = match res {
Ok(res) => res,
Err(Some(err)) => {
gst_debug!(self.cat, obj: src, "Error {:?}", err);
gst_debug!(CAT, obj: src, "Error {:?}", err);
return Err(Some(err));
}
Err(None) => {
gst_debug!(self.cat, obj: src, "Flushing");
gst_debug!(CAT, obj: src, "Flushing");
return Err(None);
}
};
gst_debug!(self.cat, obj: src, "Received response: {:?}", res);
gst_debug!(CAT, obj: src, "Received response: {:?}", res);
if !res.status().is_success() {
match res.status() {
StatusCode::NOT_FOUND => {
gst_error!(self.cat, obj: src, "Resource not found");
gst_error!(CAT, obj: src, "Resource not found");
return Err(Some(gst_error_msg!(
gst::ResourceError::NotFound,
["Resource '{}' not found", uri]
@ -513,14 +520,14 @@ impl ReqwestHttpSrc {
| StatusCode::PAYMENT_REQUIRED
| StatusCode::FORBIDDEN
| StatusCode::PROXY_AUTHENTICATION_REQUIRED => {
gst_error!(self.cat, obj: src, "Not authorized: {}", res.status());
gst_error!(CAT, obj: src, "Not authorized: {}", res.status());
return Err(Some(gst_error_msg!(
gst::ResourceError::NotAuthorized,
["Not Authorized for resource '{}': {}", uri, res.status()]
)));
}
_ => {
gst_error!(self.cat, obj: src, "Request failed: {}", res.status());
gst_error!(CAT, obj: src, "Request failed: {}", res.status());
return Err(Some(gst_error_msg!(
gst::ResourceError::OpenRead,
["Request for '{}' failed: {}", uri, res.status()]
@ -569,7 +576,7 @@ impl ReqwestHttpSrc {
});
if let Some(ContentType(ref content_type)) = headers.get() {
gst_debug!(self.cat, obj: src, "Got content type {}", content_type);
gst_debug!(CAT, obj: src, "Got content type {}", content_type);
if let Some(ref mut caps) = caps {
let caps = caps.get_mut().unwrap();
let s = caps.get_mut_structure(0).unwrap();
@ -624,7 +631,7 @@ impl ReqwestHttpSrc {
}
}
gst_debug!(self.cat, obj: src, "Request successful");
gst_debug!(CAT, obj: src, "Request successful");
let body = mem::replace(res.body_mut(), Decoder::empty());
@ -706,7 +713,7 @@ impl ObjectImpl for ReqwestHttpSrc {
let location = value.get::<&str>().expect("type checked upstream");
if let Err(err) = self.set_location(element, location) {
gst_error!(
self.cat,
CAT,
obj: element,
"Failed to set property `location`: {:?}",
err
@ -892,7 +899,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
})
.map(|uri| uri.clone())?;
gst_debug!(self.cat, obj: src, "Starting for URI {}", uri);
gst_debug!(CAT, obj: src, "Starting for URI {}", uri);
*state = self.do_request(src, uri, 0, None).map_err(|err| {
err.unwrap_or_else(|| {
@ -904,7 +911,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
fn stop(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: src, "Stopping");
gst_debug!(CAT, obj: src, "Stopping");
*self.state.lock().unwrap() = State::Stopped;
Ok(())
@ -950,10 +957,10 @@ impl BaseSrcImpl for ReqwestHttpSrc {
let start = segment.get_start().expect("No start position given");
let stop = segment.get_stop();
gst_debug!(self.cat, obj: src, "Seeking to {}-{:?}", start, stop);
gst_debug!(CAT, obj: src, "Seeking to {}-{:?}", start, stop);
if position == start && old_stop == stop.0 {
gst_debug!(self.cat, obj: src, "No change to current request");
gst_debug!(CAT, obj: src, "No change to current request");
return true;
}
@ -1007,7 +1014,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
let current_body = match body.take() {
Some(body) => body,
None => {
gst_error!(self.cat, obj: src, "Don't have a response body");
gst_error!(CAT, obj: src, "Don't have a response body");
gst_element_error!(
src,
gst::ResourceError::Read,
@ -1023,13 +1030,13 @@ impl BaseSrcImpl for ReqwestHttpSrc {
drop(state);
if let Some(caps) = caps {
gst_debug!(self.cat, obj: src, "Setting caps {:?}", caps);
gst_debug!(CAT, obj: src, "Setting caps {:?}", caps);
src.set_caps(&caps)
.map_err(|_| gst::FlowError::NotNegotiated)?;
}
if let Some(tags) = tags {
gst_debug!(self.cat, obj: src, "Sending iradio tags {:?}", tags);
gst_debug!(CAT, obj: src, "Sending iradio tags {:?}", tags);
let pad = src.get_static_pad("src").unwrap();
pad.push_event(gst::Event::new_tag(tags).build());
}
@ -1044,12 +1051,12 @@ impl BaseSrcImpl for ReqwestHttpSrc {
let res = match res {
Ok(res) => res,
Err(Some(err)) => {
gst_debug!(self.cat, obj: src, "Error {:?}", err);
gst_debug!(CAT, obj: src, "Error {:?}", err);
src.post_error_message(&err);
return Err(gst::FlowError::Error);
}
Err(None) => {
gst_debug!(self.cat, obj: src, "Flushing");
gst_debug!(CAT, obj: src, "Flushing");
return Err(gst::FlowError::Flushing);
}
};
@ -1073,7 +1080,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
/* do something with the chunk and store the body again in the state */
gst_trace!(
self.cat,
CAT,
obj: src,
"Chunk of {} bytes received at offset {}",
chunk.len(),
@ -1098,7 +1105,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
(None, current_body) => {
/* No further data, end of stream */
gst_debug!(self.cat, obj: src, "End of stream");
gst_debug!(CAT, obj: src, "End of stream");
*body = Some(current_body);
Err(gst::FlowError::Eos)
}
@ -1138,11 +1145,6 @@ impl ObjectSubclass for ReqwestHttpSrc {
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"reqwesthttpsrc",
gst::DebugColorFlags::empty(),
Some("Rust HTTP source"),
),
client: Mutex::new(None),
external_client: Mutex::new(None),
settings: Mutex::new(Default::default()),

View file

@ -18,6 +18,7 @@ rusoto_s3 = "0.41"
url = "2"
percent-encoding = "2"
tokio = "0.1"
lazy_static = "1.0"
[lib]
name = "gstrusoto"

View file

@ -13,6 +13,8 @@ extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_base as gst_base;
#[macro_use]
extern crate lazy_static;
mod s3sink;
mod s3src;

View file

@ -95,12 +95,19 @@ struct Settings {
pub struct S3Sink {
settings: Mutex<Settings>,
state: Mutex<State>,
cat: gst::DebugCategory,
runtime: runtime::Runtime,
canceller: Mutex<Option<oneshot::Sender<()>>>,
client: Mutex<S3Client>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rusotos3sink",
gst::DebugColorFlags::empty(),
Some("Amazon S3 Sink"),
);
}
impl Default for Settings {
fn default() -> Self {
Settings {
@ -187,7 +194,7 @@ impl S3Sink {
e_tag: output.e_tag.clone(),
part_number: Some(part_number),
});
gst_info!(self.cat, obj: element, "Uploaded part {}", part_number);
gst_info!(CAT, obj: element, "Uploaded part {}", part_number);
Ok(())
}
@ -386,11 +393,6 @@ impl ObjectSubclass for S3Sink {
Self {
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
cat: gst::DebugCategory::new(
"rusotos3sink",
gst::DebugColorFlags::empty(),
Some("Amazon S3 Sink"),
),
canceller: Mutex::new(None),
runtime: runtime::Builder::new()
.core_threads(1)
@ -489,7 +491,7 @@ impl BaseSinkImpl for S3Sink {
fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
*state = State::Stopped;
gst_info!(self.cat, obj: element, "Stopped");
gst_info!(CAT, obj: element, "Stopped");
Ok(())
}
@ -504,7 +506,7 @@ impl BaseSinkImpl for S3Sink {
return Err(gst::FlowError::Error);
}
gst_trace!(self.cat, obj: element, "Rendering {:?}", buffer);
gst_trace!(CAT, obj: element, "Rendering {:?}", buffer);
let map = buffer.map_readable().ok_or_else(|| {
gst_element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
@ -515,7 +517,7 @@ impl BaseSinkImpl for S3Sink {
Err(err) => match err {
Some(error_message) => {
gst_error!(
self.cat,
CAT,
obj: element,
"Multipart upload failed: {}",
error_message
@ -524,7 +526,7 @@ impl BaseSinkImpl for S3Sink {
Err(gst::FlowError::Error)
}
_ => {
gst_info!(self.cat, obj: element, "Upload interrupted. Flushing...");
gst_info!(CAT, obj: element, "Upload interrupted. Flushing...");
Err(gst::FlowError::Flushing)
}
},
@ -542,7 +544,7 @@ impl BaseSinkImpl for S3Sink {
gst::EventView::Eos(_) => {
if let Err(error_message) = self.finalize_upload(element) {
gst_error!(
self.cat,
CAT,
obj: element,
"Failed to finalize the upload: {}",
error_message

View file

@ -41,11 +41,18 @@ enum StreamingState {
pub struct S3Src {
url: Mutex<Option<GstS3Url>>,
state: Mutex<StreamingState>,
cat: gst::DebugCategory,
runtime: runtime::Runtime,
canceller: Mutex<Option<oneshot::Sender<Bytes>>>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rusotos3src",
gst::DebugColorFlags::empty(),
Some("Amazon S3 Source"),
);
}
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("uri", |name| {
glib::ParamSpec::string(
name,
@ -136,12 +143,7 @@ impl S3Src {
})?;
if let Some(size) = output.content_length {
gst_info!(
self.cat,
obj: src,
"HEAD success, content length = {}",
size
);
gst_info!(CAT, obj: src, "HEAD success, content length = {}", size);
Ok(size as u64)
} else {
Err(gst_error_msg!(
@ -183,7 +185,7 @@ impl S3Src {
};
gst_debug!(
self.cat,
CAT,
obj: src,
"Requesting range: {}-{}",
offset,
@ -205,7 +207,7 @@ impl S3Src {
)?;
gst_debug!(
self.cat,
CAT,
obj: src,
"Read {} bytes",
output.content_length.unwrap()
@ -233,11 +235,6 @@ impl ObjectSubclass for S3Src {
Self {
url: Mutex::new(None),
state: Mutex::new(StreamingState::Stopped),
cat: gst::DebugCategory::new(
"rusotos3src",
gst::DebugColorFlags::empty(),
Some("Amazon S3 Source"),
),
runtime: runtime::Builder::new()
.core_threads(1)
.name_prefix("rusotos3src-runtime")
@ -430,7 +427,7 @@ impl BaseSrcImpl for S3Src {
Err(None) => Err(gst::FlowError::Flushing),
/* Actual Error */
Err(Some(err)) => {
gst_error!(self.cat, obj: src, "Could not GET: {}", err);
gst_error!(CAT, obj: src, "Could not GET: {}", err);
Err(gst::FlowError::Error)
}
}

View file

@ -138,12 +138,19 @@ impl Default for State {
}
struct AppSrc {
cat: gst::DebugCategory,
src_pad: gst::Pad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-appsrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing app source"),
);
}
impl AppSrc {
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
@ -165,7 +172,7 @@ impl AppSrc {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
@ -187,9 +194,9 @@ impl AppSrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled event {:?}", event);
gst_log!(CAT, obj: pad, "Handled event {:?}", event);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle event {:?}", event);
gst_log!(CAT, obj: pad, "Didn't handle event {:?}", event);
}
ret
@ -203,7 +210,7 @@ impl AppSrc {
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
q.set(true, 0.into(), 0.into());
@ -234,9 +241,9 @@ impl AppSrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled query {:?}", query);
gst_log!(CAT, obj: pad, "Handled query {:?}", query);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle query {:?}", query);
gst_log!(CAT, obj: pad, "Didn't handle query {:?}", query);
}
ret
}
@ -253,7 +260,7 @@ impl AppSrc {
buffer.set_dts(now - base_time);
buffer.set_pts(gst::CLOCK_TIME_NONE);
} else {
gst_error!(self.cat, obj: element, "Don't have a clock yet");
gst_error!(CAT, obj: element, "Don't have a clock yet");
return false;
}
}
@ -263,7 +270,7 @@ impl AppSrc {
match channel.try_send(Either::Left(buffer)) {
Ok(_) => true,
Err(err) => {
gst_error!(self.cat, obj: element, "Failed to queue buffer: {}", err);
gst_error!(CAT, obj: element, "Failed to queue buffer: {}", err);
false
}
}
@ -278,7 +285,7 @@ impl AppSrc {
match channel.try_send(Either::Right(gst::Event::new_eos().build())) {
Ok(_) => true,
Err(err) => {
gst_error!(self.cat, obj: element, "Failed to queue EOS: {}", err);
gst_error!(CAT, obj: element, "Failed to queue EOS: {}", err);
false
}
}
@ -298,7 +305,7 @@ impl AppSrc {
let mut events = Vec::new();
let mut state = self.state.lock().unwrap();
if state.need_initial_events {
gst_debug!(self.cat, obj: element, "Pushing initial events");
gst_debug!(CAT, obj: element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(gst::Event::new_stream_start(&stream_id).build());
@ -330,11 +337,11 @@ impl AppSrc {
let res = match item {
Either::Left(buffer) => {
gst_log!(self.cat, obj: element, "Forwarding buffer {:?}", buffer);
gst_log!(CAT, obj: element, "Forwarding buffer {:?}", buffer);
self.src_pad.push(buffer).map(|_| ())
}
Either::Right(event) => {
gst_log!(self.cat, obj: element, "Forwarding event {:?}", event);
gst_log!(CAT, obj: element, "Forwarding event {:?}", event);
self.src_pad.push_event(event);
Ok(())
}
@ -342,15 +349,15 @@ impl AppSrc {
let res = match res {
Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed item");
gst_log!(CAT, obj: element, "Successfully pushed item");
Ok(())
}
Err(gst::FlowError::Flushing) | Err(gst::FlowError::Eos) => {
gst_debug!(self.cat, obj: element, "EOS");
gst_debug!(CAT, obj: element, "EOS");
Err(())
}
Err(err) => {
gst_error!(self.cat, obj: element, "Got error {}", err);
gst_error!(CAT, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
@ -385,7 +392,7 @@ impl AppSrc {
}
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -401,7 +408,7 @@ impl AppSrc {
let pending_future_id = io_context.acquire_pending_future_id();
gst_debug!(
self.cat,
CAT,
obj: element,
"Got pending future id {:?}",
pending_future_id
@ -410,13 +417,13 @@ impl AppSrc {
state.io_context = Some(io_context);
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
gst_debug!(CAT, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the other parts
// of the state. Otherwise a deadlock can happen between shutting down
@ -438,13 +445,13 @@ impl AppSrc {
drop(io_context);
gst_debug!(self.cat, obj: element, "Unprepared");
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
gst_debug!(CAT, obj: element, "Starting");
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.lock().unwrap();
@ -466,19 +473,19 @@ impl AppSrc {
io_context.spawn(future);
*channel = Some(channel_sender);
gst_debug!(self.cat, obj: element, "Started");
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
let _ = state.channel.take();
let _ = state.pending_future_cancel.take();
gst_debug!(self.cat, obj: element, "Stopped");
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
@ -569,11 +576,6 @@ impl ObjectSubclass for AppSrc {
});
Self {
cat: gst::DebugCategory::new(
"ts-appsrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing app source"),
),
src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
@ -659,7 +661,7 @@ impl ElementImpl for AppSrc {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {

View file

@ -241,13 +241,20 @@ impl Default for State {
}
struct JitterBuffer {
cat: gst::DebugCategory,
sink_pad: gst::Pad,
src_pad: gst::Pad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-jitterbuffer",
gst::DebugColorFlags::empty(),
Some("Thread-sharing jitterbuffer"),
);
}
impl JitterBuffer {
fn get_current_running_time(&self, element: &gst::Element) -> gst::ClockTime {
if let Some(clock) = element.get_clock() {
@ -270,7 +277,7 @@ impl JitterBuffer {
) -> Result<gst::FlowSuccess, gst::FlowError> {
let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?;
gst_info!(self.cat, obj: element, "Parsing caps: {:?}", caps);
gst_info!(CAT, obj: element, "Parsing caps: {:?}", caps);
let payload = s
.get_some::<i32>("payload")
@ -317,7 +324,7 @@ impl JitterBuffer {
}
gst_debug!(
self.cat,
CAT,
"new packet spacing {}, old packet spacing {} combined to {}",
new_packet_spacing,
old_packet_spacing,
@ -342,7 +349,7 @@ impl JitterBuffer {
let mut reset = false;
gst_debug!(
self.cat,
CAT,
obj: element,
"Handling big gap, gap packets length: {}",
gap_packets_length
@ -361,7 +368,7 @@ impl JitterBuffer {
let gap_seq = rtp_buffer.get_seq();
gst_log!(
self.cat,
CAT,
obj: element,
"Looking at gap packet with seq {}",
gap_seq
@ -384,12 +391,7 @@ impl JitterBuffer {
}
}
gst_debug!(
self.cat,
obj: element,
"all consecutive: {}",
all_consecutive
);
gst_debug!(CAT, obj: element, "all consecutive: {}", all_consecutive);
if all_consecutive && gap_packets_length > 3 {
reset = true;
@ -407,7 +409,7 @@ impl JitterBuffer {
pad: &gst::Pad,
element: &gst::Element,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_info!(self.cat, obj: element, "Resetting");
gst_info!(CAT, obj: element, "Resetting");
state.jbuf.borrow().flush();
state.jbuf.borrow().reset_skew();
@ -456,7 +458,7 @@ impl JitterBuffer {
drop(rtp_buffer);
gst_log!(
self.cat,
CAT,
obj: element,
"Storing buffer, seq: {}, rtptime: {}, pt: {}",
seq,
@ -488,7 +490,7 @@ impl JitterBuffer {
state.last_pt = pt as u32;
state.clock_rate = -1;
gst_debug!(self.cat, obj: pad, "New payload type: {}", pt);
gst_debug!(CAT, obj: pad, "New payload type: {}", pt);
if let Some(caps) = pad.get_current_caps() {
self.parse_caps(state, element, &caps, pt)?;
@ -526,7 +528,7 @@ impl JitterBuffer {
if pts.is_none() {
gst_debug!(
self.cat,
CAT,
obj: element,
"cannot calculate a valid pts for #{}, discard",
seq
@ -559,7 +561,7 @@ impl JitterBuffer {
if gap <= 0 {
state.num_late += 1;
gst_debug!(self.cat, obj: element, "Dropping late {}", seq);
gst_debug!(CAT, obj: element, "Dropping late {}", seq);
return Ok(gst::FlowSuccess::Ok);
}
}
@ -598,7 +600,7 @@ impl JitterBuffer {
state.earliest_seqnum = seq;
}
gst_log!(self.cat, obj: pad, "Stored buffer");
gst_log!(CAT, obj: pad, "Stored buffer");
Ok(gst::FlowSuccess::Ok)
}
@ -619,7 +621,7 @@ impl JitterBuffer {
let mut ret = true;
gst_debug!(
self.cat,
CAT,
obj: element,
"Pushing lost events seq: {}, last popped seq: {}",
seqnum,
@ -773,7 +775,7 @@ impl JitterBuffer {
state.num_pushed += 1;
gst_debug!(self.cat, obj: &self.src_pad, "Pushing buffer {:?} with seq {}", buffer, seq);
gst_debug!(CAT, obj: &self.src_pad, "Pushing buffer {:?} with seq {}", buffer, seq);
self.send_io_context_event(&state)?;
@ -789,7 +791,7 @@ impl JitterBuffer {
let now = self.get_current_running_time(element);
gst_debug!(
self.cat,
CAT,
obj: element,
"now is {}, earliest pts is {}, packet_spacing {} and latency {}",
now,
@ -817,7 +819,7 @@ impl JitterBuffer {
let element_clone = element.clone();
gst_debug!(self.cat, obj: element, "Scheduling wakeup in {}", timeout);
gst_debug!(CAT, obj: element, "Scheduling wakeup in {}", timeout);
let timer = Timeout::new(
state.io_context.as_ref().unwrap(),
@ -835,7 +837,7 @@ impl JitterBuffer {
let now = jb.get_current_running_time(&element_clone);
gst_debug!(
jb.cat,
CAT,
obj: &element_clone,
"Woke back up, earliest_pts {}",
state.earliest_pts
@ -937,7 +939,7 @@ impl JitterBuffer {
fn flush(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
gst_info!(self.cat, obj: element, "Flushing");
gst_info!(CAT, obj: element, "Flushing");
let io_context = state.io_context.take();
@ -952,7 +954,7 @@ impl JitterBuffer {
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
gst_debug!(CAT, obj: pad, "Handling buffer {:?}", buffer);
let mut state = self.state.lock().unwrap();
self.enqueue_item(&mut state, pad, element, Some(buffer))
}
@ -961,7 +963,7 @@ impl JitterBuffer {
let mut forward = true;
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::FlushStop(..) => {
@ -989,7 +991,7 @@ impl JitterBuffer {
};
if forward {
gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event);
gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
self.src_pad.push_event(event)
} else {
true
@ -1003,12 +1005,12 @@ impl JitterBuffer {
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query);
gst_log!(CAT, obj: pad, "Forwarding query {:?}", query);
match query.view_mut() {
QueryView::Drain(..) => {
let mut state = self.state.lock().unwrap();
gst_info!(self.cat, obj: pad, "Draining");
gst_info!(CAT, obj: pad, "Draining");
self.enqueue_item(&mut state, pad, element, None).is_ok()
}
_ => self.src_pad.peer_query(query),
@ -1023,7 +1025,7 @@ impl JitterBuffer {
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query);
gst_log!(CAT, obj: pad, "Forwarding query {:?}", query);
match query.view_mut() {
QueryView::Latency(ref mut q) => {
@ -1059,7 +1061,7 @@ impl JitterBuffer {
}
fn clear_pt_map(&self, element: &gst::Element) {
gst_info!(self.cat, obj: element, "Clearing PT map");
gst_info!(CAT, obj: element, "Clearing PT map");
let mut state = self.state.lock().unwrap();
state.clock_rate = -1;
@ -1164,11 +1166,6 @@ impl ObjectSubclass for JitterBuffer {
});
Self {
cat: gst::DebugCategory::new(
"ts-jitterbuffer",
gst::DebugColorFlags::empty(),
Some("Thread-sharing jitterbuffer"),
),
sink_pad,
src_pad,
state: Mutex::new(State::default()),
@ -1282,7 +1279,7 @@ impl ElementImpl for JitterBuffer {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {

View file

@ -283,13 +283,20 @@ impl Default for StateSink {
}
struct ProxySink {
cat: gst::DebugCategory,
#[allow(unused)]
sink_pad: gst::Pad,
state: Mutex<StateSink>,
settings: Mutex<SettingsSink>,
}
lazy_static! {
static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-proxysink",
gst::DebugColorFlags::empty(),
Some("Thread-sharing proxy sink"),
);
}
impl ProxySink {
fn enqueue_item(
&self,
@ -359,13 +366,13 @@ impl ProxySink {
queue.pending_queue.as_mut().unwrap().2.push_back(item);
gst_log!(
self.cat,
SINK_CAT,
obj: element,
"Proxy is full - Pushing first item on pending queue"
);
if schedule_now {
gst_log!(self.cat, obj: element, "Scheduling pending queue now");
gst_log!(SINK_CAT, obj: element, "Scheduling pending queue now");
queue.pending_queue.as_mut().unwrap().1 = true;
@ -375,7 +382,7 @@ impl ProxySink {
let state = sink.state.lock().unwrap();
gst_log!(
sink.cat,
SINK_CAT,
obj: &element_clone,
"Trying to empty pending queue"
);
@ -407,14 +414,14 @@ impl ProxySink {
items.push_front(failed_item);
*task = Some(task::current());
gst_log!(
sink.cat,
SINK_CAT,
obj: &element_clone,
"Waiting for more queue space"
);
Ok(Async::NotReady)
} else {
gst_log!(
sink.cat,
SINK_CAT,
obj: &element_clone,
"Pending queue is empty now"
);
@ -422,7 +429,7 @@ impl ProxySink {
}
} else {
gst_log!(
sink.cat,
SINK_CAT,
obj: &element_clone,
"Waiting for queue to be allocated"
);
@ -430,7 +437,7 @@ impl ProxySink {
}
} else {
gst_log!(
sink.cat,
SINK_CAT,
obj: &element_clone,
"Flushing, dropping pending queue"
);
@ -453,7 +460,7 @@ impl ProxySink {
Some(future)
}
} else {
gst_log!(self.cat, obj: element, "Scheduling pending queue later");
gst_log!(SINK_CAT, obj: element, "Scheduling pending queue later");
None
}
@ -468,7 +475,7 @@ impl ProxySink {
};
if let Some(wait_future) = wait_future {
gst_log!(self.cat, obj: element, "Blocking until queue becomes empty");
gst_log!(SINK_CAT, obj: element, "Blocking until queue becomes empty");
executor::current_thread::block_on_all(wait_future).map_err(|_| {
gst_element_error!(
element,
@ -490,7 +497,7 @@ impl ProxySink {
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
gst_log!(SINK_CAT, obj: pad, "Handling buffer {:?}", buffer);
self.enqueue_item(pad, element, DataQueueItem::Buffer(buffer))
}
@ -500,14 +507,14 @@ impl ProxySink {
element: &gst::Element,
list: gst::BufferList,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list);
gst_log!(SINK_CAT, obj: pad, "Handling buffer list {:?}", list);
self.enqueue_item(pad, element, DataQueueItem::BufferList(list))
}
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(SINK_CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Eos(..) => {
@ -538,7 +545,7 @@ impl ProxySink {
.expect("missing signal arg");
gst_debug!(
self.cat,
SINK_CAT,
obj: element,
"Got upstream pending future id {:?}",
pending_future_id
@ -551,7 +558,7 @@ impl ProxySink {
_ => (),
};
gst_log!(self.cat, obj: pad, "Queuing event {:?}", event);
gst_log!(SINK_CAT, obj: pad, "Queuing event {:?}", event);
let _ = self.enqueue_item(pad, element, DataQueueItem::Event(event));
true
}
@ -562,13 +569,13 @@ impl ProxySink {
element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(SINK_CAT, obj: pad, "Handling query {:?}", query);
pad.query_default(Some(element), query)
}
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
gst_debug!(SINK_CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -584,35 +591,35 @@ impl ProxySink {
}
};
gst_debug!(self.cat, obj: element, "Prepared");
gst_debug!(SINK_CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
gst_debug!(SINK_CAT, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap();
*state = StateSink::default();
gst_debug!(self.cat, obj: element, "Unprepared");
gst_debug!(SINK_CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
gst_debug!(SINK_CAT, obj: element, "Starting");
let state = self.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
queue.last_res = Ok(gst::FlowSuccess::Ok);
gst_debug!(self.cat, obj: element, "Started");
gst_debug!(SINK_CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
gst_debug!(SINK_CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
state.io_context = None;
@ -625,7 +632,7 @@ impl ProxySink {
}
queue.last_res = Err(gst::FlowError::Flushing);
gst_debug!(self.cat, obj: element, "Stopped");
gst_debug!(SINK_CAT, obj: element, "Stopped");
Ok(())
}
@ -695,11 +702,6 @@ impl ObjectSubclass for ProxySink {
});
Self {
cat: gst::DebugCategory::new(
"ts-proxysink",
gst::DebugColorFlags::empty(),
Some("Thread-sharing proxy sink"),
),
sink_pad,
state: Mutex::new(StateSink::default()),
settings: Mutex::new(SettingsSink::default()),
@ -753,7 +755,7 @@ impl ElementImpl for ProxySink {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(SINK_CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
@ -782,12 +784,19 @@ impl ElementImpl for ProxySink {
}
struct ProxySrc {
cat: gst::DebugCategory,
src_pad: gst::Pad,
state: Mutex<StateSrc>,
settings: Mutex<SettingsSrc>,
}
lazy_static! {
static ref SRC_CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-proxysrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing proxy source"),
);
}
impl ProxySrc {
fn create_io_context_event(state: &StateSrc) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
@ -809,7 +818,7 @@ impl ProxySrc {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(SRC_CAT, obj: pad, "Handling event {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
@ -831,9 +840,9 @@ impl ProxySrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled event {:?}", event);
gst_log!(SRC_CAT, obj: pad, "Handled event {:?}", event);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle event {:?}", event);
gst_log!(SRC_CAT, obj: pad, "Didn't handle event {:?}", event);
}
ret
@ -847,7 +856,7 @@ impl ProxySrc {
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(SRC_CAT, obj: pad, "Handling query {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
q.set(true, 0.into(), 0.into());
@ -877,9 +886,9 @@ impl ProxySrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled query {:?}", query);
gst_log!(SRC_CAT, obj: pad, "Handled query {:?}", query);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle query {:?}", query);
gst_log!(SRC_CAT, obj: pad, "Didn't handle query {:?}", query);
}
ret
}
@ -912,11 +921,11 @@ impl ProxySrc {
let res = match item {
DataQueueItem::Buffer(buffer) => {
gst_log!(self.cat, obj: element, "Forwarding buffer {:?}", buffer);
gst_log!(SRC_CAT, obj: element, "Forwarding buffer {:?}", buffer);
self.src_pad.push(buffer).map(|_| ())
}
DataQueueItem::BufferList(list) => {
gst_log!(self.cat, obj: element, "Forwarding buffer list {:?}", list);
gst_log!(SRC_CAT, obj: element, "Forwarding buffer list {:?}", list);
self.src_pad.push_list(list).map(|_| ())
}
DataQueueItem::Event(event) => {
@ -936,11 +945,11 @@ impl ProxySrc {
match new_event {
Some(event) => {
gst_log!(self.cat, obj: element, "Forwarding new event {:?}", event);
gst_log!(SRC_CAT, obj: element, "Forwarding new event {:?}", event);
self.src_pad.push_event(event);
}
None => {
gst_log!(self.cat, obj: element, "Forwarding event {:?}", event);
gst_log!(SRC_CAT, obj: element, "Forwarding event {:?}", event);
self.src_pad.push_event(event);
}
}
@ -950,14 +959,14 @@ impl ProxySrc {
let res = match res {
Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed item");
gst_log!(SRC_CAT, obj: element, "Successfully pushed item");
let state = self.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
queue.last_res = Ok(gst::FlowSuccess::Ok);
Ok(())
}
Err(gst::FlowError::Flushing) => {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(SRC_CAT, obj: element, "Flushing");
let state = self.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some(ref queue) = queue.queue {
@ -967,7 +976,7 @@ impl ProxySrc {
Ok(())
}
Err(gst::FlowError::Eos) => {
gst_debug!(self.cat, obj: element, "EOS");
gst_debug!(SRC_CAT, obj: element, "EOS");
let state = self.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some(ref queue) = queue.queue {
@ -977,7 +986,7 @@ impl ProxySrc {
Ok(())
}
Err(err) => {
gst_error!(self.cat, obj: element, "Got error {}", err);
gst_error!(SRC_CAT, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
@ -1015,7 +1024,7 @@ impl ProxySrc {
}
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
gst_debug!(SRC_CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -1062,8 +1071,7 @@ impl ProxySrc {
src.push_item(&element_clone, item)
},
move |err| {
let src = Self::from_instance(&element_clone2);
gst_error!(src.cat, obj: &element_clone2, "Got error {}", err);
gst_error!(SRC_CAT, obj: &element_clone2, "Got error {}", err);
match err {
gst::FlowError::CustomError => (),
err => {
@ -1086,7 +1094,7 @@ impl ProxySrc {
let pending_future_id = io_context.acquire_pending_future_id();
gst_debug!(
self.cat,
SRC_CAT,
obj: element,
"Got pending future id {:?}",
pending_future_id
@ -1098,13 +1106,13 @@ impl ProxySrc {
state.pending_future_id = Some(pending_future_id);
state.queue = Some(queue);
gst_debug!(self.cat, obj: element, "Prepared");
gst_debug!(SRC_CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
gst_debug!(SRC_CAT, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
@ -1136,13 +1144,13 @@ impl ProxySrc {
}
drop(io_context);
gst_debug!(self.cat, obj: element, "Unprepared");
gst_debug!(SRC_CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
gst_debug!(SRC_CAT, obj: element, "Starting");
let state = self.state.lock().unwrap();
let queue = state.queue.as_ref().unwrap().0.lock().unwrap();
@ -1150,13 +1158,13 @@ impl ProxySrc {
queue.unpause();
}
gst_debug!(self.cat, obj: element, "Started");
gst_debug!(SRC_CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
gst_debug!(SRC_CAT, obj: element, "Stopping");
let state = self.state.lock().unwrap();
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
@ -1166,7 +1174,7 @@ impl ProxySrc {
}
let _ = queue.pending_future_cancel.take();
gst_debug!(self.cat, obj: element, "Stopped");
gst_debug!(SRC_CAT, obj: element, "Stopped");
Ok(())
}
@ -1226,11 +1234,6 @@ impl ObjectSubclass for ProxySrc {
});
Self {
cat: gst::DebugCategory::new(
"ts-proxysrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing proxy source"),
),
src_pad,
state: Mutex::new(StateSrc::default()),
settings: Mutex::new(SettingsSrc::default()),
@ -1327,7 +1330,7 @@ impl ElementImpl for ProxySrc {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(SRC_CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {

View file

@ -153,13 +153,20 @@ impl Default for State {
}
struct Queue {
cat: gst::DebugCategory,
sink_pad: gst::Pad,
src_pad: gst::Pad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-queue",
gst::DebugColorFlags::empty(),
Some("Thread-sharing queue"),
);
}
impl Queue {
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
@ -222,7 +229,7 @@ impl Queue {
element: &gst::Element,
state: &mut State,
) -> Option<impl Future<Item = (), Error = ()>> {
gst_log!(self.cat, obj: element, "Scheduling pending queue now");
gst_log!(CAT, obj: element, "Scheduling pending queue now");
let State {
ref mut pending_queue,
@ -248,11 +255,7 @@ impl Queue {
return Ok(Async::Ready(()));
}
gst_log!(
queue.cat,
obj: &element_clone,
"Trying to empty pending queue"
);
gst_log!(CAT, obj: &element_clone, "Trying to empty pending queue");
let res = if let Some(PendingQueue {
ref mut task,
@ -270,22 +273,14 @@ impl Queue {
if let Some(failed_item) = failed_item {
items.push_front(failed_item);
*task = Some(task::current());
gst_log!(
queue.cat,
obj: &element_clone,
"Waiting for more queue space"
);
gst_log!(CAT, obj: &element_clone, "Waiting for more queue space");
Ok(Async::NotReady)
} else {
gst_log!(queue.cat, obj: &element_clone, "Pending queue is empty now");
gst_log!(CAT, obj: &element_clone, "Pending queue is empty now");
Ok(Async::Ready(()))
}
} else {
gst_log!(
queue.cat,
obj: &element_clone,
"Flushing, dropping pending queue"
);
gst_log!(CAT, obj: &element_clone, "Flushing, dropping pending queue");
Ok(Async::Ready(()))
};
@ -347,7 +342,7 @@ impl Queue {
pending_queue.as_mut().unwrap().items.push_back(item);
gst_log!(
self.cat,
CAT,
obj: element,
"Queue is full - Pushing first item on pending queue"
);
@ -355,7 +350,7 @@ impl Queue {
if schedule_now {
self.schedule_pending_queue(element, &mut state)
} else {
gst_log!(self.cat, obj: element, "Scheduling pending queue later");
gst_log!(CAT, obj: element, "Scheduling pending queue later");
None
}
@ -371,11 +366,7 @@ impl Queue {
};
if let Some(wait_future) = wait_future {
gst_log!(
self.cat,
obj: element,
"Blocking until queue has space again"
);
gst_log!(CAT, obj: element, "Blocking until queue has space again");
executor::current_thread::block_on_all(wait_future).map_err(|_| {
gst_element_error!(
element,
@ -395,7 +386,7 @@ impl Queue {
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
self.enqueue_item(pad, element, DataQueueItem::Buffer(buffer))
}
@ -405,14 +396,14 @@ impl Queue {
element: &gst::Element,
list: gst::BufferList,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list);
gst_log!(CAT, obj: pad, "Handling buffer list {:?}", list);
self.enqueue_item(pad, element, DataQueueItem::BufferList(list))
}
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
let mut new_event = None;
match event.view() {
@ -441,7 +432,7 @@ impl Queue {
.expect("missing signal arg");
gst_debug!(
self.cat,
CAT,
obj: element,
"Got upstream pending future id {:?}",
pending_future_id
@ -464,11 +455,11 @@ impl Queue {
}
if event.is_serialized() {
gst_log!(self.cat, obj: pad, "Queuing event {:?}", event);
gst_log!(CAT, obj: pad, "Queuing event {:?}", event);
let _ = self.enqueue_item(pad, element, DataQueueItem::Event(event));
true
} else {
gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event);
gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
self.src_pad.push_event(event)
}
}
@ -479,14 +470,14 @@ impl Queue {
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
if query.is_serialized() {
// FIXME: How can we do this?
gst_log!(self.cat, obj: pad, "Dropping serialized query {:?}", query);
gst_log!(CAT, obj: pad, "Dropping serialized query {:?}", query);
false
} else {
gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query);
gst_log!(CAT, obj: pad, "Forwarding query {:?}", query);
self.src_pad.peer_query(query)
}
}
@ -494,7 +485,7 @@ impl Queue {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::FlushStart(..) => {
@ -511,7 +502,7 @@ impl Queue {
_ => (),
};
gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event);
gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
self.sink_pad.push_event(event)
}
@ -523,7 +514,7 @@ impl Queue {
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
match query.view_mut() {
QueryView::Scheduling(ref mut q) => {
let mut new_query = gst::Query::new_scheduling();
@ -532,7 +523,7 @@ impl Queue {
return res;
}
gst_log!(self.cat, obj: pad, "Upstream returned {:?}", new_query);
gst_log!(CAT, obj: pad, "Upstream returned {:?}", new_query);
let (flags, min, max, align) = new_query.get_result();
q.set(flags, min, max, align);
@ -544,13 +535,13 @@ impl Queue {
.filter(|m| m != &gst::PadMode::Pull)
.collect::<Vec<_>>(),
);
gst_log!(self.cat, obj: pad, "Returning {:?}", q.get_mut_query());
gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query());
return true;
}
_ => (),
};
gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query);
gst_log!(CAT, obj: pad, "Forwarding query {:?}", query);
self.sink_pad.peer_query(query)
}
@ -585,15 +576,15 @@ impl Queue {
let res = match item {
DataQueueItem::Buffer(buffer) => {
gst_log!(self.cat, obj: element, "Forwarding buffer {:?}", buffer);
gst_log!(CAT, obj: element, "Forwarding buffer {:?}", buffer);
self.src_pad.push(buffer).map(|_| ())
}
DataQueueItem::BufferList(list) => {
gst_log!(self.cat, obj: element, "Forwarding buffer list {:?}", list);
gst_log!(CAT, obj: element, "Forwarding buffer list {:?}", list);
self.src_pad.push_list(list).map(|_| ())
}
DataQueueItem::Event(event) => {
gst_log!(self.cat, obj: element, "Forwarding event {:?}", event);
gst_log!(CAT, obj: element, "Forwarding event {:?}", event);
self.src_pad.push_event(event);
Ok(())
}
@ -601,13 +592,13 @@ impl Queue {
let res = match res {
Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed item");
gst_log!(CAT, obj: element, "Successfully pushed item");
let mut state = self.state.lock().unwrap();
state.last_res = Ok(gst::FlowSuccess::Ok);
Ok(())
}
Err(gst::FlowError::Flushing) => {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(CAT, obj: element, "Flushing");
let mut state = self.state.lock().unwrap();
if let Some(ref queue) = state.queue {
queue.pause();
@ -616,7 +607,7 @@ impl Queue {
Ok(())
}
Err(gst::FlowError::Eos) => {
gst_debug!(self.cat, obj: element, "EOS");
gst_debug!(CAT, obj: element, "EOS");
let mut state = self.state.lock().unwrap();
if let Some(ref queue) = state.queue {
queue.pause();
@ -625,7 +616,7 @@ impl Queue {
Ok(())
}
Err(err) => {
gst_error!(self.cat, obj: element, "Got error {}", err);
gst_error!(CAT, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
@ -662,7 +653,7 @@ impl Queue {
}
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Preparing");
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -705,8 +696,7 @@ impl Queue {
queue.push_item(&element_clone, item)
},
move |err| {
let queue = Self::from_instance(&element_clone2);
gst_error!(queue.cat, obj: &element_clone2, "Got error {}", err);
gst_error!(CAT, obj: &element_clone2, "Got error {}", err);
match err {
gst::FlowError::CustomError => (),
err => {
@ -729,7 +719,7 @@ impl Queue {
let pending_future_id = io_context.acquire_pending_future_id();
gst_debug!(
self.cat,
CAT,
obj: element,
"Got pending future id {:?}",
pending_future_id
@ -739,13 +729,13 @@ impl Queue {
state.queue = Some(dataqueue);
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
gst_debug!(CAT, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
@ -770,12 +760,12 @@ impl Queue {
}
drop(io_context);
gst_debug!(self.cat, obj: element, "Unprepared");
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
gst_debug!(CAT, obj: element, "Starting");
let mut state = self.state.lock().unwrap();
if let Some(ref queue) = state.queue {
@ -783,13 +773,13 @@ impl Queue {
}
state.last_res = Ok(gst::FlowSuccess::Ok);
gst_debug!(self.cat, obj: element, "Started");
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
if let Some(ref queue) = state.queue {
@ -805,7 +795,7 @@ impl Queue {
let _ = state.pending_future_cancel.take();
state.last_res = Err(gst::FlowError::Flushing);
gst_debug!(self.cat, obj: element, "Stopped");
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
@ -901,11 +891,6 @@ impl ObjectSubclass for Queue {
});
Self {
cat: gst::DebugCategory::new(
"ts-queue",
gst::DebugColorFlags::empty(),
Some("Thread-sharing queue"),
),
sink_pad,
src_pad,
state: Mutex::new(State::default()),
@ -991,7 +976,7 @@ impl ElementImpl for Queue {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {

View file

@ -199,17 +199,24 @@ impl Default for State {
}
struct TcpClientSrc {
cat: gst::DebugCategory,
src_pad: gst::Pad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-tcpclientsrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing TCP Client source"),
);
}
impl TcpClientSrc {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
@ -231,9 +238,9 @@ impl TcpClientSrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled event {:?}", event);
gst_log!(CAT, obj: pad, "Handled event {:?}", event);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle event {:?}", event);
gst_log!(CAT, obj: pad, "Didn't handle event {:?}", event);
}
ret
}
@ -246,7 +253,7 @@ impl TcpClientSrc {
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
q.set(false, 0.into(), 0.into());
@ -277,9 +284,9 @@ impl TcpClientSrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled query {:?}", query);
gst_log!(CAT, obj: pad, "Handled query {:?}", query);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle query {:?}", query);
gst_log!(CAT, obj: pad, "Didn't handle query {:?}", query);
}
ret
}
@ -312,7 +319,7 @@ impl TcpClientSrc {
let mut events = Vec::new();
let mut state = self.state.lock().unwrap();
if state.need_initial_events {
gst_debug!(self.cat, obj: element, "Pushing initial events");
gst_debug!(CAT, obj: element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(gst::Event::new_stream_start(&stream_id).build());
@ -349,11 +356,11 @@ impl TcpClientSrc {
let res = match self.src_pad.push(buffer) {
Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed buffer");
gst_log!(CAT, obj: element, "Successfully pushed buffer");
Ok(())
}
Err(gst::FlowError::Flushing) => {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(CAT, obj: element, "Flushing");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();
@ -361,7 +368,7 @@ impl TcpClientSrc {
Ok(())
}
Err(gst::FlowError::Eos) => {
gst_debug!(self.cat, obj: element, "EOS");
gst_debug!(CAT, obj: element, "EOS");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();
@ -369,7 +376,7 @@ impl TcpClientSrc {
Ok(())
}
Err(err) => {
gst_error!(self.cat, obj: element, "Got error {}", err);
gst_error!(CAT, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
@ -406,7 +413,7 @@ impl TcpClientSrc {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
use std::net::{IpAddr, SocketAddr};
gst_debug!(self.cat, obj: element, "Preparing");
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -440,7 +447,7 @@ impl TcpClientSrc {
let port = settings.port;
let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(self.cat, obj: element, "Connecting to {:?}", saddr);
gst_debug!(CAT, obj: element, "Connecting to {:?}", saddr);
let socket = net::TcpStream::connect(&saddr);
let buffer_pool = gst::BufferPool::new();
@ -469,8 +476,7 @@ impl TcpClientSrc {
tcpclientsrc.push_buffer(&element_clone, buffer)
},
move |err| {
let tcpclientsrc = Self::from_instance(&element_clone2);
gst_error!(tcpclientsrc.cat, obj: &element_clone2, "Got error {}", err);
gst_error!(CAT, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
Either::Left(err) => {
@ -498,7 +504,7 @@ impl TcpClientSrc {
let pending_future_id = io_context.acquire_pending_future_id();
gst_debug!(
self.cat,
CAT,
obj: element,
"Got pending future id {:?}",
pending_future_id
@ -508,13 +514,13 @@ impl TcpClientSrc {
state.io_context = Some(io_context);
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
gst_debug!(CAT, obj: element, "Unpreparing");
// FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
@ -538,25 +544,25 @@ impl TcpClientSrc {
}
drop(io_context);
gst_debug!(self.cat, obj: element, "Unprepared");
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
gst_debug!(CAT, obj: element, "Starting");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.unpause(None, None);
}
gst_debug!(self.cat, obj: element, "Started");
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
@ -564,7 +570,7 @@ impl TcpClientSrc {
}
let _ = state.pending_future_cancel.take();
gst_debug!(self.cat, obj: element, "Stopped");
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
@ -619,11 +625,6 @@ impl ObjectSubclass for TcpClientSrc {
});
Self {
cat: gst::DebugCategory::new(
"ts-tcpclientsrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing TCP Client source"),
),
src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
@ -717,7 +718,7 @@ impl ElementImpl for TcpClientSrc {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {

View file

@ -334,17 +334,24 @@ impl Default for State {
}
struct UdpSrc {
cat: gst::DebugCategory,
src_pad: gst::Pad,
state: Mutex<State>,
settings: Mutex<Settings>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"ts-udpsrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing UDP source"),
);
}
impl UdpSrc {
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
@ -366,9 +373,9 @@ impl UdpSrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled event {:?}", event);
gst_log!(CAT, obj: pad, "Handled event {:?}", event);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle event {:?}", event);
gst_log!(CAT, obj: pad, "Didn't handle event {:?}", event);
}
ret
}
@ -381,7 +388,7 @@ impl UdpSrc {
) -> bool {
use gst::QueryView;
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => {
q.set(true, 0.into(), 0.into());
@ -412,9 +419,9 @@ impl UdpSrc {
};
if ret {
gst_log!(self.cat, obj: pad, "Handled query {:?}", query);
gst_log!(CAT, obj: pad, "Handled query {:?}", query);
} else {
gst_log!(self.cat, obj: pad, "Didn't handle query {:?}", query);
gst_log!(CAT, obj: pad, "Didn't handle query {:?}", query);
}
ret
}
@ -447,7 +454,7 @@ impl UdpSrc {
let mut events = Vec::new();
let mut state = self.state.lock().unwrap();
if state.need_initial_events {
gst_debug!(self.cat, obj: element, "Pushing initial events");
gst_debug!(CAT, obj: element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(gst::Event::new_stream_start(&stream_id).build());
@ -479,11 +486,11 @@ impl UdpSrc {
let res = match self.src_pad.push(buffer) {
Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed buffer");
gst_log!(CAT, obj: element, "Successfully pushed buffer");
Ok(())
}
Err(gst::FlowError::Flushing) => {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(CAT, obj: element, "Flushing");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();
@ -491,7 +498,7 @@ impl UdpSrc {
Ok(())
}
Err(gst::FlowError::Eos) => {
gst_debug!(self.cat, obj: element, "EOS");
gst_debug!(CAT, obj: element, "EOS");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();
@ -499,7 +506,7 @@ impl UdpSrc {
Ok(())
}
Err(err) => {
gst_error!(self.cat, obj: element, "Got error {}", err);
gst_error!(CAT, obj: element, "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
@ -536,7 +543,7 @@ impl UdpSrc {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
gst_debug!(self.cat, obj: element, "Preparing");
gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
@ -606,7 +613,7 @@ impl UdpSrc {
let saddr = SocketAddr::new(bind_addr, port as u16);
gst_debug!(
self.cat,
CAT,
obj: element,
"Binding to {:?} for multicast group {:?}",
saddr,
@ -616,7 +623,7 @@ impl UdpSrc {
saddr
} else {
let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr);
gst_debug!(CAT, obj: element, "Binding to {:?}", saddr);
saddr
};
@ -787,8 +794,7 @@ impl UdpSrc {
udpsrc.push_buffer(&element_clone, buffer)
},
move |err| {
let udpsrc = Self::from_instance(&element_clone2);
gst_error!(udpsrc.cat, obj: &element_clone2, "Got error {}", err);
gst_error!(CAT, obj: &element_clone2, "Got error {}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
Either::Left(err) => {
@ -816,7 +822,7 @@ impl UdpSrc {
let pending_future_id = io_context.acquire_pending_future_id();
gst_debug!(
self.cat,
CAT,
obj: element,
"Got pending future id {:?}",
pending_future_id
@ -826,7 +832,7 @@ impl UdpSrc {
state.io_context = Some(io_context);
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
gst_debug!(CAT, obj: element, "Prepared");
drop(state);
element.notify("used-socket");
@ -835,7 +841,7 @@ impl UdpSrc {
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
gst_debug!(CAT, obj: element, "Unpreparing");
self.settings.lock().unwrap().used_socket = None;
@ -861,25 +867,25 @@ impl UdpSrc {
}
drop(io_context);
gst_debug!(self.cat, obj: element, "Unprepared");
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Starting");
gst_debug!(CAT, obj: element, "Starting");
let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.unpause(element.get_clock(), Some(element.get_base_time()));
}
gst_debug!(self.cat, obj: element, "Started");
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
@ -887,7 +893,7 @@ impl UdpSrc {
}
let _ = state.pending_future_cancel.take();
gst_debug!(self.cat, obj: element, "Stopped");
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
@ -958,11 +964,6 @@ impl ObjectSubclass for UdpSrc {
});
Self {
cat: gst::DebugCategory::new(
"ts-udpsrc",
gst::DebugColorFlags::empty(),
Some("Thread-sharing UDP source"),
),
src_pad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
@ -1097,7 +1098,7 @@ impl ElementImpl for UdpSrc {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {

View file

@ -15,6 +15,7 @@ gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true }
parking_lot = "0.9"
lazy_static = "1.0"
[dev-dependencies]
either = "1.0"

View file

@ -24,6 +24,9 @@ extern crate gstreamer as gst;
extern crate gstreamer_audio as gst_audio;
extern crate gstreamer_video as gst_video;
#[macro_use]
extern crate lazy_static;
extern crate parking_lot;
mod togglerecord;

View file

@ -344,7 +344,6 @@ impl HandleData for gst::Buffer {
}
struct ToggleRecord {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
main_stream: Stream,
@ -356,6 +355,14 @@ struct ToggleRecord {
pads: Mutex<HashMap<gst::Pad, Stream>>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"togglerecord",
gst::DebugColorFlags::empty(),
Some("Toggle Record Element"),
);
}
impl ToggleRecord {
fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) {
sinkpad.set_chain_function(|pad, parent, buffer| {
@ -439,7 +446,7 @@ impl ToggleRecord {
let data = match data.clip(&state, &state.in_segment) {
None => {
gst_log!(self.cat, obj: pad, "Dropping raw data outside segment");
gst_log!(CAT, obj: pad, "Dropping raw data outside segment");
return Ok(HandleResult::Drop);
}
Some(data) => data,
@ -464,7 +471,7 @@ impl ToggleRecord {
self.main_stream_cond.notify_all();
gst_log!(
self.cat,
CAT,
obj: pad,
"Main stream current running time {}-{} (position: {}-{})",
current_running_time,
@ -479,12 +486,12 @@ impl ToggleRecord {
let mut rec_state = self.state.lock();
let settings_changed = match rec_state.recording_state {
RecordingState::Recording if !settings.record => {
gst_debug!(self.cat, obj: pad, "Stopping recording");
gst_debug!(CAT, obj: pad, "Stopping recording");
rec_state.recording_state = RecordingState::Stopping;
true
}
RecordingState::Stopped if settings.record => {
gst_debug!(self.cat, obj: pad, "Starting recording");
gst_debug!(CAT, obj: pad, "Starting recording");
rec_state.recording_state = RecordingState::Starting;
true
}
@ -495,19 +502,19 @@ impl ToggleRecord {
RecordingState::Recording => {
// Remember where we stopped last, in case of EOS
rec_state.last_recording_stop = current_running_time_end;
gst_log!(self.cat, obj: pad, "Passing buffer (recording)");
gst_log!(CAT, obj: pad, "Passing buffer (recording)");
Ok(HandleResult::Pass(data))
}
RecordingState::Stopping => {
if !data.is_keyframe() {
// Remember where we stopped last, in case of EOS
rec_state.last_recording_stop = current_running_time_end;
gst_log!(self.cat, obj: pad, "Passing non-keyframe buffer (stopping)");
gst_log!(CAT, obj: pad, "Passing non-keyframe buffer (stopping)");
drop(rec_state);
drop(state);
if settings_changed {
gst_debug!(self.cat, obj: pad, "Requesting a new keyframe");
gst_debug!(CAT, obj: pad, "Requesting a new keyframe");
stream
.sinkpad
.push_event(gst_video::new_upstream_force_key_unit_event().build());
@ -518,7 +525,7 @@ impl ToggleRecord {
// Remember the time when we stopped: now, i.e. right before the current buffer!
rec_state.last_recording_stop = current_running_time;
gst_debug!(self.cat, obj: pad, "Stopping at {}", current_running_time);
gst_debug!(CAT, obj: pad, "Stopping at {}", current_running_time);
// Then unlock and wait for all other streams to reach it or go EOS instead.
drop(rec_state);
@ -529,12 +536,12 @@ impl ToggleRecord {
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time_end)
}) {
gst_log!(self.cat, obj: pad, "Waiting for other streams to stop");
gst_log!(CAT, obj: pad, "Waiting for other streams to stop");
self.main_stream_cond.wait(&mut state);
}
if state.flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
gst_debug!(CAT, obj: pad, "Flushing");
return Ok(HandleResult::Flushing);
}
@ -546,7 +553,7 @@ impl ToggleRecord {
rec_state.last_recording_stop = gst::CLOCK_TIME_NONE;
gst_debug!(
self.cat,
CAT,
obj: pad,
"Stopped at {}, recording duration {}",
current_running_time,
@ -555,7 +562,7 @@ impl ToggleRecord {
// Then become Stopped and drop this buffer. We always stop right before
// a keyframe
gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)");
gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
drop(rec_state);
drop(state);
@ -564,22 +571,18 @@ impl ToggleRecord {
Ok(HandleResult::Drop)
}
RecordingState::Stopped => {
gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)");
gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
Ok(HandleResult::Drop)
}
RecordingState::Starting => {
// If this is no keyframe, we can directly go out again here and drop the frame
if !data.is_keyframe() {
gst_log!(
self.cat,
obj: pad,
"Dropping non-keyframe buffer (starting)"
);
gst_log!(CAT, obj: pad, "Dropping non-keyframe buffer (starting)");
drop(rec_state);
drop(state);
if settings_changed {
gst_debug!(self.cat, obj: pad, "Requesting a new keyframe");
gst_debug!(CAT, obj: pad, "Requesting a new keyframe");
stream
.sinkpad
.push_event(gst_video::new_upstream_force_key_unit_event().build());
@ -591,7 +594,7 @@ impl ToggleRecord {
// Remember the time when we started: now!
rec_state.last_recording_start = current_running_time;
rec_state.running_time_offset = current_running_time - rec_state.recording_duration;
gst_debug!(self.cat, obj: pad, "Starting at {}", current_running_time);
gst_debug!(CAT, obj: pad, "Starting at {}", current_running_time);
state.segment_pending = true;
for other_stream in &self.other_streams.lock().0 {
@ -608,26 +611,26 @@ impl ToggleRecord {
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time_end)
}) {
gst_log!(self.cat, obj: pad, "Waiting for other streams to start");
gst_log!(CAT, obj: pad, "Waiting for other streams to start");
self.main_stream_cond.wait(&mut state);
}
if state.flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
gst_debug!(CAT, obj: pad, "Flushing");
return Ok(HandleResult::Flushing);
}
let mut rec_state = self.state.lock();
rec_state.recording_state = RecordingState::Recording;
gst_debug!(
self.cat,
CAT,
obj: pad,
"Started at {}, recording duration {}",
current_running_time,
rec_state.recording_duration
);
gst_log!(self.cat, obj: pad, "Passing buffer (recording)");
gst_log!(CAT, obj: pad, "Passing buffer (recording)");
drop(rec_state);
drop(state);
@ -683,7 +686,7 @@ impl ToggleRecord {
let data = match data.clip(&state, &state.in_segment) {
None => {
gst_log!(self.cat, obj: pad, "Dropping raw data outside segment");
gst_log!(CAT, obj: pad, "Dropping raw data outside segment");
return Ok(HandleResult::Drop);
}
Some(data) => data,
@ -702,7 +705,7 @@ impl ToggleRecord {
state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
gst_log!(
self.cat,
CAT,
obj: pad,
"Secondary stream current running time {}-{} (position: {}-{}",
current_running_time,
@ -725,7 +728,7 @@ impl ToggleRecord {
&& !stream.state.lock().flushing
{
gst_log!(
self.cat,
CAT,
obj: pad,
"Waiting for reaching {} / EOS / flushing, main stream at {}",
current_running_time,
@ -738,7 +741,7 @@ impl ToggleRecord {
state = stream.state.lock();
if state.flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
gst_debug!(CAT, obj: pad, "Flushing");
return Ok(HandleResult::Flushing);
}
@ -749,11 +752,7 @@ impl ToggleRecord {
if main_state.eos {
// If we have no start or stop position (we never recorded) then we're EOS too now
if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() {
gst_debug!(
self.cat,
obj: pad,
"Main stream EOS and recording never started",
);
gst_debug!(CAT, obj: pad, "Main stream EOS and recording never started",);
return Ok(HandleResult::Eos);
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_start
@ -762,7 +761,7 @@ impl ToggleRecord {
// Otherwise if we're before the recording start but the end of the buffer is after
// the start and we can clip, clip the buffer and pass it onwards.
gst_debug!(
self.cat,
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})",
current_running_time,
@ -786,12 +785,12 @@ impl ToggleRecord {
segment.set_start(clip_start);
segment.set_stop(clip_stop);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
return Ok(HandleResult::Pass(data));
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
return Ok(HandleResult::Drop);
}
} else if current_running_time < rec_state.last_recording_start {
@ -799,7 +798,7 @@ impl ToggleRecord {
// means that we either can't clip, or that the end is also before the
// recording start
gst_debug!(
self.cat,
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (before recording start, {} < {})",
current_running_time,
@ -813,7 +812,7 @@ impl ToggleRecord {
// Similarly if the end is after the recording stop but the start is before and we
// can clip, clip the buffer and pass it through.
gst_debug!(
self.cat,
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})",
current_running_time,
@ -837,12 +836,12 @@ impl ToggleRecord {
segment.set_start(clip_start);
segment.set_stop(clip_stop);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
return Ok(HandleResult::Pass(data));
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
return Ok(HandleResult::Eos);
}
} else if current_running_time_end > rec_state.last_recording_stop {
@ -850,7 +849,7 @@ impl ToggleRecord {
// now. This means that we either couldn't clip or that the start is also after
// the recording stop
gst_debug!(
self.cat,
CAT,
obj: pad,
"Main stream EOS and we're EOS too (after recording end, {} > {})",
current_running_time_end,
@ -864,7 +863,7 @@ impl ToggleRecord {
assert!(current_running_time_end <= rec_state.last_recording_stop);
gst_debug!(
self.cat,
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})",
rec_state.last_recording_start,
@ -884,21 +883,21 @@ impl ToggleRecord {
// be actually after that start position
assert!(rec_state.last_recording_start.is_some());
assert!(current_running_time >= rec_state.last_recording_start);
gst_log!(self.cat, obj: pad, "Passing buffer (recording)");
gst_log!(CAT, obj: pad, "Passing buffer (recording)");
Ok(HandleResult::Pass(data))
}
RecordingState::Stopping => {
// If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_stop.is_none() {
gst_log!(
self.cat,
CAT,
obj: pad,
"Passing buffer (stopping: waiting for keyframe)",
);
Ok(HandleResult::Pass(data))
} else if current_running_time_end <= rec_state.last_recording_stop {
gst_log!(
self.cat,
CAT,
obj: pad,
"Passing buffer (stopping: {} <= {})",
current_running_time_end,
@ -910,7 +909,7 @@ impl ToggleRecord {
&& current_running_time_end > rec_state.last_recording_stop
{
gst_log!(
self.cat,
CAT,
obj: pad,
"Passing buffer (stopping: {} < {} < {})",
current_running_time,
@ -927,17 +926,17 @@ impl ToggleRecord {
let mut segment = state.in_segment.clone();
segment.set_stop(clip_stop);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
Ok(HandleResult::Pass(data))
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
Ok(HandleResult::Drop)
}
} else {
gst_log!(
self.cat,
CAT,
obj: pad,
"Dropping buffer (stopping: {} > {})",
current_running_time_end,
@ -948,21 +947,21 @@ impl ToggleRecord {
}
RecordingState::Stopped => {
// We're properly stopped
gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)");
gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
Ok(HandleResult::Drop)
}
RecordingState::Starting => {
// If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_start.is_none() {
gst_log!(
self.cat,
CAT,
obj: pad,
"Dropping buffer (starting: waiting for keyframe)",
);
Ok(HandleResult::Drop)
} else if current_running_time >= rec_state.last_recording_start {
gst_log!(
self.cat,
CAT,
obj: pad,
"Passing buffer (starting: {} >= {})",
current_running_time,
@ -974,7 +973,7 @@ impl ToggleRecord {
&& current_running_time_end > rec_state.last_recording_start
{
gst_log!(
self.cat,
CAT,
obj: pad,
"Passing buffer (starting: {} < {} < {})",
current_running_time,
@ -991,17 +990,17 @@ impl ToggleRecord {
let mut segment = state.in_segment.clone();
segment.set_start(clip_start);
gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
if let Some(data) = data.clip(&*state, &segment) {
Ok(HandleResult::Pass(data))
} else {
gst_warning!(self.cat, obj: pad, "Complete buffer clipped!");
gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
Ok(HandleResult::Drop)
}
} else {
gst_log!(
self.cat,
CAT,
obj: pad,
"Dropping buffer (starting: {} < {})",
current_running_time,
@ -1028,7 +1027,7 @@ impl ToggleRecord {
gst::FlowError::Error
})?;
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
{
let state = stream.state.lock();
@ -1086,16 +1085,11 @@ impl ToggleRecord {
.build(),
);
state.segment_pending = false;
gst_debug!(
self.cat,
obj: pad,
"Pending Segment {:?}",
&state.out_segment
);
gst_debug!(CAT, obj: pad, "Pending Segment {:?}", &state.out_segment);
}
if !state.pending_events.is_empty() {
gst_debug!(self.cat, obj: pad, "Pushing pending events");
gst_debug!(CAT, obj: pad, "Pushing pending events");
}
events.append(&mut state.pending_events);
@ -1113,7 +1107,7 @@ impl ToggleRecord {
};
gst_log!(
self.cat,
CAT,
obj: pad,
"Pushing buffer with running time {}: {:?}",
out_running_time,
@ -1137,7 +1131,7 @@ impl ToggleRecord {
Some(stream) => stream.clone(),
};
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
let mut forward = true;
let mut send_pending = false;
@ -1168,12 +1162,12 @@ impl ToggleRecord {
let s = caps.get_structure(0).unwrap();
if s.get_name().starts_with("audio/") {
state.audio_info = gst_audio::AudioInfo::from_caps(caps);
gst_log!(self.cat, obj: pad, "Got audio caps {:?}", state.audio_info);
gst_log!(CAT, obj: pad, "Got audio caps {:?}", state.audio_info);
state.video_info = None;
} else if s.get_name().starts_with("video/") {
state.audio_info = None;
state.video_info = gst_video::VideoInfo::from_caps(caps);
gst_log!(self.cat, obj: pad, "Got video caps {:?}", state.video_info);
gst_log!(CAT, obj: pad, "Got video caps {:?}", state.video_info);
} else {
state.audio_info = None;
state.video_info = None;
@ -1214,12 +1208,12 @@ impl ToggleRecord {
state.segment_pending = true;
state.current_running_time = gst::CLOCK_TIME_NONE;
gst_debug!(self.cat, obj: pad, "Got new Segment {:?}", state.in_segment);
gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment);
forward = false;
}
EventView::Gap(e) => {
gst_debug!(self.cat, obj: pad, "Handling Gap event {:?}", event);
gst_debug!(CAT, obj: pad, "Handling Gap event {:?}", event);
let (pts, duration) = e.get();
let handle_result = if stream == self.main_stream {
self.handle_main_stream(element, pad, &stream, (pts, duration))
@ -1249,7 +1243,7 @@ impl ToggleRecord {
state.eos = true;
self.main_stream_cond.notify_all();
gst_debug!(
self.cat,
CAT,
obj: pad,
"Stream is EOS now, sending any pending events"
);
@ -1269,7 +1263,7 @@ impl ToggleRecord {
{
let mut state = stream.state.lock();
if state.segment_pending {
gst_log!(self.cat, obj: pad, "Storing event for later pushing");
gst_log!(CAT, obj: pad, "Storing event for later pushing");
state.pending_events.push(event);
return true;
}
@ -1297,10 +1291,10 @@ impl ToggleRecord {
}
if forward {
gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event);
gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
stream.srcpad.push_event(event)
} else {
gst_log!(self.cat, obj: pad, "Dropping event {:?}", event);
gst_log!(CAT, obj: pad, "Dropping event {:?}", event);
true
}
}
@ -1323,7 +1317,7 @@ impl ToggleRecord {
Some(stream) => stream.clone(),
};
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
stream.srcpad.peer_query(query)
}
@ -1343,7 +1337,7 @@ impl ToggleRecord {
Some(stream) => stream.clone(),
};
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
let forward = match event.view() {
EventView::Seek(..) => false,
@ -1359,10 +1353,10 @@ impl ToggleRecord {
drop(rec_state);
if forward {
gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event);
gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
stream.sinkpad.push_event(event)
} else {
gst_log!(self.cat, obj: pad, "Dropping event {:?}", event);
gst_log!(CAT, obj: pad, "Dropping event {:?}", event);
false
}
}
@ -1382,7 +1376,7 @@ impl ToggleRecord {
Some(stream) => stream.clone(),
};
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
match query.view_mut() {
QueryView::Scheduling(ref mut q) => {
let mut new_query = gst::Query::new_scheduling();
@ -1391,7 +1385,7 @@ impl ToggleRecord {
return res;
}
gst_log!(self.cat, obj: pad, "Downstream returned {:?}", new_query);
gst_log!(CAT, obj: pad, "Downstream returned {:?}", new_query);
let (flags, min, max, align) = new_query.get_result();
q.set(flags, min, max, align);
@ -1403,7 +1397,7 @@ impl ToggleRecord {
.filter(|m| m != &gst::PadMode::Pull)
.collect::<Vec<_>>(),
);
gst_log!(self.cat, obj: pad, "Returning {:?}", q.get_mut_query());
gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query());
true
}
QueryView::Seeking(ref mut q) => {
@ -1415,7 +1409,7 @@ impl ToggleRecord {
gst::GenericFormattedValue::new(format, -1),
);
gst_log!(self.cat, obj: pad, "Returning {:?}", q.get_mut_query());
gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query());
true
}
// Position and duration is always the current recording position
@ -1454,7 +1448,7 @@ impl ToggleRecord {
}
}
_ => {
gst_log!(self.cat, obj: pad, "Forwarding query {:?}", query);
gst_log!(CAT, obj: pad, "Forwarding query {:?}", query);
stream.sinkpad.peer_query(query)
}
}
@ -1508,11 +1502,6 @@ impl ObjectSubclass for ToggleRecord {
pads.insert(main_stream.srcpad.clone(), main_stream.clone());
Self {
cat: gst::DebugCategory::new(
"togglerecord",
gst::DebugColorFlags::empty(),
Some("Toggle Record Element"),
),
settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()),
main_stream,
@ -1583,7 +1572,7 @@ impl ObjectImpl for ToggleRecord {
let mut settings = self.settings.lock();
let record = value.get_some().expect("type checked upstream");
gst_debug!(
self.cat,
CAT,
obj: element,
"Setting record from {:?} to {:?}",
settings.record,
@ -1626,7 +1615,7 @@ impl ElementImpl for ToggleRecord {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::ReadyToPaused => {

View file

@ -15,6 +15,7 @@ gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
gstreamer-audio = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
byte-slice-cast = "0.3"
num-traits = "0.2"
lazy_static = "1.0"
[lib]
name = "gstrstutorial"

View file

@ -16,11 +16,18 @@ use gst::subclass::prelude::*;
// Struct containing all the element data
struct Identity {
cat: gst::DebugCategory,
srcpad: gst::Pad,
sinkpad: gst::Pad,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rsidentity",
gst::DebugColorFlags::empty(),
Some("Identity Element"),
);
}
impl Identity {
// After creating of our two pads set all the functions on them
//
@ -82,7 +89,7 @@ impl Identity {
_element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
self.srcpad.push(buffer)
}
@ -94,7 +101,7 @@ impl Identity {
// See the documentation of gst::Event and gst::EventRef to see what can be done with
// events, and especially the gst::EventView type for inspecting events.
fn sink_event(&self, pad: &gst::Pad, _element: &gst::Element, event: gst::Event) -> bool {
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
self.srcpad.push_event(event)
}
@ -113,7 +120,7 @@ impl Identity {
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
self.srcpad.peer_query(query)
}
@ -126,7 +133,7 @@ impl Identity {
// See the documentation of gst::Event and gst::EventRef to see what can be done with
// events, and especially the gst::EventView type for inspecting events.
fn src_event(&self, pad: &gst::Pad, _element: &gst::Element, event: gst::Event) -> bool {
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
self.sinkpad.push_event(event)
}
@ -145,7 +152,7 @@ impl Identity {
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst_log!(self.cat, obj: pad, "Handling query {:?}", query);
gst_log!(CAT, obj: pad, "Handling query {:?}", query);
self.sinkpad.peer_query(query)
}
}
@ -179,15 +186,7 @@ impl ObjectSubclass for Identity {
// Return an instance of our struct and also include our debug category here.
// The debug category will be used later whenever we need to put something
// into the debug logs
Self {
cat: gst::DebugCategory::new(
"rsidentity",
gst::DebugColorFlags::empty(),
Some("Identity Element"),
),
srcpad,
sinkpad,
}
Self { srcpad, sinkpad }
}
// Called exactly once when registering the type. Used for
@ -263,7 +262,7 @@ impl ElementImpl for Identity {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(self.cat, obj: element, "Changing state {:?}", transition);
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
// Call the parent class' implementation of ::change_state()
self.parent_change_state(element, transition)

View file

@ -16,6 +16,8 @@ extern crate gstreamer_video as gst_video;
extern crate byte_slice_cast;
extern crate num_traits;
#[macro_use]
extern crate lazy_static;
mod identity;
mod progressbin;

View file

@ -16,8 +16,6 @@ use gst::subclass::prelude::*;
// Struct containing all the element data
struct ProgressBin {
#[allow(dead_code)]
cat: gst::DebugCategory,
progress: gst::Element,
srcpad: gst::GhostPad,
sinkpad: gst::GhostPad,
@ -53,15 +51,8 @@ impl ObjectSubclass for ProgressBin {
// Don't let progressreport print to stdout itself
progress.set_property("silent", &true).unwrap();
// Return an instance of our struct and also include our debug category here.
// The debug category will be used later whenever we need to put something
// into the debug logs
// Return an instance of our struct
Self {
cat: gst::DebugCategory::new(
"rsprogressbin",
gst::DebugColorFlags::empty(),
Some("Progress printing Bin"),
),
progress,
srcpad,
sinkpad,

View file

@ -71,11 +71,18 @@ struct State {
// Struct containing all the element data
struct Rgb2Gray {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<Option<State>>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rsrgb2gray",
gst::DebugColorFlags::empty(),
Some("Rust RGB-GRAY converter"),
);
}
impl Rgb2Gray {
// Converts one pixel of BGRx to a grayscale value, shifting and/or
// inverting it as configured
@ -119,11 +126,6 @@ impl ObjectSubclass for Rgb2Gray {
// of our struct here.
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"rsrgb2gray",
gst::DebugColorFlags::empty(),
Some("Rust RGB-GRAY converter"),
),
settings: Mutex::new(Default::default()),
state: Mutex::new(None),
}
@ -251,7 +253,7 @@ impl ObjectImpl for Rgb2Gray {
let mut settings = self.settings.lock().unwrap();
let invert = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: element,
"Changing invert from {} to {}",
settings.invert,
@ -263,7 +265,7 @@ impl ObjectImpl for Rgb2Gray {
let mut settings = self.settings.lock().unwrap();
let shift = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: element,
"Changing shift from {} to {}",
settings.shift,
@ -343,7 +345,7 @@ impl BaseTransformImpl for Rgb2Gray {
};
gst_debug!(
self.cat,
CAT,
obj: element,
"Transformed caps from {} to {} in direction {:?}",
caps,
@ -389,7 +391,7 @@ impl BaseTransformImpl for Rgb2Gray {
};
gst_debug!(
self.cat,
CAT,
obj: element,
"Configured for caps {} to {}",
incaps,
@ -407,7 +409,7 @@ impl BaseTransformImpl for Rgb2Gray {
// Drop state
let _ = self.state.lock().unwrap().take();
gst_info!(self.cat, obj: element, "Stopped");
gst_info!(CAT, obj: element, "Stopped");
Ok(())
}

View file

@ -137,12 +137,19 @@ struct ClockWait {
// Struct containing all the element data
struct SineSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
clock_wait: Mutex<ClockWait>,
}
lazy_static! {
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
"rssinesrc",
gst::DebugColorFlags::empty(),
Some("Rust Sine Wave Source"),
);
}
impl SineSrc {
fn process<F: Float + FromByteSlice>(
data: &mut [u8],
@ -204,11 +211,6 @@ impl ObjectSubclass for SineSrc {
// of our struct here.
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"rssinesrc",
gst::DebugColorFlags::empty(),
Some("Rust Sine Wave Source"),
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
clock_wait: Mutex::new(ClockWait {
@ -306,7 +308,7 @@ impl ObjectImpl for SineSrc {
let mut settings = self.settings.lock().unwrap();
let samples_per_buffer = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: basesrc,
"Changing samples-per-buffer from {} to {}",
settings.samples_per_buffer,
@ -322,7 +324,7 @@ impl ObjectImpl for SineSrc {
let mut settings = self.settings.lock().unwrap();
let freq = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: basesrc,
"Changing freq from {} to {}",
settings.freq,
@ -334,7 +336,7 @@ impl ObjectImpl for SineSrc {
let mut settings = self.settings.lock().unwrap();
let volume = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: basesrc,
"Changing volume from {} to {}",
settings.volume,
@ -346,7 +348,7 @@ impl ObjectImpl for SineSrc {
let mut settings = self.settings.lock().unwrap();
let mute = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: basesrc,
"Changing mute from {} to {}",
settings.mute,
@ -358,7 +360,7 @@ impl ObjectImpl for SineSrc {
let mut settings = self.settings.lock().unwrap();
let is_live = value.get_some().expect("type checked upstream");
gst_info!(
self.cat,
CAT,
obj: basesrc,
"Changing is-live from {} to {}",
settings.is_live,
@ -442,10 +444,10 @@ impl BaseSrcImpl for SineSrc {
use std::f64::consts::PI;
let info = gst_audio::AudioInfo::from_caps(caps).ok_or_else(|| {
gst_loggable_error!(self.cat, "Failed to build `AudioInfo` from caps {}", caps)
gst_loggable_error!(CAT, "Failed to build `AudioInfo` from caps {}", caps)
})?;
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
gst_debug!(CAT, obj: element, "Configuring for caps {}", caps);
element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer);
@ -493,7 +495,7 @@ impl BaseSrcImpl for SineSrc {
*self.state.lock().unwrap() = Default::default();
self.unlock_stop(element)?;
gst_info!(self.cat, obj: element, "Started");
gst_info!(CAT, obj: element, "Started");
Ok(())
}
@ -504,7 +506,7 @@ impl BaseSrcImpl for SineSrc {
*self.state.lock().unwrap() = Default::default();
self.unlock(element)?;
gst_info!(self.cat, obj: element, "Stopped");
gst_info!(CAT, obj: element, "Stopped");
Ok(())
}
@ -531,7 +533,7 @@ impl BaseSrcImpl for SineSrc {
let latency = gst::SECOND
.mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
.unwrap();
gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
gst_debug!(CAT, obj: element, "Returning latency {}", latency);
q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE);
true
} else {
@ -568,7 +570,7 @@ impl BaseSrcImpl for SineSrc {
// point but at most samples_per_buffer samples per buffer
let n_samples = if let Some(sample_stop) = state.sample_stop {
if sample_stop <= state.sample_offset {
gst_log!(self.cat, obj: element, "At EOS");
gst_log!(CAT, obj: element, "At EOS");
return Err(gst::FlowError::Eos);
}
@ -661,7 +663,7 @@ impl BaseSrcImpl for SineSrc {
// so that we immediately stop waiting on e.g. shutdown.
let mut clock_wait = self.clock_wait.lock().unwrap();
if clock_wait.flushing {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(CAT, obj: element, "Flushing");
return Err(gst::FlowError::Flushing);
}
@ -670,31 +672,25 @@ impl BaseSrcImpl for SineSrc {
drop(clock_wait);
gst_log!(
self.cat,
CAT,
obj: element,
"Waiting until {}, now {}",
wait_until,
clock.get_time()
);
let (res, jitter) = id.wait();
gst_log!(
self.cat,
obj: element,
"Waited res {:?} jitter {}",
res,
jitter
);
gst_log!(CAT, obj: element, "Waited res {:?} jitter {}", res, jitter);
self.clock_wait.lock().unwrap().clock_id.take();
// If the clock ID was unscheduled, unlock() was called
// and we should return Flushing immediately.
if res == Err(gst::ClockError::Unscheduled) {
gst_debug!(self.cat, obj: element, "Flushing");
gst_debug!(CAT, obj: element, "Flushing");
return Err(gst::FlowError::Flushing);
}
}
gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);
gst_debug!(CAT, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
}
@ -733,7 +729,7 @@ impl BaseSrcImpl for SineSrc {
// and for calculating the timestamps, etc.
if segment.get_rate() < 0.0 {
gst_error!(self.cat, obj: element, "Reverse playback not supported");
gst_error!(CAT, obj: element, "Reverse playback not supported");
return false;
}
@ -765,7 +761,7 @@ impl BaseSrcImpl for SineSrc {
(sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));
gst_debug!(
self.cat,
CAT,
obj: element,
"Seeked to {}-{:?} (accum: {}) for segment {:?}",
sample_offset,
@ -787,7 +783,7 @@ impl BaseSrcImpl for SineSrc {
if state.info.is_none() {
gst_error!(
self.cat,
CAT,
obj: element,
"Can only seek in Default format if sample rate is known"
);
@ -801,7 +797,7 @@ impl BaseSrcImpl for SineSrc {
(sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (rate as f64));
gst_debug!(
self.cat,
CAT,
obj: element,
"Seeked to {}-{:?} (accum: {}) for segment {:?}",
sample_offset,
@ -820,7 +816,7 @@ impl BaseSrcImpl for SineSrc {
true
} else {
gst_error!(
self.cat,
CAT,
obj: element,
"Can't seek in format {:?}",
segment.get_format()
@ -833,7 +829,7 @@ impl BaseSrcImpl for SineSrc {
fn unlock(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
// This should unblock the create() function ASAP, so we
// just unschedule the clock it here, if any.
gst_debug!(self.cat, obj: element, "Unlocking");
gst_debug!(CAT, obj: element, "Unlocking");
let mut clock_wait = self.clock_wait.lock().unwrap();
if let Some(clock_id) = clock_wait.clock_id.take() {
clock_id.unschedule();
@ -846,7 +842,7 @@ impl BaseSrcImpl for SineSrc {
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
// This signals that unlocking is done, so we can reset
// all values again.
gst_debug!(self.cat, obj: element, "Unlock stop");
gst_debug!(CAT, obj: element, "Unlock stop");
let mut clock_wait = self.clock_wait.lock().unwrap();
clock_wait.flushing = false;