pleroma/test/pleroma/integration/mastodon_websocket_test.exs
Mark Felder d0f4b2b02f Remove invalid test
It is not allowed to use the Sec-WebSocket-Protocol header for arbitrary values. This was possible due to the raw websocket handling we were doing with Cowboy, but Phoenix.Socket.Transport does not allow this as the value of this header is compared against a static list of subprotocols.

https://hexdocs.pm/phoenix/Phoenix.Endpoint.html#socket/3-websocket-configuration

Additionally I cannot find anywhere that we depended on this behavior. Setting the Sec-WebSocket-Protocol header does not appear to be a part of PleromaFE.
2024-02-14 15:27:12 -05:00

486 lines
15 KiB
Elixir

# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Integration.MastodonWebsocketTest do
# Needs a streamer, needs to stay synchronous
use Pleroma.DataCase
import ExUnit.CaptureLog
import Pleroma.Factory
alias Pleroma.Integration.WebsocketClient
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth
@moduletag needs_streamer: true, capture_log: true
# WebSocket URL of the streaming endpoint: the endpoint's own URL with the
# scheme replaced by "ws" and the path replaced by "/api/v1/streaming".
# As a module attribute, it is evaluated once when this module is compiled.
@path Pleroma.Web.Endpoint.url()
      |> URI.parse()
      |> struct!(scheme: "ws", path: "/api/v1/streaming")
      |> URI.to_string()
# Starts a WebSocket client against the streaming endpoint.
#
#   * `qs`      - optional query string (including the leading "?") that is
#                 appended verbatim to the endpoint URL; `nil` means no query.
#   * `headers` - extra request headers, passed through to the client.
#
# The calling process (`self()`) is handed to `WebsocketClient.start_link/3`
# as its first argument, and that function's result is returned unchanged.
def start_socket(qs \\ nil, headers \\ []) do
  url = if is_nil(qs), do: @path, else: @path <> qs

  WebsocketClient.start_link(self(), url, headers)
end
# Decodes a streaming frame whose "payload" field is itself a JSON-encoded
# string. On success returns `{:ok, %{"event" => event, "payload" => decoded}}`,
# keeping only those two keys. If either decode step fails or the envelope
# lacks "event"/"payload", the non-matching value is returned as-is.
defp decode_json(json) do
  with {:ok, %{"event" => event, "payload" => encoded_payload}} <- Jason.decode(json),
       {:ok, decoded_payload} <- Jason.decode(encoded_payload) do
    {:ok, %{"event" => event, "payload" => decoded_payload}}
  end
end
# Turns atom keys to strings by round-tripping the term through JSON, so it
# can be compared with data that was decoded from a JSON frame.
defp atom_key_to_string(json) do
  Jason.decode!(Jason.encode!(json))
end
# Connecting with an unknown stream name must fail with a 404 request error.
test "refuses invalid requests" do
  capture_log(fn ->
    result = start_socket("?stream=ncjdk")
    assert {:error, %WebSockex.RequestError{code: 404}} = result

    # NOTE(review): presumably gives the server time to emit its log output
    # while the capture is still active — confirm.
    Process.sleep(30)
  end)
end
# The "user" stream must be refused with 401 both for a bogus access token
# and when no token is supplied at all.
test "requires authentication and a valid token for protected streams" do
  capture_log(fn ->
    for qs <- ["?stream=user&access_token=aaaaaaaaaaaa", "?stream=user"] do
      assert {:error, %WebSockex.RequestError{code: 401}} = start_socket(qs)
    end

    # NOTE(review): presumably lets pending server-side log output land inside
    # the capture — confirm.
    Process.sleep(30)
  end)
end
# Connecting without any query string (no stream selected up front) succeeds.
test "allows unified stream" do
  connection = start_socket()
  assert {:ok, _} = connection
end
# Each of these stream selections must connect without an access token.
test "allows public streams without authentication" do
  public_queries = [
    "?stream=public",
    "?stream=public:local",
    "?stream=public:remote&instance=lain.com",
    "?stream=hashtag&tag=lain"
  ]

  Enum.each(public_queries, fn qs ->
    assert {:ok, _} = start_socket(qs)
  end)
end
# A post made while connected to the public stream must arrive as an "update"
# event whose payload is a JSON string equal to the rendered status view.
test "receives well formatted events" do
  author = insert(:user)
  {:ok, _} = start_socket("?stream=public")
  {:ok, activity} = CommonAPI.post(author, %{status: "nice echo chamber"})

  assert_receive {:text, frame}, 1_000
  assert {:ok, envelope} = Jason.decode(frame)
  assert "update" == envelope["event"]
  assert envelope["payload"]

  # The payload is JSON nested inside the envelope, so it needs a second decode.
  assert {:ok, status} = Jason.decode(envelope["payload"])

  rendered =
    Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)

  assert status == atom_key_to_string(rendered)
end
describe "subscribing via WebSocket" do
# Subscribes to "public" over an already-open socket, expects a success
# response, then checks that a new post is delivered as an "update" event
# whose payload matches the rendered status view.
test "can subscribe" do
  user = insert(:user)
  {:ok, pid} = start_socket()

  WebsocketClient.send_text(pid, Jason.encode!(%{type: "subscribe", stream: "public"}))
  assert_receive {:text, raw_json}, 1_000

  assert {:ok,
          %{
            "event" => "pleroma:respond",
            "payload" => %{"type" => "subscribe", "result" => "success"}
          }} = decode_json(raw_json)

  {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
  assert_receive {:text, raw_json}, 1_000
  assert {:ok, json} = Jason.decode(raw_json)
  assert "update" == json["event"]
  assert json["payload"]

  # The payload is JSON nested inside the envelope, so it needs a second decode.
  assert {:ok, json} = Jason.decode(json["payload"])

  # Use the module's shared helper instead of repeating the JSON round-trip.
  view_json =
    atom_key_to_string(
      Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
    )

  assert json == view_json
end
# Subscribes one socket to both "public" and the "mew" hashtag stream, then
# posts a status carrying that hashtag and expects one frame per stream.
test "can subscribe to multiple streams" do
  author = insert(:user)
  {:ok, pid} = start_socket()

  WebsocketClient.send_text(pid, Jason.encode!(%{type: "subscribe", stream: "public"}))
  assert_receive {:text, public_reply}, 1_000
  assert {:ok, %{"event" => "pleroma:respond", "payload" => public_result}} =
           decode_json(public_reply)

  assert %{"type" => "subscribe", "result" => "success"} = public_result

  WebsocketClient.send_text(
    pid,
    Jason.encode!(%{type: "subscribe", stream: "hashtag", tag: "mew"})
  )

  assert_receive {:text, hashtag_reply}, 1_000
  assert {:ok, %{"event" => "pleroma:respond", "payload" => hashtag_result}} =
           decode_json(hashtag_reply)

  assert %{"type" => "subscribe", "result" => "success"} = hashtag_result

  {:ok, _activity} = CommonAPI.post(author, %{status: "nice echo chamber #mew"})

  # Collect the "stream" field of the next two frames; their arrival order is
  # not asserted, only that both streams are represented.
  streams =
    for _ <- 1..2 do
      assert_receive {:text, frame}, 1_000
      assert {:ok, %{"stream" => stream}} = Jason.decode(frame)
      stream
    end

  assert ["hashtag", "mew"] in streams
  assert ["public"] in streams
end
# A second subscription to the same stream is answered with "ignored", and a
# post afterwards is delivered exactly once.
test "won't double subscribe" do
  author = insert(:user)
  {:ok, pid} = start_socket()
  subscribe_request = Jason.encode!(%{type: "subscribe", stream: "public"})

  WebsocketClient.send_text(pid, subscribe_request)
  assert_receive {:text, first_reply}, 1_000
  assert {:ok, %{"event" => "pleroma:respond", "payload" => first_result}} =
           decode_json(first_reply)

  assert %{"type" => "subscribe", "result" => "success"} = first_result

  WebsocketClient.send_text(pid, subscribe_request)
  assert_receive {:text, second_reply}, 1_000
  assert {:ok, %{"event" => "pleroma:respond", "payload" => second_result}} =
           decode_json(second_reply)

  assert %{"type" => "subscribe", "result" => "ignored"} = second_result

  {:ok, _activity} = CommonAPI.post(author, %{status: "nice echo chamber"})

  # Exactly one frame for the post: one arrives, then nothing for a second.
  assert_receive {:text, _}, 1_000
  refute_receive {:text, _}, 1_000
end
# Subscribing to an unknown stream name yields an error response carrying
# the "bad_topic" error code.
test "rejects invalid streams" do
  {:ok, pid} = start_socket()

  WebsocketClient.send_text(pid, Jason.encode!(%{type: "subscribe", stream: "nonsense"}))
  assert_receive {:text, reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => result}} = decode_json(reply)
  assert %{"type" => "subscribe", "result" => "error", "error" => "bad_topic"} = result
end
# After subscribing to and then unsubscribing from "public", a new post must
# no longer produce any frame on the socket.
test "can unsubscribe" do
  author = insert(:user)
  {:ok, pid} = start_socket()

  WebsocketClient.send_text(pid, Jason.encode!(%{type: "subscribe", stream: "public"}))
  assert_receive {:text, subscribe_reply}, 1_000
  assert {:ok, %{"event" => "pleroma:respond", "payload" => subscribe_result}} =
           decode_json(subscribe_reply)

  assert %{"type" => "subscribe", "result" => "success"} = subscribe_result

  WebsocketClient.send_text(pid, Jason.encode!(%{type: "unsubscribe", stream: "public"}))
  assert_receive {:text, unsubscribe_reply}, 1_000
  assert {:ok, %{"event" => "pleroma:respond", "payload" => unsubscribe_result}} =
           decode_json(unsubscribe_reply)

  assert %{"type" => "unsubscribe", "result" => "success"} = unsubscribe_result

  {:ok, _activity} = CommonAPI.post(author, %{status: "nice echo chamber"})
  refute_receive {:text, _}, 1_000
end
end
describe "with a valid user token" do
# Registers an OAuth app with the "read" scope, creates a user, and exchanges
# a fresh authorization for a token. The app, user and token are exposed to
# every test in this describe block through the test context.
setup do
  app_attrs = %{
    client_name: "client",
    scopes: ["read"],
    redirect_uris: "url"
  }

  app_changeset = OAuth.App.register_changeset(%OAuth.App{}, app_attrs)
  {:ok, app} = Pleroma.Repo.insert(app_changeset)

  user = insert(:user)

  {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
  {:ok, token} = OAuth.Token.exchange_token(app, auth)

  %{app: app, user: user, token: token}
end
# A token produced by the setup block is enough to open the "user" stream.
test "accepts valid tokens", %{token: token} do
  query = "?stream=user&access_token=#{token.token}"
  assert {:ok, _} = start_socket(query)
end
# The "user" stream connects with a valid token and is refused with 401
# when the token is omitted.
test "accepts the 'user' stream", %{token: token} do
  authorized = start_socket("?stream=user&access_token=#{token.token}")
  assert {:ok, _} = authorized

  capture_log(fn ->
    unauthorized = start_socket("?stream=user")
    assert {:error, %WebSockex.RequestError{code: 401}} = unauthorized

    # NOTE(review): presumably lets pending server-side log output land inside
    # the capture — confirm.
    Process.sleep(30)
  end)
end
# The "user:notification" stream connects with a valid token and is refused
# with 401 when the token is omitted.
test "accepts the 'user:notification' stream", %{token: token} do
  authorized = start_socket("?stream=user:notification&access_token=#{token.token}")
  assert {:ok, _} = authorized

  capture_log(fn ->
    unauthorized = start_socket("?stream=user:notification")
    assert {:error, %WebSockex.RequestError{code: 401}} = unauthorized

    # NOTE(review): presumably lets pending server-side log output land inside
    # the capture — confirm.
    Process.sleep(30)
  end)
end
# Authenticates over an already-open, unauthenticated socket with a
# "pleroma:authenticate" message, then subscribes to the "user" stream.
test "accepts valid token on client-sent event", %{token: token} do
  assert {:ok, pid} = start_socket()

  auth_request = Jason.encode!(%{type: "pleroma:authenticate", token: token.token})
  WebsocketClient.send_text(pid, auth_request)
  assert_receive {:text, auth_reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => auth_result}} =
           decode_json(auth_reply)

  assert %{"type" => "pleroma:authenticate", "result" => "success"} = auth_result

  WebsocketClient.send_text(pid, Jason.encode!(%{type: "subscribe", stream: "user"}))
  assert_receive {:text, subscribe_reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => subscribe_result}} =
           decode_json(subscribe_reply)

  assert %{"type" => "subscribe", "result" => "success"} = subscribe_result
end
# A "pleroma:authenticate" message with an unknown token is answered with an
# "unauthorized" error response.
test "rejects invalid token on client-sent event" do
  assert {:ok, pid} = start_socket()

  auth_request = Jason.encode!(%{type: "pleroma:authenticate", token: "Something else"})
  WebsocketClient.send_text(pid, auth_request)
  assert_receive {:text, reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => result}} = decode_json(reply)

  assert %{
           "type" => "pleroma:authenticate",
           "result" => "error",
           "error" => "unauthorized"
         } = result
end
# Once a socket has authenticated successfully, any further
# "pleroma:authenticate" message is answered with "already_authenticated".
test "rejects new authenticate request if already logged-in", %{token: token} do
  assert {:ok, pid} = start_socket()

  first_request = Jason.encode!(%{type: "pleroma:authenticate", token: token.token})
  WebsocketClient.send_text(pid, first_request)
  assert_receive {:text, first_reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => first_result}} =
           decode_json(first_reply)

  assert %{"type" => "pleroma:authenticate", "result" => "success"} = first_result

  second_request = Jason.encode!(%{type: "pleroma:authenticate", token: "Something else"})
  WebsocketClient.send_text(pid, second_request)
  assert_receive {:text, second_reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => second_result}} =
           decode_json(second_reply)

  assert %{
           "type" => "pleroma:authenticate",
           "result" => "error",
           "error" => "already_authenticated"
         } = second_result
end
# The "list" stream can be selected in the connection query string or via a
# subscribe message. A second subscribe to the same list, with the id sent as
# a string this time, is answered with "ignored".
test "accepts the 'list' stream", %{token: token, user: user} do
  posting_user = insert(:user)
  {:ok, list} = Pleroma.List.create("test", user)
  Pleroma.List.follow(list, posting_user)

  access_token = token.token

  assert {:ok, _} = start_socket("?stream=list&access_token=#{access_token}&list=#{list.id}")
  assert {:ok, pid} = start_socket("?access_token=#{access_token}")

  WebsocketClient.send_text(
    pid,
    Jason.encode!(%{type: "subscribe", stream: "list", list: list.id})
  )

  assert_receive {:text, first_reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => first_result}} =
           decode_json(first_reply)

  assert %{"type" => "subscribe", "result" => "success"} = first_result

  WebsocketClient.send_text(
    pid,
    Jason.encode!(%{type: "subscribe", stream: "list", list: to_string(list.id)})
  )

  assert_receive {:text, second_reply}, 1_000

  assert {:ok, %{"event" => "pleroma:respond", "payload" => second_result}} =
           decode_json(second_reply)

  assert %{"type" => "subscribe", "result" => "ignored"} = second_result
end
# Opens two sockets with `token` and one with a second token for the same
# user, then revokes `token`. Exactly two close notifications are expected,
# so the socket opened with the second token must stay connected.
test "disconnect when token is revoked", %{app: app, user: user, token: token} do
  assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
  assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

  {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
  {:ok, token2} = OAuth.Token.exchange_token(app, auth)
  assert {:ok, _} = start_socket("?stream=user&access_token=#{token2.token}")

  OAuth.Token.Strategy.Revoke.revoke(token)

  # Use the same explicit 1s timeout as the other positive receives in this
  # file; ExUnit's short default makes the wait for the close prone to flaking.
  assert_receive {:close, _}, 1_000
  assert_receive {:close, _}, 1_000
  refute_receive {:close, _}
end
# After a follow is set up between the token's user and a posting user, a
# private post must arrive on the "user" stream as an "update" event whose
# payload equals the status view rendered for the reading user.
test "receives private statuses", %{user: reading_user, token: token} do
  user = insert(:user)
  CommonAPI.follow(reading_user, user)

  {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

  {:ok, activity} =
    CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})

  assert_receive {:text, raw_json}, 1_000
  assert {:ok, json} = Jason.decode(raw_json)
  assert "update" == json["event"]
  assert json["payload"]

  # The payload is JSON nested inside the envelope, so it needs a second decode.
  assert {:ok, json} = Jason.decode(json["payload"])

  # Use the module's shared helper instead of repeating the JSON round-trip.
  view_json =
    atom_key_to_string(
      Pleroma.Web.MastodonAPI.StatusView.render("show.json",
        activity: activity,
        for: reading_user
      )
    )

  assert json == view_json
end
# After the initial private post is delivered, editing it must produce a
# "status.update" event whose decoded payload equals the re-rendered status.
test "receives edits", %{user: reading_user, token: token} do
  user = insert(:user)
  CommonAPI.follow(reading_user, user)

  {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

  {:ok, activity} =
    CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})

  # Consume the frame for the original post before triggering the edit.
  assert_receive {:text, _raw_json}, 1_000

  {:ok, _} = CommonAPI.update(user, activity, %{status: "mew mew", visibility: "private"})

  assert_receive {:text, raw_json}, 1_000

  # Normalize the activity again before rendering the expected view.
  activity = Pleroma.Activity.normalize(activity)

  # Use the module's shared helper instead of repeating the JSON round-trip.
  view_json =
    atom_key_to_string(
      Pleroma.Web.MastodonAPI.StatusView.render("show.json",
        activity: activity,
        for: reading_user
      )
    )

  assert {:ok, %{"event" => "status.update", "payload" => ^view_json}} = decode_json(raw_json)
end
# A private post that mentions the token's user must arrive on the
# "user:notification" stream as a "notification" event whose status id is
# the id of the posted activity.
test "receives notifications", %{user: reading_user, token: token} do
  author = insert(:user)
  CommonAPI.follow(reading_user, author)

  {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")

  post_params = %{
    status: "nice echo chamber @#{reading_user.nickname}",
    visibility: "private"
  }

  {:ok, %Pleroma.Activity{id: activity_id}} = CommonAPI.post(author, post_params)

  assert_receive {:text, frame}, 1_000

  assert {:ok, %{"event" => "notification", "payload" => notification}} = decode_json(frame)
  assert %{"status" => %{"id" => ^activity_id}} = notification
end
end
end