# Page-header residue from the repository mirror (not part of the module):
#   1
#   0
#   mirror of https://github.com/django/django.git synced 2025-01-22 08:10:28 +00:00
#   2024-01-04 05:55:29 +01:00
#
#   385 lines
#   14 KiB
#   Python

from django.contrib.messages import Message, constants, get_level, set_level
from django.contrib.messages.api import MessageFailure
from django.contrib.messages.constants import DEFAULT_LEVELS
from django.contrib.messages.storage import default_storage
from django.http import HttpRequest, HttpResponse
from django.test import modify_settings, override_settings
from django.urls import reverse
from django.utils.translation import gettext_lazy
def add_level_messages(storage):
    """
    Populate ``storage`` with six messages, one ``storage.add()`` call each.

    Five of them use the named level constants and one uses the custom
    numeric level 29. Only the debug message is added with extra tags.
    """
    # (level, text, keyword arguments) in the order the messages are added.
    entries = (
        (constants.INFO, "A generic info message", {}),
        (29, "Some custom level", {}),
        (constants.DEBUG, "A debugging message", {"extra_tags": "extra-tag"}),
        (constants.WARNING, "A warning", {}),
        (constants.ERROR, "An error", {}),
        (constants.SUCCESS, "This was a triumph.", {}),
    )
    for level, text, options in entries:
        storage.add(level, text, **options)
class BaseTests:
    """
    Storage-agnostic test cases for the messages framework.

    The methods use attributes this class does not define itself
    (``self.client``, the ``assert*`` helpers, ``enterClassContext()``), so it
    is meant to be combined with a test-case class. ``stored_messages_count()``
    and ``test_get()`` raise ``NotImplementedError`` here and must be supplied
    by subclasses.
    """

    # Storage backend exercised by the tests.
    storage_class = default_storage
    # Level names used in the test URLs, mapped to their level constants.
    levels = {
        "debug": constants.DEBUG,
        "info": constants.INFO,
        "success": constants.SUCCESS,
        "warning": constants.WARNING,
        "error": constants.ERROR,
    }

    @classmethod
    def setUpClass(cls):
        """
        Enter the settings override shared by all tests, then defer to the
        parent ``setUpClass()``.
        """
        backend = cls.storage_class
        template_engines = [
            {
                "BACKEND": "django.template.backends.django.DjangoTemplates",
                "DIRS": [],
                "APP_DIRS": True,
                "OPTIONS": {
                    "context_processors": (
                        "django.contrib.auth.context_processors.auth",
                        "django.contrib.messages.context_processors.messages",
                    ),
                },
            }
        ]
        shared_settings = override_settings(
            TEMPLATES=template_engines,
            ROOT_URLCONF="messages_tests.urls",
            MESSAGE_TAGS={},
            # Dotted path built from the backend under test.
            MESSAGE_STORAGE=f"{backend.__module__}.{backend.__name__}",
            SESSION_SERIALIZER="django.contrib.sessions.serializers.JSONSerializer",
        )
        cls.enterClassContext(shared_settings)
        super().setUpClass()

    def get_request(self):
        """Return a new, empty ``HttpRequest``."""
        return HttpRequest()

    def get_response(self):
        """Return a new, empty ``HttpResponse``."""
        return HttpResponse()

    def get_storage(self, data=None):
        """
        Build a storage backend whose loaded data is preset to ``data``.

        A falsy ``data`` is replaced by an empty list. Presetting the loaded
        data keeps the backend's ``_get`` method from being called, so the
        rest of the backend can be tested independently of the message
        retrieval logic.
        """
        backend = self.storage_class(self.get_request())
        backend._loaded_data = data if data else []
        return backend

    def test_repr(self):
        # The repr names the storage class and the request it wraps.
        backend = self.storage_class(self.get_request())
        expected = f"<{self.storage_class.__qualname__}: request=<HttpRequest>>"
        self.assertEqual(repr(backend), expected)

    def test_add(self):
        # ``added_new`` is false until the first message is added; both
        # messages then count towards the storage length.
        store = self.get_storage()
        self.assertFalse(store.added_new)
        store.add(constants.INFO, "Test message 1")
        self.assertTrue(store.added_new)
        store.add(constants.INFO, "Test message 2", extra_tags="tag")
        self.assertEqual(len(store), 2)

    def test_add_lazy_translation(self):
        # A lazily translated message is stored like any other message.
        store = self.get_storage()
        outgoing = self.get_response()
        store.add(constants.INFO, gettext_lazy("lazy message"))
        store.update(outgoing)
        stored = self.stored_messages_count(store, outgoing)
        self.assertEqual(stored, 1)

    def test_no_update(self):
        # Updating a storage that received no messages stores nothing.
        store = self.get_storage()
        outgoing = self.get_response()
        store.update(outgoing)
        stored = self.stored_messages_count(store, outgoing)
        self.assertEqual(stored, 0)

    def test_add_update(self):
        # Two added messages are both stored by update().
        store = self.get_storage()
        outgoing = self.get_response()
        store.add(constants.INFO, "Test message 1")
        store.add(constants.INFO, "Test message 1", extra_tags="tag")
        store.update(outgoing)
        stored = self.stored_messages_count(store, outgoing)
        self.assertEqual(stored, 2)

    def test_existing_add_read_update(self):
        # Add first, then read: nothing is left to store on update().
        store = self.get_existing_storage()
        outgoing = self.get_response()
        store.add(constants.INFO, "Test message 3")
        list(store)  # Simulates a read
        store.update(outgoing)
        stored = self.stored_messages_count(store, outgoing)
        self.assertEqual(stored, 0)

    def test_existing_read_add_update(self):
        # Read first, then add: only the newly added message is stored.
        store = self.get_existing_storage()
        outgoing = self.get_response()
        list(store)  # Simulates a read
        store.add(constants.INFO, "Test message 3")
        store.update(outgoing)
        stored = self.stored_messages_count(store, outgoing)
        self.assertEqual(stored, 1)

    @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    def test_full_request_response_cycle(self):
        """
        Messages added by a POST are stored and then retrieved on the page
        reached through the redirect, with the message middleware enabled.
        """
        texts = ["Test message %d" % n for n in range(5)]
        payload = {"messages": texts}
        show_url = reverse("show_message")
        for level in ("debug", "info", "success", "warning", "error"):
            add_url = reverse("add_message", args=(level,))
            response = self.client.post(add_url, payload, follow=True)
            self.assertRedirects(response, show_url)
            self.assertIn("messages", response.context)
            expected = [Message(self.levels[level], text) for text in texts]
            self.assertEqual(list(response.context["messages"]), expected)
            for text in texts:
                self.assertContains(response, text)

    @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    def test_with_template_response(self):
        # Same cycle as above, but through the template-response views.
        texts = ["Test message %d" % n for n in range(5)]
        payload = {"messages": texts}
        show_url = reverse("show_template_response")
        for level in self.levels:
            add_url = reverse("add_template_response", args=(level,))
            response = self.client.post(add_url, payload, follow=True)
            self.assertRedirects(response, show_url)
            self.assertIn("messages", response.context)
            for text in texts:
                self.assertContains(response, text)

            # A second GET must not show the messages again.
            response = self.client.get(show_url)
            for text in texts:
                self.assertNotContains(response, text)

    def test_context_processor_message_levels(self):
        # The template context exposes the default level mapping.
        response = self.client.get(reverse("show_template_response"))
        self.assertIn("DEFAULT_MESSAGE_LEVELS", response.context)
        self.assertEqual(response.context["DEFAULT_MESSAGE_LEVELS"], DEFAULT_LEVELS)

    @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    def test_multiple_posts(self):
        """
        Messages from several POSTs made before a single GET are all still
        available, in order, when that GET happens.
        """
        texts = ["Test message %d" % n for n in range(5)]
        payload = {"messages": texts}
        show_url = reverse("show_message")
        expected = []
        for level in ("debug", "info", "success", "warning", "error"):
            expected.extend(Message(self.levels[level], text) for text in texts)
            add_url = reverse("add_message", args=(level,))
            self.client.post(add_url, payload)
        response = self.client.get(show_url)
        self.assertIn("messages", response.context)
        self.assertEqual(list(response.context["messages"]), expected)
        for text in texts:
            self.assertContains(response, text)

    @modify_settings(
        INSTALLED_APPS={"remove": "django.contrib.messages"},
        MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
    )
    @override_settings(
        MESSAGE_LEVEL=constants.DEBUG,
        TEMPLATES=[
            {
                "BACKEND": "django.template.backends.django.DjangoTemplates",
                "DIRS": [],
                "APP_DIRS": True,
            }
        ],
    )
    def test_middleware_disabled(self):
        """
        Trying to store a message while the middleware is disabled raises
        ``MessageFailure``.
        """
        payload = {
            "messages": ["Test message %d" % n for n in range(5)],
        }
        # The result is unused; the call itself is kept from the original.
        reverse("show_message")
        for level in ("debug", "info", "success", "warning", "error"):
            add_url = reverse("add_message", args=(level,))
            with self.assertRaises(MessageFailure):
                self.client.post(add_url, payload, follow=True)

    @modify_settings(
        INSTALLED_APPS={"remove": "django.contrib.messages"},
        MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
    )
    @override_settings(
        TEMPLATES=[
            {
                "BACKEND": "django.template.backends.django.DjangoTemplates",
                "DIRS": [],
                "APP_DIRS": True,
            }
        ],
    )
    def test_middleware_disabled_fail_silently(self):
        """
        With 'fail_silently' set to True, no exception is raised even though
        the middleware is disabled.
        """
        payload = {
            "messages": ["Test message %d" % n for n in range(5)],
            "fail_silently": True,
        }
        show_url = reverse("show_message")
        for level in ("debug", "info", "success", "warning", "error"):
            add_url = reverse("add_message", args=(level,))
            response = self.client.post(add_url, payload, follow=True)
            self.assertRedirects(response, show_url)
            self.assertNotIn("messages", response.context)

    def stored_messages_count(self, storage, response):
        """
        Return how many messages are being stored once ``storage.update()``
        has been called. Must be implemented by subclasses.
        """
        raise NotImplementedError("This method must be set by a subclass.")

    def test_get(self):
        # Placeholder: subclasses must provide their own implementation.
        raise NotImplementedError("This method must be set by a subclass.")

    def get_existing_storage(self):
        """Return a storage preloaded with two info-level messages."""
        preloaded = [
            Message(constants.INFO, "Test message 1"),
            Message(constants.INFO, "Test message 2", extra_tags="tag"),
        ]
        return self.get_storage(preloaded)

    def test_existing_read(self):
        """
        Data in an existing storage is not lost by reading it.
        """
        store = self.get_existing_storage()
        self.assertFalse(store.used)
        # Iterating the storage directly sets the used flag.
        first_read = list(store)
        self.assertTrue(store.used)
        # A second iteration still yields the same data.
        self.assertEqual(first_read, list(store))

    def test_existing_add(self):
        # Adding to a preloaded storage sets ``added_new``.
        store = self.get_existing_storage()
        self.assertFalse(store.added_new)
        store.add(constants.INFO, "Test message 3")
        self.assertTrue(store.added_new)

    def test_default_level(self):
        # get_level works even when the request has no storage attached.
        request = self.get_request()
        self.assertEqual(get_level(request), constants.INFO)

        # With a storage attached but no level set, the default is returned.
        store = self.get_storage()
        request._messages = store
        self.assertEqual(get_level(request), constants.INFO)

        # Only messages of sufficient level are recorded: five of the six.
        add_level_messages(store)
        self.assertEqual(len(store), 5)

    def test_low_level(self):
        # With the level lowered to 5, all six messages are recorded.
        request = self.get_request()
        store = self.storage_class(request)
        request._messages = store
        self.assertTrue(set_level(request, 5))
        self.assertEqual(get_level(request), 5)
        add_level_messages(store)
        self.assertEqual(len(store), 6)

    def test_high_level(self):
        # With the level raised to 30, only two of the six are recorded.
        request = self.get_request()
        store = self.storage_class(request)
        request._messages = store
        self.assertTrue(set_level(request, 30))
        self.assertEqual(get_level(request), 30)
        add_level_messages(store)
        self.assertEqual(len(store), 2)

    @override_settings(MESSAGE_LEVEL=29)
    def test_settings_level(self):
        # The storage is not attached to the request here; get_level is
        # expected to report the level from the MESSAGE_LEVEL setting.
        request = self.get_request()
        store = self.storage_class(request)
        self.assertEqual(get_level(request), 29)
        add_level_messages(store)
        self.assertEqual(len(store), 3)

    def test_tags(self):
        # Level 0 lets every message through, so all tags can be checked.
        store = self.get_storage()
        store.level = 0
        add_level_messages(store)
        store.add(constants.INFO, "A generic info message", extra_tags=None)
        expected = [
            "info",
            "",
            "extra-tag debug",
            "warning",
            "error",
            "success",
            "info",
        ]
        self.assertEqual([message.tags for message in store], expected)

    def test_level_tag(self):
        # ``level_tag`` reflects the level only; extra tags are not included.
        store = self.get_storage()
        store.level = 0
        add_level_messages(store)
        level_tags = [message.level_tag for message in store]
        self.assertEqual(
            level_tags, ["info", "", "debug", "warning", "error", "success"]
        )

    @override_settings(
        MESSAGE_TAGS={
            constants.INFO: "info",
            constants.DEBUG: "",
            constants.WARNING: "",
            constants.ERROR: "bad",
            29: "custom",
        }
    )
    def test_custom_tags(self):
        # Tags come from the MESSAGE_TAGS override applied to this test.
        store = self.get_storage()
        store.level = 0
        add_level_messages(store)
        observed = [message.tags for message in store]
        self.assertEqual(
            observed, ["info", "custom", "extra-tag", "", "bad", "success"]
        )