From 499f037b6851ad59df905cc4603420f3c1884605 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Fri, 1 Mar 2024 10:44:10 +0100 Subject: [PATCH] more dedup --- docs/09_sending_activities.md | 2 +- examples/local_federation/objects/person.rs | 2 +- src/activity_queue.rs | 65 +++----------- src/activity_sending.rs | 97 +++++++++++---------- 4 files changed, 64 insertions(+), 102 deletions(-) diff --git a/docs/09_sending_activities.md b/docs/09_sending_activities.md index 34959a3..d5efe3a 100644 --- a/docs/09_sending_activities.md +++ b/docs/09_sending_activities.md @@ -26,7 +26,7 @@ let activity = Follow { }; let inboxes = vec![recipient.shared_inbox_or_inbox()]; -queue_activity(activity, &sender, inboxes, &data).await?; +queue_activity(&activity, &sender, inboxes, &data).await?; # Ok::<(), anyhow::Error>(()) # }).unwrap() ``` diff --git a/examples/local_federation/objects/person.rs b/examples/local_federation/objects/person.rs index 0b595c4..0ae402f 100644 --- a/examples/local_federation/objects/person.rs +++ b/examples/local_federation/objects/person.rs @@ -117,7 +117,7 @@ impl DbUser { let activity = WithContext::new_default(activity); // Send through queue in some cases and bypass it in others to test both code paths if use_queue { - queue_activity(activity, self, recipients, data).await?; + queue_activity(&activity, self, recipients, data).await?; } else { let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; for send in sends { diff --git a/src/activity_queue.rs b/src/activity_queue.rs index 9eca4e8..d314708 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -3,23 +3,15 @@ #![doc = include_str!("../docs/09_sending_activities.md")] use crate::{ - activity_sending::{ - filter_inboxes, - generate_request_headers, - get_pkey_cached, - send, - serialize_activity, - }, + activity_sending::{build_tasks, generate_request_headers, send, SendActivityTask}, config::Data, error::Error, http_signatures::sign_request, traits::{ActivityHandler, Actor}, }; -use bytes::Bytes; -use futures::StreamExt; + use futures_core::Future; -use itertools::Itertools; -use openssl::pkey::{PKey, Private}; + use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ @@ -47,7 +39,7 @@ use url::Url; /// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] /// for each target actor. pub async fn queue_activity( - activity: Activity, + activity: &Activity, actor: &ActorType, inboxes: Vec, data: &Data, @@ -58,32 +50,7 @@ where ActorType: Actor, { let config = &data.config; - let actor_id = activity.actor(); - let activity_id = activity.id(); - let activity_serialized = serialize_activity(&activity)?; - let private_key = get_pkey_cached(data, actor).await?; - - // This field is only optional to make builder work, its always present at this point - let activity_queue = config - .activity_queue - .as_ref() - .expect("Config has activity queue"); - - let tasks = futures::stream::iter(inboxes.into_iter().unique()) - .filter_map(|inbox| async { - filter_inboxes(&inbox, config) - .await - .then(|| SendActivityTask { - actor_id: actor_id.clone(), - activity_id: activity_id.clone(), - inbox, - activity: activity_serialized.clone(), - private_key: private_key.clone(), - http_signature_compat: config.http_signature_compat, - }) - }) - .collect::>() - .await; + let tasks = build_tasks(activity, actor, inboxes, data).await?; for task in tasks { // Don't use the activity queue if this is in debug mode, send and wait directly @@ -99,6 +66,11 @@ where warn!("{err}"); } } else { + // This field is only optional to make builder work, its always present at this point + let activity_queue = config + .activity_queue + .as_ref() + .expect("Config has activity queue"); activity_queue.queue(task).await?; let stats = activity_queue.get_stats(); let running = stats.running.load(Ordering::Relaxed); @@ -113,23 +85,6 @@ where Ok(()) } -// TODO: should use the existing struct but lifetimes are difficult -#[derive(Clone, Debug)] -pub(crate) struct SendActivityTask { - actor_id: Url, - activity_id: Url, - activity: Bytes, - inbox: Url, - private_key: PKey, - http_signature_compat: bool, -} - -impl Display for SendActivityTask { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} to {}", self.activity_id, self.inbox) - } -} - async fn sign_and_send( task: &SendActivityTask, client: &ClientWithMiddleware, diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 3dda441..e6951da 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -3,7 +3,7 @@ #![doc = include_str!("../docs/09_sending_activities.md")] use crate::{ - config::{Data, FederationConfig}, + config::Data, error::Error, http_signatures::sign_request, reqwest_shim::ResponseExt, @@ -32,60 +32,40 @@ use url::Url; #[derive(Clone, Debug)] /// All info needed to sign and send one activity to one inbox. You should generally use /// [[crate::activity_queue::queue_activity]] unless you want implement your own queue. -pub struct SendActivityTask<'a> { - pub(crate) actor_id: &'a Url, - pub(crate) activity_id: &'a Url, +pub struct SendActivityTask { + pub(crate) actor_id: Url, + pub(crate) activity_id: Url, pub(crate) activity: Bytes, pub(crate) inbox: Url, pub(crate) private_key: PKey, pub(crate) http_signature_compat: bool, } -impl Display for SendActivityTask<'_> { +impl Display for SendActivityTask { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} to {}", self.activity_id, self.inbox) } } -impl SendActivityTask<'_> { +impl SendActivityTask { /// Prepare an activity for sending /// /// - `activity`: The activity to be sent, gets converted to json /// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor /// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox] /// for each target actor. - pub async fn prepare<'a, Activity, Datatype, ActorType>( - activity: &'a Activity, + pub async fn prepare( + activity: &Activity, actor: &ActorType, inboxes: Vec, data: &Data, - ) -> Result>, Error> + ) -> Result, Error> where Activity: ActivityHandler + Serialize + Debug, Datatype: Clone, ActorType: Actor, { - let config = &data.config; - let actor_id = activity.actor(); - let activity_id = activity.id(); - let activity_serialized = serialize_activity(activity)?; - let private_key = get_pkey_cached(data, actor).await?; - - Ok(futures::stream::iter(inboxes.into_iter().unique()) - .filter_map(|inbox| async { - filter_inboxes(&inbox, config) - .await - .then(|| SendActivityTask { - actor_id, - activity_id, - inbox, - activity: activity_serialized.clone(), - private_key: private_key.clone(), - http_signature_compat: config.http_signature_compat, - }) - }) - .collect() - .await) + build_tasks(activity, actor, inboxes, data).await } /// convert a sendactivitydata to a request, signing and sending it @@ -97,7 +77,7 @@ impl SendActivityTask<'_> { .headers(generate_request_headers(&self.inbox)); let request = sign_request( request_builder, - self.actor_id, + &self.actor_id, self.activity.clone(), self.private_key.clone(), self.http_signature_compat, @@ -144,18 +124,45 @@ pub(crate) fn serialize_activity( .into()) } -pub(crate) async fn filter_inboxes( - inbox: &Url, - config: &FederationConfig, -) -> bool { - if config.is_local_url(inbox) { - false - } else if let Err(e) = config.verify_url_valid(inbox).await { - debug!("inbox url invalid, skipping: {inbox}: {e}"); - false - } else { - true - } +pub(crate) async fn build_tasks<'a, Activity, Datatype, ActorType>( + activity: &'a Activity, + actor: &ActorType, + inboxes: Vec, + data: &Data, +) -> Result, Error> +where + Activity: ActivityHandler + Serialize + Debug, + Datatype: Clone, + ActorType: Actor, +{ + let config = &data.config; + let actor_id = activity.actor(); + let activity_id = activity.id(); + let activity_serialized = serialize_activity(activity)?; + let private_key = get_pkey_cached(data, actor).await?; + + Ok(futures::stream::iter( + inboxes + .into_iter() + .unique() + .filter(|i| !config.is_local_url(i)), + ) + .filter_map(|inbox| async { + if let Err(err) = config.verify_url_valid(&inbox).await { + debug!("inbox url invalid, skipping: {inbox}: {err}"); + return None; + }; + Some(SendActivityTask { + actor_id: actor_id.clone(), + activity_id: activity_id.clone(), + inbox, + activity: activity_serialized.clone(), + private_key: private_key.clone(), + http_signature_compat: config.http_signature_compat, + }) + }) + .collect() + .await) } pub(crate) async fn get_pkey_cached( @@ -280,8 +287,8 @@ mod tests { let keypair = generate_actor_keypair().unwrap(); let message = SendActivityTask { - actor_id: &"http://localhost:8001".parse().unwrap(), - activity_id: &"http://localhost:8001/activity".parse().unwrap(), + actor_id: "http://localhost:8001".parse().unwrap(), + activity_id: "http://localhost:8001/activity".parse().unwrap(), activity: "{}".into(), inbox: "http://localhost:8001".parse().unwrap(), private_key: keypair.private_key().unwrap(),