From 91bfd0f7c3aac546aa77aa985e4a9404411925fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Sun, 21 Jan 2024 22:09:41 +0100 Subject: [PATCH] webrtc: signallers: attempt to close the ws when an error occurs This commit discards the early error returns in the send tasks to log the error and attempt to close the websocket. Part-of: --- net/webrtc/signalling/src/server/mod.rs | 18 ++++++++++++----- net/webrtc/src/aws_kvs_signaller/imp.rs | 25 ++++++++++++++++++------ net/webrtc/src/janusvr_signaller/imp.rs | 26 ++++++++++++++++--------- net/webrtc/src/signaller/imp.rs | 21 +++++++++++++------- 4 files changed, 63 insertions(+), 27 deletions(-) diff --git a/net/webrtc/signalling/src/server/mod.rs b/net/webrtc/signalling/src/server/mod.rs index 700ae4dd..3b17fd3d 100644 --- a/net/webrtc/signalling/src/server/mod.rs +++ b/net/webrtc/signalling/src/server/mod.rs @@ -10,7 +10,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::task; -use tracing::{info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; struct Peer { receive_task_handle: task::JoinHandle<()>, @@ -141,6 +141,7 @@ impl Server { let this_id_clone = this_id.clone(); let (mut ws_sink, mut ws_stream) = ws.split(); let send_task_handle = task::spawn(async move { + let mut res = Ok(()); loop { match tokio::time::timeout( std::time::Duration::from_secs(30), @@ -150,21 +151,28 @@ impl Server { { Ok(Some(msg)) => { trace!(this_id = %this_id_clone, "sending {}", msg); - ws_sink.send(WsMessage::Text(msg)).await?; + res = ws_sink.send(WsMessage::Text(msg)).await; } Ok(None) => { break; } Err(_) => { trace!(this_id = %this_id_clone, "timeout, sending ping"); - ws_sink.send(WsMessage::Ping(vec![])).await?; + res = ws_sink.send(WsMessage::Ping(vec![])).await; } } + + if let Err(ref err) = res { + error!(this_id = %this_id_clone, "Quitting send loop: {err}"); + break; + } } - ws_sink.close().await?; + debug!(this_id = %this_id_clone, "Done sending"); - Ok::<(), Error>(()) + let _ = ws_sink.close().await; + + res.map_err(Into::into) }); let mut tx = self.state.lock().unwrap().tx.clone(); diff --git a/net/webrtc/src/aws_kvs_signaller/imp.rs b/net/webrtc/src/aws_kvs_signaller/imp.rs index 7946e244..e46d0330 100644 --- a/net/webrtc/src/aws_kvs_signaller/imp.rs +++ b/net/webrtc/src/aws_kvs_signaller/imp.rs @@ -424,6 +424,7 @@ impl Signaller { let imp = self.downgrade(); let ping_timeout = settings.ping_timeout; let send_task_handle = task::spawn(async move { + let mut res = Ok(()); loop { match tokio::time::timeout( std::time::Duration::from_secs(ping_timeout as u64), @@ -440,26 +441,38 @@ impl Signaller { serde_json::to_string(&msg).unwrap() ); } - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; } Ok(None) => { break; } Err(_) => { - ws_sink.send(WsMessage::Ping(vec![])).await?; + res = ws_sink.send(WsMessage::Ping(vec![])).await; } } + + if let Err(ref err) = res { + if let Some(imp) = imp.upgrade() { + gst::error!(CAT, imp: imp, "Quitting send loop: {err}"); + } else { + gst::error!(CAT, "Quitting send loop: {err}"); + } + + break; + } } if let Some(imp) = imp.upgrade() { - gst::info!(CAT, imp: imp, "Done sending"); + gst::debug!(CAT, imp: imp, "Done sending"); + } else { + gst::debug!(CAT, "Done sending"); } - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) }); let imp = self.downgrade(); diff --git a/net/webrtc/src/janusvr_signaller/imp.rs b/net/webrtc/src/janusvr_signaller/imp.rs index 56d6a458..5a68be54 100644 --- a/net/webrtc/src/janusvr_signaller/imp.rs +++ b/net/webrtc/src/janusvr_signaller/imp.rs @@ -278,14 +278,15 @@ impl Signaller { let (ws_sender, mut ws_receiver) = mpsc::channel::(1000); let send_task_handle = RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let mut res = Ok(()); loop { tokio::select! { opt = ws_receiver.next() => match opt { Some(msg) => { gst::log!(CAT, "Sending websocket message {:?}", msg); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; }, None => break, }, @@ -301,22 +302,29 @@ impl Signaller { transaction, session_id, }); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; } } } + + if let Err(ref err) = res { + this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send task: {err}"), + |this| gst::error!(CAT, imp: this, "Quitting send task: {err}") + ); + + break; + } } - let msg = "Done sending"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") + this.map_or_else(|| gst::debug!(CAT, "Done sending"), + |this| gst::debug!(CAT, imp: this, "Done sending") ); - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) })); let recv_task_handle = diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index fb73c053..bda3b764 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -140,21 +140,28 @@ impl Signaller { let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); let send_task_handle = RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let mut res = Ok(()); while let Some(msg) = websocket_receiver.next().await { gst::log!(CAT, "Sending websocket message {:?}", msg); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; + + if let Err(ref err) = res { + this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send loop: {err}"), + |this| gst::error!(CAT, imp: this, "Quitting send loop: {err}") + ); + break; + } } - let msg = "Done sending"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") + this.map_or_else(|| gst::debug!(CAT, "Done sending"), + |this| gst::debug!(CAT, imp: this, "Done sending") ); - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) })); let obj = self.obj();