use presence latest changes

This commit is contained in:
Berenice Medel 2022-03-21 12:09:27 -06:00
parent e051dcc943
commit 694daec4da
6 changed files with 35 additions and 223 deletions

View file

@ -22,11 +22,6 @@ defmodule LiveBeats.Application do
{Phoenix.PubSub, name: LiveBeats.PubSub},
# start presence
LiveBeatsWeb.Presence,
{Phoenix.Presence.Client,
client: LiveBeats.PresenceClient,
pubsub: LiveBeats.PubSub,
presence: LiveBeatsWeb.Presence,
name: PresenceClient},
# Start the Endpoint (http/https)
LiveBeatsWeb.Endpoint,
# Expire songs every six hours

View file

@ -1,202 +0,0 @@
defmodule Phoenix.Presence.Client do
use GenServer
@callback init(state :: term) :: {:ok, new_state :: term}
@callback handle_join(topic :: String.t(), key :: String.t(), meta :: [map()], state :: term) ::
{:ok, term}
@callback handle_leave(topic :: String.t(), key :: String.t(), meta :: [map()], state :: term) ::
{:ok, term}
@doc """
TODO
## Options
* `:pubsub` - The required name of the pubsub server
* `:presence` - The required name of the presence module
* `:client` - The required callback module
"""
def start_link(opts) do
case Keyword.fetch(opts, :name) do
{:ok, name} ->
GenServer.start_link(__MODULE__, opts, name: name)
:error ->
GenServer.start_link(__MODULE__, opts)
end
end
def track(pid \\ PresenceClient, topic, key, meta) do
GenServer.call(pid, {:track, self(), topic, to_string(key), meta})
end
def untrack(pid \\ PresenceClient, topic, key) do
GenServer.call(pid, {:untrack, self(), topic, to_string(key)})
end
def init(opts) do
client = Keyword.fetch!(opts, :client)
{:ok, client_state} = client.init(%{})
state = %{
topics: %{},
client: client,
pubsub: Keyword.fetch!(opts, :pubsub),
presence_mod: Keyword.fetch!(opts, :presence),
client_state: client_state
}
{:ok, state}
end
def handle_info(%{topic: topic, event: "presence_diff", payload: diff}, state) do
{:noreply, merge_diff(state, topic, diff)}
end
def handle_call(:state, _from, state) do
{:reply, state, state}
end
def handle_call({:track, pid, topic, key, meta}, _from, state) do
{:reply, :ok, track_pid(state, pid, topic, key, meta)}
end
def handle_call({:untrack, pid, topic, key}, _from, state) do
{:reply, :ok, untrack_pid(state, pid, topic, key)}
end
defp track_pid(state, pid, topic, key, meta) do
# presences are handled when the presence_diff event is received
case Map.fetch(state.topics, topic) do
{:ok, _topic_content} ->
state.presence_mod.track(pid, topic, key, meta)
state
:error ->
# subscribe to topic we weren't yet tracking
Phoenix.PubSub.subscribe(state.pubsub, topic)
state.presence_mod.track(pid, topic, key, meta)
state
end
end
defp untrack_pid(state, pid, topic, key) do
if Map.has_key?(state.topics, topic) do
state.presence_mod.untrack(pid, topic, key)
end
state
end
defp merge_diff(state, topic, %{leaves: leaves, joins: joins}) do
# add new topic if needed
updated_state =
if Map.has_key?(state.topics, topic) do
state
else
put_new_topic(state, topic)
end
# merge diff into state.topics
{updated_state, _topic} = Enum.reduce(joins, {updated_state, topic}, &handle_join/2)
{updated_state, _topic} = Enum.reduce(leaves, {updated_state, topic}, &handle_leave/2)
# if no more presences for given topic, unsubscribe and remove topic
if topic_presences_count(updated_state, topic) == 0 do
Phoenix.PubSub.unsubscribe(state.pubsub, topic)
remove_topic(updated_state, topic)
else
updated_state
end
end
defp handle_join({joined_key, presence}, {state, topic}) do
joined_metas = Map.get(presence, :metas, [])
{updated_state, new_metas} = add_new_presence_or_metas(state, topic, joined_key, joined_metas)
new_presence = Map.put(presence, :metas, new_metas)
{:ok, updated_client_state} =
state.client.handle_join(topic, joined_key, new_presence, state.client_state)
updated_state = Map.put(updated_state, :client_state, updated_client_state)
{updated_state, topic}
end
defp handle_leave({left_key, presence}, {state, topic}) do
{updated_state, new_metas} = remove_presence_or_metas(state, topic, left_key, presence)
new_presence = Map.put(presence, :metas, new_metas)
{:ok, updated_client_state} =
state.client.handle_leave(topic, left_key, new_presence, state.client_state)
updated_state = Map.put(updated_state, :client_state, updated_client_state)
{updated_state, topic}
end
defp put_new_topic(%{topics: topics} = state, topic) do
updated_topics = Map.put_new(topics, topic, %{})
Map.put(state, :topics, updated_topics)
end
defp remove_topic(%{topics: topics} = state, topic) do
updated_topics = Map.delete(topics, topic)
Map.put(state, :topics, updated_topics)
end
defp add_new_presence_or_metas(
%{topics: topics} = state,
topic,
key,
new_metas
) do
topic_info = topics[topic]
{updated_topic, updated_metas} =
case Map.fetch(topic_info, key) do
# existing presence, add new metas
{:ok, existing_metas} ->
remaining_metas = new_metas -- existing_metas
updated_metas = existing_metas ++ remaining_metas
{Map.put(topic_info, key, updated_metas), updated_metas}
:error ->
# there are no presences for that key
{Map.put(topic_info, key, new_metas), new_metas}
end
updated_topics = Map.put(topics, topic, updated_topic)
{Map.put(state, :topics, updated_topics), updated_metas}
end
defp remove_presence_or_metas(
%{topics: topics} = state,
topic,
key,
deleted_metas
) do
topic_info = topics[topic]
state_metas = Map.get(topic_info, key, [])
remaining_metas = state_metas -- Map.get(deleted_metas, :metas, [])
updated_topic =
case remaining_metas do
# delete presence
[] -> Map.delete(topic_info, key)
# delete metas
_ -> Map.put(topic_info, key, remaining_metas)
end
updated_topics = Map.put(topics, topic, updated_topic)
{Map.put(state, :topics, updated_topics), remaining_metas}
end
defp topic_presences_count(state, topic) do
map_size(state.topics[topic])
end
end

View file

@ -1,5 +1,4 @@
defmodule LiveBeats.PresenceClient do
@behaviour Phoenix.Presence.Client
@presence LiveBeatsWeb.Presence
@pubsub LiveBeats.PubSub
@ -7,7 +6,8 @@ defmodule LiveBeats.PresenceClient do
alias LiveBeats.MediaLibrary
def track(%MediaLibrary.Profile{} = profile, current_user_id) do
Phoenix.Presence.Client.track(
@presence.track(
self(),
"proxy:" <> topic(profile),
current_user_id,
%{}
@ -15,7 +15,8 @@ defmodule LiveBeats.PresenceClient do
end
def untrack(%MediaLibrary.Profile{} = profile, current_user_id) do
Phoenix.Presence.Client.untrack(
@presence.untrack(
self(),
"proxy:" <> topic(profile),
current_user_id
)
@ -33,21 +34,29 @@ defmodule LiveBeats.PresenceClient do
@presence.list(topic)
end
@impl Phoenix.Presence.Client
def init(_opts) do
# user-land state
{:ok, %{}}
end
@impl Phoenix.Presence.Client
def handle_join(topic, _key, presence, state) do
local_broadcast(topic, {__MODULE__, %{user_joined: presence}})
{:ok, state}
end
def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do
for {user_id, presence} <- joins do
user_data = %{user: presence.user, metas: Map.fetch!(presences, user_id)}
local_broadcast(topic, {LiveBeats.PresenceClient, %{user_joined: user_data}})
end
for {user_id, presence} <- leaves do
metas =
case Map.fetch(presences, user_id) do
{:ok, presence_metas} -> presence_metas
:error -> []
end
user_data = %{user: presence.user, metas: metas}
local_broadcast(topic, {LiveBeats.PresenceClient, %{user_left: user_data}})
end
@impl Phoenix.Presence.Client
def handle_leave(topic, _key, presence, state) do
local_broadcast(topic, {__MODULE__, %{user_left: presence}})
{:ok, state}
end

View file

@ -7,7 +7,9 @@ defmodule LiveBeatsWeb.Presence do
"""
use Phoenix.Presence,
otp_app: :live_beats,
pubsub_server: LiveBeats.PubSub
pubsub_server: LiveBeats.PubSub,
presence: __MODULE__
import Phoenix.LiveView.Helpers
import LiveBeatsWeb.LiveHelpers
@ -15,6 +17,14 @@ defmodule LiveBeatsWeb.Presence do
alias LiveBeats.{Accounts, MediaLibrary}
alias LiveBeatsWeb.Presence.BadgeComponent
def init(state) do
LiveBeats.PresenceClient.init(state)
end
def handle_metas(topic, presences_diff, presences, state) do
LiveBeats.PresenceClient.handle_metas(topic, presences_diff, presences, state)
end
def subscribe(%MediaLibrary.Profile{} = profile) do
LiveBeats.PresenceClient.subscribe(profile)
end

View file

@ -33,7 +33,7 @@ defmodule LiveBeats.MixProject do
# Type `mix help deps` for examples and options.
defp deps do
[
{:phoenix, github: "phoenixframework/phoenix", override: true},
{:phoenix, github: "bemesa21/phoenix", branch: "presence_client", override: true},
{:phoenix_ecto, "~> 4.4"},
{:ecto_sql, "~> 3.6"},
{:ecto_network, "~> 1.3.0"},

View file

@ -22,13 +22,13 @@
"libcluster": {:hex, :libcluster, "3.3.1", "e7a4875cd1290cee7a693d6bd46076863e9e433708b01339783de6eff5b7f0aa", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "b575ca63c1cd84e01f3fa0fc45e6eb945c1ee7ae8d441d33def999075e9e5398"},
"mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"},
"mint": {:hex, :mint, "1.3.0", "396b3301102f7b775e103da5a20494b25753aed818d6d6f0ad222a3a018c3600", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "a9aac960562e43ca69a77e5176576abfa78b8398cec5543dd4fb4ab0131d5c1e"},
"phoenix": {:git, "https://github.com/phoenixframework/phoenix.git", "8d2b33ac9691bd624ede602088d213f89600d233", []},
"phoenix": {:git, "https://github.com/bemesa21/phoenix.git", "0d34e030ca29cc4eae621ece1f3417e7c04eda76", [branch: "presence_client"]},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.4.0", "0672ed4e4808b3fbed494dded89958e22fb882de47a97634c0b13e7b0b5f7720", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "09864e558ed31ee00bd48fcc1d4fc58ae9678c9e81649075431e69dbabb43cc1"},
"phoenix_html": {:hex, :phoenix_html, "3.2.0", "1c1219d4b6cb22ac72f12f73dc5fad6c7563104d083f711c3fcd8551a1f4ae11", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "36ec97ba56d25c0136ef1992c37957e4246b649d620958a1f9fa86165f8bc54f"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.6.2", "0769470265eb13af01b5001b29cb935f4710d6adaa1ffc18417a570a337a2f0f", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.3", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.17.1", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5bc6c6b38a2ca8b5020b442322fcee6afd5e641637a0b1fb059d4bd89bc58e7b"},
"phoenix_live_reload": {:hex, :phoenix_live_reload, "1.3.3", "3a53772a6118d5679bf50fc1670505a290e32a1d195df9e069d8c53ab040c054", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "766796676e5f558dbae5d1bdb066849673e956005e3730dfd5affd7a6da4abac"},
"phoenix_live_view": {:git, "https://github.com/phoenixframework/phoenix_live_view.git", "1ae64fb69b0951f2f67f01d369ec9a4006060001", []},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"},
"phoenix_pubsub": {:git, "https://github.com/phoenixframework/phoenix_pubsub.git", "3a286c69bdd6df5a5854cb0ad12ee5a1aa6087bd", []},
"phoenix_view": {:hex, :phoenix_view, "1.0.0", "fea71ecaaed71178b26dd65c401607de5ec22e2e9ef141389c721b3f3d4d8011", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "82be3e2516f5633220246e2e58181282c71640dab7afc04f70ad94253025db0c"},
"plug": {:hex, :plug, "1.13.4", "addb6e125347226e3b11489e23d22a60f7ab74786befb86c14f94fb5f23ca9a4", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "06114c1f2a334212fe3ae567dbb3b1d29fd492c1a09783d52f3d489c1a6f4cf2"},
"plug_cowboy": {:hex, :plug_cowboy, "2.5.2", "62894ccd601cf9597e2c23911ff12798a8a18d237e9739f58a6b04e4988899fe", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "ea6e87f774c8608d60c8d34022a7d073bd7680a0a013f049fc62bf35efea1044"},