diff --git a/docs/09_sending_activities.md b/docs/09_sending_activities.md index 215be5e..34959a3 100644 --- a/docs/09_sending_activities.md +++ b/docs/09_sending_activities.md @@ -4,6 +4,46 @@ To send an activity we need to initialize our previously defined struct, and pic ``` # use activitypub_federation::config::FederationConfig; +# use activitypub_federation::activity_queue::queue_activity; +# use activitypub_federation::http_signatures::generate_actor_keypair; +# use activitypub_federation::traits::Actor; +# use activitypub_federation::fetch::object_id::ObjectId; +# use activitypub_federation::traits::tests::{DB_USER, DbConnection, Follow}; +# tokio::runtime::Runtime::new().unwrap().block_on(async { +# let db_connection = DbConnection; +# let config = FederationConfig::builder() +# .domain("example.com") +# .app_data(db_connection) +# .build().await?; +# let data = config.to_request_data(); +# let sender = DB_USER.clone(); +# let recipient = DB_USER.clone(); +let activity = Follow { + actor: ObjectId::parse("https://lemmy.ml/u/nutomic")?, + object: recipient.federation_id.clone().into(), + kind: Default::default(), + id: "https://lemmy.ml/activities/321".try_into()? +}; +let inboxes = vec![recipient.shared_inbox_or_inbox()]; + +queue_activity(activity, &sender, inboxes, &data).await?; +# Ok::<(), anyhow::Error>(()) +# }).unwrap() +``` + +The list of inboxes gets deduplicated (important for shared inbox). All inboxes on the local domain and those which fail the [crate::config::UrlVerifier] check are excluded from delivery. For each remaining inbox a background tasks is created. It signs the HTTP header with the given private key. Finally the activity is delivered to the inbox. + +It is possible that delivery fails because the target instance is temporarily unreachable. In this case the task is scheduled for retry after a certain waiting time. For each task delivery is retried up to 3 times after the initial attempt. The retry intervals are as follows: + +- one minute, in case of service restart +- one hour, in case of instance maintenance +- 2.5 days, in case of major incident with rebuild from backup + +In case [crate::config::FederationConfigBuilder::debug] is enabled, no background thread is used but activities are sent directly on the foreground. This makes it easier to catch delivery errors and avoids complicated steps to await delivery in tests. + +In some cases you may want to bypass the builtin activity queue, and implement your own. For example to specify different retry intervals, or to persist retries across application restarts. You can do it with the following code: +```rust +# use activitypub_federation::config::FederationConfig; # use activitypub_federation::activity_sending::SendActivityTask; # use activitypub_federation::http_signatures::generate_actor_keypair; # use activitypub_federation::traits::Actor; @@ -28,23 +68,8 @@ let inboxes = vec![recipient.shared_inbox_or_inbox()]; let sends = SendActivityTask::prepare(&activity, &sender, inboxes, &data).await?; for send in sends { - send.sign_and_send(&data).await?; +send.sign_and_send(&data).await?; } # Ok::<(), anyhow::Error>(()) # }).unwrap() -``` - -The list of inboxes gets deduplicated (important for shared inbox). All inboxes on the local -domain and those which fail the [crate::config::UrlVerifier] check are excluded from delivery. -For each remaining inbox a background tasks is created. It signs the HTTP header with the given -private key. Finally the activity is delivered to the inbox. - -It is possible that delivery fails because the target instance is temporarily unreachable. In -this case the task is scheduled for retry after a certain waiting time. For each task delivery -is retried up to 3 times after the initial attempt. The retry intervals are as follows: - -- one minute, in case of service restart -- one hour, in case of instance maintenance -- 2.5 days, in case of major incident with rebuild from backup - -In case [crate::config::FederationConfigBuilder::debug] is enabled, no background thread is used but activities are sent directly on the foreground. This makes it easier to catch delivery errors and avoids complicated steps to await delivery in tests. +``` \ No newline at end of file diff --git a/src/activity_queue.rs b/src/activity_queue.rs index 4179b9c..d9f0230 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -3,18 +3,15 @@ #![doc = include_str!("../docs/09_sending_activities.md")] use crate::{ - activity_sending::get_pkey_cached, + activity_sending::{generate_request_headers, get_pkey_cached}, config::Data, error::Error, http_signatures::sign_request, reqwest_shim::ResponseExt, traits::{ActivityHandler, Actor}, - FEDERATION_CONTENT_TYPE, }; use bytes::Bytes; use futures_core::Future; -use http::{header::HeaderName, HeaderMap, HeaderValue}; -use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; use reqwest::Request; @@ -26,7 +23,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - time::{Duration, SystemTime}, + time::Duration, }; use tokio::{ sync::mpsc::{unbounded_channel, UnboundedSender}, @@ -35,7 +32,8 @@ use tokio::{ use tracing::{debug, info, warn}; use url::Url; -/// Send a new activity to the given inboxes +/// Send a new activity to the given inboxes with automatic retry on failure. Alternatively you +/// can implement your own queue and then send activities using [[crate::activity_sending::SendActivityTask]]. /// /// - `activity`: The activity to be sent, gets converted to json /// - `private_key`: Private key belonging to the actor who sends the activity, for signing HTTP @@ -199,28 +197,6 @@ async fn send( } } -pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { - let mut host = inbox_url.domain().expect("read inbox domain").to_string(); - if let Some(port) = inbox_url.port() { - host = format!("{}:{}", host, port); - } - - let mut headers = HeaderMap::new(); - headers.insert( - HeaderName::from_static("content-type"), - HeaderValue::from_static(FEDERATION_CONTENT_TYPE), - ); - headers.insert( - HeaderName::from_static("host"), - HeaderValue::from_str(&host).expect("Hostname is valid"), - ); - headers.insert( - "date", - HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"), - ); - headers -} - /// A simple activity queue which spawns tokio workers to send out requests /// When creating a queue, it will spawn a task per worker thread /// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory) diff --git a/src/activity_sending.rs b/src/activity_sending.rs index a333bb7..075f2e3 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -27,7 +27,8 @@ use tracing::debug; use url::Url; #[derive(Clone, Debug)] -/// all info needed to send one activity to one inbox +/// All info needed to sign and send one activity to one inbox. You should generally use +/// [[crate::activity_queue::queue_activity]] unless you want implement your own queue. pub struct SendActivityTask<'a> { pub(crate) actor_id: &'a Url, pub(crate) activity_id: &'a Url, @@ -43,7 +44,7 @@ impl Display for SendActivityTask<'_> { } impl SendActivityTask<'_> { - /// prepare an activity for sending + /// Prepare an activity for sending /// /// - `activity`: The activity to be sent, gets converted to json /// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor diff --git a/src/http_signatures.rs b/src/http_signatures.rs index 59d1d48..bc27ee5 100644 --- a/src/http_signatures.rs +++ b/src/http_signatures.rs @@ -1,7 +1,7 @@ //! Generating keypairs, creating and verifying signatures //! //! Signature creation and verification is handled internally in the library. See -//! [send_activity](crate::activity_sending::send_activity) and +//! [send_activity](crate::activity_sending::SendActivityTask::sign_and_send) and //! [receive_activity (actix-web)](crate::actix_web::inbox::receive_activity) / //! [receive_activity (axum)](crate::axum::inbox::receive_activity).