mirror of
https://github.com/LemmyNet/activitypub-federation-rust.git
synced 2024-06-08 16:29:33 +00:00
Limit all the methods, add reqwest shim that wraps around bytes_stream
This commit is contained in:
parent
3b0efe673c
commit
c6f208d895
3
Cargo.lock
generated
3
Cargo.lock
generated
|
@ -20,7 +20,7 @@ dependencies = [
|
|||
"dyn-clone",
|
||||
"enum_delegate",
|
||||
"env_logger",
|
||||
"futures-util",
|
||||
"futures-core",
|
||||
"http",
|
||||
"http-signature-normalization",
|
||||
"http-signature-normalization-reqwest",
|
||||
|
@ -29,6 +29,7 @@ dependencies = [
|
|||
"itertools",
|
||||
"once_cell",
|
||||
"openssl",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"reqwest",
|
||||
"reqwest-middleware",
|
||||
|
|
|
@ -34,7 +34,8 @@ http-signature-normalization-reqwest = { version = "0.7.1", default-features = f
|
|||
http-signature-normalization = "0.6.0"
|
||||
actix-rt = { version = "2.7.0" }
|
||||
bytes = "1.3.0"
|
||||
futures-util = { version = "0.3.25", default-features = false }
|
||||
futures-core = { version = "0.3.25", default-features = false }
|
||||
pin-project-lite = "0.2.9"
|
||||
|
||||
actix-web = { version = "4.2.1", default-features = false, optional = true }
|
||||
axum = { version = "0.6.0", features = ["json", "headers", "macros", "original-uri"], optional = true }
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use crate::{
|
||||
core::signatures::{sign_request, PublicKey},
|
||||
traits::ActivityHandler,
|
||||
utils::reqwest_shim::ResponseExt,
|
||||
Error,
|
||||
InstanceSettings,
|
||||
LocalInstance,
|
||||
|
@ -167,7 +168,8 @@ async fn do_send(
|
|||
}
|
||||
Ok(o) => {
|
||||
let status = o.status();
|
||||
let text = o.text().await.map_err(Error::conv)?;
|
||||
// Limit the status text to 1KB.
|
||||
let text = o.text_limited(1024).await.map_err(Error::conv)?;
|
||||
Err(anyhow!(
|
||||
"Queueing activity {} to {} for retry after failure with status {}: {}",
|
||||
task.activity_id,
|
||||
|
|
18
src/utils.rs
18
src/utils.rs
|
@ -1,12 +1,12 @@
|
|||
use crate::{Error, LocalInstance, APUB_JSON_CONTENT_TYPE};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use futures_util::stream::StreamExt;
|
||||
use crate::{utils::reqwest_shim::ResponseExt, Error, LocalInstance, APUB_JSON_CONTENT_TYPE};
|
||||
use http::{header::HeaderName, HeaderValue, StatusCode};
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::collections::BTreeMap;
|
||||
use tracing::info;
|
||||
use url::Url;
|
||||
|
||||
pub(crate) mod reqwest_shim;
|
||||
|
||||
pub async fn fetch_object_http<Kind: DeserializeOwned>(
|
||||
url: &Url,
|
||||
instance: &LocalInstance,
|
||||
|
@ -35,17 +35,7 @@ pub async fn fetch_object_http<Kind: DeserializeOwned>(
|
|||
return Err(Error::ObjectDeleted);
|
||||
}
|
||||
|
||||
let mut bytes_stream = res.bytes_stream();
|
||||
let mut body = BytesMut::new();
|
||||
while let Some(chunk) = bytes_stream.next().await.transpose().map_err(Error::conv)? {
|
||||
body.put(chunk);
|
||||
|
||||
if body.len() > instance.settings.response_body_size {
|
||||
return Err(Error::ResponseBodyLimit);
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::from_slice(&body).map_err(Error::conv)
|
||||
res.json_limited(instance.settings.response_body_size).await
|
||||
}
|
||||
|
||||
/// Check that both urls have the same domain. If not, return UrlVerificationError.
|
||||
|
|
123
src/utils/reqwest_shim.rs
Normal file
123
src/utils/reqwest_shim.rs
Normal file
|
@ -0,0 +1,123 @@
|
|||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use futures_core::{ready, stream::BoxStream, Stream};
|
||||
use pin_project_lite::pin_project;
|
||||
use reqwest::Response;
|
||||
use serde::Deserialize;
|
||||
use std::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use crate::Error;
|
||||
|
||||
pin_project! {
|
||||
pub struct BytesFuture {
|
||||
#[pin]
|
||||
stream: BoxStream<'static, reqwest::Result<Bytes>>,
|
||||
limit: usize,
|
||||
aggregator: BytesMut,
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for BytesFuture {
|
||||
type Output = Result<Bytes, Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
let this = self.as_mut().project();
|
||||
if let Some(chunk) = ready!(this.stream.poll_next(cx))
|
||||
.transpose()
|
||||
.map_err(Error::conv)?
|
||||
{
|
||||
this.aggregator.put(chunk);
|
||||
if this.aggregator.len() > *this.limit {
|
||||
return Poll::Ready(Err(Error::ResponseBodyLimit));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(mem::take(&mut self.aggregator).freeze()))
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
pub struct JsonFuture<T> {
|
||||
_t: PhantomData<T>,
|
||||
#[pin]
|
||||
future: BytesFuture,
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for JsonFuture<T>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
type Output = Result<T, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let bytes = ready!(this.future.poll(cx))?;
|
||||
Poll::Ready(serde_json::from_slice(&bytes).map_err(Error::conv))
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
pub struct TextFuture {
|
||||
#[pin]
|
||||
future: BytesFuture,
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for TextFuture {
|
||||
type Output = Result<String, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let bytes = ready!(this.future.poll(cx))?;
|
||||
Poll::Ready(String::from_utf8(bytes.to_vec()).map_err(Error::conv))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ResponseExt {
|
||||
type BytesFuture;
|
||||
type JsonFuture<T>;
|
||||
type TextFuture;
|
||||
|
||||
fn bytes_limited(self, limit: usize) -> Self::BytesFuture;
|
||||
fn json_limited<T>(self, limit: usize) -> Self::JsonFuture<T>;
|
||||
fn text_limited(self, limit: usize) -> Self::TextFuture;
|
||||
}
|
||||
|
||||
impl ResponseExt for Response {
|
||||
type BytesFuture = BytesFuture;
|
||||
type JsonFuture<T> = JsonFuture<T>;
|
||||
type TextFuture = TextFuture;
|
||||
|
||||
fn bytes_limited(self, limit: usize) -> Self::BytesFuture {
|
||||
BytesFuture {
|
||||
stream: Box::pin(self.bytes_stream()),
|
||||
limit,
|
||||
aggregator: BytesMut::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn json_limited<T>(self, limit: usize) -> Self::JsonFuture<T> {
|
||||
JsonFuture {
|
||||
_t: PhantomData,
|
||||
future: self.bytes_limited(limit),
|
||||
}
|
||||
}
|
||||
|
||||
fn text_limited(self, limit: usize) -> Self::TextFuture {
|
||||
TextFuture {
|
||||
future: self.bytes_limited(limit),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue