// Based on the example code in: https://github.com/KallDrexx/rust-media-libs/blob/master/examples/threaded_rtmp_server/src/server.rs use crate::data::{Media, MediaType, RtmpInput}; use bytes::Bytes; use rml_rtmp::chunk_io::Packet; use rml_rtmp::sessions::StreamMetadata; use rml_rtmp::sessions::{ ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult, }; use rml_rtmp::time::RtmpTimestamp; use slab::Slab; use std::collections::{HashMap, HashSet}; use std::rc::Rc; use std::sync::mpsc::Sender; enum ClientAction { Waiting, Publishing(String), // Publishing to a stream key Watching { stream_key: String, stream_id: u32 }, } enum ReceivedDataType { Audio, Video, } struct Client { session: ServerSession, current_action: ClientAction, connection_id: usize, has_received_video_keyframe: bool, } impl Client { fn get_active_stream_id(&self) -> Option { match self.current_action { ClientAction::Waiting => None, ClientAction::Publishing(_) => None, ClientAction::Watching { stream_key: _, stream_id, } => Some(stream_id), } } } struct MediaChannel { publishing_client_id: Option, watching_client_ids: HashSet, metadata: Option>, video_sequence_header: Option, audio_sequence_header: Option, } #[derive(Debug)] pub enum ServerResult { DisconnectConnection { connection_id: usize, }, OutboundPacket { target_connection_id: usize, packet: Packet, }, } pub struct Server { clients: Slab, connection_to_client_map: HashMap, channels: HashMap, media_sink: Sender, has_received_keyframe: bool, } impl Server { pub fn new(media_sink: Sender) -> Self { Self { clients: Slab::with_capacity(8), connection_to_client_map: HashMap::with_capacity(8), channels: HashMap::new(), media_sink, has_received_keyframe: false, } } pub fn bytes_received( &mut self, connection_id: usize, bytes: &[u8], ) -> Result, String> { let mut server_results = Vec::new(); if !self.connection_to_client_map.contains_key(&connection_id) { // Initiate new client connection let config = ServerSessionConfig::new(); let (session, initial_session_results) = match ServerSession::new(config) { Ok(results) => results, Err(error) => return Err(error.to_string()), }; self.handle_session_results( connection_id, initial_session_results, &mut server_results, ); let client = Client { session, connection_id, current_action: ClientAction::Waiting, has_received_video_keyframe: false, }; let client_id = Some(self.clients.insert(client)); self.connection_to_client_map .insert(connection_id, client_id.unwrap()); } let client_results: Vec; { let client_id = self.connection_to_client_map.get(&connection_id).unwrap(); let client = self.clients.get_mut(*client_id).unwrap(); client_results = match client.session.handle_input(bytes) { Ok(results) => results, Err(error) => return Err(error.to_string()), }; } self.handle_session_results(connection_id, client_results, &mut server_results); Ok(server_results) } pub fn notify_connection_closed(&mut self, connection_id: usize) { match self.connection_to_client_map.remove(&connection_id) { None => (), Some(client_id) => { let client = self.clients.remove(client_id); match client.current_action { ClientAction::Publishing(stream_key) => self.publishing_ended(stream_key), ClientAction::Watching { stream_key, stream_id: _, } => self.play_ended(client_id, stream_key), ClientAction::Waiting => (), } } } } fn handle_session_results( &mut self, executed_connection_id: usize, session_results: Vec, server_results: &mut Vec, ) { for result in session_results { match result { ServerSessionResult::OutboundResponse(packet) => { server_results.push(ServerResult::OutboundPacket { target_connection_id: executed_connection_id, packet, }) } ServerSessionResult::RaisedEvent(event) => { self.handle_raised_event(executed_connection_id, event, server_results) } x => println!("Server result received: {:?}", x), } } } fn handle_raised_event( &mut self, executed_connection_id: usize, event: ServerSessionEvent, server_results: &mut Vec, ) { match event { ServerSessionEvent::ConnectionRequested { request_id, app_name, } => { self.handle_connection_requested( executed_connection_id, request_id, app_name, server_results, ); } ServerSessionEvent::PublishStreamRequested { request_id, app_name, stream_key, mode: _, } => { self.handle_publish_requested( executed_connection_id, request_id, app_name, stream_key, server_results, ); } ServerSessionEvent::PlayStreamRequested { request_id, app_name, stream_key, start_at: _, duration: _, reset: _, stream_id, } => { self.handle_play_requested( executed_connection_id, request_id, app_name, stream_key, stream_id, server_results, ); } ServerSessionEvent::StreamMetadataChanged { app_name, stream_key, metadata, } => { self.handle_metadata_received(app_name, stream_key, metadata, server_results); } ServerSessionEvent::VideoDataReceived { app_name: _, stream_key, data, timestamp, } => { self.handle_audio_video_data_received( stream_key, timestamp, data, ReceivedDataType::Video, server_results, ); } ServerSessionEvent::AudioDataReceived { app_name: _, stream_key, data, timestamp, } => { self.handle_audio_video_data_received( stream_key, timestamp, data, ReceivedDataType::Audio, server_results, ); } _ => println!( "Event raised by connection {}: {:?}", executed_connection_id, event ), } } fn handle_connection_requested( &mut self, requested_connection_id: usize, request_id: u32, app_name: String, server_results: &mut Vec, ) { println!( "Connection {} requested connection to app '{}'", requested_connection_id, app_name ); let accept_result; { let client_id = self .connection_to_client_map .get(&requested_connection_id) .unwrap(); let client = self.clients.get_mut(*client_id).unwrap(); accept_result = client.session.accept_request(request_id); } match accept_result { Err(error) => { println!("Error occurred accepting connection request: {:?}", error); server_results.push(ServerResult::DisconnectConnection { connection_id: requested_connection_id, }) } Ok(results) => { self.handle_session_results(requested_connection_id, results, server_results); } } } fn handle_publish_requested( &mut self, requested_connection_id: usize, request_id: u32, app_name: String, stream_key: String, server_results: &mut Vec, ) { println!( "Publish requested on app '{}' and stream key '{}'", app_name, stream_key ); match self.channels.get(&stream_key) { None => (), Some(channel) => match channel.publishing_client_id { None => (), Some(_) => { println!("Stream key already being published to"); server_results.push(ServerResult::DisconnectConnection { connection_id: requested_connection_id, }); return; } }, } let accept_result; { let client_id = self .connection_to_client_map .get(&requested_connection_id) .unwrap(); let client = self.clients.get_mut(*client_id).unwrap(); client.current_action = ClientAction::Publishing(stream_key.clone()); let channel = self.channels.entry(stream_key).or_insert(MediaChannel { publishing_client_id: None, watching_client_ids: HashSet::new(), metadata: None, video_sequence_header: None, audio_sequence_header: None, }); channel.publishing_client_id = Some(*client_id); accept_result = client.session.accept_request(request_id); } match accept_result { Err(error) => { println!("Error occurred accepting publish request: {:?}", error); server_results.push(ServerResult::DisconnectConnection { connection_id: requested_connection_id, }) } Ok(results) => { self.handle_session_results(requested_connection_id, results, server_results); } } } fn handle_play_requested( &mut self, requested_connection_id: usize, request_id: u32, app_name: String, stream_key: String, stream_id: u32, server_results: &mut Vec, ) { println!( "Play requested on app '{}' and stream key '{}'", app_name, stream_key ); let accept_result; { let client_id = self .connection_to_client_map .get(&requested_connection_id) .unwrap(); let client = self.clients.get_mut(*client_id).unwrap(); client.current_action = ClientAction::Watching { stream_key: stream_key.clone(), stream_id, }; let channel = self .channels .entry(stream_key.clone()) .or_insert(MediaChannel { publishing_client_id: None, watching_client_ids: HashSet::new(), metadata: None, video_sequence_header: None, audio_sequence_header: None, }); channel.watching_client_ids.insert(*client_id); accept_result = match client.session.accept_request(request_id) { Err(error) => Err(error), Ok(mut results) => { // If the channel already has existing metadata, send that to the new client // so they have up to date info match channel.metadata { None => (), Some(ref metadata) => { let packet = match client .session .send_metadata(stream_id, metadata.clone()) { Ok(packet) => packet, Err(error) => { println!("Error occurred sending existing metadata to new client: {:?}", error); server_results.push(ServerResult::DisconnectConnection { connection_id: requested_connection_id, }); return; } }; results.push(ServerSessionResult::OutboundResponse(packet)); } } // If the channel already has sequence headers, send them match channel.video_sequence_header { None => (), Some(ref data) => { let packet = match client.session.send_video_data( stream_id, data.clone(), RtmpTimestamp::new(0), false, ) { Ok(packet) => packet, Err(error) => { println!( "Error occurred sending video header to new client: {:?}", error ); server_results.push(ServerResult::DisconnectConnection { connection_id: requested_connection_id, }); return; } }; results.push(ServerSessionResult::OutboundResponse(packet)); } } match channel.audio_sequence_header { None => (), Some(ref data) => { let packet = match client.session.send_audio_data( stream_id, data.clone(), RtmpTimestamp::new(0), false, ) { Ok(packet) => packet, Err(error) => { println!( "Error occurred sending audio header to new client: {:?}", error ); server_results.push(ServerResult::DisconnectConnection { connection_id: requested_connection_id, }); return; } }; results.push(ServerSessionResult::OutboundResponse(packet)); } } Ok(results) } } } match accept_result { Err(error) => { println!("Error occurred accepting playback request: {:?}", error); server_results.push(ServerResult::DisconnectConnection { connection_id: requested_connection_id, }); return; } Ok(results) => { self.handle_session_results(requested_connection_id, results, server_results); } } } fn handle_metadata_received( &mut self, app_name: String, stream_key: String, metadata: StreamMetadata, server_results: &mut Vec, ) { println!( "New metadata received for app '{}' and stream key '{}'", app_name, stream_key ); let channel = match self.channels.get_mut(&stream_key) { Some(channel) => channel, None => return, }; self.media_sink .send(RtmpInput::Metadata(metadata.clone())) .unwrap(); let metadata = Rc::new(metadata); channel.metadata = Some(metadata.clone()); // Send the metadata to all current watchers for client_id in &channel.watching_client_ids { let client = match self.clients.get_mut(*client_id) { Some(client) => client, None => continue, }; let active_stream_id = match client.get_active_stream_id() { Some(stream_id) => stream_id, None => continue, }; match client .session .send_metadata(active_stream_id, metadata.clone()) { Ok(packet) => server_results.push(ServerResult::OutboundPacket { target_connection_id: client.connection_id, packet, }), Err(error) => { println!( "Error sending metadata to client on connection id {}: {:?}", client.connection_id, error ); server_results.push(ServerResult::DisconnectConnection { connection_id: client.connection_id, }); } } } } fn handle_audio_video_data_received( &mut self, stream_key: String, timestamp: RtmpTimestamp, data: Bytes, data_type: ReceivedDataType, server_results: &mut Vec, ) { let channel = match self.channels.get_mut(&stream_key) { Some(channel) => channel, None => return, }; // If this is an audio or video sequence header we need to save it, so it can be // distributed to any late coming watchers match data_type { ReceivedDataType::Video => { if is_video_sequence_header(data.clone()) { channel.video_sequence_header = Some(data.clone()); } } ReceivedDataType::Audio => { if is_audio_sequence_header(data.clone()) { channel.audio_sequence_header = Some(data.clone()); } } } // send to gstreamer element { let should_send_to_client = match data_type { ReceivedDataType::Video => { self.has_received_keyframe || (is_video_sequence_header(data.clone()) || is_video_keyframe(data.clone())) } ReceivedDataType::Audio => { self.has_received_keyframe || is_audio_sequence_header(data.clone()) } }; if should_send_to_client { match data_type { ReceivedDataType::Audio => self .media_sink .send(RtmpInput::Media(Media { media_type: MediaType::Audio, data: data.clone(), timestamp: timestamp.value, can_be_dropped: true, })) .unwrap(), ReceivedDataType::Video => { if is_video_keyframe(data.clone()) { self.has_received_keyframe = true; } self.media_sink .send(RtmpInput::Media(Media { media_type: MediaType::Video, data: data.clone(), timestamp: timestamp.value, can_be_dropped: true, })) .unwrap(); } }; } } for client_id in &channel.watching_client_ids { let client = match self.clients.get_mut(*client_id) { Some(client) => client, None => continue, }; let active_stream_id = match client.get_active_stream_id() { Some(stream_id) => stream_id, None => continue, }; let should_send_to_client = match data_type { ReceivedDataType::Video => { client.has_received_video_keyframe || (is_video_sequence_header(data.clone()) || is_video_keyframe(data.clone())) } ReceivedDataType::Audio => { client.has_received_video_keyframe || is_audio_sequence_header(data.clone()) } }; if !should_send_to_client { continue; } let send_result = match data_type { ReceivedDataType::Audio => client.session.send_audio_data( active_stream_id, data.clone(), timestamp.clone(), true, ), ReceivedDataType::Video => { if is_video_keyframe(data.clone()) { client.has_received_video_keyframe = true; } client.session.send_video_data( active_stream_id, data.clone(), timestamp.clone(), true, ) } }; match send_result { Ok(packet) => server_results.push(ServerResult::OutboundPacket { target_connection_id: client.connection_id, packet, }), Err(error) => { println!( "Error sending metadata to client on connection id {}: {:?}", client.connection_id, error ); server_results.push(ServerResult::DisconnectConnection { connection_id: client.connection_id, }); } } } } fn publishing_ended(&mut self, stream_key: String) { let channel = match self.channels.get_mut(&stream_key) { Some(channel) => channel, None => return, }; channel.publishing_client_id = None; channel.metadata = None; } fn play_ended(&mut self, client_id: usize, stream_key: String) { let channel = match self.channels.get_mut(&stream_key) { Some(channel) => channel, None => return, }; channel.watching_client_ids.remove(&client_id); } } fn is_video_sequence_header(data: Bytes) -> bool { // This is assuming h264. return data.len() >= 2 && data[0] == 0x17 && data[1] == 0x00; } fn is_audio_sequence_header(data: Bytes) -> bool { // This is assuming aac return data.len() >= 2 && data[0] == 0xaf && data[1] == 0x00; } fn is_video_keyframe(data: Bytes) -> bool { // assumings h264 return data.len() >= 2 && data[0] == 0x17 && data[1] != 0x00; // 0x00 is the sequence header, don't count that for now }