Rewrite community outbox to use new fetcher

This commit is contained in:
Felix Ableitner 2021-10-25 16:15:03 +02:00
parent 20af10dddf
commit a75b6cb5c9
13 changed files with 243 additions and 135 deletions

1
Cargo.lock generated
View file

@ -1833,6 +1833,7 @@ dependencies = [
"http",
"http-signature-normalization-actix",
"itertools",
"lazy_static",
"lemmy_api_common",
"lemmy_apub_lib",
"lemmy_db_schema",

View file

@ -50,6 +50,7 @@ thiserror = "1.0.29"
background-jobs = "0.9.0"
reqwest = { version = "0.11.4", features = ["json"] }
html2md = "0.2.13"
lazy_static = "1.4.0"
[dev-dependencies]
serial_test = "0.5.1"

View file

@ -48,6 +48,28 @@ pub struct CreateOrUpdatePost {
}
impl CreateOrUpdatePost {
/// Builds a `CreateOrUpdatePost` activity for `post` without sending it.
///
/// * `post` - the post that becomes the activity's `object` (converted via `to_apub`).
/// * `actor` - the person set as the activity's `actor`.
/// * `community` - the community whose actor id is placed in `cc`.
/// * `kind` - whether this is a create or an update; also used to derive the activity id.
/// * `context` - provides the settings (protocol and hostname) used for the activity id and is
///   passed on to `post.to_apub`.
///
/// Returns an error if the activity id cannot be generated or the post cannot be converted.
pub(crate) async fn new(
  post: &ApubPost,
  actor: &ApubPerson,
  community: &ApubCommunity,
  kind: CreateOrUpdateType,
  context: &LemmyContext,
) -> Result<CreateOrUpdatePost, LemmyError> {
  // `kind` is cloned because it is moved into the struct below.
  let id = generate_activity_id(
    kind.clone(),
    &context.settings().get_protocol_and_hostname(),
  )?;
  Ok(CreateOrUpdatePost {
    actor: ObjectId::new(actor.actor_id()),
    to: [PublicUrl::Public],
    object: post.to_apub(context).await?,
    cc: [ObjectId::new(community.actor_id())],
    kind,
    // `id` is not used afterwards, so it is moved rather than cloned.
    id,
    context: lemmy_context(),
    unparsed: Default::default(),
  })
}
pub async fn send(
post: &ApubPost,
actor: &ApubPerson,
@ -60,22 +82,8 @@ impl CreateOrUpdatePost {
})
.await??
.into();
let id = generate_activity_id(
kind.clone(),
&context.settings().get_protocol_and_hostname(),
)?;
let create_or_update = CreateOrUpdatePost {
actor: ObjectId::new(actor.actor_id()),
to: [PublicUrl::Public],
object: post.to_apub(context).await?,
cc: [ObjectId::new(community.actor_id())],
kind,
id: id.clone(),
context: lemmy_context(),
unparsed: Default::default(),
};
let create_or_update = CreateOrUpdatePost::new(post, actor, &community, kind, context).await?;
let id = create_or_update.id.clone();
let activity = AnnouncableActivities::CreateOrUpdatePost(Box::new(create_or_update));
send_to_community(activity, &id, actor, &community, vec![], context).await
}

View file

@ -0,0 +1,132 @@
use crate::{
activities::{post::create_or_update::CreateOrUpdatePost, CreateOrUpdateType},
context::lemmy_context,
generate_outbox_url,
objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost},
};
use activitystreams::{
base::AnyBase,
chrono::NaiveDateTime,
collection::kind::OrderedCollectionType,
object::Tombstone,
primitives::OneOrMany,
url::Url,
};
use lemmy_api_common::blocking;
use lemmy_apub_lib::{
data::Data,
traits::{ActivityHandler, ApubObject},
verify::verify_domains_match,
};
use lemmy_db_schema::{
source::{person::Person, post::Post},
traits::Crud,
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
/// Serialized (JSON) form of a community outbox: an ordered collection whose items are
/// `CreateOrUpdatePost` activities.
///
/// Field names are converted to camelCase on (de)serialization, so `ordered_items` appears as
/// `orderedItems`.
pub struct CommunityOutbox {
/// Context of the document; (de)serialized under the key `@context`.
#[serde(rename = "@context")]
context: OneOrMany<AnyBase>,
/// Collection type marker; the raw identifier `r#type` maps to the key `type`.
r#type: OrderedCollectionType,
/// Id (URL) of the outbox collection.
id: Url,
/// The activities contained in the outbox.
ordered_items: Vec<CreateOrUpdatePost>,
}
/// Local counterpart of `CommunityOutbox`: a newtype around the list of posts that make up a
/// community's outbox.
#[derive(Clone, Debug)]
pub(crate) struct ApubCommunityOutbox(Vec<ApubPost>);
/// Data passed along when handling a community outbox. Field `0` is the community the outbox
/// belongs to (carried here so we don't have to read it again from the database), field `1` is
/// the `LemmyContext`.
pub(crate) struct OutboxData(pub ApubCommunity, pub LemmyContext);
/// `ApubObject` implementation that lets a community outbox be handled like any other object:
/// a local outbox is built from the community's posts, and a fetched outbox has its activities
/// received one by one.
#[async_trait::async_trait(?Send)]
impl ApubObject for ApubCommunityOutbox {
  type DataType = OutboxData;
  type TombstoneType = Tombstone;
  type ApubType = CommunityOutbox;

  /// No refresh timestamp is kept for the outbox, so this always returns `None`.
  fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
    None
  }

  /// Reads the outbox of the community stored in `data` from the database.
  ///
  /// Returns `Ok(None)` when the community is not local. The `_object_id` argument is unused
  /// because the community is already available in `data`.
  async fn read_from_apub_id(
    _object_id: Url,
    data: &Self::DataType,
  ) -> Result<Option<Self>, LemmyError> {
    // Only read from database if it's a local community, otherwise fetch over http
    if data.0.local {
      let community_id = data.0.id;
      let post_list: Vec<ApubPost> = blocking(data.1.pool(), move |conn| {
        Post::list_for_community(conn, community_id)
      })
      .await??
      .into_iter()
      .map(Into::into)
      .collect();
      Ok(Some(ApubCommunityOutbox(post_list)))
    } else {
      Ok(None)
    }
  }

  /// Deleting an outbox on its own is a no-op.
  async fn delete(self, _data: &Self::DataType) -> Result<(), LemmyError> {
    // do nothing (it gets deleted automatically with the community)
    Ok(())
  }

  /// Converts the contained posts into a `CommunityOutbox`, wrapping each post in a
  /// `CreateOrUpdatePost` activity of kind `Create` whose actor is the post's creator.
  ///
  /// Fails if a creator cannot be read from the database, if an activity cannot be built, or if
  /// the outbox URL cannot be generated.
  async fn to_apub(&self, data: &Self::DataType) -> Result<Self::ApubType, LemmyError> {
    // Exactly one activity is produced per post, so the final size is known up front.
    let mut ordered_items = Vec::with_capacity(self.0.len());
    for post in &self.0 {
      let creator_id = post.creator_id;
      let creator: ApubPerson =
        blocking(data.1.pool(), move |conn| Person::read(conn, creator_id))
          .await??
          .into();
      let activity =
        CreateOrUpdatePost::new(post, &creator, &data.0, CreateOrUpdateType::Create, &data.1)
          .await?;
      ordered_items.push(activity);
    }
    Ok(CommunityOutbox {
      context: lemmy_context(),
      r#type: OrderedCollectionType::OrderedCollection,
      id: generate_outbox_url(&data.0.actor_id)?.into(),
      ordered_items,
    })
  }

  /// Not supported for outboxes.
  ///
  /// # Panics
  ///
  /// Always panics via `unimplemented!()`.
  fn to_tombstone(&self) -> Result<Self::TombstoneType, LemmyError> {
    // no tombstone for this, there is only a tombstone for the community
    unimplemented!()
  }

  /// Processes a fetched outbox by receiving at most the first 20 of its activities.
  ///
  /// Fails only if the domain of `apub.id` does not match `expected_domain`; errors from
  /// individual activities are ignored. The returned value is always an empty outbox.
  async fn from_apub(
    apub: &Self::ApubType,
    data: &Self::DataType,
    expected_domain: &Url,
    request_counter: &mut i32,
  ) -> Result<Self, LemmyError> {
    /// Upper bound on the number of outbox activities that are processed.
    const MAX_OUTBOX_ITEMS: usize = 20;

    verify_domains_match(expected_domain, &apub.id)?;

    // `receive` takes the activity by value, so the items have to be cloned; only clone the ones
    // that are actually processed.
    let outbox_activities = apub.ordered_items.iter().take(MAX_OUTBOX_ITEMS).cloned();
    let context = Data::new(data.1.clone());

    // We intentionally ignore errors here. This is because the outbox might contain posts from old
    // Lemmy versions, or from other software which we can't parse. In that case, we simply skip the
    // item and only parse the ones that work.
    for activity in outbox_activities {
      activity.receive(&context, request_counter).await.ok();
    }

    // This return value is unused, so just set an empty vec
    Ok(ApubCommunityOutbox(vec![]))
  }
}

View file

@ -0,0 +1 @@
pub(crate) mod community_outbox;

View file

@ -1,15 +1,10 @@
use crate::{
activities::community::announce::AnnounceActivity,
fetcher::{fetch::fetch_remote_object, object_id::ObjectId},
objects::{community::Group, person::ApubPerson},
};
use activitystreams::{
base::AnyBase,
collection::{CollectionExt, OrderedCollection},
};
use activitystreams::collection::{CollectionExt, OrderedCollection};
use anyhow::Context;
use lemmy_api_common::blocking;
use lemmy_apub_lib::{data::Data, traits::ActivityHandler};
use lemmy_db_schema::{
source::community::{Community, CommunityModerator, CommunityModeratorForm},
traits::Joinable,
@ -70,51 +65,6 @@ pub(crate) async fn update_community_mods(
Ok(())
}
/// Fetches the ordered collection at `outbox` and hands at most its first 20 items to
/// `parse_outbox_item`.
///
/// Fails if the collection cannot be fetched, has no items, or its items are not a list.
/// Failures of individual items are ignored.
pub(crate) async fn fetch_community_outbox(
  context: &LemmyContext,
  outbox: &Url,
  recursion_counter: &mut i32,
) -> Result<(), LemmyError> {
  let collection = fetch_remote_object::<OrderedCollection>(
    context.client(),
    &context.settings(),
    outbox,
    recursion_counter,
  )
  .await?;

  let mut items = collection
    .items()
    .context(location_info!())?
    .clone()
    .many()
    .context(location_info!())?;
  // Keep only the first 20 entries (no-op for shorter lists).
  items.truncate(20);

  // Errors are deliberately dropped: the outbox might contain posts from old Lemmy versions, or
  // from other software which we can't parse. Such items are skipped and only the ones that work
  // are processed.
  for item in items {
    parse_outbox_item(item, context, recursion_counter)
      .await
      .ok();
  }

  Ok(())
}
/// Converts a generic outbox entry into an `AnnounceActivity` (by round-tripping it through
/// JSON) and receives it.
///
/// Fails if the entry cannot be serialized, does not parse as an `AnnounceActivity`, or if
/// receiving the activity fails.
async fn parse_outbox_item(
  announce: AnyBase,
  context: &LemmyContext,
  request_counter: &mut i32,
) -> Result<(), LemmyError> {
  // TODO: instead of converting like this, we should create a struct CommunityOutbox with
  //       AnnounceActivity as inner type, but that gives me stackoverflow
  let json = serde_json::to_string(&announce)?;
  let activity = serde_json::from_str::<AnnounceActivity>(&json)?;

  let data = Data::new(context.clone());
  activity.receive(&data, request_counter).await?;
  Ok(())
}
async fn fetch_community_mods(
context: &LemmyContext,
group: &Group,

View file

@ -47,7 +47,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor(
///
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool {
let update_interval = if cfg!(debug_assertions) {
// avoid infinite loop when fetching community outbox
chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)

View file

@ -1,11 +1,14 @@
use crate::fetcher::should_refetch_actor;
use crate::fetcher::should_refetch_object;
use anyhow::anyhow;
use diesel::NotFound;
use lemmy_apub_lib::{traits::ApubObject, APUB_JSON_CONTENT_TYPE};
use lemmy_db_schema::newtypes::DbUrl;
use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use reqwest::StatusCode;
use lemmy_utils::{
request::{build_user_agent, retry},
settings::structs::Settings,
LemmyError,
};
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Debug, Display, Formatter},
@ -18,16 +21,24 @@ use url::Url;
/// fetch through the search). This should be configurable.
static REQUEST_LIMIT: i32 = 25;
// TODO: after moving this file to library, remove lazy_static dependency from apub crate
lazy_static! {
  /// HTTP client shared by all `ObjectId` fetches, with the user agent built from the settings.
  ///
  /// # Panics
  ///
  /// Panics on first use if the client cannot be constructed; there is no way to fetch remote
  /// objects without it.
  static ref CLIENT: Client = Client::builder()
    .user_agent(build_user_agent(&Settings::get()))
    .build()
    .expect("failed to build the HTTP client used for fetching remote objects");
}
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(transparent)]
pub struct ObjectId<Kind>(Url, #[serde(skip)] PhantomData<Kind>)
where
Kind: ApubObject<DataType = LemmyContext> + Send + 'static,
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>;
impl<Kind> ObjectId<Kind>
where
Kind: ApubObject<DataType = LemmyContext> + Send + 'static,
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
pub fn new<T>(url: T) -> Self
@ -44,10 +55,10 @@ where
/// Fetches an activitypub object, either from local database (if possible), or over http.
pub async fn dereference(
&self,
context: &LemmyContext,
data: &<Kind as ApubObject>::DataType,
request_counter: &mut i32,
) -> Result<Kind, LemmyError> {
let db_object = self.dereference_from_db(context).await?;
let db_object = self.dereference_from_db(data).await?;
// if its a local object, only fetch it from the database and not over http
if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) {
@ -57,39 +68,48 @@ where
};
}
// object found in database
if let Some(object) = db_object {
// object is old and should be refetched
if let Some(last_refreshed_at) = object.last_refreshed_at() {
// TODO: rename to should_refetch_object()
if should_refetch_actor(last_refreshed_at) {
if should_refetch_object(last_refreshed_at) {
return self
.dereference_from_http(context, request_counter, Some(object))
.dereference_from_http(data, request_counter, Some(object))
.await;
}
}
Ok(object)
} else {
}
// object not found, need to fetch over http
else {
self
.dereference_from_http(context, request_counter, None)
.dereference_from_http(data, request_counter, None)
.await
}
}
/// Fetch an object from the local db. Instead of falling back to http, this throws an error if
/// the object is not found in the database.
pub async fn dereference_local(&self, context: &LemmyContext) -> Result<Kind, LemmyError> {
let object = self.dereference_from_db(context).await?;
pub async fn dereference_local(
&self,
data: &<Kind as ApubObject>::DataType,
) -> Result<Kind, LemmyError> {
let object = self.dereference_from_db(data).await?;
object.ok_or_else(|| anyhow!("object not found in database {}", self).into())
}
/// returning none means the object was not found in local db
async fn dereference_from_db(&self, context: &LemmyContext) -> Result<Option<Kind>, LemmyError> {
async fn dereference_from_db(
&self,
data: &<Kind as ApubObject>::DataType,
) -> Result<Option<Kind>, LemmyError> {
let id = self.0.clone();
ApubObject::read_from_apub_id(id, context).await
ApubObject::read_from_apub_id(id, data).await
}
async fn dereference_from_http(
&self,
context: &LemmyContext,
data: &<Kind as ApubObject>::DataType,
request_counter: &mut i32,
db_object: Option<Kind>,
) -> Result<Kind, LemmyError> {
@ -102,8 +122,7 @@ where
}
let res = retry(|| {
context
.client()
CLIENT
.get(self.0.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(Duration::from_secs(60))
@ -113,20 +132,20 @@ where
if res.status() == StatusCode::GONE {
if let Some(db_object) = db_object {
db_object.delete(context).await?;
db_object.delete(data).await?;
}
return Err(anyhow!("Fetched remote object {} which was deleted", self).into());
}
let res2: Kind::ApubType = res.json().await?;
Ok(Kind::from_apub(&res2, context, self.inner(), request_counter).await?)
Ok(Kind::from_apub(&res2, data, self.inner(), request_counter).await?)
}
}
impl<Kind> Display for ObjectId<Kind>
where
Kind: ApubObject<DataType = LemmyContext> + Send + 'static,
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@ -136,7 +155,7 @@ where
impl<Kind> From<ObjectId<Kind>> for Url
where
Kind: ApubObject<DataType = LemmyContext> + Send + 'static,
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn from(id: ObjectId<Kind>) -> Self {
@ -146,7 +165,7 @@ where
impl<Kind> From<ObjectId<Kind>> for DbUrl
where
Kind: ApubObject<DataType = LemmyContext> + Send + 'static,
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn from(id: ObjectId<Kind>) -> Self {

View file

@ -5,24 +5,30 @@ use crate::{
following::{follow::FollowCommunity, undo::UndoFollowCommunity},
report::Report,
},
collections::community_outbox::{ApubCommunityOutbox, OutboxData},
context::lemmy_context,
generate_moderators_url, generate_outbox_url,
fetcher::object_id::ObjectId,
generate_moderators_url,
http::{
create_apub_response, create_apub_tombstone_response, payload_to_string, receive_activity,
create_apub_response,
create_apub_tombstone_response,
payload_to_string,
receive_activity,
},
objects::community::ApubCommunity,
};
use activitystreams::{
base::{AnyBase, BaseExt},
base::BaseExt,
collection::{CollectionExt, OrderedCollection, UnorderedCollection},
url::Url,
};
use actix_web::{body::Body, web, web::Payload, HttpRequest, HttpResponse};
use lemmy_api_common::blocking;
use lemmy_apub_lib::traits::{ActivityFields, ActivityHandler, ApubObject};
use lemmy_db_schema::source::{activity::Activity, community::Community};
use lemmy_db_schema::source::community::Community;
use lemmy_db_views_actor::{
community_follower_view::CommunityFollowerView, community_moderator_view::CommunityModeratorView,
community_follower_view::CommunityFollowerView,
community_moderator_view::CommunityModeratorView,
};
use lemmy_utils::LemmyError;
use lemmy_websocket::LemmyContext;
@ -122,31 +128,18 @@ pub(crate) async fn get_apub_community_followers(
/// activites like votes or comments).
pub(crate) async fn get_apub_community_outbox(
info: web::Path<CommunityQuery>,
req: HttpRequest,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
let community = blocking(context.pool(), move |conn| {
Community::read_from_name(conn, &info.community_name)
})
.await??;
let community_actor_id = community.actor_id.to_owned();
let activities = blocking(context.pool(), move |conn| {
Activity::read_community_outbox(conn, &community_actor_id)
})
.await??;
let activities = activities
.iter()
.map(AnyBase::from_arbitrary_json)
.collect::<Result<Vec<AnyBase>, serde_json::Error>>()?;
let len = activities.len();
let mut collection = OrderedCollection::new();
collection
.set_many_items(activities)
.set_many_contexts(lemmy_context())
.set_id(generate_outbox_url(&community.actor_id)?.into())
.set_total_items(len as u64);
Ok(create_apub_response(&collection))
let outbox_data = OutboxData(community.into(), context.get_ref().clone());
let url = Url::parse(&req.head().uri.to_string())?;
let id = ObjectId::<ApubCommunityOutbox>::new(url);
let outbox = id.dereference(&outbox_data, &mut 0).await?;
Ok(create_apub_response(&outbox.to_apub(&outbox_data).await?))
}
pub(crate) async fn get_apub_community_inbox(

View file

@ -1,10 +1,14 @@
pub mod activities;
pub(crate) mod collections;
mod context;
pub mod fetcher;
pub mod http;
pub mod migrations;
pub mod objects;
#[macro_use]
extern crate lazy_static;
use crate::fetcher::post_or_comment::PostOrComment;
use anyhow::{anyhow, Context};
use lemmy_api_common::blocking;

View file

@ -313,6 +313,8 @@ mod tests {
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_comment() {
// TODO: changed ObjectId::dereference() so that it always fetches if
// last_refreshed_at() == None. But post doesnt store that and expects to never be refetched
let context = init_context();
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await;

View file

@ -1,7 +1,8 @@
use crate::{
check_is_apub_id_valid,
collections::community_outbox::{ApubCommunityOutbox, OutboxData},
context::lemmy_context,
fetcher::community::{fetch_community_outbox, update_community_mods},
fetcher::{community::update_community_mods, object_id::ObjectId},
generate_moderators_url,
generate_outbox_url,
objects::{create_tombstone, get_summary_from_string_or_source, ImageObject, Source},
@ -65,7 +66,7 @@ pub struct Group {
// lemmy extension
pub(crate) moderators: Option<Url>,
inbox: Url,
pub(crate) outbox: Url,
pub(crate) outbox: ObjectId<ApubCommunityOutbox>,
followers: Url,
endpoints: Endpoints<Url>,
public_key: PublicKey,
@ -193,7 +194,7 @@ impl ApubObject for ApubCommunity {
sensitive: Some(self.nsfw),
moderators: Some(generate_moderators_url(&self.actor_id)?.into()),
inbox: self.inbox_url.clone().into(),
outbox: generate_outbox_url(&self.actor_id)?.into(),
outbox: ObjectId::new(generate_outbox_url(&self.actor_id)?),
followers: self.followers_url.clone().into(),
endpoints: Endpoints {
shared_inbox: self.shared_inbox_url.clone().map(|s| s.into()),
@ -227,19 +228,24 @@ impl ApubObject for ApubCommunity {
// Fetching mods and outbox is not necessary for Lemmy to work, so ignore errors. Besides,
// we need to ignore these errors so that tests can work entirely offline.
let community = blocking(context.pool(), move |conn| Community::upsert(conn, &form)).await??;
let community: ApubCommunity =
blocking(context.pool(), move |conn| Community::upsert(conn, &form))
.await??
.into();
update_community_mods(group, &community, context, request_counter)
.await
.map_err(|e| debug!("{}", e))
.ok();
// TODO: doing this unconditionally might cause infinite loop for some reason
fetch_community_outbox(context, &group.outbox, request_counter)
let outbox_data = OutboxData(community.clone(), context.clone());
group
.outbox
.dereference(&outbox_data, request_counter)
.await
.map_err(|e| debug!("{}", e))
.ok();
Ok(community.into())
Ok(community)
}
}
@ -318,7 +324,8 @@ mod tests {
// change these links so they dont fetch over the network
json.moderators =
Some(Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_moderators").unwrap());
json.outbox = Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap();
json.outbox =
ObjectId::new(Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap());
let url = Url::parse("https://enterprise.lemmy.ml/c/tenforward").unwrap();
let mut request_counter = 0;

View file

@ -126,16 +126,6 @@ impl Community {
community.select(actor_id).distinct().load::<String>(conn)
}
pub fn read_from_followers_url(
conn: &PgConnection,
followers_url_: &DbUrl,
) -> Result<Community, Error> {
use crate::schema::community::dsl::*;
community
.filter(followers_url.eq(followers_url_))
.first::<Self>(conn)
}
pub fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result<Community, Error> {
use crate::schema::community::dsl::*;
insert_into(community)