mirror of
https://github.com/LemmyNet/activitypub-federation-rust.git
synced 2024-06-08 16:29:33 +00:00
add shutdown method
This commit is contained in:
parent
68f9210d4c
commit
8065813b8f
|
@ -113,19 +113,11 @@ where
|
|||
activity_queue.queue(message).await?;
|
||||
let stats = activity_queue.get_stats();
|
||||
let running = stats.running.load(Ordering::Relaxed);
|
||||
let stats_fmt = format!(
|
||||
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
|
||||
stats.pending.load(Ordering::Relaxed),
|
||||
running,
|
||||
stats.retries.load(Ordering::Relaxed),
|
||||
stats.dead_last_hour.load(Ordering::Relaxed),
|
||||
stats.completed_last_hour.load(Ordering::Relaxed),
|
||||
);
|
||||
if running == config.worker_count && config.worker_count != 0 {
|
||||
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
|
||||
warn!(stats_fmt);
|
||||
warn!("{:?}", stats);
|
||||
} else {
|
||||
info!(stats_fmt);
|
||||
info!("{:?}", stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -264,7 +256,7 @@ pub(crate) struct ActivityQueue {
|
|||
/// This is a lock-free way to share things between tasks
|
||||
/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning
|
||||
#[derive(Default)]
|
||||
struct Stats {
|
||||
pub(crate) struct Stats {
|
||||
pending: AtomicUsize,
|
||||
running: AtomicUsize,
|
||||
retries: AtomicUsize,
|
||||
|
@ -272,6 +264,20 @@ struct Stats {
|
|||
completed_last_hour: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Debug for Stats {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
|
||||
self.pending.load(Ordering::Relaxed),
|
||||
self.running.load(Ordering::Relaxed),
|
||||
self.retries.load(Ordering::Relaxed),
|
||||
self.dead_last_hour.load(Ordering::Relaxed),
|
||||
self.completed_last_hour.load(Ordering::Relaxed)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Default)]
|
||||
struct RetryStrategy {
|
||||
/// Amount of time in seconds to back off
|
||||
|
@ -494,7 +500,10 @@ impl ActivityQueue {
|
|||
|
||||
#[allow(unused)]
|
||||
// Drops all the senders and shuts down the workers
|
||||
async fn shutdown(self, wait_for_retries: bool) -> Result<Arc<Stats>, anyhow::Error> {
|
||||
pub(crate) async fn shutdown(
|
||||
self,
|
||||
wait_for_retries: bool,
|
||||
) -> Result<Arc<Stats>, anyhow::Error> {
|
||||
drop(self.sender);
|
||||
|
||||
self.sender_task.await?;
|
||||
|
|
|
@ -21,6 +21,7 @@ use crate::{
|
|||
protocol::verification::verify_domains_match,
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use derive_builder::Builder;
|
||||
use dyn_clone::{clone_trait_object, DynClone};
|
||||
|
@ -183,6 +184,23 @@ impl<T: Clone> FederationConfig<T> {
|
|||
pub fn domain(&self) -> &str {
|
||||
&self.domain
|
||||
}
|
||||
/// Shut down this federation, waiting for the outgoing queue to be sent
|
||||
/// If the activityqueue is still in use in other requests, returns an error
|
||||
/// If wait_retries is true, also wait for requests that have initially failed and are being retried
|
||||
///
|
||||
/// Currently, this method does not work correctly if worker_count = 0 (unlimited)
|
||||
pub async fn shutdown(mut self, wait_retries: bool) -> anyhow::Result<()> {
|
||||
if let Some(q) = self.activity_queue.take() {
|
||||
let stats = Arc::<ActivityQueue>::into_inner(q)
|
||||
.context(
|
||||
"Could not cleanly shut down: activityqueue arc was still in use elsewhere ",
|
||||
)?
|
||||
.shutdown(wait_retries)
|
||||
.await?;
|
||||
tracing::info!("{:?}", stats);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> FederationConfigBuilder<T> {
|
||||
|
|
|
@ -11,7 +11,7 @@ use url::Url;
|
|||
|
||||
impl<T> FromStr for ObjectId<T>
|
||||
where
|
||||
T: Object + Send + 'static,
|
||||
T: Object + Send + Debug + 'static,
|
||||
for<'de2> <T as Object>::Kind: Deserialize<'de2>,
|
||||
{
|
||||
type Err = url::ParseError;
|
||||
|
@ -62,7 +62,7 @@ where
|
|||
|
||||
impl<Kind> ObjectId<Kind>
|
||||
where
|
||||
Kind: Object + Send + 'static,
|
||||
Kind: Object + Send + Debug + 'static,
|
||||
for<'de2> <Kind as Object>::Kind: serde::Deserialize<'de2>,
|
||||
{
|
||||
/// Construct a new objectid instance
|
||||
|
@ -93,7 +93,6 @@ where
|
|||
<Kind as Object>::Error: From<Error> + From<anyhow::Error>,
|
||||
{
|
||||
let db_object = self.dereference_from_db(data).await?;
|
||||
|
||||
// if its a local object, only fetch it from the database and not over http
|
||||
if data.config.is_local_url(&self.0) {
|
||||
return match db_object {
|
||||
|
|
|
@ -93,7 +93,7 @@ use url::Url;
|
|||
///
|
||||
/// }
|
||||
#[async_trait]
|
||||
pub trait Object: Sized {
|
||||
pub trait Object: Sized + Debug {
|
||||
/// App data type passed to handlers. Must be identical to
|
||||
/// [crate::config::FederationConfigBuilder::app_data] type.
|
||||
type DataType: Clone + Send + Sync;
|
||||
|
|
Loading…
Reference in a new issue