Fix up mutability

Rust assumes that there can only be a single mutable reference at any time, as
such make use of interior mutability with a mutex for all state of the
elements.
This commit is contained in:
Sebastian Dröge 2016-08-22 22:25:58 +03:00
parent c7c2676e67
commit 086ec5b68d
4 changed files with 245 additions and 187 deletions

View file

@ -26,12 +26,22 @@ use std::sync::Mutex;
use utils::*; use utils::*;
use rssink::*; use rssink::*;
#[derive(Debug)]
struct Settings {
location: Option<PathBuf>,
}
#[derive(Debug)]
enum StreamingState {
Stopped,
Started { file: File, position: u64 },
}
#[derive(Debug)] #[derive(Debug)]
pub struct FileSink { pub struct FileSink {
controller: SinkController, controller: SinkController,
location: Mutex<Option<PathBuf>>, settings: Mutex<Settings>,
file: Option<File>, streaming_state: Mutex<StreamingState>,
position: u64,
} }
unsafe impl Sync for FileSink {} unsafe impl Sync for FileSink {}
@ -41,9 +51,8 @@ impl FileSink {
pub fn new(controller: SinkController) -> FileSink { pub fn new(controller: SinkController) -> FileSink {
FileSink { FileSink {
controller: controller, controller: controller,
location: Mutex::new(None), settings: Mutex::new(Settings { location: None }),
file: None, streaming_state: Mutex::new(StreamingState::Stopped),
position: 0,
} }
} }
@ -54,24 +63,23 @@ impl FileSink {
impl Sink for FileSink { impl Sink for FileSink {
fn set_uri(&self, uri: Option<Url>) -> bool { fn set_uri(&self, uri: Option<Url>) -> bool {
let ref mut location = self.settings.lock().unwrap().location;
match uri { match uri {
None => { None => {
let mut location = self.location.lock().unwrap();
*location = None; *location = None;
return true; true
} }
Some(ref uri) => { Some(ref uri) => {
match uri.to_file_path().ok() { match uri.to_file_path().ok() {
Some(p) => { Some(p) => {
let mut location = self.location.lock().unwrap();
*location = Some(p); *location = Some(p);
return true; true
} }
None => { None => {
let mut location = self.location.lock().unwrap();
*location = None; *location = None;
println_err!("Unsupported file URI '{}'", uri.as_str()); println_err!("Unsupported file URI '{}'", uri.as_str());
return false; false
} }
} }
} }
@ -79,31 +87,36 @@ impl Sink for FileSink {
} }
fn get_uri(&self) -> Option<Url> { fn get_uri(&self) -> Option<Url> {
let location = self.location.lock().unwrap(); let ref location = self.settings.lock().unwrap().location;
(*location) location.as_ref()
.as_ref()
.map(|l| Url::from_file_path(l).ok()) .map(|l| Url::from_file_path(l).ok())
.and_then(|i| i) // join() .and_then(|i| i) // join()
} }
fn start(&self) -> bool { fn start(&self) -> bool {
self.file = None; let ref location = self.settings.lock().unwrap().location;
self.position = 0; let mut streaming_state = self.streaming_state.lock().unwrap();
if let StreamingState::Started { .. } = *streaming_state {
return false;
}
let location = self.location.lock().unwrap();
match *location { match *location {
None => return false, None => false,
Some(ref location) => { Some(ref location) => {
match File::create(location.as_path()) { match File::create(location.as_path()) {
Ok(file) => { Ok(file) => {
self.file = Some(file); *streaming_state = StreamingState::Started {
return true; file: file,
position: 0,
};
true
} }
Err(err) => { Err(err) => {
println_err!("Could not open file for writing '{}': {}", println_err!("Could not open file for writing '{}': {}",
location.to_str().unwrap_or("Non-UTF8 path"), location.to_str().unwrap_or("Non-UTF8 path"),
err.to_string()); err.to_string());
return false; false
} }
} }
} }
@ -111,24 +124,28 @@ impl Sink for FileSink {
} }
fn stop(&self) -> bool { fn stop(&self) -> bool {
self.file = None; let mut streaming_state = self.streaming_state.lock().unwrap();
self.position = 0; *streaming_state = StreamingState::Stopped;
true true
} }
fn render(&self, data: &[u8]) -> GstFlowReturn { fn render(&self, data: &[u8]) -> GstFlowReturn {
match self.file { let mut streaming_state = self.streaming_state.lock().unwrap();
None => return GstFlowReturn::Error,
Some(ref mut f) => { if let StreamingState::Started { ref mut file, ref mut position } = *streaming_state {
match f.write_all(data) { match file.write_all(data) {
Ok(_) => return GstFlowReturn::Ok, Ok(_) => {
Err(err) => { *position += data.len() as u64;
println_err!("Failed to write: {}", err); return GstFlowReturn::Ok;
return GstFlowReturn::Error; }
} Err(err) => {
println_err!("Failed to write: {}", err);
return GstFlowReturn::Error;
} }
} }
} else {
return GstFlowReturn::Error;
} }
} }
} }

View file

@ -27,12 +27,22 @@ use std::io::Write;
use utils::*; use utils::*;
use rssource::*; use rssource::*;
#[derive(Debug)]
struct Settings {
location: Option<PathBuf>,
}
#[derive(Debug)]
enum StreamingState {
Stopped,
Started { file: File, position: u64 },
}
#[derive(Debug)] #[derive(Debug)]
pub struct FileSrc { pub struct FileSrc {
controller: SourceController, controller: SourceController,
location: Mutex<Option<PathBuf>>, settings: Mutex<Settings>,
file: Option<File>, streaming_state: Mutex<StreamingState>,
position: u64,
} }
unsafe impl Sync for FileSrc {} unsafe impl Sync for FileSrc {}
@ -42,9 +52,8 @@ impl FileSrc {
pub fn new(controller: SourceController) -> FileSrc { pub fn new(controller: SourceController) -> FileSrc {
FileSrc { FileSrc {
controller: controller, controller: controller,
location: Mutex::new(None), settings: Mutex::new(Settings { location: None }),
file: None, streaming_state: Mutex::new(StreamingState::Stopped),
position: 0,
} }
} }
@ -54,25 +63,24 @@ impl FileSrc {
} }
impl Source for FileSrc { impl Source for FileSrc {
fn set_uri(&mut self, uri: Option<Url>) -> bool { fn set_uri(&self, uri: Option<Url>) -> bool {
let ref mut location = self.settings.lock().unwrap().location;
match uri { match uri {
None => { None => {
let mut location = self.location.lock().unwrap();
*location = None; *location = None;
return true; true
} }
Some(uri) => { Some(ref uri) => {
match uri.to_file_path().ok() { match uri.to_file_path().ok() {
Some(p) => { Some(p) => {
let mut location = self.location.lock().unwrap();
*location = Some(p); *location = Some(p);
return true; true
} }
None => { None => {
let mut location = self.location.lock().unwrap();
*location = None; *location = None;
println_err!("Unsupported file URI '{}'", uri.as_str()); println_err!("Unsupported file URI '{}'", uri.as_str());
return false; false
} }
} }
} }
@ -80,9 +88,8 @@ impl Source for FileSrc {
} }
fn get_uri(&self) -> Option<Url> { fn get_uri(&self) -> Option<Url> {
let location = self.location.lock().unwrap(); let ref location = self.settings.lock().unwrap().location;
(*location) location.as_ref()
.as_ref()
.map(|l| Url::from_file_path(l).ok()) .map(|l| Url::from_file_path(l).ok())
.and_then(|i| i) // join() .and_then(|i| i) // join()
} }
@ -92,75 +99,87 @@ impl Source for FileSrc {
} }
fn get_size(&self) -> u64 { fn get_size(&self) -> u64 {
self.file.as_ref() let streaming_state = self.streaming_state.lock().unwrap();
.map(|f| f.metadata().ok())
.and_then(|i| i) // join() if let StreamingState::Started { ref file, .. } = *streaming_state {
.map(|m| m.len()) file.metadata()
.unwrap_or(u64::MAX) .ok()
.map(|m| m.len())
.unwrap_or(u64::MAX)
} else {
u64::MAX
}
} }
fn start(&mut self) -> bool { fn start(&self) -> bool {
self.file = None; let ref location = self.settings.lock().unwrap().location;
self.position = 0; let mut streaming_state = self.streaming_state.lock().unwrap();
let location = self.location.lock().unwrap();
if let StreamingState::Started { .. } = *streaming_state {
return false;
}
match *location { match *location {
None => return false, None => false,
Some(ref location) => { Some(ref location) => {
match File::open(location.as_path()) { match File::open(location.as_path()) {
Ok(file) => { Ok(file) => {
self.file = Some(file); *streaming_state = StreamingState::Started {
return true; file: file,
position: 0,
};
true
} }
Err(err) => { Err(err) => {
println_err!("Failed to open file '{}': {}", println_err!("Could not open file for writing '{}': {}",
location.to_str().unwrap_or("Non-UTF8 path"), location.to_str().unwrap_or("Non-UTF8 path"),
err.to_string()); err.to_string());
return false; false
} }
} }
} }
} }
} }
fn stop(&mut self) -> bool { fn stop(&self) -> bool {
self.file = None; let mut streaming_state = self.streaming_state.lock().unwrap();
self.position = 0; *streaming_state = StreamingState::Stopped;
true true
} }
fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result<usize, GstFlowReturn> { fn fill(&self, offset: u64, data: &mut [u8]) -> Result<usize, GstFlowReturn> {
match self.file { let mut streaming_state = self.streaming_state.lock().unwrap();
None => return Err(GstFlowReturn::Error),
Some(ref mut f) => {
if self.position != offset {
match f.seek(SeekFrom::Start(offset)) {
Ok(_) => {
self.position = offset;
}
Err(err) => {
println_err!("Failed to seek to {}: {}", offset, err.to_string());
return Err(GstFlowReturn::Error);
}
}
}
match f.read(data) { if let StreamingState::Started { ref mut file, ref mut position } = *streaming_state {
Ok(size) => { if *position != offset {
self.position += size as u64; match file.seek(SeekFrom::Start(offset)) {
return Ok(size); Ok(_) => {
*position = offset;
} }
Err(err) => { Err(err) => {
println_err!("Failed to read at {}: {}", offset, err.to_string()); println_err!("Failed to seek to {}: {}", offset, err.to_string());
return Err(GstFlowReturn::Error); return Err(GstFlowReturn::Error);
} }
} }
} }
match file.read(data) {
Ok(size) => {
*position += size as u64;
Ok(size)
}
Err(err) => {
println_err!("Failed to read at {}: {}", offset, err.to_string());
Err(GstFlowReturn::Error)
}
}
} else {
Err(GstFlowReturn::Error)
} }
} }
fn do_seek(&mut self, _: u64, _: u64) -> bool { fn do_seek(&self, _: u64, _: u64) -> bool {
true true
} }
} }

View file

@ -25,22 +25,34 @@ use hyper::client::response::Response;
use std::io::Write; use std::io::Write;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use utils::*; use utils::*;
use rssource::*; use rssource::*;
#[derive(Debug)]
struct Settings {
url: Option<Url>,
}
#[derive(Debug)]
enum StreamingState {
Stopped,
Started {
response: Response,
seekable: bool,
position: u64,
size: u64,
start: u64,
stop: u64,
},
}
#[derive(Debug)] #[derive(Debug)]
pub struct HttpSrc { pub struct HttpSrc {
controller: SourceController, controller: SourceController,
url: Mutex<Option<Url>>, settings: Mutex<Settings>,
streaming_state: Mutex<StreamingState>,
client: Client, client: Client,
response: Option<Response>,
seekable: AtomicBool,
position: u64,
size: u64,
start: u64,
stop: u64,
} }
unsafe impl Sync for HttpSrc {} unsafe impl Sync for HttpSrc {}
@ -50,14 +62,9 @@ impl HttpSrc {
pub fn new(controller: SourceController) -> HttpSrc { pub fn new(controller: SourceController) -> HttpSrc {
HttpSrc { HttpSrc {
controller: controller, controller: controller,
url: Mutex::new(None), settings: Mutex::new(Settings { url: None }),
streaming_state: Mutex::new(StreamingState::Stopped),
client: Client::new(), client: Client::new(),
response: None,
seekable: AtomicBool::new(false),
position: 0,
size: u64::MAX,
start: 0,
stop: u64::MAX,
} }
} }
@ -65,15 +72,11 @@ impl HttpSrc {
Box::new(HttpSrc::new(controller)) Box::new(HttpSrc::new(controller))
} }
pub fn do_request(&mut self, start: u64, stop: u64) -> bool { fn do_request(&self, start: u64, stop: u64) -> StreamingState {
self.response = None; let ref url = self.settings.lock().unwrap().url;
self.seekable.store(false, Ordering::Relaxed);
self.position = 0;
self.size = u64::MAX;
let url = self.url.lock().unwrap();
match *url { match *url {
None => return false, None => StreamingState::Stopped,
Some(ref url) => { Some(ref url) => {
let mut req = self.client.get(url.clone()); let mut req = self.client.get(url.clone());
@ -81,15 +84,15 @@ impl HttpSrc {
req = if stop == u64::MAX { req = if stop == u64::MAX {
req.header(Range::Bytes(vec![ByteRangeSpec::AllFrom(start)])) req.header(Range::Bytes(vec![ByteRangeSpec::AllFrom(start)]))
} else { } else {
req.header(Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop)])) req.header(Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop - 1)]))
}; };
} }
match req.send() { match req.send() {
Ok(response) => { Ok(response) => {
if response.status.is_success() { if response.status.is_success() {
self.size = if let Some(&ContentLength(content_length)) = let size = if let Some(&ContentLength(content_length)) =
response.headers.get() { response.headers.get() {
content_length + start content_length + start
} else { } else {
u64::MAX u64::MAX
@ -101,34 +104,35 @@ impl HttpSrc {
false false
}; };
self.seekable.store(self.size != u64::MAX && accept_byte_ranges, let seekable = size != u64::MAX && accept_byte_ranges;
Ordering::Relaxed);
self.start = start; let position = if let Some(&ContentRange(ContentRangeSpec::Bytes{range: Some((range_start, _)), ..})) = response.headers.get() {
self.stop = stop;
self.position = if let Some(&ContentRange(ContentRangeSpec::Bytes{range: Some((range_start, _)), ..})) = response.headers.get() {
range_start range_start
} else { } else {
start start
}; };
if self.position != start { if position != start {
println_err!("Failed to seek to {}: Got {}", start, self.position); println_err!("Failed to seek to {}: Got {}", start, position);
return false; StreamingState::Stopped
} else {
StreamingState::Started {
response: response,
seekable: seekable,
position: 0,
size: size,
start: start,
stop: stop,
}
} }
self.response = Some(response);
return true;
} else { } else {
println_err!("Failed to fetch {}: {}", url, response.status); println_err!("Failed to fetch {}: {}", url, response.status);
return false; StreamingState::Stopped
} }
} }
Err(err) => { Err(err) => {
println_err!("Failed to fetch {}: {}", url, err.to_string()); println_err!("Failed to fetch {}: {}", url, err.to_string());
return false; StreamingState::Stopped
} }
} }
} }
@ -137,25 +141,19 @@ impl HttpSrc {
} }
impl Source for HttpSrc { impl Source for HttpSrc {
fn set_uri(&mut self, uri: Option<Url>) -> bool { fn set_uri(&self, uri: Option<Url>) -> bool {
if self.response.is_some() { let ref mut url = self.settings.lock().unwrap().url;
println_err!("Can't set URI after starting");
return false;
}
match uri { match uri {
None => { None => {
let mut url = self.url.lock().unwrap();
*url = None; *url = None;
return true; return true;
} }
Some(uri) => { Some(uri) => {
if uri.scheme() == "http" || uri.scheme() == "https" { if uri.scheme() == "http" || uri.scheme() == "https" {
let mut url = self.url.lock().unwrap();
*url = Some(uri); *url = Some(uri);
return true; return true;
} else { } else {
let mut url = self.url.lock().unwrap();
*url = None; *url = None;
println_err!("Unsupported URI '{}'", uri.as_str()); println_err!("Unsupported URI '{}'", uri.as_str());
return false; return false;
@ -165,67 +163,91 @@ impl Source for HttpSrc {
} }
fn get_uri(&self) -> Option<Url> { fn get_uri(&self) -> Option<Url> {
let url = self.url.lock().unwrap(); let ref url = self.settings.lock().unwrap().url;
(*url).as_ref().map(|u| u.clone()) url.as_ref().map(|u| u.clone())
} }
fn is_seekable(&self) -> bool { fn is_seekable(&self) -> bool {
self.seekable.load(Ordering::Relaxed) let streaming_state = self.streaming_state.lock().unwrap();
match *streaming_state {
StreamingState::Started { seekable, .. } => seekable,
_ => false,
}
} }
fn get_size(&self) -> u64 { fn get_size(&self) -> u64 {
self.size let streaming_state = self.streaming_state.lock().unwrap();
} match *streaming_state {
StreamingState::Started { size, .. } => size,
fn start(&mut self) -> bool { _ => u64::MAX,
self.seekable.store(false, Ordering::Relaxed);
return self.do_request(0, u64::MAX);
}
fn stop(&mut self) -> bool {
self.seekable.store(false, Ordering::Relaxed);
self.position = 0;
self.size = u64::MAX;
match self.response {
Some(ref mut response) => drop(response),
None => (),
} }
self.response = None;
return true;
} }
fn do_seek(&mut self, start: u64, stop: u64) -> bool { fn start(&self) -> bool {
return self.do_request(start, stop); let mut streaming_state = self.streaming_state.lock().unwrap();
*streaming_state = self.do_request(0, u64::MAX);
if let StreamingState::Stopped = *streaming_state {
false
} else {
true
}
} }
fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result<usize, GstFlowReturn> { fn stop(&self) -> bool {
if self.position != offset || self.response.is_none() { let mut streaming_state = self.streaming_state.lock().unwrap();
let stop = self.stop; // FIXME: Borrow checker fail *streaming_state = StreamingState::Stopped;
if !self.do_request(offset, stop) {
println_err!("Failed to seek to {}", offset); true
return Err(GstFlowReturn::Error); }
}
fn do_seek(&self, start: u64, stop: u64) -> bool {
let mut streaming_state = self.streaming_state.lock().unwrap();
*streaming_state = self.do_request(start, stop);
if let StreamingState::Stopped = *streaming_state {
false
} else {
true
}
}
fn fill(&self, offset: u64, data: &mut [u8]) -> Result<usize, GstFlowReturn> {
let mut streaming_state = self.streaming_state.lock().unwrap();
if let StreamingState::Stopped = *streaming_state {
return Err(GstFlowReturn::Error);
} }
match self.response { if let StreamingState::Started { position, stop, .. } = *streaming_state {
None => return Err(GstFlowReturn::Error), if position != offset {
Some(ref mut r) => { *streaming_state = self.do_request(offset, stop);
match r.read(data) { if let StreamingState::Stopped = *streaming_state {
Ok(size) => { println_err!("Failed to seek to {}", offset);
if size == 0 { return Err(GstFlowReturn::Error);
return Err(GstFlowReturn::Eos);
}
self.position += size as u64;
return Ok(size);
}
Err(err) => {
println_err!("Failed to read at {}: {}", offset, err.to_string());
return Err(GstFlowReturn::Error);
}
} }
} }
} }
if let StreamingState::Started { ref mut response, ref mut position, .. } =
*streaming_state {
match response.read(data) {
Ok(size) => {
if size == 0 {
return Err(GstFlowReturn::Eos);
}
*position += size as u64;
Ok(size)
}
Err(err) => {
println_err!("Failed to read at {}: {}", offset, err.to_string());
Err(GstFlowReturn::Error)
}
}
} else {
Err(GstFlowReturn::Error)
}
} }
} }

View file

@ -39,17 +39,17 @@ impl SourceController {
pub trait Source: Sync + Send { pub trait Source: Sync + Send {
// Called from any thread at any time // Called from any thread at any time
fn set_uri(&mut self, uri: Option<Url>) -> bool; fn set_uri(&self, uri: Option<Url>) -> bool;
fn get_uri(&self) -> Option<Url>; fn get_uri(&self) -> Option<Url>;
// Called from any thread between start/stop // Called from any thread between start/stop
fn is_seekable(&self) -> bool; fn is_seekable(&self) -> bool;
// Called from the streaming thread only // Called from the streaming thread only
fn start(&mut self) -> bool; fn start(&self) -> bool;
fn stop(&mut self) -> bool; fn stop(&self) -> bool;
fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result<usize, GstFlowReturn>; fn fill(&self, offset: u64, data: &mut [u8]) -> Result<usize, GstFlowReturn>;
fn do_seek(&mut self, start: u64, stop: u64) -> bool; fn do_seek(&self, start: u64, stop: u64) -> bool;
fn get_size(&self) -> u64; fn get_size(&self) -> u64;
} }