mirror of
https://github.com/django/django.git
synced 2024-12-24 10:05:46 +00:00
384 lines
14 KiB
Python
384 lines
14 KiB
Python
from django.contrib.messages import constants, get_level, set_level, utils
|
|
from django.contrib.messages.api import MessageFailure
|
|
from django.contrib.messages.constants import DEFAULT_LEVELS
|
|
from django.contrib.messages.storage import base, default_storage
|
|
from django.contrib.messages.storage.base import Message
|
|
from django.http import HttpRequest, HttpResponse
|
|
from django.test import modify_settings, override_settings
|
|
from django.urls import reverse
|
|
from django.utils.translation import gettext_lazy
|
|
|
|
|
|
def add_level_messages(storage):
|
|
"""
|
|
Add 6 messages from different levels (including a custom one) to a storage
|
|
instance.
|
|
"""
|
|
storage.add(constants.INFO, 'A generic info message')
|
|
storage.add(29, 'Some custom level')
|
|
storage.add(constants.DEBUG, 'A debugging message', extra_tags='extra-tag')
|
|
storage.add(constants.WARNING, 'A warning')
|
|
storage.add(constants.ERROR, 'An error')
|
|
storage.add(constants.SUCCESS, 'This was a triumph.')
|
|
|
|
|
|
class override_settings_tags(override_settings):
|
|
def enable(self):
|
|
super().enable()
|
|
# LEVEL_TAGS is a constant defined in the
|
|
# django.contrib.messages.storage.base module, so after changing
|
|
# settings.MESSAGE_TAGS, update that constant also.
|
|
self.old_level_tags = base.LEVEL_TAGS
|
|
base.LEVEL_TAGS = utils.get_level_tags()
|
|
|
|
def disable(self):
|
|
super().disable()
|
|
base.LEVEL_TAGS = self.old_level_tags
|
|
|
|
|
|
class BaseTests:
|
|
storage_class = default_storage
|
|
levels = {
|
|
'debug': constants.DEBUG,
|
|
'info': constants.INFO,
|
|
'success': constants.SUCCESS,
|
|
'warning': constants.WARNING,
|
|
'error': constants.ERROR,
|
|
}
|
|
|
|
def setUp(self):
|
|
self.settings_override = override_settings_tags(
|
|
TEMPLATES=[{
|
|
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
|
'DIRS': [],
|
|
'APP_DIRS': True,
|
|
'OPTIONS': {
|
|
'context_processors': (
|
|
'django.contrib.auth.context_processors.auth',
|
|
'django.contrib.messages.context_processors.messages',
|
|
),
|
|
},
|
|
}],
|
|
ROOT_URLCONF='messages_tests.urls',
|
|
MESSAGE_TAGS={},
|
|
MESSAGE_STORAGE='%s.%s' % (self.storage_class.__module__, self.storage_class.__name__),
|
|
SESSION_SERIALIZER='django.contrib.sessions.serializers.JSONSerializer',
|
|
)
|
|
self.settings_override.enable()
|
|
|
|
def tearDown(self):
|
|
self.settings_override.disable()
|
|
|
|
def get_request(self):
|
|
return HttpRequest()
|
|
|
|
def get_response(self):
|
|
return HttpResponse()
|
|
|
|
def get_storage(self, data=None):
|
|
"""
|
|
Return the storage backend, setting its loaded data to the ``data``
|
|
argument.
|
|
|
|
This method avoids the storage ``_get`` method from getting called so
|
|
that other parts of the storage backend can be tested independent of
|
|
the message retrieval logic.
|
|
"""
|
|
storage = self.storage_class(self.get_request())
|
|
storage._loaded_data = data or []
|
|
return storage
|
|
|
|
def test_repr(self):
|
|
request = self.get_request()
|
|
storage = self.storage_class(request)
|
|
self.assertEqual(
|
|
repr(storage),
|
|
f'<{self.storage_class.__qualname__}: request=<HttpRequest>>',
|
|
)
|
|
|
|
def test_add(self):
|
|
storage = self.get_storage()
|
|
self.assertFalse(storage.added_new)
|
|
storage.add(constants.INFO, 'Test message 1')
|
|
self.assertTrue(storage.added_new)
|
|
storage.add(constants.INFO, 'Test message 2', extra_tags='tag')
|
|
self.assertEqual(len(storage), 2)
|
|
|
|
def test_add_lazy_translation(self):
|
|
storage = self.get_storage()
|
|
response = self.get_response()
|
|
|
|
storage.add(constants.INFO, gettext_lazy('lazy message'))
|
|
storage.update(response)
|
|
|
|
storing = self.stored_messages_count(storage, response)
|
|
self.assertEqual(storing, 1)
|
|
|
|
def test_no_update(self):
|
|
storage = self.get_storage()
|
|
response = self.get_response()
|
|
storage.update(response)
|
|
storing = self.stored_messages_count(storage, response)
|
|
self.assertEqual(storing, 0)
|
|
|
|
def test_add_update(self):
|
|
storage = self.get_storage()
|
|
response = self.get_response()
|
|
|
|
storage.add(constants.INFO, 'Test message 1')
|
|
storage.add(constants.INFO, 'Test message 1', extra_tags='tag')
|
|
storage.update(response)
|
|
|
|
storing = self.stored_messages_count(storage, response)
|
|
self.assertEqual(storing, 2)
|
|
|
|
def test_existing_add_read_update(self):
|
|
storage = self.get_existing_storage()
|
|
response = self.get_response()
|
|
|
|
storage.add(constants.INFO, 'Test message 3')
|
|
list(storage) # Simulates a read
|
|
storage.update(response)
|
|
|
|
storing = self.stored_messages_count(storage, response)
|
|
self.assertEqual(storing, 0)
|
|
|
|
def test_existing_read_add_update(self):
|
|
storage = self.get_existing_storage()
|
|
response = self.get_response()
|
|
|
|
list(storage) # Simulates a read
|
|
storage.add(constants.INFO, 'Test message 3')
|
|
storage.update(response)
|
|
|
|
storing = self.stored_messages_count(storage, response)
|
|
self.assertEqual(storing, 1)
|
|
|
|
@override_settings(MESSAGE_LEVEL=constants.DEBUG)
|
|
def test_full_request_response_cycle(self):
|
|
"""
|
|
With the message middleware enabled, messages are properly stored and
|
|
retrieved across the full request/redirect/response cycle.
|
|
"""
|
|
data = {
|
|
'messages': ['Test message %d' % x for x in range(5)],
|
|
}
|
|
show_url = reverse('show_message')
|
|
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
|
add_url = reverse('add_message', args=(level,))
|
|
response = self.client.post(add_url, data, follow=True)
|
|
self.assertRedirects(response, show_url)
|
|
self.assertIn('messages', response.context)
|
|
messages = [Message(self.levels[level], msg) for msg in data['messages']]
|
|
self.assertEqual(list(response.context['messages']), messages)
|
|
for msg in data['messages']:
|
|
self.assertContains(response, msg)
|
|
|
|
@override_settings(MESSAGE_LEVEL=constants.DEBUG)
|
|
def test_with_template_response(self):
|
|
data = {
|
|
'messages': ['Test message %d' % x for x in range(5)],
|
|
}
|
|
show_url = reverse('show_template_response')
|
|
for level in self.levels:
|
|
add_url = reverse('add_template_response', args=(level,))
|
|
response = self.client.post(add_url, data, follow=True)
|
|
self.assertRedirects(response, show_url)
|
|
self.assertIn('messages', response.context)
|
|
for msg in data['messages']:
|
|
self.assertContains(response, msg)
|
|
|
|
# there shouldn't be any messages on second GET request
|
|
response = self.client.get(show_url)
|
|
for msg in data['messages']:
|
|
self.assertNotContains(response, msg)
|
|
|
|
def test_context_processor_message_levels(self):
|
|
show_url = reverse('show_template_response')
|
|
response = self.client.get(show_url)
|
|
|
|
self.assertIn('DEFAULT_MESSAGE_LEVELS', response.context)
|
|
self.assertEqual(response.context['DEFAULT_MESSAGE_LEVELS'], DEFAULT_LEVELS)
|
|
|
|
@override_settings(MESSAGE_LEVEL=constants.DEBUG)
|
|
def test_multiple_posts(self):
|
|
"""
|
|
Messages persist properly when multiple POSTs are made before a GET.
|
|
"""
|
|
data = {
|
|
'messages': ['Test message %d' % x for x in range(5)],
|
|
}
|
|
show_url = reverse('show_message')
|
|
messages = []
|
|
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
|
messages.extend(Message(self.levels[level], msg) for msg in data['messages'])
|
|
add_url = reverse('add_message', args=(level,))
|
|
self.client.post(add_url, data)
|
|
response = self.client.get(show_url)
|
|
self.assertIn('messages', response.context)
|
|
self.assertEqual(list(response.context['messages']), messages)
|
|
for msg in data['messages']:
|
|
self.assertContains(response, msg)
|
|
|
|
@modify_settings(
|
|
INSTALLED_APPS={'remove': 'django.contrib.messages'},
|
|
MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
|
|
)
|
|
@override_settings(
|
|
MESSAGE_LEVEL=constants.DEBUG,
|
|
TEMPLATES=[{
|
|
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
|
'DIRS': [],
|
|
'APP_DIRS': True,
|
|
}],
|
|
)
|
|
def test_middleware_disabled(self):
|
|
"""
|
|
When the middleware is disabled, an exception is raised when one
|
|
attempts to store a message.
|
|
"""
|
|
data = {
|
|
'messages': ['Test message %d' % x for x in range(5)],
|
|
}
|
|
reverse('show_message')
|
|
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
|
add_url = reverse('add_message', args=(level,))
|
|
with self.assertRaises(MessageFailure):
|
|
self.client.post(add_url, data, follow=True)
|
|
|
|
@modify_settings(
|
|
INSTALLED_APPS={'remove': 'django.contrib.messages'},
|
|
MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
|
|
)
|
|
@override_settings(
|
|
TEMPLATES=[{
|
|
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
|
'DIRS': [],
|
|
'APP_DIRS': True,
|
|
}],
|
|
)
|
|
def test_middleware_disabled_fail_silently(self):
|
|
"""
|
|
When the middleware is disabled, an exception is not raised
|
|
if 'fail_silently' is True.
|
|
"""
|
|
data = {
|
|
'messages': ['Test message %d' % x for x in range(5)],
|
|
'fail_silently': True,
|
|
}
|
|
show_url = reverse('show_message')
|
|
for level in ('debug', 'info', 'success', 'warning', 'error'):
|
|
add_url = reverse('add_message', args=(level,))
|
|
response = self.client.post(add_url, data, follow=True)
|
|
self.assertRedirects(response, show_url)
|
|
self.assertNotIn('messages', response.context)
|
|
|
|
def stored_messages_count(self, storage, response):
|
|
"""
|
|
Return the number of messages being stored after a
|
|
``storage.update()`` call.
|
|
"""
|
|
raise NotImplementedError('This method must be set by a subclass.')
|
|
|
|
def test_get(self):
|
|
raise NotImplementedError('This method must be set by a subclass.')
|
|
|
|
def get_existing_storage(self):
|
|
return self.get_storage([
|
|
Message(constants.INFO, 'Test message 1'),
|
|
Message(constants.INFO, 'Test message 2', extra_tags='tag'),
|
|
])
|
|
|
|
def test_existing_read(self):
|
|
"""
|
|
Reading the existing storage doesn't cause the data to be lost.
|
|
"""
|
|
storage = self.get_existing_storage()
|
|
self.assertFalse(storage.used)
|
|
# After iterating the storage engine directly, the used flag is set.
|
|
data = list(storage)
|
|
self.assertTrue(storage.used)
|
|
# The data does not disappear because it has been iterated.
|
|
self.assertEqual(data, list(storage))
|
|
|
|
def test_existing_add(self):
|
|
storage = self.get_existing_storage()
|
|
self.assertFalse(storage.added_new)
|
|
storage.add(constants.INFO, 'Test message 3')
|
|
self.assertTrue(storage.added_new)
|
|
|
|
def test_default_level(self):
|
|
# get_level works even with no storage on the request.
|
|
request = self.get_request()
|
|
self.assertEqual(get_level(request), constants.INFO)
|
|
|
|
# get_level returns the default level if it hasn't been set.
|
|
storage = self.get_storage()
|
|
request._messages = storage
|
|
self.assertEqual(get_level(request), constants.INFO)
|
|
|
|
# Only messages of sufficient level get recorded.
|
|
add_level_messages(storage)
|
|
self.assertEqual(len(storage), 5)
|
|
|
|
def test_low_level(self):
|
|
request = self.get_request()
|
|
storage = self.storage_class(request)
|
|
request._messages = storage
|
|
|
|
self.assertTrue(set_level(request, 5))
|
|
self.assertEqual(get_level(request), 5)
|
|
|
|
add_level_messages(storage)
|
|
self.assertEqual(len(storage), 6)
|
|
|
|
def test_high_level(self):
|
|
request = self.get_request()
|
|
storage = self.storage_class(request)
|
|
request._messages = storage
|
|
|
|
self.assertTrue(set_level(request, 30))
|
|
self.assertEqual(get_level(request), 30)
|
|
|
|
add_level_messages(storage)
|
|
self.assertEqual(len(storage), 2)
|
|
|
|
@override_settings(MESSAGE_LEVEL=29)
|
|
def test_settings_level(self):
|
|
request = self.get_request()
|
|
storage = self.storage_class(request)
|
|
|
|
self.assertEqual(get_level(request), 29)
|
|
|
|
add_level_messages(storage)
|
|
self.assertEqual(len(storage), 3)
|
|
|
|
def test_tags(self):
|
|
storage = self.get_storage()
|
|
storage.level = 0
|
|
add_level_messages(storage)
|
|
storage.add(constants.INFO, 'A generic info message', extra_tags=None)
|
|
tags = [msg.tags for msg in storage]
|
|
self.assertEqual(tags, ['info', '', 'extra-tag debug', 'warning', 'error', 'success', 'info'])
|
|
|
|
def test_level_tag(self):
|
|
storage = self.get_storage()
|
|
storage.level = 0
|
|
add_level_messages(storage)
|
|
tags = [msg.level_tag for msg in storage]
|
|
self.assertEqual(tags, ['info', '', 'debug', 'warning', 'error', 'success'])
|
|
|
|
@override_settings_tags(MESSAGE_TAGS={
|
|
constants.INFO: 'info',
|
|
constants.DEBUG: '',
|
|
constants.WARNING: '',
|
|
constants.ERROR: 'bad',
|
|
29: 'custom',
|
|
})
|
|
def test_custom_tags(self):
|
|
storage = self.get_storage()
|
|
storage.level = 0
|
|
add_level_messages(storage)
|
|
tags = [msg.tags for msg in storage]
|
|
self.assertEqual(tags, ['info', 'custom', 'extra-tag', '', 'bad', 'success'])
|