Use background jobs for delivery, support Accept and Reject

This commit is contained in:
asonix 2020-03-21 15:24:05 -05:00
parent 6a8f7db165
commit 5a49c1f10c
11 changed files with 371 additions and 74 deletions

68
Cargo.lock generated
View file

@ -452,6 +452,50 @@ dependencies = [
"serde_urlencoded",
]
[[package]]
name = "background-jobs"
version = "0.8.0-alpha.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e"
dependencies = [
"background-jobs-actix",
"background-jobs-core",
]
[[package]]
name = "background-jobs-actix"
version = "0.7.0-alpha.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e"
dependencies = [
"actix",
"actix-rt",
"anyhow",
"async-trait",
"background-jobs-core",
"chrono",
"log",
"num_cpus",
"rand",
"serde 1.0.105",
"serde_json",
"thiserror",
"tokio",
]
[[package]]
name = "background-jobs-core"
version = "0.7.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"log",
"serde 1.0.105",
"serde_json",
"thiserror",
]
[[package]]
name = "backtrace"
version = "0.3.45"
@ -628,6 +672,7 @@ checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2"
dependencies = [
"num-integer",
"num-traits 0.2.11",
"serde 1.0.105",
"time 0.1.42",
]
@ -1781,6 +1826,7 @@ dependencies = [
"actix-web",
"actix-webfinger",
"anyhow",
"background-jobs",
"base64 0.12.0",
"bb8-postgres",
"config",
@ -2102,9 +2148,9 @@ dependencies = [
[[package]]
name = "siphasher"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83da420ee8d1a89e640d0948c646c1c088758d3a3c538f943bfa97bdac17929d"
checksum = "8e88f89a550c01e4cd809f3df4f52dc9e939f3273a2017eabd5c6d12fd98bb23"
[[package]]
name = "slab"
@ -2251,9 +2297,9 @@ checksum = "7c65d530b10ccaeac294f349038a597e435b18fb456aadd0840a623f83b9e941"
[[package]]
name = "syn"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "123bd9499cfb380418d509322d7a6d52e5315f064fe4b3ad18a53d6b92c07859"
checksum = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03"
dependencies = [
"proc-macro2",
"quote",
@ -2303,18 +2349,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.11"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee14bf8e6767ab4c687c9e8bc003879e042a96fd67a3ba5934eadb6536bef4db"
checksum = "268c0f167625b8b0cc90a91787b158a372b4edadb31d6e20479dc787309defad"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.11"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7b51e1fbc44b5a0840be594fbc0f960be09050f2617e61e6aa43bef97cd3ef4"
checksum = "9a3ecbaa927a1d5a73d14a20af52463fa433c0727d07ef5e208f0546841d2efd"
dependencies = [
"proc-macro2",
"quote",
@ -2427,7 +2473,7 @@ dependencies = [
"postgres-protocol",
"postgres-types",
"tokio",
"tokio-util 0.3.0",
"tokio-util 0.3.1",
]
[[package]]
@ -2458,9 +2504,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af67cdce2b40f8dffb0ee04c853a24217b5d0d3e358f0f5ccc0b5332174ed9a8"
checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
dependencies = [
"bytes",
"futures-core",

View file

@ -19,6 +19,7 @@ actix-rt = "1.0.0"
actix-web = { version = "3.0.0-alpha.1", features = ["rustls"] }
actix-webfinger = "0.3.0-alpha.3"
activitystreams = "0.5.0-alpha.11"
background-jobs = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] }
base64 = "0.12"
bb8-postgres = "0.4.0"
config = "0.10.1"

View file

@ -33,6 +33,8 @@ ecample, if the server is `https://relay.my.tld`, the correct URL would be
`https://relay.my.tld/actor`.
### Supported Activities
- Accept Follow {self}, this is a no-op
- Reject Follow {self}, an Undo Follow is sent back
- Announce {anything}, {anything} is Announced to listening servers
- Create {anything}, {anything} is Announced to listening servers
- Follow {self}, become a listener of the relay, a Follow will be sent back

View file

@ -37,10 +37,12 @@ pub struct AnyExistingObject {
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub enum ValidTypes {
Accept,
Announce,
Create,
Delete,
Follow,
Reject,
Undo,
Update,
}
@ -133,20 +135,49 @@ impl ValidObjects {
}
}
pub fn child_object_is(&self, uri: &XsdAnyUri) -> bool {
pub fn child_object_id(&self) -> Option<XsdAnyUri> {
match self {
ValidObjects::Id(_) => false,
ValidObjects::Id(_) => None,
ValidObjects::Object(AnyExistingObject { ext, .. }) => {
if let Some(o) = ext.get("object") {
if let Ok(child_uri) = serde_json::from_value::<XsdAnyUri>(o.clone()) {
return child_uri == *uri;
return Some(child_uri);
}
}
false
None
}
}
}
pub fn child_object_is(&self, uri: &XsdAnyUri) -> bool {
if let Some(child_object_id) = self.child_object_id() {
return *uri == child_object_id;
}
false
}
pub fn child_actor_id(&self) -> Option<XsdAnyUri> {
match self {
ValidObjects::Id(_) => None,
ValidObjects::Object(AnyExistingObject { ext, .. }) => {
if let Some(o) = ext.get("actor") {
if let Ok(child_uri) = serde_json::from_value::<XsdAnyUri>(o.clone()) {
return Some(child_uri);
}
}
None
}
}
}
pub fn child_actor_is(&self, uri: &XsdAnyUri) -> bool {
if let Some(child_actor_id) = self.child_actor_id() {
return *uri == child_actor_id;
}
false
}
}
impl AcceptedActors {

View file

@ -10,6 +10,9 @@ use std::{convert::Infallible, fmt::Debug, io::Error};
#[derive(Debug, thiserror::Error)]
pub enum MyError {
#[error("Error queueing job, {0}")]
Queue(anyhow::Error),
#[error("Error in configuration, {0}")]
Config(#[from] config::ConfigError),
@ -82,8 +85,8 @@ pub enum MyError {
#[error("Couldn't receive request response")]
ReceiveResponse,
#[error("Response has invalid status code")]
Status,
#[error("Response has invalid status code, {0}")]
Status(StatusCode),
#[error("URI is missing domain field")]
Domain,

View file

@ -3,6 +3,8 @@ use crate::{
config::{Config, UrlKind},
db::Db,
error::MyError,
jobs::JobServer,
jobs::{Deliver, DeliverMany},
requests::Requests,
responses::accepted,
state::State,
@ -25,6 +27,7 @@ pub async fn inbox(
state: web::Data<State>,
config: web::Data<Config>,
client: web::Data<Requests>,
jobs: web::Data<JobServer>,
input: web::Json<AcceptedObjects>,
verified: Option<SignatureVerified>,
digest_verified: Option<DigestVerified>,
@ -66,22 +69,74 @@ pub async fn inbox(
}
match input.kind {
ValidTypes::Accept => handle_accept(&config, input).await,
ValidTypes::Reject => handle_reject(&db, &config, &jobs, input, actor).await,
ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &config, &client, input, actor).await
handle_announce(&state, &config, &jobs, input, actor).await
}
ValidTypes::Follow => handle_follow(&db, &config, &client, input, actor, is_listener).await,
ValidTypes::Follow => handle_follow(&db, &config, &jobs, input, actor, is_listener).await,
ValidTypes::Delete | ValidTypes::Update => {
handle_forward(&state, &client, input, actor).await
handle_forward(&state, &jobs, input, actor).await
}
ValidTypes::Undo => handle_undo(&db, &state, &config, &client, input, actor).await,
ValidTypes::Undo => handle_undo(&db, &state, &config, &jobs, input, actor).await,
}
}
async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result<HttpResponse, MyError> {
if !input.object.is_kind("Follow") {
return Err(MyError::Kind(
input.object.kind().unwrap_or("unknown").to_owned(),
));
}
if !input
.object
.child_actor_is(&config.generate_url(UrlKind::Actor).parse()?)
{
return Err(MyError::WrongActor(input.object.id().to_string()));
}
Ok(accepted(serde_json::json!({})))
}
async fn handle_reject(
db: &Db,
config: &Config,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
if !input.object.is_kind("Follow") {
return Err(MyError::Kind(
input.object.kind().unwrap_or("unknown").to_owned(),
));
}
if !input
.object
.child_actor_is(&config.generate_url(UrlKind::Actor).parse()?)
{
return Err(MyError::WrongActor(input.object.id().to_string()));
}
let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?;
let inbox = actor.inbox().to_owned();
db.remove_listener(inbox).await?;
let undo = generate_undo_follow(config, &actor.id, &my_id)?;
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, undo.clone())?)?;
Ok(accepted(undo))
}
async fn handle_undo(
db: &Db,
state: &State,
config: &Config,
client: &Requests,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
@ -95,7 +150,7 @@ async fn handle_undo(
}
if !input.object.is_kind("Follow") {
return handle_forward(state, client, input, actor).await;
return handle_forward(state, jobs, input, actor).await;
}
let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?;
@ -109,26 +164,22 @@ async fn handle_undo(
let undo = generate_undo_follow(config, &actor.id, &my_id)?;
let client2 = client.clone();
let inbox = actor.inbox().clone();
let undo2 = undo.clone();
actix::Arbiter::spawn(async move {
let _ = client2.deliver(inbox, &undo2).await;
});
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, undo.clone())?)?;
Ok(accepted(undo))
}
async fn handle_forward(
state: &State,
client: &Requests,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
let object_id = input.object.id();
let inboxes = get_inboxes(state, &actor, &object_id).await?;
client.deliver_many(inboxes, input.clone());
jobs.queue(DeliverMany::new(inboxes, input.clone())?)?;
Ok(accepted(input))
}
@ -136,7 +187,7 @@ async fn handle_forward(
async fn handle_announce(
state: &State,
config: &Config,
client: &Requests,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
@ -150,7 +201,7 @@ async fn handle_announce(
let announce = generate_announce(config, &activity_id, object_id)?;
let inboxes = get_inboxes(state, &actor, &object_id).await?;
client.deliver_many(inboxes, announce.clone());
jobs.queue(DeliverMany::new(inboxes, announce.clone())?)?;
state.cache(object_id.to_owned(), activity_id).await;
@ -160,7 +211,7 @@ async fn handle_announce(
async fn handle_follow(
db: &Db,
config: &Config,
client: &Requests,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
is_listener: bool,
@ -178,23 +229,15 @@ async fn handle_follow(
// if following relay directly, not just following 'public', followback
if input.object.is(&my_id) {
let follow = generate_follow(config, &actor.id, &my_id)?;
let client2 = client.clone();
let inbox = actor.inbox().clone();
let follow2 = follow.clone();
actix::Arbiter::spawn(async move {
let _ = client2.deliver(inbox, &follow2).await;
});
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, follow)?)?;
}
}
let accept = generate_accept_follow(config, &actor.id, &input.id, &my_id)?;
let client2 = client.clone();
let inbox = actor.inbox().clone();
let accept2 = accept.clone();
actix::Arbiter::spawn(async move {
let _ = client2.deliver(inbox, &accept2).await;
});
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, accept.clone())?)?;
Ok(accepted(accept))
}

56
src/jobs/deliver.rs Normal file
View file

@ -0,0 +1,56 @@
use crate::{error::MyError, jobs::JobState};
use activitystreams::primitives::XsdAnyUri;
use anyhow::Error;
use background_jobs::{Job, Processor};
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Deliver {
to: XsdAnyUri,
data: serde_json::Value,
}
impl Deliver {
pub fn new<T>(to: XsdAnyUri, data: T) -> Result<Self, MyError>
where
T: serde::ser::Serialize,
{
Ok(Deliver {
to,
data: serde_json::to_value(data)?,
})
}
async fn perform(self, state: JobState) -> Result<(), Error> {
state.requests.deliver(self.to, &self.data).await?;
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct DeliverProcessor;
impl Job for Deliver {
type State = JobState;
type Processor = DeliverProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel();
actix::spawn(async move {
let _ = tx.send(self.perform(state).await);
});
Box::pin(async move { rx.await? })
}
}
impl Processor for DeliverProcessor {
type Job = Deliver;
const NAME: &'static str = "DeliverProcessor";
const QUEUE: &'static str = "default";
}

56
src/jobs/deliver_many.rs Normal file
View file

@ -0,0 +1,56 @@
use crate::{
error::MyError,
jobs::{Deliver, JobState},
};
use activitystreams::primitives::XsdAnyUri;
use anyhow::Error;
use background_jobs::{Job, Processor};
use futures::future::{ready, Ready};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct DeliverMany {
to: Vec<XsdAnyUri>,
data: serde_json::Value,
}
impl DeliverMany {
pub fn new<T>(to: Vec<XsdAnyUri>, data: T) -> Result<Self, MyError>
where
T: serde::ser::Serialize,
{
Ok(DeliverMany {
to,
data: serde_json::to_value(data)?,
})
}
fn perform(self, state: JobState) -> Result<(), Error> {
for inbox in self.to {
state
.job_server
.queue(Deliver::new(inbox, self.data.clone())?)?;
}
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct DeliverManyProcessor;
impl Job for DeliverMany {
type State = JobState;
type Processor = DeliverManyProcessor;
type Future = Ready<Result<(), Error>>;
fn run(self, state: Self::State) -> Self::Future {
ready(self.perform(state))
}
}
impl Processor for DeliverManyProcessor {
type Job = DeliverMany;
const NAME: &'static str = "DeliverManyProcessor";
const QUEUE: &'static str = "default";
}

64
src/jobs/mod.rs Normal file
View file

@ -0,0 +1,64 @@
mod deliver;
mod deliver_many;
pub use self::{deliver::Deliver, deliver_many::DeliverMany};
use crate::{
error::MyError,
jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor},
requests::Requests,
state::State,
};
use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig};
pub fn create_server() -> JobServer {
JobServer::new(background_jobs::create_server(Storage::new()))
}
pub fn create_workers(state: State, job_server: JobServer) {
let queue_handle = job_server.queue_handle();
WorkerConfig::new(move || JobState::new(state.requests(), job_server.clone()))
.register(DeliverProcessor)
.register(DeliverManyProcessor)
.set_processor_count("default", 4)
.start(queue_handle);
}
#[derive(Clone)]
pub struct JobState {
requests: Requests,
job_server: JobServer,
}
#[derive(Clone)]
pub struct JobServer {
inner: QueueHandle,
}
impl JobState {
fn new(requests: Requests, job_server: JobServer) -> Self {
JobState {
requests,
job_server,
}
}
}
impl JobServer {
fn new(queue_handle: QueueHandle) -> Self {
JobServer {
inner: queue_handle,
}
}
pub fn queue_handle(&self) -> QueueHandle {
self.inner.clone()
}
pub fn queue<J>(&self, job: J) -> Result<(), MyError>
where
J: Job,
{
self.inner.queue(job).map_err(MyError::Queue)
}
}

View file

@ -17,6 +17,7 @@ mod config;
mod db;
mod error;
mod inbox;
mod jobs;
mod nodeinfo;
mod notify;
mod rehydrate;
@ -27,8 +28,14 @@ mod verifier;
mod webfinger;
use self::{
args::Args, config::Config, db::Db, error::MyError, state::State,
templates::statics::StaticFile, webfinger::RelayResolver,
args::Args,
config::Config,
db::Db,
error::MyError,
jobs::{create_server, create_workers},
state::State,
templates::statics::StaticFile,
webfinger::RelayResolver,
};
async fn index(
@ -102,16 +109,21 @@ async fn main() -> Result<(), anyhow::Error> {
rehydrate::spawn(db.clone(), state.clone());
let job_server = create_server();
let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone());
let bind_address = config.bind_address();
HttpServer::new(move || {
create_workers(state.clone(), job_server.clone());
App::new()
.wrap(Logger::default())
.data(db.clone())
.data(state.clone())
.data(state.requests())
.data(config.clone())
.data(job_server.clone())
.service(web::resource("/").route(web::get().to(index)))
.service(
web::resource("/inbox")

View file

@ -1,8 +1,6 @@
use crate::{apub::AcceptedActors, error::MyError, state::ActorCache};
use activitystreams::primitives::XsdAnyUri;
use actix::Arbiter;
use actix_web::client::Client;
use futures::stream::StreamExt;
use http_signature_normalization_actix::prelude::*;
use log::error;
use rsa::{hash::Hashes, padding::PaddingScheme, RSAPrivateKey};
@ -67,14 +65,15 @@ impl Requests {
})?;
if !res.status().is_success() {
error!("Invalid status code for fetch, {}", res.status());
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
error!("Response, {}", s);
if !s.is_empty() {
error!("Response, {}", s);
}
}
}
return Err(MyError::Status);
return Err(MyError::Status(res.status()));
}
res.json().await.map_err(|e| {
@ -83,23 +82,6 @@ impl Requests {
})
}
pub fn deliver_many<T>(&self, inboxes: Vec<XsdAnyUri>, item: T)
where
T: serde::ser::Serialize + 'static,
{
let this = self.clone();
Arbiter::spawn(async move {
let mut unordered = futures::stream::FuturesUnordered::new();
for inbox in inboxes {
unordered.push(this.deliver(inbox, &item));
}
while let Some(_) = unordered.next().await {}
});
}
pub async fn deliver<T>(&self, inbox: XsdAnyUri, item: &T) -> Result<(), MyError>
where
T: serde::ser::Serialize,
@ -129,19 +111,20 @@ impl Requests {
})?;
if !res.status().is_success() {
error!("Invalid response status from {}, {}", inbox, res.status());
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
error!("Response, {}", s);
if !s.is_empty() {
error!("Response, {}", s);
}
}
}
return Err(MyError::Status);
return Err(MyError::Status(res.status()));
}
Ok(())
}
fn sign(&self, signing_string: &str) -> Result<String, crate::error::MyError> {
fn sign(&self, signing_string: &str) -> Result<String, MyError> {
let hashed = Sha256::digest(signing_string.as_bytes());
let bytes =
self.private_key