mirror of
				https://github.com/django/django.git
				synced 2025-10-31 01:25:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			226 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			226 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| import random
 | |
| from unittest import TestCase
 | |
| 
 | |
| from django.conf import settings
 | |
| from django.contrib.messages import Message, constants
 | |
| from django.contrib.messages.storage.cookie import (
 | |
|     CookieStorage,
 | |
|     MessageDecoder,
 | |
|     MessageEncoder,
 | |
|     bisect_keep_left,
 | |
|     bisect_keep_right,
 | |
| )
 | |
| from django.test import SimpleTestCase, override_settings
 | |
| from django.utils.crypto import get_random_string
 | |
| from django.utils.safestring import SafeData, mark_safe
 | |
| 
 | |
| from .base import BaseTests
 | |
| 
 | |
| 
 | |
| def set_cookie_data(storage, messages, invalid=False, encode_empty=False):
 | |
|     """
 | |
|     Set ``request.COOKIES`` with the encoded data and remove the storage
 | |
|     backend's loaded data cache.
 | |
|     """
 | |
|     encoded_data = storage._encode(messages, encode_empty=encode_empty)
 | |
|     if invalid:
 | |
|         # Truncate the first character so that the hash is invalid.
 | |
|         encoded_data = encoded_data[1:]
 | |
|     storage.request.COOKIES = {CookieStorage.cookie_name: encoded_data}
 | |
|     if hasattr(storage, "_loaded_data"):
 | |
|         del storage._loaded_data
 | |
| 
 | |
| 
 | |
| def stored_cookie_messages_count(storage, response):
 | |
|     """
 | |
|     Return an integer containing the number of messages stored.
 | |
|     """
 | |
|     # Get a list of cookies, excluding ones with a max-age of 0 (because
 | |
|     # they have been marked for deletion).
 | |
|     cookie = response.cookies.get(storage.cookie_name)
 | |
|     if not cookie or cookie["max-age"] == 0:
 | |
|         return 0
 | |
|     data = storage._decode(cookie.value)
 | |
|     if not data:
 | |
|         return 0
 | |
|     if data[-1] == CookieStorage.not_finished:
 | |
|         data.pop()
 | |
|     return len(data)
 | |
| 
 | |
| 
 | |
| @override_settings(
 | |
|     SESSION_COOKIE_DOMAIN=".example.com",
 | |
|     SESSION_COOKIE_SECURE=True,
 | |
|     SESSION_COOKIE_HTTPONLY=True,
 | |
| )
 | |
| class CookieTests(BaseTests, SimpleTestCase):
 | |
|     storage_class = CookieStorage
 | |
| 
 | |
|     def stored_messages_count(self, storage, response):
 | |
|         return stored_cookie_messages_count(storage, response)
 | |
| 
 | |
|     def encode_decode(self, *args, **kwargs):
 | |
|         storage = self.get_storage()
 | |
|         message = [Message(constants.DEBUG, *args, **kwargs)]
 | |
|         encoded = storage._encode(message)
 | |
|         return storage._decode(encoded)[0]
 | |
| 
 | |
|     def test_get(self):
 | |
|         storage = self.storage_class(self.get_request())
 | |
|         # Set initial data.
 | |
|         example_messages = ["test", "me"]
 | |
|         set_cookie_data(storage, example_messages)
 | |
|         # The message contains what's expected.
 | |
|         self.assertEqual(list(storage), example_messages)
 | |
| 
 | |
|     @override_settings(SESSION_COOKIE_SAMESITE="Strict")
 | |
|     def test_cookie_settings(self):
 | |
|         """
 | |
|         CookieStorage honors SESSION_COOKIE_DOMAIN, SESSION_COOKIE_SECURE, and
 | |
|         SESSION_COOKIE_HTTPONLY (#15618, #20972).
 | |
|         """
 | |
|         # Test before the messages have been consumed
 | |
|         storage = self.get_storage()
 | |
|         response = self.get_response()
 | |
|         storage.add(constants.INFO, "test")
 | |
|         storage.update(response)
 | |
|         messages = storage._decode(response.cookies["messages"].value)
 | |
|         self.assertEqual(len(messages), 1)
 | |
|         self.assertEqual(messages[0].message, "test")
 | |
|         self.assertEqual(response.cookies["messages"]["domain"], ".example.com")
 | |
|         self.assertEqual(response.cookies["messages"]["expires"], "")
 | |
|         self.assertIs(response.cookies["messages"]["secure"], True)
 | |
|         self.assertIs(response.cookies["messages"]["httponly"], True)
 | |
|         self.assertEqual(response.cookies["messages"]["samesite"], "Strict")
 | |
| 
 | |
|         # Deletion of the cookie (storing with an empty value) after the
 | |
|         # messages have been consumed.
 | |
|         storage = self.get_storage()
 | |
|         response = self.get_response()
 | |
|         storage.add(constants.INFO, "test")
 | |
|         for m in storage:
 | |
|             pass  # Iterate through the storage to simulate consumption of messages.
 | |
|         storage.update(response)
 | |
|         self.assertEqual(response.cookies["messages"].value, "")
 | |
|         self.assertEqual(response.cookies["messages"]["domain"], ".example.com")
 | |
|         self.assertEqual(
 | |
|             response.cookies["messages"]["expires"], "Thu, 01 Jan 1970 00:00:00 GMT"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             response.cookies["messages"]["samesite"],
 | |
|             settings.SESSION_COOKIE_SAMESITE,
 | |
|         )
 | |
| 
 | |
|     def test_get_bad_cookie(self):
 | |
|         request = self.get_request()
 | |
|         storage = self.storage_class(request)
 | |
|         # Set initial (invalid) data.
 | |
|         example_messages = ["test", "me"]
 | |
|         set_cookie_data(storage, example_messages, invalid=True)
 | |
|         # The message actually contains what we expect.
 | |
|         self.assertEqual(list(storage), [])
 | |
| 
 | |
|     def test_max_cookie_length(self):
 | |
|         """
 | |
|         If the data exceeds what is allowed in a cookie, older messages are
 | |
|         removed before saving (and returned by the ``update`` method).
 | |
|         """
 | |
|         storage = self.get_storage()
 | |
|         response = self.get_response()
 | |
| 
 | |
|         # When storing as a cookie, the cookie has constant overhead of approx
 | |
|         # 54 chars, and each message has a constant overhead of about 37 chars
 | |
|         # and a variable overhead of zero in the best case. We aim for a message
 | |
|         # size which will fit 4 messages into the cookie, but not 5.
 | |
|         # See also FallbackTest.test_session_fallback
 | |
|         msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
 | |
|         first_msg = None
 | |
|         # Generate the same (tested) content every time that does not get run
 | |
|         # through zlib compression.
 | |
|         random.seed(42)
 | |
|         for i in range(5):
 | |
|             msg = get_random_string(msg_size)
 | |
|             storage.add(constants.INFO, msg)
 | |
|             if i == 0:
 | |
|                 first_msg = msg
 | |
|         unstored_messages = storage.update(response)
 | |
| 
 | |
|         cookie_storing = self.stored_messages_count(storage, response)
 | |
|         self.assertEqual(cookie_storing, 4)
 | |
| 
 | |
|         self.assertEqual(len(unstored_messages), 1)
 | |
|         self.assertEqual(unstored_messages[0].message, first_msg)
 | |
| 
 | |
|     def test_message_rfc6265(self):
 | |
|         non_compliant_chars = ["\\", ",", ";", '"']
 | |
|         messages = ["\\te,st", ';m"e', "\u2019", '123"NOTRECEIVED"']
 | |
|         storage = self.get_storage()
 | |
|         encoded = storage._encode(messages)
 | |
|         for illegal in non_compliant_chars:
 | |
|             self.assertEqual(encoded.find(illegal), -1)
 | |
| 
 | |
|     def test_json_encoder_decoder(self):
 | |
|         """
 | |
|         A complex nested data structure containing Message
 | |
|         instances is properly encoded/decoded by the custom JSON
 | |
|         encoder/decoder classes.
 | |
|         """
 | |
|         messages = [
 | |
|             {
 | |
|                 "message": Message(constants.INFO, "Test message"),
 | |
|                 "message_list": [
 | |
|                     Message(constants.INFO, "message %s") for x in range(5)
 | |
|                 ]
 | |
|                 + [{"another-message": Message(constants.ERROR, "error")}],
 | |
|             },
 | |
|             Message(constants.INFO, "message %s"),
 | |
|         ]
 | |
|         encoder = MessageEncoder()
 | |
|         value = encoder.encode(messages)
 | |
|         decoded_messages = json.loads(value, cls=MessageDecoder)
 | |
|         self.assertEqual(messages, decoded_messages)
 | |
| 
 | |
|     def test_safedata(self):
 | |
|         """
 | |
|         A message containing SafeData is keeping its safe status when
 | |
|         retrieved from the message storage.
 | |
|         """
 | |
|         self.assertIsInstance(
 | |
|             self.encode_decode(mark_safe("<b>Hello Django!</b>")).message,
 | |
|             SafeData,
 | |
|         )
 | |
|         self.assertNotIsInstance(
 | |
|             self.encode_decode("<b>Hello Django!</b>").message,
 | |
|             SafeData,
 | |
|         )
 | |
| 
 | |
|     def test_extra_tags(self):
 | |
|         """
 | |
|         A message's extra_tags attribute is correctly preserved when retrieved
 | |
|         from the message storage.
 | |
|         """
 | |
|         for extra_tags in ["", None, "some tags"]:
 | |
|             with self.subTest(extra_tags=extra_tags):
 | |
|                 self.assertEqual(
 | |
|                     self.encode_decode("message", extra_tags=extra_tags).extra_tags,
 | |
|                     extra_tags,
 | |
|                 )
 | |
| 
 | |
| 
 | |
| class BisectTests(TestCase):
 | |
|     def test_bisect_keep_left(self):
 | |
|         self.assertEqual(bisect_keep_left([1, 1, 1], fn=lambda arr: sum(arr) != 2), 2)
 | |
|         self.assertEqual(bisect_keep_left([1, 1, 1], fn=lambda arr: sum(arr) != 0), 0)
 | |
|         self.assertEqual(bisect_keep_left([], fn=lambda arr: sum(arr) != 0), 0)
 | |
| 
 | |
|     def test_bisect_keep_right(self):
 | |
|         self.assertEqual(bisect_keep_right([1, 1, 1], fn=lambda arr: sum(arr) != 2), 1)
 | |
|         self.assertEqual(
 | |
|             bisect_keep_right([1, 1, 1, 1], fn=lambda arr: sum(arr) != 2), 2
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             bisect_keep_right([1, 1, 1, 1, 1], fn=lambda arr: sum(arr) != 1), 4
 | |
|         )
 | |
|         self.assertEqual(bisect_keep_right([], fn=lambda arr: sum(arr) != 0), 0)
 |