Use sorted set for activitystreams

This commit is contained in:
Mouse Reeve 2021-04-02 10:44:30 -07:00
parent 19a13bdb56
commit 1d525d49cb

View file

@ -23,15 +23,23 @@ class ActivityStream(ABC):
""" the redis key for this user's unread count for this stream """ """ the redis key for this user's unread count for this stream """
return "{}-unread".format(self.stream_id(user)) return "{}-unread".format(self.stream_id(user))
def get_value(self, status): # pylint: disable=no-self-use
""" the status id and the rank (ie, published date) """
return {status.id: status.published_date.timestamp()}
def add_status(self, status): def add_status(self, status):
""" add a status to users' feeds """ """ add a status to users' feeds """
value = self.get_value(status)
# we want to do this as a bulk operation, hence "pipeline" # we want to do this as a bulk operation, hence "pipeline"
pipeline = r.pipeline() pipeline = r.pipeline()
for user in self.stream_users(status): for user in self.stream_users(status):
# add the status to the feed # add the status to the feed
pipeline.lpush(self.stream_id(user), status.id) pipeline.zadd(self.stream_id(user), value)
pipeline.ltrim(self.stream_id(user), 0, settings.MAX_STREAM_LENGTH) pipeline.zremrangebyrank(
self.stream_id(user),
settings.MAX_STREAM_LENGTH,
-1
)
# add to the unread status count # add to the unread status count
pipeline.incr(self.unread_id(user)) pipeline.incr(self.unread_id(user))
# and go! # and go!
@ -48,7 +56,12 @@ class ActivityStream(ABC):
""" add a user's statuses to another user's feed """ """ add a user's statuses to another user's feed """
pipeline = r.pipeline() pipeline = r.pipeline()
for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]: for status in user.status_set.all()[: settings.MAX_STREAM_LENGTH]:
pipeline.lpush(self.stream_id(viewer), status.id) pipeline.zadd(self.stream_id(viewer), self.get_value(status))
pipeline.zremrangebyrank(
self.stream_id(user),
settings.MAX_STREAM_LENGTH,
-1
)
pipeline.execute() pipeline.execute()
def remove_user_statuses(self, viewer, user): def remove_user_statuses(self, viewer, user):
@ -63,7 +76,7 @@ class ActivityStream(ABC):
# clear unreads for this feed # clear unreads for this feed
r.set(self.unread_id(user), 0) r.set(self.unread_id(user), 0)
statuses = r.lrange(self.stream_id(user), 0, -1) statuses = r.zrevrange(self.stream_id(user), 0, -1)
return ( return (
models.Status.objects.select_subclasses() models.Status.objects.select_subclasses()
.filter(id__in=statuses) .filter(id__in=statuses)
@ -81,7 +94,13 @@ class ActivityStream(ABC):
stream_id = self.stream_id(user) stream_id = self.stream_id(user)
for status in statuses.all()[: settings.MAX_STREAM_LENGTH]: for status in statuses.all()[: settings.MAX_STREAM_LENGTH]:
pipeline.lpush(stream_id, status.id) pipeline.zadd(stream_id, self.get_value(status))
pipeline.zremrangebyrank(
stream_id,
settings.MAX_STREAM_LENGTH,
-1
)
pipeline.execute() pipeline.execute()
def stream_users(self, status): # pylint: disable=no-self-use def stream_users(self, status): # pylint: disable=no-self-use