mirror of
				https://github.com/django/django.git
				synced 2025-10-31 09:41:08 +00:00 
			
		
		
		
	Fixed #34110 -- Added in-memory file storage.
Thanks Paolo Melchiorre, Carlton Gibson, and Mariusz Felisiak for reviews.
This commit is contained in:
		
				
					committed by
					
						 Mariusz Felisiak
						Mariusz Felisiak
					
				
			
			
				
	
			
			
			
						parent
						
							04fdf71933
						
					
				
				
					commit
					72efd840a8
				
			| @@ -4,9 +4,11 @@ from django.utils.module_loading import import_string | ||||
|  | ||||
| from .base import Storage | ||||
| from .filesystem import FileSystemStorage | ||||
| from .memory import InMemoryStorage | ||||
|  | ||||
| __all__ = ( | ||||
|     "FileSystemStorage", | ||||
|     "InMemoryStorage", | ||||
|     "Storage", | ||||
|     "DefaultStorage", | ||||
|     "default_storage", | ||||
|   | ||||
							
								
								
									
										290
									
								
								django/core/files/storage/memory.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										290
									
								
								django/core/files/storage/memory.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,290 @@ | ||||
| """ | ||||
| Based on dj-inmemorystorage (BSD) by Cody Soyland, Seán Hayes, Tore Birkeland, | ||||
| and Nick Presta. | ||||
| """ | ||||
|  | ||||
| import errno | ||||
| import io | ||||
| import os | ||||
| import pathlib | ||||
| from urllib.parse import urljoin | ||||
|  | ||||
| from django.conf import settings | ||||
| from django.core.files.base import ContentFile | ||||
| from django.core.signals import setting_changed | ||||
| from django.utils._os import safe_join | ||||
| from django.utils.deconstruct import deconstructible | ||||
| from django.utils.encoding import filepath_to_uri | ||||
| from django.utils.functional import cached_property | ||||
| from django.utils.timezone import now | ||||
|  | ||||
| from .base import Storage | ||||
| from .mixins import StorageSettingsMixin | ||||
|  | ||||
| __all__ = ("InMemoryStorage",) | ||||
|  | ||||
|  | ||||
| class TimingMixin: | ||||
|     def _initialize_times(self): | ||||
|         self.created_time = now() | ||||
|         self.accessed_time = self.created_time | ||||
|         self.modified_time = self.created_time | ||||
|  | ||||
|     def _update_accessed_time(self): | ||||
|         self.accessed_time = now() | ||||
|  | ||||
|     def _update_modified_time(self): | ||||
|         self.modified_time = now() | ||||
|  | ||||
|  | ||||
| class InMemoryFileNode(ContentFile, TimingMixin): | ||||
|     """ | ||||
|     Helper class representing an in-memory file node. | ||||
|  | ||||
|     Handle unicode/bytes conversion during I/O operations and record creation, | ||||
|     modification, and access times. | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, content="", name=""): | ||||
|         self.file = None | ||||
|         self._content_type = type(content) | ||||
|         self._initialize_stream() | ||||
|         self._initialize_times() | ||||
|  | ||||
|     def open(self, mode): | ||||
|         self._convert_stream_content(mode) | ||||
|         self._update_accessed_time() | ||||
|         return super().open(mode) | ||||
|  | ||||
|     def write(self, data): | ||||
|         super().write(data) | ||||
|         self._update_modified_time() | ||||
|  | ||||
|     def _initialize_stream(self): | ||||
|         """Initialize underlying stream according to the content type.""" | ||||
|         self.file = io.BytesIO() if self._content_type == bytes else io.StringIO() | ||||
|  | ||||
|     def _convert_stream_content(self, mode): | ||||
|         """Convert actual file content according to the opening mode.""" | ||||
|         new_content_type = bytes if "b" in mode else str | ||||
|         # No conversion needed. | ||||
|         if self._content_type == new_content_type: | ||||
|             return | ||||
|  | ||||
|         content = self.file.getvalue() | ||||
|         content = content.encode() if isinstance(content, str) else content.decode() | ||||
|         self._content_type = new_content_type | ||||
|         self._initialize_stream() | ||||
|  | ||||
|         self.file.write(content) | ||||
|  | ||||
|  | ||||
| class InMemoryDirNode(TimingMixin): | ||||
|     """ | ||||
|     Helper class representing an in-memory directory node. | ||||
|  | ||||
|     Handle path navigation of directory trees, creating missing nodes if | ||||
|     needed. | ||||
|     """ | ||||
|  | ||||
|     def __init__(self): | ||||
|         self._children = {} | ||||
|         self._initialize_times() | ||||
|  | ||||
|     def resolve(self, path, create_if_missing=False, leaf_cls=None, check_exists=True): | ||||
|         """ | ||||
|         Navigate current directory tree, returning node matching path or | ||||
|         creating a new one, if missing. | ||||
|         - path: path of the node to search | ||||
|         - create_if_missing: create nodes if not exist. Defaults to False. | ||||
|         - leaf_cls: expected type of leaf node. Defaults to None. | ||||
|         - check_exists: if True and the leaf node does not exist, raise a | ||||
|           FileNotFoundError. Defaults to True. | ||||
|         """ | ||||
|         path_segments = list(pathlib.Path(path).parts) | ||||
|         current_node = self | ||||
|  | ||||
|         while path_segments: | ||||
|             path_segment = path_segments.pop(0) | ||||
|             # If current node is a file node and there are unprocessed | ||||
|             # segments, raise an error. | ||||
|             if isinstance(current_node, InMemoryFileNode): | ||||
|                 path_segments = os.path.split(path) | ||||
|                 current_path = "/".join( | ||||
|                     path_segments[: path_segments.index(path_segment)] | ||||
|                 ) | ||||
|                 raise NotADirectoryError( | ||||
|                     errno.ENOTDIR, os.strerror(errno.ENOTDIR), current_path | ||||
|                 ) | ||||
|             current_node = current_node._resolve_child( | ||||
|                 path_segment, | ||||
|                 create_if_missing, | ||||
|                 leaf_cls if len(path_segments) == 0 else InMemoryDirNode, | ||||
|             ) | ||||
|             if current_node is None: | ||||
|                 break | ||||
|  | ||||
|         if current_node is None and check_exists: | ||||
|             raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path) | ||||
|  | ||||
|         # If a leaf_cls is not None, check if leaf node is of right type. | ||||
|         if leaf_cls and not isinstance(current_node, leaf_cls): | ||||
|             error_cls, error_code = ( | ||||
|                 (NotADirectoryError, errno.ENOTDIR) | ||||
|                 if leaf_cls is InMemoryDirNode | ||||
|                 else (IsADirectoryError, errno.EISDIR) | ||||
|             ) | ||||
|             raise error_cls(error_code, os.strerror(error_code), path) | ||||
|  | ||||
|         return current_node | ||||
|  | ||||
|     def _resolve_child(self, path_segment, create_if_missing, child_cls): | ||||
|         if create_if_missing: | ||||
|             self._update_accessed_time() | ||||
|             self._update_modified_time() | ||||
|             return self._children.setdefault(path_segment, child_cls()) | ||||
|         return self._children.get(path_segment) | ||||
|  | ||||
|     def listdir(self): | ||||
|         directories, files = [], [] | ||||
|         for name, entry in self._children.items(): | ||||
|             if isinstance(entry, InMemoryDirNode): | ||||
|                 directories.append(name) | ||||
|             else: | ||||
|                 files.append(name) | ||||
|         return directories, files | ||||
|  | ||||
|     def remove_child(self, name): | ||||
|         if name in self._children: | ||||
|             self._update_accessed_time() | ||||
|             self._update_modified_time() | ||||
|             del self._children[name] | ||||
|  | ||||
|  | ||||
| @deconstructible(path="django.core.files.storage.InMemoryStorage") | ||||
| class InMemoryStorage(Storage, StorageSettingsMixin): | ||||
|     """A storage saving files in memory.""" | ||||
|  | ||||
|     def __init__( | ||||
|         self, | ||||
|         location=None, | ||||
|         base_url=None, | ||||
|         file_permissions_mode=None, | ||||
|         directory_permissions_mode=None, | ||||
|     ): | ||||
|         self._location = location | ||||
|         self._base_url = base_url | ||||
|         self._file_permissions_mode = file_permissions_mode | ||||
|         self._directory_permissions_mode = directory_permissions_mode | ||||
|         self._root = InMemoryDirNode() | ||||
|         self._resolve( | ||||
|             self.base_location, create_if_missing=True, leaf_cls=InMemoryDirNode | ||||
|         ) | ||||
|         setting_changed.connect(self._clear_cached_properties) | ||||
|  | ||||
|     @cached_property | ||||
|     def base_location(self): | ||||
|         return self._value_or_setting(self._location, settings.MEDIA_ROOT) | ||||
|  | ||||
|     @cached_property | ||||
|     def location(self): | ||||
|         return os.path.abspath(self.base_location) | ||||
|  | ||||
|     @cached_property | ||||
|     def base_url(self): | ||||
|         if self._base_url is not None and not self._base_url.endswith("/"): | ||||
|             self._base_url += "/" | ||||
|         return self._value_or_setting(self._base_url, settings.MEDIA_URL) | ||||
|  | ||||
|     @cached_property | ||||
|     def file_permissions_mode(self): | ||||
|         return self._value_or_setting( | ||||
|             self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS | ||||
|         ) | ||||
|  | ||||
|     @cached_property | ||||
|     def directory_permissions_mode(self): | ||||
|         return self._value_or_setting( | ||||
|             self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS | ||||
|         ) | ||||
|  | ||||
|     def _relative_path(self, name): | ||||
|         full_path = self.path(name) | ||||
|         return os.path.relpath(full_path, self.location) | ||||
|  | ||||
|     def _resolve(self, name, create_if_missing=False, leaf_cls=None, check_exists=True): | ||||
|         try: | ||||
|             relative_path = self._relative_path(name) | ||||
|             return self._root.resolve( | ||||
|                 relative_path, | ||||
|                 create_if_missing=create_if_missing, | ||||
|                 leaf_cls=leaf_cls, | ||||
|                 check_exists=check_exists, | ||||
|             ) | ||||
|         except NotADirectoryError as exc: | ||||
|             absolute_path = self.path(exc.filename) | ||||
|             raise FileExistsError(f"{absolute_path} exists and is not a directory.") | ||||
|  | ||||
|     def _open(self, name, mode="rb"): | ||||
|         create_if_missing = "w" in mode | ||||
|         file_node = self._resolve( | ||||
|             name, create_if_missing=create_if_missing, leaf_cls=InMemoryFileNode | ||||
|         ) | ||||
|         return file_node.open(mode) | ||||
|  | ||||
|     def _save(self, name, content): | ||||
|         file_node = self._resolve( | ||||
|             name, create_if_missing=True, leaf_cls=InMemoryFileNode | ||||
|         ) | ||||
|         fd = None | ||||
|         for chunk in content.chunks(): | ||||
|             if fd is None: | ||||
|                 mode = "wb" if isinstance(chunk, bytes) else "wt" | ||||
|                 fd = file_node.open(mode) | ||||
|             fd.write(chunk) | ||||
|  | ||||
|         if hasattr(content, "temporary_file_path"): | ||||
|             os.remove(content.temporary_file_path()) | ||||
|  | ||||
|         file_node.modified_time = now() | ||||
|         return self._relative_path(name).replace("\\", "/") | ||||
|  | ||||
|     def path(self, name): | ||||
|         return safe_join(self.location, name) | ||||
|  | ||||
|     def delete(self, name): | ||||
|         path, filename = os.path.split(name) | ||||
|         dir_node = self._resolve(path, check_exists=False) | ||||
|         if dir_node is None: | ||||
|             return None | ||||
|         dir_node.remove_child(filename) | ||||
|  | ||||
|     def exists(self, name): | ||||
|         return self._resolve(name, check_exists=False) is not None | ||||
|  | ||||
|     def listdir(self, path): | ||||
|         node = self._resolve(path, leaf_cls=InMemoryDirNode) | ||||
|         return node.listdir() | ||||
|  | ||||
|     def size(self, name): | ||||
|         return len(self._open(name, "rb").file.getvalue()) | ||||
|  | ||||
|     def url(self, name): | ||||
|         if self.base_url is None: | ||||
|             raise ValueError("This file is not accessible via a URL.") | ||||
|         url = filepath_to_uri(name) | ||||
|         if url is not None: | ||||
|             url = url.lstrip("/") | ||||
|         return urljoin(self.base_url, url) | ||||
|  | ||||
|     def get_accessed_time(self, name): | ||||
|         file_node = self._resolve(name) | ||||
|         return file_node.accessed_time | ||||
|  | ||||
|     def get_created_time(self, name): | ||||
|         file_node = self._resolve(name) | ||||
|         return file_node.created_time | ||||
|  | ||||
|     def get_modified_time(self, name): | ||||
|         file_node = self._resolve(name) | ||||
|         return file_node.modified_time | ||||
| @@ -74,6 +74,39 @@ The ``FileSystemStorage`` class | ||||
|         time of the last metadata change, and on others (like Windows), it's | ||||
|         the creation time of the file. | ||||
|  | ||||
| The ``InMemoryStorage`` class | ||||
| ============================= | ||||
|  | ||||
| .. versionadded:: 4.2 | ||||
|  | ||||
| .. class:: InMemoryStorage(location=None, base_url=None, file_permissions_mode=None, directory_permissions_mode=None) | ||||
|  | ||||
|     The :class:`~django.core.files.storage.InMemoryStorage` class implements | ||||
|     a memory-based file storage. It has no persistence, but can be useful for | ||||
|     speeding up tests by avoiding disk access. | ||||
|  | ||||
|     .. attribute:: location | ||||
|  | ||||
|         Absolute path to the directory name assigned to files. Defaults to the | ||||
|         value of your :setting:`MEDIA_ROOT` setting. | ||||
|  | ||||
|     .. attribute:: base_url | ||||
|  | ||||
|         URL that serves the files stored at this location. | ||||
|         Defaults to the value of your :setting:`MEDIA_URL` setting. | ||||
|  | ||||
|     .. attribute:: file_permissions_mode | ||||
|  | ||||
|         The file system permissions assigned to files, provided for | ||||
|         compatibility with ``FileSystemStorage``. Defaults to | ||||
|         :setting:`FILE_UPLOAD_PERMISSIONS`. | ||||
|  | ||||
|     .. attribute:: directory_permissions_mode | ||||
|  | ||||
|         The file system permissions assigned to directories, provided for | ||||
|         compatibility with ``FileSystemStorage``. Defaults to | ||||
|         :setting:`FILE_UPLOAD_DIRECTORY_PERMISSIONS`. | ||||
|  | ||||
| The ``Storage`` class | ||||
| ===================== | ||||
|  | ||||
|   | ||||
| @@ -85,6 +85,12 @@ The Breach (HTB) paper`_. | ||||
|  | ||||
| .. _Heal The Breach (HTB) paper: https://ieeexplore.ieee.org/document/9754554 | ||||
|  | ||||
| In-memory file storage | ||||
| ---------------------- | ||||
|  | ||||
| The new ``django.core.files.storage.InMemoryStorage`` class provides a | ||||
| non-persistent storage useful for speeding up tests by avoiding disk access. | ||||
|  | ||||
| Minor features | ||||
| -------------- | ||||
|  | ||||
|   | ||||
| @@ -366,3 +366,12 @@ Preserving the test database | ||||
| The :option:`test --keepdb` option preserves the test database between test | ||||
| runs. It skips the create and destroy actions which can greatly decrease the | ||||
| time to run tests. | ||||
|  | ||||
| Avoiding disk access for media files | ||||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
|  | ||||
| .. versionadded:: 4.2 | ||||
|  | ||||
| The :class:`~django.core.files.storage.InMemoryStorage` is a convenient way to | ||||
| prevent disk access for media files. All data is kept in memory, then it gets | ||||
| discarded after tests run. | ||||
|   | ||||
							
								
								
									
										290
									
								
								tests/file_storage/test_inmemory_storage.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										290
									
								
								tests/file_storage/test_inmemory_storage.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,290 @@ | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| import unittest | ||||
|  | ||||
| from django.core.files.base import ContentFile | ||||
| from django.core.files.storage import InMemoryStorage | ||||
| from django.core.files.uploadedfile import TemporaryUploadedFile | ||||
| from django.test import SimpleTestCase, override_settings | ||||
|  | ||||
|  | ||||
| class MemoryStorageIOTests(unittest.TestCase): | ||||
|     def setUp(self): | ||||
|         self.storage = InMemoryStorage() | ||||
|  | ||||
|     def test_write_string(self): | ||||
|         with self.storage.open("file.txt", "w") as fd: | ||||
|             fd.write("hello") | ||||
|         with self.storage.open("file.txt", "r") as fd: | ||||
|             self.assertEqual(fd.read(), "hello") | ||||
|         with self.storage.open("file.dat", "wb") as fd: | ||||
|             fd.write(b"hello") | ||||
|         with self.storage.open("file.dat", "rb") as fd: | ||||
|             self.assertEqual(fd.read(), b"hello") | ||||
|  | ||||
|     def test_convert_str_to_bytes_and_back(self): | ||||
|         """InMemoryStorage handles conversion from str to bytes and back.""" | ||||
|         with self.storage.open("file.txt", "w") as fd: | ||||
|             fd.write("hello") | ||||
|         with self.storage.open("file.txt", "rb") as fd: | ||||
|             self.assertEqual(fd.read(), b"hello") | ||||
|         with self.storage.open("file.dat", "wb") as fd: | ||||
|             fd.write(b"hello") | ||||
|         with self.storage.open("file.dat", "r") as fd: | ||||
|             self.assertEqual(fd.read(), "hello") | ||||
|  | ||||
|     def test_open_missing_file(self): | ||||
|         self.assertRaises(FileNotFoundError, self.storage.open, "missing.txt") | ||||
|  | ||||
|     def test_open_dir_as_file(self): | ||||
|         with self.storage.open("a/b/file.txt", "w") as fd: | ||||
|             fd.write("hello") | ||||
|         self.assertRaises(IsADirectoryError, self.storage.open, "a/b") | ||||
|  | ||||
|     def test_file_saving(self): | ||||
|         self.storage.save("file.txt", ContentFile("test")) | ||||
|         self.assertEqual(self.storage.open("file.txt", "r").read(), "test") | ||||
|  | ||||
|         self.storage.save("file.dat", ContentFile(b"test")) | ||||
|         self.assertEqual(self.storage.open("file.dat", "rb").read(), b"test") | ||||
|  | ||||
|     @unittest.skipIf( | ||||
|         sys.platform == "win32", "Windows doesn't support moving open files." | ||||
|     ) | ||||
|     def test_removing_temporary_file_after_save(self): | ||||
|         """A temporary file is removed when saved into storage.""" | ||||
|         with TemporaryUploadedFile("test", "text/plain", 1, "utf8") as file: | ||||
|             self.storage.save("test.txt", file) | ||||
|             self.assertFalse(os.path.exists(file.temporary_file_path())) | ||||
|  | ||||
|     def test_large_file_saving(self): | ||||
|         large_file = ContentFile("A" * ContentFile.DEFAULT_CHUNK_SIZE * 3) | ||||
|         self.storage.save("file.txt", large_file) | ||||
|  | ||||
|     def test_file_size(self): | ||||
|         """ | ||||
|         File size is equal to the size of bytes-encoded version of the saved | ||||
|         data. | ||||
|         """ | ||||
|         self.storage.save("file.txt", ContentFile("test")) | ||||
|         self.assertEqual(self.storage.size("file.txt"), 4) | ||||
|  | ||||
|         # A unicode char encoded to UTF-8 takes 2 bytes. | ||||
|         self.storage.save("unicode_file.txt", ContentFile("è")) | ||||
|         self.assertEqual(self.storage.size("unicode_file.txt"), 2) | ||||
|  | ||||
|         self.storage.save("file.dat", ContentFile(b"\xf1\xf1")) | ||||
|         self.assertEqual(self.storage.size("file.dat"), 2) | ||||
|  | ||||
|     def test_listdir(self): | ||||
|         self.assertEqual(self.storage.listdir(""), ([], [])) | ||||
|  | ||||
|         self.storage.save("file_a.txt", ContentFile("test")) | ||||
|         self.storage.save("file_b.txt", ContentFile("test")) | ||||
|         self.storage.save("dir/file_c.txt", ContentFile("test")) | ||||
|  | ||||
|         dirs, files = self.storage.listdir("") | ||||
|         self.assertEqual(sorted(files), ["file_a.txt", "file_b.txt"]) | ||||
|         self.assertEqual(dirs, ["dir"]) | ||||
|  | ||||
|     def test_list_relative_path(self): | ||||
|         self.storage.save("a/file.txt", ContentFile("test")) | ||||
|  | ||||
|         _dirs, files = self.storage.listdir("./a/./.") | ||||
|         self.assertEqual(files, ["file.txt"]) | ||||
|  | ||||
|     def test_exists(self): | ||||
|         self.storage.save("dir/subdir/file.txt", ContentFile("test")) | ||||
|         self.assertTrue(self.storage.exists("dir")) | ||||
|         self.assertTrue(self.storage.exists("dir/subdir")) | ||||
|         self.assertTrue(self.storage.exists("dir/subdir/file.txt")) | ||||
|  | ||||
|     def test_delete(self): | ||||
|         """Deletion handles both files and directory trees.""" | ||||
|         self.storage.save("dir/subdir/file.txt", ContentFile("test")) | ||||
|         self.storage.save("dir/subdir/other_file.txt", ContentFile("test")) | ||||
|         self.assertTrue(self.storage.exists("dir/subdir/file.txt")) | ||||
|         self.assertTrue(self.storage.exists("dir/subdir/other_file.txt")) | ||||
|  | ||||
|         self.storage.delete("dir/subdir/other_file.txt") | ||||
|         self.assertFalse(self.storage.exists("dir/subdir/other_file.txt")) | ||||
|  | ||||
|         self.storage.delete("dir/subdir") | ||||
|         self.assertFalse(self.storage.exists("dir/subdir/file.txt")) | ||||
|         self.assertFalse(self.storage.exists("dir/subdir")) | ||||
|  | ||||
|     def test_delete_missing_file(self): | ||||
|         self.storage.delete("missing_file.txt") | ||||
|         self.storage.delete("missing_dir/missing_file.txt") | ||||
|  | ||||
|     def test_file_node_cannot_have_children(self): | ||||
|         """Navigate to children of a file node raises FileExistsError.""" | ||||
|         self.storage.save("file.txt", ContentFile("test")) | ||||
|         self.assertRaises(FileExistsError, self.storage.listdir, "file.txt/child_dir") | ||||
|         self.assertRaises( | ||||
|             FileExistsError, | ||||
|             self.storage.save, | ||||
|             "file.txt/child_file.txt", | ||||
|             ContentFile("test"), | ||||
|         ) | ||||
|  | ||||
|     @override_settings(MEDIA_URL=None) | ||||
|     def test_url(self): | ||||
|         self.assertRaises(ValueError, self.storage.url, ("file.txt",)) | ||||
|  | ||||
|         storage = InMemoryStorage(base_url="http://www.example.com") | ||||
|         self.assertEqual(storage.url("file.txt"), "http://www.example.com/file.txt") | ||||
|  | ||||
|     def test_url_with_none_filename(self): | ||||
|         storage = InMemoryStorage(base_url="/test_media_url/") | ||||
|         self.assertEqual(storage.url(None), "/test_media_url/") | ||||
|  | ||||
|  | ||||
| class MemoryStorageTimesTests(unittest.TestCase): | ||||
|     def setUp(self): | ||||
|         self.storage = InMemoryStorage() | ||||
|  | ||||
|     def test_file_modified_time(self): | ||||
|         """ | ||||
|         File modified time should change after file changing | ||||
|         """ | ||||
|         self.storage.save("file.txt", ContentFile("test")) | ||||
|         modified_time = self.storage.get_modified_time("file.txt") | ||||
|  | ||||
|         time.sleep(0.1) | ||||
|  | ||||
|         with self.storage.open("file.txt", "w") as fd: | ||||
|             fd.write("new content") | ||||
|  | ||||
|         new_modified_time = self.storage.get_modified_time("file.txt") | ||||
|         self.assertTrue(new_modified_time > modified_time) | ||||
|  | ||||
|     def test_file_accessed_time(self): | ||||
|         """File accessed time should chage after consecutive opening.""" | ||||
|         self.storage.save("file.txt", ContentFile("test")) | ||||
|         accessed_time = self.storage.get_accessed_time("file.txt") | ||||
|  | ||||
|         time.sleep(0.1) | ||||
|  | ||||
|         self.storage.open("file.txt", "r") | ||||
|         new_accessed_time = self.storage.get_accessed_time("file.txt") | ||||
|         self.assertGreater(new_accessed_time, accessed_time) | ||||
|  | ||||
|     def test_file_created_time(self): | ||||
|         """File creation time should not change after I/O operations.""" | ||||
|         self.storage.save("file.txt", ContentFile("test")) | ||||
|         created_time = self.storage.get_created_time("file.txt") | ||||
|  | ||||
|         time.sleep(0.1) | ||||
|  | ||||
|         # File opening doesn't change creation time. | ||||
|         file = self.storage.open("file.txt", "r") | ||||
|         after_open_created_time = self.storage.get_created_time("file.txt") | ||||
|         self.assertEqual(after_open_created_time, created_time) | ||||
|         # Writing to a file doesn't change its creation time. | ||||
|         file.write("New test") | ||||
|         self.storage.save("file.txt", file) | ||||
|         after_write_created_time = self.storage.get_created_time("file.txt") | ||||
|         self.assertEqual(after_write_created_time, created_time) | ||||
|  | ||||
|     def test_directory_times_changing_after_file_creation(self): | ||||
|         """ | ||||
|         Directory modified and accessed time should change when a new file is | ||||
|         created inside. | ||||
|         """ | ||||
|         self.storage.save("dir/file1.txt", ContentFile("test")) | ||||
|         created_time = self.storage.get_created_time("dir") | ||||
|         modified_time = self.storage.get_modified_time("dir") | ||||
|         accessed_time = self.storage.get_accessed_time("dir") | ||||
|  | ||||
|         time.sleep(0.1) | ||||
|  | ||||
|         self.storage.save("dir/file2.txt", ContentFile("test")) | ||||
|         new_modified_time = self.storage.get_modified_time("dir") | ||||
|         new_accessed_time = self.storage.get_accessed_time("dir") | ||||
|         after_file_creation_created_time = self.storage.get_created_time("dir") | ||||
|         self.assertGreater(new_modified_time, modified_time) | ||||
|         self.assertGreater(new_accessed_time, accessed_time) | ||||
|         self.assertEqual(created_time, after_file_creation_created_time) | ||||
|  | ||||
|     def test_directory_times_changing_after_file_deletion(self): | ||||
|         """ | ||||
|         Directory modified and accessed time should change when a new file is | ||||
|         deleted inside. | ||||
|         """ | ||||
|         self.storage.save("dir/file.txt", ContentFile("test")) | ||||
|         created_time = self.storage.get_created_time("dir") | ||||
|         modified_time = self.storage.get_modified_time("dir") | ||||
|         accessed_time = self.storage.get_accessed_time("dir") | ||||
|  | ||||
|         time.sleep(0.1) | ||||
|  | ||||
|         self.storage.delete("dir/file.txt") | ||||
|         new_modified_time = self.storage.get_modified_time("dir") | ||||
|         new_accessed_time = self.storage.get_accessed_time("dir") | ||||
|         after_file_deletion_created_time = self.storage.get_created_time("dir") | ||||
|         self.assertGreater(new_modified_time, modified_time) | ||||
|         self.assertGreater(new_accessed_time, accessed_time) | ||||
|         self.assertEqual(created_time, after_file_deletion_created_time) | ||||
|  | ||||
|  | ||||
| class InMemoryStorageTests(SimpleTestCase): | ||||
|     def test_deconstruction(self): | ||||
|         storage = InMemoryStorage() | ||||
|         path, args, kwargs = storage.deconstruct() | ||||
|         self.assertEqual(path, "django.core.files.storage.InMemoryStorage") | ||||
|         self.assertEqual(args, ()) | ||||
|         self.assertEqual(kwargs, {}) | ||||
|  | ||||
|         kwargs_orig = { | ||||
|             "location": "/custom_path", | ||||
|             "base_url": "http://myfiles.example.com/", | ||||
|             "file_permissions_mode": "0o755", | ||||
|             "directory_permissions_mode": "0o600", | ||||
|         } | ||||
|         storage = InMemoryStorage(**kwargs_orig) | ||||
|         path, args, kwargs = storage.deconstruct() | ||||
|         self.assertEqual(kwargs, kwargs_orig) | ||||
|  | ||||
|     @override_settings( | ||||
|         MEDIA_ROOT="media_root", | ||||
|         MEDIA_URL="media_url/", | ||||
|         FILE_UPLOAD_PERMISSIONS=0o777, | ||||
|         FILE_UPLOAD_DIRECTORY_PERMISSIONS=0o777, | ||||
|     ) | ||||
|     def test_setting_changed(self): | ||||
|         """ | ||||
|         Properties using settings values as defaults should be updated on | ||||
|         referenced settings change while specified values should be unchanged. | ||||
|         """ | ||||
|         storage = InMemoryStorage( | ||||
|             location="explicit_location", | ||||
|             base_url="explicit_base_url/", | ||||
|             file_permissions_mode=0o666, | ||||
|             directory_permissions_mode=0o666, | ||||
|         ) | ||||
|         defaults_storage = InMemoryStorage() | ||||
|         settings = { | ||||
|             "MEDIA_ROOT": "overridden_media_root", | ||||
|             "MEDIA_URL": "/overridden_media_url/", | ||||
|             "FILE_UPLOAD_PERMISSIONS": 0o333, | ||||
|             "FILE_UPLOAD_DIRECTORY_PERMISSIONS": 0o333, | ||||
|         } | ||||
|         with self.settings(**settings): | ||||
|             self.assertEqual(storage.base_location, "explicit_location") | ||||
|             self.assertIn("explicit_location", storage.location) | ||||
|             self.assertEqual(storage.base_url, "explicit_base_url/") | ||||
|             self.assertEqual(storage.file_permissions_mode, 0o666) | ||||
|             self.assertEqual(storage.directory_permissions_mode, 0o666) | ||||
|             self.assertEqual(defaults_storage.base_location, settings["MEDIA_ROOT"]) | ||||
|             self.assertIn(settings["MEDIA_ROOT"], defaults_storage.location) | ||||
|             self.assertEqual(defaults_storage.base_url, settings["MEDIA_URL"]) | ||||
|             self.assertEqual( | ||||
|                 defaults_storage.file_permissions_mode, | ||||
|                 settings["FILE_UPLOAD_PERMISSIONS"], | ||||
|             ) | ||||
|             self.assertEqual( | ||||
|                 defaults_storage.directory_permissions_mode, | ||||
|                 settings["FILE_UPLOAD_DIRECTORY_PERMISSIONS"], | ||||
|             ) | ||||
| @@ -11,6 +11,7 @@ from urllib.parse import quote | ||||
|  | ||||
| from django.core.exceptions import SuspiciousFileOperation | ||||
| from django.core.files import temp as tempfile | ||||
| from django.core.files.storage import default_storage | ||||
| from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile | ||||
| from django.http.multipartparser import ( | ||||
|     FILE, | ||||
| @@ -804,6 +805,9 @@ class DirectoryCreationTests(SimpleTestCase): | ||||
|     @unittest.skipIf( | ||||
|         sys.platform == "win32", "Python on Windows doesn't have working os.chmod()." | ||||
|     ) | ||||
|     @override_settings( | ||||
|         DEFAULT_FILE_STORAGE="django.core.files.storage.FileSystemStorage" | ||||
|     ) | ||||
|     def test_readonly_root(self): | ||||
|         """Permission errors are not swallowed""" | ||||
|         os.chmod(MEDIA_ROOT, 0o500) | ||||
| @@ -814,9 +818,11 @@ class DirectoryCreationTests(SimpleTestCase): | ||||
|             ) | ||||
|  | ||||
|     def test_not_a_directory(self): | ||||
|         default_storage.delete(UPLOAD_TO) | ||||
|         # Create a file with the upload directory name | ||||
|         open(UPLOAD_TO, "wb").close() | ||||
|         self.addCleanup(os.remove, UPLOAD_TO) | ||||
|         with SimpleUploadedFile(UPLOAD_TO, b"x") as file: | ||||
|             default_storage.save(UPLOAD_TO, file) | ||||
|         self.addCleanup(default_storage.delete, UPLOAD_TO) | ||||
|         msg = "%s exists and is not a directory." % UPLOAD_TO | ||||
|         with self.assertRaisesMessage(FileExistsError, msg): | ||||
|             with SimpleUploadedFile("foo.txt", b"x") as file: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user