bookwyrm/bookwyrm/activitystreams.py
Mouse Reeve 317cf5fcf5 Generate fewer add_status_tasks
Previously, every time a status was saved, a task would start to add it
to people's timelines. This meant there were a ton of duplicate tasks
that were potentially heavy to run. Now, the Status model has a "ready"
field which indicates that it's worth updating the timelines. It
defaults to True, which prevents statuses from accidentally not being
added due to ready state.

The ready state is explicitly set to false in the view, which is the
source of most of the noise for that task.
2022-11-15 14:14:32 -08:00

569 lines
20 KiB
Python

""" access the activity streams stored in redis """
from datetime import timedelta
from django.dispatch import receiver
from django.db import transaction
from django.db.models import signals, Q
from django.utils import timezone
from bookwyrm import models
from bookwyrm.redis_store import RedisStore, r
from bookwyrm.tasks import app, LOW, MEDIUM, HIGH
class ActivityStream(RedisStore):
"""a category of activity stream (like home, local, books)"""
def stream_id(self, user):
"""the redis key for this user's instance of this stream"""
return f"{user.id}-{self.key}"
def unread_id(self, user):
"""the redis key for this user's unread count for this stream"""
stream_id = self.stream_id(user)
return f"{stream_id}-unread"
def unread_by_status_type_id(self, user):
"""the redis key for this user's unread count for this stream"""
stream_id = self.stream_id(user)
return f"{stream_id}-unread-by-type"
def get_rank(self, obj): # pylint: disable=no-self-use
"""statuses are sorted by date published"""
return obj.published_date.timestamp()
def add_status(self, status, increment_unread=False):
"""add a status to users' feeds"""
# the pipeline contains all the add-to-stream activities
pipeline = self.add_object_to_related_stores(status, execute=False)
if increment_unread:
for user in self.get_audience(status):
# add to the unread status count
pipeline.incr(self.unread_id(user))
# add to the unread status count for status type
pipeline.hincrby(
self.unread_by_status_type_id(user), get_status_type(status), 1
)
# and go!
pipeline.execute()
def add_user_statuses(self, viewer, user):
"""add a user's statuses to another user's feed"""
# only add the statuses that the viewer should be able to see (ie, not dms)
statuses = models.Status.privacy_filter(viewer).filter(user=user)
self.bulk_add_objects_to_store(statuses, self.stream_id(viewer))
def remove_user_statuses(self, viewer, user):
"""remove a user's status from another user's feed"""
# remove all so that followers only statuses are removed
statuses = user.status_set.all()
self.bulk_remove_objects_from_store(statuses, self.stream_id(viewer))
def get_activity_stream(self, user):
"""load the statuses to be displayed"""
# clear unreads for this feed
r.set(self.unread_id(user), 0)
r.delete(self.unread_by_status_type_id(user))
statuses = self.get_store(self.stream_id(user))
return (
models.Status.objects.select_subclasses()
.filter(id__in=statuses)
.select_related(
"user",
"reply_parent",
"comment__book",
"review__book",
"quotation__book",
)
.prefetch_related("mention_books", "mention_users")
.order_by("-published_date")
)
def get_unread_count(self, user):
"""get the unread status count for this user's feed"""
return int(r.get(self.unread_id(user)) or 0)
def get_unread_count_by_status_type(self, user):
"""get the unread status count for this user's feed's status types"""
status_types = r.hgetall(self.unread_by_status_type_id(user))
return {
str(key.decode("utf-8")): int(value) or 0
for key, value in status_types.items()
}
def populate_streams(self, user):
"""go from zero to a timeline"""
self.populate_store(self.stream_id(user))
def get_audience(self, status): # pylint: disable=no-self-use
"""given a status, what users should see it"""
# direct messages don't appeard in feeds, direct comments/reviews/etc do
if status.privacy == "direct" and status.status_type == "Note":
return []
# everybody who could plausibly see this status
audience = models.User.objects.filter(
is_active=True,
local=True, # we only create feeds for users of this instance
).exclude(
Q(id__in=status.user.blocks.all()) | Q(blocks=status.user) # not blocked
)
# only visible to the poster and mentioned users
if status.privacy == "direct":
audience = audience.filter(
Q(id=status.user.id) # if the user is the post's author
| Q(id__in=status.mention_users.all()) # if the user is mentioned
)
# don't show replies to statuses the user can't see
elif status.reply_parent and status.reply_parent.privacy == "followers":
audience = audience.filter(
Q(id=status.user.id) # if the user is the post's author
| Q(id=status.reply_parent.user.id) # if the user is the OG author
| (
Q(following=status.user) & Q(following=status.reply_parent.user)
) # if the user is following both authors
).distinct()
# only visible to the poster's followers and tagged users
elif status.privacy == "followers":
audience = audience.filter(
Q(id=status.user.id) # if the user is the post's author
| Q(following=status.user) # if the user is following the author
)
return audience.distinct()
def get_stores_for_object(self, obj):
return [self.stream_id(u) for u in self.get_audience(obj)]
def get_statuses_for_user(self, user): # pylint: disable=no-self-use
"""given a user, what statuses should they see on this stream"""
return models.Status.privacy_filter(
user,
privacy_levels=["public", "unlisted", "followers"],
)
def get_objects_for_store(self, store):
user = models.User.objects.get(id=store.split("-")[0])
return self.get_statuses_for_user(user)
class HomeStream(ActivityStream):
"""users you follow"""
key = "home"
def get_audience(self, status):
audience = super().get_audience(status)
if not audience:
return []
return audience.filter(
Q(id=status.user.id) # if the user is the post's author
| Q(following=status.user) # if the user is following the author
).distinct()
def get_statuses_for_user(self, user):
return models.Status.privacy_filter(
user,
privacy_levels=["public", "unlisted", "followers"],
).exclude(
~Q( # remove everything except
Q(user__followers=user) # user following
| Q(user=user) # is self
| Q(mention_users=user) # mentions user
),
)
class LocalStream(ActivityStream):
"""users you follow"""
key = "local"
def get_audience(self, status):
# this stream wants no part in non-public statuses
if status.privacy != "public" or not status.user.local:
return []
return super().get_audience(status)
def get_statuses_for_user(self, user):
# all public statuses by a local user
return models.Status.privacy_filter(
user,
privacy_levels=["public"],
).filter(user__local=True)
class BooksStream(ActivityStream):
"""books on your shelves"""
key = "books"
def get_audience(self, status):
"""anyone with the mentioned book on their shelves"""
# only show public statuses on the books feed,
# and only statuses that mention books
if status.privacy != "public" or not (
status.mention_books.exists() or hasattr(status, "book")
):
return []
work = (
status.book.parent_work
if hasattr(status, "book")
else status.mention_books.first().parent_work
)
audience = super().get_audience(status)
if not audience:
return []
return audience.filter(shelfbook__book__parent_work=work).distinct()
def get_statuses_for_user(self, user):
"""any public status that mentions the user's books"""
books = user.shelfbook_set.values_list(
"book__parent_work__id", flat=True
).distinct()
return (
models.Status.privacy_filter(
user,
privacy_levels=["public"],
)
.filter(
Q(comment__book__parent_work__id__in=books)
| Q(quotation__book__parent_work__id__in=books)
| Q(review__book__parent_work__id__in=books)
| Q(mention_books__parent_work__id__in=books)
)
.distinct()
)
def add_book_statuses(self, user, book):
"""add statuses about a book to a user's feed"""
work = book.parent_work
statuses = (
models.Status.privacy_filter(
user,
privacy_levels=["public"],
)
.filter(
Q(comment__book__parent_work=work)
| Q(quotation__book__parent_work=work)
| Q(review__book__parent_work=work)
| Q(mention_books__parent_work=work)
)
.distinct()
)
self.bulk_add_objects_to_store(statuses, self.stream_id(user))
def remove_book_statuses(self, user, book):
"""add statuses about a book to a user's feed"""
work = book.parent_work
statuses = (
models.Status.privacy_filter(
user,
privacy_levels=["public"],
)
.filter(
Q(comment__book__parent_work=work)
| Q(quotation__book__parent_work=work)
| Q(review__book__parent_work=work)
| Q(mention_books__parent_work=work)
)
.distinct()
)
self.bulk_remove_objects_from_store(statuses, self.stream_id(user))
# determine which streams are enabled in settings.py
streams = {
"home": HomeStream(),
"local": LocalStream(),
"books": BooksStream(),
}
@receiver(signals.post_save)
# pylint: disable=unused-argument
def add_status_on_create(sender, instance, created, *args, **kwargs):
"""add newly created statuses to activity feeds"""
# we're only interested in new statuses
if not issubclass(sender, models.Status):
return
if instance.deleted:
remove_status_task.delay(instance.id)
return
# To avoid creating a zillion unnecessary tasks caused by re-saving the model,
# check if it's actually ready to send before we go. We're trusting this was
# set correctly by the inbox or view
if not instance.ready:
return
# when creating new things, gotta wait on the transaction
transaction.on_commit(
lambda: add_status_on_create_command(sender, instance, created)
)
def add_status_on_create_command(sender, instance, created):
"""runs this code only after the database commit completes"""
priority = HIGH
# check if this is an old status, de-prioritize if so
# (this will happen if federation is very slow, or, more expectedly, on csv import)
if instance.published_date < timezone.now() - timedelta(
days=1
) or instance.created_date < instance.published_date - timedelta(days=1):
priority = LOW
add_status_task.apply_async(
args=(instance.id,),
kwargs={"increment_unread": created},
queue=priority,
)
if sender == models.Boost:
handle_boost_task.delay(instance.id)
@receiver(signals.post_delete, sender=models.Boost)
# pylint: disable=unused-argument
def remove_boost_on_delete(sender, instance, *args, **kwargs):
"""boosts are deleted"""
# remove the boost
remove_status_task.delay(instance.id)
# re-add the original status
add_status_task.delay(instance.boosted_status.id)
@receiver(signals.post_save, sender=models.UserFollows)
# pylint: disable=unused-argument
def add_statuses_on_follow(sender, instance, created, *args, **kwargs):
"""add a newly followed user's statuses to feeds"""
if not created or not instance.user_subject.local:
return
add_user_statuses_task.delay(
instance.user_subject.id, instance.user_object.id, stream_list=["home"]
)
@receiver(signals.post_delete, sender=models.UserFollows)
# pylint: disable=unused-argument
def remove_statuses_on_unfollow(sender, instance, *args, **kwargs):
"""remove statuses from a feed on unfollow"""
if not instance.user_subject.local:
return
remove_user_statuses_task.delay(
instance.user_subject.id, instance.user_object.id, stream_list=["home"]
)
@receiver(signals.post_save, sender=models.UserBlocks)
# pylint: disable=unused-argument
def remove_statuses_on_block(sender, instance, *args, **kwargs):
"""remove statuses from all feeds on block"""
# blocks apply ot all feeds
if instance.user_subject.local:
remove_user_statuses_task.delay(
instance.user_subject.id, instance.user_object.id
)
# and in both directions
if instance.user_object.local:
remove_user_statuses_task.delay(
instance.user_object.id, instance.user_subject.id
)
@receiver(signals.post_delete, sender=models.UserBlocks)
# pylint: disable=unused-argument
def add_statuses_on_unblock(sender, instance, *args, **kwargs):
"""add statuses back to all feeds on unblock"""
# make sure there isn't a block in the other direction
if models.UserBlocks.objects.filter(
user_subject=instance.user_object,
user_object=instance.user_subject,
).exists():
return
public_streams = [k for (k, v) in streams.items() if k != "home"]
# add statuses back to streams with statuses from anyone
if instance.user_subject.local:
add_user_statuses_task.delay(
instance.user_subject.id,
instance.user_object.id,
stream_list=public_streams,
)
# add statuses back to streams with statuses from anyone
if instance.user_object.local:
add_user_statuses_task.delay(
instance.user_object.id,
instance.user_subject.id,
stream_list=public_streams,
)
@receiver(signals.post_save, sender=models.User)
# pylint: disable=unused-argument
def populate_streams_on_account_create(sender, instance, created, *args, **kwargs):
"""build a user's feeds when they join"""
if not created or not instance.local:
return
transaction.on_commit(
lambda: populate_streams_on_account_create_command(instance.id)
)
def populate_streams_on_account_create_command(instance_id):
"""wait for the transaction to complete"""
for stream in streams:
populate_stream_task.delay(stream, instance_id)
@receiver(signals.pre_save, sender=models.ShelfBook)
# pylint: disable=unused-argument
def add_statuses_on_shelve(sender, instance, *args, **kwargs):
"""update books stream when user shelves a book"""
if not instance.user.local:
return
book = instance.book
# check if the book is already on the user's shelves
editions = book.parent_work.editions.all()
if models.ShelfBook.objects.filter(user=instance.user, book__in=editions).exists():
return
add_book_statuses_task.delay(instance.user.id, book.id)
@receiver(signals.post_delete, sender=models.ShelfBook)
# pylint: disable=unused-argument
def remove_statuses_on_unshelve(sender, instance, *args, **kwargs):
"""update books stream when user unshelves a book"""
if not instance.user.local:
return
book = instance.book
# check if the book is actually unshelved, not just moved
editions = book.parent_work.editions.all()
if models.ShelfBook.objects.filter(user=instance.user, book__in=editions).exists():
return
remove_book_statuses_task.delay(instance.user.id, book.id)
# ---- TASKS
@app.task(queue=LOW)
def add_book_statuses_task(user_id, book_id):
"""add statuses related to a book on shelve"""
user = models.User.objects.get(id=user_id)
book = models.Edition.objects.get(id=book_id)
BooksStream().add_book_statuses(user, book)
@app.task(queue=LOW)
def remove_book_statuses_task(user_id, book_id):
"""remove statuses about a book from a user's books feed"""
user = models.User.objects.get(id=user_id)
book = models.Edition.objects.get(id=book_id)
BooksStream().remove_book_statuses(user, book)
@app.task(queue=MEDIUM)
def populate_stream_task(stream, user_id):
"""background task for populating an empty activitystream"""
user = models.User.objects.get(id=user_id)
stream = streams[stream]
stream.populate_streams(user)
@app.task(queue=MEDIUM)
def remove_status_task(status_ids):
"""remove a status from any stream it might be in"""
# this can take an id or a list of ids
if not isinstance(status_ids, list):
status_ids = [status_ids]
statuses = models.Status.objects.filter(id__in=status_ids)
for stream in streams.values():
for status in statuses:
stream.remove_object_from_related_stores(status)
@app.task(queue=HIGH)
def add_status_task(status_id, increment_unread=False):
"""add a status to any stream it should be in"""
status = models.Status.objects.select_subclasses().get(id=status_id)
# we don't want to tick the unread count for csv import statuses, idk how better
# to check than just to see if the states is more than a few days old
if status.created_date < timezone.now() - timedelta(days=2):
increment_unread = False
for stream in streams.values():
stream.add_status(status, increment_unread=increment_unread)
@app.task(queue=MEDIUM)
def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
"""remove all statuses by a user from a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
viewer = models.User.objects.get(id=viewer_id)
user = models.User.objects.get(id=user_id)
for stream in stream_list:
stream.remove_user_statuses(viewer, user)
@app.task(queue=MEDIUM)
def add_user_statuses_task(viewer_id, user_id, stream_list=None):
"""add all statuses by a user to a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
viewer = models.User.objects.get(id=viewer_id)
user = models.User.objects.get(id=user_id)
for stream in stream_list:
stream.add_user_statuses(viewer, user)
@app.task(queue=MEDIUM)
def handle_boost_task(boost_id):
"""remove the original post and other, earlier boosts"""
instance = models.Status.objects.get(id=boost_id)
boosted = instance.boost.boosted_status
# previous boosts of this status
old_versions = models.Boost.objects.filter(
boosted_status__id=boosted.id,
created_date__lt=instance.created_date,
)
for stream in streams.values():
# people who should see the boost (not people who see the original status)
audience = stream.get_stores_for_object(instance)
stream.remove_object_from_related_stores(boosted, stores=audience)
for status in old_versions:
stream.remove_object_from_related_stores(status, stores=audience)
def get_status_type(status):
"""return status type even for boosted statuses"""
status_type = status.status_type.lower()
# Check if current status is a boost
if hasattr(status, "boost"):
# Act in accordance of your findings
if hasattr(status.boost.boosted_status, "review"):
status_type = "review"
if hasattr(status.boost.boosted_status, "comment"):
status_type = "comment"
if hasattr(status.boost.boosted_status, "quotation"):
status_type = "quotation"
return status_type