From 1d525d49cb5a4cb3b258e7fea7c1e97e47fff46f Mon Sep 17 00:00:00 2001 From: Mouse Reeve Date: Fri, 2 Apr 2021 10:44:30 -0700 Subject: [PATCH] Use sorted set for activitystreams --- bookwyrm/activitystreams.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/bookwyrm/activitystreams.py b/bookwyrm/activitystreams.py index d811aa491..a7fa1637c 100644 --- a/bookwyrm/activitystreams.py +++ b/bookwyrm/activitystreams.py @@ -23,15 +23,23 @@ class ActivityStream(ABC): """ the redis key for this user's unread count for this stream """ return "{}-unread".format(self.stream_id(user)) + def get_value(self, status): # pylint: disable=no-self-use + """ the status id and the rank (ie, published date) """ + return {status.id: status.published_date.timestamp()} + def add_status(self, status): """ add a status to users' feeds """ + value = self.get_value(status) # we want to do this as a bulk operation, hence "pipeline" pipeline = r.pipeline() for user in self.stream_users(status): # add the status to the feed - pipeline.lpush(self.stream_id(user), status.id) - pipeline.ltrim(self.stream_id(user), 0, settings.MAX_STREAM_LENGTH) - + pipeline.zadd(self.stream_id(user), value) + pipeline.zremrangebyrank( + self.stream_id(user), + settings.MAX_STREAM_LENGTH, + -1 + ) # add to the unread status count pipeline.incr(self.unread_id(user)) # and go! @@ -48,7 +56,12 @@ class ActivityStream(ABC): """ add a user's statuses to another user's feed """ pipeline = r.pipeline() for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.lpush(self.stream_id(viewer), status.id) + pipeline.zadd(self.stream_id(viewer), self.get_value(status)) + pipeline.zremrangebyrank( + self.stream_id(user), + settings.MAX_STREAM_LENGTH, + -1 + ) pipeline.execute() def remove_user_statuses(self, viewer, user): @@ -63,7 +76,7 @@ class ActivityStream(ABC): # clear unreads for this feed r.set(self.unread_id(user), 0) - statuses = r.lrange(self.stream_id(user), 0, -1) + statuses = r.zrevrange(self.stream_id(user), 0, -1) return ( models.Status.objects.select_subclasses() .filter(id__in=statuses) @@ -81,7 +94,13 @@ class ActivityStream(ABC): stream_id = self.stream_id(user) for status in statuses.all()[: settings.MAX_STREAM_LENGTH]: - pipeline.lpush(stream_id, status.id) + pipeline.zadd(stream_id, self.get_value(status)) + + pipeline.zremrangebyrank( + stream_id, + settings.MAX_STREAM_LENGTH, + -1 + ) pipeline.execute() def stream_users(self, status): # pylint: disable=no-self-use