Add clustering with proxy file streaming

This commit is contained in:
Chris McCord 2022-01-27 09:36:04 -05:00
parent 2243651c70
commit add1d15177
11 changed files with 131 additions and 20 deletions

View file

@ -96,6 +96,7 @@ Hooks.AudioPlayer = {
mounted(){
this.playbackBeganAt = null
this.player = this.el.querySelector("audio")
this.playerDuration = 0
this.currentTime = this.el.querySelector("#player-time")
this.duration = this.el.querySelector("#player-duration")
this.progress = this.el.querySelector("#player-progress")
@ -115,9 +116,10 @@ Hooks.AudioPlayer = {
this.play()
}
})
this.handleEvent("play", ({url, token, elapsed}) => {
this.handleEvent("play", ({url, token, duration, elapsed}) => {
this.playbackBeganAt = nowSeconds() - elapsed
let currentSrc = this.player.src.split("?")[0]
this.playerDuration = duration
if(currentSrc === url && this.player.paused){
this.play({sync: true})
} else if(currentSrc !== url) {
@ -156,14 +158,15 @@ Hooks.AudioPlayer = {
},
updateProgress(){
if(isNaN(this.player.duration)){ return false }
if(this.player.currentTime >= this.player.duration){
if(this.playerDuration === 0){ return false }
if(Math.ceil(this.player.currentTime) >= Math.floor(this.playerDuration)){
this.playerDuration = 0
this.pushEvent("next_song_auto")
clearInterval(this.progressTimer)
return
}
this.progress.style.width = `${(this.player.currentTime / (this.player.duration) * 100)}%`
this.duration.innerText = this.formatTime(this.player.duration)
this.progress.style.width = `${(this.player.currentTime / (this.playerDuration) * 100)}%`
this.duration.innerText = this.formatTime(this.playerDuration)
this.currentTime.innerText = this.formatTime(this.player.currentTime)
},

View file

@ -2,7 +2,8 @@ import Config
config :live_beats, :files,
uploads_dir: Path.expand("../priv/uploads", __DIR__),
host: [scheme: "http", host: "localhost", port: 4000]
host: [scheme: "http", host: "localhost", port: 4000],
server_ip: "127.0.0.1"
config :live_beats, :github,
client_id: System.fetch_env!("LIVE_BEATS_GITHUB_CLIENT_ID"),

View file

@ -22,6 +22,10 @@ if config_env() == :prod do
host = System.get_env("PHX_HOST") || "example.com"
ecto_ipv6? = System.get_env("ECTO_IPV6") == "true"
app_name =
System.get_env("FLY_APP_NAME") ||
raise "FLY_APP_NAME not available"
config :live_beats, LiveBeats.Repo,
# ssl: true,
socket_options: if(ecto_ipv6?, do: [:inet6], else: []),
@ -49,9 +53,26 @@ if config_env() == :prod do
config :live_beats, :files,
uploads_dir: "/app/uploads",
host: [scheme: "https", host: host, port: 443]
host: [scheme: "https", host: host, port: 443],
server_ip: System.fetch_env!("LIVE_BEATS_SERVER_IP"),
hostname: "livebeats.local",
transport_opts: [inet6: true]
config :live_beats, :github,
client_id: System.fetch_env!("LIVE_BEATS_GITHUB_CLIENT_ID"),
client_secret: System.fetch_env!("LIVE_BEATS_GITHUB_CLIENT_SECRET")
config :libcluster,
debug: true,
topologies: [
fly6pn: [
strategy: Cluster.Strategy.DNSPoll,
config: [
polling_interval: 5_000,
query: "#{app_name}.internal",
node_basename: app_name
]
]
]
end

View file

@ -1,9 +1,10 @@
import Config
config :live_beats, :files, [
config :live_beats, :files,
uploads_dir: Path.expand("../tmp/test-uploads", __DIR__),
host: [scheme: "http", host: "localhost", port: 4000],
]
server_ip: "127.0.0.1"
# Configure your database
#
# The MIX_TEST_PARTITION environment variable can be used

View file

@ -8,8 +8,10 @@ defmodule LiveBeats.Application do
@impl true
def start(_type, _args) do
LiveBeats.MediaLibrary.attach()
topologies = Application.get_env(:libcluster, :topologies) || []
children = [
{Cluster.Supervisor, [topologies, [name: LiveBeats.ClusterSupervisor]]},
{Task.Supervisor, name: LiveBeats.TaskSupervisor},
# Start the Ecto repository
LiveBeats.Repo,

View file

@ -33,7 +33,7 @@ defmodule LiveBeats.MediaLibrary.Song do
@doc false
def changeset(song, attrs) do
song
|> cast(attrs, [:album_artist, :artist, :title, :attribution, :date_recorded, :date_released, :server_ip])
|> cast(attrs, [:album_artist, :artist, :title, :attribution, :date_recorded, :date_released])
|> validate_required([:artist, :title])
|> unique_constraint(:title,
message: "is a duplicated from another song",
@ -70,10 +70,8 @@ defmodule LiveBeats.MediaLibrary.Song do
end
def put_server_ip(%Ecto.Changeset{} = changeset) do
server_ip = (System.get_env("LIVE_BEATS_SERVER_IP") || "127.0.0.1")
changeset
|> Ecto.Changeset.cast(%{server_ip: server_ip}, [:server_ip])
server_ip = LiveBeats.config([:files, :server_ip])
Ecto.Changeset.cast(changeset, %{server_ip: server_ip}, [:server_ip])
end
defp mp3_url(filename) do

View file

@ -6,11 +6,27 @@ defmodule LiveBeatsWeb.FileController do
alias LiveBeats.MediaLibrary
require Logger
def show(conn, %{"id" => filename_uuid, "token" => token}) do
case Phoenix.Token.verify(conn, "file", token, max_age: :timer.minutes(1)) do
{:ok, ^filename_uuid} -> do_send_file(conn, MediaLibrary.local_filepath(filename_uuid))
{:ok, _} -> send_resp(conn, :unauthorized, "")
{:error, _} -> send_resp(conn, :unauthorized, "")
path = MediaLibrary.local_filepath(filename_uuid)
mime_type = MIME.from_path(path)
case Phoenix.Token.decrypt(conn, "file", token, max_age: :timer.minutes(1)) do
{:ok, %{uuid: ^filename_uuid, ip: ip}} ->
if local_file?(filename_uuid, ip) do
Logger.info("serving file from #{server_ip()}")
do_send_file(conn, path)
else
Logger.info("proxying file to #{ip} from #{server_ip()}")
proxy_file(conn, ip, mime_type)
end
{:ok, _} ->
send_resp(conn, :unauthorized, "")
{:error, _} ->
send_resp(conn, :unauthorized, "")
end
end
@ -21,4 +37,60 @@ defmodule LiveBeatsWeb.FileController do
|> put_resp_header("accept-ranges", "bytes")
|> send_file(200, path)
end
defp proxy_file(conn, ip, mime_type) do
uri = conn |> request_url() |> URI.parse()
port = LiveBeatsWeb.Endpoint.config(:http)[:port]
path = uri.path <> "?" <> uri.query <> "&from=#{server_ip()}"
{:ok, ipv6} = :inet.parse_address(String.to_charlist(ip))
{:ok, req} = Mint.HTTP.connect(:http, ipv6, port, file_server_opts())
{:ok, req, request_ref} = Mint.HTTP.request(req, "GET", path, [], "")
conn
|> put_resp_header("content-type", mime_type)
|> put_resp_header("accept-ranges", "bytes")
|> put_resp_header("transfer-encoding", "chunked")
|> send_chunked(200)
|> stream(req, request_ref)
end
defp stream(conn, req, ref) do
receive do
{:tcp, _, _} = msg ->
{:ok, req, responses} = Mint.HTTP.stream(req, msg)
new_conn =
Enum.reduce(responses, conn, fn
{:data, ^ref, data}, acc -> chunk!(acc, data)
{:done, ^ref}, acc -> halt(acc)
{:status, ^ref, 200}, acc -> acc
{:headers, ^ref, _}, acc -> acc
end)
if new_conn.halted do
new_conn
else
stream(new_conn, req, ref)
end
end
end
defp chunk!(conn, data) do
{:ok, conn} = chunk(conn, data)
conn
end
defp local_file?(_filename_uuid, ip) do
# TODO cache locally
ip == server_ip()
end
defp server_ip, do: LiveBeats.config([:files, :server_ip])
defp file_server_opts do
[
hostname: LiveBeats.config([:files, :hostname]) || "localhost",
transport_opts: LiveBeats.config([:files, :transport_opts]) || []
]
end
end

View file

@ -297,11 +297,17 @@ defmodule LiveBeatsWeb.PlayerLive do
end
defp push_play(socket, %Song{} = song, elapsed) do
token = Phoenix.Token.sign(socket.endpoint, "file", song.mp3_filename)
token =
Phoenix.Token.encrypt(socket.endpoint, "file", %{
vsn: 1,
ip: to_string(song.server_ip),
uuid: song.mp3_filename
})
push_event(socket, "play", %{
paused: Song.paused?(song),
elapsed: elapsed,
duration: song.duration,
token: token,
url: song.mp3_url
})

View file

@ -36,6 +36,7 @@ defmodule LiveBeats.MixProject do
{:phoenix, github: "phoenixframework/phoenix", override: true},
{:phoenix_ecto, "~> 4.4"},
{:ecto_sql, "~> 3.6"},
{:ecto_network, "~> 1.3.0"},
{:postgrex, ">= 0.0.0"},
{:phoenix_html, "~> 3.0"},
{:phoenix_live_reload, "~> 1.2", only: :dev},
@ -52,7 +53,8 @@ defmodule LiveBeats.MixProject do
{:mint, "~> 1.0"},
{:heroicons, "~> 0.2.2"},
{:castore, "~> 0.1.13"},
{:tailwind, "~> 0.1"}
{:tailwind, "~> 0.1"},
{:libcluster, "~> 3.3.1"}
]
end

View file

@ -19,6 +19,7 @@
"html_entities": {:hex, :html_entities, "0.5.2", "9e47e70598da7de2a9ff6af8758399251db6dbb7eebe2b013f2bbd2515895c3c", [:mix], [], "hexpm", "c53ba390403485615623b9531e97696f076ed415e8d8058b1dbaa28181f4fdcc"},
"id3": {:hex, :id3, "1.0.1", "344840dee445c19be4b6dca9c70f0a7cd5dd03cb4260bae8ad9a21457ef12813", [:mix], [{:rustler, "~> 0.21.0", [hex: :rustler, repo: "hexpm", optional: false]}], "hexpm", "07ee2a284414065920751a528f237e77dc2aeed222a75302f909f3c07fd867e1"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"libcluster": {:hex, :libcluster, "3.3.1", "e7a4875cd1290cee7a693d6bd46076863e9e433708b01339783de6eff5b7f0aa", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "b575ca63c1cd84e01f3fa0fc45e6eb945c1ee7ae8d441d33def999075e9e5398"},
"mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"},
"mint": {:hex, :mint, "1.3.0", "396b3301102f7b775e103da5a20494b25753aed818d6d6f0ad222a3a018c3600", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "a9aac960562e43ca69a77e5176576abfa78b8398cec5543dd4fb4ab0131d5c1e"},
"phoenix": {:git, "https://github.com/phoenixframework/phoenix.git", "8d2b33ac9691bd624ede602088d213f89600d233", []},

4
rel/env.sh.eex Normal file
View file

@ -0,0 +1,4 @@
ip=$(grep fly-local-6pn /etc/hosts | cut -f 1)
export RELEASE_DISTRIBUTION=name
export RELEASE_NODE=$FLY_APP_NAME@$ip
export LIVE_BEATS_SERVER_IP=$ip