From ef1e164cc508faed6831cd541646eab8492c56a2 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Thu, 3 Mar 2022 18:54:33 +0000 Subject: [PATCH] Make activity queue worker count configurable, log stats (#2113) --- config/defaults.hjson | 2 ++ crates/apub/src/collections/community_moderators.rs | 5 +---- crates/apub/src/objects/comment.rs | 9 ++------- crates/apub/src/objects/community.rs | 5 +---- crates/apub/src/objects/instance.rs | 5 +---- crates/apub/src/objects/mod.rs | 8 ++++++-- crates/apub/src/objects/person.rs | 9 ++------- crates/apub/src/objects/post.rs | 5 +---- crates/apub/src/objects/private_message.rs | 9 ++------- crates/apub_lib/src/activity_queue.rs | 11 ++++++++++- crates/utils/src/settings/structs.rs | 3 +++ src/main.rs | 2 +- 12 files changed, 32 insertions(+), 41 deletions(-) diff --git a/config/defaults.hjson b/config/defaults.hjson index c30c33ac8..ac732b20d 100644 --- a/config/defaults.hjson +++ b/config/defaults.hjson @@ -59,6 +59,8 @@ # use allowlist only for remote communities, and posts/comments in local communities # (meaning remote communities will show content from arbitrary instances). strict_allowlist: true + # Number of workers for sending outgoing activities. + worker_count: 16 } captcha: { # Whether captcha is required for signup diff --git a/crates/apub/src/collections/community_moderators.rs b/crates/apub/src/collections/community_moderators.rs index d24f06a13..a9a691b8c 100644 --- a/crates/apub/src/collections/community_moderators.rs +++ b/crates/apub/src/collections/community_moderators.rs @@ -146,7 +146,6 @@ mod tests { }, protocol::tests::file_to_json_object, }; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::{ source::{ community::Community, @@ -160,9 +159,7 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_community_moderators() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let (new_mod, site) = parse_lemmy_person(&context).await; let community = parse_lemmy_community(&context).await; let community_id = community.id; diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index e61cb8699..b719ba45b 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -218,7 +218,6 @@ pub(crate) mod tests { protocol::tests::file_to_json_object, }; use assert_json_diff::assert_json_include; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::source::site::Site; use serial_test::serial; @@ -248,9 +247,7 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] pub(crate) async fn test_parse_lemmy_comment() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let data = prepare_comment_test(&url, &context).await; @@ -279,9 +276,7 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_pleroma_comment() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let data = prepare_comment_test(&url, &context).await; diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 612616cf9..82b6d797c 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -217,7 +217,6 @@ pub(crate) mod tests { objects::{instance::tests::parse_lemmy_instance, tests::init_context}, protocol::tests::file_to_json_object, }; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::{source::site::Site, traits::Crud}; use serial_test::serial; @@ -244,9 +243,7 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_community() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let site = parse_lemmy_instance(&context).await; let community = parse_lemmy_community(&context).await; diff --git a/crates/apub/src/objects/instance.rs b/crates/apub/src/objects/instance.rs index 40b585b5a..a07ecdc17 100644 --- a/crates/apub/src/objects/instance.rs +++ b/crates/apub/src/objects/instance.rs @@ -186,7 +186,6 @@ pub(in crate::objects) async fn fetch_instance_actor_for_object( pub(crate) mod tests { use super::*; use crate::{objects::tests::init_context, protocol::tests::file_to_json_object}; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::traits::Crud; use serial_test::serial; @@ -207,9 +206,7 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_instance() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let site = parse_lemmy_instance(&context).await; assert_eq!(site.name, "Enterprise"); diff --git a/crates/apub/src/objects/mod.rs b/crates/apub/src/objects/mod.rs index cfe1ecd36..cf66beccd 100644 --- a/crates/apub/src/objects/mod.rs +++ b/crates/apub/src/objects/mod.rs @@ -22,11 +22,11 @@ pub(crate) fn get_summary_from_string_or_source( #[cfg(test)] pub(crate) mod tests { use actix::Actor; - use background_jobs::QueueHandle; use diesel::{ r2d2::{ConnectionManager, Pool}, PgConnection, }; + use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::{ establish_unpooled_connection, get_database_url_from_env, @@ -46,7 +46,11 @@ pub(crate) mod tests { // TODO: would be nice if we didnt have to use a full context for tests. // or at least write a helper function so this code is shared with main.rs - pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext { + pub(crate) fn init_context() -> LemmyContext { + let client = reqwest::Client::new().into(); + // activity queue isnt used in tests, so worker count makes no difference + let queue_manager = create_activity_queue(client, 4); + let activity_queue = queue_manager.queue_handle().clone(); // call this to run migrations establish_unpooled_connection(); let settings = Settings::init().unwrap(); diff --git a/crates/apub/src/objects/person.rs b/crates/apub/src/objects/person.rs index b75b107a1..8eb7fe8b7 100644 --- a/crates/apub/src/objects/person.rs +++ b/crates/apub/src/objects/person.rs @@ -207,7 +207,6 @@ pub(crate) mod tests { }, protocol::{objects::instance::Instance, tests::file_to_json_object}, }; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::{source::site::Site, traits::Crud}; use serial_test::serial; @@ -229,9 +228,7 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_person() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let (person, site) = parse_lemmy_person(&context).await; assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string())); @@ -245,9 +242,7 @@ pub(crate) mod tests { #[actix_rt::test] #[serial] async fn test_parse_pleroma_person() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); // create and parse a fake pleroma instance actor, to avoid network request during test let mut json: Instance = file_to_json_object("assets/lemmy/objects/instance.json").unwrap(); diff --git a/crates/apub/src/objects/post.rs b/crates/apub/src/objects/post.rs index 3fb4b9cfd..a002e72c7 100644 --- a/crates/apub/src/objects/post.rs +++ b/crates/apub/src/objects/post.rs @@ -215,16 +215,13 @@ mod tests { }, protocol::tests::file_to_json_object, }; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::source::site::Site; use serial_test::serial; #[actix_rt::test] #[serial] async fn test_parse_lemmy_post() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let (person, site) = parse_lemmy_person(&context).await; let community = parse_lemmy_community(&context).await; diff --git a/crates/apub/src/objects/private_message.rs b/crates/apub/src/objects/private_message.rs index b1d5e5b19..4896e1991 100644 --- a/crates/apub/src/objects/private_message.rs +++ b/crates/apub/src/objects/private_message.rs @@ -167,7 +167,6 @@ mod tests { protocol::tests::file_to_json_object, }; use assert_json_diff::assert_json_include; - use lemmy_apub_lib::activity_queue::create_activity_queue; use lemmy_db_schema::source::site::Site; use serial_test::serial; @@ -203,9 +202,7 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_pm() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let data = prepare_comment_test(&url, &context).await; let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json").unwrap(); @@ -232,9 +229,7 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_pleroma_pm() { - let client = reqwest::Client::new().into(); - let manager = create_activity_queue(client); - let context = init_context(manager.queue_handle().clone()); + let context = init_context(); let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap(); let data = prepare_comment_test(&url, &context).await; let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap(); diff --git a/crates/apub_lib/src/activity_queue.rs b/crates/apub_lib/src/activity_queue.rs index 9be430167..ca8645cb1 100644 --- a/crates/apub_lib/src/activity_queue.rs +++ b/crates/apub_lib/src/activity_queue.rs @@ -42,6 +42,14 @@ pub async fn send_activity( } } else { activity_queue.queue::(message).await?; + let stats = activity_queue.get_stats().await?; + info!( + "Activity queue stats: pending: {}, running: {}, dead (this hour): {}, complete (this hour): {}", + stats.pending, + stats.running, + stats.dead.this_hour(), + stats.complete.this_hour() + ); } } @@ -110,12 +118,13 @@ async fn do_send(task: SendActivityTask, client: &ClientWithMiddleware) -> Resul r } -pub fn create_activity_queue(client: ClientWithMiddleware) -> Manager { +pub fn create_activity_queue(client: ClientWithMiddleware, worker_count: u64) -> Manager { // Configure and start our workers WorkerConfig::new_managed(Storage::new(), move |_| MyState { client: client.clone(), }) .register::() + .set_worker_count("default", worker_count) .start() } diff --git a/crates/utils/src/settings/structs.rs b/crates/utils/src/settings/structs.rs index 16ead8637..30f120544 100644 --- a/crates/utils/src/settings/structs.rs +++ b/crates/utils/src/settings/structs.rs @@ -130,6 +130,9 @@ pub struct FederationConfig { /// (meaning remote communities will show content from arbitrary instances). #[default(true)] pub strict_allowlist: bool, + /// Number of workers for sending outgoing activities. + #[default(16)] + pub worker_count: u64, } #[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)] diff --git a/src/main.rs b/src/main.rs index 3b6986afd..a8d74a125 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,7 +101,7 @@ async fn main() -> Result<(), LemmyError> { let client = ClientBuilder::new(client).with(TracingMiddleware).build(); - let queue_manager = create_activity_queue(client.clone()); + let queue_manager = create_activity_queue(client.clone(), settings.federation.worker_count); let activity_queue = queue_manager.queue_handle().clone();