gst-plugins-rs/net/webrtc/signalling/src/server/mod.rs
François Laignel f54d714afd webrtc: only use close() to close websockets
In the signaller clients and servers, the following sequence is used to close
the websocket (in the [send task]):

```rust
    ws_sink.send(WsMessage::Close(None)).await?;
    ws_sink.close().await?;
```

tungstenite's [`WebSocket::close()` doc] states:

> Calling this function is the same as calling `write(Message::Close(..))`

So one might think the two are redundant and that either could be used for this
purpose (`send()` calls `write()`, then `flush()`).

The result is actually a bit different. `write()` starts by checking the state
of the connection and [returns `SendAfterClosing`] if the socket is no longer
active. That is the case when a closing request has been received from the peer
(via a [call to `do_close()`]). Note that `do_close()` also enqueues a `Close`
frame.
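To make the difference concrete, here is a toy, std-only model of the two code paths just described. It is not tungstenite's implementation. `ToyWebSocket`, `ToyState`, `Frame` and `ToyError` are hypothetical names, and the real state machine has more states than the two modelled here.

```rust
// Toy model (hypothetical names, not tungstenite's code) of the two paths:
// `write()` refuses to queue a frame once the connection is no longer active,
// while `do_close()` (run when the peer's Close is read) switches the state
// and queues the reply Close frame itself.

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Text(String),
    Close,
}

#[derive(Debug, PartialEq)]
pub enum ToyError {
    SendAfterClosing,
}

#[derive(Debug, PartialEq)]
pub enum ToyState {
    Active,
    ClosedByPeer,
}

pub struct ToyWebSocket {
    pub state: ToyState,
    /// Frames queued for sending but not yet flushed to the network.
    pub pending: Vec<Frame>,
}

impl ToyWebSocket {
    pub fn new() -> Self {
        ToyWebSocket {
            state: ToyState::Active,
            pending: Vec::new(),
        }
    }

    /// Mirrors the state check performed at the start of `write()`.
    pub fn write(&mut self, frame: Frame) -> Result<(), ToyError> {
        if self.state != ToyState::Active {
            return Err(ToyError::SendAfterClosing);
        }
        self.pending.push(frame);
        Ok(())
    }

    /// Mirrors what happens when the peer's Close frame is received:
    /// the socket stops being active and a reply Close is enqueued.
    pub fn do_close(&mut self) {
        self.state = ToyState::ClosedByPeer;
        self.pending.push(Frame::Close);
    }
}
```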

This behaviour is visible in the server's logs:

```
1. tungstenite::protocol: Received close frame: None
2. tungstenite::protocol: Replying to close with Frame { header: FrameHeader { .., opcode: Control(Close), .. }, payload: [] }
3. gst_plugin_webrtc_signalling::server: Received message Ok(Close(None))
4. gst_plugin_webrtc_signalling::server: connection closed: None this_id=cb13892f-b4d5-4d59-95e2-b3873a7bd319
5. remove_peer{peer_id="cb13892f-b4d5-4d59-95e2-b3873a7bd319"}: gst_plugin_webrtc_signalling::server: close time.busy=285µs time.idle=55.5µs
6. async_tungstenite: websocket start_send error: WebSocket protocol error: Sending after closing is not allowed
```

1: The server's websocket receives the peer's `Close(None)`.
2: `do_close()` enqueues a `Close` frame.
3: The incoming `Close(None)` is handled by the server.
4 & 5: The server closes the session.
6: `ws_sink.send(WsMessage::Close(None))` attempts to `write()` while the ws
   is no longer active. The error causes an early return, which means that
   the enqueued `Close` frame is not flushed.
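The early return in step 6 can be reproduced with a small, std-only toy model. As stated above, it assumes that `send()` is `write()` followed by `flush()`. `ToySink`, `on_peer_close()` and `old_shutdown()` are hypothetical names, not tungstenite or futures APIs. The second `flush()` only stands in for the `ws_sink.close()` call, which is never reached in the failing case.

```rust
// Toy model (hypothetical, not tungstenite/futures code) of the old shutdown
// sequence `send(Close)` then `close()`, where `send()` = `write()` + `flush()`.

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Close,
}

#[derive(Debug, PartialEq)]
pub struct SendAfterClosing;

pub struct ToySink {
    pub active: bool,
    /// Frames queued but not yet sent to the peer.
    pub pending: Vec<Frame>,
    /// Frames actually sent to the peer.
    pub flushed: Vec<Frame>,
}

impl ToySink {
    /// Peer's Close received: the socket becomes inactive and a reply Close is queued.
    pub fn on_peer_close(&mut self) {
        self.active = false;
        self.pending.push(Frame::Close);
    }

    fn write(&mut self, frame: Frame) -> Result<(), SendAfterClosing> {
        if !self.active {
            return Err(SendAfterClosing);
        }
        self.pending.push(frame);
        Ok(())
    }

    fn flush(&mut self) {
        self.flushed.append(&mut self.pending);
    }

    /// `send()` = `write()` then `flush()`: the `?` skips `flush()` on error.
    pub fn send(&mut self, frame: Frame) -> Result<(), SendAfterClosing> {
        self.write(frame)?;
        self.flush();
        Ok(())
    }
}

/// Old sequence: if `send()` fails, we return before anything is flushed,
/// so the reply Close queued by `on_peer_close()` never reaches the peer.
pub fn old_shutdown(sink: &mut ToySink) -> Result<(), SendAfterClosing> {
    sink.send(Frame::Close)?;
    // Stand-in for `ws_sink.close()`; not reached when `send()` fails.
    sink.flush();
    Ok(())
}
```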

Depending on the peer's shutdown sequence, this can result in the following
error, which can bubble up as a `Message` on the application's bus:

```
ERROR: from element /GstPipeline:pipeline0/GstWebRTCSrc:webrtcsrc0: GStreamer encountered a general stream error.
Additional debug info:
net/webrtc/src/webrtcsrc/imp.rs(625): gstrswebrtc::webrtcsrc::imp::BaseWebRTCSrc::connect_signaller::{{closure}}::{{closure}} (): /GstPipeline:pipeline0/GstWebRTCSrc:webrtcsrc0:
Signalling error: Error receiving: WebSocket protocol error: Connection reset without closing handshake
```

On the other hand, [`close()` ensures the ws is active] before attempting to
write a `Close` frame. If it's not, it only flushes the stream.
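The same kind of toy model can sketch this `close()` behaviour. It uses a hypothetical `ToySink`, not tungstenite's code. A `Close` frame is written only while the connection is active, and the stream is flushed in every case. As a result, a `Close` reply already queued by `do_close()` still reaches the peer.

```rust
// Toy model (hypothetical, not tungstenite's code) of `close()`:
// write a Close frame only if the connection is still active,
// then flush in every case.

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Close,
}

pub struct ToySink {
    pub active: bool,
    /// Frames queued but not yet sent to the peer.
    pub pending: Vec<Frame>,
    /// Frames actually sent to the peer.
    pub flushed: Vec<Frame>,
}

impl ToySink {
    pub fn new() -> Self {
        ToySink {
            active: true,
            pending: Vec::new(),
            flushed: Vec::new(),
        }
    }

    /// Peer's Close received: the socket becomes inactive and a reply Close is queued.
    pub fn on_peer_close(&mut self) {
        self.active = false;
        self.pending.push(Frame::Close);
    }

    fn flush(&mut self) {
        self.flushed.append(&mut self.pending);
    }

    pub fn close(&mut self) {
        if self.active {
            // We initiate the closing handshake ourselves.
            self.active = false;
            self.pending.push(Frame::Close);
        }
        // Active or not, push whatever is queued (e.g. the reply Close).
        self.flush();
    }
}
```

Either way the peer receives exactly one `Close` frame, and the peer-initiated path does not fail.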

Thus, when we want to close the websocket ourselves and/or honor the closing
handshake in response to the peer's `Close` message, the `ws_sink.close()`
variant is preferable.

This can be verified in the server's logs after the change:

```
tungstenite::protocol: Received close frame: None
tungstenite::protocol: Replying to close with Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Control(Close), mask: None}, payload: [] }
gst_plugin_webrtc_signalling::server: Received message Ok(Close(None))
gst_plugin_webrtc_signalling::server: connection closed: None this_id=192ed7ff-3b9d-45c5-be66-872cbe67d190
remove_peer{peer_id="192ed7ff-3b9d-45c5-be66-872cbe67d190"}: gst_plugin_webrtc_signalling::server: close time.busy=22.7µs time.idle=37.4µs
tungstenite::protocol: Sending pong/close
```

We now get the notification `Sending pong/close` (the closing handshake) instead
of the `websocket start_send error` from step 6 with the previous variant.

The `Connection reset without closing handshake` was not observed after this
change.

[send task]: 63b568f4a0/net/webrtc/signalling/src/server/mod.rs (L165)
[`WebSocket::close()` doc]: https://docs.rs/tungstenite/0.21.0/tungstenite/protocol/struct.WebSocket.html#method.close
[returns `SendAfterClosing`]: 85463b264e/src/protocol/mod.rs (L437)
[call to `do_close()`]: 85463b264e/src/protocol/mod.rs (L601)
[`close()` ensures the ws is active]: 85463b264e/src/protocol/mod.rs (L531)

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1435>
2024-02-01 18:08:41 +01:00


// SPDX-License-Identifier: MPL-2.0
use anyhow::Error;
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::task;
use tracing::{info, instrument, trace, warn};

struct Peer {
    receive_task_handle: task::JoinHandle<()>,
    send_task_handle: task::JoinHandle<Result<(), Error>>,
    sender: mpsc::Sender<String>,
}

struct State {
    tx: Option<mpsc::Sender<(String, Option<String>)>>,
    peers: HashMap<String, Peer>,
}

#[derive(Clone)]
pub struct Server {
    state: Arc<Mutex<State>>,
}

#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    #[error("error during handshake {0}")]
    Handshake(#[from] async_tungstenite::tungstenite::Error),
    #[error("error during TLS handshake {0}")]
    TLSHandshake(#[from] tokio_native_tls::native_tls::Error),
    #[error("timeout during TLS handshake {0}")]
    TLSHandshakeTimeout(#[from] tokio::time::error::Elapsed),
}

impl Server {
    #[instrument(level = "debug", skip(factory))]
    pub fn spawn<
        I: for<'a> Deserialize<'a>,
        O: Serialize + std::fmt::Debug + Send + Sync,
        Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, Option<I>)> + Send>>) -> St,
        St: Stream<Item = (String, O)>,
    >(
        factory: Factory,
    ) -> Self
    where
        O: Serialize + std::fmt::Debug,
        St: Send + Unpin + 'static,
    {
        let (tx, rx) = mpsc::channel::<(String, Option<String>)>(1000);
        let mut handler = factory(Box::pin(rx.filter_map(|(peer_id, msg)| async move {
            if let Some(msg) = msg {
                match serde_json::from_str::<I>(&msg) {
                    Ok(msg) => Some((peer_id, Some(msg))),
                    Err(err) => {
                        warn!("Failed to parse incoming message: {} ({})", err, msg);
                        None
                    }
                }
            } else {
                Some((peer_id, None))
            }
        })));

        let state = Arc::new(Mutex::new(State {
            tx: Some(tx),
            peers: HashMap::new(),
        }));

        let state_clone = state.clone();
        task::spawn(async move {
            while let Some((peer_id, msg)) = handler.next().await {
                match serde_json::to_string(&msg) {
                    Ok(msg_str) => {
                        // Clone the peer's sender so the lock is released before awaiting
                        let sender = {
                            let state = state_clone.lock().unwrap();
                            state.peers.get(&peer_id).map(|peer| peer.sender.clone())
                        };

                        if let Some(mut sender) = sender {
                            trace!("Sending {}", msg_str);
                            let _ = sender.send(msg_str).await;
                        }
                    }
                    Err(err) => {
                        warn!("Failed to serialize outgoing message: {}", err);
                    }
                }
            }
        });

        Self { state }
    }

    #[instrument(level = "debug", skip(state))]
    fn remove_peer(state: Arc<Mutex<State>>, peer_id: &str) {
        if let Some(mut peer) = state.lock().unwrap().peers.remove(peer_id) {
            let peer_id = peer_id.to_string();
            task::spawn(async move {
                peer.sender.close_channel();
                if let Err(err) = peer.send_task_handle.await {
                    trace!(peer_id = %peer_id, "Error while joining send task: {}", err);
                }

                if let Err(err) = peer.receive_task_handle.await {
                    trace!(peer_id = %peer_id, "Error while joining receive task: {}", err);
                }
            });
        }
    }

    #[instrument(level = "debug", skip(self, stream))]
    pub async fn accept_async<S: 'static>(&mut self, stream: S) -> Result<String, ServerError>
    where
        S: AsyncRead + AsyncWrite + Unpin + Send,
    {
        let ws = match async_tungstenite::tokio::accept_async(stream).await {
            Ok(ws) => ws,
            Err(err) => {
                warn!("Error during the websocket handshake: {}", err);
                return Err(ServerError::Handshake(err));
            }
        };

        let this_id = uuid::Uuid::new_v4().to_string();
        info!(this_id = %this_id, "New WebSocket connection");

        // 1000 is completely arbitrary: we simply don't want messages to pile
        // up indefinitely, as they could with an unbounded channel
        let (websocket_sender, mut websocket_receiver) = mpsc::channel::<String>(1000);

        let this_id_clone = this_id.clone();
        let (mut ws_sink, mut ws_stream) = ws.split();
        let send_task_handle = task::spawn(async move {
            loop {
                match tokio::time::timeout(
                    std::time::Duration::from_secs(30),
                    websocket_receiver.next(),
                )
                .await
                {
                    Ok(Some(msg)) => {
                        trace!(this_id = %this_id_clone, "sending {}", msg);
                        ws_sink.send(WsMessage::Text(msg)).await?;
                    }
                    Ok(None) => {
                        break;
                    }
                    Err(_) => {
                        trace!(this_id = %this_id_clone, "timeout, sending ping");
                        ws_sink.send(WsMessage::Ping(vec![])).await?;
                    }
                }
            }

            // Only use close(): it writes a Close frame if the websocket is still
            // active, otherwise it just flushes the Close reply already enqueued
            // in response to the peer's Close.
            ws_sink.close().await?;

            Ok::<(), Error>(())
        });

        let mut tx = self.state.lock().unwrap().tx.clone();
        let this_id_clone = this_id.clone();
        let state_clone = self.state.clone();
        let receive_task_handle = task::spawn(async move {
            if let Some(tx) = tx.as_mut() {
                if let Err(err) = tx
                    .send((
                        this_id_clone.clone(),
                        Some(
                            serde_json::json!({
                                "type": "newPeer",
                            })
                            .to_string(),
                        ),
                    ))
                    .await
                {
                    warn!(this = %this_id_clone, "Error handling message: {:?}", err);
                }
            }

            while let Some(msg) = ws_stream.next().await {
                info!("Received message {msg:?}");
                match msg {
                    Ok(WsMessage::Text(msg)) => {
                        if let Some(tx) = tx.as_mut() {
                            if let Err(err) = tx.send((this_id_clone.clone(), Some(msg))).await {
                                warn!(this = %this_id_clone, "Error handling message: {:?}", err);
                            }
                        }
                    }
                    Ok(WsMessage::Close(reason)) => {
                        info!(this_id = %this_id_clone, "connection closed: {:?}", reason);
                        break;
                    }
                    Ok(WsMessage::Pong(_)) => {
                        continue;
                    }
                    Ok(_) => warn!(this_id = %this_id_clone, "Unsupported message type"),
                    Err(err) => {
                        warn!(this_id = %this_id_clone, "recv error: {}", err);
                        break;
                    }
                }
            }

            if let Some(tx) = tx.as_mut() {
                let _ = tx.send((this_id_clone.clone(), None)).await;
            }

            Self::remove_peer(state_clone, &this_id_clone);
        });

        self.state.lock().unwrap().peers.insert(
            this_id.clone(),
            Peer {
                receive_task_handle,
                send_task_handle,
                sender: websocket_sender,
            },
        );

        Ok(this_id)
    }
}
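The send task in `accept_async` above does three things. It forwards queued messages, sends a `Ping` after 30 s without traffic, and closes the sink once its channel is closed. The following std-only sketch mirrors that control flow. It uses `std::sync::mpsc::Receiver::recv_timeout` in place of `tokio::time::timeout` and the futures channel. `run_send_task` and `Outgoing` are hypothetical names, and frames are recorded in a `Vec` instead of being written to a websocket. Dropping the last `Sender` here roughly plays the role of `close_channel()` in `remove_peer()`.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// What the toy send task "writes" to the (simulated) websocket.
#[derive(Debug, PartialEq)]
pub enum Outgoing {
    Text(String),
    Ping,
    Close,
}

/// Sketch of the send-task loop: forward messages, ping when idle for
/// `idle`, and close exactly once after the channel is disconnected.
pub fn run_send_task(rx: Receiver<String>, idle: Duration) -> Vec<Outgoing> {
    let mut written = Vec::new();
    loop {
        match rx.recv_timeout(idle) {
            Ok(msg) => written.push(Outgoing::Text(msg)),
            // No message within `idle`: keep the connection alive.
            Err(RecvTimeoutError::Timeout) => written.push(Outgoing::Ping),
            // All senders dropped and the queue is drained: stop the loop.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    // Mirrors the single `ws_sink.close()` after the loop (no separate Close send).
    written.push(Outgoing::Close);
    written
}
```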