1
0
mirror of https://github.com/django/django.git synced 2024-12-23 01:25:58 +00:00

Refs #34110 -- Reorganized django.core.files.storage into a separate module.

This commit is contained in:
Francesco Panico 2022-11-10 22:01:53 +01:00 committed by Mariusz Felisiak
parent 9bd174b9a7
commit 032c09c414
4 changed files with 220 additions and 210 deletions

View File

@ -0,0 +1,26 @@
from django.conf import settings
from django.utils.functional import LazyObject
from django.utils.module_loading import import_string
from .base import Storage
from .filesystem import FileSystemStorage
__all__ = (
"FileSystemStorage",
"Storage",
"DefaultStorage",
"default_storage",
"get_storage_class",
)
def get_storage_class(import_path=None):
return import_string(import_path or settings.DEFAULT_FILE_STORAGE)
class DefaultStorage(LazyObject):
def _setup(self):
self._wrapped = get_storage_class()()
default_storage = DefaultStorage()

View File

@ -0,0 +1,190 @@
import os
import pathlib
from django.core.exceptions import SuspiciousFileOperation
from django.core.files import File
from django.core.files.utils import validate_file_name
from django.utils.crypto import get_random_string
from django.utils.text import get_valid_filename
class Storage:
"""
A base storage class, providing some default behaviors that all other
storage systems can inherit or override, as necessary.
"""
# The following methods represent a public interface to private methods.
# These shouldn't be overridden by subclasses unless absolutely necessary.
def open(self, name, mode="rb"):
"""Retrieve the specified file from storage."""
return self._open(name, mode)
def save(self, name, content, max_length=None):
"""
Save new content to the file specified by name. The content should be
a proper File object or any Python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, "chunks"):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
name = self._save(name, content)
# Ensure that the name returned from the storage system is still valid.
validate_file_name(name, allow_relative_path=True)
return name
# These methods are part of the public API, with default implementations.
def get_valid_name(self, name):
"""
Return a filename, based on the provided filename, that's suitable for
use in the target storage system.
"""
return get_valid_filename(name)
def get_alternative_name(self, file_root, file_ext):
"""
Return an alternative filename, by adding an underscore and a random 7
character alphanumeric string (before the file extension, if one
exists) to the filename.
"""
return "%s_%s%s" % (file_root, get_random_string(7), file_ext)
def get_available_name(self, name, max_length=None):
"""
Return a filename that's free on the target storage system and
available for new content to be written to.
"""
name = str(name).replace("\\", "/")
dir_name, file_name = os.path.split(name)
if ".." in pathlib.PurePath(dir_name).parts:
raise SuspiciousFileOperation(
"Detected path traversal attempt in '%s'" % dir_name
)
validate_file_name(file_name)
file_root, file_ext = os.path.splitext(file_name)
# If the filename already exists, generate an alternative filename
# until it doesn't exist.
# Truncate original name if required, so the new filename does not
# exceed the max_length.
while self.exists(name) or (max_length and len(name) > max_length):
# file_ext includes the dot.
name = os.path.join(
dir_name, self.get_alternative_name(file_root, file_ext)
)
if max_length is None:
continue
# Truncate file_root if max_length exceeded.
truncation = len(name) - max_length
if truncation > 0:
file_root = file_root[:-truncation]
# Entire file_root was truncated in attempt to find an
# available filename.
if not file_root:
raise SuspiciousFileOperation(
'Storage can not find an available filename for "%s". '
"Please make sure that the corresponding file field "
'allows sufficient "max_length".' % name
)
name = os.path.join(
dir_name, self.get_alternative_name(file_root, file_ext)
)
return name
def generate_filename(self, filename):
"""
Validate the filename by calling get_valid_name() and return a filename
to be passed to the save() method.
"""
filename = str(filename).replace("\\", "/")
# `filename` may include a path as returned by FileField.upload_to.
dirname, filename = os.path.split(filename)
if ".." in pathlib.PurePath(dirname).parts:
raise SuspiciousFileOperation(
"Detected path traversal attempt in '%s'" % dirname
)
return os.path.normpath(os.path.join(dirname, self.get_valid_name(filename)))
def path(self, name):
"""
Return a local filesystem path where the file can be retrieved using
Python's built-in open() function. Storage systems that can't be
accessed using open() should *not* implement this method.
"""
raise NotImplementedError("This backend doesn't support absolute paths.")
# The following methods form the public API for storage systems, but with
# no default implementations. Subclasses must implement *all* of these.
def delete(self, name):
"""
Delete the specified file from the storage system.
"""
raise NotImplementedError(
"subclasses of Storage must provide a delete() method"
)
def exists(self, name):
"""
Return True if a file referenced by the given name already exists in the
storage system, or False if the name is available for a new file.
"""
raise NotImplementedError(
"subclasses of Storage must provide an exists() method"
)
def listdir(self, path):
"""
List the contents of the specified path. Return a 2-tuple of lists:
the first item being directories, the second item being files.
"""
raise NotImplementedError(
"subclasses of Storage must provide a listdir() method"
)
def size(self, name):
"""
Return the total size, in bytes, of the file specified by name.
"""
raise NotImplementedError("subclasses of Storage must provide a size() method")
def url(self, name):
"""
Return an absolute URL where the file's contents can be accessed
directly by a web browser.
"""
raise NotImplementedError("subclasses of Storage must provide a url() method")
def get_accessed_time(self, name):
"""
Return the last accessed time (as a datetime) of the file specified by
name. The datetime will be timezone-aware if USE_TZ=True.
"""
raise NotImplementedError(
"subclasses of Storage must provide a get_accessed_time() method"
)
def get_created_time(self, name):
"""
Return the creation time (as a datetime) of the file specified by name.
The datetime will be timezone-aware if USE_TZ=True.
"""
raise NotImplementedError(
"subclasses of Storage must provide a get_created_time() method"
)
def get_modified_time(self, name):
"""
Return the last modified time (as a datetime) of the file specified by
name. The datetime will be timezone-aware if USE_TZ=True.
"""
raise NotImplementedError(
"subclasses of Storage must provide a get_modified_time() method"
)

View File

@ -1,214 +1,20 @@
import os import os
import pathlib
from datetime import datetime, timezone from datetime import datetime, timezone
from urllib.parse import urljoin from urllib.parse import urljoin
from django.conf import settings from django.conf import settings
from django.core.exceptions import SuspiciousFileOperation
from django.core.files import File, locks from django.core.files import File, locks
from django.core.files.move import file_move_safe from django.core.files.move import file_move_safe
from django.core.files.utils import validate_file_name
from django.core.signals import setting_changed from django.core.signals import setting_changed
from django.utils._os import safe_join from django.utils._os import safe_join
from django.utils.crypto import get_random_string
from django.utils.deconstruct import deconstructible from django.utils.deconstruct import deconstructible
from django.utils.encoding import filepath_to_uri from django.utils.encoding import filepath_to_uri
from django.utils.functional import LazyObject, cached_property from django.utils.functional import cached_property
from django.utils.module_loading import import_string
from django.utils.text import get_valid_filename
__all__ = ( from .base import Storage
"Storage",
"FileSystemStorage",
"DefaultStorage",
"default_storage",
"get_storage_class",
)
class Storage: @deconstructible(path="django.core.files.storage.FileSystemStorage")
"""
A base storage class, providing some default behaviors that all other
storage systems can inherit or override, as necessary.
"""
# The following methods represent a public interface to private methods.
# These shouldn't be overridden by subclasses unless absolutely necessary.
def open(self, name, mode="rb"):
"""Retrieve the specified file from storage."""
return self._open(name, mode)
def save(self, name, content, max_length=None):
"""
Save new content to the file specified by name. The content should be
a proper File object or any Python file-like object, ready to be read
from the beginning.
"""
# Get the proper name for the file, as it will actually be saved.
if name is None:
name = content.name
if not hasattr(content, "chunks"):
content = File(content, name)
name = self.get_available_name(name, max_length=max_length)
name = self._save(name, content)
# Ensure that the name returned from the storage system is still valid.
validate_file_name(name, allow_relative_path=True)
return name
# These methods are part of the public API, with default implementations.
def get_valid_name(self, name):
"""
Return a filename, based on the provided filename, that's suitable for
use in the target storage system.
"""
return get_valid_filename(name)
def get_alternative_name(self, file_root, file_ext):
"""
Return an alternative filename, by adding an underscore and a random 7
character alphanumeric string (before the file extension, if one
exists) to the filename.
"""
return "%s_%s%s" % (file_root, get_random_string(7), file_ext)
def get_available_name(self, name, max_length=None):
"""
Return a filename that's free on the target storage system and
available for new content to be written to.
"""
name = str(name).replace("\\", "/")
dir_name, file_name = os.path.split(name)
if ".." in pathlib.PurePath(dir_name).parts:
raise SuspiciousFileOperation(
"Detected path traversal attempt in '%s'" % dir_name
)
validate_file_name(file_name)
file_root, file_ext = os.path.splitext(file_name)
# If the filename already exists, generate an alternative filename
# until it doesn't exist.
# Truncate original name if required, so the new filename does not
# exceed the max_length.
while self.exists(name) or (max_length and len(name) > max_length):
# file_ext includes the dot.
name = os.path.join(
dir_name, self.get_alternative_name(file_root, file_ext)
)
if max_length is None:
continue
# Truncate file_root if max_length exceeded.
truncation = len(name) - max_length
if truncation > 0:
file_root = file_root[:-truncation]
# Entire file_root was truncated in attempt to find an
# available filename.
if not file_root:
raise SuspiciousFileOperation(
'Storage can not find an available filename for "%s". '
"Please make sure that the corresponding file field "
'allows sufficient "max_length".' % name
)
name = os.path.join(
dir_name, self.get_alternative_name(file_root, file_ext)
)
return name
def generate_filename(self, filename):
"""
Validate the filename by calling get_valid_name() and return a filename
to be passed to the save() method.
"""
filename = str(filename).replace("\\", "/")
# `filename` may include a path as returned by FileField.upload_to.
dirname, filename = os.path.split(filename)
if ".." in pathlib.PurePath(dirname).parts:
raise SuspiciousFileOperation(
"Detected path traversal attempt in '%s'" % dirname
)
return os.path.normpath(os.path.join(dirname, self.get_valid_name(filename)))
def path(self, name):
"""
Return a local filesystem path where the file can be retrieved using
Python's built-in open() function. Storage systems that can't be
accessed using open() should *not* implement this method.
"""
raise NotImplementedError("This backend doesn't support absolute paths.")
# The following methods form the public API for storage systems, but with
# no default implementations. Subclasses must implement *all* of these.
def delete(self, name):
"""
Delete the specified file from the storage system.
"""
raise NotImplementedError(
"subclasses of Storage must provide a delete() method"
)
def exists(self, name):
"""
Return True if a file referenced by the given name already exists in the
storage system, or False if the name is available for a new file.
"""
raise NotImplementedError(
"subclasses of Storage must provide an exists() method"
)
def listdir(self, path):
"""
List the contents of the specified path. Return a 2-tuple of lists:
the first item being directories, the second item being files.
"""
raise NotImplementedError(
"subclasses of Storage must provide a listdir() method"
)
def size(self, name):
"""
Return the total size, in bytes, of the file specified by name.
"""
raise NotImplementedError("subclasses of Storage must provide a size() method")
def url(self, name):
"""
Return an absolute URL where the file's contents can be accessed
directly by a web browser.
"""
raise NotImplementedError("subclasses of Storage must provide a url() method")
def get_accessed_time(self, name):
"""
Return the last accessed time (as a datetime) of the file specified by
name. The datetime will be timezone-aware if USE_TZ=True.
"""
raise NotImplementedError(
"subclasses of Storage must provide a get_accessed_time() method"
)
def get_created_time(self, name):
"""
Return the creation time (as a datetime) of the file specified by name.
The datetime will be timezone-aware if USE_TZ=True.
"""
raise NotImplementedError(
"subclasses of Storage must provide a get_created_time() method"
)
def get_modified_time(self, name):
"""
Return the last modified time (as a datetime) of the file specified by
name. The datetime will be timezone-aware if USE_TZ=True.
"""
raise NotImplementedError(
"subclasses of Storage must provide a get_modified_time() method"
)
@deconstructible
class FileSystemStorage(Storage): class FileSystemStorage(Storage):
""" """
Standard filesystem storage Standard filesystem storage
@ -413,15 +219,3 @@ class FileSystemStorage(Storage):
def get_modified_time(self, name): def get_modified_time(self, name):
return self._datetime_from_timestamp(os.path.getmtime(self.path(name))) return self._datetime_from_timestamp(os.path.getmtime(self.path(name)))
def get_storage_class(import_path=None):
return import_string(import_path or settings.DEFAULT_FILE_STORAGE)
class DefaultStorage(LazyObject):
def _setup(self):
self._wrapped = get_storage_class()()
default_storage = DefaultStorage()

View File

@ -956,7 +956,7 @@ class FieldCallableFileStorageTests(SimpleTestCase):
msg = ( msg = (
"FileField.storage must be a subclass/instance of " "FileField.storage must be a subclass/instance of "
"django.core.files.storage.Storage" "django.core.files.storage.base.Storage"
) )
for invalid_type in (NotStorage, str, list, set, tuple): for invalid_type in (NotStorage, str, list, set, tuple):
with self.subTest(invalid_type=invalid_type): with self.subTest(invalid_type=invalid_type):