Actually implement signing in the httpy client.

This commit is contained in:
Jamie Bliss 2023-12-12 20:20:55 +00:00
parent d4058faf55
commit 6ebc23e24b
No known key found for this signature in database
2 changed files with 130 additions and 12 deletions

View file

@ -8,15 +8,26 @@ The API is identical to httpx, but some features has been added:
(Because Y is next after X).
"""
import functools
import ipaddress
import typing
from types import EllipsisType
import httpx
from django.conf import settings
from httpx._types import TimeoutTypes
from .signatures import HttpSignature
class SigningActor(typing.Protocol):
"""
An AP Actor with keys, that can sign requests.
Both :class:`users.models.identity.Identity`, and
:class:`users.models.system_actor.SystemActor` implement this protocol.
"""
#: The private key used for signing, in PEM format
private_key: str
@ -28,7 +39,43 @@ class SigningActor(typing.Protocol):
public_key_id: str
class Client(httpx.Client):
class SignedAuth(httpx.Auth):
"""
Handles signing the request.
"""
# Doing it this way so we get automatic sync/async handling
requires_request_body = True
def __init__(self, actor: SigningActor):
self.actor = actor
def auth_flow(self, request: httpx.Request):
HttpSignature.sign_request(
request, self.actor.private_key, self.actor.public_key_id
)
yield request
@functools.lru_cache # Reuse transports
def _get_transport(
blocked_ranges: list[ipaddress.IPv4Network | ipaddress.IPv6Network | str]
| EllipsisType,
sync: bool,
):
"""
Gets an (Async)Transport that blocks the given IP ranges
"""
if blocked_ranges is ...:
blocked_ranges = settings.HTTP_BLOCKED_RANGES
blocked_ranges = [
ipaddress.ip_network(net) if isinstance(net, str) else net
for net in typing.cast(typing.Iterable, blocked_ranges)
]
class BaseClient(httpx.BaseClient):
def __init__(
self,
*,
@ -36,6 +83,7 @@ class Client(httpx.Client):
blocked_ranges: list[ipaddress.IPv4Network | ipaddress.IPv6Network | str]
| None
| EllipsisType = ...,
timeout: TimeoutTypes = settings.SETUP.REMOTE_TIMEOUT,
**opts
):
"""
@ -45,18 +93,32 @@ class Client(httpx.Client):
Networks, None to disable the feature, or Ellipsis to
pull the Django setting.
"""
super().__init__(**opts)
if actor:
opts["auth"] = SignedAuth(actor)
if blocked_ranges is ...:
blocked_ranges = settings.HTTP_BLOCKED_RANGES
if blocked_ranges is not None:
# TODO: Do we want to cache this?
blocked_ranges = [
ipaddress.ip_network(net) if isinstance(net, str) else net
for net in typing.cast(typing.Iterable, blocked_ranges)
]
super().__init__(timeout=timeout, **opts)
# TODO: If we're given blocked ranges, customize transport
self.actor = actor
def build_request(self, *pargs, **kwargs):
request = super().build_request(*pargs, **kwargs)
# GET requests get implicit accept headers added
if request.method == "GET" and "Accept" not in request.headers:
request.headers["Accept"] = "application/ld+json"
request.headers[
"User-Agent"
] = settings.TAKAHE_USER_AGENT # TODO: Move this to __init__
return request
# BaseClient before (Async)Client because __init__
class Client(BaseClient, httpx.Client):
pass
class AsyncClient(BaseClient, httpx.AsyncClient):
pass

View file

@ -185,6 +185,62 @@ class HttpSignature:
public_key,
)
@classmethod
def sign_request(
cls,
request: httpx.Request,
private_key: str,
key_id: str,
):
"""
Adds a signature to a Request.
"""
if not request.url.scheme:
raise ValueError("URI does not contain a scheme")
# Create the core header field set
date_string = http_date()
request.headers |= {
"(request-target)": f"{request.method} {request.url.path}",
"Host": request.url.host,
"Date": date_string,
}
# If we have a body, add a digest and content type
body_bytes = request.content
if body_bytes:
request.headers["Digest"] = cls.calculate_digest(body_bytes)
# Sign the headers
signing_headers = {
key for key in request.headers.keys() if key.lower() != "user-agent"
}
signed_string = "\n".join(
f"{name.lower()}: {value}"
for name, value in request.headers.items()
if name in signing_headers
)
private_key_instance: rsa.RSAPrivateKey = cast(
rsa.RSAPrivateKey,
serialization.load_pem_private_key(
private_key.encode("ascii"),
password=None,
),
)
signature = private_key_instance.sign(
signed_string.encode("utf8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
request.headers["Signature"] = cls.compile_signature(
{
"keyid": key_id,
"headers": list(signing_headers),
"signature": signature,
"algorithm": "rsa-sha256",
}
)
del request.headers["(request-target)"]
@classmethod
def signed_request(
cls,