reqwesthttpsrc: Various cleanups, error handling improvements and better debug output

This commit is contained in:
Sebastian Dröge 2019-10-01 22:34:11 +03:00
parent e90099bad3
commit cecc0804d5

View file

@ -206,14 +206,14 @@ impl ReqwestHttpSrc {
let uri = Url::parse(uri).map_err(|err| {
glib::Error::new(
gst::URIError::BadUri,
format!("Failed to parse URI '{}': {:?}", uri, err,).as_str(),
format!("Failed to parse URI '{}': {:?}", uri, err).as_str(),
)
})?;
if uri.scheme() != "http" && uri.scheme() != "https" {
return Err(glib::Error::new(
gst::URIError::UnsupportedProtocol,
format!("Unsupported URI scheme '{}'", uri.scheme(),).as_str(),
format!("Unsupported URI scheme '{}'", uri.scheme()).as_str(),
));
}
@ -222,82 +222,91 @@ impl ReqwestHttpSrc {
Ok(())
}
fn ensure_client(&self, src: &gst_base::BaseSrc) -> Result<ClientContext, gst::ErrorMessage> {
let mut client_guard = self.client.lock().unwrap();
if let Some(ref client) = *client_guard {
gst_debug!(self.cat, obj: src, "Using already configured client");
return Ok(client.clone());
}
let srcpad = src.get_static_pad("src").unwrap();
let mut q = gst::Query::new_context(REQWEST_CLIENT_CONTEXT);
if srcpad.peer_query(&mut q) {
if let Some(context) = q.get_context_owned() {
src.set_context(&context);
}
} else {
let _ = src.post_message(
&gst::Message::new_need_context(REQWEST_CLIENT_CONTEXT)
.src(Some(src))
.build(),
);
}
if let Some(client) = {
// FIXME: Is there a simpler way to ensure the lock is not hold
// after this block anymore?
let external_client = self.external_client.lock().unwrap();
let client = external_client.as_ref().cloned();
drop(external_client);
client
} {
gst_debug!(self.cat, obj: src, "Using shared client");
*client_guard = Some(client.clone());
return Ok(client);
}
gst_debug!(self.cat, obj: src, "Creating new client");
let client = ClientContext(Arc::new(ClientContextInner {
client: Client::builder()
.cookie_store(true)
.gzip(true)
.build()
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create Client: {}", err]
)
})?,
}));
gst_debug!(self.cat, obj: src, "Sharing new client with other elements");
let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
{
let context = context.get_mut().unwrap();
let s = context.get_mut_structure();
s.set("client", &client);
}
src.set_context(&context);
let _ = src.post_message(
&gst::Message::new_have_context(context)
.src(Some(src))
.build(),
);
*client_guard = Some(client.clone());
Ok(client)
}
fn do_request(
&self,
src: &gst_base::BaseSrc,
uri: Url,
start: u64,
stop: Option<u64>,
) -> Result<State, gst::ErrorMessage> {
) -> Result<State, Option<gst::ErrorMessage>> {
use hyperx::header::{
qitem, AcceptEncoding, AcceptRanges, ByteRangeSpec, Connection, ContentLength,
ContentRange, ContentRangeSpec, Encoding, Headers, Range, RangeUnit, UserAgent,
};
let cat = self.cat;
gst_debug!(self.cat, obj: src, "Creating new request for {}", uri);
let req = {
let mut client_guard = self.client.lock().unwrap();
if client_guard.is_none() {
let srcpad = src.get_static_pad("src").unwrap();
let mut q = gst::Query::new_context(REQWEST_CLIENT_CONTEXT);
if srcpad.peer_query(&mut q) {
if let Some(context) = q.get_context_owned() {
src.set_context(&context);
}
} else {
let _ = src.post_message(
&gst::Message::new_need_context(REQWEST_CLIENT_CONTEXT)
.src(Some(src))
.build(),
);
}
if let Some(client) = {
// FIXME: Is there a simpler way to ensure the lock is not hold
// after this block anymore?
let external_client = self.external_client.lock().unwrap();
let client = external_client.as_ref().map(|c| c.clone());
drop(external_client);
client
} {
gst_debug!(cat, obj: src, "Using shared client");
*client_guard = Some(client);
} else {
gst_debug!(cat, obj: src, "Creating new client");
let client = ClientContext(Arc::new(ClientContextInner {
client: Client::builder()
.cookie_store(true)
.gzip(true)
.build()
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create Client for {}: {}", uri, err]
)
})?,
}));
gst_debug!(cat, obj: src, "Sharing new client with other elements");
let mut context = gst::Context::new(REQWEST_CLIENT_CONTEXT, true);
{
let context = context.get_mut().unwrap();
let s = context.get_mut_structure();
s.set("client", &client);
}
src.set_context(&context);
let _ = src.post_message(
&gst::Message::new_have_context(context)
.src(Some(src))
.build(),
);
*client_guard = Some(client);
}
} else {
gst_debug!(cat, obj: src, "Using already configured client");
}
client_guard.as_ref().unwrap().0.client.get(uri.clone())
let client = self.ensure_client(src)?;
client.0.client.get(uri.clone())
};
let settings = self.settings.lock().unwrap().clone();
@ -332,58 +341,60 @@ impl ReqwestHttpSrc {
req
};
gst_debug!(cat, obj: src, "Doing new request {:?}", req);
let src_clone = src.clone();
let response_fut = req.send().and_then(move |res| {
gst_debug!(cat, obj: &src_clone, "Response received: {:?}", res);
Ok(res)
});
gst_debug!(self.cat, obj: src, "Sending new request: {:?}", req);
let uri_clone = uri.clone();
let mut response = self
.wait(response_fut.map_err(move |err| {
gst_error_msg!(
gst::ResourceError::Read,
["Failed to fetch {}: {:?}", uri_clone, err]
)
}))
.map_err(|err| {
err.unwrap_or_else(|| {
gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
})
})?;
let res = self.wait(req.send().map_err(move |err| {
gst_error_msg!(
gst::ResourceError::Read,
["Failed to fetch {}: {:?}", uri_clone, err]
)
}));
if !response.status().is_success() {
match response.status() {
let mut res = match res {
Ok(res) => res,
Err(Some(err)) => {
gst_debug!(self.cat, obj: src, "Error {:?}", err);
return Err(Some(err));
}
Err(None) => {
gst_debug!(self.cat, obj: src, "Flushing");
return Err(None);
}
};
gst_debug!(self.cat, obj: src, "Received response: {:?}", res);
if !res.status().is_success() {
match res.status() {
StatusCode::NOT_FOUND => {
gst_error!(cat, obj: src, "Request status failed: {:?}", response);
return Err(gst_error_msg!(
gst_error!(self.cat, obj: src, "Resource not found");
return Err(Some(gst_error_msg!(
gst::ResourceError::NotFound,
["Request status failed for {}: {}", uri, response.status()]
));
["Resource '{}' not found", uri]
)));
}
StatusCode::UNAUTHORIZED
| StatusCode::PAYMENT_REQUIRED
| StatusCode::FORBIDDEN
| StatusCode::PROXY_AUTHENTICATION_REQUIRED => {
gst_error!(cat, obj: src, "Request status failed: {:?}", response);
return Err(gst_error_msg!(
gst_error!(self.cat, obj: src, "Not authorized: {}", res.status());
return Err(Some(gst_error_msg!(
gst::ResourceError::NotAuthorized,
["Request status failed for {}: {}", uri, response.status()]
));
["Not Authorized for resource '{}': {}", uri, res.status()]
)));
}
_ => {
gst_error!(cat, obj: src, "Request status failed: {:?}", response);
return Err(gst_error_msg!(
gst_error!(self.cat, obj: src, "Request failed: {}", res.status());
return Err(Some(gst_error_msg!(
gst::ResourceError::OpenRead,
["Request status failed for {}: {}", uri, response.status()]
));
["Request for '{}' failed: {}", uri, res.status()]
)));
}
}
}
let headers = Headers::from(response.headers());
let headers = Headers::from(res.headers());
let size = headers.get().map(|&ContentLength(cl)| cl + start);
let accept_byte_ranges = if let Some(&AcceptRanges(ref ranges)) = headers.get() {
@ -405,15 +416,15 @@ impl ReqwestHttpSrc {
};
if position != start {
return Err(gst_error_msg!(
return Err(Some(gst_error_msg!(
gst::ResourceError::Seek,
["Failed to seek to {}: Got {}", start, position]
));
)));
}
gst_debug!(cat, obj: src, "Request successful: {:?}", response);
gst_debug!(self.cat, obj: src, "Request successful");
let body = mem::replace(response.body_mut(), Decoder::empty());
let body = mem::replace(res.body_mut(), Decoder::empty());
Ok(State::Started {
uri,
@ -633,12 +644,19 @@ impl BaseSrcImpl for ReqwestHttpSrc {
})
.map(|uri| uri.clone())?;
*state = self.do_request(src, uri, 0, None)?;
gst_debug!(self.cat, obj: src, "Starting for URI {}", uri);
*state = self.do_request(src, uri, 0, None).map_err(|err| {
err.unwrap_or_else(|| {
gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
})
})?;
Ok(())
}
fn stop(&self, _src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn stop(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: src, "Stopping");
*self.state.lock().unwrap() = State::Stopped;
Ok(())
@ -666,7 +684,10 @@ impl BaseSrcImpl for ReqwestHttpSrc {
let start = segment.get_start().expect("No start position given");
let stop = segment.get_stop();
gst_debug!(self.cat, obj: src, "Seeking to {}-{:?}", start, stop);
if position == start && old_stop == stop.0 {
gst_debug!(self.cat, obj: src, "No change to current request");
return true;
}
@ -676,10 +697,11 @@ impl BaseSrcImpl for ReqwestHttpSrc {
*state = s;
true
}
Err(err) => {
Err(Some(err)) => {
src.post_error_message(&err);
false
}
Err(None) => false,
}
}
@ -689,7 +711,6 @@ impl BaseSrcImpl for ReqwestHttpSrc {
offset: u64,
_length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
let cat = self.cat;
let mut state = self.state.lock().unwrap();
let (body, position) = match *state {
@ -730,13 +751,26 @@ impl BaseSrcImpl for ReqwestHttpSrc {
};
drop(state);
let res = self.wait(current_body.into_future().map_err(|(err, _body)| {
let res = self.wait(current_body.into_future().map_err(move |(err, _body)| {
gst_error_msg!(
gst::ResourceError::Read,
["Failed to read chunk: {:?}", err]
["Failed to read chunk at offset {}: {:?}", offset, err]
)
}));
let res = match res {
Ok(res) => res,
Err(Some(err)) => {
gst_debug!(self.cat, obj: src, "Error {:?}", err);
src.post_error_message(&err);
return Err(gst::FlowError::Error);
}
Err(None) => {
gst_debug!(self.cat, obj: src, "Flushing");
return Err(gst::FlowError::Flushing);
}
};
let mut state = self.state.lock().unwrap();
let (body, position) = match *state {
State::Started {
@ -752,10 +786,16 @@ impl BaseSrcImpl for ReqwestHttpSrc {
};
match res {
Ok((Some(chunk), current_body)) => {
(Some(chunk), current_body) => {
/* do something with the chunk and store the body again in the state */
gst_debug!(cat, obj: src, "Chunk of {} bytes received", chunk.len());
gst_trace!(
self.cat,
obj: src,
"Chunk of {} bytes received at offset {}",
chunk.len(),
offset
);
let size = chunk.len();
assert_ne!(chunk.len(), 0);
@ -768,28 +808,17 @@ impl BaseSrcImpl for ReqwestHttpSrc {
{
let buffer = buffer.get_mut().unwrap();
buffer.set_offset(offset);
buffer.set_offset_end(offset + size as u64);
}
Ok(buffer)
}
Ok((None, current_body)) => {
(None, current_body) => {
/* No further data, end of stream */
gst_debug!(cat, obj: src, "End of stream");
gst_debug!(self.cat, obj: src, "End of stream");
*body = Some(current_body);
Err(gst::FlowError::Eos)
}
Err(err) => {
/* error */
gst_error!(self.cat, obj: src, "Failed to read: {:?}", err);
gst_element_error!(
src,
gst::ResourceError::Read,
["Failed to read at {}: {:?}", offset, err]
);
Err(gst::FlowError::Error)
}
}
}
}