mirror of
				https://github.com/django/django.git
				synced 2025-10-30 17:16:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			385 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			385 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from django.contrib.messages import Message, constants, get_level, set_level
 | |
| from django.contrib.messages.api import MessageFailure
 | |
| from django.contrib.messages.constants import DEFAULT_LEVELS
 | |
| from django.contrib.messages.storage import default_storage
 | |
| from django.http import HttpRequest, HttpResponse
 | |
| from django.test import modify_settings, override_settings
 | |
| from django.urls import reverse
 | |
| from django.utils.translation import gettext_lazy
 | |
| 
 | |
| 
 | |
def add_level_messages(storage):
    """
    Populate ``storage`` with six messages, one per call to ``storage.add()``.

    Five use the named level constants and one uses the custom numeric
    level 29. The DEBUG message is the only one carrying extra tags.
    """
    # (level, message text, keyword arguments) in the order they are added.
    entries = (
        (constants.INFO, "A generic info message", {}),
        (29, "Some custom level", {}),
        (constants.DEBUG, "A debugging message", {"extra_tags": "extra-tag"}),
        (constants.WARNING, "A warning", {}),
        (constants.ERROR, "An error", {}),
        (constants.SUCCESS, "This was a triumph.", {}),
    )
    for level, text, options in entries:
        storage.add(level, text, **options)
 | |
| 
 | |
| 
 | |
class BaseTests:
    """
    Shared test methods for message storage backends.

    ``storage_class`` names the backend under test. The methods
    ``stored_messages_count()`` and ``test_get()`` raise
    ``NotImplementedError`` here and must be supplied by a subclass.
    """

    storage_class = default_storage
    # Maps the level name used in the test URLs to its numeric constant.
    levels = dict(
        debug=constants.DEBUG,
        info=constants.INFO,
        success=constants.SUCCESS,
        warning=constants.WARNING,
        error=constants.ERROR,
    )
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         cls.enterClassContext(
 | |
|             override_settings(
 | |
|                 TEMPLATES=[
 | |
|                     {
 | |
|                         "BACKEND": "django.template.backends.django.DjangoTemplates",
 | |
|                         "DIRS": [],
 | |
|                         "APP_DIRS": True,
 | |
|                         "OPTIONS": {
 | |
|                             "context_processors": (
 | |
|                                 "django.contrib.auth.context_processors.auth",
 | |
|                                 "django.contrib.messages.context_processors.messages",
 | |
|                             ),
 | |
|                         },
 | |
|                     }
 | |
|                 ],
 | |
|                 ROOT_URLCONF="messages_tests.urls",
 | |
|                 MESSAGE_TAGS={},
 | |
|                 MESSAGE_STORAGE=(
 | |
|                     f"{cls.storage_class.__module__}.{cls.storage_class.__name__}"
 | |
|                 ),
 | |
|                 SESSION_SERIALIZER="django.contrib.sessions.serializers.JSONSerializer",
 | |
|             )
 | |
|         )
 | |
|         super().setUpClass()
 | |
| 
 | |
|     def get_request(self):
 | |
|         return HttpRequest()
 | |
| 
 | |
|     def get_response(self):
 | |
|         return HttpResponse()
 | |
| 
 | |
|     def get_storage(self, data=None):
 | |
|         """
 | |
|         Return the storage backend, setting its loaded data to the ``data``
 | |
|         argument.
 | |
| 
 | |
|         This method avoids the storage ``_get`` method from getting called so
 | |
|         that other parts of the storage backend can be tested independent of
 | |
|         the message retrieval logic.
 | |
|         """
 | |
|         storage = self.storage_class(self.get_request())
 | |
|         storage._loaded_data = data or []
 | |
|         return storage
 | |
| 
 | |
|     def test_repr(self):
 | |
|         request = self.get_request()
 | |
|         storage = self.storage_class(request)
 | |
|         self.assertEqual(
 | |
|             repr(storage),
 | |
|             f"<{self.storage_class.__qualname__}: request=<HttpRequest>>",
 | |
|         )
 | |
| 
 | |
|     def test_add(self):
 | |
|         storage = self.get_storage()
 | |
|         self.assertFalse(storage.added_new)
 | |
|         storage.add(constants.INFO, "Test message 1")
 | |
|         self.assertTrue(storage.added_new)
 | |
|         storage.add(constants.INFO, "Test message 2", extra_tags="tag")
 | |
|         self.assertEqual(len(storage), 2)
 | |
| 
 | |
|     def test_add_lazy_translation(self):
 | |
|         storage = self.get_storage()
 | |
|         response = self.get_response()
 | |
| 
 | |
|         storage.add(constants.INFO, gettext_lazy("lazy message"))
 | |
|         storage.update(response)
 | |
| 
 | |
|         storing = self.stored_messages_count(storage, response)
 | |
|         self.assertEqual(storing, 1)
 | |
| 
 | |
|     def test_no_update(self):
 | |
|         storage = self.get_storage()
 | |
|         response = self.get_response()
 | |
|         storage.update(response)
 | |
|         storing = self.stored_messages_count(storage, response)
 | |
|         self.assertEqual(storing, 0)
 | |
| 
 | |
|     def test_add_update(self):
 | |
|         storage = self.get_storage()
 | |
|         response = self.get_response()
 | |
| 
 | |
|         storage.add(constants.INFO, "Test message 1")
 | |
|         storage.add(constants.INFO, "Test message 1", extra_tags="tag")
 | |
|         storage.update(response)
 | |
| 
 | |
|         storing = self.stored_messages_count(storage, response)
 | |
|         self.assertEqual(storing, 2)
 | |
| 
 | |
|     def test_existing_add_read_update(self):
 | |
|         storage = self.get_existing_storage()
 | |
|         response = self.get_response()
 | |
| 
 | |
|         storage.add(constants.INFO, "Test message 3")
 | |
|         list(storage)  # Simulates a read
 | |
|         storage.update(response)
 | |
| 
 | |
|         storing = self.stored_messages_count(storage, response)
 | |
|         self.assertEqual(storing, 0)
 | |
| 
 | |
|     def test_existing_read_add_update(self):
 | |
|         storage = self.get_existing_storage()
 | |
|         response = self.get_response()
 | |
| 
 | |
|         list(storage)  # Simulates a read
 | |
|         storage.add(constants.INFO, "Test message 3")
 | |
|         storage.update(response)
 | |
| 
 | |
|         storing = self.stored_messages_count(storage, response)
 | |
|         self.assertEqual(storing, 1)
 | |
| 
 | |
|     @override_settings(MESSAGE_LEVEL=constants.DEBUG)
 | |
|     def test_full_request_response_cycle(self):
 | |
|         """
 | |
|         With the message middleware enabled, messages are properly stored and
 | |
|         retrieved across the full request/redirect/response cycle.
 | |
|         """
 | |
|         data = {
 | |
|             "messages": ["Test message %d" % x for x in range(5)],
 | |
|         }
 | |
|         show_url = reverse("show_message")
 | |
|         for level in ("debug", "info", "success", "warning", "error"):
 | |
|             add_url = reverse("add_message", args=(level,))
 | |
|             response = self.client.post(add_url, data, follow=True)
 | |
|             self.assertRedirects(response, show_url)
 | |
|             self.assertIn("messages", response.context)
 | |
|             messages = [Message(self.levels[level], msg) for msg in data["messages"]]
 | |
|             self.assertEqual(list(response.context["messages"]), messages)
 | |
|             for msg in data["messages"]:
 | |
|                 self.assertContains(response, msg)
 | |
| 
 | |
|     @override_settings(MESSAGE_LEVEL=constants.DEBUG)
 | |
|     def test_with_template_response(self):
 | |
|         data = {
 | |
|             "messages": ["Test message %d" % x for x in range(5)],
 | |
|         }
 | |
|         show_url = reverse("show_template_response")
 | |
|         for level in self.levels:
 | |
|             add_url = reverse("add_template_response", args=(level,))
 | |
|             response = self.client.post(add_url, data, follow=True)
 | |
|             self.assertRedirects(response, show_url)
 | |
|             self.assertIn("messages", response.context)
 | |
|             for msg in data["messages"]:
 | |
|                 self.assertContains(response, msg)
 | |
| 
 | |
|             # there shouldn't be any messages on second GET request
 | |
|             response = self.client.get(show_url)
 | |
|             for msg in data["messages"]:
 | |
|                 self.assertNotContains(response, msg)
 | |
| 
 | |
|     def test_context_processor_message_levels(self):
 | |
|         show_url = reverse("show_template_response")
 | |
|         response = self.client.get(show_url)
 | |
| 
 | |
|         self.assertIn("DEFAULT_MESSAGE_LEVELS", response.context)
 | |
|         self.assertEqual(response.context["DEFAULT_MESSAGE_LEVELS"], DEFAULT_LEVELS)
 | |
| 
 | |
|     @override_settings(MESSAGE_LEVEL=constants.DEBUG)
 | |
|     def test_multiple_posts(self):
 | |
|         """
 | |
|         Messages persist properly when multiple POSTs are made before a GET.
 | |
|         """
 | |
|         data = {
 | |
|             "messages": ["Test message %d" % x for x in range(5)],
 | |
|         }
 | |
|         show_url = reverse("show_message")
 | |
|         messages = []
 | |
|         for level in ("debug", "info", "success", "warning", "error"):
 | |
|             messages.extend(
 | |
|                 Message(self.levels[level], msg) for msg in data["messages"]
 | |
|             )
 | |
|             add_url = reverse("add_message", args=(level,))
 | |
|             self.client.post(add_url, data)
 | |
|         response = self.client.get(show_url)
 | |
|         self.assertIn("messages", response.context)
 | |
|         self.assertEqual(list(response.context["messages"]), messages)
 | |
|         for msg in data["messages"]:
 | |
|             self.assertContains(response, msg)
 | |
| 
 | |
|     @modify_settings(
 | |
|         INSTALLED_APPS={"remove": "django.contrib.messages"},
 | |
|         MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
 | |
|     )
 | |
|     @override_settings(
 | |
|         MESSAGE_LEVEL=constants.DEBUG,
 | |
|         TEMPLATES=[
 | |
|             {
 | |
|                 "BACKEND": "django.template.backends.django.DjangoTemplates",
 | |
|                 "DIRS": [],
 | |
|                 "APP_DIRS": True,
 | |
|             }
 | |
|         ],
 | |
|     )
 | |
|     def test_middleware_disabled(self):
 | |
|         """
 | |
|         When the middleware is disabled, an exception is raised when one
 | |
|         attempts to store a message.
 | |
|         """
 | |
|         data = {
 | |
|             "messages": ["Test message %d" % x for x in range(5)],
 | |
|         }
 | |
|         reverse("show_message")
 | |
|         for level in ("debug", "info", "success", "warning", "error"):
 | |
|             add_url = reverse("add_message", args=(level,))
 | |
|             with self.assertRaises(MessageFailure):
 | |
|                 self.client.post(add_url, data, follow=True)
 | |
| 
 | |
|     @modify_settings(
 | |
|         INSTALLED_APPS={"remove": "django.contrib.messages"},
 | |
|         MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
 | |
|     )
 | |
|     @override_settings(
 | |
|         TEMPLATES=[
 | |
|             {
 | |
|                 "BACKEND": "django.template.backends.django.DjangoTemplates",
 | |
|                 "DIRS": [],
 | |
|                 "APP_DIRS": True,
 | |
|             }
 | |
|         ],
 | |
|     )
 | |
|     def test_middleware_disabled_fail_silently(self):
 | |
|         """
 | |
|         When the middleware is disabled, an exception is not raised
 | |
|         if 'fail_silently' is True.
 | |
|         """
 | |
|         data = {
 | |
|             "messages": ["Test message %d" % x for x in range(5)],
 | |
|             "fail_silently": True,
 | |
|         }
 | |
|         show_url = reverse("show_message")
 | |
|         for level in ("debug", "info", "success", "warning", "error"):
 | |
|             add_url = reverse("add_message", args=(level,))
 | |
|             response = self.client.post(add_url, data, follow=True)
 | |
|             self.assertRedirects(response, show_url)
 | |
|             self.assertNotIn("messages", response.context)
 | |
| 
 | |
|     def stored_messages_count(self, storage, response):
 | |
|         """
 | |
|         Return the number of messages being stored after a
 | |
|         ``storage.update()`` call.
 | |
|         """
 | |
|         raise NotImplementedError("This method must be set by a subclass.")
 | |
| 
 | |
|     def test_get(self):
 | |
|         raise NotImplementedError("This method must be set by a subclass.")
 | |
| 
 | |
|     def get_existing_storage(self):
 | |
|         return self.get_storage(
 | |
|             [
 | |
|                 Message(constants.INFO, "Test message 1"),
 | |
|                 Message(constants.INFO, "Test message 2", extra_tags="tag"),
 | |
|             ]
 | |
|         )
 | |
| 
 | |
|     def test_existing_read(self):
 | |
|         """
 | |
|         Reading the existing storage doesn't cause the data to be lost.
 | |
|         """
 | |
|         storage = self.get_existing_storage()
 | |
|         self.assertFalse(storage.used)
 | |
|         # After iterating the storage engine directly, the used flag is set.
 | |
|         data = list(storage)
 | |
|         self.assertTrue(storage.used)
 | |
|         # The data does not disappear because it has been iterated.
 | |
|         self.assertEqual(data, list(storage))
 | |
| 
 | |
|     def test_existing_add(self):
 | |
|         storage = self.get_existing_storage()
 | |
|         self.assertFalse(storage.added_new)
 | |
|         storage.add(constants.INFO, "Test message 3")
 | |
|         self.assertTrue(storage.added_new)
 | |
| 
 | |
|     def test_default_level(self):
 | |
|         # get_level works even with no storage on the request.
 | |
|         request = self.get_request()
 | |
|         self.assertEqual(get_level(request), constants.INFO)
 | |
| 
 | |
|         # get_level returns the default level if it hasn't been set.
 | |
|         storage = self.get_storage()
 | |
|         request._messages = storage
 | |
|         self.assertEqual(get_level(request), constants.INFO)
 | |
| 
 | |
|         # Only messages of sufficient level get recorded.
 | |
|         add_level_messages(storage)
 | |
|         self.assertEqual(len(storage), 5)
 | |
| 
 | |
|     def test_low_level(self):
 | |
|         request = self.get_request()
 | |
|         storage = self.storage_class(request)
 | |
|         request._messages = storage
 | |
| 
 | |
|         self.assertTrue(set_level(request, 5))
 | |
|         self.assertEqual(get_level(request), 5)
 | |
| 
 | |
|         add_level_messages(storage)
 | |
|         self.assertEqual(len(storage), 6)
 | |
| 
 | |
|     def test_high_level(self):
 | |
|         request = self.get_request()
 | |
|         storage = self.storage_class(request)
 | |
|         request._messages = storage
 | |
| 
 | |
|         self.assertTrue(set_level(request, 30))
 | |
|         self.assertEqual(get_level(request), 30)
 | |
| 
 | |
|         add_level_messages(storage)
 | |
|         self.assertEqual(len(storage), 2)
 | |
| 
 | |
|     @override_settings(MESSAGE_LEVEL=29)
 | |
|     def test_settings_level(self):
 | |
|         request = self.get_request()
 | |
|         storage = self.storage_class(request)
 | |
| 
 | |
|         self.assertEqual(get_level(request), 29)
 | |
| 
 | |
|         add_level_messages(storage)
 | |
|         self.assertEqual(len(storage), 3)
 | |
| 
 | |
|     def test_tags(self):
 | |
|         storage = self.get_storage()
 | |
|         storage.level = 0
 | |
|         add_level_messages(storage)
 | |
|         storage.add(constants.INFO, "A generic info message", extra_tags=None)
 | |
|         tags = [msg.tags for msg in storage]
 | |
|         self.assertEqual(
 | |
|             tags, ["info", "", "extra-tag debug", "warning", "error", "success", "info"]
 | |
|         )
 | |
| 
 | |
|     def test_level_tag(self):
 | |
|         storage = self.get_storage()
 | |
|         storage.level = 0
 | |
|         add_level_messages(storage)
 | |
|         tags = [msg.level_tag for msg in storage]
 | |
|         self.assertEqual(tags, ["info", "", "debug", "warning", "error", "success"])
 | |
| 
 | |
|     @override_settings(
 | |
|         MESSAGE_TAGS={
 | |
|             constants.INFO: "info",
 | |
|             constants.DEBUG: "",
 | |
|             constants.WARNING: "",
 | |
|             constants.ERROR: "bad",
 | |
|             29: "custom",
 | |
|         }
 | |
|     )
 | |
|     def test_custom_tags(self):
 | |
|         storage = self.get_storage()
 | |
|         storage.level = 0
 | |
|         add_level_messages(storage)
 | |
|         tags = [msg.tags for msg in storage]
 | |
|         self.assertEqual(tags, ["info", "custom", "extra-tag", "", "bad", "success"])
 |