From 5b60ecbb1893c146cc798918308d87808ebb47d4 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Mon, 31 Jul 2023 11:23:32 +0530 Subject: [PATCH] net: webrtc/webrtchttp: Fix canceller usage Commit 08b6251a added the check to ensure only one canceller at a time for net/webrtc. In `whipsink` and since `whipwebrtcsink` picked up the same implementation, there exists a bug around the use of canceller. `whipsink` calls `wait_async` while passing the canceller as an argument. The path `send_offer -> do_post -> parse_endpoint_response` results in the canceller being replaced in each subsequent call to `wait_async`. Since `wait_async` call does not ensure one canceller, with the async call the use of canceller/abort was subtly broken. Similarly, for `whepsrc`. We really don't need to use `wait_async` inside `do_post` for any `await` calls. If the root future viz. `do_post` with `wait_async` is aborted, the child futures will be taken care of. Part-of: --- net/webrtc/src/whip_signaller/imp.rs | 86 ++++++------------ net/webrtchttp/src/utils.rs | 6 ++ net/webrtchttp/src/whepsrc/imp.rs | 129 +++++++++++---------------- net/webrtchttp/src/whipsink/imp.rs | 92 +++++++------------ 4 files changed, 116 insertions(+), 197 deletions(-) diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs index 765de4e8..4cfbc126 100644 --- a/net/webrtc/src/whip_signaller/imp.rs +++ b/net/webrtc/src/whip_signaller/imp.rs @@ -124,14 +124,12 @@ impl Signaller { async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription, webrtcbin: &gst::Element) { let auth_token; let endpoint; - let timeout; { let settings = self.settings.lock().unwrap(); endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); auth_token = settings.auth_token.clone(); - timeout = settings.timeout; drop(settings); } @@ -174,33 +172,19 @@ impl Signaller { ); } - let res = wait_async( - &self.canceller, - client - .request(reqwest::Method::POST, endpoint.clone()) - .headers(headermap) - .body(body) - .send(), - timeout, - ) - .await; + let res = client + .request(reqwest::Method::POST, endpoint.clone()) + .headers(headermap) + .body(body) + .send() + .await; match res { - Ok(r) => match r { - Ok(resp) => { - if let Err(e) = wait_async( - &self.canceller, - self.parse_endpoint_response(offer, resp, redirects, webrtcbin), - timeout, - ) + Ok(resp) => { + self.parse_endpoint_response(offer, resp, redirects, webrtcbin) .await - { - self.handle_future_error(e); - } - } - Err(err) => self.raise_error(err.to_string()), - }, - Err(err) => self.handle_future_error(err), + } + Err(err) => self.raise_error(err.to_string()), } } @@ -214,7 +198,6 @@ impl Signaller { gst::debug!(CAT, imp: self, "Parsing endpoint response"); let endpoint; - let timeout; let use_link_headers; { @@ -222,7 +205,6 @@ impl Signaller { endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); use_link_headers = settings.use_link_headers; - timeout = settings.timeout; drop(settings); } @@ -288,26 +270,21 @@ impl Signaller { drop(state); } - match wait_async(&self.canceller, resp.bytes(), timeout).await { - Ok(res) => match res { - Ok(ans_bytes) => match gst_sdp::SDPMessage::parse_buffer(&ans_bytes) { - Ok(ans_sdp) => { - let answer = gst_webrtc::WebRTCSessionDescription::new( - gst_webrtc::WebRTCSDPType::Answer, - ans_sdp, - ); - self.obj().emit_by_name::<()>( - "session-description", - &[&"unique", &answer], - ); - } - Err(err) => { - self.raise_error(format!("Could not parse answer SDP: {err}")); - } - }, - Err(err) => self.raise_error(err.to_string()), + match resp.bytes().await { + Ok(ans_bytes) => match gst_sdp::SDPMessage::parse_buffer(&ans_bytes) { + Ok(ans_sdp) => { + let answer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + ans_sdp, + ); + self.obj() + .emit_by_name::<()>("session-description", &[&"unique", &answer]); + } + Err(err) => { + self.raise_error(format!("Could not parse answer SDP: {err}")); + } }, - Err(err) => self.handle_future_error(err), + Err(err) => self.raise_error(err.to_string()), } } @@ -347,12 +324,7 @@ impl Signaller { redirect_url.as_str() ); - if let Err(err) = - wait_async(&self.canceller, self.do_post(offer, webrtcbin), timeout) - .await - { - self.handle_future_error(err); - } + self.do_post(offer, webrtcbin).await } Err(e) => self.raise_error(e.to_string()), } @@ -362,16 +334,14 @@ impl Signaller { } s => { - match wait_async(&self.canceller, resp.bytes(), timeout).await { + match resp.bytes().await { Ok(r) => { - let res = r - .map(|x| x.escape_ascii().to_string()) - .unwrap_or_else(|_| "(no further details)".to_string()); + let res = r.escape_ascii().to_string(); // FIXME: Check and handle 'Retry-After' header in case of server error self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res)); } - Err(err) => self.handle_future_error(err), + Err(err) => self.raise_error(err.to_string()), } } } diff --git a/net/webrtchttp/src/utils.rs b/net/webrtchttp/src/utils.rs index 34cb203c..9861cefd 100644 --- a/net/webrtchttp/src/utils.rs +++ b/net/webrtchttp/src/utils.rs @@ -35,6 +35,12 @@ where let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); { let mut canceller_guard = canceller.lock().unwrap(); + if canceller_guard.is_some() { + return Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Old Canceller should not exist"] + ))); + } canceller_guard.replace(abort_handle); drop(canceller_guard); } diff --git a/net/webrtchttp/src/whepsrc/imp.rs b/net/webrtchttp/src/whepsrc/imp.rs index 9cdf4f39..18a81e46 100644 --- a/net/webrtchttp/src/whepsrc/imp.rs +++ b/net/webrtchttp/src/whepsrc/imp.rs @@ -626,7 +626,6 @@ impl WhepSrc { redirects: u8, ) { let endpoint; - let timeout; let use_link_headers; { @@ -634,7 +633,6 @@ impl WhepSrc { endpoint = reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); use_link_headers = settings.use_link_headers; - timeout = settings.timeout; drop(settings); } @@ -692,29 +690,26 @@ impl WhepSrc { } }; - match wait_async(&self.canceller, resp.bytes(), timeout).await { - Ok(res) => match res { - Ok(ans_bytes) => { - let mut state = self.state.lock().unwrap(); - *state = match *state { - State::Post { redirects: _r } => State::Running { - whep_resource: url.to_string(), - }, - _ => { - self.raise_error( - gst::ResourceError::Failed, - "Expected to be in POST state".to_string(), - ); - return; - } - }; - drop(state); + match resp.bytes().await { + Ok(ans_bytes) => { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Running { + whep_resource: url.to_string(), + }, + _ => { + self.raise_error( + gst::ResourceError::Failed, + "Expected to be in POST state".to_string(), + ); + return; + } + }; + drop(state); - self.sdp_message_parse(ans_bytes) - } - Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), - }, - Err(err) => self.handle_future_error(err), + self.sdp_message_parse(ans_bytes) + } + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), } } @@ -753,11 +748,7 @@ impl WhepSrc { redirect_url.as_str() ); - if let Err(err) = - wait_async(&self.canceller, self.do_post(sess_desc), timeout).await - { - self.handle_future_error(err); - } + self.do_post(sess_desc).await } Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()), } @@ -770,11 +761,9 @@ impl WhepSrc { } s => { - match wait_async(&self.canceller, resp.bytes(), timeout).await { + match resp.bytes().await { Ok(r) => { - let res = r - .map(|x| x.escape_ascii().to_string()) - .unwrap_or_else(|_| "(no further details)".to_string()); + let res = r.escape_ascii().to_string(); // FIXME: Check and handle 'Retry-After' header in case of server error self.raise_error( @@ -782,7 +771,7 @@ impl WhepSrc { format!("Unexpected response: {} - {}", s.as_str(), res), ); } - Err(err) => self.handle_future_error(err), + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), } } } @@ -944,14 +933,12 @@ impl WhepSrc { async fn do_post(&self, offer: WebRTCSessionDescription) { let auth_token; let endpoint; - let timeout; { let settings = self.settings.lock().unwrap(); endpoint = reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); auth_token = settings.auth_token.clone(); - timeout = settings.timeout; drop(settings); } @@ -982,51 +969,37 @@ impl WhepSrc { endpoint.as_str() ); - let res = wait_async( - &self.canceller, - self.client - .request(reqwest::Method::POST, endpoint.clone()) - .headers(headermap) - .body(body) - .send(), - timeout, - ) - .await; + let resp = self + .client + .request(reqwest::Method::POST, endpoint.clone()) + .headers(headermap) + .body(body) + .send() + .await; - match res { - Ok(resp) => match resp { - Ok(r) => { - #[allow(unused_mut)] - let mut redirects; + match resp { + Ok(r) => { + #[allow(unused_mut)] + let mut redirects; - { - let state = self.state.lock().unwrap(); - redirects = match *state { - State::Post { redirects } => redirects, - _ => { - self.raise_error( - gst::ResourceError::Failed, - "Trying to do POST in unexpected state".to_string(), - ); - return; - } - }; - drop(state); - } - - if let Err(e) = wait_async( - &self.canceller, - self.parse_endpoint_response(offer, r, redirects), - timeout, - ) - .await - { - self.handle_future_error(e); - } + { + let state = self.state.lock().unwrap(); + redirects = match *state { + State::Post { redirects } => redirects, + _ => { + self.raise_error( + gst::ResourceError::Failed, + "Trying to do POST in unexpected state".to_string(), + ); + return; + } + }; + drop(state); } - Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), - }, - Err(err) => self.handle_future_error(err), + + self.parse_endpoint_response(offer, r, redirects).await + } + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), } } diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index be2e2894..34f7914d 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -565,14 +565,12 @@ impl WhipSink { async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription) { let auth_token; let endpoint; - let timeout; { let settings = self.settings.lock().unwrap(); endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); auth_token = settings.auth_token.clone(); - timeout = settings.timeout; drop(settings); } @@ -618,33 +616,16 @@ impl WhipSink { ); } - let res = wait_async( - &self.canceller, - client - .request(reqwest::Method::POST, endpoint.clone()) - .headers(headermap) - .body(body) - .send(), - timeout, - ) - .await; + let res = client + .request(reqwest::Method::POST, endpoint.clone()) + .headers(headermap) + .body(body) + .send() + .await; match res { - Ok(r) => match r { - Ok(resp) => { - if let Err(e) = wait_async( - &self.canceller, - self.parse_endpoint_response(offer, resp, redirects), - timeout, - ) - .await - { - self.handle_future_error(e); - } - } - Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), - }, - Err(err) => self.handle_future_error(err), + Ok(resp) => self.parse_endpoint_response(offer, resp, redirects).await, + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), } } @@ -655,7 +636,6 @@ impl WhipSink { redirects: u8, ) { let endpoint; - let timeout; let use_link_headers; { @@ -663,7 +643,6 @@ impl WhipSink { endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); use_link_headers = settings.use_link_headers; - timeout = settings.timeout; drop(settings); } @@ -737,29 +716,26 @@ impl WhipSink { drop(state); } - match wait_async(&self.canceller, resp.bytes(), timeout).await { - Ok(res) => match res { - Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { - Ok(ans_sdp) => { - let answer = gst_webrtc::WebRTCSessionDescription::new( - gst_webrtc::WebRTCSDPType::Answer, - ans_sdp, - ); - self.webrtcbin.emit_by_name::<()>( - "set-remote-description", - &[&answer, &None::], - ); - } - Err(err) => { - self.raise_error( - gst::ResourceError::Failed, - format!("Could not parse answer SDP: {err}"), - ); - } - }, - Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), + match resp.bytes().await { + Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { + Ok(ans_sdp) => { + let answer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + ans_sdp, + ); + self.webrtcbin.emit_by_name::<()>( + "set-remote-description", + &[&answer, &None::], + ); + } + Err(err) => { + self.raise_error( + gst::ResourceError::Failed, + format!("Could not parse answer SDP: {err}"), + ); + } }, - Err(err) => self.handle_future_error(err), + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), } } @@ -798,11 +774,7 @@ impl WhipSink { redirect_url.as_str() ); - if let Err(err) = - wait_async(&self.canceller, self.do_post(offer), timeout).await - { - self.handle_future_error(err); - } + self.do_post(offer).await } Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()), } @@ -815,11 +787,9 @@ impl WhipSink { } s => { - match wait_async(&self.canceller, resp.bytes(), timeout).await { + match resp.bytes().await { Ok(r) => { - let res = r - .map(|x| x.escape_ascii().to_string()) - .unwrap_or_else(|_| "(no further details)".to_string()); + let res = r.escape_ascii().to_string(); // FIXME: Check and handle 'Retry-After' header in case of server error self.raise_error( @@ -827,7 +797,7 @@ impl WhipSink { format!("Unexpected response: {} - {}", s.as_str(), res), ); } - Err(err) => self.handle_future_error(err), + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), } } }