From add1d15177aa83245d0732f0abfd28a52737ec95 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Thu, 27 Jan 2022 09:36:04 -0500 Subject: [PATCH] Add clustering with proxy file streaming --- assets/js/app.js | 13 +-- config/dev.exs | 3 +- config/runtime.exs | 23 +++++- config/test.exs | 5 +- lib/live_beats/application.ex | 2 + lib/live_beats/media_library/song.ex | 8 +- .../controllers/file_controller.ex | 80 ++++++++++++++++++- lib/live_beats_web/live/player_live.ex | 8 +- mix.exs | 4 +- mix.lock | 1 + rel/env.sh.eex | 4 + 11 files changed, 131 insertions(+), 20 deletions(-) create mode 100644 rel/env.sh.eex diff --git a/assets/js/app.js b/assets/js/app.js index a7a820a..4db38a1 100644 --- a/assets/js/app.js +++ b/assets/js/app.js @@ -96,6 +96,7 @@ Hooks.AudioPlayer = { mounted(){ this.playbackBeganAt = null this.player = this.el.querySelector("audio") + this.playerDuration = 0 this.currentTime = this.el.querySelector("#player-time") this.duration = this.el.querySelector("#player-duration") this.progress = this.el.querySelector("#player-progress") @@ -115,9 +116,10 @@ Hooks.AudioPlayer = { this.play() } }) - this.handleEvent("play", ({url, token, elapsed}) => { + this.handleEvent("play", ({url, token, duration, elapsed}) => { this.playbackBeganAt = nowSeconds() - elapsed let currentSrc = this.player.src.split("?")[0] + this.playerDuration = duration if(currentSrc === url && this.player.paused){ this.play({sync: true}) } else if(currentSrc !== url) { @@ -156,14 +158,15 @@ Hooks.AudioPlayer = { }, updateProgress(){ - if(isNaN(this.player.duration)){ return false } - if(this.player.currentTime >= this.player.duration){ + if(this.playerDuration === 0){ return false } + if(Math.ceil(this.player.currentTime) >= Math.floor(this.playerDuration)){ + this.playerDuration = 0 this.pushEvent("next_song_auto") clearInterval(this.progressTimer) return } - this.progress.style.width = `${(this.player.currentTime / (this.player.duration) * 100)}%` - this.duration.innerText = this.formatTime(this.player.duration) + this.progress.style.width = `${(this.player.currentTime / (this.playerDuration) * 100)}%` + this.duration.innerText = this.formatTime(this.playerDuration) this.currentTime.innerText = this.formatTime(this.player.currentTime) }, diff --git a/config/dev.exs b/config/dev.exs index b94c083..4f1093f 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -2,7 +2,8 @@ import Config config :live_beats, :files, uploads_dir: Path.expand("../priv/uploads", __DIR__), - host: [scheme: "http", host: "localhost", port: 4000] + host: [scheme: "http", host: "localhost", port: 4000], + server_ip: "127.0.0.1" config :live_beats, :github, client_id: System.fetch_env!("LIVE_BEATS_GITHUB_CLIENT_ID"), diff --git a/config/runtime.exs b/config/runtime.exs index db0b097..bbed52c 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -22,6 +22,10 @@ if config_env() == :prod do host = System.get_env("PHX_HOST") || "example.com" ecto_ipv6? = System.get_env("ECTO_IPV6") == "true" + app_name = + System.get_env("FLY_APP_NAME") || + raise "FLY_APP_NAME not available" + config :live_beats, LiveBeats.Repo, # ssl: true, socket_options: if(ecto_ipv6?, do: [:inet6], else: []), @@ -49,9 +53,26 @@ if config_env() == :prod do config :live_beats, :files, uploads_dir: "/app/uploads", - host: [scheme: "https", host: host, port: 443] + host: [scheme: "https", host: host, port: 443], + server_ip: System.fetch_env!("LIVE_BEATS_SERVER_IP"), + hostname: "livebeats.local", + transport_opts: [inet6: true] + config :live_beats, :github, client_id: System.fetch_env!("LIVE_BEATS_GITHUB_CLIENT_ID"), client_secret: System.fetch_env!("LIVE_BEATS_GITHUB_CLIENT_SECRET") + + config :libcluster, + debug: true, + topologies: [ + fly6pn: [ + strategy: Cluster.Strategy.DNSPoll, + config: [ + polling_interval: 5_000, + query: "#{app_name}.internal", + node_basename: app_name + ] + ] + ] end diff --git a/config/test.exs b/config/test.exs index 043e354..a3bde67 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,9 +1,10 @@ import Config -config :live_beats, :files, [ +config :live_beats, :files, uploads_dir: Path.expand("../tmp/test-uploads", __DIR__), host: [scheme: "http", host: "localhost", port: 4000], -] + server_ip: "127.0.0.1" + # Configure your database # # The MIX_TEST_PARTITION environment variable can be used diff --git a/lib/live_beats/application.ex b/lib/live_beats/application.ex index ed99555..f842946 100644 --- a/lib/live_beats/application.ex +++ b/lib/live_beats/application.ex @@ -8,8 +8,10 @@ defmodule LiveBeats.Application do @impl true def start(_type, _args) do LiveBeats.MediaLibrary.attach() + topologies = Application.get_env(:libcluster, :topologies) || [] children = [ + {Cluster.Supervisor, [topologies, [name: LiveBeats.ClusterSupervisor]]}, {Task.Supervisor, name: LiveBeats.TaskSupervisor}, # Start the Ecto repository LiveBeats.Repo, diff --git a/lib/live_beats/media_library/song.ex b/lib/live_beats/media_library/song.ex index cfc53ea..083ad4d 100644 --- a/lib/live_beats/media_library/song.ex +++ b/lib/live_beats/media_library/song.ex @@ -33,7 +33,7 @@ defmodule LiveBeats.MediaLibrary.Song do @doc false def changeset(song, attrs) do song - |> cast(attrs, [:album_artist, :artist, :title, :attribution, :date_recorded, :date_released, :server_ip]) + |> cast(attrs, [:album_artist, :artist, :title, :attribution, :date_recorded, :date_released]) |> validate_required([:artist, :title]) |> unique_constraint(:title, message: "is a duplicated from another song", @@ -70,10 +70,8 @@ defmodule LiveBeats.MediaLibrary.Song do end def put_server_ip(%Ecto.Changeset{} = changeset) do - server_ip = (System.get_env("LIVE_BEATS_SERVER_IP") || "127.0.0.1") - - changeset - |> Ecto.Changeset.cast(%{server_ip: server_ip}, [:server_ip]) + server_ip = LiveBeats.config([:files, :server_ip]) + Ecto.Changeset.cast(changeset, %{server_ip: server_ip}, [:server_ip]) end defp mp3_url(filename) do diff --git a/lib/live_beats_web/controllers/file_controller.ex b/lib/live_beats_web/controllers/file_controller.ex index 97b97b5..390ffb2 100644 --- a/lib/live_beats_web/controllers/file_controller.ex +++ b/lib/live_beats_web/controllers/file_controller.ex @@ -6,11 +6,27 @@ defmodule LiveBeatsWeb.FileController do alias LiveBeats.MediaLibrary + require Logger + def show(conn, %{"id" => filename_uuid, "token" => token}) do - case Phoenix.Token.verify(conn, "file", token, max_age: :timer.minutes(1)) do - {:ok, ^filename_uuid} -> do_send_file(conn, MediaLibrary.local_filepath(filename_uuid)) - {:ok, _} -> send_resp(conn, :unauthorized, "") - {:error, _} -> send_resp(conn, :unauthorized, "") + path = MediaLibrary.local_filepath(filename_uuid) + mime_type = MIME.from_path(path) + + case Phoenix.Token.decrypt(conn, "file", token, max_age: :timer.minutes(1)) do + {:ok, %{uuid: ^filename_uuid, ip: ip}} -> + if local_file?(filename_uuid, ip) do + Logger.info("serving file from #{server_ip()}") + do_send_file(conn, path) + else + Logger.info("proxying file to #{ip} from #{server_ip()}") + proxy_file(conn, ip, mime_type) + end + + {:ok, _} -> + send_resp(conn, :unauthorized, "") + + {:error, _} -> + send_resp(conn, :unauthorized, "") end end @@ -21,4 +37,60 @@ defmodule LiveBeatsWeb.FileController do |> put_resp_header("accept-ranges", "bytes") |> send_file(200, path) end + + defp proxy_file(conn, ip, mime_type) do + uri = conn |> request_url() |> URI.parse() + port = LiveBeatsWeb.Endpoint.config(:http)[:port] + path = uri.path <> "?" <> uri.query <> "&from=#{server_ip()}" + {:ok, ipv6} = :inet.parse_address(String.to_charlist(ip)) + {:ok, req} = Mint.HTTP.connect(:http, ipv6, port, file_server_opts()) + {:ok, req, request_ref} = Mint.HTTP.request(req, "GET", path, [], "") + + conn + |> put_resp_header("content-type", mime_type) + |> put_resp_header("accept-ranges", "bytes") + |> put_resp_header("transfer-encoding", "chunked") + |> send_chunked(200) + |> stream(req, request_ref) + end + + defp stream(conn, req, ref) do + receive do + {:tcp, _, _} = msg -> + {:ok, req, responses} = Mint.HTTP.stream(req, msg) + + new_conn = + Enum.reduce(responses, conn, fn + {:data, ^ref, data}, acc -> chunk!(acc, data) + {:done, ^ref}, acc -> halt(acc) + {:status, ^ref, 200}, acc -> acc + {:headers, ^ref, _}, acc -> acc + end) + + if new_conn.halted do + new_conn + else + stream(new_conn, req, ref) + end + end + end + + defp chunk!(conn, data) do + {:ok, conn} = chunk(conn, data) + conn + end + + defp local_file?(_filename_uuid, ip) do + # TODO cache locally + ip == server_ip() + end + + defp server_ip, do: LiveBeats.config([:files, :server_ip]) + + defp file_server_opts do + [ + hostname: LiveBeats.config([:files, :hostname]) || "localhost", + transport_opts: LiveBeats.config([:files, :transport_opts]) || [] + ] + end end diff --git a/lib/live_beats_web/live/player_live.ex b/lib/live_beats_web/live/player_live.ex index fa2f50c..d486dc5 100644 --- a/lib/live_beats_web/live/player_live.ex +++ b/lib/live_beats_web/live/player_live.ex @@ -297,11 +297,17 @@ defmodule LiveBeatsWeb.PlayerLive do end defp push_play(socket, %Song{} = song, elapsed) do - token = Phoenix.Token.sign(socket.endpoint, "file", song.mp3_filename) + token = + Phoenix.Token.encrypt(socket.endpoint, "file", %{ + vsn: 1, + ip: to_string(song.server_ip), + uuid: song.mp3_filename + }) push_event(socket, "play", %{ paused: Song.paused?(song), elapsed: elapsed, + duration: song.duration, token: token, url: song.mp3_url }) diff --git a/mix.exs b/mix.exs index 7fe00cb..ca52619 100644 --- a/mix.exs +++ b/mix.exs @@ -36,6 +36,7 @@ defmodule LiveBeats.MixProject do {:phoenix, github: "phoenixframework/phoenix", override: true}, {:phoenix_ecto, "~> 4.4"}, {:ecto_sql, "~> 3.6"}, + {:ecto_network, "~> 1.3.0"}, {:postgrex, ">= 0.0.0"}, {:phoenix_html, "~> 3.0"}, {:phoenix_live_reload, "~> 1.2", only: :dev}, @@ -52,7 +53,8 @@ defmodule LiveBeats.MixProject do {:mint, "~> 1.0"}, {:heroicons, "~> 0.2.2"}, {:castore, "~> 0.1.13"}, - {:tailwind, "~> 0.1"} + {:tailwind, "~> 0.1"}, + {:libcluster, "~> 3.3.1"} ] end diff --git a/mix.lock b/mix.lock index 2d0e741..41968e9 100644 --- a/mix.lock +++ b/mix.lock @@ -19,6 +19,7 @@ "html_entities": {:hex, :html_entities, "0.5.2", "9e47e70598da7de2a9ff6af8758399251db6dbb7eebe2b013f2bbd2515895c3c", [:mix], [], "hexpm", "c53ba390403485615623b9531e97696f076ed415e8d8058b1dbaa28181f4fdcc"}, "id3": {:hex, :id3, "1.0.1", "344840dee445c19be4b6dca9c70f0a7cd5dd03cb4260bae8ad9a21457ef12813", [:mix], [{:rustler, "~> 0.21.0", [hex: :rustler, repo: "hexpm", optional: false]}], "hexpm", "07ee2a284414065920751a528f237e77dc2aeed222a75302f909f3c07fd867e1"}, "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, + "libcluster": {:hex, :libcluster, "3.3.1", "e7a4875cd1290cee7a693d6bd46076863e9e433708b01339783de6eff5b7f0aa", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "b575ca63c1cd84e01f3fa0fc45e6eb945c1ee7ae8d441d33def999075e9e5398"}, "mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"}, "mint": {:hex, :mint, "1.3.0", "396b3301102f7b775e103da5a20494b25753aed818d6d6f0ad222a3a018c3600", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "a9aac960562e43ca69a77e5176576abfa78b8398cec5543dd4fb4ab0131d5c1e"}, "phoenix": {:git, "https://github.com/phoenixframework/phoenix.git", "8d2b33ac9691bd624ede602088d213f89600d233", []}, diff --git a/rel/env.sh.eex b/rel/env.sh.eex new file mode 100644 index 0000000..3d2dc99 --- /dev/null +++ b/rel/env.sh.eex @@ -0,0 +1,4 @@ +ip=$(grep fly-local-6pn /etc/hosts | cut -f 1) +export RELEASE_DISTRIBUTION=name +export RELEASE_NODE=$FLY_APP_NAME@$ip +export LIVE_BEATS_SERVER_IP=$ip \ No newline at end of file