From 50794e1af184b858a7611123864f0462d1082000 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Fri, 27 Jan 2023 14:58:29 -0500 Subject: [PATCH] Fix race conditions with advisory locks --- lib/live_beats/media_library.ex | 7 +++++++ lib/live_beats/media_library/song.ex | 2 +- lib/live_beats/repo.ex | 10 ++++++++++ lib/live_beats_web/components/core_components.ex | 2 +- lib/live_beats_web/live/profile_live.ex | 7 ++----- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/lib/live_beats/media_library.ex b/lib/live_beats/media_library.ex index 5fee975..59e62c5 100644 --- a/lib/live_beats/media_library.ex +++ b/lib/live_beats/media_library.ex @@ -163,6 +163,7 @@ defmodule LiveBeats.MediaLibrary do multi = Ecto.Multi.new() + |> lock_playlist(user.id) |> Ecto.Multi.run(:starting_position, fn repo, _changes -> count = repo.one(from s in Song, where: s.user_id == ^user.id, select: count(s.id)) {:ok, count - 1} @@ -358,6 +359,7 @@ defmodule LiveBeats.MediaLibrary do multi = Ecto.Multi.new() + |> lock_playlist(song.user_id) |> Ecto.Multi.run(:valid_index, fn repo, _changes -> case repo.one(from s in Song, where: s.user_id == ^song.user_id, select: count(s.id)) do count when new_index < count -> {:ok, count} @@ -406,6 +408,7 @@ defmodule LiveBeats.MediaLibrary do old_index = song.position Ecto.Multi.new() + |> lock_playlist(song.user_id) |> Ecto.Multi.delete(:delete, song) |> multi_update_all(:dec_positions, fn _ -> from(s in Song, @@ -533,4 +536,8 @@ defmodule LiveBeats.MediaLibrary do defp multi_update_all(multi, name, func, opts \\ []) do Ecto.Multi.update_all(multi, name, func, opts) end + + defp lock_playlist(%Ecto.Multi{} = multi, user_id) do + Repo.multi_transaction_lock(multi, :playlist, user_id) + end end diff --git a/lib/live_beats/media_library/song.ex b/lib/live_beats/media_library/song.ex index e17c406..ef4dbc1 100644 --- a/lib/live_beats/media_library/song.ex +++ b/lib/live_beats/media_library/song.ex @@ -13,7 +13,7 @@ defmodule LiveBeats.MediaLibrary.Song do field :date_recorded, :naive_datetime field :date_released, :naive_datetime field :duration, :integer - field :status, Ecto.Enum, values: [stopped: 1, playing: 2, paused: 3] + field :status, Ecto.Enum, values: [stopped: 1, playing: 2, paused: 3], default: :stopped field :title, :string field :attribution, :string field :mp3_url, :string diff --git a/lib/live_beats/repo.ex b/lib/live_beats/repo.ex index 03a357a..4f9918d 100644 --- a/lib/live_beats/repo.ex +++ b/lib/live_beats/repo.ex @@ -4,6 +4,16 @@ defmodule LiveBeats.Repo do adapter: Ecto.Adapters.Postgres def replica, do: LiveBeats.config([:replica]) + + @locks %{playlist: 1} + + def multi_transaction_lock(multi, scope, id) when is_atom(scope) and is_integer(id) do + scope_int = Map.fetch!(@locks, scope) + + Ecto.Multi.run(multi, scope, fn repo, _changes -> + repo.query("SELECT pg_advisory_xact_lock(#{scope_int}, #{id})") + end) + end end defmodule LiveBeats.ReplicaRepo do diff --git a/lib/live_beats_web/components/core_components.ex b/lib/live_beats_web/components/core_components.ex index a4e9644..2d29c38 100644 --- a/lib/live_beats_web/components/core_components.ex +++ b/lib/live_beats_web/components/core_components.ex @@ -582,7 +582,7 @@ defmodule LiveBeatsWeb.CoreComponents do diff --git a/lib/live_beats_web/live/profile_live.ex b/lib/live_beats_web/live/profile_live.ex index b9f1f21..2d01151 100644 --- a/lib/live_beats_web/live/profile_live.ex +++ b/lib/live_beats_web/live/profile_live.ex @@ -159,7 +159,7 @@ defmodule LiveBeatsWeb.ProfileLive do profile: profile, owns_profile?: MediaLibrary.owns_profile?(current_user, profile) ) - |> stream_songs() + |> stream(:songs, MediaLibrary.list_profile_songs(profile, 50)) |> assign_presences() {:ok, socket, temporary_assigns: [presences: %{}]} @@ -201,6 +201,7 @@ defmodule LiveBeatsWeb.ProfileLive do def handle_event("row_dropped", %{"id" => dom_id, "old" => old_idx, "new" => new_idx}, socket) do "songs-" <> id = dom_id song = MediaLibrary.get_song!(id) + if song.user_id == socket.assigns.current_user.id and song.position == old_idx do :ok = MediaLibrary.update_song_position(song, new_idx) {:noreply, socket} @@ -338,10 +339,6 @@ defmodule LiveBeatsWeb.ProfileLive do socket end - defp stream_songs(socket) do - stream(socket, :songs, MediaLibrary.list_profile_songs(socket.assigns.profile, 50)) - end - defp assign_presences(socket) do socket = assign(socket, presences_count: 0, presences: %{}, presence_ids: %{})