Change HashtagsTableMigrator to use %DataMigrationFailedId{} structs with FlakeIds

This commit is contained in:
Alex Gleason 2021-12-24 11:28:46 -06:00
parent 1776ea1c86
commit 91902c87de
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
2 changed files with 33 additions and 17 deletions

View file

@ -0,0 +1,13 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.DataMigrationFailedId do
use Ecto.Schema
alias Pleroma.DataMigration
schema "data_migration_failed_ids" do
belongs_to(:data_migration, DataMigration)
field(:record_id, FlakeId.Ecto.CompatType)
end
end

View file

@ -12,11 +12,14 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
use Pleroma.Migrators.Support.BaseMigrator
alias Pleroma.DataMigrationFailedId
alias Pleroma.Hashtag
alias Pleroma.HashtagObject
alias Pleroma.Migrators.Support.BaseMigrator
alias Pleroma.Object
import Ecto.Query
@impl BaseMigrator
def feature_config_path, do: [:features, :improved_hashtag_timeline]
@ -51,19 +54,20 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
for failed_id <- failed_ids do
_ =
Repo.query(
"INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
"VALUES ($1, $2) ON CONFLICT DO NOTHING;",
[data_migration_id, failed_id]
)
%DataMigrationFailedId{
data_migration_id: data_migration_id,
record_id: failed_id
}
|> Repo.insert()
end
record_ids = object_ids -- failed_ids
_ =
Repo.query(
"DELETE FROM data_migration_failed_ids " <>
"WHERE data_migration_id = $1 AND record_id = ANY($2)",
[data_migration_id, object_ids -- failed_ids]
)
DataMigrationFailedId
|> where(data_migration_id: ^data_migration_id)
|> where([dmf], dmf.record_id in ^record_ids)
|> Repo.delete_all()
max_object_id = Enum.at(object_ids, -1)
@ -148,14 +152,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
failed_objects_query()
|> Repo.chunk_stream(100, :one)
|> Stream.each(fn object ->
|> Stream.each(fn %{id: object_id} = object ->
with {res, _} when res != :error <- transfer_object_hashtags(object) do
_ =
Repo.query(
"DELETE FROM data_migration_failed_ids " <>
"WHERE data_migration_id = $1 AND record_id = $2",
[data_migration_id, object.id]
)
DataMigrationFailedId
|> where(data_migration_id: ^data_migration_id)
|> where(record_id: ^object_id)
|> Repo.delete_all()
end
end)
|> Stream.run()
@ -168,7 +171,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
defp failed_objects_query do
from(o in Object)
|> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
|> join(:inner, [o], dmf in DataMigrationFailedId,
on: dmf.record_id == o.id
)
|> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())