Use a separate queue for broadcasts

I think this will go a long way to solve the federation delay problems
we're seeing on b.s. I'm not sure at what point adding more queues will
create more problems than it solves, but I do think in this case the
queues are out of balance and moving broadcasts (which are the most
common type of `medium_priority` task at the moment) to their own queue
will be an improvement.
This commit is contained in:
Mouse Reeve 2023-02-20 12:58:41 -08:00
parent db207065ce
commit b167364c5c
6 changed files with 25 additions and 16 deletions

View file

@ -21,7 +21,7 @@ from django.utils.http import http_date
from bookwyrm import activitypub from bookwyrm import activitypub
from bookwyrm.settings import USER_AGENT, PAGE_LENGTH from bookwyrm.settings import USER_AGENT, PAGE_LENGTH
from bookwyrm.signatures import make_signature, make_digest from bookwyrm.signatures import make_signature, make_digest
from bookwyrm.tasks import app, MEDIUM from bookwyrm.tasks import app, MEDIUM, BROADCAST
from bookwyrm.models.fields import ImageField, ManyToManyField from bookwyrm.models.fields import ImageField, ManyToManyField
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -126,7 +126,7 @@ class ActivitypubMixin:
# there OUGHT to be only one match # there OUGHT to be only one match
return match.first() return match.first()
def broadcast(self, activity, sender, software=None, queue=MEDIUM): def broadcast(self, activity, sender, software=None, queue=BROADCAST):
"""send out an activity""" """send out an activity"""
broadcast_task.apply_async( broadcast_task.apply_async(
args=( args=(
@ -198,7 +198,7 @@ class ActivitypubMixin:
class ObjectMixin(ActivitypubMixin): class ObjectMixin(ActivitypubMixin):
"""add this mixin for object models that are AP serializable""" """add this mixin for object models that are AP serializable"""
def save(self, *args, created=None, software=None, priority=MEDIUM, **kwargs): def save(self, *args, created=None, software=None, priority=BROADCAST, **kwargs):
"""broadcast created/updated/deleted objects as appropriate""" """broadcast created/updated/deleted objects as appropriate"""
broadcast = kwargs.get("broadcast", True) broadcast = kwargs.get("broadcast", True)
# this bonus kwarg would cause an error in the base save method # this bonus kwarg would cause an error in the base save method
@ -506,7 +506,7 @@ def unfurl_related_field(related_field, sort_field=None):
return related_field.remote_id return related_field.remote_id
@app.task(queue=MEDIUM) @app.task(queue=BROADCAST)
def broadcast_task(sender_id: int, activity: str, recipients: List[str]): def broadcast_task(sender_id: int, activity: str, recipients: List[str]):
"""the celery task for broadcast""" """the celery task for broadcast"""
user_model = apps.get_model("bookwyrm.User", require_ready=True) user_model = apps.get_model("bookwyrm.User", require_ready=True)

View file

@ -16,3 +16,5 @@ MEDIUM = "medium_priority"
HIGH = "high_priority" HIGH = "high_priority"
# import items get their own queue because they're such a pain in the ass # import items get their own queue because they're such a pain in the ass
IMPORTS = "imports" IMPORTS = "imports"
# I keep making more queues?? this one broadcasting out
BROADCAST = "broadcast"

View file

@ -20,31 +20,37 @@
{% if queues %} {% if queues %}
<section class="block content"> <section class="block content">
<h2>{% trans "Queues" %}</h2> <h2>{% trans "Queues" %}</h2>
<div class="columns has-text-centered"> <div class="columns has-text-centered is-multiline">
<div class="column is-3"> <div class="column is-4">
<div class="notification"> <div class="notification">
<p class="header">{% trans "Low priority" %}</p> <p class="header">{% trans "Low priority" %}</p>
<p class="title is-5">{{ queues.low_priority|intcomma }}</p> <p class="title is-5">{{ queues.low_priority|intcomma }}</p>
</div> </div>
</div> </div>
<div class="column is-3"> <div class="column is-4">
<div class="notification"> <div class="notification">
<p class="header">{% trans "Medium priority" %}</p> <p class="header">{% trans "Medium priority" %}</p>
<p class="title is-5">{{ queues.medium_priority|intcomma }}</p> <p class="title is-5">{{ queues.medium_priority|intcomma }}</p>
</div> </div>
</div> </div>
<div class="column is-3"> <div class="column is-4">
<div class="notification"> <div class="notification">
<p class="header">{% trans "High priority" %}</p> <p class="header">{% trans "High priority" %}</p>
<p class="title is-5">{{ queues.high_priority|intcomma }}</p> <p class="title is-5">{{ queues.high_priority|intcomma }}</p>
</div> </div>
</div> </div>
<div class="column is-3"> <div class="column is-6">
<div class="notification"> <div class="notification">
<p class="header">{% trans "Imports" %}</p> <p class="header">{% trans "Imports" %}</p>
<p class="title is-5">{{ queues.imports|intcomma }}</p> <p class="title is-5">{{ queues.imports|intcomma }}</p>
</div> </div>
</div> </div>
<div class="column is-6">
<div class="notification">
<p class="header">{% trans "Broadcasts" %}</p>
<p class="title is-5">{{ queues.broadcast|intcomma }}</p>
</div>
</div>
</div> </div>
</section> </section>
{% else %} {% else %}

View file

@ -8,7 +8,7 @@ from django.views.decorators.http import require_GET
import redis import redis
from celerywyrm import settings from celerywyrm import settings
from bookwyrm.tasks import app as celery from bookwyrm.tasks import app as celery, LOW, MEDIUM, HIGH, IMPORTS, BROADCAST
r = redis.from_url(settings.REDIS_BROKER_URL) r = redis.from_url(settings.REDIS_BROKER_URL)
@ -35,10 +35,11 @@ class CeleryStatus(View):
try: try:
queues = { queues = {
"low_priority": r.llen("low_priority"), LOW: r.llen(LOW),
"medium_priority": r.llen("medium_priority"), MEDIUM: r.llen(MEDIUM),
"high_priority": r.llen("high_priority"), HIGH: r.llen(HIGH),
"imports": r.llen("imports"), IMPORTS: r.llen(IMPORTS),
BROADCAST: r.llen(BROADCAST),
} }
# pylint: disable=broad-except # pylint: disable=broad-except
except Exception as err: except Exception as err:

View file

@ -6,7 +6,7 @@ After=network.target postgresql.service redis.service
User=bookwyrm User=bookwyrm
Group=bookwyrm Group=bookwyrm
WorkingDirectory=/opt/bookwyrm/ WorkingDirectory=/opt/bookwyrm/
ExecStart=/opt/bookwyrm/venv/bin/celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,import ExecStart=/opt/bookwyrm/venv/bin/celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,import,broadcast
StandardOutput=journal StandardOutput=journal
StandardError=inherit StandardError=inherit

View file

@ -62,7 +62,7 @@ services:
build: . build: .
networks: networks:
- main - main
command: celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,imports command: celery -A celerywyrm worker -l info -Q high_priority,medium_priority,low_priority,imports,broadcast
volumes: volumes:
- .:/app - .:/app
- static_volume:/app/static - static_volume:/app/static