""" getting and verifying signatures """ import time from collections import namedtuple from urllib.parse import urlsplit import pathlib from unittest.mock import patch import json import responses import pytest from django.test import TestCase, Client from django.utils.http import http_date from bookwyrm import models from bookwyrm.activitypub import Follow from bookwyrm.settings import DOMAIN, NETLOC from bookwyrm.signatures import create_key_pair, make_signature, make_digest def get_follow_activity(follower, followee): """generates a test activity""" return Follow( id="https://test.com/user/follow/id", actor=follower.remote_id, object=followee.remote_id, ).serialize() KeyPair = namedtuple("KeyPair", ("private_key", "public_key")) Sender = namedtuple("Sender", ("remote_id", "key_pair")) class Signature(TestCase): """signature test""" @classmethod def setUpTestData(cls): """create users and test data""" with ( patch("bookwyrm.suggested_users.rerank_suggestions_task.delay"), patch("bookwyrm.activitystreams.populate_stream_task.delay"), patch("bookwyrm.lists_stream.populate_lists_task.delay"), ): cls.mouse = models.User.objects.create_user( f"mouse@{DOMAIN}", "mouse@example.com", "", local=True, localname="mouse", ) cls.rat = models.User.objects.create_user( f"rat@{DOMAIN}", "rat@example.com", "", local=True, localname="rat" ) cls.cat = models.User.objects.create_user( f"cat@{DOMAIN}", "cat@example.com", "", local=True, localname="cat" ) models.SiteSettings.objects.create() def setUp(self): """test data""" private_key, public_key = create_key_pair() self.fake_remote = Sender( "http://localhost/user/remote", KeyPair(private_key, public_key) ) def send(self, signature, now, data, digest): """test request""" client = Client() return client.post( urlsplit(self.rat.inbox).path, data=data, content_type="application/json", **{ "HTTP_DATE": now, "HTTP_SIGNATURE": signature, "HTTP_DIGEST": digest, "HTTP_CONTENT_TYPE": "application/activity+json; charset=utf-8", "HTTP_HOST": NETLOC, }, ) def send_test_request( # pylint: disable=too-many-arguments self, sender, signer=None, send_data=None, digest=None, date=None ): """sends a follow request to the "rat" user""" now = date or http_date() data = json.dumps(get_follow_activity(sender, self.rat)) digest = digest or make_digest(data) signature = make_signature( "post", signer or sender, self.rat.inbox, now, digest=digest ) with ( patch("bookwyrm.views.inbox.activity_task.apply_async"), patch("bookwyrm.models.user.set_remote_server.delay"), ): return self.send(signature, now, send_data or data, digest) def test_correct_signature(self): """this one should just work""" with patch("bookwyrm.models.relationship.UserFollowRequest.accept"): response = self.send_test_request(sender=self.mouse) self.assertEqual(response.status_code, 200) def test_wrong_signature(self): """Messages must be signed by the right actor. (cat cannot sign messages on behalf of mouse)""" response = self.send_test_request(sender=self.mouse, signer=self.cat) self.assertEqual(response.status_code, 401) @responses.activate def test_remote_signer(self): """signatures for remote users""" datafile = pathlib.Path(__file__).parent.joinpath("data/ap_user.json") data = json.loads(datafile.read_bytes()) data["id"] = self.fake_remote.remote_id data["publicKey"]["id"] = f"{self.fake_remote.remote_id}/#main-key" data["publicKey"]["publicKeyPem"] = self.fake_remote.key_pair.public_key del data["icon"] # Avoid having to return an avatar. responses.add(responses.GET, self.fake_remote.remote_id, json=data, status=200) responses.add( responses.GET, "https://localhost/.well-known/nodeinfo", status=404 ) responses.add( responses.GET, "https://example.com/user/mouse/outbox?page=true", json={"orderedItems": []}, status=200, ) with patch("bookwyrm.models.user.get_remote_reviews.delay"): with patch( "bookwyrm.models.relationship.UserFollowRequest.accept" ) as accept_mock: response = self.send_test_request(sender=self.fake_remote) self.assertEqual(response.status_code, 200) self.assertTrue(accept_mock.called) @responses.activate def test_key_needs_refresh(self): """an out of date key should be updated and the new key work""" datafile = pathlib.Path(__file__).parent.joinpath("data/ap_user.json") data = json.loads(datafile.read_bytes()) data["id"] = self.fake_remote.remote_id data["publicKey"]["id"] = f"{self.fake_remote.remote_id}/#main-key" data["publicKey"]["publicKeyPem"] = self.fake_remote.key_pair.public_key del data["icon"] # Avoid having to return an avatar. responses.add(responses.GET, self.fake_remote.remote_id, json=data, status=200) responses.add( responses.GET, "https://localhost/.well-known/nodeinfo", status=404 ) # Second and subsequent fetches get a different key: key_pair = KeyPair(*create_key_pair()) new_sender = Sender(self.fake_remote.remote_id, key_pair) data["publicKey"]["publicKeyPem"] = key_pair.public_key responses.add(responses.GET, self.fake_remote.remote_id, json=data, status=200) with patch("bookwyrm.models.user.get_remote_reviews.delay"): # Key correct: with patch( "bookwyrm.models.relationship.UserFollowRequest.accept" ) as accept_mock: response = self.send_test_request(sender=self.fake_remote) self.assertEqual(response.status_code, 200) # BUG this is 401 self.assertTrue(accept_mock.called) # Old key is cached, so still works: with patch( "bookwyrm.models.relationship.UserFollowRequest.accept" ) as accept_mock: response = self.send_test_request(sender=self.fake_remote) self.assertEqual(response.status_code, 200) self.assertTrue(accept_mock.called) # Try with new key: with patch( "bookwyrm.models.relationship.UserFollowRequest.accept" ) as accept_mock: response = self.send_test_request(sender=new_sender) self.assertEqual(response.status_code, 200) self.assertTrue(accept_mock.called) # Now the old key will fail: response = self.send_test_request(sender=self.fake_remote) self.assertEqual(response.status_code, 401) @responses.activate def test_nonexistent_signer(self): """fail when unable to look up signer""" responses.add( responses.GET, self.fake_remote.remote_id, json={"error": "not found"}, status=404, ) response = self.send_test_request(sender=self.fake_remote) self.assertEqual(response.status_code, 401) @pytest.mark.integration def test_changed_data(self): """Message data must match the digest header.""" with patch("bookwyrm.activitypub.resolve_remote_id"): response = self.send_test_request( self.mouse, send_data=get_follow_activity(self.mouse, self.cat) ) self.assertEqual(response.status_code, 401) @pytest.mark.integration def test_invalid_digest(self): """signature digest must be valid""" with patch("bookwyrm.activitypub.resolve_remote_id"): response = self.send_test_request( self.mouse, digest="SHA-256=AAAAAAAAAAAAAAAAAA" ) self.assertEqual(response.status_code, 401) @pytest.mark.integration def test_old_message(self): """Old messages should be rejected to prevent replay attacks.""" with patch("bookwyrm.activitypub.resolve_remote_id"): response = self.send_test_request( self.mouse, date=http_date(time.time() - 301) ) self.assertEqual(response.status_code, 401)