// slim-live/src/main.rs

use anyhow::Error;
use gst::glib;
use gst::meta::CustomMeta;
use gst::prelude::*;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};

static IDENTITY_META: &str = "IdentityMeta";
static BUFFERS_AHEAD_META: &str = "BuffersAheadMeta";
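
// Helpers for attaching / reading a unique buffer id ("IdentityMeta") via GStreamer custom meta.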
trait UidSetter {
    fn set_uid(&mut self, id: String);
}

trait UidGetter {
    fn uid(&self) -> Option<String>;
}

impl UidSetter for &mut gst::BufferRef {
    fn set_uid(&mut self, id: String) {
        let mut meta = gst::meta::CustomMeta::add(self, IDENTITY_META).unwrap();
        let s = meta.mut_structure();
        s.set("id", id);
    }
}

impl UidGetter for &gst::BufferRef {
    fn uid(&self) -> Option<String> {
        let meta = match CustomMeta::from_buffer(self, IDENTITY_META) {
            Ok(m) => m,
            Err(_) => return None,
        };
        let s = meta.structure();
        s.get::<String>("id").ok()
    }
}
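
// Helpers for attaching / reading the list of previously dropped buffers
// ("BuffersAheadMeta") so they can be re-injected downstream.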
trait BuffersAheadSetter {
    fn set_buffers_ahead(&mut self, buffers: Vec<gst::Buffer>);
}

trait BuffersAheadGetter {
    fn buffers_ahead(&self) -> Option<Vec<gst::Buffer>>;
}

impl BuffersAheadSetter for &mut gst::BufferRef {
    fn set_buffers_ahead(&mut self, buffers: Vec<gst::Buffer>) {
        let mut meta = gst::meta::CustomMeta::add(self, BUFFERS_AHEAD_META).unwrap();
        let s = meta.mut_structure();
        s.set("buffers", gst::Array::new(buffers));
    }
}

impl BuffersAheadGetter for &gst::BufferRef {
    fn buffers_ahead(&self) -> Option<Vec<gst::Buffer>> {
        let meta = match CustomMeta::from_buffer(self, BUFFERS_AHEAD_META) {
            Ok(m) => m,
            Err(_) => return None,
        };
        let s = meta.structure();
        if let Ok(buffers) = s.get::<gst::Array>("buffers") {
            let mut res = Vec::new();
            for buffer in buffers.iter() {
                let b = buffer.to_value().get_owned::<gst::Buffer>().unwrap();
                res.push(b);
            }
            Some(res)
        } else {
            None
        }
    }
}
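
// Receives an MPEG-TS/H.264 stream over SRT and splits it with a tee:
// one branch decodes every frame to record its picture type, the other
// drops encoded B-frame buffers after a queue and later re-injects them
// from the cache at the identity element before decoding and display.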
fn main() -> Result<(), Error> {
    gst::init()?;

    let pipeline = gst::parse_launch(
        r#"
        srtsrc uri=srt://127.0.0.1:7002 ! tsdemux ! h264parse update-timecode=true name=parser ! tee name=t
        t. ! queue ! avdec_h264 ! appsink name=picture-type-setter
        t. ! queue name=drop-b-frames ! identity name=include-back-b-frame-buffers ! avdec_h264 ! videoconvert ! timeoverlay ! gtksink
        "#,
    )?
    .downcast::<gst::Pipeline>()
    .unwrap();

    /*
    Alternative pipeline with closed-caption transcription:
    t. ! queue name=drop-b-frames ! transcriberbin name=trans cc-caps=closedcaption/x-cea-708,format=cc_data
    trans.src_video ! queue name=include-back-b-frame-buffers ! avdec_h264 ! videoconvert ! timeoverlay ! cea608overlay black-background=1 ! gtksink
    trans.src_audio ! audioconvert ! audiorate ! audioresample ! autoaudiosink
    */

    gst::meta::CustomMeta::register(IDENTITY_META, &[]);
    gst::meta::CustomMeta::register(BUFFERS_AHEAD_META, &[]);
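
    // Shared state used by the pad probes and the appsink callback:
    //  - id_gen_shared: ULID generator used to tag each parsed buffer,
    //  - buffer_type_store: buffer UID -> FFmpeg picture type (e.g. "B"),
    //  - last_dropped_buffers: B-frame buffers dropped since the last kept buffer.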
    let id_gen_shared = Arc::new(Mutex::new(ulid::Generator::new()));
    let buffer_type_store = Arc::new(Mutex::new(HashMap::new()));
    let last_dropped_buffers = Arc::new(Mutex::new(Vec::new()));
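
    // Tag every buffer leaving the parser with a fresh ULID so it can be
    // correlated between the two tee branches.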
    let parser = pipeline.by_name("parser").unwrap();
    parser
        .static_pad("src")
        .unwrap()
        .add_probe(gst::PadProbeType::BUFFER, {
            let id_gen_shared = id_gen_shared.clone();
            move |_pad, probe_info| {
                if let Some(gst::PadProbeData::Buffer(ref mut buffer)) = probe_info.data {
                    let mut id_gen = id_gen_shared.lock().unwrap();
                    let mut buffer = buffer.get_mut().unwrap();
                    buffer.set_uid(id_gen.generate().unwrap().to_string());
                }
                gst::PadProbeReturn::Ok
            }
        });
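
    // First tee branch: decode every frame and record its picture type,
    // keyed by the buffer UID, so the other branch knows what to drop.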
    let picture_type_setter = pipeline
        .by_name("picture-type-setter")
        .unwrap()
        .downcast::<gst_app::AppSink>()
        .unwrap();
    picture_type_setter.set_callbacks(
        gst_app::AppSinkCallbacks::builder()
            // Add a handler to the "new-sample" signal.
            .new_sample({
                let buffer_type_store = buffer_type_store.clone();
                move |appsink| {
                    // Pull the sample in question out of the appsink's buffer.
                    let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
                    let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;

                    // Pull the FFmpeg picture information attached by the decoder.
                    let picture_type = {
                        let ffmpeg_meta = CustomMeta::from_buffer(buffer, "GstFFMpegPictureMeta")
                            .map_err(|_| {
                                eprintln!("Buffer does not contain FFmpeg meta information, you need to use a modified version of GStreamer");
                                gst::FlowError::Eos
                            })?;
                        let s = ffmpeg_meta.structure();
                        s.get::<String>("picture-type")
                            .map_err(|_| gst::FlowError::Eos)?
                    };

                    let buffer_id = buffer.uid().ok_or_else(|| {
                        eprintln!("Buffer does not contain Identity meta information");
                        gst::FlowError::Eos
                    })?;
                    let mut type_store = buffer_type_store.lock().unwrap();
                    type_store.insert(buffer_id, picture_type);
                    Ok(gst::FlowSuccess::Ok)
                }
            })
            .build(),
    );
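
    // Second tee branch: shift this branch's running time by 500 ms on the
    // queue's src pad, then drop encoded B-frame buffers while caching them
    // for re-injection further downstream.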
    let drop_b_frames_queue = pipeline.by_name("drop-b-frames").unwrap();
    let queue_srcpad = drop_b_frames_queue.static_pad("src").unwrap();
    queue_srcpad.set_offset(gst::ClockTime::from_mseconds(500).nseconds() as i64);
    queue_srcpad.add_probe(gst::PadProbeType::BUFFER, {
        let buffer_type_store = buffer_type_store.clone();
        let last_dropped_buffers = last_dropped_buffers.clone();
        move |_pad, probe_info| {
            if let Some(gst::PadProbeData::Buffer(ref mut buffer)) = probe_info.data {
                let buffer_id = buffer.as_ref().uid().expect("Buffer UID available");
                let type_store = buffer_type_store.lock().unwrap();
                let mut last_dropped_buffers = last_dropped_buffers.lock().unwrap();
                if let Some(pic_type) = type_store.get(&buffer_id) {
                    if pic_type == "B" {
                        // Drop B-frame buffers, but save them to the cache for later lookup
                        last_dropped_buffers.push(buffer.to_owned());
                        println!("Dropped buffer {buffer_id}");
                        return gst::PadProbeReturn::Drop;
                    } else if !last_dropped_buffers.is_empty() {
                        println!(
                            "Found some dropped buffers ahead of {buffer_id}: {:?}",
                            last_dropped_buffers
                                .iter()
                                .map(|b| b.as_ref().uid())
                                .collect::<Vec<_>>()
                        );
                        // This is the first non-B-frame buffer since we last dropped,
                        // so attach the cached buffers to it as meta and clear the cache.
                        let mut buffer = buffer.get_mut().unwrap();
                        buffer.set_buffers_ahead(std::mem::take(&mut *last_dropped_buffers));
                    }
                } else {
                    // The first buffer does not have its picture type available yet,
                    // but the following ones should.
                    println!("Buffer with id({buffer_id}) does not have picture type information");
                }
            }
            gst::PadProbeReturn::Ok
        }
    });
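
    // On the identity element's src pad, push the cached B-frame buffers that
    // were attached to this buffer as meta, restoring the dropped frames.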
    let reconstruct = pipeline.by_name("include-back-b-frame-buffers").unwrap();
    reconstruct
        .static_pad("src")
        .unwrap()
        .add_probe(gst::PadProbeType::BUFFER, |pad, probe_info| {
            if let Some(gst::PadProbeData::Buffer(ref buffer)) = probe_info.data {
                let mut reconstructed = VecDeque::new();
                for buffer_ahead in buffer.as_ref().buffers_ahead().unwrap_or_default() {
                    let buffer_id = buffer_ahead.as_ref().uid().unwrap();
                    pad.push(buffer_ahead)
                        .unwrap_or_else(|_| panic!("Could not push buffer: {buffer_id}"));
                    reconstructed.push_front(buffer_id);
                }
                reconstructed.push_front(buffer.as_ref().uid().unwrap());
                println!("Reconstructed buffer line: {:?}", reconstructed);
            }
            gst::PadProbeReturn::Ok
        });
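
    // Drive the pipeline from a GLib main loop and stop on EOS or error.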
    let context = glib::MainContext::default();
    let main_loop = glib::MainLoop::new(Some(&context), false);

    pipeline.set_state(gst::State::Playing)?;

    let bus = pipeline.bus().unwrap();
    bus.add_watch({
        let main_loop = main_loop.clone();
        move |_, msg| {
            use gst::MessageView;

            let main_loop = &main_loop;
            match msg.view() {
                MessageView::Eos(..) => main_loop.quit(),
                MessageView::Error(err) => {
                    eprintln!(
                        "Error from {:?}: {} ({:?})",
                        err.src().map(|s| s.path_string()),
                        err.error(),
                        err.debug()
                    );
                    main_loop.quit();
                }
                _ => (),
            };

            glib::Continue(true)
        }
    })
    .expect("Failed to add bus watch");

    main_loop.run();

    pipeline.set_state(gst::State::Null)?;
    bus.remove_watch().unwrap();

    Ok(())
}
}