From b4cba4ed625ce7c88675616b3bbb237c28a926d1 Mon Sep 17 00:00:00 2001 From: Jon Prindiville Date: Wed, 26 Apr 2017 11:43:56 -0400 Subject: [PATCH] Fixed #28144 -- Added FileSystemStorage.OS_OPEN_FLAGS to allow customization. --- django/core/files/storage.py | 9 ++++----- tests/file_storage/tests.py | 39 ++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/django/core/files/storage.py b/django/core/files/storage.py index 30788d6d75..b0168186ef 100644 --- a/django/core/files/storage.py +++ b/django/core/files/storage.py @@ -168,6 +168,9 @@ class FileSystemStorage(Storage): """ Standard filesystem storage """ + # The combination of O_CREAT and O_EXCL makes os.open() raise OSError if + # the file already exists before it's opened. + OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, 'O_BINARY', 0) def __init__(self, location=None, base_url=None, file_permissions_mode=None, directory_permissions_mode=None): @@ -256,12 +259,8 @@ class FileSystemStorage(Storage): # This is a normal uploadedfile that we can stream. else: - # This fun binary flag incantation makes os.open throw an - # OSError if the file already exists before we open it. - flags = (os.O_WRONLY | os.O_CREAT | os.O_EXCL | - getattr(os, 'O_BINARY', 0)) # The current umask value is masked out by os.open! - fd = os.open(full_path, flags, 0o666) + fd = os.open(full_path, self.OS_OPEN_FLAGS, 0o666) _file = None try: locks.lock(fd, locks.LOCK_EX) diff --git a/tests/file_storage/tests.py b/tests/file_storage/tests.py index 33dc699ab4..3a21ca9046 100644 --- a/tests/file_storage/tests.py +++ b/tests/file_storage/tests.py @@ -566,6 +566,45 @@ class CustomStorageTests(FileStorageTests): self.storage.delete(second) +class OverwritingStorage(FileSystemStorage): + """ + Overwrite existing files instead of appending a suffix to generate an + unused name. + """ + # Mask out O_EXCL so os.open() doesn't raise OSError if the file exists. + OS_OPEN_FLAGS = FileSystemStorage.OS_OPEN_FLAGS & ~os.O_EXCL + + def get_available_name(self, name, max_length=None): + """Override the effort to find an used name.""" + return name + + +class OverwritingStorageTests(FileStorageTests): + storage_class = OverwritingStorage + + def test_save_overwrite_behavior(self): + """Saving to same file name twice overwrites the first file.""" + name = 'test.file' + self.assertFalse(self.storage.exists(name)) + content_1 = b'content one' + content_2 = b'second content' + f_1 = ContentFile(content_1) + f_2 = ContentFile(content_2) + stored_name_1 = self.storage.save(name, f_1) + try: + self.assertEqual(stored_name_1, name) + self.assertTrue(self.storage.exists(name)) + self.assertTrue(os.path.exists(os.path.join(self.temp_dir, name))) + self.assertEqual(self.storage.open(name).read(), content_1) + stored_name_2 = self.storage.save(name, f_2) + self.assertEqual(stored_name_2, name) + self.assertTrue(self.storage.exists(name)) + self.assertTrue(os.path.exists(os.path.join(self.temp_dir, name))) + self.assertEqual(self.storage.open(name).read(), content_2) + finally: + self.storage.delete(name) + + class DiscardingFalseContentStorage(FileSystemStorage): def _save(self, name, content): if content: