diff --git a/Cargo.toml b/Cargo.toml index c590f2e..04dc42b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,9 @@ gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/g gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] } gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] } once_cell = "1.0" +rml_rtmp = "0.3.2" +slab = "0.4.2" +bytes = "0.5" [build-dependencies] gst-plugin-version-helper = { git = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" } diff --git a/README.md b/README.md index abab56d..0f8b379 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,17 @@ # GStreamer RTMP Server Plugin GStreamer Plugin that creates a server that is capable of receiving a RTMP stream. + +### Usage + +Sending a test video stream: + +```bash +gst-launch-1.0 videotestsrc is-live=true ! x264enc ! flvmux ! rtmpsink location='rtmp://localhost:5000/myapp/somekey live=1' +``` + +Receiving a rtmp video stream: + +```bash +gst-launch-1.0 -v uridecodebin uri=rtmp://localhost:1935/myapp/somekey ! autovideosink +``` diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..f412c98 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,173 @@ +use std::collections::VecDeque; +use std::io; +use std::io::{Read, Write}; +use std::time::Duration; +use std::net::TcpStream; +use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::thread; +use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType}; + +const BUFFER_SIZE: usize = 4096; + +pub enum ReadResult { + HandshakingInProgress, + NoBytesReceived, + BytesReceived { + buffer: [u8; BUFFER_SIZE], + byte_count: usize, + }, +} + +#[derive(Debug)] +pub enum ConnectionError { + IoError(io::Error), + SocketClosed, +} + +impl From for ConnectionError { + fn from(error: io::Error) -> Self { + ConnectionError::IoError(error) + } +} + +pub struct Connection { + pub connection_id: Option, + writer: Sender>, + reader: Receiver, + handshake: Handshake, + handshake_completed: bool, +} + +impl Connection { + pub fn new(socket: TcpStream) -> Connection { + let (byte_sender, byte_receiver) = channel(); + let (result_sender, result_receiver) = channel(); + + start_byte_writer(byte_receiver, &socket); + start_result_reader(result_sender, &socket); + + Connection { + connection_id: None, + writer: byte_sender, + reader: result_receiver, + handshake: Handshake::new(PeerType::Server), + handshake_completed: false, + } + } + + pub fn write(&self, bytes: Vec) { + self.writer.send(bytes).unwrap(); + } + + pub fn read(&mut self) -> Result { + match self.reader.try_recv() { + Err(TryRecvError::Empty) => Ok(ReadResult::NoBytesReceived), + Err(TryRecvError::Disconnected) => Err(ConnectionError::SocketClosed), + Ok(result) => { + match self.handshake_completed { + true => Ok(result), + false => match result { + ReadResult::HandshakingInProgress => unreachable!(), + ReadResult::NoBytesReceived => Ok(result), + ReadResult::BytesReceived {buffer, byte_count} + => self.handle_handshake_bytes(&buffer[..byte_count]), + } + } + } + } + } + + fn handle_handshake_bytes(&mut self, bytes: &[u8]) -> Result { + let result = match self.handshake.process_bytes(bytes) { + Ok(result) => result, + Err(error) => { + println!("Handshake error: {:?}", error); + return Err(ConnectionError::SocketClosed); + } + }; + + match result { + HandshakeProcessResult::InProgress {response_bytes} => { + if response_bytes.len() > 0 { + self.write(response_bytes); + } + + Ok(ReadResult::HandshakingInProgress) + }, + + HandshakeProcessResult::Completed {response_bytes, remaining_bytes} => { + println!("Handshake successful!"); + if response_bytes.len() > 0 { + self.write(response_bytes); + } + + let mut buffer = [0; BUFFER_SIZE]; + let buffer_size = remaining_bytes.len(); + for (index, value) in remaining_bytes.into_iter().enumerate() { + buffer[index] = value; + } + + self.handshake_completed = true; + Ok(ReadResult::BytesReceived {buffer, byte_count: buffer_size}) + } + } + } +} + +fn start_byte_writer(byte_receiver: Receiver>, socket: &TcpStream) { + let mut socket = socket.try_clone().expect("failed to clone socket"); + thread::spawn(move|| { + let mut send_queue = VecDeque::new(); + + loop { + loop { + match byte_receiver.try_recv() { + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return, + Ok(bytes) => send_queue.push_back(bytes), + } + } + + match send_queue.pop_front() { + None => thread::sleep(Duration::from_millis(1)), + Some(bytes) => match socket.write(&bytes) { + Ok(_) => (), + Err(error) => { + println!("Error writing to socket: {:?}", error); + return; + } + } + } + } + }); +} + +fn start_result_reader(sender: Sender, socket: &TcpStream) { + let mut socket = socket.try_clone().unwrap(); + thread::spawn(move|| { + let mut buffer = [0; BUFFER_SIZE]; + loop { + match socket.read(&mut buffer) { + Ok(0) => return, // socket closed + Ok(read_count) => { + let mut send_buffer = [0; BUFFER_SIZE]; + for x in 0..read_count { + send_buffer[x] = buffer[x]; + } + + let result = ReadResult::BytesReceived { + buffer: send_buffer, + byte_count: read_count, + }; + + sender.send(result).unwrap(); + }, + + Err(error) => { + println!("Error occurred reading from socket: {:?}", error); + return; + } + } + } + }); +} diff --git a/src/imp.rs b/src/imp.rs index 948f58d..6688d9b 100644 --- a/src/imp.rs +++ b/src/imp.rs @@ -7,6 +7,10 @@ use gst_base::prelude::*; use gst_base::subclass::prelude::*; use once_cell::sync::Lazy; +use std::u32; +use crate::server::Server; +use std::sync::Mutex; +use bytes::Bytes; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -16,8 +20,64 @@ static CAT: Lazy = Lazy::new(|| { ) }); -pub struct RtmpSvrSrc { +const DEFAULT_ADDRESS: &str = "0.0.0.0"; +const DEFAULT_PORT: u32 = 5000; +#[derive(Debug, Clone)] +struct Settings { + address: String, + port: u32 +} + +impl Default for Settings { + fn default() -> Self { + Settings { + address: DEFAULT_ADDRESS.into(), + port: DEFAULT_PORT + } + } +} + +static PROPERTIES: [subclass::Property; 2] = [ + subclass::Property("address", |name| { + glib::ParamSpec::string( + name, + "Address", + "The address the server should listen for incoming connections", + DEFAULT_ADDRESS.into(), + glib::ParamFlags::READWRITE + ) + }), + subclass::Property("port", |name| { + glib::ParamSpec::uint( + name, + "Port", + "The port that the server should bind to", + 1000, + u32::MAX, + DEFAULT_PORT, + glib::ParamFlags::READWRITE + ) + }), +]; + +#[derive(Debug)] +enum State { + Stopped, + Started { + stream_key: String, + }, +} + +impl Default for State { + fn default() -> Self { + State::Stopped + } +} + +pub struct RtmpSvrSrc { + settings: Mutex, + state: Mutex, } impl ObjectSubclass for RtmpSvrSrc { @@ -33,15 +93,11 @@ impl ObjectSubclass for RtmpSvrSrc { klass.set_metadata( "RTMP Server Source", "Source/Video", - "Creates a server that is capable of receiving a RTMP stream", + "Creates a server capable of receiving a RTMP stream", "Rafael Caricio ", ); - let caps = gst::Caps::new_simple( - "video/x-raw", - &[], - ); - + let caps = gst::Caps::new_any(); let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, @@ -49,14 +105,117 @@ impl ObjectSubclass for RtmpSvrSrc { &caps, ).unwrap(); klass.add_pad_template(src_pad_template); + + klass.install_properties(&PROPERTIES); } fn new() -> Self { - Self {} + Self { + settings: Mutex::new(Default::default()), + state: Mutex::new(Default::default()), + } + } +} + +impl ObjectImpl for RtmpSvrSrc { + + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + match *prop { + subclass::Property("address", ..) => { + let mut settings = self.settings.lock().unwrap(); + let address = value.get().expect("type checked upstream").unwrap_or_else(|| DEFAULT_ADDRESS).into(); + settings.address = address; + gst_debug!(CAT, obj: obj, "Set address to: {}", settings.address); + } + subclass::Property("port", ..) => { + let mut settings = self.settings.lock().unwrap(); + let port = value.get_some().expect("type checked upstream"); + settings.port = port; + gst_debug!(CAT, obj: obj, "Set port to: {}", port); + } + _ => unimplemented!(), + }; + } + fn get_property(&self, obj: &Self::Type, id: usize) -> glib::Value { + let prop = &PROPERTIES[id]; + match *prop { + subclass::Property("address", ..) => { + let settings = self.settings.lock().unwrap(); + settings.address.to_value() + } + subclass::Property("port", ..) => { + let settings = self.settings.lock().unwrap(); + settings.port.to_value() + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + obj.set_automatic_eos(false); + obj.set_format(gst::Format::Bytes); } } -impl ObjectImpl for RtmpSvrSrc {} impl ElementImpl for RtmpSvrSrc {} -impl BaseSrcImpl for RtmpSvrSrc {} -impl PushSrcImpl for RtmpSvrSrc {} + +impl BaseSrcImpl for RtmpSvrSrc { + fn start(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> { + // TODO: Here we start the server.. + Ok(()) + } + + fn stop(&self, src: &Self::Type) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: src, "Stopping"); + // TODO: Here we stop the server + Ok(()) + } + + fn is_seekable(&self, _src: &Self::Type) -> bool { + false + } + + fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool { + use gst::QueryView; + + match query.view_mut() { + QueryView::Scheduling(ref mut q) => { + q.set( + gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED, + 1, + -1, + 0, + ); + q.add_scheduling_modes(&[gst::PadMode::Push]); + true + } + _ => BaseSrcImplExt::parent_query(self, element, query), + } + } + + fn unlock(&self, _src: &Self::Type) -> Result<(), gst::ErrorMessage> { + // TODO: Here we abort the server + Ok(()) + } +} + +impl PushSrcImpl for RtmpSvrSrc { + fn create(&self, src: &Self::Type) -> Result { + let mut state = self.state.lock().unwrap(); + if let State::Started { .. } = *state { + let chunk = Bytes::from("Mock"); + // Here we return the buffer + let size = chunk.len(); + assert_ne!(chunk.len(), 0); + + let buffer = gst::Buffer::from_slice(chunk); + + Ok(buffer) + } else { + gst_debug!(CAT, obj: src, "End of stream"); + Err(gst::FlowError::Eos) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 7eab5b5..93adfbf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,11 @@ use glib::prelude::*; mod imp; +mod connection; +mod server; glib::wrapper! { - pub struct RtmpSrvSrc(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; + pub struct RtmpSrvSrc(ObjectSubclass) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object; } unsafe impl Send for RtmpSrvSrc {} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..525145c --- /dev/null +++ b/src/server.rs @@ -0,0 +1,537 @@ +use std::collections::{HashMap, HashSet}; +use std::rc::Rc; +use bytes::Bytes; +use slab::Slab; +use rml_rtmp::sessions::{ServerSession, ServerSessionConfig, ServerSessionResult, ServerSessionEvent}; +use rml_rtmp::sessions::StreamMetadata; +use rml_rtmp::chunk_io::Packet; +use rml_rtmp::time::RtmpTimestamp; + +enum ClientAction { + Waiting, + Publishing(String), // Publishing to a stream key + Watching { + stream_key: String, + stream_id: u32, + }, +} + +enum ReceivedDataType {Audio, Video} + +struct Client { + session: ServerSession, + current_action: ClientAction, + connection_id: usize, + has_received_video_keyframe: bool, +} + +impl Client { + fn get_active_stream_id(&self) -> Option { + match self.current_action { + ClientAction::Waiting => None, + ClientAction::Publishing(_) => None, + ClientAction::Watching {stream_key: _, stream_id} => Some(stream_id), + } + } +} + +struct MediaChannel { + publishing_client_id: Option, + watching_client_ids: HashSet, + metadata: Option>, + video_sequence_header: Option, + audio_sequence_header: Option, +} + +#[derive(Debug)] +pub enum ServerResult { + DisconnectConnection {connection_id: usize}, + OutboundPacket { + target_connection_id: usize, + packet: Packet, + } +} + +pub struct Server { + clients: Slab, + connection_to_client_map: HashMap, + channels: HashMap, +} + +impl Server { + pub fn new() -> Server { + Server { + clients: Slab::with_capacity(1024), + connection_to_client_map: HashMap::with_capacity(1024), + channels: HashMap::new(), + } + } + + pub fn bytes_received(&mut self, connection_id: usize, bytes: &[u8]) -> Result, String> { + let mut server_results = Vec::new(); + + if !self.connection_to_client_map.contains_key(&connection_id) { + // Initiate new client connection + let config = ServerSessionConfig::new(); + let (session, initial_session_results) = match ServerSession::new(config) { + Ok(results) => results, + Err(error) => return Err(error.to_string()), + }; + + self.handle_session_results(connection_id, initial_session_results, &mut server_results); + let client = Client { + session, + connection_id, + current_action: ClientAction::Waiting, + has_received_video_keyframe: false, + }; + + let client_id = Some(self.clients.insert(client)); + self.connection_to_client_map.insert(connection_id, client_id.unwrap()); + } + + let client_results: Vec; + { + let client_id = self.connection_to_client_map.get(&connection_id).unwrap(); + let client = self.clients.get_mut(*client_id).unwrap(); + client_results = match client.session.handle_input(bytes) { + Ok(results) => results, + Err(error) => return Err(error.to_string()), + }; + } + + self.handle_session_results(connection_id, client_results, &mut server_results); + Ok(server_results) + } + + pub fn notify_connection_closed(&mut self, connection_id: usize) { + match self.connection_to_client_map.remove(&connection_id) { + None => (), + Some(client_id) => { + let client = self.clients.remove(client_id); + match client.current_action { + ClientAction::Publishing(stream_key) => self.publishing_ended(stream_key), + ClientAction::Watching{stream_key, stream_id: _} => self.play_ended(client_id, stream_key), + ClientAction::Waiting => (), + } + }, + } + } + + fn handle_session_results(&mut self, + executed_connection_id: usize, + session_results: Vec, + server_results: &mut Vec) { + for result in session_results { + match result { + ServerSessionResult::OutboundResponse(packet) => { + server_results.push(ServerResult::OutboundPacket { + target_connection_id: executed_connection_id, + packet, + }) + }, + + ServerSessionResult::RaisedEvent(event) => + self.handle_raised_event(executed_connection_id, event, server_results), + + x => println!("Server result received: {:?}", x), + } + } + } + + fn handle_raised_event(&mut self, + executed_connection_id: usize, + event: ServerSessionEvent, + server_results: &mut Vec) { + match event { + ServerSessionEvent::ConnectionRequested {request_id, app_name} => { + self.handle_connection_requested(executed_connection_id, request_id, app_name, server_results); + }, + + ServerSessionEvent::PublishStreamRequested {request_id, app_name, stream_key, mode: _} => { + self.handle_publish_requested(executed_connection_id, request_id, app_name, stream_key, server_results); + }, + + ServerSessionEvent::PlayStreamRequested {request_id, app_name, stream_key, start_at: _, duration: _, reset: _, stream_id} => { + self.handle_play_requested(executed_connection_id, request_id, app_name, stream_key, stream_id, server_results); + }, + + ServerSessionEvent::StreamMetadataChanged {app_name, stream_key, metadata} => { + self.handle_metadata_received(app_name, stream_key, metadata, server_results); + }, + + ServerSessionEvent::VideoDataReceived {app_name: _, stream_key, data, timestamp} => { + self.handle_audio_video_data_received(stream_key, timestamp, data, ReceivedDataType::Video, server_results); + }, + + ServerSessionEvent::AudioDataReceived {app_name: _, stream_key, data, timestamp} => { + self.handle_audio_video_data_received(stream_key, timestamp, data, ReceivedDataType::Audio, server_results); + }, + + _ => println!("Event raised by connection {}: {:?}", executed_connection_id, event), + } + } + + fn handle_connection_requested(&mut self, + requested_connection_id: usize, + request_id: u32, + app_name: String, + server_results: &mut Vec) { + println!("Connection {} requested connection to app '{}'", requested_connection_id, app_name); + + let accept_result; + { + let client_id = self.connection_to_client_map.get(&requested_connection_id).unwrap(); + let client = self.clients.get_mut(*client_id).unwrap(); + accept_result = client.session.accept_request(request_id); + } + + match accept_result { + Err(error) => { + println!("Error occurred accepting connection request: {:?}", error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id} + ) + }, + + Ok(results) => { + self.handle_session_results(requested_connection_id, results, server_results); + } + } + } + + fn handle_publish_requested(&mut self, + requested_connection_id: usize, + request_id: u32, + app_name: String, + stream_key: String, + server_results: &mut Vec) { + println!("Publish requested on app '{}' and stream key '{}'", app_name, stream_key); + + match self.channels.get(&stream_key) { + None => (), + Some(channel) => match channel.publishing_client_id { + None => (), + Some(_) => { + println!("Stream key already being published to"); + server_results.push(ServerResult::DisconnectConnection {connection_id: requested_connection_id}); + return; + } + } + } + + let accept_result; + { + let client_id = self.connection_to_client_map.get(&requested_connection_id).unwrap(); + let client = self.clients.get_mut(*client_id).unwrap(); + client.current_action = ClientAction::Publishing(stream_key.clone()); + + let channel = self.channels + .entry(stream_key) + .or_insert(MediaChannel { + publishing_client_id: None, + watching_client_ids: HashSet::new(), + metadata: None, + video_sequence_header: None, + audio_sequence_header: None, + }); + + channel.publishing_client_id = Some(*client_id); + accept_result = client.session.accept_request(request_id); + } + + match accept_result { + Err(error) => { + println!("Error occurred accepting publish request: {:?}", error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id} + ) + }, + + Ok(results) => { + self.handle_session_results(requested_connection_id, results, server_results); + } + } + } + + fn handle_play_requested(&mut self, + requested_connection_id: usize, + request_id: u32, + app_name: String, + stream_key: String, + stream_id: u32, + server_results: &mut Vec) { + println!("Play requested on app '{}' and stream key '{}'", app_name, stream_key); + + let accept_result; + { + let client_id = self.connection_to_client_map.get(&requested_connection_id).unwrap(); + let client = self.clients.get_mut(*client_id).unwrap(); + client.current_action = ClientAction::Watching { + stream_key: stream_key.clone(), + stream_id, + }; + + let channel = self.channels + .entry(stream_key.clone()) + .or_insert(MediaChannel { + publishing_client_id: None, + watching_client_ids: HashSet::new(), + metadata: None, + video_sequence_header: None, + audio_sequence_header: None, + }); + + channel.watching_client_ids.insert(*client_id); + accept_result = match client.session.accept_request(request_id) { + Err(error) => Err(error), + Ok(mut results) => { + + // If the channel already has existing metadata, send that to the new client + // so they have up to date info + match channel.metadata { + None => (), + Some(ref metadata) => { + let packet = match client.session.send_metadata(stream_id, metadata.clone()) { + Ok(packet) => packet, + Err(error) => { + println!("Error occurred sending existing metadata to new client: {:?}", error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id} + ); + + return; + }, + }; + + results.push(ServerSessionResult::OutboundResponse(packet)); + } + } + + // If the channel already has sequence headers, send them + match channel.video_sequence_header { + None => (), + Some(ref data) => { + let packet = match client.session.send_video_data(stream_id, data.clone(), RtmpTimestamp::new(0), false) { + Ok(packet) => packet, + Err(error) => { + println!("Error occurred sending video header to new client: {:?}", error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id} + ); + + return; + }, + }; + + results.push(ServerSessionResult::OutboundResponse(packet)); + } + } + + match channel.audio_sequence_header { + None => (), + Some(ref data) => { + let packet = match client.session.send_audio_data(stream_id, data.clone(), RtmpTimestamp::new(0), false) { + Ok(packet) => packet, + Err(error) => { + println!("Error occurred sending audio header to new client: {:?}", error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id} + ); + + return; + }, + }; + + results.push(ServerSessionResult::OutboundResponse(packet)); + } + } + + Ok(results) + } + } + } + + match accept_result { + Err(error) => { + println!("Error occurred accepting playback request: {:?}", error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id} + ); + + return; + }, + + Ok(results) => { + self.handle_session_results(requested_connection_id, results, server_results); + } + } + } + + fn handle_metadata_received(&mut self, + app_name: String, + stream_key: String, + metadata: StreamMetadata, + server_results: &mut Vec) { + println!("New metadata received for app '{}' and stream key '{}'", app_name, stream_key); + let channel = match self.channels.get_mut(&stream_key) { + Some(channel) => channel, + None => return, + }; + + let metadata = Rc::new(metadata); + channel.metadata = Some(metadata.clone()); + + // Send the metadata to all current watchers + for client_id in &channel.watching_client_ids { + let client = match self.clients.get_mut(*client_id) { + Some(client) => client, + None => continue, + }; + + let active_stream_id = match client.get_active_stream_id() { + Some(stream_id) => stream_id, + None => continue, + }; + + match client.session.send_metadata(active_stream_id, metadata.clone()) { + Ok(packet) => { + server_results.push(ServerResult::OutboundPacket { + target_connection_id: client.connection_id, + packet, + }) + }, + + Err(error) => { + println!("Error sending metadata to client on connection id {}: {:?}", client.connection_id, error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: client.connection_id + }); + }, + } + } + } + + fn handle_audio_video_data_received(&mut self, + stream_key: String, + timestamp: RtmpTimestamp, + data: Bytes, + data_type: ReceivedDataType, + server_results: &mut Vec) { + let channel = match self.channels.get_mut(&stream_key) { + Some(channel) => channel, + None => return, + }; + + // If this is an audio or video sequence header we need to save it, so it can be + // distributed to any late coming watchers + match data_type { + ReceivedDataType::Video => { + if is_video_sequence_header(data.clone()) { + channel.video_sequence_header = Some(data.clone()); + } + }, + + ReceivedDataType::Audio => { + if is_audio_sequence_header(data.clone()) { + channel.audio_sequence_header = Some(data.clone()); + } + } + } + + for client_id in &channel.watching_client_ids { + let client = match self.clients.get_mut(*client_id) { + Some(client) => client, + None => continue, + }; + + let active_stream_id = match client.get_active_stream_id() { + Some(stream_id) => stream_id, + None => continue, + }; + + let should_send_to_client = match data_type { + ReceivedDataType::Video => { + client.has_received_video_keyframe || + (is_video_sequence_header(data.clone()) || + is_video_keyframe(data.clone())) + }, + + ReceivedDataType::Audio => { + client.has_received_video_keyframe || is_audio_sequence_header(data.clone()) + }, + }; + + if !should_send_to_client { + continue; + } + + let send_result = match data_type { + ReceivedDataType::Audio => client.session.send_audio_data(active_stream_id, data.clone(), timestamp.clone(), true), + ReceivedDataType::Video => { + if is_video_keyframe(data.clone()) { + client.has_received_video_keyframe = true; + } + + client.session.send_video_data(active_stream_id, data.clone(), timestamp.clone(), true) + }, + }; + + match send_result { + Ok(packet) => { + server_results.push(ServerResult::OutboundPacket { + target_connection_id: client.connection_id, + packet, + }) + }, + + Err(error) => { + println!("Error sending metadata to client on connection id {}: {:?}", client.connection_id, error); + server_results.push(ServerResult::DisconnectConnection { + connection_id: client.connection_id + }); + }, + } + } + } + + fn publishing_ended(&mut self, stream_key: String) { + let channel = match self.channels.get_mut(&stream_key) { + Some(channel) => channel, + None => return, + }; + + channel.publishing_client_id = None; + channel.metadata = None; + } + + fn play_ended(&mut self, client_id: usize, stream_key: String) { + let channel = match self.channels.get_mut(&stream_key) { + Some(channel) => channel, + None => return, + }; + + channel.watching_client_ids.remove(&client_id); + } +} + +fn is_video_sequence_header(data: Bytes) -> bool { + // This is assuming h264. + return data.len() >= 2 && + data[0] == 0x17 && + data[1] == 0x00; +} + +fn is_audio_sequence_header(data: Bytes) -> bool { + // This is assuming aac + return data.len() >= 2 && + data[0] == 0xaf && + data[1] == 0x00; +} + +fn is_video_keyframe(data: Bytes) -> bool { + // assumings h264 + return data.len() >= 2 && + data[0] == 0x17 && + data[1] != 0x00; // 0x00 is the sequence header, don't count that for now +}