gst-plugins-rs/gst-plugin-http/src/httpsrc.rs

282 lines
7.6 KiB
Rust
Raw Normal View History

// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
2016-05-15 15:54:09 +00:00
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
2016-05-15 15:54:09 +00:00
2018-07-27 10:35:58 +00:00
use reqwest::header::{
AcceptRanges, ByteRangeSpec, ContentLength, ContentRange, ContentRangeSpec, Range, RangeUnit,
};
2018-04-05 08:35:34 +00:00
use reqwest::{Client, Response};
use std::io::Read;
use std::u64;
use url::Url;
use gst_plugin::error::*;
use gst_plugin_simple::error::*;
use gst_plugin_simple::source::*;
2018-05-01 14:16:12 +00:00
use gst_plugin_simple::UriValidator;
2016-12-25 11:16:12 +00:00
use gst;
/// Lifecycle state of the HTTP source.
#[derive(Debug)]
enum StreamingState {
/// No request is active; `seek` and `fill` report "Not started yet" in this state.
Stopped,
/// A request was issued successfully and its response body is being read.
Started {
/// URL the request was sent to; `seek` clones it to issue a new request.
uri: Url,
/// Response whose body `fill` reads from.
response: Response,
/// True when the response carried a Content-Length and advertised byte ranges.
seekable: bool,
/// Offset `fill` expects for its next call; advanced by the number of bytes read.
position: u64,
/// Content-Length of the response plus the requested start offset, if known.
size: Option<u64>,
/// Start offset that was requested.
start: u64,
/// Stop offset that was requested, if any (sent as `stop - 1` in the Range header).
stop: Option<u64>,
},
}
/// HTTP source implementation backed by a `reqwest` client.
#[derive(Debug)]
pub struct HttpSrc {
/// Whether a request is currently active, and its bookkeeping if so.
streaming_state: StreamingState,
/// Debug category ("rshttpsrc") used by the logging macros.
cat: gst::DebugCategory,
/// Client used to issue every request of this source.
client: Client,
}
impl HttpSrc {
pub fn new(_src: &BaseSrc) -> HttpSrc {
2016-07-20 08:28:58 +00:00
HttpSrc {
streaming_state: StreamingState::Stopped,
cat: gst::DebugCategory::new(
"rshttpsrc",
gst::DebugColorFlags::empty(),
"Rust HTTP source",
2017-07-31 13:36:35 +00:00
),
2017-10-25 12:58:41 +00:00
client: Client::new(),
2016-07-20 08:28:58 +00:00
}
}
pub fn new_boxed(src: &BaseSrc) -> Box<SourceImpl> {
Box::new(HttpSrc::new(src))
}
2016-05-15 08:48:54 +00:00
2017-07-31 13:36:35 +00:00
fn do_request(
&self,
src: &BaseSrc,
2017-07-31 13:36:35 +00:00
uri: Url,
start: u64,
stop: Option<u64>,
) -> Result<StreamingState, gst::ErrorMessage> {
let cat = self.cat;
2017-10-25 12:58:41 +00:00
let mut req = self.client.get(uri.clone());
match (start != 0, stop) {
(false, None) => (),
2017-07-31 13:29:11 +00:00
(true, None) => {
req.header(Range::Bytes(vec![ByteRangeSpec::AllFrom(start)]));
}
(_, Some(stop)) => {
2017-07-31 13:29:11 +00:00
req.header(Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop - 1)]));
}
}
gst_debug!(cat, obj: src, "Doing new request {:?}", req);
2016-12-27 15:47:39 +00:00
2017-07-31 13:36:35 +00:00
let response = try!(req.send().or_else(|err| {
gst_error!(cat, obj: src, "Request failed: {:?}", err);
Err(gst_error_msg!(
gst::ResourceError::Read,
2017-07-31 13:36:35 +00:00
["Failed to fetch {}: {}", uri, err.to_string()]
))
}));
2016-11-14 18:57:54 +00:00
if !response.status().is_success() {
gst_error!(cat, obj: src, "Request status failed: {:?}", response);
return Err(gst_error_msg!(
gst::ResourceError::Read,
2017-07-31 13:36:35 +00:00
["Failed to fetch {}: {}", uri, response.status()]
));
}
2017-04-12 13:46:11 +00:00
let size = response
.headers()
.get()
.map(|&ContentLength(cl)| cl + start);
2017-07-31 13:36:35 +00:00
let accept_byte_ranges = if let Some(&AcceptRanges(ref ranges)) = response.headers().get() {
ranges.iter().any(|u| *u == RangeUnit::Bytes)
} else {
false
};
let seekable = size.is_some() && accept_byte_ranges;
let position = if let Some(&ContentRange(ContentRangeSpec::Bytes {
range: Some((range_start, _)),
..
})) = response.headers().get()
2017-07-31 13:36:35 +00:00
{
range_start
} else {
start
};
if position != start {
return Err(gst_error_msg!(
gst::ResourceError::Seek,
2017-07-31 13:36:35 +00:00
["Failed to seek to {}: Got {}", start, position]
));
2016-05-15 08:48:54 +00:00
}
gst_debug!(cat, obj: src, "Request successful: {:?}", response);
2016-12-27 15:47:39 +00:00
Ok(StreamingState::Started {
2018-10-11 10:49:10 +00:00
uri,
response,
seekable,
2017-07-31 13:36:35 +00:00
position: 0,
2018-10-11 10:49:10 +00:00
size,
start,
stop,
2017-07-31 13:36:35 +00:00
})
2016-05-15 08:48:54 +00:00
}
}
fn validate_uri(uri: &Url) -> Result<(), UriError> {
if uri.scheme() != "http" && uri.scheme() != "https" {
2017-07-31 13:36:35 +00:00
return Err(UriError::new(
gst::URIError::UnsupportedProtocol,
format!("Unsupported URI '{}'", uri.as_str()),
2017-07-31 13:36:35 +00:00
));
}
Ok(())
}
impl SourceImpl for HttpSrc {
fn uri_validator(&self) -> Box<UriValidator> {
Box::new(validate_uri)
}
fn is_seekable(&self, _src: &BaseSrc) -> bool {
match self.streaming_state {
StreamingState::Started { seekable, .. } => seekable,
_ => false,
}
}
fn get_size(&self, _src: &BaseSrc) -> Option<u64> {
match self.streaming_state {
StreamingState::Started { size, .. } => size,
_ => None,
}
}
fn start(&mut self, src: &BaseSrc, uri: Url) -> Result<(), gst::ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
self.streaming_state = try!(self.do_request(src, uri, 0, None));
Ok(())
}
fn stop(&mut self, _src: &BaseSrc) -> Result<(), gst::ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn seek(
&mut self,
src: &BaseSrc,
start: u64,
stop: Option<u64>,
) -> Result<(), gst::ErrorMessage> {
let (position, old_stop, uri) = match self.streaming_state {
2017-04-12 13:46:11 +00:00
StreamingState::Started {
position,
stop,
ref uri,
..
} => (position, stop, uri.clone()),
StreamingState::Stopped => {
return Err(gst_error_msg!(
gst::LibraryError::Failed,
["Not started yet"]
));
}
};
if position == start && old_stop == stop {
return Ok(());
}
self.streaming_state = StreamingState::Stopped;
self.streaming_state = try!(self.do_request(src, uri, start, stop));
Ok(())
2016-05-15 08:48:54 +00:00
}
fn fill(
&mut self,
src: &BaseSrc,
offset: u64,
_: u32,
buffer: &mut gst::BufferRef,
) -> Result<(), FlowError> {
let cat = self.cat;
2016-12-27 15:47:39 +00:00
let (response, position) = match self.streaming_state {
2017-04-12 13:46:11 +00:00
StreamingState::Started {
ref mut response,
ref mut position,
..
} => (response, position),
StreamingState::Stopped => {
return Err(FlowError::Error(gst_error_msg!(
2017-12-20 17:30:32 +00:00
gst::LibraryError::Failed,
["Not started yet"]
)));
}
};
if *position != offset {
return Err(FlowError::Error(gst_error_msg!(
gst::ResourceError::Seek,
2017-07-31 13:36:35 +00:00
["Got unexpected offset {}, expected {}", offset, position]
)));
}
let size = {
let mut map = match buffer.map_writable() {
None => {
return Err(FlowError::Error(gst_error_msg!(
gst::LibraryError::Failed,
["Failed to map buffer"]
)));
}
Some(map) => map,
};
let data = map.as_mut_slice();
2017-07-31 13:36:35 +00:00
try!(response.read(data).or_else(|err| {
gst_error!(cat, obj: src, "Failed to read: {:?}", err);
Err(FlowError::Error(gst_error_msg!(
gst::ResourceError::Read,
2017-07-31 13:36:35 +00:00
["Failed to read at {}: {}", offset, err.to_string()]
)))
}))
};
if size == 0 {
return Err(FlowError::Eos);
}
*position += size as u64;
buffer.set_size(size);
Ok(())
}
}