From 2e9e98f8f6de5b8fe926715096ab5368ef047b69 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Sat, 8 May 2021 21:05:13 +0200 Subject: [PATCH] Receiving video content --- src/data.rs | 19 ++++ src/imp.rs | 287 ++++++++++++++++++++++++++++++++++++++++++++++---- src/lib.rs | 1 + src/server.rs | 61 ++++++++++- 4 files changed, 341 insertions(+), 27 deletions(-) create mode 100644 src/data.rs diff --git a/src/data.rs b/src/data.rs new file mode 100644 index 0000000..e02d64e --- /dev/null +++ b/src/data.rs @@ -0,0 +1,19 @@ +use bytes::Bytes; +use rml_rtmp::sessions::StreamMetadata; + +pub enum RtmpInput { + Media(Media), + Metadata(StreamMetadata), +} + +pub struct Media { + pub media_type: MediaType, + pub data: Bytes, + pub timestamp: u32, + pub can_be_dropped: bool, +} + +pub enum MediaType { + Video, + Audio, +} diff --git a/src/imp.rs b/src/imp.rs index 18d49d4..d7172b9 100644 --- a/src/imp.rs +++ b/src/imp.rs @@ -1,15 +1,20 @@ -use crate::server::Server; -use bytes::Bytes; +use crate::connection::{Connection, ConnectionError, ReadResult}; +use crate::data::{MediaType, RtmpInput}; +use crate::server::{Server, ServerResult}; use glib::subclass; use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_error, gst_info, gst_log}; +use gst::{gst_debug, gst_trace, gst_info}; use gst_base::prelude::*; use gst_base::subclass::prelude::*; use once_cell::sync::Lazy; +use slab::Slab; +use std::collections::HashSet; +use std::net::{TcpListener, TcpStream}; +use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::Mutex; -use std::u32; +use std::{thread, u32}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -26,6 +31,7 @@ const DEFAULT_PORT: u32 = 5000; struct Settings { address: String, port: u32, + stream_key: Option, } impl Default for Settings { @@ -33,11 +39,12 @@ impl Default for Settings { Settings { address: DEFAULT_ADDRESS.into(), port: DEFAULT_PORT, + stream_key: None, } } } -static PROPERTIES: [subclass::Property; 2] = [ +static PROPERTIES: [subclass::Property; 3] = [ subclass::Property("address", |name| { glib::ParamSpec::string( name, @@ -58,12 +65,26 @@ static PROPERTIES: [subclass::Property; 2] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("stream_key", |name| { + glib::ParamSpec::string( + name, + "Stream Key", + "The stream key to expect content to be published", + DEFAULT_ADDRESS.into(), + glib::ParamFlags::READWRITE, + ) + }), ]; #[derive(Debug)] enum State { Stopped, - Started { stream_key: String }, + Started { + source: Receiver, + position: u64, + video_caps: Option, + audio_caps: Option, + }, } impl Default for State { @@ -156,6 +177,7 @@ impl ObjectImpl for RtmpSvrSrc { fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); + obj.set_live(true); // this is always a live source! obj.set_automatic_eos(false); obj.set_format(gst::Format::Bytes); } @@ -166,16 +188,57 @@ impl ElementImpl for RtmpSvrSrc {} impl BaseSrcImpl for RtmpSvrSrc { fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> { // TODO: Here we start the server.. + // TODO: consider sharing context with other gst elements + // - Create a socket + + let (media_sender, media_receiver) = channel(); + let mut state = self.state.lock().unwrap(); + if let State::Started { .. } = *state { + return Ok(()); + } + + let settings = self.settings.lock().unwrap(); + let address = format!("{}:{}", settings.address, settings.port); + let listener = TcpListener::bind(&address).map_err(|err| { + gst::error_msg!( + gst::ResourceError::Busy, + ["Failed to bind to address {}: {}", address, err] + ) + })?; + + let (connection_sender, connection_receiver) = channel(); + + // TODO: Capture the join handle and use it to gracefully shutdown + thread::spawn(|| handle_connections(media_sender, connection_receiver)); + thread::spawn(|| accept_connections(connection_sender, listener)); + *state = State::Started { + source: media_receiver, + position: 0, + video_caps: None, + audio_caps: None, + }; + + // - Create channel to receive data (metadata and media data) + // - Create a thread that handle connections + // - Wait for clients to connect + // - Whenever a client connects and starts sending content + // - When a Connection is active, we can "start" the RtmpSvrSrc to publish buffers of + // content to the stream. Ok(()) } fn stop(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> { gst_debug!(CAT, obj: src, "Stopping"); // TODO: Here we stop the server + // - Notify the server to stop accepting connections + // - Notify the connections the stream ended + let mut state = self.state.lock().unwrap(); + *state = State::Stopped; Ok(()) } fn is_seekable(&self, _src: &Self::Type) -> bool { + // We cannot go back to previous content false } @@ -206,26 +269,206 @@ impl BaseSrcImpl for RtmpSvrSrc { impl PushSrcImpl for RtmpSvrSrc { fn create(&self, src: &Self::Type) -> Result { let mut state = self.state.lock().unwrap(); - - // gst_debug!(CAT, obj: src, "End of stream"); - // Err(gst::FlowError::Eos) - - match *state { - State::Started { .. } => { - let chunk = Bytes::from("Mock"); - // Here we return the buffer - let size = chunk.len(); - assert_ne!(chunk.len(), 0); - - let buffer = gst::Buffer::from_slice(chunk); - - Ok(buffer) - } + let (source, position, video_caps, audio_caps) = match *state { State::Stopped => { gst::element_error!(src, gst::LibraryError::Failed, ["Not started yet"]); - return Err(gst::FlowError::Error); } + State::Started { + ref source, + ref mut position, + ref mut video_caps, + ref mut audio_caps, + } => (source, position, video_caps, audio_caps), + }; + + loop { + let input = match source.try_recv() { + Result::Ok(i) => i, + Result::Err(TryRecvError::Empty) => continue, // blocks waiting for content + Result::Err(TryRecvError::Disconnected) => { + return Err(gst::FlowError::Eos); + } + }; + + match input { + RtmpInput::Metadata(metadata) => { + println!("Metadata: {:?}", metadata); + + // TODO: check for the actual format, do not just assume + let mut caps = gst::Caps::builder("video/x-flv"); + //gst::Caps::builder("video/x-h264").field("stream-format", &"avc"); + + if let Some(val) = metadata.video_height { + caps = caps.field("height", &val); + } + if let Some(val) = metadata.video_width { + caps = caps.field("width", &val); + } + if let Some(val) = metadata.video_frame_rate { + caps = caps.field("framerate", &val); + } + + *video_caps = Some(caps.build()); + + // TODO: check for the actual format, do not just assume acc + let mut caps = gst::Caps::builder("audio/mpeg") + .field("mpegversion", &4i32) + .field("framed", &true) + .field("stream-format", &"raw"); + + if let Some(val) = metadata.audio_channels { + caps = caps.field("channels", &val); + } + if let Some(val) = metadata.audio_sample_rate { + caps = caps.field("rate", &val); + } + + *audio_caps = Some(caps.build()); + } + RtmpInput::Media(media) => { + // TODO: Decide what to set based on content + if let (MediaType::Video, Some(video_caps)) = + (media.media_type, video_caps.as_ref()) + { + gst_info!(CAT, obj: src, "Setting {:?}", video_caps); + src.set_caps(video_caps) + .map_err(|_| gst::FlowError::NotNegotiated)?; + + // let templ = src.get_element_class().get_pad_template("video").unwrap(); + + let chunk = media.data; + // Here we return the buffer + let size = chunk.len(); + assert_ne!(chunk.len(), 0); + + let offset = *position; + *position += size as u64; + + gst_info!( + CAT, + obj: src, + "Chunk of {} bytes received at offset {}", + chunk.len(), + offset + ); + + let mut buffer = gst::Buffer::from_slice(chunk); + + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_offset(offset); + buffer.set_offset_end(offset + size as u64); + } + + return Ok(buffer); + } + // TODO: Handle Audio content + } + } + } + + gst::element_error!(src, gst::LibraryError::TooLazy, ["No content yet"]); + return Err(gst::FlowError::Error); + } +} + +/// Accepts TCP connections +fn accept_connections(connection_sender: Sender, listener: TcpListener) { + println!("Listening for connections..."); + for stream in listener.incoming() { + println!("New connection!"); + match connection_sender.send(stream.unwrap()) { + Ok(_) => (), + Err(error) => panic!("Error sending stream to connection handler: {:?}", error), + } + } +} + +/// Handle the lifecycle of all TCP connections by sending and receiving data +fn handle_connections(media_sink: Sender, connection_receiver: Receiver) { + let mut connections = Slab::new(); + let mut connection_ids = HashSet::new(); + let mut server = Server::new(media_sink); + + loop { + match connection_receiver.try_recv() { + Err(TryRecvError::Disconnected) => panic!("Connection receiver closed"), + Err(TryRecvError::Empty) => (), + Ok(stream) => { + let connection = Connection::new(stream); + let id = connections.insert(connection); + let connection = connections.get_mut(id).unwrap(); + connection.connection_id = Some(id); + connection_ids.insert(id); + + println!("Connection {} started", id); + } + } + + let mut ids_to_clear = Vec::new(); + let mut packets_to_write = Vec::new(); + for connection_id in &connection_ids { + let connection = connections.get_mut(*connection_id).unwrap(); + match connection.read() { + Err(ConnectionError::SocketClosed) => { + println!("Socket closed for id {}", connection_id); + ids_to_clear.push(*connection_id); + } + + Err(error) => { + println!( + "I/O error while reading connection {}: {:?}", + connection_id, error + ); + ids_to_clear.push(*connection_id); + } + + Ok(result) => match result { + ReadResult::NoBytesReceived => (), + ReadResult::HandshakingInProgress => (), + ReadResult::BytesReceived { buffer, byte_count } => { + let mut server_results = + match server.bytes_received(*connection_id, &buffer[..byte_count]) { + Ok(results) => results, + Err(error) => { + println!("Input caused the following server error: {}", error); + ids_to_clear.push(*connection_id); + continue; + } + }; + + for result in server_results.drain(..) { + match result { + ServerResult::OutboundPacket { + target_connection_id, + packet, + } => { + packets_to_write.push((target_connection_id, packet)); + } + + ServerResult::DisconnectConnection { + connection_id: id_to_close, + } => { + ids_to_clear.push(id_to_close); + } + } + } + } + }, + } + } + + for (connection_id, packet) in packets_to_write.drain(..) { + let connection = connections.get_mut(connection_id).unwrap(); + connection.write(packet.bytes); + } + + for closed_id in ids_to_clear { + println!("Connection {} closed", closed_id); + connection_ids.remove(&closed_id); + connections.remove(closed_id); + server.notify_connection_closed(closed_id); } } } diff --git a/src/lib.rs b/src/lib.rs index d23077b..72752ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ use glib::prelude::*; mod connection; +mod data; mod imp; mod server; diff --git a/src/server.rs b/src/server.rs index 602e131..26fcb90 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,3 +1,5 @@ +// Based on the example code in: https://github.com/KallDrexx/rust-media-libs/blob/master/examples/threaded_rtmp_server/src/server.rs +use crate::data::{Media, MediaType, RtmpInput}; use bytes::Bytes; use rml_rtmp::chunk_io::Packet; use rml_rtmp::sessions::StreamMetadata; @@ -8,6 +10,7 @@ use rml_rtmp::time::RtmpTimestamp; use slab::Slab; use std::collections::{HashMap, HashSet}; use std::rc::Rc; +use std::sync::mpsc::Sender; enum ClientAction { Waiting, @@ -63,14 +66,18 @@ pub struct Server { clients: Slab, connection_to_client_map: HashMap, channels: HashMap, + media_sink: Sender, + has_received_keyframe: bool, } impl Server { - pub fn new() -> Server { - Server { - clients: Slab::with_capacity(1024), - connection_to_client_map: HashMap::with_capacity(1024), + pub fn new(media_sink: Sender) -> Self { + Self { + clients: Slab::with_capacity(8), + connection_to_client_map: HashMap::with_capacity(8), channels: HashMap::new(), + media_sink, + has_received_keyframe: false, } } @@ -514,9 +521,11 @@ impl Server { None => return, }; + self.media_sink + .send(RtmpInput::Metadata(metadata.clone())) + .unwrap(); let metadata = Rc::new(metadata); channel.metadata = Some(metadata.clone()); - // Send the metadata to all current watchers for client_id in &channel.watching_client_ids { let client = match self.clients.get_mut(*client_id) { @@ -580,6 +589,48 @@ impl Server { } } + // send to gstreamer element + { + let should_send_to_client = match data_type { + ReceivedDataType::Video => { + self.has_received_keyframe + || (is_video_sequence_header(data.clone()) + || is_video_keyframe(data.clone())) + } + + ReceivedDataType::Audio => { + self.has_received_keyframe || is_audio_sequence_header(data.clone()) + } + }; + if should_send_to_client { + match data_type { + ReceivedDataType::Audio => self + .media_sink + .send(RtmpInput::Media(Media { + media_type: MediaType::Audio, + data: data.clone(), + timestamp: timestamp.value, + can_be_dropped: true, + })) + .unwrap(), + ReceivedDataType::Video => { + if is_video_keyframe(data.clone()) { + self.has_received_keyframe = true; + } + + self.media_sink + .send(RtmpInput::Media(Media { + media_type: MediaType::Video, + data: data.clone(), + timestamp: timestamp.value, + can_be_dropped: true, + })) + .unwrap(); + } + }; + } + } + for client_id in &channel.watching_client_ids { let client = match self.clients.get_mut(*client_id) { Some(client) => client,