From 694daec4da42b1df2028cb427b24849f20b93ba6 Mon Sep 17 00:00:00 2001 From: Berenice Medel Date: Mon, 21 Mar 2022 12:09:27 -0600 Subject: [PATCH] use presence latest changes --- lib/live_beats/application.ex | 5 - .../presence/phoenix_presence_client.ex | 202 ------------------ lib/live_beats/presence/presence_client.ex | 33 +-- lib/live_beats_web/channels/presence.ex | 12 +- mix.exs | 2 +- mix.lock | 4 +- 6 files changed, 35 insertions(+), 223 deletions(-) delete mode 100644 lib/live_beats/presence/phoenix_presence_client.ex diff --git a/lib/live_beats/application.ex b/lib/live_beats/application.ex index 4a2b3da..261461a 100644 --- a/lib/live_beats/application.ex +++ b/lib/live_beats/application.ex @@ -22,11 +22,6 @@ defmodule LiveBeats.Application do {Phoenix.PubSub, name: LiveBeats.PubSub}, # start presence LiveBeatsWeb.Presence, - {Phoenix.Presence.Client, - client: LiveBeats.PresenceClient, - pubsub: LiveBeats.PubSub, - presence: LiveBeatsWeb.Presence, - name: PresenceClient}, # Start the Endpoint (http/https) LiveBeatsWeb.Endpoint, # Expire songs every six hours diff --git a/lib/live_beats/presence/phoenix_presence_client.ex b/lib/live_beats/presence/phoenix_presence_client.ex deleted file mode 100644 index 542e4a4..0000000 --- a/lib/live_beats/presence/phoenix_presence_client.ex +++ /dev/null @@ -1,202 +0,0 @@ -defmodule Phoenix.Presence.Client do - use GenServer - - @callback init(state :: term) :: {:ok, new_state :: term} - @callback handle_join(topic :: String.t(), key :: String.t(), meta :: [map()], state :: term) :: - {:ok, term} - @callback handle_leave(topic :: String.t(), key :: String.t(), meta :: [map()], state :: term) :: - {:ok, term} - - @doc """ - TODO - - ## Options - - * `:pubsub` - The required name of the pubsub server - * `:presence` - The required name of the presence module - * `:client` - The required callback module - """ - def start_link(opts) do - case Keyword.fetch(opts, :name) do - {:ok, name} -> - GenServer.start_link(__MODULE__, opts, name: name) - - :error -> - GenServer.start_link(__MODULE__, opts) - end - end - - def track(pid \\ PresenceClient, topic, key, meta) do - GenServer.call(pid, {:track, self(), topic, to_string(key), meta}) - end - - def untrack(pid \\ PresenceClient, topic, key) do - GenServer.call(pid, {:untrack, self(), topic, to_string(key)}) - end - - def init(opts) do - client = Keyword.fetch!(opts, :client) - {:ok, client_state} = client.init(%{}) - - state = %{ - topics: %{}, - client: client, - pubsub: Keyword.fetch!(opts, :pubsub), - presence_mod: Keyword.fetch!(opts, :presence), - client_state: client_state - } - - {:ok, state} - end - - def handle_info(%{topic: topic, event: "presence_diff", payload: diff}, state) do - {:noreply, merge_diff(state, topic, diff)} - end - - def handle_call(:state, _from, state) do - {:reply, state, state} - end - - def handle_call({:track, pid, topic, key, meta}, _from, state) do - {:reply, :ok, track_pid(state, pid, topic, key, meta)} - end - - def handle_call({:untrack, pid, topic, key}, _from, state) do - {:reply, :ok, untrack_pid(state, pid, topic, key)} - end - - defp track_pid(state, pid, topic, key, meta) do - # presences are handled when the presence_diff event is received - case Map.fetch(state.topics, topic) do - {:ok, _topic_content} -> - state.presence_mod.track(pid, topic, key, meta) - state - - :error -> - # subscribe to topic we weren't yet tracking - Phoenix.PubSub.subscribe(state.pubsub, topic) - state.presence_mod.track(pid, topic, key, meta) - state - end - end - - defp untrack_pid(state, pid, topic, key) do - if Map.has_key?(state.topics, topic) do - state.presence_mod.untrack(pid, topic, key) - end - - state - end - - defp merge_diff(state, topic, %{leaves: leaves, joins: joins}) do - # add new topic if needed - updated_state = - if Map.has_key?(state.topics, topic) do - state - else - put_new_topic(state, topic) - end - - # merge diff into state.topics - {updated_state, _topic} = Enum.reduce(joins, {updated_state, topic}, &handle_join/2) - {updated_state, _topic} = Enum.reduce(leaves, {updated_state, topic}, &handle_leave/2) - - # if no more presences for given topic, unsubscribe and remove topic - if topic_presences_count(updated_state, topic) == 0 do - Phoenix.PubSub.unsubscribe(state.pubsub, topic) - remove_topic(updated_state, topic) - else - updated_state - end - end - - defp handle_join({joined_key, presence}, {state, topic}) do - joined_metas = Map.get(presence, :metas, []) - - {updated_state, new_metas} = add_new_presence_or_metas(state, topic, joined_key, joined_metas) - new_presence = Map.put(presence, :metas, new_metas) - - {:ok, updated_client_state} = - state.client.handle_join(topic, joined_key, new_presence, state.client_state) - - updated_state = Map.put(updated_state, :client_state, updated_client_state) - - {updated_state, topic} - end - - defp handle_leave({left_key, presence}, {state, topic}) do - {updated_state, new_metas} = remove_presence_or_metas(state, topic, left_key, presence) - new_presence = Map.put(presence, :metas, new_metas) - - {:ok, updated_client_state} = - state.client.handle_leave(topic, left_key, new_presence, state.client_state) - - updated_state = Map.put(updated_state, :client_state, updated_client_state) - - {updated_state, topic} - end - - defp put_new_topic(%{topics: topics} = state, topic) do - updated_topics = Map.put_new(topics, topic, %{}) - Map.put(state, :topics, updated_topics) - end - - defp remove_topic(%{topics: topics} = state, topic) do - updated_topics = Map.delete(topics, topic) - Map.put(state, :topics, updated_topics) - end - - defp add_new_presence_or_metas( - %{topics: topics} = state, - topic, - key, - new_metas - ) do - topic_info = topics[topic] - - {updated_topic, updated_metas} = - case Map.fetch(topic_info, key) do - # existing presence, add new metas - {:ok, existing_metas} -> - remaining_metas = new_metas -- existing_metas - updated_metas = existing_metas ++ remaining_metas - {Map.put(topic_info, key, updated_metas), updated_metas} - - :error -> - # there are no presences for that key - {Map.put(topic_info, key, new_metas), new_metas} - end - - updated_topics = Map.put(topics, topic, updated_topic) - - {Map.put(state, :topics, updated_topics), updated_metas} - end - - defp remove_presence_or_metas( - %{topics: topics} = state, - topic, - key, - deleted_metas - ) do - topic_info = topics[topic] - - state_metas = Map.get(topic_info, key, []) - remaining_metas = state_metas -- Map.get(deleted_metas, :metas, []) - - updated_topic = - case remaining_metas do - # delete presence - [] -> Map.delete(topic_info, key) - # delete metas - _ -> Map.put(topic_info, key, remaining_metas) - end - - updated_topics = Map.put(topics, topic, updated_topic) - - {Map.put(state, :topics, updated_topics), remaining_metas} - end - - defp topic_presences_count(state, topic) do - map_size(state.topics[topic]) - end -end diff --git a/lib/live_beats/presence/presence_client.ex b/lib/live_beats/presence/presence_client.ex index e72c2a2..4ef0087 100644 --- a/lib/live_beats/presence/presence_client.ex +++ b/lib/live_beats/presence/presence_client.ex @@ -1,5 +1,4 @@ defmodule LiveBeats.PresenceClient do - @behaviour Phoenix.Presence.Client @presence LiveBeatsWeb.Presence @pubsub LiveBeats.PubSub @@ -7,7 +6,8 @@ defmodule LiveBeats.PresenceClient do alias LiveBeats.MediaLibrary def track(%MediaLibrary.Profile{} = profile, current_user_id) do - Phoenix.Presence.Client.track( + @presence.track( + self(), "proxy:" <> topic(profile), current_user_id, %{} @@ -15,7 +15,8 @@ defmodule LiveBeats.PresenceClient do end def untrack(%MediaLibrary.Profile{} = profile, current_user_id) do - Phoenix.Presence.Client.untrack( + @presence.untrack( + self(), "proxy:" <> topic(profile), current_user_id ) @@ -33,21 +34,29 @@ defmodule LiveBeats.PresenceClient do @presence.list(topic) end - @impl Phoenix.Presence.Client def init(_opts) do # user-land state {:ok, %{}} end - @impl Phoenix.Presence.Client - def handle_join(topic, _key, presence, state) do - local_broadcast(topic, {__MODULE__, %{user_joined: presence}}) - {:ok, state} - end + def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do + for {user_id, presence} <- joins do + user_data = %{user: presence.user, metas: Map.fetch!(presences, user_id)} + local_broadcast(topic, {LiveBeats.PresenceClient, %{user_joined: user_data}}) + end + + for {user_id, presence} <- leaves do + metas = + case Map.fetch(presences, user_id) do + {:ok, presence_metas} -> presence_metas + :error -> [] + end + + user_data = %{user: presence.user, metas: metas} + + local_broadcast(topic, {LiveBeats.PresenceClient, %{user_left: user_data}}) + end - @impl Phoenix.Presence.Client - def handle_leave(topic, _key, presence, state) do - local_broadcast(topic, {__MODULE__, %{user_left: presence}}) {:ok, state} end diff --git a/lib/live_beats_web/channels/presence.ex b/lib/live_beats_web/channels/presence.ex index 42f9c24..74097a6 100644 --- a/lib/live_beats_web/channels/presence.ex +++ b/lib/live_beats_web/channels/presence.ex @@ -7,7 +7,9 @@ defmodule LiveBeatsWeb.Presence do """ use Phoenix.Presence, otp_app: :live_beats, - pubsub_server: LiveBeats.PubSub + pubsub_server: LiveBeats.PubSub, + presence: __MODULE__ + import Phoenix.LiveView.Helpers import LiveBeatsWeb.LiveHelpers @@ -15,6 +17,14 @@ defmodule LiveBeatsWeb.Presence do alias LiveBeats.{Accounts, MediaLibrary} alias LiveBeatsWeb.Presence.BadgeComponent + def init(state) do + LiveBeats.PresenceClient.init(state) + end + + def handle_metas(topic, presences_diff, presences, state) do + LiveBeats.PresenceClient.handle_metas(topic, presences_diff, presences, state) + end + def subscribe(%MediaLibrary.Profile{} = profile) do LiveBeats.PresenceClient.subscribe(profile) end diff --git a/mix.exs b/mix.exs index 4790288..773001d 100644 --- a/mix.exs +++ b/mix.exs @@ -33,7 +33,7 @@ defmodule LiveBeats.MixProject do # Type `mix help deps` for examples and options. defp deps do [ - {:phoenix, github: "phoenixframework/phoenix", override: true}, + {:phoenix, github: "bemesa21/phoenix", branch: "presence_client", override: true}, {:phoenix_ecto, "~> 4.4"}, {:ecto_sql, "~> 3.6"}, {:ecto_network, "~> 1.3.0"}, diff --git a/mix.lock b/mix.lock index fa1528e..cae19dc 100644 --- a/mix.lock +++ b/mix.lock @@ -22,13 +22,13 @@ "libcluster": {:hex, :libcluster, "3.3.1", "e7a4875cd1290cee7a693d6bd46076863e9e433708b01339783de6eff5b7f0aa", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "b575ca63c1cd84e01f3fa0fc45e6eb945c1ee7ae8d441d33def999075e9e5398"}, "mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"}, "mint": {:hex, :mint, "1.3.0", "396b3301102f7b775e103da5a20494b25753aed818d6d6f0ad222a3a018c3600", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "a9aac960562e43ca69a77e5176576abfa78b8398cec5543dd4fb4ab0131d5c1e"}, - "phoenix": {:git, "https://github.com/phoenixframework/phoenix.git", "8d2b33ac9691bd624ede602088d213f89600d233", []}, + "phoenix": {:git, "https://github.com/bemesa21/phoenix.git", "0d34e030ca29cc4eae621ece1f3417e7c04eda76", [branch: "presence_client"]}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.4.0", "0672ed4e4808b3fbed494dded89958e22fb882de47a97634c0b13e7b0b5f7720", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "09864e558ed31ee00bd48fcc1d4fc58ae9678c9e81649075431e69dbabb43cc1"}, "phoenix_html": {:hex, :phoenix_html, "3.2.0", "1c1219d4b6cb22ac72f12f73dc5fad6c7563104d083f711c3fcd8551a1f4ae11", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "36ec97ba56d25c0136ef1992c37957e4246b649d620958a1f9fa86165f8bc54f"}, "phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.6.2", "0769470265eb13af01b5001b29cb935f4710d6adaa1ffc18417a570a337a2f0f", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.3", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.17.1", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5bc6c6b38a2ca8b5020b442322fcee6afd5e641637a0b1fb059d4bd89bc58e7b"}, "phoenix_live_reload": {:hex, :phoenix_live_reload, "1.3.3", "3a53772a6118d5679bf50fc1670505a290e32a1d195df9e069d8c53ab040c054", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "766796676e5f558dbae5d1bdb066849673e956005e3730dfd5affd7a6da4abac"}, "phoenix_live_view": {:git, "https://github.com/phoenixframework/phoenix_live_view.git", "1ae64fb69b0951f2f67f01d369ec9a4006060001", []}, - "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"}, + "phoenix_pubsub": {:git, "https://github.com/phoenixframework/phoenix_pubsub.git", "3a286c69bdd6df5a5854cb0ad12ee5a1aa6087bd", []}, "phoenix_view": {:hex, :phoenix_view, "1.0.0", "fea71ecaaed71178b26dd65c401607de5ec22e2e9ef141389c721b3f3d4d8011", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "82be3e2516f5633220246e2e58181282c71640dab7afc04f70ad94253025db0c"}, "plug": {:hex, :plug, "1.13.4", "addb6e125347226e3b11489e23d22a60f7ab74786befb86c14f94fb5f23ca9a4", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "06114c1f2a334212fe3ae567dbb3b1d29fd492c1a09783d52f3d489c1a6f4cf2"}, "plug_cowboy": {:hex, :plug_cowboy, "2.5.2", "62894ccd601cf9597e2c23911ff12798a8a18d237e9739f58a6b04e4988899fe", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "ea6e87f774c8608d60c8d34022a7d073bd7680a0a013f049fc62bf35efea1044"},