From 032c09c4144eaa278a64b9a4bef838341b35d175 Mon Sep 17 00:00:00 2001 From: Francesco Panico Date: Thu, 10 Nov 2022 22:01:53 +0100 Subject: [PATCH] Refs #34110 -- Reorganized django.core.files.storage into a separate module. --- django/core/files/storage/__init__.py | 26 +++ django/core/files/storage/base.py | 190 ++++++++++++++++ .../{storage.py => storage/filesystem.py} | 212 +----------------- tests/file_storage/tests.py | 2 +- 4 files changed, 220 insertions(+), 210 deletions(-) create mode 100644 django/core/files/storage/__init__.py create mode 100644 django/core/files/storage/base.py rename django/core/files/{storage.py => storage/filesystem.py} (51%) diff --git a/django/core/files/storage/__init__.py b/django/core/files/storage/__init__.py new file mode 100644 index 0000000000..240bbc1795 --- /dev/null +++ b/django/core/files/storage/__init__.py @@ -0,0 +1,26 @@ +from django.conf import settings +from django.utils.functional import LazyObject +from django.utils.module_loading import import_string + +from .base import Storage +from .filesystem import FileSystemStorage + +__all__ = ( + "FileSystemStorage", + "Storage", + "DefaultStorage", + "default_storage", + "get_storage_class", +) + + +def get_storage_class(import_path=None): + return import_string(import_path or settings.DEFAULT_FILE_STORAGE) + + +class DefaultStorage(LazyObject): + def _setup(self): + self._wrapped = get_storage_class()() + + +default_storage = DefaultStorage() diff --git a/django/core/files/storage/base.py b/django/core/files/storage/base.py new file mode 100644 index 0000000000..16ac22f70a --- /dev/null +++ b/django/core/files/storage/base.py @@ -0,0 +1,190 @@ +import os +import pathlib + +from django.core.exceptions import SuspiciousFileOperation +from django.core.files import File +from django.core.files.utils import validate_file_name +from django.utils.crypto import get_random_string +from django.utils.text import get_valid_filename + + +class Storage: + """ + A base storage class, providing some default behaviors that all other + storage systems can inherit or override, as necessary. + """ + + # The following methods represent a public interface to private methods. + # These shouldn't be overridden by subclasses unless absolutely necessary. + + def open(self, name, mode="rb"): + """Retrieve the specified file from storage.""" + return self._open(name, mode) + + def save(self, name, content, max_length=None): + """ + Save new content to the file specified by name. The content should be + a proper File object or any Python file-like object, ready to be read + from the beginning. + """ + # Get the proper name for the file, as it will actually be saved. + if name is None: + name = content.name + + if not hasattr(content, "chunks"): + content = File(content, name) + + name = self.get_available_name(name, max_length=max_length) + name = self._save(name, content) + # Ensure that the name returned from the storage system is still valid. + validate_file_name(name, allow_relative_path=True) + return name + + # These methods are part of the public API, with default implementations. + + def get_valid_name(self, name): + """ + Return a filename, based on the provided filename, that's suitable for + use in the target storage system. + """ + return get_valid_filename(name) + + def get_alternative_name(self, file_root, file_ext): + """ + Return an alternative filename, by adding an underscore and a random 7 + character alphanumeric string (before the file extension, if one + exists) to the filename. + """ + return "%s_%s%s" % (file_root, get_random_string(7), file_ext) + + def get_available_name(self, name, max_length=None): + """ + Return a filename that's free on the target storage system and + available for new content to be written to. + """ + name = str(name).replace("\\", "/") + dir_name, file_name = os.path.split(name) + if ".." in pathlib.PurePath(dir_name).parts: + raise SuspiciousFileOperation( + "Detected path traversal attempt in '%s'" % dir_name + ) + validate_file_name(file_name) + file_root, file_ext = os.path.splitext(file_name) + # If the filename already exists, generate an alternative filename + # until it doesn't exist. + # Truncate original name if required, so the new filename does not + # exceed the max_length. + while self.exists(name) or (max_length and len(name) > max_length): + # file_ext includes the dot. + name = os.path.join( + dir_name, self.get_alternative_name(file_root, file_ext) + ) + if max_length is None: + continue + # Truncate file_root if max_length exceeded. + truncation = len(name) - max_length + if truncation > 0: + file_root = file_root[:-truncation] + # Entire file_root was truncated in attempt to find an + # available filename. + if not file_root: + raise SuspiciousFileOperation( + 'Storage can not find an available filename for "%s". ' + "Please make sure that the corresponding file field " + 'allows sufficient "max_length".' % name + ) + name = os.path.join( + dir_name, self.get_alternative_name(file_root, file_ext) + ) + return name + + def generate_filename(self, filename): + """ + Validate the filename by calling get_valid_name() and return a filename + to be passed to the save() method. + """ + filename = str(filename).replace("\\", "/") + # `filename` may include a path as returned by FileField.upload_to. + dirname, filename = os.path.split(filename) + if ".." in pathlib.PurePath(dirname).parts: + raise SuspiciousFileOperation( + "Detected path traversal attempt in '%s'" % dirname + ) + return os.path.normpath(os.path.join(dirname, self.get_valid_name(filename))) + + def path(self, name): + """ + Return a local filesystem path where the file can be retrieved using + Python's built-in open() function. Storage systems that can't be + accessed using open() should *not* implement this method. + """ + raise NotImplementedError("This backend doesn't support absolute paths.") + + # The following methods form the public API for storage systems, but with + # no default implementations. Subclasses must implement *all* of these. + + def delete(self, name): + """ + Delete the specified file from the storage system. + """ + raise NotImplementedError( + "subclasses of Storage must provide a delete() method" + ) + + def exists(self, name): + """ + Return True if a file referenced by the given name already exists in the + storage system, or False if the name is available for a new file. + """ + raise NotImplementedError( + "subclasses of Storage must provide an exists() method" + ) + + def listdir(self, path): + """ + List the contents of the specified path. Return a 2-tuple of lists: + the first item being directories, the second item being files. + """ + raise NotImplementedError( + "subclasses of Storage must provide a listdir() method" + ) + + def size(self, name): + """ + Return the total size, in bytes, of the file specified by name. + """ + raise NotImplementedError("subclasses of Storage must provide a size() method") + + def url(self, name): + """ + Return an absolute URL where the file's contents can be accessed + directly by a web browser. + """ + raise NotImplementedError("subclasses of Storage must provide a url() method") + + def get_accessed_time(self, name): + """ + Return the last accessed time (as a datetime) of the file specified by + name. The datetime will be timezone-aware if USE_TZ=True. + """ + raise NotImplementedError( + "subclasses of Storage must provide a get_accessed_time() method" + ) + + def get_created_time(self, name): + """ + Return the creation time (as a datetime) of the file specified by name. + The datetime will be timezone-aware if USE_TZ=True. + """ + raise NotImplementedError( + "subclasses of Storage must provide a get_created_time() method" + ) + + def get_modified_time(self, name): + """ + Return the last modified time (as a datetime) of the file specified by + name. The datetime will be timezone-aware if USE_TZ=True. + """ + raise NotImplementedError( + "subclasses of Storage must provide a get_modified_time() method" + ) diff --git a/django/core/files/storage.py b/django/core/files/storage/filesystem.py similarity index 51% rename from django/core/files/storage.py rename to django/core/files/storage/filesystem.py index 2eb8d08236..4916a0b0e8 100644 --- a/django/core/files/storage.py +++ b/django/core/files/storage/filesystem.py @@ -1,214 +1,20 @@ import os -import pathlib from datetime import datetime, timezone from urllib.parse import urljoin from django.conf import settings -from django.core.exceptions import SuspiciousFileOperation from django.core.files import File, locks from django.core.files.move import file_move_safe -from django.core.files.utils import validate_file_name from django.core.signals import setting_changed from django.utils._os import safe_join -from django.utils.crypto import get_random_string from django.utils.deconstruct import deconstructible from django.utils.encoding import filepath_to_uri -from django.utils.functional import LazyObject, cached_property -from django.utils.module_loading import import_string -from django.utils.text import get_valid_filename +from django.utils.functional import cached_property -__all__ = ( - "Storage", - "FileSystemStorage", - "DefaultStorage", - "default_storage", - "get_storage_class", -) +from .base import Storage -class Storage: - """ - A base storage class, providing some default behaviors that all other - storage systems can inherit or override, as necessary. - """ - - # The following methods represent a public interface to private methods. - # These shouldn't be overridden by subclasses unless absolutely necessary. - - def open(self, name, mode="rb"): - """Retrieve the specified file from storage.""" - return self._open(name, mode) - - def save(self, name, content, max_length=None): - """ - Save new content to the file specified by name. The content should be - a proper File object or any Python file-like object, ready to be read - from the beginning. - """ - # Get the proper name for the file, as it will actually be saved. - if name is None: - name = content.name - - if not hasattr(content, "chunks"): - content = File(content, name) - - name = self.get_available_name(name, max_length=max_length) - name = self._save(name, content) - # Ensure that the name returned from the storage system is still valid. - validate_file_name(name, allow_relative_path=True) - return name - - # These methods are part of the public API, with default implementations. - - def get_valid_name(self, name): - """ - Return a filename, based on the provided filename, that's suitable for - use in the target storage system. - """ - return get_valid_filename(name) - - def get_alternative_name(self, file_root, file_ext): - """ - Return an alternative filename, by adding an underscore and a random 7 - character alphanumeric string (before the file extension, if one - exists) to the filename. - """ - return "%s_%s%s" % (file_root, get_random_string(7), file_ext) - - def get_available_name(self, name, max_length=None): - """ - Return a filename that's free on the target storage system and - available for new content to be written to. - """ - name = str(name).replace("\\", "/") - dir_name, file_name = os.path.split(name) - if ".." in pathlib.PurePath(dir_name).parts: - raise SuspiciousFileOperation( - "Detected path traversal attempt in '%s'" % dir_name - ) - validate_file_name(file_name) - file_root, file_ext = os.path.splitext(file_name) - # If the filename already exists, generate an alternative filename - # until it doesn't exist. - # Truncate original name if required, so the new filename does not - # exceed the max_length. - while self.exists(name) or (max_length and len(name) > max_length): - # file_ext includes the dot. - name = os.path.join( - dir_name, self.get_alternative_name(file_root, file_ext) - ) - if max_length is None: - continue - # Truncate file_root if max_length exceeded. - truncation = len(name) - max_length - if truncation > 0: - file_root = file_root[:-truncation] - # Entire file_root was truncated in attempt to find an - # available filename. - if not file_root: - raise SuspiciousFileOperation( - 'Storage can not find an available filename for "%s". ' - "Please make sure that the corresponding file field " - 'allows sufficient "max_length".' % name - ) - name = os.path.join( - dir_name, self.get_alternative_name(file_root, file_ext) - ) - return name - - def generate_filename(self, filename): - """ - Validate the filename by calling get_valid_name() and return a filename - to be passed to the save() method. - """ - filename = str(filename).replace("\\", "/") - # `filename` may include a path as returned by FileField.upload_to. - dirname, filename = os.path.split(filename) - if ".." in pathlib.PurePath(dirname).parts: - raise SuspiciousFileOperation( - "Detected path traversal attempt in '%s'" % dirname - ) - return os.path.normpath(os.path.join(dirname, self.get_valid_name(filename))) - - def path(self, name): - """ - Return a local filesystem path where the file can be retrieved using - Python's built-in open() function. Storage systems that can't be - accessed using open() should *not* implement this method. - """ - raise NotImplementedError("This backend doesn't support absolute paths.") - - # The following methods form the public API for storage systems, but with - # no default implementations. Subclasses must implement *all* of these. - - def delete(self, name): - """ - Delete the specified file from the storage system. - """ - raise NotImplementedError( - "subclasses of Storage must provide a delete() method" - ) - - def exists(self, name): - """ - Return True if a file referenced by the given name already exists in the - storage system, or False if the name is available for a new file. - """ - raise NotImplementedError( - "subclasses of Storage must provide an exists() method" - ) - - def listdir(self, path): - """ - List the contents of the specified path. Return a 2-tuple of lists: - the first item being directories, the second item being files. - """ - raise NotImplementedError( - "subclasses of Storage must provide a listdir() method" - ) - - def size(self, name): - """ - Return the total size, in bytes, of the file specified by name. - """ - raise NotImplementedError("subclasses of Storage must provide a size() method") - - def url(self, name): - """ - Return an absolute URL where the file's contents can be accessed - directly by a web browser. - """ - raise NotImplementedError("subclasses of Storage must provide a url() method") - - def get_accessed_time(self, name): - """ - Return the last accessed time (as a datetime) of the file specified by - name. The datetime will be timezone-aware if USE_TZ=True. - """ - raise NotImplementedError( - "subclasses of Storage must provide a get_accessed_time() method" - ) - - def get_created_time(self, name): - """ - Return the creation time (as a datetime) of the file specified by name. - The datetime will be timezone-aware if USE_TZ=True. - """ - raise NotImplementedError( - "subclasses of Storage must provide a get_created_time() method" - ) - - def get_modified_time(self, name): - """ - Return the last modified time (as a datetime) of the file specified by - name. The datetime will be timezone-aware if USE_TZ=True. - """ - raise NotImplementedError( - "subclasses of Storage must provide a get_modified_time() method" - ) - - -@deconstructible +@deconstructible(path="django.core.files.storage.FileSystemStorage") class FileSystemStorage(Storage): """ Standard filesystem storage @@ -413,15 +219,3 @@ class FileSystemStorage(Storage): def get_modified_time(self, name): return self._datetime_from_timestamp(os.path.getmtime(self.path(name))) - - -def get_storage_class(import_path=None): - return import_string(import_path or settings.DEFAULT_FILE_STORAGE) - - -class DefaultStorage(LazyObject): - def _setup(self): - self._wrapped = get_storage_class()() - - -default_storage = DefaultStorage() diff --git a/tests/file_storage/tests.py b/tests/file_storage/tests.py index 87a5e70c33..5c7190d698 100644 --- a/tests/file_storage/tests.py +++ b/tests/file_storage/tests.py @@ -956,7 +956,7 @@ class FieldCallableFileStorageTests(SimpleTestCase): msg = ( "FileField.storage must be a subclass/instance of " - "django.core.files.storage.Storage" + "django.core.files.storage.base.Storage" ) for invalid_type in (NotStorage, str, list, set, tuple): with self.subTest(invalid_type=invalid_type):