revert apub send changes

This commit is contained in:
Felix Ableitner 2024-04-03 13:38:51 +02:00
parent 5faea338b5
commit b0bd4a2996
3 changed files with 38 additions and 4 deletions

View file

@ -11,6 +11,8 @@ fi
export RUST_BACKTRACE=1
#export RUST_LOG="warn,lemmy_server=$LEMMY_LOG_LEVEL,lemmy_federate=$LEMMY_LOG_LEVEL,lemmy_api=$LEMMY_LOG_LEVEL,lemmy_api_common=$LEMMY_LOG_LEVEL,lemmy_api_crud=$LEMMY_LOG_LEVEL,lemmy_apub=$LEMMY_LOG_LEVEL,lemmy_db_schema=$LEMMY_LOG_LEVEL,lemmy_db_views=$LEMMY_LOG_LEVEL,lemmy_db_views_actor=$LEMMY_LOG_LEVEL,lemmy_db_views_moderator=$LEMMY_LOG_LEVEL,lemmy_routes=$LEMMY_LOG_LEVEL,lemmy_utils=$LEMMY_LOG_LEVEL,lemmy_websocket=$LEMMY_LOG_LEVEL"
export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
# pictrs setup
if [ ! -f "api_tests/pict-rs" ]; then
curl "https://git.asonix.dog/asonix/pict-rs/releases/download/v0.5.0-beta.2/pict-rs-linux-amd64" -o api_tests/pict-rs

View file

@ -25,6 +25,27 @@ use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the federation queue.
pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(false)
});
/// Recheck for new federation work every n seconds.
///
/// When the queue is processed faster than new activities are added and it reaches the current time with an empty batch,
/// this is the delay the queue waits before it checks if new activities have been added to the sent_activities table.
/// This delay is only applied if no federated activity happens during sending activities of the last batch.
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
Duration::from_millis(100)
} else {
Duration::from_secs(30)
}
});
/// A task that will be run in an infinite loop, unless it is cancelled.
/// If the task exits without being cancelled, an error will be logged and the task will be restarted.
pub struct CancellableTask {

View file

@ -1,4 +1,10 @@
use crate::util::{get_activity_cached, get_actor_cached, get_latest_activity_id};
use crate::util::{
get_activity_cached,
get_actor_cached,
get_latest_activity_id,
LEMMY_TEST_FAST_FEDERATION,
WORK_FINISHED_RECHECK_DELAY,
};
use activitypub_federation::{
activity_sending::SendActivityTask,
config::Data,
@ -44,8 +50,13 @@ static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60);
/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate.
/// (see https://github.com/LemmyNet/lemmy/issues/3958)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
Lazy::new(|| chrono::TimeDelta::from_std(CACHE_DURATION_SHORT).expect("TimeDelta out of bounds"));
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
} else {
chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
}
});
/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community.
/// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
@ -153,7 +164,7 @@ impl InstanceWorker {
if id >= latest_id {
// no more work to be done, wait before rechecking
tokio::select! {
() = sleep(CACHE_DURATION_SHORT) => {},
() = sleep(*WORK_FINISHED_RECHECK_DELAY) => {},
() = self.stop.cancelled() => {}
}
return Ok(());