remove synchronous federation

remove activity sender queue
This commit is contained in:
phiresky 2023-09-01 13:00:21 +00:00
parent 884307ac60
commit 2767ab4a6f
7 changed files with 15 additions and 97 deletions

View file

@ -2,7 +2,6 @@
set -e set -e
export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432
export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still.
pushd .. pushd ..
cargo build cargo build
rm target/lemmy_server || true rm target/lemmy_server || true

View file

@ -17,22 +17,14 @@ use lemmy_db_schema::{
}, },
}; };
use lemmy_db_views::structs::PrivateMessageView; use lemmy_db_views::structs::PrivateMessageView;
use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; use lemmy_utils::error::LemmyResult;
use once_cell::sync::{Lazy, OnceCell}; use once_cell::sync::OnceCell;
use tokio::{
sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
Mutex,
},
task::JoinHandle,
};
use url::Url; use url::Url;
type MatchOutgoingActivitiesBoxed = type MatchOutgoingActivitiesBoxed =
Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>; Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
/// This static is necessary so that activities can be sent out synchronously for tests. /// This static is necessary so that the api_common crates don't need to depend on lemmy_apub
pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new(); pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
#[derive(Debug)] #[derive(Debug)]
@ -62,51 +54,16 @@ pub enum SendActivityData {
CreateReport(Url, Person, Community, String), CreateReport(Url, Person, Community, String),
} }
// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with pub struct ActivityChannel;
// ctrl+c still works.
static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
let (sender, receiver) = mpsc::unbounded_channel();
let weak_sender = sender.downgrade();
ActivityChannel {
weak_sender,
receiver: Mutex::new(receiver),
keepalive_sender: Mutex::new(Some(sender)),
}
});
pub struct ActivityChannel {
weak_sender: WeakUnboundedSender<SendActivityData>,
receiver: Mutex<UnboundedReceiver<SendActivityData>>,
keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
}
impl ActivityChannel { impl ActivityChannel {
pub async fn retrieve_activity() -> Option<SendActivityData> {
let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
lock.recv().await
}
pub async fn submit_activity( pub async fn submit_activity(
data: SendActivityData, data: SendActivityData,
context: &Data<LemmyContext>, context: &Data<LemmyContext>,
) -> LemmyResult<()> { ) -> LemmyResult<()> {
if *SYNCHRONOUS_FEDERATION { MATCH_OUTGOING_ACTIVITIES
MATCH_OUTGOING_ACTIVITIES .get()
.get() .expect("retrieve function pointer")(data, context)
.expect("retrieve function pointer")(data, context) .await
.await?;
}
// could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
// not sure which way is more efficient
else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
sender.send(data)?;
}
Ok(())
}
pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
outgoing_activities_task.await??;
Ok(())
} }
} }

View file

@ -37,7 +37,6 @@ use lemmy_utils::{
slurs::{check_slurs, check_slurs_opt}, slurs::{check_slurs, check_slurs_opt},
validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title},
}, },
SYNCHRONOUS_FEDERATION,
}; };
use tracing::Instrument; use tracing::Instrument;
use url::Url; use url::Url;
@ -190,11 +189,7 @@ pub async fn create_post(
Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention),
} }
}; };
if *SYNCHRONOUS_FEDERATION { spawn_try_task(task);
task.await?;
} else {
spawn_try_task(task);
}
}; };
build_post_response(&context, community_id, person_id, post_id).await build_post_response(&context, community_id, person_id, post_id).await

View file

@ -48,7 +48,6 @@ use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
use lemmy_utils::{ use lemmy_utils::{
error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult},
spawn_try_task, spawn_try_task,
SYNCHRONOUS_FEDERATION,
}; };
use serde::Serialize; use serde::Serialize;
use std::{ops::Deref, time::Duration}; use std::{ops::Deref, time::Duration};
@ -225,13 +224,6 @@ where
Ok(()) Ok(())
} }
pub async fn handle_outgoing_activities(context: Data<LemmyContext>) -> LemmyResult<()> {
while let Some(data) = ActivityChannel::retrieve_activity().await {
match_outgoing_activities(data, &context.reset_request_count()).await?
}
Ok(())
}
pub async fn match_outgoing_activities( pub async fn match_outgoing_activities(
data: SendActivityData, data: SendActivityData,
context: &Data<LemmyContext>, context: &Data<LemmyContext>,
@ -336,10 +328,6 @@ pub async fn match_outgoing_activities(
} }
} }
}; };
if *SYNCHRONOUS_FEDERATION { spawn_try_task(fed_task);
fed_task.await?;
} else {
spawn_try_task(fed_task);
}
Ok(()) Ok(())
} }

View file

@ -18,7 +18,6 @@ pub mod version;
use error::LemmyError; use error::LemmyError;
use futures::Future; use futures::Future;
use once_cell::sync::Lazy;
use std::time::Duration; use std::time::Duration;
use tracing::Instrument; use tracing::Instrument;
@ -38,16 +37,6 @@ macro_rules! location_info {
}; };
} }
/// if true, all federation should happen synchronously. useful for debugging and testing.
/// defaults to true on debug mode, false on releasemode
/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1
/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION=""
pub static SYNCHRONOUS_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_SYNCHRONOUS_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(cfg!(debug_assertions))
});
/// tokio::spawn, but accepts a future that may fail and also /// tokio::spawn, but accepts a future that may fail and also
/// * logs errors /// * logs errors
/// * attaches the spawned task to the tracing span of the caller for better logging /// * attaches the spawned task to the tracing span of the caller for better logging

View file

@ -107,7 +107,7 @@ use lemmy_apub::{
}, },
SendActivity, SendActivity,
}; };
use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION}; use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task};
use serde::Deserialize; use serde::Deserialize;
pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) {
@ -364,11 +364,7 @@ where
let res = data.perform(&context).await?; let res = data.perform(&context).await?;
let res_clone = res.clone(); let res_clone = res.clone();
let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }; let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await };
if *SYNCHRONOUS_FEDERATION { spawn_try_task(fed_task);
fed_task.await?;
} else {
spawn_try_task(fed_task);
}
Ok(HttpResponse::Ok().json(&res)) Ok(HttpResponse::Ok().json(&res))
} }

View file

@ -27,14 +27,14 @@ use lemmy_api_common::{
context::LemmyContext, context::LemmyContext,
lemmy_db_views::structs::SiteView, lemmy_db_views::structs::SiteView,
request::build_user_agent, request::build_user_agent,
send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES}, send_activity::MATCH_OUTGOING_ACTIVITIES,
utils::{ utils::{
check_private_instance_and_federation_enabled, check_private_instance_and_federation_enabled,
local_site_rate_limit_to_rate_limit_config, local_site_rate_limit_to_rate_limit_config,
}, },
}; };
use lemmy_apub::{ use lemmy_apub::{
activities::{handle_outgoing_activities, match_outgoing_activities}, activities::match_outgoing_activities,
VerifyUrlData, VerifyUrlData,
FEDERATION_HTTP_FETCH_LIMIT, FEDERATION_HTTP_FETCH_LIMIT,
}; };
@ -49,7 +49,6 @@ use lemmy_utils::{
rate_limit::RateLimitCell, rate_limit::RateLimitCell,
response::jsonify_plain_text_errors, response::jsonify_plain_text_errors,
settings::SETTINGS, settings::SETTINGS,
SYNCHRONOUS_FEDERATION,
}; };
use reqwest::Client; use reqwest::Client;
use reqwest_middleware::ClientBuilder; use reqwest_middleware::ClientBuilder;
@ -179,7 +178,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
.app_data(context.clone()) .app_data(context.clone())
.client(client.clone()) .client(client.clone())
.http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT)
.debug(*SYNCHRONOUS_FEDERATION) .debug(false)
.http_signature_compat(true) .http_signature_compat(true)
.url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone()))) .url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone())))
.build() .build()
@ -198,8 +197,6 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
Box::pin(match_outgoing_activities(d, c)) Box::pin(match_outgoing_activities(d, c))
})) }))
.expect("set function pointer"); .expect("set function pointer");
let request_data = federation_config.to_request_data();
let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data));
let server = if args.http_server { let server = if args.http_server {
let federation_config = federation_config.clone(); let federation_config = federation_config.clone();
@ -290,9 +287,6 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
federate.cancel().await?; federate.cancel().await?;
} }
// Wait for outgoing apub sends to complete
ActivityChannel::close(outgoing_activities_task).await?;
Ok(()) Ok(())
} }