mirror of
https://github.com/LemmyNet/activitypub-federation-rust.git
synced 2024-06-09 00:39:29 +00:00
Make the activity queue generic over a trait
This commit is contained in:
parent
af92e0d532
commit
912f930122
|
@ -56,6 +56,7 @@ axum = { version = "0.6.18", features = [
|
|||
], default-features = false, optional = true }
|
||||
tower = { version = "0.4.13", optional = true }
|
||||
hyper = { version = "0.14", optional = true }
|
||||
uuid = { version = "1.4.0", features = ["serde", "v4"] }
|
||||
|
||||
[features]
|
||||
default = ["actix-web", "axum"]
|
||||
|
|
|
@ -6,11 +6,11 @@ use crate::{
|
|||
DbPost,
|
||||
};
|
||||
use activitypub_federation::{
|
||||
activity_queue::send_activity,
|
||||
config::Data,
|
||||
fetch::object_id::ObjectId,
|
||||
kinds::activity::CreateType,
|
||||
protocol::{context::WithContext, helpers::deserialize_one_or_many},
|
||||
queue::{send_activity, simple_queue::SimpleQueue},
|
||||
traits::{ActivityHandler, Object},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -29,7 +29,11 @@ pub struct CreatePost {
|
|||
}
|
||||
|
||||
impl CreatePost {
|
||||
pub async fn send(note: Note, inbox: Url, data: &Data<DatabaseHandle>) -> Result<(), Error> {
|
||||
pub async fn send(
|
||||
note: Note,
|
||||
inbox: Url,
|
||||
data: &Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<(), Error> {
|
||||
print!("Sending reply to {}", ¬e.attributed_to);
|
||||
let create = CreatePost {
|
||||
actor: note.attributed_to.clone(),
|
||||
|
@ -47,6 +51,7 @@ impl CreatePost {
|
|||
#[async_trait::async_trait]
|
||||
impl ActivityHandler for CreatePost {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
|
@ -57,12 +62,18 @@ impl ActivityHandler for CreatePost {
|
|||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn verify(
|
||||
&self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
DbPost::verify(&self.object, &self.id, data).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn receive(
|
||||
self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
DbPost::from_json(self.object, data).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ use activitypub_federation::{
|
|||
config::Data,
|
||||
fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger},
|
||||
protocol::context::WithContext,
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::Object,
|
||||
};
|
||||
use axum::{
|
||||
|
@ -31,7 +32,7 @@ impl IntoResponse for Error {
|
|||
#[debug_handler]
|
||||
pub async fn http_get_user(
|
||||
Path(name): Path<String>,
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<FederationJson<WithContext<Person>>, Error> {
|
||||
let db_user = data.read_user(&name)?;
|
||||
let json_user = db_user.into_json(&data).await?;
|
||||
|
@ -40,10 +41,10 @@ pub async fn http_get_user(
|
|||
|
||||
#[debug_handler]
|
||||
pub async fn http_post_user_inbox(
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
activity_data: ActivityData,
|
||||
) -> impl IntoResponse {
|
||||
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
|
||||
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle, SimpleQueue>(
|
||||
activity_data,
|
||||
&data,
|
||||
)
|
||||
|
@ -58,7 +59,7 @@ pub struct WebfingerQuery {
|
|||
#[debug_handler]
|
||||
pub async fn webfinger(
|
||||
Query(query): Query<WebfingerQuery>,
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<Json<Webfinger>, Error> {
|
||||
let name = extract_webfinger_name(&query.resource, &data)?;
|
||||
let db_user = data.read_user(&name)?;
|
||||
|
|
|
@ -5,6 +5,7 @@ use activitypub_federation::{
|
|||
http_signatures::generate_actor_keypair,
|
||||
kinds::actor::PersonType,
|
||||
protocol::{public_key::PublicKey, verification::verify_domains_match},
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::{ActivityHandler, Actor, Object},
|
||||
};
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
|
@ -66,6 +67,7 @@ pub struct Person {
|
|||
#[async_trait::async_trait]
|
||||
impl Object for DbUser {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Kind = Person;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -75,7 +77,7 @@ impl Object for DbUser {
|
|||
|
||||
async fn read_from_id(
|
||||
object_id: Url,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
let users = data.users.lock().unwrap();
|
||||
let res = users
|
||||
|
@ -85,7 +87,10 @@ impl Object for DbUser {
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
|
||||
async fn into_json(
|
||||
self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error> {
|
||||
Ok(Person {
|
||||
preferred_username: self.name.clone(),
|
||||
kind: Default::default(),
|
||||
|
@ -98,7 +103,7 @@ impl Object for DbUser {
|
|||
async fn verify(
|
||||
json: &Self::Kind,
|
||||
expected_domain: &Url,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
verify_domains_match(json.id.inner(), expected_domain)?;
|
||||
Ok(())
|
||||
|
@ -106,7 +111,7 @@ impl Object for DbUser {
|
|||
|
||||
async fn from_json(
|
||||
json: Self::Kind,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error> {
|
||||
Ok(DbUser {
|
||||
name: json.preferred_username,
|
||||
|
|
|
@ -10,6 +10,7 @@ use activitypub_federation::{
|
|||
fetch::object_id::ObjectId,
|
||||
kinds::{object::NoteType, public},
|
||||
protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match},
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::{Actor, Object},
|
||||
};
|
||||
use activitystreams_kinds::link::MentionType;
|
||||
|
@ -48,30 +49,37 @@ pub struct Mention {
|
|||
#[async_trait::async_trait]
|
||||
impl Object for DbPost {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Kind = Note;
|
||||
type Error = Error;
|
||||
|
||||
async fn read_from_id(
|
||||
_object_id: Url,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
|
||||
async fn into_json(
|
||||
self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn verify(
|
||||
json: &Self::Kind,
|
||||
expected_domain: &Url,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
verify_domains_match(json.id.inner(), expected_domain)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
|
||||
async fn from_json(
|
||||
json: Self::Kind,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error> {
|
||||
println!(
|
||||
"Received post with content {} and id {}",
|
||||
&json.content, &json.id
|
||||
|
|
|
@ -3,6 +3,7 @@ use activitypub_federation::{
|
|||
config::Data,
|
||||
fetch::object_id::ObjectId,
|
||||
kinds::activity::AcceptType,
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::ActivityHandler,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -32,6 +33,7 @@ impl Accept {
|
|||
#[async_trait::async_trait]
|
||||
impl ActivityHandler for Accept {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
|
@ -42,11 +44,17 @@ impl ActivityHandler for Accept {
|
|||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn verify(
|
||||
&self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn receive(
|
||||
self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use activitypub_federation::{
|
|||
fetch::object_id::ObjectId,
|
||||
kinds::activity::CreateType,
|
||||
protocol::helpers::deserialize_one_or_many,
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::{ActivityHandler, Object},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -40,6 +41,7 @@ impl CreatePost {
|
|||
#[async_trait::async_trait]
|
||||
impl ActivityHandler for CreatePost {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
|
@ -50,12 +52,18 @@ impl ActivityHandler for CreatePost {
|
|||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn verify(
|
||||
&self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
DbPost::verify(&self.object, &self.id, data).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn receive(
|
||||
self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
DbPost::from_json(self.object, data).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use activitypub_federation::{
|
|||
config::Data,
|
||||
fetch::object_id::ObjectId,
|
||||
kinds::activity::FollowType,
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -37,6 +38,7 @@ impl Follow {
|
|||
#[async_trait::async_trait]
|
||||
impl ActivityHandler for Follow {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
|
@ -47,13 +49,19 @@ impl ActivityHandler for Follow {
|
|||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn verify(
|
||||
&self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446
|
||||
#[allow(clippy::await_holding_lock)]
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn receive(
|
||||
self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
// add to followers
|
||||
let local_user = {
|
||||
let mut users = data.users.lock().unwrap();
|
||||
|
|
|
@ -8,6 +8,7 @@ use activitypub_federation::{
|
|||
config::{Data, FederationConfig, FederationMiddleware},
|
||||
fetch::webfinger::{build_webfinger_response, extract_webfinger_name},
|
||||
protocol::context::WithContext,
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::{Actor, Object},
|
||||
FEDERATION_CONTENT_TYPE,
|
||||
};
|
||||
|
@ -16,7 +17,7 @@ use anyhow::anyhow;
|
|||
use serde::Deserialize;
|
||||
use tracing::info;
|
||||
|
||||
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
|
||||
pub fn listen(config: &FederationConfig<DatabaseHandle, SimpleQueue>) -> Result<(), Error> {
|
||||
let hostname = config.domain();
|
||||
info!("Listening with actix-web on {hostname}");
|
||||
let config = config.clone();
|
||||
|
@ -35,7 +36,9 @@ pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
|
|||
}
|
||||
|
||||
/// Handles requests to fetch system user json over HTTP
|
||||
pub async fn http_get_system_user(data: Data<DatabaseHandle>) -> Result<HttpResponse, Error> {
|
||||
pub async fn http_get_system_user(
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let json_user = data.system_user.clone().into_json(&data).await?;
|
||||
Ok(HttpResponse::Ok()
|
||||
.content_type(FEDERATION_CONTENT_TYPE)
|
||||
|
@ -46,7 +49,7 @@ pub async fn http_get_system_user(data: Data<DatabaseHandle>) -> Result<HttpResp
|
|||
pub async fn http_get_user(
|
||||
request: HttpRequest,
|
||||
user_name: web::Path<String>,
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let signed_by = signing_actor::<DbUser>(&request, None, &data).await?;
|
||||
// here, checks can be made on the actor or the domain to which
|
||||
|
@ -71,9 +74,9 @@ pub async fn http_get_user(
|
|||
pub async fn http_post_user_inbox(
|
||||
request: HttpRequest,
|
||||
body: Bytes,
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
|
||||
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle, SimpleQueue>(
|
||||
request, body, &data,
|
||||
)
|
||||
.await
|
||||
|
@ -86,7 +89,7 @@ pub struct WebfingerQuery {
|
|||
|
||||
pub async fn webfinger(
|
||||
query: web::Query<WebfingerQuery>,
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let name = extract_webfinger_name(&query.resource, &data)?;
|
||||
let db_user = data.read_user(&name)?;
|
||||
|
|
|
@ -11,6 +11,7 @@ use activitypub_federation::{
|
|||
config::{Data, FederationConfig, FederationMiddleware},
|
||||
fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger},
|
||||
protocol::context::WithContext,
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::Object,
|
||||
};
|
||||
use axum::{
|
||||
|
@ -25,7 +26,7 @@ use serde::Deserialize;
|
|||
use std::net::ToSocketAddrs;
|
||||
use tracing::info;
|
||||
|
||||
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
|
||||
pub fn listen(config: &FederationConfig<DatabaseHandle, SimpleQueue>) -> Result<(), Error> {
|
||||
let hostname = config.domain();
|
||||
info!("Listening with axum on {hostname}");
|
||||
let config = config.clone();
|
||||
|
@ -48,7 +49,7 @@ pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
|
|||
#[debug_handler]
|
||||
async fn http_get_user(
|
||||
Path(name): Path<String>,
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<FederationJson<WithContext<Person>>, Error> {
|
||||
let db_user = data.read_user(&name)?;
|
||||
let json_user = db_user.into_json(&data).await?;
|
||||
|
@ -57,10 +58,10 @@ async fn http_get_user(
|
|||
|
||||
#[debug_handler]
|
||||
async fn http_post_user_inbox(
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
activity_data: ActivityData,
|
||||
) -> impl IntoResponse {
|
||||
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
|
||||
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle, SimpleQueue>(
|
||||
activity_data,
|
||||
&data,
|
||||
)
|
||||
|
@ -75,7 +76,7 @@ struct WebfingerQuery {
|
|||
#[debug_handler]
|
||||
async fn webfinger(
|
||||
Query(query): Query<WebfingerQuery>,
|
||||
data: Data<DatabaseHandle>,
|
||||
data: Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<Json<Webfinger>, Error> {
|
||||
let name = extract_webfinger_name(&query.resource, &data)?;
|
||||
let db_user = data.read_user(&name)?;
|
||||
|
|
|
@ -2,7 +2,10 @@ use crate::{
|
|||
objects::{person::DbUser, post::DbPost},
|
||||
Error,
|
||||
};
|
||||
use activitypub_federation::config::{FederationConfig, UrlVerifier};
|
||||
use activitypub_federation::{
|
||||
config::{FederationConfig, UrlVerifier},
|
||||
queue::simple_queue::SimpleQueue,
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
use async_trait::async_trait;
|
||||
use std::{
|
||||
|
@ -14,7 +17,7 @@ use url::Url;
|
|||
pub async fn new_instance(
|
||||
hostname: &str,
|
||||
name: String,
|
||||
) -> Result<FederationConfig<DatabaseHandle>, Error> {
|
||||
) -> Result<FederationConfig<DatabaseHandle, SimpleQueue>, Error> {
|
||||
let mut system_user = DbUser::new(hostname, "system".into())?;
|
||||
system_user.ap_id = Url::parse(&format!("http://{}/", hostname))?.into();
|
||||
|
||||
|
@ -76,7 +79,7 @@ impl FromStr for Webserver {
|
|||
}
|
||||
|
||||
pub fn listen(
|
||||
config: &FederationConfig<DatabaseHandle>,
|
||||
config: &FederationConfig<DatabaseHandle, SimpleQueue>,
|
||||
webserver: &Webserver,
|
||||
) -> Result<(), Error> {
|
||||
match webserver {
|
||||
|
|
|
@ -6,12 +6,12 @@ use crate::{
|
|||
utils::generate_object_id,
|
||||
};
|
||||
use activitypub_federation::{
|
||||
activity_queue::send_activity,
|
||||
config::Data,
|
||||
fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor},
|
||||
http_signatures::generate_actor_keypair,
|
||||
kinds::actor::PersonType,
|
||||
protocol::{context::WithContext, public_key::PublicKey, verification::verify_domains_match},
|
||||
queue::{send_activity, simple_queue::SimpleQueue},
|
||||
traits::{ActivityHandler, Actor, Object},
|
||||
};
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
|
@ -81,7 +81,11 @@ impl DbUser {
|
|||
Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?)
|
||||
}
|
||||
|
||||
pub async fn follow(&self, other: &str, data: &Data<DatabaseHandle>) -> Result<(), Error> {
|
||||
pub async fn follow(
|
||||
&self,
|
||||
other: &str,
|
||||
data: &Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<(), Error> {
|
||||
let other: DbUser = webfinger_resolve_actor(other, data).await?;
|
||||
let id = generate_object_id(data.domain())?;
|
||||
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
|
||||
|
@ -90,7 +94,11 @@ impl DbUser {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn post(&self, post: DbPost, data: &Data<DatabaseHandle>) -> Result<(), Error> {
|
||||
pub async fn post(
|
||||
&self,
|
||||
post: DbPost,
|
||||
data: &Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<(), Error> {
|
||||
let id = generate_object_id(data.domain())?;
|
||||
let create = CreatePost::new(post.into_json(data).await?, id.clone());
|
||||
let mut inboxes = vec![];
|
||||
|
@ -106,7 +114,7 @@ impl DbUser {
|
|||
&self,
|
||||
activity: Activity,
|
||||
recipients: Vec<Url>,
|
||||
data: &Data<DatabaseHandle>,
|
||||
data: &Data<DatabaseHandle, SimpleQueue>,
|
||||
) -> Result<(), <Activity as ActivityHandler>::Error>
|
||||
where
|
||||
Activity: ActivityHandler + Serialize + Debug + Send + Sync,
|
||||
|
@ -121,6 +129,7 @@ impl DbUser {
|
|||
#[async_trait::async_trait]
|
||||
impl Object for DbUser {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Kind = Person;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -130,7 +139,7 @@ impl Object for DbUser {
|
|||
|
||||
async fn read_from_id(
|
||||
object_id: Url,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
let users = data.users.lock().unwrap();
|
||||
let res = users
|
||||
|
@ -140,7 +149,10 @@ impl Object for DbUser {
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
|
||||
async fn into_json(
|
||||
self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error> {
|
||||
Ok(Person {
|
||||
preferred_username: self.name.clone(),
|
||||
kind: Default::default(),
|
||||
|
@ -153,13 +165,16 @@ impl Object for DbUser {
|
|||
async fn verify(
|
||||
json: &Self::Kind,
|
||||
expected_domain: &Url,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
verify_domains_match(json.id.inner(), expected_domain)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
|
||||
async fn from_json(
|
||||
json: Self::Kind,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error> {
|
||||
let user = DbUser {
|
||||
name: json.preferred_username,
|
||||
ap_id: json.id,
|
||||
|
|
|
@ -4,6 +4,7 @@ use activitypub_federation::{
|
|||
fetch::object_id::ObjectId,
|
||||
kinds::{object::NoteType, public},
|
||||
protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match},
|
||||
queue::simple_queue::SimpleQueue,
|
||||
traits::Object,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -44,12 +45,13 @@ pub struct Note {
|
|||
#[async_trait::async_trait]
|
||||
impl Object for DbPost {
|
||||
type DataType = DatabaseHandle;
|
||||
type QueueType = SimpleQueue;
|
||||
type Kind = Note;
|
||||
type Error = Error;
|
||||
|
||||
async fn read_from_id(
|
||||
object_id: Url,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
let posts = data.posts.lock().unwrap();
|
||||
let res = posts
|
||||
|
@ -59,7 +61,10 @@ impl Object for DbPost {
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
|
||||
async fn into_json(
|
||||
self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error> {
|
||||
let creator = self.creator.dereference_local(data).await?;
|
||||
Ok(Note {
|
||||
kind: Default::default(),
|
||||
|
@ -73,13 +78,16 @@ impl Object for DbPost {
|
|||
async fn verify(
|
||||
json: &Self::Kind,
|
||||
expected_domain: &Url,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
verify_domains_match(json.id.inner(), expected_domain)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
|
||||
async fn from_json(
|
||||
json: Self::Kind,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error> {
|
||||
let post = DbPost {
|
||||
text: json.content,
|
||||
ap_id: json.id,
|
||||
|
|
|
@ -1,667 +0,0 @@
|
|||
//! Queue for signing and sending outgoing activities with retry
|
||||
//!
|
||||
#![doc = include_str!("../docs/09_sending_activities.md")]
|
||||
|
||||
use crate::{
|
||||
config::Data,
|
||||
error::Error,
|
||||
http_signatures::sign_request,
|
||||
reqwest_shim::ResponseExt,
|
||||
traits::{ActivityHandler, Actor},
|
||||
FEDERATION_CONTENT_TYPE,
|
||||
};
|
||||
use anyhow::{anyhow, Context};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures_core::Future;
|
||||
use http::{header::HeaderName, HeaderMap, HeaderValue};
|
||||
use httpdate::fmt_http_date;
|
||||
use itertools::Itertools;
|
||||
use openssl::pkey::{PKey, Private};
|
||||
use reqwest::Request;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use serde::Serialize;
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use tokio::{
|
||||
sync::mpsc::{unbounded_channel, UnboundedSender},
|
||||
task::{JoinHandle, JoinSet},
|
||||
};
|
||||
use tracing::{debug, info, warn};
|
||||
use url::Url;
|
||||
|
||||
/// Send a new activity to the given inboxes
|
||||
///
|
||||
/// - `activity`: The activity to be sent, gets converted to json
|
||||
/// - `private_key`: Private key belonging to the actor who sends the activity, for signing HTTP
|
||||
/// signature. Generated with [crate::http_signatures::generate_actor_keypair].
|
||||
/// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor
|
||||
/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox]
|
||||
/// for each target actor.
|
||||
pub async fn send_activity<Activity, Datatype, ActorType>(
|
||||
activity: Activity,
|
||||
actor: &ActorType,
|
||||
inboxes: Vec<Url>,
|
||||
data: &Data<Datatype>,
|
||||
) -> Result<(), <Activity as ActivityHandler>::Error>
|
||||
where
|
||||
Activity: ActivityHandler + Serialize,
|
||||
<Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
|
||||
Datatype: Clone,
|
||||
ActorType: Actor,
|
||||
{
|
||||
let config = &data.config;
|
||||
let actor_id = activity.actor();
|
||||
let activity_id = activity.id();
|
||||
let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
|
||||
let private_key_pem = actor
|
||||
.private_key_pem()
|
||||
.ok_or_else(|| anyhow!("Actor {actor_id} does not contain a private key for signing"))?;
|
||||
|
||||
// This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening
|
||||
let private_key = tokio::task::spawn_blocking(move || {
|
||||
PKey::private_key_from_pem(private_key_pem.as_bytes())
|
||||
.map_err(|err| anyhow!("Could not create private key from PEM data:{err}"))
|
||||
})
|
||||
.await
|
||||
.map_err(|err| anyhow!("Error joining:{err}"))??;
|
||||
|
||||
let inboxes: Vec<Url> = inboxes
|
||||
.into_iter()
|
||||
.unique()
|
||||
.filter(|i| !config.is_local_url(i))
|
||||
.collect();
|
||||
// This field is only optional to make builder work, its always present at this point
|
||||
let activity_queue = config
|
||||
.activity_queue
|
||||
.as_ref()
|
||||
.expect("Config has activity queue");
|
||||
for inbox in inboxes {
|
||||
if let Err(err) = config.verify_url_valid(&inbox).await {
|
||||
debug!("inbox url invalid, skipping: {inbox}: {err}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let message = SendActivityTask {
|
||||
actor_id: actor_id.clone(),
|
||||
activity_id: activity_id.clone(),
|
||||
inbox,
|
||||
activity: activity_serialized.clone(),
|
||||
private_key: private_key.clone(),
|
||||
http_signature_compat: config.http_signature_compat,
|
||||
};
|
||||
|
||||
// Don't use the activity queue if this is in debug mode, send and wait directly
|
||||
if config.debug {
|
||||
if let Err(err) = sign_and_send(
|
||||
&message,
|
||||
&config.client,
|
||||
config.request_timeout,
|
||||
Default::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("{err}");
|
||||
}
|
||||
} else {
|
||||
activity_queue.queue(message).await?;
|
||||
let stats = activity_queue.get_stats();
|
||||
let running = stats.running.load(Ordering::Relaxed);
|
||||
if running == config.worker_count && config.worker_count != 0 {
|
||||
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
|
||||
warn!("{:?}", stats);
|
||||
} else {
|
||||
info!("{:?}", stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct SendActivityTask {
|
||||
actor_id: Url,
|
||||
activity_id: Url,
|
||||
activity: Bytes,
|
||||
inbox: Url,
|
||||
private_key: PKey<Private>,
|
||||
http_signature_compat: bool,
|
||||
}
|
||||
|
||||
async fn sign_and_send(
|
||||
task: &SendActivityTask,
|
||||
client: &ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
retry_strategy: RetryStrategy,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
debug!(
|
||||
"Sending {} to {}, contents:\n {}",
|
||||
task.activity_id,
|
||||
task.inbox,
|
||||
serde_json::from_slice::<serde_json::Value>(&task.activity)?
|
||||
);
|
||||
let request_builder = client
|
||||
.post(task.inbox.to_string())
|
||||
.timeout(timeout)
|
||||
.headers(generate_request_headers(&task.inbox));
|
||||
let request = sign_request(
|
||||
request_builder,
|
||||
&task.actor_id,
|
||||
task.activity.clone(),
|
||||
task.private_key.clone(),
|
||||
task.http_signature_compat,
|
||||
)
|
||||
.await
|
||||
.context("signing request")?;
|
||||
|
||||
retry(
|
||||
|| {
|
||||
send(
|
||||
task,
|
||||
client,
|
||||
request
|
||||
.try_clone()
|
||||
.expect("The body of the request is not cloneable"),
|
||||
)
|
||||
},
|
||||
retry_strategy,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send(
|
||||
task: &SendActivityTask,
|
||||
client: &ClientWithMiddleware,
|
||||
request: Request,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let response = client.execute(request).await;
|
||||
|
||||
match response {
|
||||
Ok(o) if o.status().is_success() => {
|
||||
debug!(
|
||||
"Activity {} delivered successfully to {}",
|
||||
task.activity_id, task.inbox
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Ok(o) if o.status().is_client_error() => {
|
||||
let text = o.text_limited().await.map_err(Error::other)?;
|
||||
debug!(
|
||||
"Activity {} was rejected by {}, aborting: {}",
|
||||
task.activity_id, task.inbox, text,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Ok(o) => {
|
||||
let status = o.status();
|
||||
let text = o.text_limited().await.map_err(Error::other)?;
|
||||
Err(anyhow!(
|
||||
"Queueing activity {} to {} for retry after failure with status {}: {}",
|
||||
task.activity_id,
|
||||
task.inbox,
|
||||
status,
|
||||
text,
|
||||
))
|
||||
}
|
||||
Err(e) => Err(anyhow!(
|
||||
"Queueing activity {} to {} for retry after connection failure: {}",
|
||||
task.activity_id,
|
||||
task.inbox,
|
||||
e
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
|
||||
let mut host = inbox_url.domain().expect("read inbox domain").to_string();
|
||||
if let Some(port) = inbox_url.port() {
|
||||
host = format!("{}:{}", host, port);
|
||||
}
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
HeaderName::from_static("content-type"),
|
||||
HeaderValue::from_static(FEDERATION_CONTENT_TYPE),
|
||||
);
|
||||
headers.insert(
|
||||
HeaderName::from_static("host"),
|
||||
HeaderValue::from_str(&host).expect("Hostname is valid"),
|
||||
);
|
||||
headers.insert(
|
||||
"date",
|
||||
HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"),
|
||||
);
|
||||
headers
|
||||
}
|
||||
|
||||
/// A simple activity queue which spawns tokio workers to send out requests
|
||||
/// When creating a queue, it will spawn a task per worker thread
|
||||
/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
|
||||
pub(crate) struct ActivityQueue {
|
||||
// Stats shared between the queue and workers
|
||||
stats: Arc<Stats>,
|
||||
sender: UnboundedSender<SendActivityTask>,
|
||||
sender_task: JoinHandle<()>,
|
||||
retry_sender_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
/// Simple stat counter to show where we're up to with sending messages
|
||||
/// This is a lock-free way to share things between tasks
|
||||
/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning
|
||||
#[derive(Default)]
|
||||
pub(crate) struct Stats {
|
||||
pending: AtomicUsize,
|
||||
running: AtomicUsize,
|
||||
retries: AtomicUsize,
|
||||
dead_last_hour: AtomicUsize,
|
||||
completed_last_hour: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Debug for Stats {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
|
||||
self.pending.load(Ordering::Relaxed),
|
||||
self.running.load(Ordering::Relaxed),
|
||||
self.retries.load(Ordering::Relaxed),
|
||||
self.dead_last_hour.load(Ordering::Relaxed),
|
||||
self.completed_last_hour.load(Ordering::Relaxed)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Default)]
|
||||
struct RetryStrategy {
|
||||
/// Amount of time in seconds to back off
|
||||
backoff: usize,
|
||||
/// Amount of times to retry
|
||||
retries: usize,
|
||||
/// If this particular request has already been retried, you can add an offset here to increment the count to start
|
||||
offset: usize,
|
||||
/// Number of seconds to sleep before trying
|
||||
initial_sleep: usize,
|
||||
}
|
||||
|
||||
/// A tokio spawned worker which is responsible for submitting requests to federated servers
|
||||
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
|
||||
/// We need to retry activity sending in case the target instances is temporarily unreachable.
|
||||
/// In this case, the task is stored and resent when the instance is hopefully back up. This
|
||||
/// list shows the retry intervals, and which events of the target instance can be covered:
|
||||
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
|
||||
/// - 60min (one hour, instance maintenance) --- happens in the retry worker
|
||||
/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker
|
||||
async fn worker(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
retry_queue: UnboundedSender<SendActivityTask>,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
) {
|
||||
stats.pending.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.running.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let outcome = sign_and_send(&message, &client, timeout, strategy).await;
|
||||
|
||||
// "Running" has finished, check the outcome
|
||||
stats.running.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.retries.fetch_add(1, Ordering::Relaxed);
|
||||
warn!(
|
||||
"Sending activity {} to {} to the retry queue to be tried again later",
|
||||
message.activity_id, message.inbox
|
||||
);
|
||||
// Send to the retry queue. Ignoring whether it succeeds or not
|
||||
retry_queue.send(message).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn retry_worker(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
) {
|
||||
// Because the times are pretty extravagant between retries, we have to re-sign each time
|
||||
let outcome = retry(
|
||||
|| {
|
||||
sign_and_send(
|
||||
&message,
|
||||
&client,
|
||||
timeout,
|
||||
RetryStrategy {
|
||||
backoff: 0,
|
||||
retries: 0,
|
||||
offset: 0,
|
||||
initial_sleep: 0,
|
||||
},
|
||||
)
|
||||
},
|
||||
strategy,
|
||||
)
|
||||
.await;
|
||||
|
||||
stats.retries.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ActivityQueue {
|
||||
fn new(
|
||||
client: ClientWithMiddleware,
|
||||
worker_count: usize,
|
||||
retry_count: usize,
|
||||
timeout: Duration,
|
||||
backoff: usize, // This should be 60 seconds by default or 1 second in tests
|
||||
) -> Self {
|
||||
let stats: Arc<Stats> = Default::default();
|
||||
|
||||
// This task clears the dead/completed stats every hour
|
||||
let hour_stats = stats.clone();
|
||||
tokio::spawn(async move {
|
||||
let duration = Duration::from_secs(3600);
|
||||
loop {
|
||||
tokio::time::sleep(duration).await;
|
||||
hour_stats.completed_last_hour.store(0, Ordering::Relaxed);
|
||||
hour_stats.dead_last_hour.store(0, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
let (retry_sender, mut retry_receiver) = unbounded_channel();
|
||||
let retry_stats = stats.clone();
|
||||
let retry_client = client.clone();
|
||||
|
||||
// The "fast path" retry
|
||||
// The backoff should be < 5 mins for this to work otherwise signatures may expire
|
||||
// This strategy is the one that is used with the *same* signature
|
||||
let strategy = RetryStrategy {
|
||||
backoff,
|
||||
retries: 1,
|
||||
offset: 0,
|
||||
initial_sleep: 0,
|
||||
};
|
||||
|
||||
// The "retry path" strategy
|
||||
// After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again
|
||||
let retry_strategy = RetryStrategy {
|
||||
backoff,
|
||||
retries: 3,
|
||||
offset: 2,
|
||||
initial_sleep: backoff.pow(2), // wait 60 mins before even trying
|
||||
};
|
||||
|
||||
let retry_sender_task = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = retry_receiver.recv().await {
|
||||
let retry_task = retry_worker(
|
||||
retry_client.clone(),
|
||||
timeout,
|
||||
message,
|
||||
retry_stats.clone(),
|
||||
retry_strategy,
|
||||
);
|
||||
|
||||
if retry_count > 0 {
|
||||
// If we're over the limit of retries, wait for them to finish before spawning
|
||||
while join_set.len() >= retry_count {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
join_set.spawn(retry_task);
|
||||
} else {
|
||||
// If the retry worker count is `0` then just spawn and don't use the join_set
|
||||
tokio::spawn(retry_task);
|
||||
}
|
||||
}
|
||||
|
||||
while !join_set.is_empty() {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
});
|
||||
|
||||
let (sender, mut receiver) = unbounded_channel();
|
||||
|
||||
let sender_stats = stats.clone();
|
||||
|
||||
let sender_task = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = receiver.recv().await {
|
||||
let task = worker(
|
||||
client.clone(),
|
||||
timeout,
|
||||
message,
|
||||
retry_sender.clone(),
|
||||
sender_stats.clone(),
|
||||
strategy,
|
||||
);
|
||||
|
||||
if worker_count > 0 {
|
||||
// If we're over the limit of workers, wait for them to finish before spawning
|
||||
while join_set.len() >= worker_count {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
join_set.spawn(task);
|
||||
} else {
|
||||
// If the worker count is `0` then just spawn and don't use the join_set
|
||||
tokio::spawn(task);
|
||||
}
|
||||
}
|
||||
|
||||
drop(retry_sender);
|
||||
|
||||
while !join_set.is_empty() {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
stats,
|
||||
sender,
|
||||
sender_task,
|
||||
retry_sender_task,
|
||||
}
|
||||
}
|
||||
|
||||
async fn queue(&self, message: SendActivityTask) -> Result<(), anyhow::Error> {
|
||||
self.stats.pending.fetch_add(1, Ordering::Relaxed);
|
||||
self.sender.send(message)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_stats(&self) -> &Stats {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
// Drops all the senders and shuts down the workers
|
||||
pub(crate) async fn shutdown(
|
||||
self,
|
||||
wait_for_retries: bool,
|
||||
) -> Result<Arc<Stats>, anyhow::Error> {
|
||||
drop(self.sender);
|
||||
|
||||
self.sender_task.await?;
|
||||
|
||||
if wait_for_retries {
|
||||
self.retry_sender_task.await?;
|
||||
}
|
||||
|
||||
Ok(self.stats)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates an activity queue using tokio spawned tasks
|
||||
/// Note: requires a tokio runtime
|
||||
pub(crate) fn create_activity_queue(
|
||||
client: ClientWithMiddleware,
|
||||
worker_count: usize,
|
||||
retry_count: usize,
|
||||
request_timeout: Duration,
|
||||
) -> ActivityQueue {
|
||||
ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60)
|
||||
}
|
||||
|
||||
/// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries
|
||||
async fn retry<T, E: Display + Debug, F: Future<Output = Result<T, E>>, A: FnMut() -> F>(
|
||||
mut action: A,
|
||||
strategy: RetryStrategy,
|
||||
) -> Result<T, E> {
|
||||
let mut count = strategy.offset;
|
||||
|
||||
// Do an initial sleep if it's called for
|
||||
if strategy.initial_sleep > 0 {
|
||||
let sleep_dur = Duration::from_secs(strategy.initial_sleep as u64);
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
}
|
||||
|
||||
loop {
|
||||
match action().await {
|
||||
Ok(val) => return Ok(val),
|
||||
Err(err) => {
|
||||
if count < strategy.retries {
|
||||
count += 1;
|
||||
|
||||
let sleep_amt = strategy.backoff.pow(count as u32) as u64;
|
||||
let sleep_dur = Duration::from_secs(sleep_amt);
|
||||
warn!("{err:?}. Sleeping for {sleep_dur:?} and trying again");
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
continue;
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use axum::extract::State;
|
||||
use bytes::Bytes;
|
||||
use http::StatusCode;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::http_signatures::generate_actor_keypair;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[allow(unused)]
|
||||
// This will periodically send back internal errors to test the retry
|
||||
async fn dodgy_handler(
|
||||
State(state): State<Arc<AtomicUsize>>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Result<(), StatusCode> {
|
||||
debug!("Headers:{:?}", headers);
|
||||
debug!("Body len:{}", body.len());
|
||||
|
||||
if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 {
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn test_server() {
|
||||
use axum::{routing::post, Router};
|
||||
|
||||
// We should break every now and then ;)
|
||||
let state = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let app = Router::new()
|
||||
.route("/", post(dodgy_handler))
|
||||
.with_state(state);
|
||||
|
||||
axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
// Queues 100 messages and then asserts that the worker runs them
|
||||
async fn test_activity_queue_workers() {
|
||||
let num_workers = 64;
|
||||
let num_messages: usize = 100;
|
||||
|
||||
tokio::spawn(test_server());
|
||||
|
||||
/*
|
||||
// uncomment for debug logs & stats
|
||||
use tracing::log::LevelFilter;
|
||||
|
||||
env_logger::builder()
|
||||
.filter_level(LevelFilter::Warn)
|
||||
.filter_module("activitypub_federation", LevelFilter::Info)
|
||||
.format_timestamp(None)
|
||||
.init();
|
||||
|
||||
*/
|
||||
|
||||
let activity_queue = ActivityQueue::new(
|
||||
reqwest::Client::default().into(),
|
||||
num_workers,
|
||||
num_workers,
|
||||
Duration::from_secs(10),
|
||||
1,
|
||||
);
|
||||
|
||||
let keypair = generate_actor_keypair().unwrap();
|
||||
|
||||
let message = SendActivityTask {
|
||||
actor_id: "http://localhost:8001".parse().unwrap(),
|
||||
activity_id: "http://localhost:8001/activity".parse().unwrap(),
|
||||
activity: "{}".into(),
|
||||
inbox: "http://localhost:8001".parse().unwrap(),
|
||||
private_key: keypair.private_key().unwrap(),
|
||||
http_signature_compat: true,
|
||||
};
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for _ in 0..num_messages {
|
||||
activity_queue.queue(message.clone()).await.unwrap();
|
||||
}
|
||||
|
||||
info!("Queue Sent: {:?}", start.elapsed());
|
||||
|
||||
let stats = activity_queue.shutdown(true).await.unwrap();
|
||||
|
||||
info!(
|
||||
"Queue Finished. Num msgs: {}, Time {:?}, msg/s: {:0.0}",
|
||||
num_messages,
|
||||
start.elapsed(),
|
||||
num_messages as f64 / start.elapsed().as_secs_f64()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
stats.completed_last_hour.load(Ordering::Relaxed),
|
||||
num_messages
|
||||
);
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
error::Error,
|
||||
fetch::object_id::ObjectId,
|
||||
http_signatures::{verify_body_hash, verify_signature},
|
||||
queue::ActivityQueue,
|
||||
traits::{ActivityHandler, Actor, Object},
|
||||
};
|
||||
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
|
||||
|
@ -15,14 +16,17 @@ use tracing::debug;
|
|||
/// Handles incoming activities, verifying HTTP signatures and other checks
|
||||
///
|
||||
/// After successful validation, activities are passed to respective [trait@ActivityHandler].
|
||||
pub async fn receive_activity<Activity, ActorT, Datatype>(
|
||||
pub async fn receive_activity<Activity, ActorT, Datatype, Queuetype>(
|
||||
request: HttpRequest,
|
||||
body: Bytes,
|
||||
data: &Data<Datatype>,
|
||||
data: &Data<Datatype, Queuetype>,
|
||||
) -> Result<HttpResponse, <Activity as ActivityHandler>::Error>
|
||||
where
|
||||
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
|
||||
ActorT: Object<DataType = Datatype> + Actor + Send + 'static,
|
||||
Activity: ActivityHandler<DataType = Datatype, QueueType = Queuetype>
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ 'static,
|
||||
ActorT: Object<DataType = Datatype, QueueType = Queuetype> + Actor + Send + 'static,
|
||||
for<'de2> <ActorT as Object>::Kind: serde::Deserialize<'de2>,
|
||||
<Activity as ActivityHandler>::Error: From<anyhow::Error>
|
||||
+ From<Error>
|
||||
|
@ -30,6 +34,7 @@ where
|
|||
+ From<serde_json::Error>,
|
||||
<ActorT as Object>::Error: From<Error> + From<anyhow::Error>,
|
||||
Datatype: Clone,
|
||||
Queuetype: ActivityQueue,
|
||||
{
|
||||
verify_body_hash(request.headers().get("Digest"), &body)?;
|
||||
|
||||
|
@ -57,9 +62,9 @@ where
|
|||
mod test {
|
||||
use super::*;
|
||||
use crate::{
|
||||
activity_queue::generate_request_headers,
|
||||
config::FederationConfig,
|
||||
http_signatures::sign_request,
|
||||
queue::{request::generate_request_headers, simple_queue::SimpleQueue},
|
||||
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
|
||||
};
|
||||
use actix_web::test::TestRequest;
|
||||
|
@ -70,7 +75,7 @@ mod test {
|
|||
#[tokio::test]
|
||||
async fn test_receive_activity() {
|
||||
let (body, incoming_request, config) = setup_receive_test().await;
|
||||
receive_activity::<Follow, DbUser, DbConnection>(
|
||||
receive_activity::<Follow, DbUser, DbConnection, SimpleQueue>(
|
||||
incoming_request.to_http_request(),
|
||||
body,
|
||||
&config.to_request_data(),
|
||||
|
@ -82,7 +87,7 @@ mod test {
|
|||
#[tokio::test]
|
||||
async fn test_receive_activity_invalid_body_signature() {
|
||||
let (_, incoming_request, config) = setup_receive_test().await;
|
||||
let err = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
let err = receive_activity::<Follow, DbUser, DbConnection, SimpleQueue>(
|
||||
incoming_request.to_http_request(),
|
||||
"invalid".into(),
|
||||
&config.to_request_data(),
|
||||
|
@ -99,7 +104,7 @@ mod test {
|
|||
async fn test_receive_activity_invalid_path() {
|
||||
let (body, incoming_request, config) = setup_receive_test().await;
|
||||
let incoming_request = incoming_request.uri("/wrong");
|
||||
let err = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
let err = receive_activity::<Follow, DbUser, DbConnection, SimpleQueue>(
|
||||
incoming_request.to_http_request(),
|
||||
body,
|
||||
&config.to_request_data(),
|
||||
|
@ -112,7 +117,11 @@ mod test {
|
|||
assert_eq!(e, &Error::ActivitySignatureInvalid)
|
||||
}
|
||||
|
||||
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
|
||||
async fn setup_receive_test() -> (
|
||||
Bytes,
|
||||
TestRequest,
|
||||
FederationConfig<DbConnection, SimpleQueue>,
|
||||
) {
|
||||
let inbox = "https://example.com/inbox";
|
||||
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
|
||||
let request_builder = ClientWithMiddleware::from(Client::default())
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
use crate::config::{Data, FederationConfig, FederationMiddleware};
|
||||
use crate::{
|
||||
config::{Data, FederationConfig, FederationMiddleware},
|
||||
queue::ActivityQueue,
|
||||
};
|
||||
use actix_web::{
|
||||
dev::{forward_ready, Payload, Service, ServiceRequest, ServiceResponse, Transform},
|
||||
Error,
|
||||
|
@ -8,45 +11,47 @@ use actix_web::{
|
|||
};
|
||||
use std::future::{ready, Ready};
|
||||
|
||||
impl<S, B, T> Transform<S, ServiceRequest> for FederationMiddleware<T>
|
||||
impl<S, B, T, Q> Transform<S, ServiceRequest> for FederationMiddleware<T, Q>
|
||||
where
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||
S::Future: 'static,
|
||||
B: 'static,
|
||||
T: Clone + Sync + 'static,
|
||||
Q: ActivityQueue + Sync + 'static,
|
||||
{
|
||||
type Response = ServiceResponse<B>;
|
||||
type Error = Error;
|
||||
type Transform = FederationService<S, T>;
|
||||
type Transform = FederationService<S, T, Q>;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
||||
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ready(Ok(FederationService {
|
||||
service,
|
||||
config: self.0.clone(),
|
||||
config: self.config.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Passes [FederationConfig] to HTTP handlers, converting it to [Data] in the process
|
||||
#[doc(hidden)]
|
||||
pub struct FederationService<S, T: Clone>
|
||||
pub struct FederationService<S, T: Clone, Q: ActivityQueue>
|
||||
where
|
||||
S: Service<ServiceRequest, Error = Error>,
|
||||
S::Future: 'static,
|
||||
T: Sync,
|
||||
{
|
||||
service: S,
|
||||
config: FederationConfig<T>,
|
||||
config: FederationConfig<T, Q>,
|
||||
}
|
||||
|
||||
impl<S, B, T> Service<ServiceRequest> for FederationService<S, T>
|
||||
impl<S, B, T, Q> Service<ServiceRequest> for FederationService<S, T, Q>
|
||||
where
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||
S::Future: 'static,
|
||||
B: 'static,
|
||||
T: Clone + Sync + 'static,
|
||||
Q: ActivityQueue + Sync + 'static,
|
||||
{
|
||||
type Response = ServiceResponse<B>;
|
||||
type Error = Error;
|
||||
|
@ -61,12 +66,12 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Clone + 'static> FromRequest for Data<T> {
|
||||
impl<T: Clone + 'static, Q: ActivityQueue + 'static> FromRequest for Data<T, Q> {
|
||||
type Error = Error;
|
||||
type Future = Ready<Result<Self, Self::Error>>;
|
||||
|
||||
fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
|
||||
ready(match req.extensions().get::<FederationConfig<T>>() {
|
||||
ready(match req.extensions().get::<FederationConfig<T, Q>>() {
|
||||
Some(c) => Ok(c.to_request_data()),
|
||||
None => Err(actix_web::error::ErrorBadRequest(
|
||||
"Missing extension, did you register FederationMiddleware?",
|
||||
|
|
|
@ -18,7 +18,7 @@ use serde::Deserialize;
|
|||
pub async fn signing_actor<A>(
|
||||
request: &HttpRequest,
|
||||
body: Option<Bytes>,
|
||||
data: &Data<<A as Object>::DataType>,
|
||||
data: &Data<<A as Object>::DataType, <A as Object>::QueueType>,
|
||||
) -> Result<A, <A as Object>::Error>
|
||||
where
|
||||
A: Object + Actor,
|
||||
|
|
|
@ -7,6 +7,7 @@ use crate::{
|
|||
error::Error,
|
||||
fetch::object_id::ObjectId,
|
||||
http_signatures::{verify_body_hash, verify_signature},
|
||||
queue::ActivityQueue,
|
||||
traits::{ActivityHandler, Actor, Object},
|
||||
};
|
||||
use axum::{
|
||||
|
@ -21,13 +22,16 @@ use serde::de::DeserializeOwned;
|
|||
use tracing::debug;
|
||||
|
||||
/// Handles incoming activities, verifying HTTP signatures and other checks
|
||||
pub async fn receive_activity<Activity, ActorT, Datatype>(
|
||||
pub async fn receive_activity<Activity, ActorT, Datatype, Queuetype>(
|
||||
activity_data: ActivityData,
|
||||
data: &Data<Datatype>,
|
||||
data: &Data<Datatype, Queuetype>,
|
||||
) -> Result<(), <Activity as ActivityHandler>::Error>
|
||||
where
|
||||
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
|
||||
ActorT: Object<DataType = Datatype> + Actor + Send + 'static,
|
||||
Activity: ActivityHandler<DataType = Datatype, QueueType = Queuetype>
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ 'static,
|
||||
ActorT: Object<DataType = Datatype, QueueType = Queuetype> + Actor + Send + 'static,
|
||||
for<'de2> <ActorT as Object>::Kind: serde::Deserialize<'de2>,
|
||||
<Activity as ActivityHandler>::Error: From<anyhow::Error>
|
||||
+ From<Error>
|
||||
|
@ -35,6 +39,7 @@ where
|
|||
+ From<serde_json::Error>,
|
||||
<ActorT as Object>::Error: From<Error> + From<anyhow::Error>,
|
||||
Datatype: Clone,
|
||||
Queuetype: ActivityQueue,
|
||||
{
|
||||
verify_body_hash(activity_data.headers.get("Digest"), &activity_data.body)?;
|
||||
|
||||
|
|
|
@ -1,33 +1,45 @@
|
|||
use crate::config::{Data, FederationConfig, FederationMiddleware};
|
||||
use crate::{
|
||||
config::{Data, FederationConfig, FederationMiddleware},
|
||||
queue::ActivityQueue,
|
||||
};
|
||||
use axum::{async_trait, body::Body, extract::FromRequestParts, http::Request, response::Response};
|
||||
use http::{request::Parts, StatusCode};
|
||||
use std::task::{Context, Poll};
|
||||
use tower::{Layer, Service};
|
||||
|
||||
impl<S, T: Clone> Layer<S> for FederationMiddleware<T> {
|
||||
type Service = FederationService<S, T>;
|
||||
impl<S, T: Clone, Q: ActivityQueue> Layer<S> for FederationMiddleware<T, Q> {
|
||||
type Service = FederationService<S, T, Q>;
|
||||
|
||||
fn layer(&self, inner: S) -> Self::Service {
|
||||
FederationService {
|
||||
inner,
|
||||
config: self.0.clone(),
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Passes [FederationConfig] to HTTP handlers, converting it to [Data] in the process
|
||||
#[doc(hidden)]
|
||||
#[derive(Clone)]
|
||||
pub struct FederationService<S, T: Clone> {
|
||||
pub struct FederationService<S, T: Clone, Q: ActivityQueue> {
|
||||
inner: S,
|
||||
config: FederationConfig<T>,
|
||||
config: FederationConfig<T, Q>,
|
||||
}
|
||||
|
||||
impl<S, T> Service<Request<Body>> for FederationService<S, T>
|
||||
impl<S: Clone, T: Clone, Q: ActivityQueue> Clone for FederationService<S, T, Q> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone(),
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, Q> Service<Request<Body>> for FederationService<S, T, Q>
|
||||
where
|
||||
S: Service<Request<Body>, Response = Response> + Send + 'static,
|
||||
S::Future: Send + 'static,
|
||||
T: Clone + Send + Sync + 'static,
|
||||
Q: ActivityQueue + Send + Sync + 'static,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
|
@ -44,15 +56,16 @@ where
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S, T: Clone + 'static> FromRequestParts<S> for Data<T>
|
||||
impl<S, T: Clone + 'static, Q: ActivityQueue + 'static> FromRequestParts<S> for Data<T, Q>
|
||||
where
|
||||
S: Send + Sync,
|
||||
T: Send + Sync,
|
||||
Q: Send + Sync,
|
||||
{
|
||||
type Rejection = (StatusCode, &'static str);
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
match parts.extensions.get::<FederationConfig<T>>() {
|
||||
match parts.extensions.get::<FederationConfig<T, Q>>() {
|
||||
Some(c) => Ok(c.to_request_data()),
|
||||
None => Err((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
|
|
139
src/config.rs
139
src/config.rs
|
@ -16,9 +16,9 @@
|
|||
//! ```
|
||||
|
||||
use crate::{
|
||||
activity_queue::{create_activity_queue, ActivityQueue},
|
||||
error::Error,
|
||||
protocol::verification::verify_domains_match,
|
||||
queue::{simple_queue::SimpleQueue, ActivityQueue},
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use anyhow::Context;
|
||||
|
@ -39,9 +39,9 @@ use std::{
|
|||
use url::Url;
|
||||
|
||||
/// Configuration for this library, with various federation related settings
|
||||
#[derive(Builder, Clone)]
|
||||
#[builder(build_fn(private, name = "partial_build"))]
|
||||
pub struct FederationConfig<T: Clone> {
|
||||
#[derive(Builder)]
|
||||
#[builder(build_fn(private, name = "partial_build"), pattern = "owned")]
|
||||
pub struct FederationConfig<T: Clone, Q: ActivityQueue + ?Sized> {
|
||||
/// The domain where this federated instance is running
|
||||
#[builder(setter(into))]
|
||||
pub(crate) domain: String,
|
||||
|
@ -94,13 +94,34 @@ pub struct FederationConfig<T: Clone> {
|
|||
pub(crate) signed_fetch_actor: Option<Arc<(Url, PKey<Private>)>>,
|
||||
/// Queue for sending outgoing activities. Only optional to make builder work, its always
|
||||
/// present once constructed.
|
||||
#[builder(setter(skip))]
|
||||
pub(crate) activity_queue: Option<Arc<ActivityQueue>>,
|
||||
#[builder(default = "None", setter(custom))]
|
||||
pub(crate) activity_queue: Option<Arc<Q>>,
|
||||
}
|
||||
|
||||
impl<T: Clone> FederationConfig<T> {
|
||||
// No clue why this can't be derived, so just implemented manually...
|
||||
impl<T: Clone, Q: ActivityQueue> Clone for FederationConfig<T, Q> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
domain: self.domain.clone(),
|
||||
app_data: self.app_data.clone(),
|
||||
http_fetch_limit: self.http_fetch_limit.clone(),
|
||||
client: self.client.clone(),
|
||||
worker_count: self.worker_count.clone(),
|
||||
retry_count: self.retry_count.clone(),
|
||||
debug: self.debug.clone(),
|
||||
allow_http_urls: self.allow_http_urls.clone(),
|
||||
request_timeout: self.request_timeout.clone(),
|
||||
url_verifier: self.url_verifier.clone(),
|
||||
http_signature_compat: self.http_signature_compat.clone(),
|
||||
signed_fetch_actor: self.signed_fetch_actor.clone(),
|
||||
activity_queue: self.activity_queue.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone, Q: ActivityQueue> FederationConfig<T, Q> {
|
||||
/// Returns a new config builder with default values.
|
||||
pub fn builder() -> FederationConfigBuilder<T> {
|
||||
pub fn builder() -> FederationConfigBuilder<T, Q> {
|
||||
FederationConfigBuilder::default()
|
||||
}
|
||||
|
||||
|
@ -123,7 +144,7 @@ impl<T: Clone> FederationConfig<T> {
|
|||
}
|
||||
|
||||
/// Create new [Data] from this. You should prefer to use a middleware if possible.
|
||||
pub fn to_request_data(&self) -> Data<T> {
|
||||
pub fn to_request_data(&self) -> Data<T, Q> {
|
||||
Data {
|
||||
config: self.clone(),
|
||||
request_counter: Default::default(),
|
||||
|
@ -184,6 +205,43 @@ impl<T: Clone> FederationConfig<T> {
|
|||
pub fn domain(&self) -> &str {
|
||||
&self.domain
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone, Q: ActivityQueue> FederationConfigBuilder<T, Q> {
|
||||
/// Sets an actor to use to sign all federated fetch requests
|
||||
pub fn signed_fetch_actor<A: Actor>(mut self, actor: &A) -> Self {
|
||||
let private_key_pem = actor
|
||||
.private_key_pem()
|
||||
.expect("actor does not have a private key to sign with");
|
||||
|
||||
let private_key = PKey::private_key_from_pem(private_key_pem.as_bytes())
|
||||
.expect("Could not decode PEM data");
|
||||
self.signed_fetch_actor = Some(Some(Arc::new((actor.id(), private_key))));
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets an actor to use to sign all federated fetch requests
|
||||
pub fn activity_queue(mut self, queue: Q) -> Self {
|
||||
self.activity_queue = Some(Some(Arc::new(queue)));
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> FederationConfigBuilder<T, SimpleQueue> {
|
||||
/// Constructs a new config instance with the values supplied to builder.
|
||||
///
|
||||
/// Values which are not explicitly specified use the defaults. Also initializes the
|
||||
/// queue for outgoing activities, which is stored internally in the config struct.
|
||||
/// Requires a tokio runtime for the background queue.
|
||||
pub async fn build(
|
||||
self,
|
||||
) -> Result<FederationConfig<T, SimpleQueue>, FederationConfigBuilderError> {
|
||||
let mut config = self.partial_build()?;
|
||||
config.activity_queue = Some(Arc::new(SimpleQueue::from_config(&config)));
|
||||
Ok(config)
|
||||
}
|
||||
}
|
||||
impl<T: Clone> FederationConfig<T, SimpleQueue> {
|
||||
/// Shut down this federation, waiting for the outgoing queue to be sent.
|
||||
/// If the activityqueue is still in use in other requests or was never constructed, returns an error.
|
||||
/// If wait_retries is true, also wait for requests that have initially failed and are being retried.
|
||||
|
@ -196,7 +254,7 @@ impl<T: Clone> FederationConfig<T> {
|
|||
.take()
|
||||
.context("ActivityQueue never constructed, build() not called?")?;
|
||||
// Todo: use Arc::into_inner but is only part of rust 1.70.
|
||||
let stats = Arc::<ActivityQueue>::try_unwrap(q)
|
||||
let stats = Arc::<SimpleQueue>::try_unwrap(q)
|
||||
.map_err(|_| {
|
||||
anyhow::anyhow!(
|
||||
"Could not cleanly shut down: activityqueue arc was still in use elsewhere "
|
||||
|
@ -207,39 +265,7 @@ impl<T: Clone> FederationConfig<T> {
|
|||
Ok(stats)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> FederationConfigBuilder<T> {
|
||||
/// Sets an actor to use to sign all federated fetch requests
|
||||
pub fn signed_fetch_actor<A: Actor>(&mut self, actor: &A) -> &mut Self {
|
||||
let private_key_pem = actor
|
||||
.private_key_pem()
|
||||
.expect("actor does not have a private key to sign with");
|
||||
|
||||
let private_key = PKey::private_key_from_pem(private_key_pem.as_bytes())
|
||||
.expect("Could not decode PEM data");
|
||||
self.signed_fetch_actor = Some(Some(Arc::new((actor.id(), private_key))));
|
||||
self
|
||||
}
|
||||
|
||||
/// Constructs a new config instance with the values supplied to builder.
|
||||
///
|
||||
/// Values which are not explicitly specified use the defaults. Also initializes the
|
||||
/// queue for outgoing activities, which is stored internally in the config struct.
|
||||
/// Requires a tokio runtime for the background queue.
|
||||
pub async fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
|
||||
let mut config = self.partial_build()?;
|
||||
let queue = create_activity_queue(
|
||||
config.client.clone(),
|
||||
config.worker_count,
|
||||
config.retry_count,
|
||||
config.request_timeout,
|
||||
);
|
||||
config.activity_queue = Some(Arc::new(queue));
|
||||
Ok(config)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> Deref for FederationConfig<T> {
|
||||
impl<T: Clone, Q: ActivityQueue> Deref for FederationConfig<T, Q> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
|
@ -308,12 +334,12 @@ clone_trait_object!(UrlVerifier);
|
|||
/// prevent denial of service attacks, where an attacker triggers fetching of recursive objects.
|
||||
///
|
||||
/// <https://www.w3.org/TR/activitypub/#security-recursive-objects>
|
||||
pub struct Data<T: Clone> {
|
||||
pub(crate) config: FederationConfig<T>,
|
||||
pub struct Data<T: Clone, Q: ActivityQueue> {
|
||||
pub(crate) config: FederationConfig<T, Q>,
|
||||
pub(crate) request_counter: AtomicU32,
|
||||
}
|
||||
|
||||
impl<T: Clone> Data<T> {
|
||||
impl<T: Clone, Q: ActivityQueue> Data<T, Q> {
|
||||
/// Returns the data which was stored in [FederationConfigBuilder::app_data]
|
||||
pub fn app_data(&self) -> &T {
|
||||
&self.config.app_data
|
||||
|
@ -337,7 +363,7 @@ impl<T: Clone> Data<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> Deref for Data<T> {
|
||||
impl<T: Clone, Q: ActivityQueue> Deref for Data<T, Q> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
|
@ -346,12 +372,21 @@ impl<T: Clone> Deref for Data<T> {
|
|||
}
|
||||
|
||||
/// Middleware for HTTP handlers which provides access to [Data]
|
||||
#[derive(Clone)]
|
||||
pub struct FederationMiddleware<T: Clone>(pub(crate) FederationConfig<T>);
|
||||
pub struct FederationMiddleware<T: Clone, Q: ActivityQueue> {
|
||||
pub(crate) config: FederationConfig<T, Q>,
|
||||
}
|
||||
|
||||
impl<T: Clone> FederationMiddleware<T> {
|
||||
impl<T: Clone, Q: ActivityQueue> FederationMiddleware<T, Q> {
|
||||
/// Construct a new middleware instance
|
||||
pub fn new(config: FederationConfig<T>) -> Self {
|
||||
FederationMiddleware(config)
|
||||
pub fn new(config: FederationConfig<T, Q>) -> Self {
|
||||
FederationMiddleware { config }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone, Q: ActivityQueue> Clone for FederationMiddleware<T, Q> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ where
|
|||
pub async fn dereference(
|
||||
&self,
|
||||
owner: &<Kind as Collection>::Owner,
|
||||
data: &Data<<Kind as Collection>::DataType>,
|
||||
data: &Data<<Kind as Collection>::DataType, <Kind as Collection>::QueueType>,
|
||||
) -> Result<Kind, <Kind as Collection>::Error>
|
||||
where
|
||||
<Kind as Collection>::Error: From<Error>,
|
||||
|
|
|
@ -6,6 +6,7 @@ use crate::{
|
|||
config::Data,
|
||||
error::Error,
|
||||
http_signatures::sign_request,
|
||||
queue::ActivityQueue,
|
||||
reqwest_shim::ResponseExt,
|
||||
FEDERATION_CONTENT_TYPE,
|
||||
};
|
||||
|
@ -33,9 +34,9 @@ pub mod webfinger;
|
|||
/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with
|
||||
/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers
|
||||
/// infinite, recursive fetching of data.
|
||||
pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
|
||||
pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned, Q: ActivityQueue>(
|
||||
url: &Url,
|
||||
data: &Data<T>,
|
||||
data: &Data<T, Q>,
|
||||
) -> Result<Kind, Error> {
|
||||
let config = &data.config;
|
||||
// dont fetch local objects this way
|
||||
|
|
|
@ -87,7 +87,7 @@ where
|
|||
/// Fetches an activitypub object, either from local database (if possible), or over http.
|
||||
pub async fn dereference(
|
||||
&self,
|
||||
data: &Data<<Kind as Object>::DataType>,
|
||||
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
|
||||
) -> Result<Kind, <Kind as Object>::Error>
|
||||
where
|
||||
<Kind as Object>::Error: From<Error> + From<anyhow::Error>,
|
||||
|
@ -121,7 +121,7 @@ where
|
|||
/// the object is not found in the database.
|
||||
pub async fn dereference_local(
|
||||
&self,
|
||||
data: &Data<<Kind as Object>::DataType>,
|
||||
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
|
||||
) -> Result<Kind, <Kind as Object>::Error>
|
||||
where
|
||||
<Kind as Object>::Error: From<Error>,
|
||||
|
@ -133,7 +133,7 @@ where
|
|||
/// returning none means the object was not found in local db
|
||||
async fn dereference_from_db(
|
||||
&self,
|
||||
data: &Data<<Kind as Object>::DataType>,
|
||||
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
|
||||
) -> Result<Option<Kind>, <Kind as Object>::Error> {
|
||||
let id = self.0.clone();
|
||||
Object::read_from_id(*id, data).await
|
||||
|
@ -141,7 +141,7 @@ where
|
|||
|
||||
async fn dereference_from_http(
|
||||
&self,
|
||||
data: &Data<<Kind as Object>::DataType>,
|
||||
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
|
||||
db_object: Option<Kind>,
|
||||
) -> Result<Kind, <Kind as Object>::Error>
|
||||
where
|
||||
|
|
|
@ -2,6 +2,7 @@ use crate::{
|
|||
config::Data,
|
||||
error::{Error, Error::WebfingerResolveFailed},
|
||||
fetch::{fetch_object_http, object_id::ObjectId},
|
||||
queue::ActivityQueue,
|
||||
traits::{Actor, Object},
|
||||
FEDERATION_CONTENT_TYPE,
|
||||
};
|
||||
|
@ -19,7 +20,7 @@ use url::Url;
|
|||
/// is then fetched using [ObjectId::dereference], and the result returned.
|
||||
pub async fn webfinger_resolve_actor<T: Clone, Kind>(
|
||||
identifier: &str,
|
||||
data: &Data<T>,
|
||||
data: &Data<T, <Kind as Object>::QueueType>,
|
||||
) -> Result<Kind, <Kind as Object>::Error>
|
||||
where
|
||||
Kind: Object + Actor + Send + 'static + Object<DataType = T>,
|
||||
|
@ -85,9 +86,10 @@ where
|
|||
/// # Ok::<(), anyhow::Error>(())
|
||||
/// }).unwrap();
|
||||
///```
|
||||
pub fn extract_webfinger_name<T>(query: &str, data: &Data<T>) -> Result<String, Error>
|
||||
pub fn extract_webfinger_name<T, Q>(query: &str, data: &Data<T, Q>) -> Result<String, Error>
|
||||
where
|
||||
T: Clone,
|
||||
Q: ActivityQueue,
|
||||
{
|
||||
// TODO: would be nice if we could implement this without regex and remove the dependency
|
||||
// Regex taken from Mastodon -
|
||||
|
|
|
@ -148,7 +148,7 @@ pub(crate) async fn signing_actor<'a, A, H>(
|
|||
headers: H,
|
||||
method: &Method,
|
||||
uri: &Uri,
|
||||
data: &Data<<A as Object>::DataType>,
|
||||
data: &Data<<A as Object>::DataType, <A as Object>::QueueType>,
|
||||
) -> Result<A, <A as Object>::Error>
|
||||
where
|
||||
A: Object + Actor,
|
||||
|
@ -274,7 +274,7 @@ pub(crate) fn verify_body_hash(
|
|||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use super::*;
|
||||
use crate::activity_queue::generate_request_headers;
|
||||
use crate::queue::request::generate_request_headers;
|
||||
use reqwest::Client;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use std::str::FromStr;
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
#![doc = include_str!("../docs/10_fetching_objects_with_unknown_type.md")]
|
||||
#![deny(missing_docs)]
|
||||
|
||||
pub mod activity_queue;
|
||||
#[cfg(feature = "actix-web")]
|
||||
pub mod actix_web;
|
||||
#[cfg(feature = "axum")]
|
||||
|
@ -20,6 +19,7 @@ pub mod error;
|
|||
pub mod fetch;
|
||||
pub mod http_signatures;
|
||||
pub mod protocol;
|
||||
pub mod queue;
|
||||
pub(crate) mod reqwest_shim;
|
||||
pub mod traits;
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ where
|
|||
T: ActivityHandler + Send + Sync,
|
||||
{
|
||||
type DataType = <T as ActivityHandler>::DataType;
|
||||
type QueueType = <T as ActivityHandler>::QueueType;
|
||||
type Error = <T as ActivityHandler>::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
|
@ -71,11 +72,17 @@ where
|
|||
self.inner.actor()
|
||||
}
|
||||
|
||||
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn verify(
|
||||
&self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
self.inner.verify(data).await
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn receive(
|
||||
self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
self.inner.receive(data).await
|
||||
}
|
||||
}
|
||||
|
|
284
src/queue/mod.rs
Normal file
284
src/queue/mod.rs
Normal file
|
@ -0,0 +1,284 @@
|
|||
//! Queue for signing and sending outgoing activities with retry
|
||||
//!
|
||||
#![doc = include_str!("../../docs/09_sending_activities.md")]
|
||||
|
||||
pub(crate) mod request;
|
||||
pub mod simple_queue;
|
||||
mod util;
|
||||
mod worker;
|
||||
|
||||
use crate::{
|
||||
config::Data,
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
|
||||
use itertools::Itertools;
|
||||
use openssl::pkey::{PKey, Private};
|
||||
use serde::Serialize;
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use tracing::{debug, info, warn};
|
||||
use url::Url;
|
||||
|
||||
use self::request::sign_and_send;
|
||||
|
||||
#[async_trait]
|
||||
/// Anything that can enqueue outgoing activitypub requests
|
||||
pub trait ActivityQueue {
|
||||
/// The errors that can be returned when queuing
|
||||
type Error;
|
||||
|
||||
/// Retrieve the queue stats
|
||||
fn stats(&self) -> &Stats;
|
||||
|
||||
/// Queues one activity task to a specific inbox
|
||||
async fn queue(&self, message: SendActivityTask) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// Sends an activity with an outbound activity queue
|
||||
pub async fn send_activity<Q: ActivityQueue, Activity, Datatype, ActorType>(
|
||||
activity: Activity,
|
||||
actor: &ActorType,
|
||||
inboxes: Vec<Url>,
|
||||
data: &Data<Datatype, Q>,
|
||||
) -> Result<(), <Activity as ActivityHandler>::Error>
|
||||
where
|
||||
Activity: ActivityHandler + Serialize,
|
||||
<Activity as ActivityHandler>::Error: From<anyhow::Error>
|
||||
+ From<serde_json::Error>
|
||||
+ std::convert::From<<Q as ActivityQueue>::Error>,
|
||||
Datatype: Clone,
|
||||
ActorType: Actor,
|
||||
{
|
||||
let config = &data.config;
|
||||
let queue = data
|
||||
.config
|
||||
.activity_queue
|
||||
.clone()
|
||||
.expect("Should have a queue configured");
|
||||
let actor_id = activity.actor();
|
||||
let activity_id = activity.id();
|
||||
let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
|
||||
let private_key_pem = actor
|
||||
.private_key_pem()
|
||||
.ok_or_else(|| anyhow!("Actor {actor_id} does not contain a private key for signing"))?;
|
||||
|
||||
// This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening
|
||||
let private_key = tokio::task::spawn_blocking(move || {
|
||||
PKey::private_key_from_pem(private_key_pem.as_bytes())
|
||||
.map_err(|err| anyhow!("Could not create private key from PEM data:{err}"))
|
||||
})
|
||||
.await
|
||||
.map_err(|err| anyhow!("Error joining:{err}"))??;
|
||||
|
||||
let inboxes: Vec<Url> = inboxes
|
||||
.into_iter()
|
||||
.unique()
|
||||
.filter(|i| !config.is_local_url(i))
|
||||
.collect();
|
||||
|
||||
for inbox in inboxes {
|
||||
if let Err(err) = config.verify_url_valid(&inbox).await {
|
||||
debug!("inbox url invalid, skipping: {inbox}: {err}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let message = SendActivityTask {
|
||||
id: Uuid::new_v4(),
|
||||
actor_id: actor_id.clone(),
|
||||
activity_id: activity_id.clone(),
|
||||
inbox,
|
||||
activity: activity_serialized.clone(),
|
||||
private_key: private_key.clone(),
|
||||
};
|
||||
|
||||
// Don't use the activity queue if this is in debug mode, send and wait directly
|
||||
if config.debug {
|
||||
if let Err(err) = sign_and_send(
|
||||
&message,
|
||||
&config.client,
|
||||
config.request_timeout,
|
||||
Default::default(),
|
||||
config.http_signature_compat,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("{err}");
|
||||
}
|
||||
} else {
|
||||
queue.queue(message).await?;
|
||||
let stats = queue.stats();
|
||||
let running = stats.running.load(Ordering::Relaxed);
|
||||
if running == config.worker_count && config.worker_count != 0 {
|
||||
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
|
||||
warn!("{:?}", stats);
|
||||
} else {
|
||||
info!("{:?}", stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// The struct sent to a worker for processing
|
||||
/// When `send_activity` is used, it is split up to tasks per-inbox
|
||||
pub struct SendActivityTask {
|
||||
/// The ID of the activity task
|
||||
pub id: Uuid,
|
||||
/// The actor ID
|
||||
pub actor_id: Url,
|
||||
/// The activity ID
|
||||
pub activity_id: Url,
|
||||
/// The activity body in JSON
|
||||
pub activity: Bytes,
|
||||
/// The inbox to send the activity to
|
||||
pub inbox: Url,
|
||||
/// The private key to sign the request
|
||||
pub private_key: PKey<Private>,
|
||||
}
|
||||
|
||||
/// Simple stat counter to show where we're up to with sending messages
|
||||
/// This is a lock-free way to share things between tasks
|
||||
/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning
|
||||
#[derive(Default)]
|
||||
pub struct Stats {
|
||||
pending: AtomicUsize,
|
||||
running: AtomicUsize,
|
||||
retries: AtomicUsize,
|
||||
dead_last_hour: AtomicUsize,
|
||||
completed_last_hour: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Debug for Stats {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
|
||||
self.pending.load(Ordering::Relaxed),
|
||||
self.running.load(Ordering::Relaxed),
|
||||
self.retries.load(Ordering::Relaxed),
|
||||
self.dead_last_hour.load(Ordering::Relaxed),
|
||||
self.completed_last_hour.load(Ordering::Relaxed)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use axum::extract::State;
|
||||
use bytes::Bytes;
|
||||
use http::{HeaderMap, StatusCode};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::{http_signatures::generate_actor_keypair, queue::simple_queue::SimpleQueue};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[allow(unused)]
|
||||
// This will periodically send back internal errors to test the retry
|
||||
async fn dodgy_handler(
|
||||
State(state): State<Arc<AtomicUsize>>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Result<(), StatusCode> {
|
||||
debug!("Headers:{:?}", headers);
|
||||
debug!("Body len:{}", body.len());
|
||||
|
||||
if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 {
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn test_server() {
|
||||
use axum::{routing::post, Router};
|
||||
|
||||
// We should break every now and then ;)
|
||||
let state = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let app = Router::new()
|
||||
.route("/", post(dodgy_handler))
|
||||
.with_state(state);
|
||||
|
||||
axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
// Queues 100 messages and then asserts that the worker runs them
|
||||
async fn test_activity_queue_workers() {
|
||||
let num_workers = 64;
|
||||
let num_messages: usize = 100;
|
||||
|
||||
tokio::spawn(test_server());
|
||||
|
||||
/*
|
||||
// uncomment for debug logs & stats
|
||||
use tracing::log::LevelFilter;
|
||||
|
||||
env_logger::builder()
|
||||
.filter_level(LevelFilter::Warn)
|
||||
.filter_module("activitypub_federation", LevelFilter::Info)
|
||||
.format_timestamp(None)
|
||||
.init();
|
||||
|
||||
*/
|
||||
|
||||
let queue = SimpleQueue::new(
|
||||
reqwest::Client::default().into(),
|
||||
num_workers,
|
||||
num_workers,
|
||||
Duration::from_secs(10),
|
||||
1,
|
||||
true,
|
||||
);
|
||||
|
||||
let keypair = generate_actor_keypair().unwrap();
|
||||
|
||||
let message = SendActivityTask {
|
||||
id: Uuid::new_v4(),
|
||||
actor_id: "http://localhost:8001".parse().unwrap(),
|
||||
activity_id: "http://localhost:8001/activity".parse().unwrap(),
|
||||
activity: "{}".into(),
|
||||
inbox: "http://localhost:8001".parse().unwrap(),
|
||||
private_key: keypair.private_key().unwrap(),
|
||||
};
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for _ in 0..num_messages {
|
||||
queue.queue(message.clone()).await.unwrap();
|
||||
}
|
||||
|
||||
info!("Queue Sent: {:?}", start.elapsed());
|
||||
|
||||
let stats = queue.shutdown(true).await.unwrap();
|
||||
|
||||
info!(
|
||||
"Queue Finished. Num msgs: {}, Time {:?}, msg/s: {:0.0}",
|
||||
num_messages,
|
||||
start.elapsed(),
|
||||
num_messages as f64 / start.elapsed().as_secs_f64()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
stats.completed_last_hour.load(Ordering::Relaxed),
|
||||
num_messages
|
||||
);
|
||||
}
|
||||
}
|
126
src/queue/request.rs
Normal file
126
src/queue/request.rs
Normal file
|
@ -0,0 +1,126 @@
|
|||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use http::{header::HeaderName, HeaderMap, HeaderValue};
|
||||
use httpdate::fmt_http_date;
|
||||
use reqwest::Request;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use url::Url;
|
||||
|
||||
use crate::{
|
||||
error::Error,
|
||||
http_signatures::sign_request,
|
||||
queue::util::retry,
|
||||
reqwest_shim::ResponseExt,
|
||||
FEDERATION_CONTENT_TYPE,
|
||||
};
|
||||
use anyhow::{anyhow, Context};
|
||||
use tracing::*;
|
||||
|
||||
use super::{util::RetryStrategy, SendActivityTask};
|
||||
|
||||
pub(super) async fn sign_and_send(
|
||||
task: &SendActivityTask,
|
||||
client: &ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
retry_strategy: RetryStrategy,
|
||||
http_signature_compat: bool,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
debug!(
|
||||
"Sending {} to {}, contents:\n {}",
|
||||
task.activity_id,
|
||||
task.inbox,
|
||||
serde_json::from_slice::<serde_json::Value>(&task.activity)?
|
||||
);
|
||||
let request_builder = client
|
||||
.post(task.inbox.to_string())
|
||||
.timeout(timeout)
|
||||
.headers(generate_request_headers(&task.inbox));
|
||||
let request = sign_request(
|
||||
request_builder,
|
||||
&task.actor_id,
|
||||
task.activity.clone(),
|
||||
task.private_key.clone(),
|
||||
http_signature_compat,
|
||||
)
|
||||
.await
|
||||
.context("signing request")?;
|
||||
|
||||
retry(
|
||||
|| {
|
||||
send(
|
||||
task,
|
||||
client,
|
||||
request
|
||||
.try_clone()
|
||||
.expect("The body of the request is not cloneable"),
|
||||
)
|
||||
},
|
||||
retry_strategy,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn send(
|
||||
task: &SendActivityTask,
|
||||
client: &ClientWithMiddleware,
|
||||
request: Request,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let response = client.execute(request).await;
|
||||
|
||||
match response {
|
||||
Ok(o) if o.status().is_success() => {
|
||||
debug!(
|
||||
"Activity {} delivered successfully to {}",
|
||||
task.activity_id, task.inbox
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Ok(o) if o.status().is_client_error() => {
|
||||
let text = o.text_limited().await.map_err(Error::other)?;
|
||||
debug!(
|
||||
"Activity {} was rejected by {}, aborting: {}",
|
||||
task.activity_id, task.inbox, text,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Ok(o) => {
|
||||
let status = o.status();
|
||||
let text = o.text_limited().await.map_err(Error::other)?;
|
||||
Err(anyhow!(
|
||||
"Queueing activity {} to {} for retry after failure with status {}: {}",
|
||||
task.activity_id,
|
||||
task.inbox,
|
||||
status,
|
||||
text,
|
||||
))
|
||||
}
|
||||
Err(e) => Err(anyhow!(
|
||||
"Queueing activity {} to {} for retry after connection failure: {}",
|
||||
task.activity_id,
|
||||
task.inbox,
|
||||
e
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
|
||||
let mut host = inbox_url.domain().expect("read inbox domain").to_string();
|
||||
if let Some(port) = inbox_url.port() {
|
||||
host = format!("{}:{}", host, port);
|
||||
}
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
HeaderName::from_static("content-type"),
|
||||
HeaderValue::from_static(FEDERATION_CONTENT_TYPE),
|
||||
);
|
||||
headers.insert(
|
||||
HeaderName::from_static("host"),
|
||||
HeaderValue::from_str(&host).expect("Hostname is valid"),
|
||||
);
|
||||
headers.insert(
|
||||
"date",
|
||||
HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"),
|
||||
);
|
||||
headers
|
||||
}
|
202
src/queue/simple_queue.rs
Normal file
202
src/queue/simple_queue.rs
Normal file
|
@ -0,0 +1,202 @@
|
|||
//! A simple activity queue which spawns tokio workers to send out requests
|
||||
//! Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
|
||||
use std::{
|
||||
sync::{atomic::Ordering, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use tokio::{
|
||||
sync::mpsc::{unbounded_channel, UnboundedSender},
|
||||
task::{JoinHandle, JoinSet},
|
||||
};
|
||||
|
||||
use crate::config::FederationConfig;
|
||||
|
||||
use super::{
|
||||
util::RetryStrategy,
|
||||
worker::{activity_worker, retry_worker},
|
||||
ActivityQueue,
|
||||
SendActivityTask,
|
||||
Stats,
|
||||
};
|
||||
|
||||
/// A simple activity queue which spawns tokio workers to send out requests
|
||||
/// When creating a queue, it will spawn a task per worker thread
|
||||
/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
|
||||
pub struct SimpleQueue {
|
||||
// Stats shared between the queue and workers
|
||||
stats: Arc<Stats>,
|
||||
sender: UnboundedSender<SendActivityTask>,
|
||||
sender_task: JoinHandle<()>,
|
||||
retry_sender_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ActivityQueue for SimpleQueue {
|
||||
type Error = anyhow::Error;
|
||||
fn stats(&self) -> &Stats {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
async fn queue(&self, message: SendActivityTask) -> Result<(), Self::Error> {
|
||||
self.stats.pending.fetch_add(1, Ordering::Relaxed);
|
||||
self.sender.send(message)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SimpleQueue {
|
||||
/// Construct a queue from federation config
|
||||
pub fn from_config<T: Clone>(config: &FederationConfig<T, SimpleQueue>) -> Self {
|
||||
Self::new(
|
||||
config.client.clone(),
|
||||
config.worker_count,
|
||||
config.retry_count,
|
||||
config.request_timeout,
|
||||
60,
|
||||
config.http_signature_compat,
|
||||
)
|
||||
}
|
||||
|
||||
/// Construct a new queue
|
||||
pub fn new(
|
||||
client: ClientWithMiddleware,
|
||||
worker_count: usize,
|
||||
retry_count: usize,
|
||||
timeout: Duration,
|
||||
backoff: usize, // This should be 60 seconds by default or 1 second in tests
|
||||
http_signature_compat: bool,
|
||||
) -> Self {
|
||||
let stats: Arc<Stats> = Default::default();
|
||||
|
||||
// This task clears the dead/completed stats every hour
|
||||
let hour_stats = stats.clone();
|
||||
tokio::spawn(async move {
|
||||
let duration = Duration::from_secs(3600);
|
||||
loop {
|
||||
tokio::time::sleep(duration).await;
|
||||
hour_stats.completed_last_hour.store(0, Ordering::Relaxed);
|
||||
hour_stats.dead_last_hour.store(0, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
|
||||
let (retry_sender, mut retry_receiver) = unbounded_channel();
|
||||
let retry_stats = stats.clone();
|
||||
let retry_client = client.clone();
|
||||
|
||||
// The "fast path" retry
|
||||
// The backoff should be < 5 mins for this to work otherwise signatures may expire
|
||||
// This strategy is the one that is used with the *same* signature
|
||||
let strategy = RetryStrategy {
|
||||
backoff,
|
||||
retries: 1,
|
||||
offset: 0,
|
||||
initial_sleep: 0,
|
||||
};
|
||||
|
||||
// The "retry path" strategy
|
||||
// After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again
|
||||
let retry_strategy = RetryStrategy {
|
||||
backoff,
|
||||
retries: 3,
|
||||
offset: 2,
|
||||
initial_sleep: backoff.pow(2), // wait 60 mins before even trying
|
||||
};
|
||||
|
||||
let retry_sender_task = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = retry_receiver.recv().await {
|
||||
let retry_task = retry_worker(
|
||||
retry_client.clone(),
|
||||
timeout,
|
||||
message,
|
||||
retry_stats.clone(),
|
||||
retry_strategy,
|
||||
http_signature_compat,
|
||||
);
|
||||
|
||||
if retry_count > 0 {
|
||||
// If we're over the limit of retries, wait for them to finish before spawning
|
||||
while join_set.len() >= retry_count {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
join_set.spawn(retry_task);
|
||||
} else {
|
||||
// If the retry worker count is `0` then just spawn and don't use the join_set
|
||||
tokio::spawn(retry_task);
|
||||
}
|
||||
}
|
||||
|
||||
while !join_set.is_empty() {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
});
|
||||
|
||||
let (sender, mut receiver) = unbounded_channel();
|
||||
|
||||
let sender_stats = stats.clone();
|
||||
|
||||
let sender_task = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
while let Some(message) = receiver.recv().await {
|
||||
let task = activity_worker(
|
||||
client.clone(),
|
||||
timeout,
|
||||
message,
|
||||
retry_sender.clone(),
|
||||
sender_stats.clone(),
|
||||
strategy,
|
||||
http_signature_compat,
|
||||
);
|
||||
|
||||
if worker_count > 0 {
|
||||
// If we're over the limit of workers, wait for them to finish before spawning
|
||||
while join_set.len() >= worker_count {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
join_set.spawn(task);
|
||||
} else {
|
||||
// If the worker count is `0` then just spawn and don't use the join_set
|
||||
tokio::spawn(task);
|
||||
}
|
||||
}
|
||||
|
||||
drop(retry_sender);
|
||||
|
||||
while !join_set.is_empty() {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
stats,
|
||||
sender,
|
||||
sender_task,
|
||||
retry_sender_task,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
// Drops all the senders and shuts down the workers
|
||||
pub(crate) async fn shutdown(
|
||||
self,
|
||||
wait_for_retries: bool,
|
||||
) -> Result<Arc<Stats>, anyhow::Error> {
|
||||
drop(self.sender);
|
||||
|
||||
self.sender_task.await?;
|
||||
|
||||
if wait_for_retries {
|
||||
self.retry_sender_task.await?;
|
||||
}
|
||||
|
||||
Ok(self.stats)
|
||||
}
|
||||
}
|
56
src/queue/util.rs
Normal file
56
src/queue/util.rs
Normal file
|
@ -0,0 +1,56 @@
|
|||
use futures_core::Future;
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Clone, Copy, Default)]
|
||||
pub(crate) struct RetryStrategy {
|
||||
/// Amount of time in seconds to back off
|
||||
pub backoff: usize,
|
||||
/// Amount of times to retry
|
||||
pub retries: usize,
|
||||
/// If this particular request has already been retried, you can add an offset here to increment the count to start
|
||||
pub offset: usize,
|
||||
/// Number of seconds to sleep before trying
|
||||
pub initial_sleep: usize,
|
||||
}
|
||||
|
||||
/// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries
|
||||
pub(crate) async fn retry<
|
||||
T,
|
||||
E: Display + Debug,
|
||||
F: Future<Output = Result<T, E>>,
|
||||
A: FnMut() -> F,
|
||||
>(
|
||||
mut action: A,
|
||||
strategy: RetryStrategy,
|
||||
) -> Result<T, E> {
|
||||
let mut count = strategy.offset;
|
||||
|
||||
// Do an initial sleep if it's called for
|
||||
if strategy.initial_sleep > 0 {
|
||||
let sleep_dur = Duration::from_secs(strategy.initial_sleep as u64);
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
}
|
||||
|
||||
loop {
|
||||
match action().await {
|
||||
Ok(val) => return Ok(val),
|
||||
Err(err) => {
|
||||
if count < strategy.retries {
|
||||
count += 1;
|
||||
|
||||
let sleep_amt = strategy.backoff.pow(count as u32) as u64;
|
||||
let sleep_dur = Duration::from_secs(sleep_amt);
|
||||
warn!("{err:?}. Sleeping for {sleep_dur:?} and trying again");
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
continue;
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
96
src/queue/worker.rs
Normal file
96
src/queue/worker.rs
Normal file
|
@ -0,0 +1,96 @@
|
|||
use reqwest_middleware::ClientWithMiddleware;
|
||||
|
||||
use std::{
|
||||
sync::{atomic::Ordering, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::warn;
|
||||
|
||||
use super::{
|
||||
request::sign_and_send,
|
||||
util::{retry, RetryStrategy},
|
||||
SendActivityTask,
|
||||
Stats,
|
||||
};
|
||||
|
||||
/// A tokio spawned worker which is responsible for submitting requests to federated servers
|
||||
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
|
||||
/// We need to retry activity sending in case the target instances is temporarily unreachable.
|
||||
/// In this case, the task is stored and resent when the instance is hopefully back up. This
|
||||
/// list shows the retry intervals, and which events of the target instance can be covered:
|
||||
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
|
||||
/// - 60min (one hour, instance maintenance) --- happens in the retry worker
|
||||
/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker
|
||||
pub(super) async fn activity_worker(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
retry_queue: UnboundedSender<SendActivityTask>,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
http_signature_compat: bool,
|
||||
) {
|
||||
stats.pending.fetch_sub(1, Ordering::Relaxed);
|
||||
stats.running.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let outcome = sign_and_send(&message, &client, timeout, strategy, http_signature_compat).await;
|
||||
|
||||
// "Running" has finished, check the outcome
|
||||
stats.running.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.retries.fetch_add(1, Ordering::Relaxed);
|
||||
warn!(
|
||||
"Sending activity {} to {} to the retry queue to be tried again later",
|
||||
message.activity_id, message.inbox
|
||||
);
|
||||
// Send to the retry queue. Ignoring whether it succeeds or not
|
||||
retry_queue.send(message).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn retry_worker(
|
||||
client: ClientWithMiddleware,
|
||||
timeout: Duration,
|
||||
message: SendActivityTask,
|
||||
stats: Arc<Stats>,
|
||||
strategy: RetryStrategy,
|
||||
http_signature_compat: bool,
|
||||
) {
|
||||
// Because the times are pretty extravagant between retries, we have to re-sign each time
|
||||
let outcome = retry(
|
||||
|| {
|
||||
sign_and_send(
|
||||
&message,
|
||||
&client,
|
||||
timeout,
|
||||
RetryStrategy {
|
||||
backoff: 0,
|
||||
retries: 0,
|
||||
offset: 0,
|
||||
initial_sleep: 0,
|
||||
},
|
||||
http_signature_compat,
|
||||
)
|
||||
},
|
||||
strategy,
|
||||
)
|
||||
.await;
|
||||
|
||||
stats.retries.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match outcome {
|
||||
Ok(_) => {
|
||||
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_err) => {
|
||||
stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,10 @@
|
|||
//! Traits which need to be implemented for federated data types
|
||||
|
||||
use crate::{config::Data, protocol::public_key::PublicKey};
|
||||
use crate::{
|
||||
config::Data,
|
||||
protocol::public_key::PublicKey,
|
||||
queue::{simple_queue::SimpleQueue, ActivityQueue},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use chrono::NaiveDateTime;
|
||||
use serde::Deserialize;
|
||||
|
@ -97,6 +101,8 @@ pub trait Object: Sized + Debug {
|
|||
/// App data type passed to handlers. Must be identical to
|
||||
/// [crate::config::FederationConfigBuilder::app_data] type.
|
||||
type DataType: Clone + Send + Sync;
|
||||
/// The queue type to use with this object
|
||||
type QueueType: ActivityQueue + Send + Sync;
|
||||
/// The type of protocol struct which gets sent over network to federate this database struct.
|
||||
type Kind;
|
||||
/// Error type returned by handler methods
|
||||
|
@ -120,13 +126,16 @@ pub trait Object: Sized + Debug {
|
|||
/// Should return `Ok(None)` if not found.
|
||||
async fn read_from_id(
|
||||
object_id: Url,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Option<Self>, Self::Error>;
|
||||
|
||||
/// Mark remote object as deleted in local database.
|
||||
///
|
||||
/// Called when a `Delete` activity is received, or if fetch returns a `Tombstone` object.
|
||||
async fn delete(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn delete(
|
||||
self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -134,7 +143,10 @@ pub trait Object: Sized + Debug {
|
|||
///
|
||||
/// Called when a local object gets fetched by another instance over HTTP, or when an object
|
||||
/// gets sent in an activity.
|
||||
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error>;
|
||||
async fn into_json(
|
||||
self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error>;
|
||||
|
||||
/// Verifies that the received object is valid.
|
||||
///
|
||||
|
@ -146,7 +158,7 @@ pub trait Object: Sized + Debug {
|
|||
async fn verify(
|
||||
json: &Self::Kind,
|
||||
expected_domain: &Url,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
/// Convert object from ActivityPub type to database type.
|
||||
|
@ -154,7 +166,10 @@ pub trait Object: Sized + Debug {
|
|||
/// Called when an object is received from HTTP fetch or as part of an activity. This method
|
||||
/// should write the received object to database. Note that there is no distinction between
|
||||
/// create and update, so an `upsert` operation should be used.
|
||||
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error>;
|
||||
async fn from_json(
|
||||
json: Self::Kind,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error>;
|
||||
}
|
||||
|
||||
/// Handler for receiving incoming activities.
|
||||
|
@ -206,6 +221,10 @@ pub trait ActivityHandler {
|
|||
/// App data type passed to handlers. Must be identical to
|
||||
/// [crate::config::FederationConfigBuilder::app_data] type.
|
||||
type DataType: Clone + Send + Sync;
|
||||
|
||||
/// The queue type to use with this object
|
||||
type QueueType: ActivityQueue + Send + Sync;
|
||||
|
||||
/// Error type returned by handler methods
|
||||
type Error;
|
||||
|
||||
|
@ -219,13 +238,15 @@ pub trait ActivityHandler {
|
|||
///
|
||||
/// This needs to be a separate method, because it might be used for activities
|
||||
/// like `Undo/Follow`, which shouldn't perform any database write for the inner `Follow`.
|
||||
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error>;
|
||||
async fn verify(&self, data: &Data<Self::DataType, Self::QueueType>)
|
||||
-> Result<(), Self::Error>;
|
||||
|
||||
/// Called when an activity is received.
|
||||
///
|
||||
/// Should perform validation and possibly write action to the database. In case the activity
|
||||
/// has a nested `object` field, must call `object.from_json` handler.
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error>;
|
||||
async fn receive(self, data: &Data<Self::DataType, Self::QueueType>)
|
||||
-> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// Trait to allow retrieving common Actor data.
|
||||
|
@ -271,6 +292,7 @@ where
|
|||
T: ActivityHandler + Send + Sync,
|
||||
{
|
||||
type DataType = T::DataType;
|
||||
type QueueType = T::QueueType;
|
||||
type Error = T::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
|
@ -281,11 +303,17 @@ where
|
|||
self.deref().actor()
|
||||
}
|
||||
|
||||
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn verify(
|
||||
&self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
self.deref().verify(data).await
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn receive(
|
||||
self,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
(*self).receive(data).await
|
||||
}
|
||||
}
|
||||
|
@ -298,6 +326,10 @@ pub trait Collection: Sized {
|
|||
/// App data type passed to handlers. Must be identical to
|
||||
/// [crate::config::FederationConfigBuilder::app_data] type.
|
||||
type DataType: Clone + Send + Sync;
|
||||
|
||||
/// The queue type to use with this object
|
||||
type QueueType: ActivityQueue + Send + Sync;
|
||||
|
||||
/// The type of protocol struct which gets sent over network to federate this database struct.
|
||||
type Kind: for<'de2> Deserialize<'de2>;
|
||||
/// Error type returned by handler methods
|
||||
|
@ -306,7 +338,7 @@ pub trait Collection: Sized {
|
|||
/// Reads local collection from database and returns it as Activitypub JSON.
|
||||
async fn read_local(
|
||||
owner: &Self::Owner,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error>;
|
||||
|
||||
/// Verifies that the received object is valid.
|
||||
|
@ -316,7 +348,7 @@ pub trait Collection: Sized {
|
|||
async fn verify(
|
||||
json: &Self::Kind,
|
||||
expected_domain: &Url,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
/// Convert object from ActivityPub type to database type.
|
||||
|
@ -327,7 +359,7 @@ pub trait Collection: Sized {
|
|||
async fn from_json(
|
||||
json: Self::Kind,
|
||||
owner: &Self::Owner,
|
||||
data: &Data<Self::DataType>,
|
||||
data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error>;
|
||||
}
|
||||
|
||||
|
@ -403,17 +435,21 @@ pub mod tests {
|
|||
#[async_trait]
|
||||
impl Object for DbUser {
|
||||
type DataType = DbConnection;
|
||||
type QueueType = SimpleQueue;
|
||||
type Kind = Person;
|
||||
type Error = Error;
|
||||
|
||||
async fn read_from_id(
|
||||
_object_id: Url,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
Ok(Some(DB_USER.clone()))
|
||||
}
|
||||
|
||||
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
|
||||
async fn into_json(
|
||||
self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error> {
|
||||
Ok(Person {
|
||||
preferred_username: self.name.clone(),
|
||||
kind: Default::default(),
|
||||
|
@ -426,7 +462,7 @@ pub mod tests {
|
|||
async fn verify(
|
||||
json: &Self::Kind,
|
||||
expected_domain: &Url,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
verify_domains_match(json.id.inner(), expected_domain)?;
|
||||
Ok(())
|
||||
|
@ -434,7 +470,7 @@ pub mod tests {
|
|||
|
||||
async fn from_json(
|
||||
json: Self::Kind,
|
||||
_data: &Data<Self::DataType>,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error> {
|
||||
Ok(DbUser {
|
||||
name: json.preferred_username,
|
||||
|
@ -479,6 +515,7 @@ pub mod tests {
|
|||
#[async_trait]
|
||||
impl ActivityHandler for Follow {
|
||||
type DataType = DbConnection;
|
||||
type QueueType = SimpleQueue;
|
||||
type Error = Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
|
@ -489,11 +526,17 @@ pub mod tests {
|
|||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(&self, _: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn verify(
|
||||
&self,
|
||||
_: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
async fn receive(
|
||||
self,
|
||||
_data: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -507,29 +550,36 @@ pub mod tests {
|
|||
#[async_trait]
|
||||
impl Object for DbPost {
|
||||
type DataType = DbConnection;
|
||||
type QueueType = SimpleQueue;
|
||||
type Kind = Note;
|
||||
type Error = Error;
|
||||
|
||||
async fn read_from_id(
|
||||
_: Url,
|
||||
_: &Data<Self::DataType>,
|
||||
_: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn into_json(self, _: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
|
||||
async fn into_json(
|
||||
self,
|
||||
_: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self::Kind, Self::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn verify(
|
||||
_: &Self::Kind,
|
||||
_: &Url,
|
||||
_: &Data<Self::DataType>,
|
||||
_: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<(), Self::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn from_json(_: Self::Kind, _: &Data<Self::DataType>) -> Result<Self, Self::Error> {
|
||||
async fn from_json(
|
||||
_: Self::Kind,
|
||||
_: &Data<Self::DataType, Self::QueueType>,
|
||||
) -> Result<Self, Self::Error> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue