diff --git a/net/webrtc/signalling/src/server/mod.rs b/net/webrtc/signalling/src/server/mod.rs index 700ae4dd..3b17fd3d 100644 --- a/net/webrtc/signalling/src/server/mod.rs +++ b/net/webrtc/signalling/src/server/mod.rs @@ -10,7 +10,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::task; -use tracing::{info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; struct Peer { receive_task_handle: task::JoinHandle<()>, @@ -141,6 +141,7 @@ impl Server { let this_id_clone = this_id.clone(); let (mut ws_sink, mut ws_stream) = ws.split(); let send_task_handle = task::spawn(async move { + let mut res = Ok(()); loop { match tokio::time::timeout( std::time::Duration::from_secs(30), @@ -150,21 +151,28 @@ impl Server { { Ok(Some(msg)) => { trace!(this_id = %this_id_clone, "sending {}", msg); - ws_sink.send(WsMessage::Text(msg)).await?; + res = ws_sink.send(WsMessage::Text(msg)).await; } Ok(None) => { break; } Err(_) => { trace!(this_id = %this_id_clone, "timeout, sending ping"); - ws_sink.send(WsMessage::Ping(vec![])).await?; + res = ws_sink.send(WsMessage::Ping(vec![])).await; } } + + if let Err(ref err) = res { + error!(this_id = %this_id_clone, "Quitting send loop: {err}"); + break; + } } - ws_sink.close().await?; + debug!(this_id = %this_id_clone, "Done sending"); - Ok::<(), Error>(()) + let _ = ws_sink.close().await; + + res.map_err(Into::into) }); let mut tx = self.state.lock().unwrap().tx.clone(); diff --git a/net/webrtc/src/aws_kvs_signaller/imp.rs b/net/webrtc/src/aws_kvs_signaller/imp.rs index 7946e244..e46d0330 100644 --- a/net/webrtc/src/aws_kvs_signaller/imp.rs +++ b/net/webrtc/src/aws_kvs_signaller/imp.rs @@ -424,6 +424,7 @@ impl Signaller { let imp = self.downgrade(); let ping_timeout = settings.ping_timeout; let send_task_handle = task::spawn(async move { + let mut res = Ok(()); loop { match tokio::time::timeout( std::time::Duration::from_secs(ping_timeout as u64), @@ -440,26 +441,38 @@ impl Signaller { serde_json::to_string(&msg).unwrap() ); } - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; } Ok(None) => { break; } Err(_) => { - ws_sink.send(WsMessage::Ping(vec![])).await?; + res = ws_sink.send(WsMessage::Ping(vec![])).await; } } + + if let Err(ref err) = res { + if let Some(imp) = imp.upgrade() { + gst::error!(CAT, imp: imp, "Quitting send loop: {err}"); + } else { + gst::error!(CAT, "Quitting send loop: {err}"); + } + + break; + } } if let Some(imp) = imp.upgrade() { - gst::info!(CAT, imp: imp, "Done sending"); + gst::debug!(CAT, imp: imp, "Done sending"); + } else { + gst::debug!(CAT, "Done sending"); } - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) }); let imp = self.downgrade(); diff --git a/net/webrtc/src/janusvr_signaller/imp.rs b/net/webrtc/src/janusvr_signaller/imp.rs index 56d6a458..5a68be54 100644 --- a/net/webrtc/src/janusvr_signaller/imp.rs +++ b/net/webrtc/src/janusvr_signaller/imp.rs @@ -278,14 +278,15 @@ impl Signaller { let (ws_sender, mut ws_receiver) = mpsc::channel::(1000); let send_task_handle = RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let mut res = Ok(()); loop { tokio::select! { opt = ws_receiver.next() => match opt { Some(msg) => { gst::log!(CAT, "Sending websocket message {:?}", msg); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; }, None => break, }, @@ -301,22 +302,29 @@ impl Signaller { transaction, session_id, }); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; } } } + + if let Err(ref err) = res { + this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send task: {err}"), + |this| gst::error!(CAT, imp: this, "Quitting send task: {err}") + ); + + break; + } } - let msg = "Done sending"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") + this.map_or_else(|| gst::debug!(CAT, "Done sending"), + |this| gst::debug!(CAT, imp: this, "Done sending") ); - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) })); let recv_task_handle = diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index fb73c053..bda3b764 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -140,21 +140,28 @@ impl Signaller { let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); let send_task_handle = RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let mut res = Ok(()); while let Some(msg) = websocket_receiver.next().await { gst::log!(CAT, "Sending websocket message {:?}", msg); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; + + if let Err(ref err) = res { + this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send loop: {err}"), + |this| gst::error!(CAT, imp: this, "Quitting send loop: {err}") + ); + break; + } } - let msg = "Done sending"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") + this.map_or_else(|| gst::debug!(CAT, "Done sending"), + |this| gst::debug!(CAT, imp: this, "Done sending") ); - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) })); let obj = self.obj();