From b160278c6bb01972d2ef54fae421339046863ce4 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Thu, 13 May 2021 19:54:23 +0200 Subject: [PATCH] Use gio::OutputStream to enable flexibility --- .gitignore | 2 + Cargo.toml | 5 +- src/imp.rs | 116 ++++++++++++++++--------------------------- tests/flexhlssink.rs | 6 +-- 4 files changed, 50 insertions(+), 79 deletions(-) diff --git a/.gitignore b/.gitignore index 96ef6c0..cd695fc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target Cargo.lock +playlist.m3u8 +segment* diff --git a/Cargo.toml b/Cargo.toml index c95524f..c9bd276 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,9 @@ crate-type = ["cdylib", "rlib", "staticlib"] path = "src/lib.rs" [dependencies] -glib = { git = "https://github.com/gtk-rs/gtk-rs" } -gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } +glib = { git = "https://github.com/gtk-rs/gtk-rs-core" } +gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] } gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]} once_cell = "1.7.2" diff --git a/src/imp.rs b/src/imp.rs index 311d2fe..9937f93 100644 --- a/src/imp.rs +++ b/src/imp.rs @@ -1,3 +1,4 @@ +use gio::prelude::*; use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; @@ -10,6 +11,7 @@ use std::fs::{File, OpenOptions}; use std::io::Write; use std::path; use std::sync::{Arc, Mutex, MutexGuard}; +use gio::glib::WeakRef; const DEFAULT_LOCATION: &str = "segment%05d.ts"; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; @@ -40,7 +42,7 @@ struct Settings { // TODO: old_locations ? Maybe just use another thread and send msgs with files to delete ? splitmuxsink: Option, - app_sink: Option, + giostreamsink: Option, muxer: Option, video_sink: bool, audio_sink: bool, @@ -58,7 +60,7 @@ impl Default for Settings { send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS, splitmuxsink: None, - app_sink: None, + giostreamsink: None, muxer: None, video_sink: false, audio_sink: false, @@ -110,17 +112,13 @@ impl FlexHlsSink { } => (current_segment_location, current_segment_file), }; - let settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock().unwrap(); let seq_num = format!("{:0>5}", fragment_id); let segment_file_location = settings .location .replace(BACKWARDS_COMPATIBLE_PLACEHOLDER, &seq_num); - gst_trace!( - CAT, - "Segment location formatted: {}", - segment_file_location - ); + gst_trace!(CAT, "Segment location formatted: {}", segment_file_location); let segment_file_location_clone = segment_file_location.clone(); let segment_file = File::create(&segment_file_location).map_err(move |err| { @@ -135,6 +133,12 @@ impl FlexHlsSink { *current_segment_location = Some(segment_file_location.clone()); *current_segment_file = Some(segment_file); + let giostreamsink = settings.giostreamsink.as_ref().unwrap(); + let stream = self + .get_fragment_stream(segment_file_location.clone()) + .map_err(|err| err.to_string())?; + giostreamsink.set_property("stream", &stream).unwrap(); + gst_info!( CAT, "New segment location: {}", @@ -143,6 +147,21 @@ impl FlexHlsSink { Ok(segment_file_location) } + fn get_fragment_stream(&self, location: String) -> Result { + let file_stream = File::create(&location).map_err(|err| { + glib::Error::new( + gst::URIError::BadReference, + format!( + "Could create segment file {} for writing: {}", + &location, + err.to_string() + ) + .as_str(), + ) + })?; + Ok(gio::WriteOutputStream::new(file_stream)) + } + fn start( &self, element: &super::FlexHlsSink, @@ -486,11 +505,9 @@ impl ObjectImpl for FlexHlsSink { let splitmuxsink = gst::ElementFactory::make("splitmuxsink", Some("split_mux_sink")) .expect("Could not make element splitmuxsink"); - let app_sink = gst::ElementFactory::make("appsink", Some("giostreamsink_replacement_sink")) - .expect("Could not make element appsink"); - // app_sink.set_property("sync", &false).unwrap(); - // app_sink.set_property("async", &false).unwrap(); - // app_sink.set_property("emit-signals", &true).unwrap(); + let giostreamsink = gst::ElementFactory::make("giostreamsink", Some("giostream_sink")) + .expect("Could not make element giostreamsink"); + giostreamsink.set_property("async", &false).unwrap(); let mux = gst::ElementFactory::make("mpegtsmux", Some("mpeg-ts_mux")) .expect("Could not make element mpegtsmux"); @@ -505,9 +522,8 @@ impl ObjectImpl for FlexHlsSink { ), ("send-keyframe-requests", &true), ("muxer", &mux), - ("sink", &app_sink), + ("sink", &giostreamsink), ("reset-muxer", &false), - ("async-finalize", &false), ]) .unwrap(); @@ -531,59 +547,11 @@ impl ObjectImpl for FlexHlsSink { }) .unwrap(); - let appsink = app_sink.downcast_ref::().unwrap(); - appsink.set_emit_signals(true); - - appsink.connect_eos(|appsink| { - gst_info!(CAT, "Got EOS from giostreamsink_replacement_sink"); - }); - - let this = self.clone(); - let element_weak = obj.downgrade(); - appsink.connect_new_sample(move |appsink| { - gst_info!(CAT, "Got new sample from giostreamsink_replacement_sink"); - - let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; - - gst_info!(CAT, "Got new sample buffer[{}]", buffer.size()); - - let mut state = this.state.lock().unwrap(); - let (current_segment_file, current_segment_location) = match &mut *state { - State::Stopped => return Err(gst::FlowError::Error), - State::Started { - current_segment_file, - current_segment_location, - .. - } => (current_segment_file, current_segment_location), - }; - - if let (Some(segment_file), Some(segment_location)) = - (current_segment_file, current_segment_location) - { - let segment_location = segment_location.clone(); - let data = buffer.map_readable().unwrap(); - segment_file.write(&data).map_err(|err| { - let error_msg = gst::error_msg!( - gst::ResourceError::OpenWrite, - [ - "Could not write to segment file \"{}\": {}", - segment_location, - err.to_string(), - ] - ); - let element = element_weak.upgrade().unwrap(); - element.post_error_message(error_msg); - - gst::FlowError::Error - })?; - } - - Ok(gst::FlowSuccess::Ok) - }); + let temp_stream = gio::MemoryOutputStream::new_resizable(); + giostreamsink.set_property("stream", &temp_stream).unwrap(); settings.splitmuxsink = Some(splitmuxsink); - settings.app_sink = Some(app_sink); + settings.giostreamsink = Some(giostreamsink); settings.muxer = Some(mux); } } @@ -679,7 +647,9 @@ impl ElementImpl for FlexHlsSink { Some(sms) => sms, }; let peer_pad = splitmuxsink.request_pad_simple("audio_0").unwrap(); - let sink_pad = gst::GhostPad::from_template_with_target(&templ, Some("audio"), &peer_pad).unwrap(); + let sink_pad = + gst::GhostPad::from_template_with_target(&templ, Some("audio"), &peer_pad) + .unwrap(); element.add_pad(&sink_pad).unwrap(); sink_pad.set_active(true).unwrap(); settings.audio_sink = true; @@ -701,7 +671,9 @@ impl ElementImpl for FlexHlsSink { }; let peer_pad = splitmuxsink.request_pad_simple("video").unwrap(); - let sink_pad = gst::GhostPad::from_template_with_target(&templ, Some("video"), &peer_pad).unwrap(); + let sink_pad = + gst::GhostPad::from_template_with_target(&templ, Some("video"), &peer_pad) + .unwrap(); element.add_pad(&sink_pad).unwrap(); sink_pad.set_active(true).unwrap(); settings.video_sink = true; @@ -709,13 +681,9 @@ impl ElementImpl for FlexHlsSink { Some(sink_pad.upcast()) } None => { - gst_debug!( - CAT, - obj: element, - "template name returned `None`", - ); + gst_debug!(CAT, obj: element, "template name returned `None`",); None - }, + } Some(other_name) => { gst_debug!( CAT, diff --git a/tests/flexhlssink.rs b/tests/flexhlssink.rs index 6618d32..7df4f6f 100644 --- a/tests/flexhlssink.rs +++ b/tests/flexhlssink.rs @@ -29,7 +29,7 @@ fn init() { fn test_basic_element_with_video_content() { init(); - const BUFFER_NB: i32 = 200; + const BUFFER_NB: i32 = 100; let pipeline = gst::Pipeline::new(Some("video_pipeline")); @@ -93,7 +93,7 @@ fn test_basic_element_with_video_content() { let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; - gst_info!(CAT, "TEST sample buffer[{}]", buffer.size()); + //gst_info!(CAT, "TEST sample buffer[{}]", buffer.size()); sender.send(()).unwrap(); Ok(gst::FlowSuccess::Ok) @@ -108,7 +108,7 @@ fn test_basic_element_with_video_content() { ); for idx in 0..BUFFER_NB { receiver.recv().unwrap(); - gst_info!(CAT, "flexhlssink_video_pipeline: received buffer #{}", idx); + //gst_info!(CAT, "flexhlssink_video_pipeline: received buffer #{}", idx); } pipeline.set_state(gst::State::Null).unwrap();