mirror of
https://github.com/LemmyNet/activitypub-federation-rust.git
synced 2024-06-08 08:19:28 +00:00
Expose the worker & tweak the ActivityQueue
trait
This commit is contained in:
parent
57d5b86370
commit
5bd0bf0a2a
|
@ -18,7 +18,12 @@
|
|||
use crate::{
|
||||
error::Error,
|
||||
protocol::verification::verify_domains_match,
|
||||
queue::{simple_queue::SimpleQueue, ActivityQueue, SendActivityTask},
|
||||
queue::{
|
||||
standard_retry_queue::StandardRetryQueue,
|
||||
worker::Worker,
|
||||
ActivityMessage,
|
||||
ActivityQueue,
|
||||
},
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
|
@ -96,7 +101,7 @@ pub struct FederationConfig<T: Clone> {
|
|||
/// Queue for sending outgoing activities. Only optional to make builder work, its always
|
||||
/// present once constructed.
|
||||
#[builder(setter(custom))]
|
||||
pub(crate) activity_queue: Sender<SendActivityTask>,
|
||||
pub(crate) activity_queue: Option<Sender<ActivityMessage>>,
|
||||
}
|
||||
|
||||
impl<T: Clone> FederationConfig<T> {
|
||||
|
@ -205,17 +210,7 @@ impl<T: Clone> FederationConfigBuilder<T> {
|
|||
&mut self,
|
||||
queue: Q,
|
||||
) -> &mut Self {
|
||||
let (sender, mut receiver) = channel(8192);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(message) = receiver.recv().await {
|
||||
if let Err(err) = queue.queue(message).await {
|
||||
error!("{err:?}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
self.activity_queue = Some(sender);
|
||||
self.activity_queue = Some(Some(make_sender(queue)));
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -225,25 +220,38 @@ impl<T: Clone> FederationConfigBuilder<T> {
|
|||
/// queue for outgoing activities, which is stored internally in the config struct.
|
||||
/// Requires a tokio runtime for the background queue.
|
||||
pub async fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
|
||||
if self.activity_queue.is_none() {
|
||||
let queue = SimpleQueue::new(
|
||||
self.client
|
||||
.clone()
|
||||
.unwrap_or_else(|| reqwest::Client::default().into()),
|
||||
self.worker_count.unwrap_or_default(),
|
||||
self.retry_count.unwrap_or_default(),
|
||||
self.request_timeout.unwrap_or(Duration::from_secs(10)),
|
||||
60,
|
||||
self.http_signature_compat.unwrap_or_default(),
|
||||
);
|
||||
let mut config = self.partial_build()?;
|
||||
|
||||
self.activity_queue(queue).await;
|
||||
if config.activity_queue.is_none() {
|
||||
if config.debug {
|
||||
// If we're in debug we use a single worker to send things
|
||||
let queue = Worker::from_config(&config);
|
||||
config.activity_queue = Some(make_sender(queue));
|
||||
} else {
|
||||
// Otherwise we use the standard retry queue
|
||||
let queue = StandardRetryQueue::from_config(&config);
|
||||
config.activity_queue = Some(make_sender(queue));
|
||||
}
|
||||
}
|
||||
|
||||
self.partial_build()
|
||||
Ok(config)
|
||||
}
|
||||
}
|
||||
|
||||
fn make_sender<Q: ActivityQueue + Sync + Send + 'static>(queue: Q) -> Sender<ActivityMessage> {
|
||||
let (sender, mut receiver) = channel(8192);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(message) = receiver.recv().await {
|
||||
if let Err(err) = queue.queue(message).await {
|
||||
error!("{err:?}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
sender
|
||||
}
|
||||
|
||||
// impl<T: Clone> FederationConfig<T> {
|
||||
// /// Shut down this federation, waiting for the outgoing queue to be sent.
|
||||
// /// If the activityqueue is still in use in other requests or was never constructed, returns an error.
|
||||
|
|
|
@ -3,9 +3,9 @@
|
|||
#![doc = include_str!("../../docs/09_sending_activities.md")]
|
||||
|
||||
pub(crate) mod request;
|
||||
pub mod simple_queue;
|
||||
mod util;
|
||||
mod worker;
|
||||
pub mod standard_retry_queue;
|
||||
pub mod util;
|
||||
pub mod worker;
|
||||
|
||||
use crate::{
|
||||
config::Data,
|
||||
|
@ -36,11 +36,8 @@ pub trait ActivityQueue {
|
|||
/// The errors that can be returned when queuing
|
||||
type Error: Debug;
|
||||
|
||||
/// Retrieve the queue stats
|
||||
fn stats(&self) -> &Stats;
|
||||
|
||||
/// Queues one activity task to a specific inbox
|
||||
async fn queue(&self, message: SendActivityTask) -> Result<(), Self::Error>;
|
||||
async fn queue(&self, message: ActivityMessage) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// Sends an activity with an outbound activity queue
|
||||
|
@ -57,7 +54,11 @@ where
|
|||
ActorType: Actor,
|
||||
{
|
||||
let config = &data.config;
|
||||
let queue = data.config.activity_queue.clone();
|
||||
let queue = data
|
||||
.config
|
||||
.activity_queue
|
||||
.clone()
|
||||
.expect("Activity Queue is always configured");
|
||||
let actor_id = activity.actor();
|
||||
let activity_id = activity.id();
|
||||
let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
|
||||
|
@ -85,7 +86,7 @@ where
|
|||
continue;
|
||||
}
|
||||
|
||||
let message = SendActivityTask {
|
||||
let message = ActivityMessage {
|
||||
id: Uuid::new_v4(),
|
||||
actor_id: actor_id.clone(),
|
||||
activity_id: activity_id.clone(),
|
||||
|
@ -129,7 +130,7 @@ where
|
|||
#[derive(Clone, Debug)]
|
||||
/// The struct sent to a worker for processing
|
||||
/// When `send_activity` is used, it is split up to tasks per-inbox
|
||||
pub struct SendActivityTask {
|
||||
pub struct ActivityMessage {
|
||||
/// The ID of the activity task
|
||||
pub id: Uuid,
|
||||
/// The actor ID
|
||||
|
@ -160,7 +161,7 @@ impl Debug for Stats {
|
|||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
|
||||
"Activity queue stats: pending: {}, running: {}, retries: {}, dead (last hr): {}, complete (last hr): {}",
|
||||
self.pending.load(Ordering::Relaxed),
|
||||
self.running.load(Ordering::Relaxed),
|
||||
self.retries.load(Ordering::Relaxed),
|
||||
|
@ -180,7 +181,10 @@ mod tests {
|
|||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::{http_signatures::generate_actor_keypair, queue::simple_queue::SimpleQueue};
|
||||
use crate::{
|
||||
http_signatures::generate_actor_keypair,
|
||||
queue::standard_retry_queue::StandardRetryQueue,
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use super::*;
|
||||
|
@ -237,7 +241,7 @@ mod tests {
|
|||
|
||||
*/
|
||||
|
||||
let queue = SimpleQueue::new(
|
||||
let queue = StandardRetryQueue::new(
|
||||
reqwest::Client::default().into(),
|
||||
num_workers,
|
||||
num_workers,
|
||||
|
@ -248,7 +252,7 @@ mod tests {
|
|||
|
||||
let keypair = generate_actor_keypair().unwrap();
|
||||
|
||||
let message = SendActivityTask {
|
||||
let message = ActivityMessage {
|
||||
id: Uuid::new_v4(),
|
||||
actor_id: "http://localhost:8001".parse().unwrap(),
|
||||
activity_id: "http://localhost:8001/activity".parse().unwrap(),
|
||||
|
|
|
@ -16,10 +16,12 @@ use crate::{
|
|||
use anyhow::{anyhow, Context};
|
||||
use tracing::debug;
|
||||
|
||||
use super::{util::RetryStrategy, SendActivityTask};
|
||||
use super::{util::RetryStrategy, ActivityMessage};
|
||||
|
||||
pub(super) async fn sign_and_send(
|
||||
task: &SendActivityTask,
|
||||
/// Sign and send a message with an optional retry
|
||||
/// The retry itself doesn't re-sign the request, so the retry times should be < 5 min
|
||||
pub async fn sign_and_send(
|
||||
message: &ActivityMessage,
|
||||
client: &ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
retry_strategy: RetryStrategy,
|
||||
|
@ -27,19 +29,19 @@ pub(super) async fn sign_and_send(
|
|||
) -> Result<(), anyhow::Error> {
|
||||
debug!(
|
||||
"Sending {} to {}, contents:\n {}",
|
||||
task.activity_id,
|
||||
task.inbox,
|
||||
serde_json::from_slice::<serde_json::Value>(&task.activity)?
|
||||
message.activity_id,
|
||||
message.inbox,
|
||||
serde_json::from_slice::<serde_json::Value>(&message.activity)?
|
||||
);
|
||||
let request_builder = client
|
||||
.post(task.inbox.to_string())
|
||||
.post(message.inbox.to_string())
|
||||
.timeout(timeout)
|
||||
.headers(generate_request_headers(&task.inbox));
|
||||
.headers(generate_request_headers(&message.inbox));
|
||||
let request = sign_request(
|
||||
request_builder,
|
||||
&task.actor_id,
|
||||
task.activity.clone(),
|
||||
task.private_key.clone(),
|
||||
&message.actor_id,
|
||||
message.activity.clone(),
|
||||
message.private_key.clone(),
|
||||
http_signature_compat,
|
||||
)
|
||||
.await
|
||||
|
@ -48,7 +50,7 @@ pub(super) async fn sign_and_send(
|
|||
retry(
|
||||
|| {
|
||||
send(
|
||||
task,
|
||||
message,
|
||||
client,
|
||||
request
|
||||
.try_clone()
|
||||
|
@ -61,7 +63,7 @@ pub(super) async fn sign_and_send(
|
|||
}
|
||||
|
||||
pub(super) async fn send(
|
||||
task: &SendActivityTask,
|
||||
task: &ActivityMessage,
|
||||
client: &ClientWithMiddleware,
|
||||
request: Request,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
|
|
|
@ -11,45 +11,56 @@ use tokio::{
|
|||
sync::mpsc::{unbounded_channel, UnboundedSender},
|
||||
task::{JoinHandle, JoinSet},
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::config::FederationConfig;
|
||||
|
||||
use super::{
|
||||
util::RetryStrategy,
|
||||
worker::{activity_worker, retry_worker},
|
||||
util::{retry, RetryStrategy},
|
||||
worker::Worker,
|
||||
ActivityMessage,
|
||||
ActivityQueue,
|
||||
SendActivityTask,
|
||||
Stats,
|
||||
};
|
||||
|
||||
/// A simple activity queue which spawns tokio workers to send out requests
|
||||
/// When creating a queue, it will spawn a task per worker thread
|
||||
/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
|
||||
pub struct SimpleQueue {
|
||||
/// A tokio spawned worker queue which is responsible for submitting requests to federated servers
|
||||
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
|
||||
/// We need to retry activity sending in case the target instances is temporarily unreachable.
|
||||
/// In this case, the task is stored and resent when the instance is hopefully back up. This
|
||||
/// list shows the retry intervals, and which events of the target instance can be covered:
|
||||
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
|
||||
/// - 60min (one hour, instance maintenance) --- happens in the retry queue
|
||||
/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry queue
|
||||
pub struct StandardRetryQueue {
|
||||
// Stats shared between the queue and workers
|
||||
stats: Arc<Stats>,
|
||||
sender: UnboundedSender<SendActivityTask>,
|
||||
worker_count: usize,
|
||||
sender: UnboundedSender<ActivityMessage>,
|
||||
sender_task: JoinHandle<()>,
|
||||
retry_sender_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ActivityQueue for SimpleQueue {
|
||||
impl ActivityQueue for StandardRetryQueue {
|
||||
type Error = anyhow::Error;
|
||||
fn stats(&self) -> &Stats {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
async fn queue(&self, message: SendActivityTask) -> Result<(), Self::Error> {
|
||||
async fn queue(&self, message: ActivityMessage) -> Result<(), Self::Error> {
|
||||
self.stats.pending.fetch_add(1, Ordering::Relaxed);
|
||||
self.sender.send(message)?;
|
||||
|
||||
let running = self.stats.running.load(Ordering::Relaxed);
|
||||
if running == self.worker_count && self.worker_count != 0 {
|
||||
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", self.worker_count);
|
||||
warn!("{:?}", self.stats);
|
||||
} else {
|
||||
info!("{:?}", self.stats);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SimpleQueue {
|
||||
/// Construct a queue from federation config
|
||||
impl StandardRetryQueue {
|
||||
/// Builds a simple queue from a federation config
|
||||
pub fn from_config<T: Clone>(config: &FederationConfig<T>) -> Self {
|
||||
Self::new(
|
||||
config.client.clone(),
|
||||
|
@ -85,7 +96,6 @@ impl SimpleQueue {
|
|||
|
||||
let (retry_sender, mut retry_receiver) = unbounded_channel();
|
||||
let retry_stats = stats.clone();
|
||||
let retry_client = client.clone();
|
||||
|
||||
// The "fast path" retry
|
||||
// The backoff should be < 5 mins for this to work otherwise signatures may expire
|
||||
|
@ -97,6 +107,13 @@ impl SimpleQueue {
|
|||
initial_sleep: 0,
|
||||
};
|
||||
|
||||
let worker = Arc::new(Worker {
|
||||
client: client.clone(),
|
||||
request_timeout: timeout,
|
||||
strategy,
|
||||
http_signature_compat,
|
||||
});
|
||||
|
||||
// The "retry path" strategy
|
||||
// After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again
|
||||
let retry_strategy = RetryStrategy {
|
||||
|
@ -106,17 +123,23 @@ impl SimpleQueue {
|
|||
initial_sleep: backoff.pow(2), // wait 60 mins before even trying
|
||||
};
|
||||
|
||||
let retry_worker = Arc::new(Worker {
|
||||
client: client.clone(),
|
||||
request_timeout: timeout,
|
||||
// Internally we need to re-sign the message each attempt so we remove this strategy
|
||||
strategy: RetryStrategy::default(),
|
||||
http_signature_compat,
|
||||
});
|
||||
|
||||
let retry_sender_task = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = retry_receiver.recv().await {
|
||||
let retry_task = retry_worker(
|
||||
retry_client.clone(),
|
||||
timeout,
|
||||
message,
|
||||
let retry_task = retry_task(
|
||||
retry_stats.clone(),
|
||||
retry_worker.clone(),
|
||||
message,
|
||||
retry_strategy,
|
||||
http_signature_compat,
|
||||
);
|
||||
|
||||
if retry_count > 0 {
|
||||
|
@ -145,14 +168,11 @@ impl SimpleQueue {
|
|||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = receiver.recv().await {
|
||||
let task = activity_worker(
|
||||
client.clone(),
|
||||
timeout,
|
||||
let task = main_task(
|
||||
sender_stats.clone(),
|
||||
worker.clone(),
|
||||
message,
|
||||
retry_sender.clone(),
|
||||
sender_stats.clone(),
|
||||
strategy,
|
||||
http_signature_compat,
|
||||
);
|
||||
|
||||
if worker_count > 0 {
|
||||
|
@ -180,15 +200,12 @@ impl SimpleQueue {
|
|||
sender,
|
||||
sender_task,
|
||||
retry_sender_task,
|
||||
worker_count,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
// Drops all the senders and shuts down the workers
|
||||
pub(crate) async fn shutdown(
|
||||
self,
|
||||
wait_for_retries: bool,
|
||||
) -> Result<Arc<Stats>, anyhow::Error> {
|
||||
/// Drops all the senders and shuts down the workers
|
||||
pub async fn shutdown(self, wait_for_retries: bool) -> Result<Arc<Stats>, anyhow::Error> {
|
||||
drop(self.sender);
|
||||
|
||||
self.sender_task.await?;
|
||||
|
@ -200,3 +217,54 @@ impl SimpleQueue {
|
|||
Ok(self.stats)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn main_task(
|
||||
stats: Arc<Stats>,
|
||||
worker: Arc<Worker>,
|
||||
message: ActivityMessage,
|
||||
retry_queue: UnboundedSender<ActivityMessage>,
|
||||
) {
|
||||
stats.pending.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.running.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let outcome = worker.queue(message.clone()).await;
|
||||
|
||||
// "Running" has finished, check the outcome
|
||||
stats.running.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.retries.fetch_add(1, Ordering::Relaxed);
|
||||
warn!(
|
||||
"Sending activity {} to {} to the retry queue to be tried again later",
|
||||
message.activity_id, message.inbox
|
||||
);
|
||||
// Send to the retry queue. Ignoring whether it succeeds or not
|
||||
retry_queue.send(message).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn retry_task(
|
||||
stats: Arc<Stats>,
|
||||
worker: Arc<Worker>,
|
||||
message: ActivityMessage,
|
||||
strategy: RetryStrategy,
|
||||
) {
|
||||
// Because the times are pretty extravagant between retries, we have to re-sign each time
|
||||
let outcome = retry(|| worker.queue(message.clone()), strategy).await;
|
||||
|
||||
stats.retries.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
//! Utilities for working with futures
|
||||
use futures_core::Future;
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
|
@ -6,8 +7,9 @@ use std::{
|
|||
use tracing::warn;
|
||||
|
||||
#[derive(Clone, Copy, Default)]
|
||||
pub(crate) struct RetryStrategy {
|
||||
/// Amount of time in seconds to back off
|
||||
/// A strategy for retrying a future
|
||||
pub struct RetryStrategy {
|
||||
/// Amount of time in seconds to back off, exponential based upon the amount of retries i.e, sleep is backoff ^ retries
|
||||
pub backoff: usize,
|
||||
/// Amount of times to retry
|
||||
pub retries: usize,
|
||||
|
@ -18,12 +20,7 @@ pub(crate) struct RetryStrategy {
|
|||
}
|
||||
|
||||
/// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries
|
||||
pub(crate) async fn retry<
|
||||
T,
|
||||
E: Display + Debug,
|
||||
F: Future<Output = Result<T, E>>,
|
||||
A: FnMut() -> F,
|
||||
>(
|
||||
pub async fn retry<T, E: Display + Debug, F: Future<Output = Result<T, E>>, A: FnMut() -> F>(
|
||||
mut action: A,
|
||||
strategy: RetryStrategy,
|
||||
) -> Result<T, E> {
|
||||
|
|
|
@ -1,96 +1,55 @@
|
|||
//! Worker for sending activity messages
|
||||
use anyhow::Error;
|
||||
use async_trait::async_trait;
|
||||
use derive_builder::Builder;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
|
||||
use std::{
|
||||
sync::{atomic::Ordering, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::warn;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::{
|
||||
request::sign_and_send,
|
||||
util::{retry, RetryStrategy},
|
||||
SendActivityTask,
|
||||
Stats,
|
||||
};
|
||||
use crate::config::FederationConfig;
|
||||
|
||||
/// A tokio spawned worker which is responsible for submitting requests to federated servers
|
||||
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
|
||||
/// We need to retry activity sending in case the target instances is temporarily unreachable.
|
||||
/// In this case, the task is stored and resent when the instance is hopefully back up. This
|
||||
/// list shows the retry intervals, and which events of the target instance can be covered:
|
||||
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
|
||||
/// - 60min (one hour, instance maintenance) --- happens in the retry worker
|
||||
/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker
|
||||
pub(super) async fn activity_worker(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
retry_queue: UnboundedSender<SendActivityTask>,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
http_signature_compat: bool,
|
||||
) {
|
||||
stats.pending.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.running.fetch_add(1, Ordering::Relaxed);
|
||||
use super::{request::sign_and_send, util::RetryStrategy, ActivityMessage, ActivityQueue};
|
||||
/// Configuration for the worker, with various tweaks
|
||||
#[derive(Builder, Clone)]
|
||||
pub struct Worker {
|
||||
#[builder(default = "reqwest::Client::default().into()")]
|
||||
/// The Reqwest client to make requests with
|
||||
pub client: ClientWithMiddleware,
|
||||
#[builder(default = "Duration::from_secs(10)")]
|
||||
/// The timeout before a request fails
|
||||
pub request_timeout: Duration,
|
||||
#[builder(default)]
|
||||
/// The strategy for retrying requests
|
||||
pub strategy: RetryStrategy,
|
||||
#[builder(default = "false")]
|
||||
/// Whether to enable signature compat or not
|
||||
pub http_signature_compat: bool,
|
||||
}
|
||||
|
||||
let outcome = sign_and_send(&message, &client, timeout, strategy, http_signature_compat).await;
|
||||
|
||||
// "Running" has finished, check the outcome
|
||||
stats.running.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.retries.fetch_add(1, Ordering::Relaxed);
|
||||
warn!(
|
||||
"Sending activity {} to {} to the retry queue to be tried again later",
|
||||
message.activity_id, message.inbox
|
||||
);
|
||||
// Send to the retry queue. Ignoring whether it succeeds or not
|
||||
retry_queue.send(message).ok();
|
||||
impl Worker {
|
||||
/// Create a worker from a config
|
||||
pub fn from_config<T: Clone>(config: &FederationConfig<T>) -> Self {
|
||||
Self {
|
||||
client: config.client.clone(),
|
||||
request_timeout: config.request_timeout,
|
||||
strategy: Default::default(),
|
||||
http_signature_compat: config.http_signature_compat,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn retry_worker(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
http_signature_compat: bool,
|
||||
) {
|
||||
// Because the times are pretty extravagant between retries, we have to re-sign each time
|
||||
let outcome = retry(
|
||||
|| {
|
||||
sign_and_send(
|
||||
&message,
|
||||
&client,
|
||||
timeout,
|
||||
RetryStrategy {
|
||||
backoff: 0,
|
||||
retries: 0,
|
||||
offset: 0,
|
||||
initial_sleep: 0,
|
||||
},
|
||||
http_signature_compat,
|
||||
)
|
||||
},
|
||||
strategy,
|
||||
)
|
||||
.await;
|
||||
#[async_trait]
|
||||
impl ActivityQueue for Worker {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
stats.retries.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
async fn queue(&self, message: ActivityMessage) -> Result<(), Error> {
|
||||
sign_and_send(
|
||||
&message,
|
||||
&self.client,
|
||||
self.request_timeout,
|
||||
self.strategy,
|
||||
self.http_signature_compat,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue