Merge pull request 'Ignore incoming activities which have been received before, add /activities endpoint' (#118) from activity-checks into main

Reviewed-on: https://yerbamate.dev/LemmyNet/lemmy/pulls/118
This commit is contained in:
dessalines 2020-10-27 16:26:16 +00:00
commit 3bf885329d
21 changed files with 207 additions and 95 deletions

View file

@ -73,7 +73,7 @@ pub(crate) async fn receive_create_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(create, &user, context, request_counter).await?; announce_if_community_is_local(create, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -131,7 +131,7 @@ pub(crate) async fn receive_update_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(update, &user, context, request_counter).await?; announce_if_community_is_local(update, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -183,7 +183,7 @@ pub(crate) async fn receive_like_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(like, &user, context, request_counter).await?; announce_if_community_is_local(like, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -241,7 +241,7 @@ pub(crate) async fn receive_dislike_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(dislike, &user, context, request_counter).await?; announce_if_community_is_local(dislike, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -276,8 +276,7 @@ pub(crate) async fn receive_delete_comment(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&delete, context, request_counter).await?; announce_if_community_is_local(delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -57,7 +57,7 @@ pub(crate) async fn receive_undo_like_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -109,7 +109,7 @@ pub(crate) async fn receive_undo_dislike_comment(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -145,8 +145,7 @@ pub(crate) async fn receive_undo_delete_comment(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&undo, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -182,7 +181,6 @@ pub(crate) async fn receive_undo_remove_comment(
websocket_id: None, websocket_id: None,
}); });
let mod_ = get_actor_as_user(&undo, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -1,4 +1,4 @@
use crate::activities::receive::{announce_if_community_is_local, get_actor_as_user}; use crate::activities::receive::announce_if_community_is_local;
use activitystreams::activity::{Delete, Remove, Undo}; use activitystreams::activity::{Delete, Remove, Undo};
use actix_web::HttpResponse; use actix_web::HttpResponse;
use lemmy_db::{community::Community, community_view::CommunityView}; use lemmy_db::{community::Community, community_view::CommunityView};
@ -33,8 +33,7 @@ pub(crate) async fn receive_delete_community(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&delete, context, request_counter).await?; announce_if_community_is_local(delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -95,8 +94,7 @@ pub(crate) async fn receive_undo_delete_community(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&undo, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -128,7 +126,6 @@ pub(crate) async fn receive_undo_remove_community(
websocket_id: None, websocket_id: None,
}); });
let mod_ = get_actor_as_user(&undo, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -39,7 +39,6 @@ where
/// community, the activity is announced to all community followers. /// community, the activity is announced to all community followers.
async fn announce_if_community_is_local<T, Kind>( async fn announce_if_community_is_local<T, Kind>(
activity: T, activity: T,
user: &User_,
context: &LemmyContext, context: &LemmyContext,
request_counter: &mut i32, request_counter: &mut i32,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
@ -62,7 +61,7 @@ where
if community.local { if community.local {
community community
.send_announce(activity.into_any_base()?, &user, context) .send_announce(activity.into_any_base()?, context)
.await?; .await?;
} }
Ok(()) Ok(())

View file

@ -51,7 +51,7 @@ pub(crate) async fn receive_create_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(create, &user, context, request_counter).await?; announce_if_community_is_local(create, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -89,7 +89,7 @@ pub(crate) async fn receive_update_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(update, &user, context, request_counter).await?; announce_if_community_is_local(update, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -134,7 +134,7 @@ pub(crate) async fn receive_like_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(like, &user, context, request_counter).await?; announce_if_community_is_local(like, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -185,7 +185,7 @@ pub(crate) async fn receive_dislike_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(dislike, &user, context, request_counter).await?; announce_if_community_is_local(dislike, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -214,8 +214,7 @@ pub(crate) async fn receive_delete_post(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&delete, context, request_counter).await?; announce_if_community_is_local(delete, context, request_counter).await?;
announce_if_community_is_local(delete, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -52,7 +52,7 @@ pub(crate) async fn receive_undo_like_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -98,7 +98,7 @@ pub(crate) async fn receive_undo_dislike_post(
websocket_id: None, websocket_id: None,
}); });
announce_if_community_is_local(undo, &user, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -127,8 +127,7 @@ pub(crate) async fn receive_undo_delete_post(
websocket_id: None, websocket_id: None,
}); });
let user = get_actor_as_user(&undo, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
announce_if_community_is_local(undo, &user, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -158,7 +157,6 @@ pub(crate) async fn receive_undo_remove_post(
websocket_id: None, websocket_id: None,
}); });
let mod_ = get_actor_as_user(&undo, context, request_counter).await?; announce_if_community_is_local(undo, context, request_counter).await?;
announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -95,7 +95,7 @@ impl ActorType for Community {
.set_to(public()) .set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]); .set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(delete, self, context, None).await?; send_to_community_followers(delete, self, context).await?;
Ok(()) Ok(())
} }
@ -121,7 +121,7 @@ impl ActorType for Community {
.set_to(public()) .set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]); .set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(undo, self, context, None).await?; send_to_community_followers(undo, self, context).await?;
Ok(()) Ok(())
} }
@ -134,7 +134,7 @@ impl ActorType for Community {
.set_to(public()) .set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]); .set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(remove, self, context, None).await?; send_to_community_followers(remove, self, context).await?;
Ok(()) Ok(())
} }
@ -155,7 +155,7 @@ impl ActorType for Community {
.set_to(public()) .set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]); .set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers(undo, self, context, None).await?; send_to_community_followers(undo, self, context).await?;
Ok(()) Ok(())
} }
@ -164,7 +164,6 @@ impl ActorType for Community {
async fn send_announce( async fn send_announce(
&self, &self,
activity: AnyBase, activity: AnyBase,
sender: &User_,
context: &LemmyContext, context: &LemmyContext,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
let mut announce = Announce::new(self.actor_id.to_owned(), activity); let mut announce = Announce::new(self.actor_id.to_owned(), activity);
@ -174,13 +173,7 @@ impl ActorType for Community {
.set_to(public()) .set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]); .set_many_ccs(vec![self.get_followers_url()?]);
send_to_community_followers( send_to_community_followers(announce, self, context).await?;
announce,
self,
context,
Some(sender.get_shared_inbox_url()?),
)
.await?;
Ok(()) Ok(())
} }

View file

@ -15,7 +15,7 @@ where
T: ToString, T: ToString,
{ {
let id = format!( let id = format!(
"{}/receive/{}/{}", "{}/activities/{}/{}",
Settings::get().get_protocol_and_hostname(), Settings::get().get_protocol_and_hostname(),
kind.to_string().to_lowercase(), kind.to_string().to_lowercase(),
Uuid::new_v4() Uuid::new_v4()

View file

@ -121,7 +121,6 @@ impl ActorType for User_ {
async fn send_announce( async fn send_announce(
&self, &self,
_activity: AnyBase, _activity: AnyBase,
_sender: &User_,
_context: &LemmyContext, _context: &LemmyContext,
) -> Result<(), LemmyError> { ) -> Result<(), LemmyError> {
unimplemented!() unimplemented!()

View file

@ -74,24 +74,19 @@ pub async fn send_to_community_followers<T, Kind>(
activity: T, activity: T,
community: &Community, community: &Community,
context: &LemmyContext, context: &LemmyContext,
sender_shared_inbox: Option<Url>,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>, T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
Kind: Serialize, Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static, <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{ {
// dont send to the local instance, nor to the instance where the activity originally came from,
// because that would result in a database error (same data inserted twice)
let community_shared_inbox = community.get_shared_inbox_url()?;
let follower_inboxes: Vec<Url> = community let follower_inboxes: Vec<Url> = community
.get_follower_inboxes(context.pool()) .get_follower_inboxes(context.pool())
.await? .await?
.iter() .iter()
.filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref())
.filter(|inbox| inbox != &&community_shared_inbox)
.filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
.unique() .unique()
.filter(|inbox| inbox.host_str() != Some(&Settings::get().hostname))
.filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
.map(|inbox| inbox.to_owned()) .map(|inbox| inbox.to_owned())
.collect(); .collect();
debug!( debug!(
@ -133,7 +128,7 @@ where
// if this is a local community, we need to do an announce from the community instead // if this is a local community, we need to do an announce from the community instead
if community.local { if community.local {
community community
.send_announce(activity.into_any_base()?, creator, context) .send_announce(activity.into_any_base()?, context)
.await?; .await?;
} else { } else {
let inbox = community.get_shared_inbox_url()?; let inbox = community.get_shared_inbox_url()?;
@ -223,7 +218,8 @@ where
// This is necessary because send_comment and send_comment_mentions // This is necessary because send_comment and send_comment_mentions
// might send the same ap_id // might send the same ap_id
if insert_into_db { if insert_into_db {
insert_activity(actor.user_id(), activity.clone(), true, pool).await?; let id = activity.id().context(location_info!())?;
insert_activity(id, actor.user_id(), activity.clone(), true, pool).await?;
} }
for i in inboxes { for i in inboxes {

View file

@ -1,6 +1,10 @@
use crate::APUB_JSON_CONTENT_TYPE; use crate::APUB_JSON_CONTENT_TYPE;
use actix_web::{body::Body, HttpResponse}; use actix_web::{body::Body, web, HttpResponse};
use serde::Serialize; use lemmy_db::activity::Activity;
use lemmy_structs::blocking;
use lemmy_utils::{settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
use serde::{Deserialize, Serialize};
pub mod comment; pub mod comment;
pub mod community; pub mod community;
@ -26,3 +30,29 @@ where
.content_type(APUB_JSON_CONTENT_TYPE) .content_type(APUB_JSON_CONTENT_TYPE)
.json(data) .json(data)
} }
#[derive(Deserialize)]
pub struct CommunityQuery {
type_: String,
id: String,
}
/// Return the ActivityPub json representation of a local community over HTTP.
pub async fn get_activity(
info: web::Path<CommunityQuery>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
let settings = Settings::get();
let activity_id = format!(
"{}/activities/{}/{}",
settings.get_protocol_and_hostname(),
info.type_,
info.id
);
let activity = blocking(context.pool(), move |conn| {
Activity::read_from_apub_id(&conn, &activity_id)
})
.await??;
Ok(create_apub_response(&activity.data))
}

View file

@ -3,6 +3,7 @@ use crate::{
check_is_apub_id_valid, check_is_apub_id_valid,
extensions::signatures::verify_signature, extensions::signatures::verify_signature,
fetcher::get_or_fetch_and_upsert_user, fetcher::get_or_fetch_and_upsert_user,
inbox::{get_activity_id, is_activity_already_known},
insert_activity, insert_activity,
ActorType, ActorType,
}; };
@ -80,6 +81,11 @@ pub async fn community_inbox(
verify_signature(&request, &user)?; verify_signature(&request, &user)?;
let activity_id = get_activity_id(&activity, user_uri)?;
if is_activity_already_known(context.pool(), &activity_id).await? {
return Ok(HttpResponse::Ok().finish());
}
let any_base = activity.clone().into_any_base()?; let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?; let kind = activity.kind().context(location_info!())?;
let user_id = user.id; let user_id = user.id;
@ -88,7 +94,14 @@ pub async fn community_inbox(
ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await, ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await,
}; };
insert_activity(user_id, activity.clone(), false, context.pool()).await?; insert_activity(
&activity_id,
user_id,
activity.clone(),
false,
context.pool(),
)
.await?;
res res
} }

View file

@ -1,3 +1,37 @@
use activitystreams::base::{BaseExt, Extends};
use anyhow::Context;
use lemmy_db::{activity::Activity, DbPool};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
use serde::{export::fmt::Debug, Serialize};
use url::Url;
pub mod community_inbox; pub mod community_inbox;
pub mod shared_inbox; pub mod shared_inbox;
pub mod user_inbox; pub mod user_inbox;
pub(crate) fn get_activity_id<T, Kind>(activity: &T, creator_uri: &Url) -> Result<Url, LemmyError>
where
T: BaseExt<Kind> + Extends<Kind> + Debug,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
let creator_domain = creator_uri.host_str().context(location_info!())?;
let activity_id = activity.id(creator_domain)?;
Ok(activity_id.context(location_info!())?.to_owned())
}
pub(crate) async fn is_activity_already_known(
pool: &DbPool,
activity_id: &Url,
) -> Result<bool, LemmyError> {
let activity_id = activity_id.to_string();
let existing = blocking(pool, move |conn| {
Activity::read_from_apub_id(&conn, &activity_id)
})
.await?;
match existing {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
}

View file

@ -42,6 +42,7 @@ use crate::{
check_is_apub_id_valid, check_is_apub_id_valid,
extensions::signatures::verify_signature, extensions::signatures::verify_signature,
fetcher::get_or_fetch_and_upsert_actor, fetcher::get_or_fetch_and_upsert_actor,
inbox::{get_activity_id, is_activity_already_known},
insert_activity, insert_activity,
ActorType, ActorType,
}; };
@ -104,6 +105,11 @@ pub async fn shared_inbox(
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?; let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?; verify_signature(&request, actor.as_ref())?;
let activity_id = get_activity_id(&activity, &actor_id)?;
if is_activity_already_known(context.pool(), &activity_id).await? {
return Ok(HttpResponse::Ok().finish());
}
let any_base = activity.clone().into_any_base()?; let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?; let kind = activity.kind().context(location_info!())?;
let res = match kind { let res = match kind {
@ -119,7 +125,14 @@ pub async fn shared_inbox(
ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await, ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
}; };
insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?; insert_activity(
&activity_id,
actor.user_id(),
activity.clone(),
false,
context.pool(),
)
.await?;
res res
} }
@ -142,6 +155,9 @@ async fn receive_announce(
let inner_id = object.id().context(location_info!())?.to_owned(); let inner_id = object.id().context(location_info!())?.to_owned();
check_is_apub_id_valid(&inner_id)?; check_is_apub_id_valid(&inner_id)?;
if is_activity_already_known(context.pool(), &inner_id).await? {
return Ok(HttpResponse::Ok().finish());
}
match kind { match kind {
Some("Create") => receive_create(context, object, inner_id, request_counter).await, Some("Create") => receive_create(context, object, inner_id, request_counter).await,

View file

@ -3,6 +3,7 @@ use crate::{
check_is_apub_id_valid, check_is_apub_id_valid,
extensions::signatures::verify_signature, extensions::signatures::verify_signature,
fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community}, fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
inbox::{get_activity_id, is_activity_already_known},
insert_activity, insert_activity,
ActorType, ActorType,
FromApub, FromApub,
@ -83,6 +84,11 @@ pub async fn user_inbox(
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?; let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?; verify_signature(&request, actor.as_ref())?;
let activity_id = get_activity_id(&activity, actor_uri)?;
if is_activity_already_known(context.pool(), &activity_id).await? {
return Ok(HttpResponse::Ok().finish());
}
let any_base = activity.clone().into_any_base()?; let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?; let kind = activity.kind().context(location_info!())?;
let res = match kind { let res = match kind {
@ -101,7 +107,14 @@ pub async fn user_inbox(
} }
}; };
insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?; insert_activity(
&activity_id,
actor.user_id(),
activity.clone(),
false,
context.pool(),
)
.await?;
res res
} }

View file

@ -22,7 +22,7 @@ use activitystreams::{
}; };
use activitystreams_ext::{Ext1, Ext2}; use activitystreams_ext::{Ext1, Ext2};
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use lemmy_db::{activity::do_insert_activity, user::User_, DbPool}; use lemmy_db::{activity::Activity, user::User_, DbPool};
use lemmy_structs::blocking; use lemmy_structs::blocking;
use lemmy_utils::{location_info, settings::Settings, LemmyError}; use lemmy_utils::{location_info, settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext; use lemmy_websocket::LemmyContext;
@ -202,7 +202,6 @@ pub trait ActorType {
async fn send_announce( async fn send_announce(
&self, &self,
activity: AnyBase, activity: AnyBase,
sender: &User_,
context: &LemmyContext, context: &LemmyContext,
) -> Result<(), LemmyError>; ) -> Result<(), LemmyError>;
@ -256,16 +255,18 @@ pub trait ActorType {
/// Store a sent or received activity in the database, for logging purposes. These records are not /// Store a sent or received activity in the database, for logging purposes. These records are not
/// persistent. /// persistent.
pub async fn insert_activity<T>( pub async fn insert_activity<T>(
ap_id: &Url,
user_id: i32, user_id: i32,
data: T, activity: T,
local: bool, local: bool,
pool: &DbPool, pool: &DbPool,
) -> Result<(), LemmyError> ) -> Result<(), LemmyError>
where where
T: Serialize + std::fmt::Debug + Send + 'static, T: Serialize + std::fmt::Debug + Send + 'static,
{ {
let ap_id = ap_id.to_string();
blocking(pool, move |conn| { blocking(pool, move |conn| {
do_insert_activity(conn, user_id, &data, local) Activity::insert(conn, ap_id, user_id, &activity, local)
}) })
.await??; .await??;
Ok(()) Ok(())

View file

@ -12,6 +12,7 @@ use std::{
#[table_name = "activity"] #[table_name = "activity"]
pub struct Activity { pub struct Activity {
pub id: i32, pub id: i32,
pub ap_id: String,
pub user_id: i32, pub user_id: i32,
pub data: Value, pub data: Value,
pub local: bool, pub local: bool,
@ -22,6 +23,7 @@ pub struct Activity {
#[derive(Insertable, AsChangeset)] #[derive(Insertable, AsChangeset)]
#[table_name = "activity"] #[table_name = "activity"]
pub struct ActivityForm { pub struct ActivityForm {
pub ap_id: String,
pub user_id: i32, pub user_id: i32,
pub data: Value, pub data: Value,
pub local: bool, pub local: bool,
@ -53,30 +55,39 @@ impl Crud<ActivityForm> for Activity {
} }
} }
pub fn do_insert_activity<T>( impl Activity {
conn: &PgConnection, pub fn insert<T>(
user_id: i32, conn: &PgConnection,
data: &T, ap_id: String,
local: bool, user_id: i32,
) -> Result<Activity, IoError> data: &T,
where local: bool,
T: Serialize + Debug, ) -> Result<Self, IoError>
{ where
debug!("inserting activity for user {}: ", user_id); T: Serialize + Debug,
debug!("{}", serde_json::to_string_pretty(&data)?); {
let activity_form = ActivityForm { debug!("inserting activity for user {}: ", user_id);
user_id, debug!("{}", serde_json::to_string_pretty(&data)?);
data: serde_json::to_value(&data)?, let activity_form = ActivityForm {
local, ap_id,
updated: None, user_id,
}; data: serde_json::to_value(&data)?,
let result = Activity::create(&conn, &activity_form); local,
match result { updated: None,
Ok(s) => Ok(s), };
Err(e) => Err(IoError::new( let result = Activity::create(&conn, &activity_form);
ErrorKind::Other, match result {
format!("Failed to insert activity into database: {}", e), Ok(s) => Ok(s),
)), Err(e) => Err(IoError::new(
ErrorKind::Other,
format!("Failed to insert activity into database: {}", e),
)),
}
}
pub fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Self, Error> {
use crate::schema::activity::dsl::*;
activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
} }
} }
@ -125,16 +136,24 @@ mod tests {
let inserted_creator = User_::create(&conn, &creator_form).unwrap(); let inserted_creator = User_::create(&conn, &creator_form).unwrap();
let ap_id =
"https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c";
let test_json: Value = serde_json::from_str( let test_json: Value = serde_json::from_str(
r#"{ r#"{
"street": "Article Circle Expressway 1", "@context": "https://www.w3.org/ns/activitystreams",
"city": "North Pole", "id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
"postcode": "99705", "type": "Delete",
"state": "Alaska" "actor": "https://enterprise.lemmy.ml/u/riker",
}"#, "to": "https://www.w3.org/ns/activitystreams#Public",
"cc": [
"https://enterprise.lemmy.ml/c/main/"
],
"object": "https://enterprise.lemmy.ml/post/32"
}"#,
) )
.unwrap(); .unwrap();
let activity_form = ActivityForm { let activity_form = ActivityForm {
ap_id: ap_id.to_string(),
user_id: inserted_creator.id, user_id: inserted_creator.id,
data: test_json.to_owned(), data: test_json.to_owned(),
local: true, local: true,
@ -144,6 +163,7 @@ mod tests {
let inserted_activity = Activity::create(&conn, &activity_form).unwrap(); let inserted_activity = Activity::create(&conn, &activity_form).unwrap();
let expected_activity = Activity { let expected_activity = Activity {
ap_id: ap_id.to_string(),
id: inserted_activity.id, id: inserted_activity.id,
user_id: inserted_creator.id, user_id: inserted_creator.id,
data: test_json, data: test_json,
@ -153,9 +173,11 @@ mod tests {
}; };
let read_activity = Activity::read(&conn, inserted_activity.id).unwrap(); let read_activity = Activity::read(&conn, inserted_activity.id).unwrap();
let read_activity_by_apub_id = Activity::read_from_apub_id(&conn, ap_id).unwrap();
User_::delete(&conn, inserted_creator.id).unwrap(); User_::delete(&conn, inserted_creator.id).unwrap();
assert_eq!(expected_activity, read_activity); assert_eq!(expected_activity, read_activity);
assert_eq!(expected_activity, read_activity_by_apub_id);
assert_eq!(expected_activity, inserted_activity); assert_eq!(expected_activity, inserted_activity);
} }
} }

View file

@ -1,6 +1,7 @@
table! { table! {
activity (id) { activity (id) {
id -> Int4, id -> Int4,
ap_id -> Text,
user_id -> Int4, user_id -> Int4,
data -> Jsonb, data -> Jsonb,
local -> Bool, local -> Bool,

View file

@ -0,0 +1 @@
ALTER TABLE activity DROP COLUMN ap_id;

View file

@ -0,0 +1 @@
ALTER TABLE activity ADD COLUMN ap_id TEXT;

View file

@ -4,6 +4,7 @@ use lemmy_apub::{
http::{ http::{
comment::get_apub_comment, comment::get_apub_comment,
community::{get_apub_community_followers, get_apub_community_http, get_apub_community_outbox}, community::{get_apub_community_followers, get_apub_community_http, get_apub_community_outbox},
get_activity,
post::get_apub_post, post::get_apub_post,
user::get_apub_user_http, user::get_apub_user_http,
}, },
@ -42,7 +43,8 @@ pub fn config(cfg: &mut web::ServiceConfig) {
) )
.route("/u/{user_name}", web::get().to(get_apub_user_http)) .route("/u/{user_name}", web::get().to(get_apub_user_http))
.route("/post/{post_id}", web::get().to(get_apub_post)) .route("/post/{post_id}", web::get().to(get_apub_post))
.route("/comment/{comment_id}", web::get().to(get_apub_comment)), .route("/comment/{comment_id}", web::get().to(get_apub_comment))
.route("/activities/{type_}/{id}", web::get().to(get_activity)),
) )
// Inboxes dont work with the header guard for some reason. // Inboxes dont work with the header guard for some reason.
.service( .service(