From 2767ab4a6fed9aa8d197eda0c6a25a1d617d192d Mon Sep 17 00:00:00 2001 From: phiresky Date: Fri, 1 Sep 2023 13:00:21 +0000 Subject: [PATCH] remove synchronous federation remove activity sender queue --- api_tests/run-federation-test.sh | 1 - crates/api_common/src/send_activity.rs | 59 ++++---------------------- crates/api_crud/src/post/create.rs | 7 +-- crates/apub/src/activities/mod.rs | 14 +----- crates/utils/src/lib.rs | 11 ----- src/api_routes_http.rs | 8 +--- src/lib.rs | 12 ++---- 7 files changed, 15 insertions(+), 97 deletions(-) diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index ff74744a1..3042fd344 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,7 +2,6 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 -export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. pushd .. cargo build rm target/lemmy_server || true diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index 45109d5c5..84b2efb2d 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -17,22 +17,14 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views::structs::PrivateMessageView; -use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; -use once_cell::sync::{Lazy, OnceCell}; -use tokio::{ - sync::{ - mpsc, - mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender}, - Mutex, - }, - task::JoinHandle, -}; +use lemmy_utils::error::LemmyResult; +use once_cell::sync::OnceCell; use url::Url; type MatchOutgoingActivitiesBoxed = Box fn(SendActivityData, &'a Data) -> BoxFuture<'a, LemmyResult<()>>>; -/// This static is necessary so that activities can be sent out synchronously for tests. +/// This static is necessary so that the api_common crates don't need to depend on lemmy_apub pub static MATCH_OUTGOING_ACTIVITIES: OnceCell = OnceCell::new(); #[derive(Debug)] @@ -62,51 +54,16 @@ pub enum SendActivityData { CreateReport(Url, Person, Community, String), } -// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with -// ctrl+c still works. -static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { - let (sender, receiver) = mpsc::unbounded_channel(); - let weak_sender = sender.downgrade(); - ActivityChannel { - weak_sender, - receiver: Mutex::new(receiver), - keepalive_sender: Mutex::new(Some(sender)), - } -}); - -pub struct ActivityChannel { - weak_sender: WeakUnboundedSender, - receiver: Mutex>, - keepalive_sender: Mutex>>, -} +pub struct ActivityChannel; impl ActivityChannel { - pub async fn retrieve_activity() -> Option { - let mut lock = ACTIVITY_CHANNEL.receiver.lock().await; - lock.recv().await - } - pub async fn submit_activity( data: SendActivityData, context: &Data, ) -> LemmyResult<()> { - if *SYNCHRONOUS_FEDERATION { - MATCH_OUTGOING_ACTIVITIES - .get() - .expect("retrieve function pointer")(data, context) - .await?; - } - // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, - // not sure which way is more efficient - else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { - sender.send(data)?; - } - Ok(()) - } - - pub async fn close(outgoing_activities_task: JoinHandle>) -> LemmyResult<()> { - ACTIVITY_CHANNEL.keepalive_sender.lock().await.take(); - outgoing_activities_task.await??; - Ok(()) + MATCH_OUTGOING_ACTIVITIES + .get() + .expect("retrieve function pointer")(data, context) + .await } } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 9d27f5d62..e84fddc28 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -37,7 +37,6 @@ use lemmy_utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, }, - SYNCHRONOUS_FEDERATION, }; use tracing::Instrument; use url::Url; @@ -190,11 +189,7 @@ pub async fn create_post( Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), } }; - if *SYNCHRONOUS_FEDERATION { - task.await?; - } else { - spawn_try_task(task); - } + spawn_try_task(task); }; build_post_response(&context, community_id, person_id, post_id).await diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index ff17be54d..c0d12758b 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -48,7 +48,6 @@ use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::{ error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, spawn_try_task, - SYNCHRONOUS_FEDERATION, }; use serde::Serialize; use std::{ops::Deref, time::Duration}; @@ -225,13 +224,6 @@ where Ok(()) } -pub async fn handle_outgoing_activities(context: Data) -> LemmyResult<()> { - while let Some(data) = ActivityChannel::retrieve_activity().await { - match_outgoing_activities(data, &context.reset_request_count()).await? - } - Ok(()) -} - pub async fn match_outgoing_activities( data: SendActivityData, context: &Data, @@ -336,10 +328,6 @@ pub async fn match_outgoing_activities( } } }; - if *SYNCHRONOUS_FEDERATION { - fed_task.await?; - } else { - spawn_try_task(fed_task); - } + spawn_try_task(fed_task); Ok(()) } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 1ef8a842c..c0553de31 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -18,7 +18,6 @@ pub mod version; use error::LemmyError; use futures::Future; -use once_cell::sync::Lazy; use std::time::Duration; use tracing::Instrument; @@ -38,16 +37,6 @@ macro_rules! location_info { }; } -/// if true, all federation should happen synchronously. useful for debugging and testing. -/// defaults to true on debug mode, false on releasemode -/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1 -/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION="" -pub static SYNCHRONOUS_FEDERATION: Lazy = Lazy::new(|| { - std::env::var("LEMMY_SYNCHRONOUS_FEDERATION") - .map(|s| !s.is_empty()) - .unwrap_or(cfg!(debug_assertions)) -}); - /// tokio::spawn, but accepts a future that may fail and also /// * logs errors /// * attaches the spawned task to the tracing span of the caller for better logging diff --git a/src/api_routes_http.rs b/src/api_routes_http.rs index 5c8360d0d..e90f64250 100644 --- a/src/api_routes_http.rs +++ b/src/api_routes_http.rs @@ -107,7 +107,7 @@ use lemmy_apub::{ }, SendActivity, }; -use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task, SYNCHRONOUS_FEDERATION}; +use lemmy_utils::{rate_limit::RateLimitCell, spawn_try_task}; use serde::Deserialize; pub fn config(cfg: &mut web::ServiceConfig, rate_limit: &RateLimitCell) { @@ -364,11 +364,7 @@ where let res = data.perform(&context).await?; let res_clone = res.clone(); let fed_task = async move { SendActivity::send_activity(&data, &res_clone, &apub_data).await }; - if *SYNCHRONOUS_FEDERATION { - fed_task.await?; - } else { - spawn_try_task(fed_task); - } + spawn_try_task(fed_task); Ok(HttpResponse::Ok().json(&res)) } diff --git a/src/lib.rs b/src/lib.rs index d168552bc..d1fd4b845 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,14 +27,14 @@ use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, request::build_user_agent, - send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES}, + send_activity::MATCH_OUTGOING_ACTIVITIES, utils::{ check_private_instance_and_federation_enabled, local_site_rate_limit_to_rate_limit_config, }, }; use lemmy_apub::{ - activities::{handle_outgoing_activities, match_outgoing_activities}, + activities::match_outgoing_activities, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT, }; @@ -49,7 +49,6 @@ use lemmy_utils::{ rate_limit::RateLimitCell, response::jsonify_plain_text_errors, settings::SETTINGS, - SYNCHRONOUS_FEDERATION, }; use reqwest::Client; use reqwest_middleware::ClientBuilder; @@ -179,7 +178,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .app_data(context.clone()) .client(client.clone()) .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) - .debug(*SYNCHRONOUS_FEDERATION) + .debug(false) .http_signature_compat(true) .url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone()))) .build() @@ -198,8 +197,6 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { Box::pin(match_outgoing_activities(d, c)) })) .expect("set function pointer"); - let request_data = federation_config.to_request_data(); - let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); let server = if args.http_server { let federation_config = federation_config.clone(); @@ -290,9 +287,6 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { federate.cancel().await?; } - // Wait for outgoing apub sends to complete - ActivityChannel::close(outgoing_activities_task).await?; - Ok(()) }