diff --git a/src/rsfilesink.rs b/src/rsfilesink.rs index 5d55f647..e08c8a23 100644 --- a/src/rsfilesink.rs +++ b/src/rsfilesink.rs @@ -26,12 +26,22 @@ use std::sync::Mutex; use utils::*; use rssink::*; +#[derive(Debug)] +struct Settings { + location: Option, +} + +#[derive(Debug)] +enum StreamingState { + Stopped, + Started { file: File, position: u64 }, +} + #[derive(Debug)] pub struct FileSink { controller: SinkController, - location: Mutex>, - file: Option, - position: u64, + settings: Mutex, + streaming_state: Mutex, } unsafe impl Sync for FileSink {} @@ -41,9 +51,8 @@ impl FileSink { pub fn new(controller: SinkController) -> FileSink { FileSink { controller: controller, - location: Mutex::new(None), - file: None, - position: 0, + settings: Mutex::new(Settings { location: None }), + streaming_state: Mutex::new(StreamingState::Stopped), } } @@ -54,24 +63,23 @@ impl FileSink { impl Sink for FileSink { fn set_uri(&self, uri: Option) -> bool { + let ref mut location = self.settings.lock().unwrap().location; + match uri { None => { - let mut location = self.location.lock().unwrap(); *location = None; - return true; + true } Some(ref uri) => { match uri.to_file_path().ok() { Some(p) => { - let mut location = self.location.lock().unwrap(); *location = Some(p); - return true; + true } None => { - let mut location = self.location.lock().unwrap(); *location = None; println_err!("Unsupported file URI '{}'", uri.as_str()); - return false; + false } } } @@ -79,31 +87,36 @@ impl Sink for FileSink { } fn get_uri(&self) -> Option { - let location = self.location.lock().unwrap(); - (*location) - .as_ref() + let ref location = self.settings.lock().unwrap().location; + location.as_ref() .map(|l| Url::from_file_path(l).ok()) .and_then(|i| i) // join() } fn start(&self) -> bool { - self.file = None; - self.position = 0; + let ref location = self.settings.lock().unwrap().location; + let mut streaming_state = self.streaming_state.lock().unwrap(); + + if let StreamingState::Started { .. } = *streaming_state { + return false; + } - let location = self.location.lock().unwrap(); match *location { - None => return false, + None => false, Some(ref location) => { match File::create(location.as_path()) { Ok(file) => { - self.file = Some(file); - return true; + *streaming_state = StreamingState::Started { + file: file, + position: 0, + }; + true } Err(err) => { println_err!("Could not open file for writing '{}': {}", location.to_str().unwrap_or("Non-UTF8 path"), err.to_string()); - return false; + false } } } @@ -111,24 +124,28 @@ impl Sink for FileSink { } fn stop(&self) -> bool { - self.file = None; - self.position = 0; + let mut streaming_state = self.streaming_state.lock().unwrap(); + *streaming_state = StreamingState::Stopped; true } fn render(&self, data: &[u8]) -> GstFlowReturn { - match self.file { - None => return GstFlowReturn::Error, - Some(ref mut f) => { - match f.write_all(data) { - Ok(_) => return GstFlowReturn::Ok, - Err(err) => { - println_err!("Failed to write: {}", err); - return GstFlowReturn::Error; - } + let mut streaming_state = self.streaming_state.lock().unwrap(); + + if let StreamingState::Started { ref mut file, ref mut position } = *streaming_state { + match file.write_all(data) { + Ok(_) => { + *position += data.len() as u64; + return GstFlowReturn::Ok; + } + Err(err) => { + println_err!("Failed to write: {}", err); + return GstFlowReturn::Error; } } + } else { + return GstFlowReturn::Error; } } } diff --git a/src/rsfilesrc.rs b/src/rsfilesrc.rs index 6de2b800..86cfc08a 100644 --- a/src/rsfilesrc.rs +++ b/src/rsfilesrc.rs @@ -27,12 +27,22 @@ use std::io::Write; use utils::*; use rssource::*; +#[derive(Debug)] +struct Settings { + location: Option, +} + +#[derive(Debug)] +enum StreamingState { + Stopped, + Started { file: File, position: u64 }, +} + #[derive(Debug)] pub struct FileSrc { controller: SourceController, - location: Mutex>, - file: Option, - position: u64, + settings: Mutex, + streaming_state: Mutex, } unsafe impl Sync for FileSrc {} @@ -42,9 +52,8 @@ impl FileSrc { pub fn new(controller: SourceController) -> FileSrc { FileSrc { controller: controller, - location: Mutex::new(None), - file: None, - position: 0, + settings: Mutex::new(Settings { location: None }), + streaming_state: Mutex::new(StreamingState::Stopped), } } @@ -54,25 +63,24 @@ impl FileSrc { } impl Source for FileSrc { - fn set_uri(&mut self, uri: Option) -> bool { + fn set_uri(&self, uri: Option) -> bool { + let ref mut location = self.settings.lock().unwrap().location; + match uri { None => { - let mut location = self.location.lock().unwrap(); *location = None; - return true; + true } - Some(uri) => { + Some(ref uri) => { match uri.to_file_path().ok() { Some(p) => { - let mut location = self.location.lock().unwrap(); *location = Some(p); - return true; + true } None => { - let mut location = self.location.lock().unwrap(); *location = None; println_err!("Unsupported file URI '{}'", uri.as_str()); - return false; + false } } } @@ -80,9 +88,8 @@ impl Source for FileSrc { } fn get_uri(&self) -> Option { - let location = self.location.lock().unwrap(); - (*location) - .as_ref() + let ref location = self.settings.lock().unwrap().location; + location.as_ref() .map(|l| Url::from_file_path(l).ok()) .and_then(|i| i) // join() } @@ -92,75 +99,87 @@ impl Source for FileSrc { } fn get_size(&self) -> u64 { - self.file.as_ref() - .map(|f| f.metadata().ok()) - .and_then(|i| i) // join() - .map(|m| m.len()) - .unwrap_or(u64::MAX) + let streaming_state = self.streaming_state.lock().unwrap(); + + if let StreamingState::Started { ref file, .. } = *streaming_state { + file.metadata() + .ok() + .map(|m| m.len()) + .unwrap_or(u64::MAX) + } else { + u64::MAX + } } - fn start(&mut self) -> bool { - self.file = None; - self.position = 0; - let location = self.location.lock().unwrap(); + fn start(&self) -> bool { + let ref location = self.settings.lock().unwrap().location; + let mut streaming_state = self.streaming_state.lock().unwrap(); + + if let StreamingState::Started { .. } = *streaming_state { + return false; + } match *location { - None => return false, + None => false, Some(ref location) => { match File::open(location.as_path()) { Ok(file) => { - self.file = Some(file); - return true; + *streaming_state = StreamingState::Started { + file: file, + position: 0, + }; + true } Err(err) => { - println_err!("Failed to open file '{}': {}", + println_err!("Could not open file for writing '{}': {}", location.to_str().unwrap_or("Non-UTF8 path"), err.to_string()); - return false; + false } } } } } - fn stop(&mut self) -> bool { - self.file = None; - self.position = 0; + fn stop(&self) -> bool { + let mut streaming_state = self.streaming_state.lock().unwrap(); + *streaming_state = StreamingState::Stopped; true } - fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result { - match self.file { - None => return Err(GstFlowReturn::Error), - Some(ref mut f) => { - if self.position != offset { - match f.seek(SeekFrom::Start(offset)) { - Ok(_) => { - self.position = offset; - } - Err(err) => { - println_err!("Failed to seek to {}: {}", offset, err.to_string()); - return Err(GstFlowReturn::Error); - } - } - } + fn fill(&self, offset: u64, data: &mut [u8]) -> Result { + let mut streaming_state = self.streaming_state.lock().unwrap(); - match f.read(data) { - Ok(size) => { - self.position += size as u64; - return Ok(size); + if let StreamingState::Started { ref mut file, ref mut position } = *streaming_state { + if *position != offset { + match file.seek(SeekFrom::Start(offset)) { + Ok(_) => { + *position = offset; } Err(err) => { - println_err!("Failed to read at {}: {}", offset, err.to_string()); + println_err!("Failed to seek to {}: {}", offset, err.to_string()); return Err(GstFlowReturn::Error); } } } + + match file.read(data) { + Ok(size) => { + *position += size as u64; + Ok(size) + } + Err(err) => { + println_err!("Failed to read at {}: {}", offset, err.to_string()); + Err(GstFlowReturn::Error) + } + } + } else { + Err(GstFlowReturn::Error) } } - fn do_seek(&mut self, _: u64, _: u64) -> bool { + fn do_seek(&self, _: u64, _: u64) -> bool { true } } diff --git a/src/rshttpsrc.rs b/src/rshttpsrc.rs index df02d9bb..9c6f804a 100644 --- a/src/rshttpsrc.rs +++ b/src/rshttpsrc.rs @@ -25,22 +25,34 @@ use hyper::client::response::Response; use std::io::Write; use std::sync::Mutex; -use std::sync::atomic::{AtomicBool, Ordering}; use utils::*; use rssource::*; +#[derive(Debug)] +struct Settings { + url: Option, +} + +#[derive(Debug)] +enum StreamingState { + Stopped, + Started { + response: Response, + seekable: bool, + position: u64, + size: u64, + start: u64, + stop: u64, + }, +} + #[derive(Debug)] pub struct HttpSrc { controller: SourceController, - url: Mutex>, + settings: Mutex, + streaming_state: Mutex, client: Client, - response: Option, - seekable: AtomicBool, - position: u64, - size: u64, - start: u64, - stop: u64, } unsafe impl Sync for HttpSrc {} @@ -50,14 +62,9 @@ impl HttpSrc { pub fn new(controller: SourceController) -> HttpSrc { HttpSrc { controller: controller, - url: Mutex::new(None), + settings: Mutex::new(Settings { url: None }), + streaming_state: Mutex::new(StreamingState::Stopped), client: Client::new(), - response: None, - seekable: AtomicBool::new(false), - position: 0, - size: u64::MAX, - start: 0, - stop: u64::MAX, } } @@ -65,15 +72,11 @@ impl HttpSrc { Box::new(HttpSrc::new(controller)) } - pub fn do_request(&mut self, start: u64, stop: u64) -> bool { - self.response = None; - self.seekable.store(false, Ordering::Relaxed); - self.position = 0; - self.size = u64::MAX; + fn do_request(&self, start: u64, stop: u64) -> StreamingState { + let ref url = self.settings.lock().unwrap().url; - let url = self.url.lock().unwrap(); match *url { - None => return false, + None => StreamingState::Stopped, Some(ref url) => { let mut req = self.client.get(url.clone()); @@ -81,15 +84,15 @@ impl HttpSrc { req = if stop == u64::MAX { req.header(Range::Bytes(vec![ByteRangeSpec::AllFrom(start)])) } else { - req.header(Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop)])) + req.header(Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop - 1)])) }; } match req.send() { Ok(response) => { if response.status.is_success() { - self.size = if let Some(&ContentLength(content_length)) = - response.headers.get() { + let size = if let Some(&ContentLength(content_length)) = + response.headers.get() { content_length + start } else { u64::MAX @@ -101,34 +104,35 @@ impl HttpSrc { false }; - self.seekable.store(self.size != u64::MAX && accept_byte_ranges, - Ordering::Relaxed); + let seekable = size != u64::MAX && accept_byte_ranges; - self.start = start; - self.stop = stop; - - self.position = if let Some(&ContentRange(ContentRangeSpec::Bytes{range: Some((range_start, _)), ..})) = response.headers.get() { + let position = if let Some(&ContentRange(ContentRangeSpec::Bytes{range: Some((range_start, _)), ..})) = response.headers.get() { range_start } else { start }; - if self.position != start { - println_err!("Failed to seek to {}: Got {}", start, self.position); - return false; + if position != start { + println_err!("Failed to seek to {}: Got {}", start, position); + StreamingState::Stopped + } else { + StreamingState::Started { + response: response, + seekable: seekable, + position: 0, + size: size, + start: start, + stop: stop, + } } - - self.response = Some(response); - - return true; } else { println_err!("Failed to fetch {}: {}", url, response.status); - return false; + StreamingState::Stopped } } Err(err) => { println_err!("Failed to fetch {}: {}", url, err.to_string()); - return false; + StreamingState::Stopped } } } @@ -137,25 +141,19 @@ impl HttpSrc { } impl Source for HttpSrc { - fn set_uri(&mut self, uri: Option) -> bool { - if self.response.is_some() { - println_err!("Can't set URI after starting"); - return false; - } + fn set_uri(&self, uri: Option) -> bool { + let ref mut url = self.settings.lock().unwrap().url; match uri { None => { - let mut url = self.url.lock().unwrap(); *url = None; return true; } Some(uri) => { if uri.scheme() == "http" || uri.scheme() == "https" { - let mut url = self.url.lock().unwrap(); *url = Some(uri); return true; } else { - let mut url = self.url.lock().unwrap(); *url = None; println_err!("Unsupported URI '{}'", uri.as_str()); return false; @@ -165,67 +163,91 @@ impl Source for HttpSrc { } fn get_uri(&self) -> Option { - let url = self.url.lock().unwrap(); - (*url).as_ref().map(|u| u.clone()) + let ref url = self.settings.lock().unwrap().url; + url.as_ref().map(|u| u.clone()) } fn is_seekable(&self) -> bool { - self.seekable.load(Ordering::Relaxed) + let streaming_state = self.streaming_state.lock().unwrap(); + + match *streaming_state { + StreamingState::Started { seekable, .. } => seekable, + _ => false, + } } fn get_size(&self) -> u64 { - self.size - } - - fn start(&mut self) -> bool { - self.seekable.store(false, Ordering::Relaxed); - return self.do_request(0, u64::MAX); - } - - fn stop(&mut self) -> bool { - self.seekable.store(false, Ordering::Relaxed); - self.position = 0; - self.size = u64::MAX; - match self.response { - Some(ref mut response) => drop(response), - None => (), + let streaming_state = self.streaming_state.lock().unwrap(); + match *streaming_state { + StreamingState::Started { size, .. } => size, + _ => u64::MAX, } - self.response = None; - - return true; } - fn do_seek(&mut self, start: u64, stop: u64) -> bool { - return self.do_request(start, stop); + fn start(&self) -> bool { + let mut streaming_state = self.streaming_state.lock().unwrap(); + *streaming_state = self.do_request(0, u64::MAX); + + if let StreamingState::Stopped = *streaming_state { + false + } else { + true + } } - fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result { - if self.position != offset || self.response.is_none() { - let stop = self.stop; // FIXME: Borrow checker fail - if !self.do_request(offset, stop) { - println_err!("Failed to seek to {}", offset); - return Err(GstFlowReturn::Error); - } + fn stop(&self) -> bool { + let mut streaming_state = self.streaming_state.lock().unwrap(); + *streaming_state = StreamingState::Stopped; + + true + } + + fn do_seek(&self, start: u64, stop: u64) -> bool { + let mut streaming_state = self.streaming_state.lock().unwrap(); + *streaming_state = self.do_request(start, stop); + + if let StreamingState::Stopped = *streaming_state { + false + } else { + true + } + } + + fn fill(&self, offset: u64, data: &mut [u8]) -> Result { + let mut streaming_state = self.streaming_state.lock().unwrap(); + + if let StreamingState::Stopped = *streaming_state { + return Err(GstFlowReturn::Error); } - match self.response { - None => return Err(GstFlowReturn::Error), - Some(ref mut r) => { - match r.read(data) { - Ok(size) => { - if size == 0 { - return Err(GstFlowReturn::Eos); - } - - self.position += size as u64; - return Ok(size); - } - Err(err) => { - println_err!("Failed to read at {}: {}", offset, err.to_string()); - return Err(GstFlowReturn::Error); - } + if let StreamingState::Started { position, stop, .. } = *streaming_state { + if position != offset { + *streaming_state = self.do_request(offset, stop); + if let StreamingState::Stopped = *streaming_state { + println_err!("Failed to seek to {}", offset); + return Err(GstFlowReturn::Error); } } } + + if let StreamingState::Started { ref mut response, ref mut position, .. } = + *streaming_state { + match response.read(data) { + Ok(size) => { + if size == 0 { + return Err(GstFlowReturn::Eos); + } + + *position += size as u64; + Ok(size) + } + Err(err) => { + println_err!("Failed to read at {}: {}", offset, err.to_string()); + Err(GstFlowReturn::Error) + } + } + } else { + Err(GstFlowReturn::Error) + } } } diff --git a/src/rssource.rs b/src/rssource.rs index 8cb5d466..4262af1e 100644 --- a/src/rssource.rs +++ b/src/rssource.rs @@ -39,17 +39,17 @@ impl SourceController { pub trait Source: Sync + Send { // Called from any thread at any time - fn set_uri(&mut self, uri: Option) -> bool; + fn set_uri(&self, uri: Option) -> bool; fn get_uri(&self) -> Option; // Called from any thread between start/stop fn is_seekable(&self) -> bool; // Called from the streaming thread only - fn start(&mut self) -> bool; - fn stop(&mut self) -> bool; - fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result; - fn do_seek(&mut self, start: u64, stop: u64) -> bool; + fn start(&self) -> bool; + fn stop(&self) -> bool; + fn fill(&self, offset: u64, data: &mut [u8]) -> Result; + fn do_seek(&self, start: u64, stop: u64) -> bool; fn get_size(&self) -> u64; }