diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 31eca80..ce53f94 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -12,10 +12,14 @@ use crate::{ }; use bytes::Bytes; use futures::StreamExt; +use http::StatusCode; use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use reqwest::{ + header::{HeaderMap, HeaderName, HeaderValue}, + Response, +}; use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ @@ -89,20 +93,30 @@ impl SendActivityTask { ) .await?; let response = client.execute(request).await?; + self.handle_response(response).await + } - match response { - o if o.status().is_success() => { + /// Based on the HTTP status code determines if an activity was delivered successfully. In that case + /// Ok is returned. Otherwise it returns Err and the activity send should be retried later. + /// + /// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217 + async fn handle_response(&self, response: Response) -> Result<(), Error> { + match response.status() { + status if status.is_success() => { debug!("Activity {self} delivered successfully"); Ok(()) } - o if o.status().is_client_error() => { - let text = o.text_limited().await?; + status + if status.is_client_error() + && status != StatusCode::REQUEST_TIMEOUT + && status != StatusCode::TOO_MANY_REQUESTS => + { + let text = response.text_limited().await?; debug!("Activity {self} was rejected, aborting: {text}"); Ok(()) } - o => { - let status = o.status(); - let text = o.text_limited().await?; + status => { + let text = response.text_limited().await?; Err(Error::Other(format!( "Activity {self} failure with status {status}: {text}", @@ -213,7 +227,6 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { mod tests { use super::*; use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; - use http::StatusCode; use std::{ sync::{atomic::AtomicUsize, Arc}, time::Instant, @@ -287,4 +300,48 @@ mod tests { info!("Queue Sent: {:?}", start.elapsed()); Ok(()) } + + #[tokio::test] + async fn test_handle_response() { + let keypair = generate_actor_keypair().unwrap(); + let message = SendActivityTask { + actor_id: "http://localhost:8001".parse().unwrap(), + activity_id: "http://localhost:8001/activity".parse().unwrap(), + activity: "{}".into(), + inbox: "http://localhost:8001".parse().unwrap(), + private_key: keypair.private_key().unwrap(), + http_signature_compat: true, + }; + + let res = |status| { + http::Response::builder() + .status(status) + .body(vec![]) + .unwrap() + .into() + }; + + assert!(message.handle_response(res(StatusCode::OK)).await.is_ok()); + assert!(message + .handle_response(res(StatusCode::BAD_REQUEST)) + .await + .is_ok()); + + assert!(message + .handle_response(res(StatusCode::MOVED_PERMANENTLY)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::REQUEST_TIMEOUT)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::TOO_MANY_REQUESTS)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::INTERNAL_SERVER_ERROR)) + .await + .is_err()); + } }