mirror of
				https://github.com/django/django.git
				synced 2025-10-26 07:06:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			972 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			972 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import base64
 | |
| import hashlib
 | |
| import os
 | |
| import shutil
 | |
| import sys
 | |
| import tempfile as sys_tempfile
 | |
| import unittest
 | |
| from io import BytesIO, StringIO
 | |
| from unittest import mock
 | |
| from urllib.parse import quote
 | |
| 
 | |
| from django.conf import DEFAULT_STORAGE_ALIAS
 | |
| from django.core.exceptions import SuspiciousFileOperation
 | |
| from django.core.files import temp as tempfile
 | |
| from django.core.files.storage import default_storage
 | |
| from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile
 | |
| from django.http.multipartparser import (
 | |
|     FILE,
 | |
|     MAX_TOTAL_HEADER_SIZE,
 | |
|     MultiPartParser,
 | |
|     MultiPartParserError,
 | |
|     Parser,
 | |
| )
 | |
| from django.test import SimpleTestCase, TestCase, client, override_settings
 | |
| 
 | |
| from . import uploadhandler
 | |
| from .models import FileModel
 | |
| 
 | |
| UNICODE_FILENAME = "test-0123456789_中文_Orléans.jpg"
 | |
| MEDIA_ROOT = sys_tempfile.mkdtemp()
 | |
| UPLOAD_FOLDER = "test_upload"
 | |
| UPLOAD_TO = os.path.join(MEDIA_ROOT, UPLOAD_FOLDER)
 | |
| 
 | |
| CANDIDATE_TRAVERSAL_FILE_NAMES = [
 | |
|     "/tmp/hax0rd.txt",  # Absolute path, *nix-style.
 | |
|     "C:\\Windows\\hax0rd.txt",  # Absolute path, win-style.
 | |
|     "C:/Windows/hax0rd.txt",  # Absolute path, broken-style.
 | |
|     "\\tmp\\hax0rd.txt",  # Absolute path, broken in a different way.
 | |
|     "/tmp\\hax0rd.txt",  # Absolute path, broken by mixing.
 | |
|     "subdir/hax0rd.txt",  # Descendant path, *nix-style.
 | |
|     "subdir\\hax0rd.txt",  # Descendant path, win-style.
 | |
|     "sub/dir\\hax0rd.txt",  # Descendant path, mixed.
 | |
|     "../../hax0rd.txt",  # Relative path, *nix-style.
 | |
|     "..\\..\\hax0rd.txt",  # Relative path, win-style.
 | |
|     "../..\\hax0rd.txt",  # Relative path, mixed.
 | |
|     "../hax0rd.txt",  # HTML entities.
 | |
|     "../hax0rd.txt",  # HTML entities.
 | |
| ]
 | |
| 
 | |
| CANDIDATE_INVALID_FILE_NAMES = [
 | |
|     "/tmp/",  # Directory, *nix-style.
 | |
|     "c:\\tmp\\",  # Directory, win-style.
 | |
|     "/tmp/.",  # Directory dot, *nix-style.
 | |
|     "c:\\tmp\\.",  # Directory dot, *nix-style.
 | |
|     "/tmp/..",  # Parent directory, *nix-style.
 | |
|     "c:\\tmp\\..",  # Parent directory, win-style.
 | |
|     "",  # Empty filename.
 | |
| ]
 | |
| 
 | |
| 
 | |
| @override_settings(
 | |
|     MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF="file_uploads.urls", MIDDLEWARE=[]
 | |
| )
 | |
| class FileUploadTests(TestCase):
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         super().setUpClass()
 | |
|         os.makedirs(MEDIA_ROOT, exist_ok=True)
 | |
|         cls.addClassCleanup(shutil.rmtree, MEDIA_ROOT)
 | |
| 
 | |
|     def test_upload_name_is_validated(self):
 | |
|         candidates = [
 | |
|             "/tmp/",
 | |
|             "/tmp/..",
 | |
|             "/tmp/.",
 | |
|         ]
 | |
|         if sys.platform == "win32":
 | |
|             candidates.extend(
 | |
|                 [
 | |
|                     "c:\\tmp\\",
 | |
|                     "c:\\tmp\\..",
 | |
|                     "c:\\tmp\\.",
 | |
|                 ]
 | |
|             )
 | |
|         for file_name in candidates:
 | |
|             with self.subTest(file_name=file_name):
 | |
|                 self.assertRaises(SuspiciousFileOperation, UploadedFile, name=file_name)
 | |
| 
 | |
|     def test_simple_upload(self):
 | |
|         with open(__file__, "rb") as fp:
 | |
|             post_data = {
 | |
|                 "name": "Ringo",
 | |
|                 "file_field": fp,
 | |
|             }
 | |
|             response = self.client.post("/upload/", post_data)
 | |
|         self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def test_large_upload(self):
 | |
|         file = tempfile.NamedTemporaryFile
 | |
|         with file(suffix=".file1") as file1, file(suffix=".file2") as file2:
 | |
|             file1.write(b"a" * (2**21))
 | |
|             file1.seek(0)
 | |
| 
 | |
|             file2.write(b"a" * (10 * 2**20))
 | |
|             file2.seek(0)
 | |
| 
 | |
|             post_data = {
 | |
|                 "name": "Ringo",
 | |
|                 "file_field1": file1,
 | |
|                 "file_field2": file2,
 | |
|             }
 | |
| 
 | |
|             for key in list(post_data):
 | |
|                 try:
 | |
|                     post_data[key + "_hash"] = hashlib.sha1(
 | |
|                         post_data[key].read()
 | |
|                     ).hexdigest()
 | |
|                     post_data[key].seek(0)
 | |
|                 except AttributeError:
 | |
|                     post_data[key + "_hash"] = hashlib.sha1(
 | |
|                         post_data[key].encode()
 | |
|                     ).hexdigest()
 | |
| 
 | |
|             response = self.client.post("/verify/", post_data)
 | |
| 
 | |
|             self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def _test_base64_upload(self, content, encode=base64.b64encode):
 | |
|         payload = client.FakePayload(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     'Content-Disposition: form-data; name="file"; filename="test.txt"',
 | |
|                     "Content-Type: application/octet-stream",
 | |
|                     "Content-Transfer-Encoding: base64",
 | |
|                     "",
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
|         payload.write(b"\r\n" + encode(content.encode()) + b"\r\n")
 | |
|         payload.write("--" + client.BOUNDARY + "--\r\n")
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo_content/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.json()["file"], content)
 | |
| 
 | |
|     def test_base64_upload(self):
 | |
|         self._test_base64_upload("This data will be transmitted base64-encoded.")
 | |
| 
 | |
|     def test_big_base64_upload(self):
 | |
|         self._test_base64_upload("Big data" * 68000)  # > 512Kb
 | |
| 
 | |
|     def test_big_base64_newlines_upload(self):
 | |
|         self._test_base64_upload("Big data" * 68000, encode=base64.encodebytes)
 | |
| 
 | |
|     def test_base64_invalid_upload(self):
 | |
|         payload = client.FakePayload(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     'Content-Disposition: form-data; name="file"; filename="test.txt"',
 | |
|                     "Content-Type: application/octet-stream",
 | |
|                     "Content-Transfer-Encoding: base64",
 | |
|                     "",
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
|         payload.write(b"\r\n!\r\n")
 | |
|         payload.write("--" + client.BOUNDARY + "--\r\n")
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo_content/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.json()["file"], "")
 | |
| 
 | |
|     def test_unicode_file_name(self):
 | |
|         with sys_tempfile.TemporaryDirectory() as temp_dir:
 | |
|             # This file contains Chinese symbols and an accented char in the name.
 | |
|             with open(os.path.join(temp_dir, UNICODE_FILENAME), "w+b") as file1:
 | |
|                 file1.write(b"b" * (2**10))
 | |
|                 file1.seek(0)
 | |
|                 response = self.client.post("/unicode_name/", {"file_unicode": file1})
 | |
|             self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def test_unicode_file_name_rfc2231(self):
 | |
|         """
 | |
|         Receiving file upload when filename is encoded with RFC 2231.
 | |
|         """
 | |
|         payload = client.FakePayload()
 | |
|         payload.write(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     'Content-Disposition: form-data; name="file_unicode"; '
 | |
|                     "filename*=UTF-8''%s" % quote(UNICODE_FILENAME),
 | |
|                     "Content-Type: application/octet-stream",
 | |
|                     "",
 | |
|                     "You got pwnd.\r\n",
 | |
|                     "\r\n--" + client.BOUNDARY + "--\r\n",
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
| 
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/unicode_name/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def test_unicode_name_rfc2231(self):
 | |
|         """
 | |
|         Receiving file upload when filename is encoded with RFC 2231.
 | |
|         """
 | |
|         payload = client.FakePayload()
 | |
|         payload.write(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     "Content-Disposition: form-data; name*=UTF-8''file_unicode; "
 | |
|                     "filename*=UTF-8''%s" % quote(UNICODE_FILENAME),
 | |
|                     "Content-Type: application/octet-stream",
 | |
|                     "",
 | |
|                     "You got pwnd.\r\n",
 | |
|                     "\r\n--" + client.BOUNDARY + "--\r\n",
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
| 
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/unicode_name/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def test_unicode_file_name_rfc2231_with_double_quotes(self):
 | |
|         payload = client.FakePayload()
 | |
|         payload.write(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     'Content-Disposition: form-data; name="file_unicode"; '
 | |
|                     "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME),
 | |
|                     "Content-Type: application/octet-stream",
 | |
|                     "",
 | |
|                     "You got pwnd.\r\n",
 | |
|                     "\r\n--" + client.BOUNDARY + "--\r\n",
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/unicode_name/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def test_unicode_name_rfc2231_with_double_quotes(self):
 | |
|         payload = client.FakePayload()
 | |
|         payload.write(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     "Content-Disposition: form-data; name*=\"UTF-8''file_unicode\"; "
 | |
|                     "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME),
 | |
|                     "Content-Type: application/octet-stream",
 | |
|                     "",
 | |
|                     "You got pwnd.\r\n",
 | |
|                     "\r\n--" + client.BOUNDARY + "--\r\n",
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/unicode_name/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|     def test_blank_filenames(self):
 | |
|         """
 | |
|         Receiving file upload when filename is blank (before and after
 | |
|         sanitization) should be okay.
 | |
|         """
 | |
|         filenames = [
 | |
|             "",
 | |
|             # Normalized by MultiPartParser.IE_sanitize().
 | |
|             "C:\\Windows\\",
 | |
|             # Normalized by os.path.basename().
 | |
|             "/",
 | |
|             "ends-with-slash/",
 | |
|         ]
 | |
|         payload = client.FakePayload()
 | |
|         for i, name in enumerate(filenames):
 | |
|             payload.write(
 | |
|                 "\r\n".join(
 | |
|                     [
 | |
|                         "--" + client.BOUNDARY,
 | |
|                         'Content-Disposition: form-data; name="file%s"; filename="%s"'
 | |
|                         % (i, name),
 | |
|                         "Content-Type: application/octet-stream",
 | |
|                         "",
 | |
|                         "You got pwnd.\r\n",
 | |
|                     ]
 | |
|                 )
 | |
|             )
 | |
|         payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
 | |
| 
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.status_code, 200)
 | |
| 
 | |
|         # Empty filenames should be ignored
 | |
|         received = response.json()
 | |
|         for i, name in enumerate(filenames):
 | |
|             self.assertIsNone(received.get("file%s" % i))
 | |
| 
 | |
|     def test_non_printable_chars_in_file_names(self):
 | |
|         file_name = "non-\x00printable\x00\n_chars.txt\x00"
 | |
|         payload = client.FakePayload()
 | |
|         payload.write(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     f'Content-Disposition: form-data; name="file"; '
 | |
|                     f'filename="{file_name}"',
 | |
|                     "Content-Type: application/octet-stream",
 | |
|                     "",
 | |
|                     "You got pwnd.\r\n",
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
|         payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         # Non-printable chars are sanitized.
 | |
|         received = response.json()
 | |
|         self.assertEqual(received["file"], "non-printable_chars.txt")
 | |
| 
 | |
|     def test_dangerous_file_names(self):
 | |
|         """Uploaded file names should be sanitized before ever reaching the view."""
 | |
|         # This test simulates possible directory traversal attacks by a
 | |
|         # malicious uploader We have to do some monkeybusiness here to construct
 | |
|         # a malicious payload with an invalid file name (containing os.sep or
 | |
|         # os.pardir). This similar to what an attacker would need to do when
 | |
|         # trying such an attack.
 | |
|         payload = client.FakePayload()
 | |
|         for i, name in enumerate(CANDIDATE_TRAVERSAL_FILE_NAMES):
 | |
|             payload.write(
 | |
|                 "\r\n".join(
 | |
|                     [
 | |
|                         "--" + client.BOUNDARY,
 | |
|                         'Content-Disposition: form-data; name="file%s"; filename="%s"'
 | |
|                         % (i, name),
 | |
|                         "Content-Type: application/octet-stream",
 | |
|                         "",
 | |
|                         "You got pwnd.\r\n",
 | |
|                     ]
 | |
|                 )
 | |
|             )
 | |
|         payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
 | |
| 
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         # The filenames should have been sanitized by the time it got to the view.
 | |
|         received = response.json()
 | |
|         for i, name in enumerate(CANDIDATE_TRAVERSAL_FILE_NAMES):
 | |
|             got = received["file%s" % i]
 | |
|             self.assertEqual(got, "hax0rd.txt")
 | |
| 
 | |
|     def test_filename_overflow(self):
 | |
|         """File names over 256 characters (dangerous on some platforms) get fixed up."""
 | |
|         long_str = "f" * 300
 | |
|         cases = [
 | |
|             # field name, filename, expected
 | |
|             ("long_filename", "%s.txt" % long_str, "%s.txt" % long_str[:251]),
 | |
|             ("long_extension", "foo.%s" % long_str, ".%s" % long_str[:254]),
 | |
|             ("no_extension", long_str, long_str[:255]),
 | |
|             ("no_filename", ".%s" % long_str, ".%s" % long_str[:254]),
 | |
|             ("long_everything", "%s.%s" % (long_str, long_str), ".%s" % long_str[:254]),
 | |
|         ]
 | |
|         payload = client.FakePayload()
 | |
|         for name, filename, _ in cases:
 | |
|             payload.write(
 | |
|                 "\r\n".join(
 | |
|                     [
 | |
|                         "--" + client.BOUNDARY,
 | |
|                         'Content-Disposition: form-data; name="{}"; filename="{}"',
 | |
|                         "Content-Type: application/octet-stream",
 | |
|                         "",
 | |
|                         "Oops.",
 | |
|                         "",
 | |
|                     ]
 | |
|                 ).format(name, filename)
 | |
|             )
 | |
|         payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         result = response.json()
 | |
|         for name, _, expected in cases:
 | |
|             got = result[name]
 | |
|             self.assertEqual(expected, got, "Mismatch for {}".format(name))
 | |
|             self.assertLess(
 | |
|                 len(got), 256, "Got a long file name (%s characters)." % len(got)
 | |
|             )
 | |
| 
 | |
|     def test_file_content(self):
 | |
|         file = tempfile.NamedTemporaryFile
 | |
|         with (
 | |
|             file(suffix=".ctype_extra") as no_content_type,
 | |
|             file(suffix=".ctype_extra") as simple_file,
 | |
|         ):
 | |
|             no_content_type.write(b"no content")
 | |
|             no_content_type.seek(0)
 | |
| 
 | |
|             simple_file.write(b"text content")
 | |
|             simple_file.seek(0)
 | |
|             simple_file.content_type = "text/plain"
 | |
| 
 | |
|             string_io = StringIO("string content")
 | |
|             bytes_io = BytesIO(b"binary content")
 | |
| 
 | |
|             response = self.client.post(
 | |
|                 "/echo_content/",
 | |
|                 {
 | |
|                     "no_content_type": no_content_type,
 | |
|                     "simple_file": simple_file,
 | |
|                     "string": string_io,
 | |
|                     "binary": bytes_io,
 | |
|                 },
 | |
|             )
 | |
|             received = response.json()
 | |
|             self.assertEqual(received["no_content_type"], "no content")
 | |
|             self.assertEqual(received["simple_file"], "text content")
 | |
|             self.assertEqual(received["string"], "string content")
 | |
|             self.assertEqual(received["binary"], "binary content")
 | |
| 
 | |
|     def test_content_type_extra(self):
 | |
|         """Uploaded files may have content type parameters available."""
 | |
|         file = tempfile.NamedTemporaryFile
 | |
|         with (
 | |
|             file(suffix=".ctype_extra") as no_content_type,
 | |
|             file(suffix=".ctype_extra") as simple_file,
 | |
|         ):
 | |
|             no_content_type.write(b"something")
 | |
|             no_content_type.seek(0)
 | |
| 
 | |
|             simple_file.write(b"something")
 | |
|             simple_file.seek(0)
 | |
|             simple_file.content_type = "text/plain; test-key=test_value"
 | |
| 
 | |
|             response = self.client.post(
 | |
|                 "/echo_content_type_extra/",
 | |
|                 {
 | |
|                     "no_content_type": no_content_type,
 | |
|                     "simple_file": simple_file,
 | |
|                 },
 | |
|             )
 | |
|             received = response.json()
 | |
|             self.assertEqual(received["no_content_type"], {})
 | |
|             self.assertEqual(received["simple_file"], {"test-key": "test_value"})
 | |
| 
 | |
|     def test_truncated_multipart_handled_gracefully(self):
 | |
|         """
 | |
|         If passed an incomplete multipart message, MultiPartParser does not
 | |
|         attempt to read beyond the end of the stream, and simply will handle
 | |
|         the part that can be parsed gracefully.
 | |
|         """
 | |
|         payload_str = "\r\n".join(
 | |
|             [
 | |
|                 "--" + client.BOUNDARY,
 | |
|                 'Content-Disposition: form-data; name="file"; filename="foo.txt"',
 | |
|                 "Content-Type: application/octet-stream",
 | |
|                 "",
 | |
|                 "file contents" "--" + client.BOUNDARY + "--",
 | |
|                 "",
 | |
|             ]
 | |
|         )
 | |
|         payload = client.FakePayload(payload_str[:-10])
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         self.assertEqual(self.client.request(**r).json(), {})
 | |
| 
 | |
|     def test_empty_multipart_handled_gracefully(self):
 | |
|         """
 | |
|         If passed an empty multipart message, MultiPartParser will return
 | |
|         an empty QueryDict.
 | |
|         """
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": 0,
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": client.FakePayload(b""),
 | |
|         }
 | |
|         self.assertEqual(self.client.request(**r).json(), {})
 | |
| 
 | |
|     def test_custom_upload_handler(self):
 | |
|         file = tempfile.NamedTemporaryFile
 | |
|         with file() as smallfile, file() as bigfile:
 | |
|             # A small file (under the 5M quota)
 | |
|             smallfile.write(b"a" * (2**21))
 | |
|             smallfile.seek(0)
 | |
| 
 | |
|             # A big file (over the quota)
 | |
|             bigfile.write(b"a" * (10 * 2**20))
 | |
|             bigfile.seek(0)
 | |
| 
 | |
|             # Small file posting should work.
 | |
|             self.assertIn("f", self.client.post("/quota/", {"f": smallfile}).json())
 | |
| 
 | |
|             # Large files don't go through.
 | |
|             self.assertNotIn("f", self.client.post("/quota/", {"f": bigfile}).json())
 | |
| 
 | |
|     def test_broken_custom_upload_handler(self):
 | |
|         with tempfile.NamedTemporaryFile() as file:
 | |
|             file.write(b"a" * (2**21))
 | |
|             file.seek(0)
 | |
| 
 | |
|             msg = (
 | |
|                 "You cannot alter upload handlers after the upload has been processed."
 | |
|             )
 | |
|             with self.assertRaisesMessage(AttributeError, msg):
 | |
|                 self.client.post("/quota/broken/", {"f": file})
 | |
| 
 | |
|     def test_stop_upload_temporary_file_handler(self):
 | |
|         with tempfile.NamedTemporaryFile() as temp_file:
 | |
|             temp_file.write(b"a")
 | |
|             temp_file.seek(0)
 | |
|             response = self.client.post("/temp_file/stop_upload/", {"file": temp_file})
 | |
|             temp_path = response.json()["temp_path"]
 | |
|             self.assertIs(os.path.exists(temp_path), False)
 | |
| 
 | |
|     def test_upload_interrupted_temporary_file_handler(self):
 | |
|         # Simulate an interrupted upload by omitting the closing boundary.
 | |
|         class MockedParser(Parser):
 | |
|             def __iter__(self):
 | |
|                 for item in super().__iter__():
 | |
|                     item_type, meta_data, field_stream = item
 | |
|                     yield item_type, meta_data, field_stream
 | |
|                     if item_type == FILE:
 | |
|                         return
 | |
| 
 | |
|         with tempfile.NamedTemporaryFile() as temp_file:
 | |
|             temp_file.write(b"a")
 | |
|             temp_file.seek(0)
 | |
|             with mock.patch(
 | |
|                 "django.http.multipartparser.Parser",
 | |
|                 MockedParser,
 | |
|             ):
 | |
|                 response = self.client.post(
 | |
|                     "/temp_file/upload_interrupted/",
 | |
|                     {"file": temp_file},
 | |
|                 )
 | |
|             temp_path = response.json()["temp_path"]
 | |
|             self.assertIs(os.path.exists(temp_path), False)
 | |
| 
 | |
|     def test_upload_large_header_fields(self):
 | |
|         payload = client.FakePayload(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     'Content-Disposition: form-data; name="my_file"; '
 | |
|                     'filename="test.txt"',
 | |
|                     "Content-Type: text/plain",
 | |
|                     "X-Long-Header: %s" % ("-" * 500),
 | |
|                     "",
 | |
|                     "file contents",
 | |
|                     "--" + client.BOUNDARY + "--\r\n",
 | |
|                 ]
 | |
|             ),
 | |
|         )
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo_content/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.status_code, 200)
 | |
|         self.assertEqual(response.json(), {"my_file": "file contents"})
 | |
| 
 | |
|     def test_upload_header_fields_too_large(self):
 | |
|         payload = client.FakePayload(
 | |
|             "\r\n".join(
 | |
|                 [
 | |
|                     "--" + client.BOUNDARY,
 | |
|                     'Content-Disposition: form-data; name="my_file"; '
 | |
|                     'filename="test.txt"',
 | |
|                     "Content-Type: text/plain",
 | |
|                     "X-Long-Header: %s" % ("-" * (MAX_TOTAL_HEADER_SIZE + 1)),
 | |
|                     "",
 | |
|                     "file contents",
 | |
|                     "--" + client.BOUNDARY + "--\r\n",
 | |
|                 ]
 | |
|             ),
 | |
|         )
 | |
|         r = {
 | |
|             "CONTENT_LENGTH": len(payload),
 | |
|             "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|             "PATH_INFO": "/echo_content/",
 | |
|             "REQUEST_METHOD": "POST",
 | |
|             "wsgi.input": payload,
 | |
|         }
 | |
|         response = self.client.request(**r)
 | |
|         self.assertEqual(response.status_code, 400)
 | |
| 
 | |
|     def test_fileupload_getlist(self):
 | |
|         file = tempfile.NamedTemporaryFile
 | |
|         with file() as file1, file() as file2, file() as file2a:
 | |
|             file1.write(b"a" * (2**23))
 | |
|             file1.seek(0)
 | |
| 
 | |
|             file2.write(b"a" * (2 * 2**18))
 | |
|             file2.seek(0)
 | |
| 
 | |
|             file2a.write(b"a" * (5 * 2**20))
 | |
|             file2a.seek(0)
 | |
| 
 | |
|             response = self.client.post(
 | |
|                 "/getlist_count/",
 | |
|                 {
 | |
|                     "file1": file1,
 | |
|                     "field1": "test",
 | |
|                     "field2": "test3",
 | |
|                     "field3": "test5",
 | |
|                     "field4": "test6",
 | |
|                     "field5": "test7",
 | |
|                     "file2": (file2, file2a),
 | |
|                 },
 | |
|             )
 | |
|             got = response.json()
 | |
|             self.assertEqual(got.get("file1"), 1)
 | |
|             self.assertEqual(got.get("file2"), 2)
 | |
| 
 | |
|     def test_fileuploads_closed_at_request_end(self):
 | |
|         file = tempfile.NamedTemporaryFile
 | |
|         with file() as f1, file() as f2a, file() as f2b:
 | |
|             response = self.client.post(
 | |
|                 "/fd_closing/t/",
 | |
|                 {
 | |
|                     "file": f1,
 | |
|                     "file2": (f2a, f2b),
 | |
|                 },
 | |
|             )
 | |
| 
 | |
|         request = response.wsgi_request
 | |
|         # The files were parsed.
 | |
|         self.assertTrue(hasattr(request, "_files"))
 | |
| 
 | |
|         file = request._files["file"]
 | |
|         self.assertTrue(file.closed)
 | |
| 
 | |
|         files = request._files.getlist("file2")
 | |
|         self.assertTrue(files[0].closed)
 | |
|         self.assertTrue(files[1].closed)
 | |
| 
 | |
|     def test_no_parsing_triggered_by_fd_closing(self):
 | |
|         file = tempfile.NamedTemporaryFile
 | |
|         with file() as f1, file() as f2a, file() as f2b:
 | |
|             response = self.client.post(
 | |
|                 "/fd_closing/f/",
 | |
|                 {
 | |
|                     "file": f1,
 | |
|                     "file2": (f2a, f2b),
 | |
|                 },
 | |
|             )
 | |
| 
 | |
|         request = response.wsgi_request
 | |
|         # The fd closing logic doesn't trigger parsing of the stream
 | |
|         self.assertFalse(hasattr(request, "_files"))
 | |
| 
 | |
|     def test_file_error_blocking(self):
 | |
|         """
 | |
|         The server should not block when there are upload errors (bug #8622).
 | |
|         This can happen if something -- i.e. an exception handler -- tries to
 | |
|         access POST while handling an error in parsing POST. This shouldn't
 | |
|         cause an infinite loop!
 | |
|         """
 | |
| 
 | |
|         class POSTAccessingHandler(client.ClientHandler):
 | |
|             """A handler that'll access POST during an exception."""
 | |
| 
 | |
|             def handle_uncaught_exception(self, request, resolver, exc_info):
 | |
|                 ret = super().handle_uncaught_exception(request, resolver, exc_info)
 | |
|                 request.POST  # evaluate
 | |
|                 return ret
 | |
| 
 | |
|         # Maybe this is a little more complicated that it needs to be; but if
 | |
|         # the django.test.client.FakePayload.read() implementation changes then
 | |
|         # this test would fail.  So we need to know exactly what kind of error
 | |
|         # it raises when there is an attempt to read more than the available bytes:
 | |
|         try:
 | |
|             client.FakePayload(b"a").read(2)
 | |
|         except Exception as err:
 | |
|             reference_error = err
 | |
| 
 | |
|         # install the custom handler that tries to access request.POST
 | |
|         self.client.handler = POSTAccessingHandler()
 | |
| 
 | |
|         with open(__file__, "rb") as fp:
 | |
|             post_data = {
 | |
|                 "name": "Ringo",
 | |
|                 "file_field": fp,
 | |
|             }
 | |
|             try:
 | |
|                 self.client.post("/upload_errors/", post_data)
 | |
|             except reference_error.__class__ as err:
 | |
|                 self.assertNotEqual(
 | |
|                     str(err),
 | |
|                     str(reference_error),
 | |
|                     "Caught a repeated exception that'll cause an infinite loop in "
 | |
|                     "file uploads.",
 | |
|                 )
 | |
|             except Exception as err:
 | |
|                 # CustomUploadError is the error that should have been raised
 | |
|                 self.assertEqual(err.__class__, uploadhandler.CustomUploadError)
 | |
| 
 | |
|     def test_filename_case_preservation(self):
 | |
|         """
 | |
|         The storage backend shouldn't mess with the case of the filenames
 | |
|         uploaded.
 | |
|         """
 | |
|         # Synthesize the contents of a file upload with a mixed case filename
 | |
|         # so we don't have to carry such a file in the Django tests source code
 | |
|         # tree.
 | |
|         vars = {"boundary": "oUrBoUnDaRyStRiNg"}
 | |
|         post_data = [
 | |
|             "--%(boundary)s",
 | |
|             'Content-Disposition: form-data; name="file_field"; '
 | |
|             'filename="MiXeD_cAsE.txt"',
 | |
|             "Content-Type: application/octet-stream",
 | |
|             "",
 | |
|             "file contents\n",
 | |
|             "--%(boundary)s--\r\n",
 | |
|         ]
 | |
|         response = self.client.post(
 | |
|             "/filename_case/",
 | |
|             "\r\n".join(post_data) % vars,
 | |
|             "multipart/form-data; boundary=%(boundary)s" % vars,
 | |
|         )
 | |
|         self.assertEqual(response.status_code, 200)
 | |
|         id = int(response.content)
 | |
|         obj = FileModel.objects.get(pk=id)
 | |
|         # The name of the file uploaded and the file stored in the server-side
 | |
|         # shouldn't differ.
 | |
|         self.assertEqual(os.path.basename(obj.testfile.path), "MiXeD_cAsE.txt")
 | |
| 
 | |
|     def test_filename_traversal_upload(self):
 | |
|         os.makedirs(UPLOAD_TO, exist_ok=True)
 | |
|         tests = [
 | |
|             "../test.txt",
 | |
|             "../test.txt",
 | |
|         ]
 | |
|         for file_name in tests:
 | |
|             with self.subTest(file_name=file_name):
 | |
|                 payload = client.FakePayload()
 | |
|                 payload.write(
 | |
|                     "\r\n".join(
 | |
|                         [
 | |
|                             "--" + client.BOUNDARY,
 | |
|                             'Content-Disposition: form-data; name="my_file"; '
 | |
|                             'filename="%s";' % file_name,
 | |
|                             "Content-Type: text/plain",
 | |
|                             "",
 | |
|                             "file contents.\r\n",
 | |
|                             "\r\n--" + client.BOUNDARY + "--\r\n",
 | |
|                         ]
 | |
|                     ),
 | |
|                 )
 | |
|                 r = {
 | |
|                     "CONTENT_LENGTH": len(payload),
 | |
|                     "CONTENT_TYPE": client.MULTIPART_CONTENT,
 | |
|                     "PATH_INFO": "/upload_traversal/",
 | |
|                     "REQUEST_METHOD": "POST",
 | |
|                     "wsgi.input": payload,
 | |
|                 }
 | |
|                 response = self.client.request(**r)
 | |
|                 result = response.json()
 | |
|                 self.assertEqual(response.status_code, 200)
 | |
|                 self.assertEqual(result["file_name"], "test.txt")
 | |
|                 self.assertIs(
 | |
|                     os.path.exists(os.path.join(MEDIA_ROOT, "test.txt")),
 | |
|                     False,
 | |
|                 )
 | |
|                 self.assertIs(
 | |
|                     os.path.exists(os.path.join(UPLOAD_TO, "test.txt")),
 | |
|                     True,
 | |
|                 )
 | |
| 
 | |
| 
 | |
| @override_settings(MEDIA_ROOT=MEDIA_ROOT)
 | |
| class DirectoryCreationTests(SimpleTestCase):
 | |
|     """
 | |
|     Tests for error handling during directory creation
 | |
|     via _save_FIELD_file (ticket #6450)
 | |
|     """
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         super().setUpClass()
 | |
|         os.makedirs(MEDIA_ROOT, exist_ok=True)
 | |
|         cls.addClassCleanup(shutil.rmtree, MEDIA_ROOT)
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.obj = FileModel()
 | |
| 
 | |
|     @unittest.skipIf(
 | |
|         sys.platform == "win32", "Python on Windows doesn't have working os.chmod()."
 | |
|     )
 | |
|     @override_settings(
 | |
|         STORAGES={
 | |
|             DEFAULT_STORAGE_ALIAS: {
 | |
|                 "BACKEND": "django.core.files.storage.FileSystemStorage",
 | |
|             }
 | |
|         }
 | |
|     )
 | |
|     def test_readonly_root(self):
 | |
|         """Permission errors are not swallowed"""
 | |
|         os.chmod(MEDIA_ROOT, 0o500)
 | |
|         self.addCleanup(os.chmod, MEDIA_ROOT, 0o700)
 | |
|         with self.assertRaises(PermissionError):
 | |
|             self.obj.testfile.save(
 | |
|                 "foo.txt", SimpleUploadedFile("foo.txt", b"x"), save=False
 | |
|             )
 | |
| 
 | |
|     def test_not_a_directory(self):
 | |
|         default_storage.delete(UPLOAD_TO)
 | |
|         # Create a file with the upload directory name
 | |
|         with SimpleUploadedFile(UPLOAD_TO, b"x") as file:
 | |
|             default_storage.save(UPLOAD_TO, file)
 | |
|         self.addCleanup(default_storage.delete, UPLOAD_TO)
 | |
|         msg = "%s exists and is not a directory." % UPLOAD_TO
 | |
|         with self.assertRaisesMessage(FileExistsError, msg):
 | |
|             with SimpleUploadedFile("foo.txt", b"x") as file:
 | |
|                 self.obj.testfile.save("foo.txt", file, save=False)
 | |
| 
 | |
| 
 | |
| class MultiParserTests(SimpleTestCase):
 | |
|     def test_empty_upload_handlers(self):
 | |
|         # We're not actually parsing here; just checking if the parser properly
 | |
|         # instantiates with empty upload handlers.
 | |
|         MultiPartParser(
 | |
|             {
 | |
|                 "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
 | |
|                 "CONTENT_LENGTH": "1",
 | |
|             },
 | |
|             StringIO("x"),
 | |
|             [],
 | |
|             "utf-8",
 | |
|         )
 | |
| 
 | |
|     def test_invalid_content_type(self):
 | |
|         with self.assertRaisesMessage(
 | |
|             MultiPartParserError, "Invalid Content-Type: text/plain"
 | |
|         ):
 | |
|             MultiPartParser(
 | |
|                 {
 | |
|                     "CONTENT_TYPE": "text/plain",
 | |
|                     "CONTENT_LENGTH": "1",
 | |
|                 },
 | |
|                 StringIO("x"),
 | |
|                 [],
 | |
|                 "utf-8",
 | |
|             )
 | |
| 
 | |
|     def test_negative_content_length(self):
 | |
|         with self.assertRaisesMessage(
 | |
|             MultiPartParserError, "Invalid content length: -1"
 | |
|         ):
 | |
|             MultiPartParser(
 | |
|                 {
 | |
|                     "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
 | |
|                     "CONTENT_LENGTH": -1,
 | |
|                 },
 | |
|                 StringIO("x"),
 | |
|                 [],
 | |
|                 "utf-8",
 | |
|             )
 | |
| 
 | |
|     def test_bad_type_content_length(self):
 | |
|         multipart_parser = MultiPartParser(
 | |
|             {
 | |
|                 "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
 | |
|                 "CONTENT_LENGTH": "a",
 | |
|             },
 | |
|             StringIO("x"),
 | |
|             [],
 | |
|             "utf-8",
 | |
|         )
 | |
|         self.assertEqual(multipart_parser._content_length, 0)
 | |
| 
 | |
|     def test_sanitize_file_name(self):
 | |
|         parser = MultiPartParser(
 | |
|             {
 | |
|                 "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
 | |
|                 "CONTENT_LENGTH": "1",
 | |
|             },
 | |
|             StringIO("x"),
 | |
|             [],
 | |
|             "utf-8",
 | |
|         )
 | |
|         for file_name in CANDIDATE_TRAVERSAL_FILE_NAMES:
 | |
|             with self.subTest(file_name=file_name):
 | |
|                 self.assertEqual(parser.sanitize_file_name(file_name), "hax0rd.txt")
 | |
| 
 | |
|     def test_sanitize_invalid_file_name(self):
 | |
|         parser = MultiPartParser(
 | |
|             {
 | |
|                 "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
 | |
|                 "CONTENT_LENGTH": "1",
 | |
|             },
 | |
|             StringIO("x"),
 | |
|             [],
 | |
|             "utf-8",
 | |
|         )
 | |
|         for file_name in CANDIDATE_INVALID_FILE_NAMES:
 | |
|             with self.subTest(file_name=file_name):
 | |
|                 self.assertIsNone(parser.sanitize_file_name(file_name))
 |