added stream fetch objects

This commit is contained in:
Maksim Pechnikov 2020-08-04 21:17:51 +03:00
parent a545c6e1e6
commit aa84f27df6
2 changed files with 25 additions and 27 deletions

View file

@ -6,7 +6,7 @@ defmodule Pleroma.RepoStreamer do
alias Pleroma.Repo
import Ecto.Query
def chunk_stream(query, chunk_size) do
def chunk_stream(query, chunk_size, opts \\ []) do
Stream.unfold(0, fn
:halt ->
{[], :halt}
@ -16,7 +16,7 @@ defmodule Pleroma.RepoStreamer do
|> order_by(asc: :id)
|> where([r], r.id > ^last_id)
|> limit(^chunk_size)
|> Repo.all()
|> Repo.all(opts)
|> case do
[] ->
{[], :halt}

View file

@ -10,6 +10,8 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
@batch_size 500
@impl Oban.Worker
def perform(%Job{
args: %{
@ -19,8 +21,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
}) do
attachments
|> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
|> fetch_objects
|> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
|> fetch_objects(actor, Enum.map(attachments, & &1["name"]))
|> filter_objects
|> do_clean
@ -71,17 +72,16 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
end)
end
defp prepare_objects(objects, actor, names) do
objects
|> Enum.reduce(%{}, fn %{
id: id,
data: %{
"url" => [%{"href" => href}],
"actor" => obj_actor,
"name" => name
}
},
acc ->
defp prepare_objects(init, objects, actor, names) do
Enum.reduce(objects, init, fn %{
id: id,
data: %{
"url" => [%{"href" => href}],
"actor" => obj_actor,
"name" => name
}
},
acc ->
Map.update(acc, href, %{id: id, count: 1}, fn val ->
case obj_actor == actor and name in names do
true ->
@ -96,18 +96,16 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
end)
end
defp fetch_objects(hrefs) do
from(o in Object,
where:
fragment(
"to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
o.data,
o.data,
^hrefs
)
defp fetch_objects(hrefs, actor, names) do
from(
o in Object,
where: fragment("object_attachment_urls(?) && (?)", o.data, ^hrefs)
)
# The query above can be time-consuming on large instances until we
# refactor how uploads are stored
|> Repo.all(timeout: :infinity)
|> Pleroma.RepoStreamer.chunk_stream(@batch_size, timeout: :infinity)
|> Stream.transform(%{}, fn objs, acc ->
res = prepare_objects(acc, objs, actor, names)
{res, res}
end)
|> Enum.to_list()
end
end