Improve user deletion consistency

An attempt to ensure something like
https://git.pleroma.social/pleroma/pleroma/-/issues/1415 does not happen
or is at least debuggable.

- Deactivate the user before deletion to ensure no new posts/follows
can be made during it
- Run the deletion in a transaction. This should reduce performance
impact of a deletion since it will only use a single connection. Also
makes sure an account cannot get stuck in a weird state between deleted
and active. Made it possible to disable though, in case someone hits
the issue mentioned above.
- Log more errors
This commit is contained in:
rinpatch 2021-02-24 17:28:49 +03:00
parent d113ed94e7
commit 4286a383df
7 changed files with 190 additions and 36 deletions

View file

@ -652,7 +652,9 @@ config :pleroma, :oauth2,
issue_new_refresh_token: true,
clean_expired_tokens: false
config :pleroma, :database, rum_enabled: false
config :pleroma, :database,
rum_enabled: false,
rollback_on_activity_deletion_errors: true
config :pleroma, :env, Mix.env()

View file

@ -70,6 +70,20 @@ frontend_options = [
]
config :pleroma, :config_description, [
%{
group: :pleroma,
key: :database,
type: :group,
description: "Database settings",
children: [
%{
key: :rollback_on_activity_deletion_errors,
type: :boolean,
description:
"Rollback the transaction if Pleroma fails to delete an activity during user deletion. If you need to disable this, please report the issue you were having on the bugtracker."
}
]
},
%{
group: :pleroma,
key: Pleroma.Upload,

View file

@ -133,6 +133,10 @@ config :pleroma, :side_effects,
ap_streamer: Pleroma.Web.ActivityPub.ActivityPubMock,
logger: Pleroma.LoggerMock
# Disable transaction check by default unless the test wants otherwise
# because all tests run in a transaction.
config :pleroma, Pleroma.Workers.BackgroundWorker, ignore_transaction_check: true
if File.exists?("./config/test.secret.exs") do
import_config "test.secret.exs"
else

View file

@ -1072,7 +1072,19 @@ defmodule Pleroma.User do
def update_and_set_cache(changeset) do
with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
set_cache(user)
BackgroundWorker.execute_or_enqueue_if_in_transaction(fn
false ->
set_cache(user)
# If the function has been enqueued, there is a chance something changed
# before the worker got to executing it, so refetch the user from the database
true ->
user.id
|> get_by_id()
|> set_cache()
end)
{:ok, user}
end
end
@ -1339,7 +1351,7 @@ defmodule Pleroma.User do
user
|> follow_information_changeset(%{follower_count: follower_count})
|> update_and_set_cache
|> update_and_set_cache()
else
{:ok, maybe_fetch_follow_information(user)}
end
@ -1747,27 +1759,67 @@ defmodule Pleroma.User do
@spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do
# Remove all relationships
user
|> get_followers()
|> Enum.each(fn follower ->
ActivityPub.unfollow(follower, user)
unfollow(follower, user)
end)
# Deactivate the user before starting the deletion
# to make sure they are not able to make new posts/follows during it
{:ok, user} = set_activation_status(user, false)
user
|> get_friends()
|> Enum.each(fn followed ->
ActivityPub.unfollow(user, followed)
unfollow(user, followed)
end)
Repo.transaction(
fn ->
# Remove all relationships
# No need to handle errors from ActivityPub.unfollow because
# they will automatically rollback the transaction.
user
|> get_followers()
|> Enum.each(fn follower ->
ActivityPub.unfollow(follower, user)
unfollow(follower, user)
end)
delete_user_activities(user)
delete_notifications_from_user_activities(user)
user
|> get_friends()
|> Enum.each(fn followed ->
ActivityPub.unfollow(user, followed)
unfollow(user, followed)
end)
delete_outgoing_pending_follow_requests(user)
rollback_on_activity_deletion_errors =
Config.get([:database, :rollback_on_activity_deletion_errors], true)
delete_or_deactivate(user)
case {delete_user_activities(user), rollback_on_activity_deletion_errors} do
{res, rollback} when res == :ok or rollback == false ->
case res do
{:error, _} ->
Logger.warn(fn ->
"Deleting #{user.ap_id}: Failed deleting some of the activities, proceeding anyway."
end)
_ ->
:noop
end
delete_notifications_from_user_activities(user)
delete_outgoing_pending_follow_requests(user)
case delete_or_deactivate(user) do
{:ok, user} -> user
{:error, e} -> Repo.rollback(e)
end
{{:error, e}, true} ->
Logger.error(fn ->
"""
Deleting #{user.ap_id}: Failed deleting some of the activities, rolling back.
Set `config :pleroma, :database, rollback_on_activity_deletion_errors: true`
and restart the deletion if you want to continue anyway. Please report this on Pleroma bugtracker.
"""
end)
Repo.rollback({:deleting_activities, e})
end
end,
timeout: :infinity
)
end
def perform(:set_activation_async, user, status), do: set_activation(user, status)
@ -1807,16 +1859,48 @@ defmodule Pleroma.User do
|> Repo.delete_all()
end
@type activity_id :: String.t()
@spec delete_user_activities(User.t()) ::
:ok | {:error, [{:error, activity_id(), any()}]}
def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id
|> Activity.Queries.by_actor()
|> Repo.chunk_stream(50, :batches)
|> Stream.each(fn activities ->
Enum.each(activities, fn activity -> delete_activity(activity, user) end)
end)
|> Stream.run()
errors =
ap_id
|> Activity.Queries.by_actor()
|> Repo.chunk_stream(50)
|> Stream.flat_map(fn activity ->
case delete_activity(activity, user) do
{:ok, _activity, _meta} ->
[]
{:error, error} ->
Logger.error(fn ->
"Deleting #{ap_id}: could not delete or undo #{activity.data["id"]}.\n Reason: #{
inspect(error)
}"
end)
[{:error, activity.id, error}]
:noop ->
Logger.debug(fn ->
"Deleting #{ap_id}: nothing to do for #{activity.data["id"]} of type #{
activity.data["type"]
}"
end)
[]
end
end)
|> Enum.to_list()
case errors do
[] -> :ok
errors -> {:error, errors}
end
end
@spec delete_activity(Pleroma.Activity.t(), User.t()) ::
{:ok, Activity.t(), keyword()} | {:error, any()} | :noop
defp delete_activity(%{data: %{"type" => "Create", "object" => object}} = activity, user) do
with {_, %Object{}} <- {:find_object, Object.get_by_ap_id(object)},
{:ok, delete_data, _} <- Builder.delete(user, object) do
@ -1831,18 +1915,20 @@ defmodule Pleroma.User do
end
e ->
Logger.error("Could not delete #{object} created by #{activity.data["ap_id"]}")
Logger.error("Error: #{inspect(e)}")
e
end
end
defp delete_activity(%{data: %{"type" => type}} = activity, user)
when type in ["Like", "Announce"] do
{:ok, undo, _} = Builder.undo(user, activity)
Pipeline.common_pipeline(undo, local: user.local)
with {:ok, undo, _} <- Builder.undo(user, activity) do
Pipeline.common_pipeline(undo, local: user.local)
else
e -> e
end
end
defp delete_activity(_activity, _user), do: "Doing nothing"
defp delete_activity(_activity, _user), do: :noop
defp delete_outgoing_pending_follow_requests(user) do
user

View file

@ -13,6 +13,7 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
alias Pleroma.Web.ActivityPub.SideEffects
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
alias Pleroma.Workers.BackgroundWorker
@side_effects Config.get([:pipeline, :side_effects], SideEffects)
@federator Config.get([:pipeline, :federator], Federator)
@ -26,7 +27,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
def common_pipeline(object, meta) do
case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
{:ok, {:ok, activity, meta}} ->
@side_effects.handle_after_transaction(meta)
BackgroundWorker.execute_or_enqueue_if_in_transaction(fn ->
@side_effects.handle_after_transaction(meta)
end)
{:ok, activity, meta}
{:ok, value} ->

View file

@ -18,6 +18,7 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Web.StreamerView
alias Pleroma.Workers.BackgroundWorker
@mix_env Mix.env()
@registry Pleroma.Web.StreamerRegistry
@ -135,9 +136,11 @@ defmodule Pleroma.Web.Streamer do
def stream(topics, items) do
if should_env_send?() do
for topic <- List.wrap(topics), item <- List.wrap(items) do
spawn(fn -> do_stream(topic, item) end)
end
BackgroundWorker.execute_or_enqueue_if_in_transaction(fn ->
for topic <- List.wrap(topics), item <- List.wrap(items) do
spawn(fn -> do_stream(topic, item) end)
end
end)
end
end

View file

@ -38,4 +38,45 @@ defmodule Pleroma.Workers.BackgroundWorker do
Pleroma.FollowingRelationship.move_following(origin, target)
end
def perform(%Job{args: %{"op" => "transaction_side_effects", "function" => encoded_function}}) do
function =
encoded_function
|> Base.decode64!()
|> :erlang.binary_to_term()
maybe_execute_function_with_worker_info(function, true)
:ok
end
@doc "Executes a function right away if not running in transaction. Otherwise enqueues it to be executed by BackgroundWorker after transaction commit. Intended for side effects that can not be rolled back. If the function has an arity of 1, the first argument will be a boolean indicating whether it is run by BackgroundWorker or not."
@spec execute_or_enqueue_if_in_transaction((() -> any()) | (boolean() -> any())) ::
{:ok, {:enqueued, Oban.Job.t()}}
| {:error, {:enqueue, Oban.job_changeset()}}
| {:error, {:enqueue, term()}}
| {:ok, {:executed, term()}}
def execute_or_enqueue_if_in_transaction(function) do
if Pleroma.Repo.in_transaction?() and
!Pleroma.Config.get([__MODULE__, :ignore_transaction_check], false) do
encoded_function =
function
|> :erlang.term_to_binary()
|> Base.encode64()
case enqueue("transaction_side_effects", %{"function" => encoded_function}) do
{:ok, job} -> {:ok, {:enqueued, job}}
{:error, e} -> {:error, {:enqueue, e}}
end
else
{:ok, {:executed, maybe_execute_function_with_worker_info(function, false)}}
end
end
defp maybe_execute_function_with_worker_info(function, executed_by_worker) do
if :erlang.fun_info(function)[:arity] == 1 do
function.(executed_by_worker)
else
function.()
end
end
end