179 lines
5.4 KiB
Rust
179 lines
5.4 KiB
Rust
use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};
|
|
use std::collections::VecDeque;
|
|
use std::io;
|
|
use std::io::{Read, Write};
|
|
use std::net::TcpStream;
|
|
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
|
|
/// Size in bytes of the fixed buffers used for socket reads and carried inside
/// `ReadResult::BytesReceived`.
const BUFFER_SIZE: usize = 4096;
|
|
|
|
pub enum ReadResult {
|
|
HandshakingInProgress,
|
|
NoBytesReceived,
|
|
BytesReceived {
|
|
buffer: [u8; BUFFER_SIZE],
|
|
byte_count: usize,
|
|
},
|
|
}
|
|
|
|
/// Failures a connection can report to its caller.
#[derive(Debug)]
pub enum ConnectionError {
    /// An underlying I/O operation failed.
    IoError(io::Error),

    /// The connection can no longer be used.
    SocketClosed,
}
|
|
|
|
impl From<io::Error> for ConnectionError {
|
|
fn from(error: io::Error) -> Self {
|
|
ConnectionError::IoError(error)
|
|
}
|
|
}
|
|
|
|
pub struct Connection {
|
|
pub connection_id: Option<usize>,
|
|
writer: Sender<Vec<u8>>,
|
|
reader: Receiver<ReadResult>,
|
|
handshake: Handshake,
|
|
handshake_completed: bool,
|
|
}
|
|
|
|
impl Connection {
|
|
pub fn new(socket: TcpStream) -> Connection {
|
|
let (byte_sender, byte_receiver) = channel();
|
|
let (result_sender, result_receiver) = channel();
|
|
|
|
start_byte_writer(byte_receiver, &socket);
|
|
start_result_reader(result_sender, &socket);
|
|
|
|
Connection {
|
|
connection_id: None,
|
|
writer: byte_sender,
|
|
reader: result_receiver,
|
|
handshake: Handshake::new(PeerType::Server),
|
|
handshake_completed: false,
|
|
}
|
|
}
|
|
|
|
pub fn write(&self, bytes: Vec<u8>) {
|
|
self.writer.send(bytes).unwrap();
|
|
}
|
|
|
|
pub fn read(&mut self) -> Result<ReadResult, ConnectionError> {
|
|
match self.reader.try_recv() {
|
|
Err(TryRecvError::Empty) => Ok(ReadResult::NoBytesReceived),
|
|
Err(TryRecvError::Disconnected) => Err(ConnectionError::SocketClosed),
|
|
Ok(result) => match self.handshake_completed {
|
|
true => Ok(result),
|
|
false => match result {
|
|
ReadResult::HandshakingInProgress => unreachable!(),
|
|
ReadResult::NoBytesReceived => Ok(result),
|
|
ReadResult::BytesReceived { buffer, byte_count } => {
|
|
self.handle_handshake_bytes(&buffer[..byte_count])
|
|
}
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
fn handle_handshake_bytes(&mut self, bytes: &[u8]) -> Result<ReadResult, ConnectionError> {
|
|
let result = match self.handshake.process_bytes(bytes) {
|
|
Ok(result) => result,
|
|
Err(error) => {
|
|
println!("Handshake error: {:?}", error);
|
|
return Err(ConnectionError::SocketClosed);
|
|
}
|
|
};
|
|
|
|
match result {
|
|
HandshakeProcessResult::InProgress { response_bytes } => {
|
|
if response_bytes.len() > 0 {
|
|
self.write(response_bytes);
|
|
}
|
|
|
|
Ok(ReadResult::HandshakingInProgress)
|
|
}
|
|
|
|
HandshakeProcessResult::Completed {
|
|
response_bytes,
|
|
remaining_bytes,
|
|
} => {
|
|
println!("Handshake successful!");
|
|
if response_bytes.len() > 0 {
|
|
self.write(response_bytes);
|
|
}
|
|
|
|
let mut buffer = [0; BUFFER_SIZE];
|
|
let buffer_size = remaining_bytes.len();
|
|
for (index, value) in remaining_bytes.into_iter().enumerate() {
|
|
buffer[index] = value;
|
|
}
|
|
|
|
self.handshake_completed = true;
|
|
Ok(ReadResult::BytesReceived {
|
|
buffer,
|
|
byte_count: buffer_size,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Spawns a thread that writes every chunk received on `byte_receiver` to a
/// clone of `socket`.
///
/// Chunks are written in the order they were sent, each one in full. The
/// thread ends once every sender has been dropped (after writing whatever was
/// still queued) or as soon as a socket write fails, which is logged.
///
/// # Panics
///
/// Panics if `socket` cannot be cloned.
fn start_byte_writer(byte_receiver: Receiver<Vec<u8>>, socket: &TcpStream) {
    let mut socket = socket.try_clone().expect("failed to clone socket");
    thread::spawn(move || {
        // Iterating the receiver blocks until a chunk arrives and stops only
        // when the channel is both empty and disconnected, so nothing queued
        // is lost and no polling sleep is needed.
        for bytes in byte_receiver {
            // `write` may accept only part of a chunk; `write_all` keeps going
            // until the whole chunk is sent or an error occurs.
            if let Err(error) = socket.write_all(&bytes) {
                println!("Error writing to socket: {:?}", error);
                return;
            }
        }
    });
}
|
|
|
|
fn start_result_reader(sender: Sender<ReadResult>, socket: &TcpStream) {
|
|
let mut socket = socket.try_clone().unwrap();
|
|
thread::spawn(move || {
|
|
let mut buffer = [0; BUFFER_SIZE];
|
|
loop {
|
|
match socket.read(&mut buffer) {
|
|
Ok(0) => return, // socket closed
|
|
Ok(read_count) => {
|
|
let mut send_buffer = [0; BUFFER_SIZE];
|
|
for x in 0..read_count {
|
|
send_buffer[x] = buffer[x];
|
|
}
|
|
|
|
let result = ReadResult::BytesReceived {
|
|
buffer: send_buffer,
|
|
byte_count: read_count,
|
|
};
|
|
|
|
sender.send(result).unwrap();
|
|
}
|
|
|
|
Err(error) => {
|
|
println!("Error occurred reading from socket: {:?}", error);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|