Fix resource leaks

Rewrite places where files (or other resources) are opened but not closed to "with" blocks, which
automatically call close() at the end of the scope.

Also simplify some tests where images need to be saved to a model field: an opened file can be
passed directly to FileField.save().
This commit is contained in:
Bart Schuurmans 2024-03-29 15:45:35 +01:00
parent e7ae0fdf93
commit ffee29d8e2
19 changed files with 135 additions and 149 deletions

View file

@ -175,11 +175,13 @@ def generate_instance_layer(content_width):
site = models.SiteSettings.objects.get()
if site.logo_small:
logo_img = Image.open(site.logo_small)
with Image.open(site.logo_small) as logo_img:
logo_img.load()
else:
try:
static_path = os.path.join(settings.STATIC_ROOT, "images/logo-small.png")
logo_img = Image.open(static_path)
with Image.open(static_path) as logo_img:
logo_img.load()
except FileNotFoundError:
logo_img = None
@ -211,18 +213,9 @@ def generate_instance_layer(content_width):
def generate_rating_layer(rating, content_width):
"""Places components for rating preview"""
try:
icon_star_full = Image.open(
os.path.join(settings.STATIC_ROOT, "images/icons/star-full.png")
)
icon_star_empty = Image.open(
os.path.join(settings.STATIC_ROOT, "images/icons/star-empty.png")
)
icon_star_half = Image.open(
os.path.join(settings.STATIC_ROOT, "images/icons/star-half.png")
)
except FileNotFoundError:
return None
path_star_full = os.path.join(settings.STATIC_ROOT, "images/icons/star-full.png")
path_star_empty = os.path.join(settings.STATIC_ROOT, "images/icons/star-empty.png")
path_star_half = os.path.join(settings.STATIC_ROOT, "images/icons/star-half.png")
icon_size = 64
icon_margin = 10
@ -237,17 +230,23 @@ def generate_rating_layer(rating, content_width):
position_x = 0
try:
with Image.open(path_star_full) as icon_star_full:
for _ in range(math.floor(rating)):
rating_layer_mask.alpha_composite(icon_star_full, (position_x, 0))
position_x = position_x + icon_size + icon_margin
if math.floor(rating) != math.ceil(rating):
with Image.open(path_star_half) as icon_star_half:
rating_layer_mask.alpha_composite(icon_star_half, (position_x, 0))
position_x = position_x + icon_size + icon_margin
with Image.open(path_star_empty) as icon_star_empty:
for _ in range(5 - math.ceil(rating)):
rating_layer_mask.alpha_composite(icon_star_empty, (position_x, 0))
position_x = position_x + icon_size + icon_margin
except FileNotFoundError:
return None
rating_layer_mask = rating_layer_mask.getchannel("A")
rating_layer_mask = ImageOps.invert(rating_layer_mask)
@ -290,7 +289,8 @@ def generate_preview_image(
texts = texts or {}
# Cover
try:
inner_img_layer = Image.open(picture)
with Image.open(picture) as inner_img_layer:
inner_img_layer.load()
inner_img_layer.thumbnail(
(inner_img_width, inner_img_height), Image.Resampling.LANCZOS
)

View file

@ -19,7 +19,6 @@ DOMAIN = env("DOMAIN")
with open("VERSION", encoding="utf-8") as f:
version = f.read()
version = version.replace("\n", "")
f.close()
VERSION = version

View file

@ -1,12 +1,10 @@
""" tests the base functionality for activitypub dataclasses """
from io import BytesIO
import json
import pathlib
from unittest.mock import patch
from dataclasses import dataclass
from django.test import TestCase
from PIL import Image
import responses
from bookwyrm import activitypub
@ -48,13 +46,11 @@ class BaseActivity(TestCase):
# don't try to load the user icon
del self.userdata["icon"]
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
image.save(output, format=image.format)
self.image_data = output.getvalue()
with open(image_path, "rb") as image_file:
self.image_data = image_file.read()
def test_get_representative_not_existing(self, *_):
"""test that an instance representative actor is created if it does not exist"""

View file

@ -9,7 +9,6 @@ from bookwyrm.importers import CalibreImporter
from bookwyrm.models.import_job import handle_imported_book
# pylint: disable=consider-using-with
@patch("bookwyrm.suggested_users.rerank_suggestions_task.delay")
@patch("bookwyrm.activitystreams.populate_stream_task.delay")
@patch("bookwyrm.activitystreams.add_book_statuses_task.delay")
@ -20,8 +19,13 @@ class CalibreImport(TestCase):
"""use a test csv"""
self.importer = CalibreImporter()
datafile = pathlib.Path(__file__).parent.joinpath("../data/calibre.csv")
# pylint: disable-next=consider-using-with
self.csv = open(datafile, "r", encoding=self.importer.encoding)
def tearDown(self):
"""close test csv"""
self.csv.close()
@classmethod
def setUpTestData(cls):
"""populate database"""

View file

@ -16,7 +16,6 @@ def make_date(*args):
return datetime.datetime(*args, tzinfo=pytz.UTC)
# pylint: disable=consider-using-with
@patch("bookwyrm.suggested_users.rerank_suggestions_task.delay")
@patch("bookwyrm.activitystreams.populate_stream_task.delay")
@patch("bookwyrm.activitystreams.add_book_statuses_task.delay")
@ -27,8 +26,13 @@ class GoodreadsImport(TestCase):
"""use a test csv"""
self.importer = GoodreadsImporter()
datafile = pathlib.Path(__file__).parent.joinpath("../data/goodreads.csv")
# pylint: disable-next=consider-using-with
self.csv = open(datafile, "r", encoding=self.importer.encoding)
def tearDown(self):
"""close test csv"""
self.csv.close()
@classmethod
def setUpTestData(cls):
"""populate database"""

View file

@ -19,7 +19,6 @@ def make_date(*args):
return datetime.datetime(*args, tzinfo=pytz.UTC)
# pylint: disable=consider-using-with
@patch("bookwyrm.suggested_users.rerank_suggestions_task.delay")
@patch("bookwyrm.activitystreams.populate_stream_task.delay")
@patch("bookwyrm.activitystreams.add_book_statuses_task.delay")
@ -30,8 +29,13 @@ class GenericImporter(TestCase):
"""use a test csv"""
self.importer = Importer()
datafile = pathlib.Path(__file__).parent.joinpath("../data/generic.csv")
# pylint: disable-next=consider-using-with
self.csv = open(datafile, "r", encoding=self.importer.encoding)
def tearDown(self):
"""close test csv"""
self.csv.close()
@classmethod
def setUpTestData(cls):
"""populate database"""

View file

@ -16,7 +16,6 @@ def make_date(*args):
return datetime.datetime(*args, tzinfo=pytz.UTC)
# pylint: disable=consider-using-with
@patch("bookwyrm.suggested_users.rerank_suggestions_task.delay")
@patch("bookwyrm.activitystreams.populate_stream_task.delay")
@patch("bookwyrm.activitystreams.add_book_statuses_task.delay")
@ -29,8 +28,13 @@ class LibrarythingImport(TestCase):
datafile = pathlib.Path(__file__).parent.joinpath("../data/librarything.tsv")
# Librarything generates latin encoded exports...
# pylint: disable-next=consider-using-with
self.csv = open(datafile, "r", encoding=self.importer.encoding)
def tearDown(self):
"""close test csv"""
self.csv.close()
@classmethod
def setUpTestData(cls):
"""populate database"""

View file

@ -16,7 +16,6 @@ def make_date(*args):
return datetime.datetime(*args, tzinfo=pytz.UTC)
# pylint: disable=consider-using-with
@patch("bookwyrm.suggested_users.rerank_suggestions_task.delay")
@patch("bookwyrm.activitystreams.populate_stream_task.delay")
@patch("bookwyrm.activitystreams.add_book_statuses_task.delay")
@ -27,8 +26,13 @@ class OpenLibraryImport(TestCase):
"""use a test csv"""
self.importer = OpenLibraryImporter()
datafile = pathlib.Path(__file__).parent.joinpath("../data/openlibrary.csv")
# pylint: disable-next=consider-using-with
self.csv = open(datafile, "r", encoding=self.importer.encoding)
def tearDown(self):
"""close test csv"""
self.csv.close()
@classmethod
def setUpTestData(cls):
"""populate database"""

View file

@ -16,7 +16,6 @@ def make_date(*args):
return datetime.datetime(*args, tzinfo=pytz.UTC)
# pylint: disable=consider-using-with
@patch("bookwyrm.suggested_users.rerank_suggestions_task.delay")
@patch("bookwyrm.activitystreams.populate_stream_task.delay")
@patch("bookwyrm.activitystreams.add_book_statuses_task.delay")
@ -27,8 +26,13 @@ class StorygraphImport(TestCase):
"""use a test csv"""
self.importer = StorygraphImporter()
datafile = pathlib.Path(__file__).parent.joinpath("../data/storygraph.csv")
# pylint: disable-next=consider-using-with
self.csv = open(datafile, "r", encoding=self.importer.encoding)
def tearDown(self):
"""close test csv"""
self.csv.close()
@classmethod
def setUpTestData(cls):
"""populate database"""

View file

@ -1,12 +1,9 @@
""" testing models """
from io import BytesIO
import pathlib
import pytest
from dateutil.parser import parse
from PIL import Image
from django.core.files.base import ContentFile
from django.test import TestCase
from django.utils import timezone
@ -130,15 +127,13 @@ class Book(TestCase):
)
def test_thumbnail_fields(self):
"""Just hit them"""
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
image.save(output, format=image.format)
book = models.Edition.objects.create(title="hello")
book.cover.save("test.jpg", ContentFile(output.getvalue()))
with open(image_path, "rb") as image_file:
book.cover.save("test.jpg", image_file)
self.assertIsNotNone(book.cover_bw_book_xsmall_webp.url)
self.assertIsNotNone(book.cover_bw_book_xsmall_jpg.url)

View file

@ -1,5 +1,4 @@
""" testing models """
from io import BytesIO
from collections import namedtuple
from dataclasses import dataclass
import datetime
@ -10,7 +9,6 @@ from typing import List
from unittest import expectedFailure
from unittest.mock import patch
from PIL import Image
import responses
from django.core.exceptions import ValidationError
@ -420,13 +418,11 @@ class ModelFields(TestCase):
user = User.objects.create_user(
"mouse", "mouse@mouse.mouse", "mouseword", local=True, localname="mouse"
)
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
image.save(output, format=image.format)
user.avatar.save("test.jpg", ContentFile(output.getvalue()))
with open(image_path, "rb") as image_file:
user.avatar.save("test.jpg", image_file)
instance = fields.ImageField()
@ -516,30 +512,25 @@ class ModelFields(TestCase):
@responses.activate
def test_image_field_set_field_from_activity_no_overwrite_with_cover(self, *_):
"""update a model instance from an activitypub object"""
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
image.save(output, format=image.format)
another_image_file = pathlib.Path(__file__).parent.joinpath(
another_image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/logo.png"
)
another_image = Image.open(another_image_file)
another_output = BytesIO()
another_image.save(another_output, format=another_image.format)
instance = fields.ImageField(activitypub_field="cover", name="cover")
with open(another_image_path, "rb") as another_image_file:
responses.add(
responses.GET,
"http://www.example.com/image.jpg",
body=another_image.tobytes(),
body=another_image_file.read(),
status=200,
)
book = Edition.objects.create(title="hello")
book.cover.save("test.jpg", ContentFile(output.getvalue()))
with open(image_path, "rb") as image_file:
book.cover.save("test.jpg", image_file)
cover_size = book.cover.size
self.assertIsNotNone(cover_size)
@ -553,24 +544,22 @@ class ModelFields(TestCase):
@responses.activate
def test_image_field_set_field_from_activity_with_overwrite_with_cover(self, *_):
"""update a model instance from an activitypub object"""
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
image.save(output, format=image.format)
book = Edition.objects.create(title="hello")
book.cover.save("test.jpg", ContentFile(output.getvalue()))
with open(image_path, "rb") as image_file:
book.cover.save("test.jpg", image_file)
cover_size = book.cover.size
self.assertIsNotNone(cover_size)
another_image_file = pathlib.Path(__file__).parent.joinpath(
another_image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/logo.png"
)
instance = fields.ImageField(activitypub_field="cover", name="cover")
with open(another_image_file, "rb") as another_image:
with open(another_image_path, "rb") as another_image:
responses.add(
responses.GET,
"http://www.example.com/image.jpg",

View file

@ -1,16 +1,13 @@
""" testing models """
from unittest.mock import patch
from io import BytesIO
import pathlib
import re
from django.http import Http404
from django.core.files.base import ContentFile
from django.db import IntegrityError
from django.contrib.auth.models import AnonymousUser
from django.test import TestCase
from django.utils import timezone
from PIL import Image
import responses
from bookwyrm import activitypub, models, settings
@ -51,14 +48,14 @@ class Status(TestCase):
"""individual test setup"""
self.anonymous_user = AnonymousUser
self.anonymous_user.is_authenticated = False
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
with patch("bookwyrm.models.Status.broadcast"):
image.save(output, format=image.format)
self.book.cover.save("test.jpg", ContentFile(output.getvalue()))
with (
patch("bookwyrm.models.Status.broadcast"),
open(image_path, "rb") as image_file,
):
self.book.cover.save("test.jpg", image_file)
def test_status_generated_fields(self, *_):
"""setting remote id"""

View file

@ -21,20 +21,20 @@ from bookwyrm.preview_images import (
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
# pylint: disable=consider-using-with
class PreviewImages(TestCase):
"""every response to a get request, html or json"""
def setUp(self):
"""we need basic test data and mocks"""
self.factory = RequestFactory()
avatar_file = pathlib.Path(__file__).parent.joinpath(
avatar_path = pathlib.Path(__file__).parent.joinpath(
"../static/images/no_cover.jpg"
)
with (
patch("bookwyrm.suggested_users.rerank_suggestions_task.delay"),
patch("bookwyrm.activitystreams.populate_stream_task.delay"),
patch("bookwyrm.lists_stream.populate_lists_task.delay"),
open(avatar_path, "rb") as avatar_file,
):
self.local_user = models.User.objects.create_user(
"possum@local.com",
@ -43,8 +43,8 @@ class PreviewImages(TestCase):
local=True,
localname="possum",
avatar=SimpleUploadedFile(
avatar_file,
open(avatar_file, "rb").read(),
avatar_path,
avatar_file.read(),
content_type="image/jpeg",
),
)
@ -68,6 +68,7 @@ class PreviewImages(TestCase):
patch("bookwyrm.suggested_users.rerank_suggestions_task.delay"),
patch("bookwyrm.activitystreams.populate_stream_task.delay"),
patch("bookwyrm.lists_stream.populate_lists_task.delay"),
open(avatar_path, "rb") as avatar_file,
):
self.remote_user_with_preview = models.User.objects.create_user(
"badger@your.domain.here",
@ -78,8 +79,8 @@ class PreviewImages(TestCase):
inbox="https://example.com/users/badger/inbox",
outbox="https://example.com/users/badger/outbox",
avatar=SimpleUploadedFile(
avatar_file,
open(avatar_file, "rb").read(),
avatar_path,
avatar_file.read(),
content_type="image/jpeg",
),
)
@ -96,7 +97,7 @@ class PreviewImages(TestCase):
settings.ENABLE_PREVIEW_IMAGES = True
def test_generate_preview_image(self, *args, **kwargs):
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../static/images/no_cover.jpg"
)
@ -105,7 +106,7 @@ class PreviewImages(TestCase):
"text_three": "@possum@local.com",
}
result = generate_preview_image(texts=texts, picture=image_file, rating=5)
result = generate_preview_image(texts=texts, picture=image_path, rating=5)
self.assertIsInstance(result, Image.Image)
self.assertEqual(
result.size, (settings.PREVIEW_IMG_WIDTH, settings.PREVIEW_IMG_HEIGHT)

View file

@ -1,5 +1,4 @@
""" test for app action functionality """
import os
import json
from unittest.mock import patch
@ -179,7 +178,6 @@ class FederationViews(TestCase):
self.assertEqual(server.application_type, "coolsoft")
self.assertEqual(server.status, "blocked")
# pylint: disable=consider-using-with
def test_import_blocklist(self):
"""load a json file with a list of servers to block"""
server = models.FederatedServer.objects.create(server_name="hi.there.com")
@ -191,14 +189,13 @@ class FederationViews(TestCase):
{"instance": "hi.there.com", "url": "https://explanation.url"}, # existing
{"a": "b"}, # invalid
]
json.dump(data, open("file.json", "w")) # pylint: disable=unspecified-encoding
view = views.ImportServerBlocklist.as_view()
request = self.factory.post(
"",
{
"json_file": SimpleUploadedFile(
"file.json", open("file.json", "rb").read()
"file.json", json.dumps(data).encode("utf-8")
)
},
)
@ -214,6 +211,3 @@ class FederationViews(TestCase):
created = models.FederatedServer.objects.get(server_name="server.name")
self.assertEqual(created.status, "blocked")
self.assertEqual(created.notes, "https://explanation.url")
# remove file.json after test
os.remove("file.json")

View file

@ -1,8 +1,6 @@
""" test for app action functionality """
from io import BytesIO
import pathlib
from unittest.mock import patch
from PIL import Image
import responses
@ -161,14 +159,14 @@ class BookViews(TestCase):
def test_upload_cover_file(self):
"""add a cover via file upload"""
self.assertFalse(self.book.cover)
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../../static/images/default_avi.jpg"
)
form = forms.CoverForm(instance=self.book)
# pylint: disable=consider-using-with
with open(image_path, "rb") as image_file:
form.data["cover"] = SimpleUploadedFile(
image_file, open(image_file, "rb").read(), content_type="image/jpeg"
image_path, image_file.read(), content_type="image/jpeg"
)
request = self.factory.post("", form.data)
@ -296,16 +294,14 @@ class BookViews(TestCase):
def _setup_cover_url():
"""creates cover url mock"""
cover_url = "http://example.com"
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
image.save(output, format=image.format)
with open(image_path, "rb") as image_file:
responses.add(
responses.GET,
cover_url,
body=output.getvalue(),
body=image_file.read(),
status=200,
)
return cover_url

View file

@ -81,11 +81,11 @@ class ImportViews(TestCase):
form.data["source"] = "Goodreads"
form.data["privacy"] = "public"
form.data["include_reviews"] = False
csv_file = pathlib.Path(__file__).parent.joinpath("../../data/goodreads.csv")
csv_path = pathlib.Path(__file__).parent.joinpath("../../data/goodreads.csv")
with open(csv_path, "rb") as csv_file:
form.data["csv_file"] = SimpleUploadedFile(
# pylint: disable=consider-using-with
csv_file,
open(csv_file, "rb").read(),
csv_path,
csv_file.read(),
content_type="text/csv",
)

View file

@ -47,14 +47,14 @@ class ImportUserViews(TestCase):
view = views.UserImport.as_view()
form = forms.ImportUserForm()
archive_file = pathlib.Path(__file__).parent.joinpath(
archive_path = pathlib.Path(__file__).parent.joinpath(
"../../data/bookwyrm_account_export.tar.gz"
)
with open(archive_path, "rb") as archive_file:
form.data["archive_file"] = SimpleUploadedFile(
# pylint: disable=consider-using-with
archive_file,
open(archive_file, "rb").read(),
archive_path,
archive_file.read(),
content_type="application/gzip",
)

View file

@ -96,12 +96,12 @@ class EditUserViews(TestCase):
form.data["email"] = "wow@email.com"
form.data["default_post_privacy"] = "public"
form.data["preferred_timezone"] = "UTC"
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../../static/images/no_cover.jpg"
)
# pylint: disable=consider-using-with
with open(image_path, "rb") as image_file:
form.data["avatar"] = SimpleUploadedFile(
image_file, open(image_file, "rb").read(), content_type="image/jpeg"
image_path, image_file.read(), content_type="image/jpeg"
)
request = self.factory.post("", form.data)
request.user = self.local_user
@ -119,12 +119,12 @@ class EditUserViews(TestCase):
def test_crop_avatar(self, _):
"""reduce that image size"""
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../../static/images/no_cover.jpg"
)
image = Image.open(image_file)
with Image.open(image_path) as image:
result = views.preferences.edit_user.crop_avatar(image)
self.assertIsInstance(result, ContentFile)
image_result = Image.open(result)
with Image.open(result) as image_result:
self.assertEqual(image_result.size, (120, 120))

View file

@ -1,10 +1,7 @@
""" test for app action functionality """
from io import BytesIO
from unittest.mock import patch
import pathlib
from PIL import Image
from django.core.files.base import ContentFile
from django.http import Http404
from django.template.response import TemplateResponse
from django.test import TestCase
@ -142,12 +139,9 @@ class FeedViews(TestCase):
"""there are so many views, this just makes sure it LOADS"""
view = views.Status.as_view()
image_file = pathlib.Path(__file__).parent.joinpath(
image_path = pathlib.Path(__file__).parent.joinpath(
"../../static/images/default_avi.jpg"
)
image = Image.open(image_file)
output = BytesIO()
image.save(output, format=image.format)
with patch("bookwyrm.models.activitypub_mixin.broadcast_task.apply_async"):
status = models.Review.objects.create(
content="hi",
@ -157,7 +151,8 @@ class FeedViews(TestCase):
attachment = models.Image.objects.create(
status=status, caption="alt text here"
)
attachment.image.save("test.jpg", ContentFile(output.getvalue()))
with open(image_path, "rb") as image_file:
attachment.image.save("test.jpg", image_file)
request = self.factory.get("")
request.user = self.local_user