from django.contrib.messages import constants, get_level, set_level
from django.contrib.messages.api import MessageFailure
from django.contrib.messages.constants import DEFAULT_LEVELS
from django.contrib.messages.storage import default_storage
from django.contrib.messages.storage.base import Message
from django.http import HttpRequest, HttpResponse
from django.test import modify_settings, override_settings
from django.urls import reverse
from django.utils.translation import gettext_lazy


def add_level_messages(storage):
    """
    Add 6 messages from different levels (including a custom one) to a storage
    instance.
    """
    storage.add(constants.INFO, 'A generic info message')
    storage.add(29, 'Some custom level')
    storage.add(constants.DEBUG, 'A debugging message', extra_tags='extra-tag')
    storage.add(constants.WARNING, 'A warning')
    storage.add(constants.ERROR, 'An error')
    storage.add(constants.SUCCESS, 'This was a triumph.')


class BaseTests:
    """
    Shared tests for message storage backends.

    This class is a mixin: it relies on ``self.assert*`` methods and on
    ``self.client``, neither of which it defines, so it has to be combined
    with a test case class that provides them. Subclasses point
    ``storage_class`` at the backend under test and must implement
    ``stored_messages_count()`` and ``test_get()``.
    """
    storage_class = default_storage
    # Maps the level names used in the test URLs to their level constants.
    levels = {
        'debug': constants.DEBUG,
        'info': constants.INFO,
        'success': constants.SUCCESS,
        'warning': constants.WARNING,
        'error': constants.ERROR,
    }

    def setUp(self):
        """Enable the settings every test in this class runs under."""
        storage_path = (
            f'{self.storage_class.__module__}.{self.storage_class.__name__}'
        )
        self.settings_override = override_settings(
            TEMPLATES=[{
                'BACKEND': 'django.template.backends.django.DjangoTemplates',
                'DIRS': [],
                'APP_DIRS': True,
                'OPTIONS': {
                    'context_processors': (
                        'django.contrib.auth.context_processors.auth',
                        'django.contrib.messages.context_processors.messages',
                    ),
                },
            }],
            ROOT_URLCONF='messages_tests.urls',
            MESSAGE_TAGS={},
            # Point the setting at the backend under test.
            MESSAGE_STORAGE=storage_path,
            SESSION_SERIALIZER='django.contrib.sessions.serializers.JSONSerializer',
        )
        self.settings_override.enable()

    def tearDown(self):
        """Undo the settings override installed by ``setUp()``."""
        self.settings_override.disable()

    def get_request(self):
        """Return a fresh, empty ``HttpRequest``."""
        return HttpRequest()

    def get_response(self):
        """Return a fresh, empty ``HttpResponse``."""
        return HttpResponse()

    def get_storage(self, data=None):
        """
        Return the storage backend, setting its loaded data to the ``data``
        argument.

        This method avoids the storage ``_get`` method from getting called so
        that other parts of the storage backend can be tested independent of
        the message retrieval logic.
        """
        storage = self.storage_class(self.get_request())
        storage._loaded_data = data or []
        return storage

    def test_repr(self):
        """The storage repr names the backend class and embeds the request."""
        request = self.get_request()
        storage = self.storage_class(request)
        # A freshly constructed HttpRequest has the repr '<HttpRequest>',
        # which the storage embeds inside its own angle brackets.
        self.assertEqual(
            repr(storage),
            f'<{self.storage_class.__qualname__}: request=<HttpRequest>>',
        )

    def test_add(self):
        """Adding messages sets ``added_new`` and grows the storage."""
        storage = self.get_storage()
        self.assertFalse(storage.added_new)
        storage.add(constants.INFO, 'Test message 1')
        self.assertTrue(storage.added_new)
        storage.add(constants.INFO, 'Test message 2', extra_tags='tag')
        self.assertEqual(len(storage), 2)

    def test_add_lazy_translation(self):
        """A lazily translated message is stored on update."""
        storage = self.get_storage()
        response = self.get_response()

        storage.add(constants.INFO, gettext_lazy('lazy message'))
        storage.update(response)

        storing = self.stored_messages_count(storage, response)
        self.assertEqual(storing, 1)

    def test_no_update(self):
        """Updating a storage with no messages stores nothing."""
        storage = self.get_storage()
        response = self.get_response()
        storage.update(response)
        storing = self.stored_messages_count(storage, response)
        self.assertEqual(storing, 0)

    def test_add_update(self):
        """Two added messages are both stored on update."""
        storage = self.get_storage()
        response = self.get_response()

        storage.add(constants.INFO, 'Test message 1')
        storage.add(constants.INFO, 'Test message 1', extra_tags='tag')
        storage.update(response)

        storing = self.stored_messages_count(storage, response)
        self.assertEqual(storing, 2)

    def test_existing_add_read_update(self):
        """Adding and then reading leaves nothing to store on update."""
        storage = self.get_existing_storage()
        response = self.get_response()

        storage.add(constants.INFO, 'Test message 3')
        list(storage)   # Simulates a read
        storage.update(response)

        storing = self.stored_messages_count(storage, response)
        self.assertEqual(storing, 0)

    def test_existing_read_add_update(self):
        """A message added after a read is the only one stored on update."""
        storage = self.get_existing_storage()
        response = self.get_response()

        list(storage)   # Simulates a read
        storage.add(constants.INFO, 'Test message 3')
        storage.update(response)

        storing = self.stored_messages_count(storage, response)
        self.assertEqual(storing, 1)

    @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    def test_full_request_response_cycle(self):
        """
        With the message middleware enabled, messages are properly stored and
        retrieved across the full request/redirect/response cycle.
        """
        data = {
            'messages': [f'Test message {x}' for x in range(5)],
        }
        show_url = reverse('show_message')
        for level in ('debug', 'info', 'success', 'warning', 'error'):
            add_url = reverse('add_message', args=(level,))
            response = self.client.post(add_url, data, follow=True)
            self.assertRedirects(response, show_url)
            self.assertIn('messages', response.context)
            messages = [Message(self.levels[level], msg) for msg in data['messages']]
            self.assertEqual(list(response.context['messages']), messages)
            for msg in data['messages']:
                self.assertContains(response, msg)

    @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    def test_with_template_response(self):
        """
        Messages show up in the response after the redirect and are gone on a
        second GET of the same URL.
        """
        data = {
            'messages': [f'Test message {x}' for x in range(5)],
        }
        show_url = reverse('show_template_response')
        for level in self.levels:
            add_url = reverse('add_template_response', args=(level,))
            response = self.client.post(add_url, data, follow=True)
            self.assertRedirects(response, show_url)
            self.assertIn('messages', response.context)
            for msg in data['messages']:
                self.assertContains(response, msg)

            # There shouldn't be any messages on second GET request.
            response = self.client.get(show_url)
            for msg in data['messages']:
                self.assertNotContains(response, msg)

    def test_context_processor_message_levels(self):
        """The template context exposes ``DEFAULT_MESSAGE_LEVELS``."""
        show_url = reverse('show_template_response')
        response = self.client.get(show_url)

        self.assertIn('DEFAULT_MESSAGE_LEVELS', response.context)
        self.assertEqual(response.context['DEFAULT_MESSAGE_LEVELS'], DEFAULT_LEVELS)

    @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    def test_multiple_posts(self):
        """
        Messages persist properly when multiple POSTs are made before a GET.
        """
        data = {
            'messages': [f'Test message {x}' for x in range(5)],
        }
        show_url = reverse('show_message')
        messages = []
        for level in ('debug', 'info', 'success', 'warning', 'error'):
            # Build the expected list in the same order the POSTs are made.
            messages.extend(Message(self.levels[level], msg) for msg in data['messages'])
            add_url = reverse('add_message', args=(level,))
            self.client.post(add_url, data)
        response = self.client.get(show_url)
        self.assertIn('messages', response.context)
        self.assertEqual(list(response.context['messages']), messages)
        for msg in data['messages']:
            self.assertContains(response, msg)

    @modify_settings(
        INSTALLED_APPS={'remove': 'django.contrib.messages'},
        MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
    )
    @override_settings(
        MESSAGE_LEVEL=constants.DEBUG,
        TEMPLATES=[{
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [],
            'APP_DIRS': True,
        }],
    )
    def test_middleware_disabled(self):
        """
        When the middleware is disabled, an exception is raised when one
        attempts to store a message.
        """
        data = {
            'messages': [f'Test message {x}' for x in range(5)],
        }
        # The result is unused; the call is kept so the URL is still resolved
        # exactly as before.
        reverse('show_message')
        for level in ('debug', 'info', 'success', 'warning', 'error'):
            add_url = reverse('add_message', args=(level,))
            with self.assertRaises(MessageFailure):
                self.client.post(add_url, data, follow=True)

    @modify_settings(
        INSTALLED_APPS={'remove': 'django.contrib.messages'},
        MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
    )
    @override_settings(
        TEMPLATES=[{
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [],
            'APP_DIRS': True,
        }],
    )
    def test_middleware_disabled_fail_silently(self):
        """
        When the middleware is disabled, an exception is not raised
        if 'fail_silently' is True.
        """
        data = {
            'messages': [f'Test message {x}' for x in range(5)],
            'fail_silently': True,
        }
        show_url = reverse('show_message')
        for level in ('debug', 'info', 'success', 'warning', 'error'):
            add_url = reverse('add_message', args=(level,))
            response = self.client.post(add_url, data, follow=True)
            self.assertRedirects(response, show_url)
            self.assertNotIn('messages', response.context)

    def stored_messages_count(self, storage, response):
        """
        Return the number of messages being stored after a
        ``storage.update()`` call.
        """
        raise NotImplementedError('This method must be set by a subclass.')

    def test_get(self):
        """Placeholder that each backend's test class must override."""
        raise NotImplementedError('This method must be set by a subclass.')

    def get_existing_storage(self):
        """Return a storage preloaded with two INFO messages."""
        return self.get_storage([
            Message(constants.INFO, 'Test message 1'),
            Message(constants.INFO, 'Test message 2', extra_tags='tag'),
        ])

    def test_existing_read(self):
        """
        Reading the existing storage doesn't cause the data to be lost.
        """
        storage = self.get_existing_storage()
        self.assertFalse(storage.used)
        # After iterating the storage engine directly, the used flag is set.
        data = list(storage)
        self.assertTrue(storage.used)
        # The data does not disappear because it has been iterated.
        self.assertEqual(data, list(storage))

    def test_existing_add(self):
        """Adding to a preloaded storage sets ``added_new``."""
        storage = self.get_existing_storage()
        self.assertFalse(storage.added_new)
        storage.add(constants.INFO, 'Test message 3')
        self.assertTrue(storage.added_new)

    def test_default_level(self):
        """The default level is INFO and filters what gets recorded."""
        # get_level works even with no storage on the request.
        request = self.get_request()
        self.assertEqual(get_level(request), constants.INFO)

        # get_level returns the default level if it hasn't been set.
        storage = self.get_storage()
        request._messages = storage
        self.assertEqual(get_level(request), constants.INFO)

        # Only messages of sufficient level get recorded.
        add_level_messages(storage)
        self.assertEqual(len(storage), 5)

    def test_low_level(self):
        """With the level set to 5, all six messages are recorded."""
        request = self.get_request()
        storage = self.storage_class(request)
        request._messages = storage

        self.assertTrue(set_level(request, 5))
        self.assertEqual(get_level(request), 5)

        add_level_messages(storage)
        self.assertEqual(len(storage), 6)

    def test_high_level(self):
        """With the level set to 30, only two of the six are recorded."""
        request = self.get_request()
        storage = self.storage_class(request)
        request._messages = storage

        self.assertTrue(set_level(request, 30))
        self.assertEqual(get_level(request), 30)

        add_level_messages(storage)
        self.assertEqual(len(storage), 2)

    @override_settings(MESSAGE_LEVEL=29)
    def test_settings_level(self):
        """The ``MESSAGE_LEVEL`` setting supplies the level when none is set."""
        request = self.get_request()
        storage = self.storage_class(request)

        self.assertEqual(get_level(request), 29)

        add_level_messages(storage)
        self.assertEqual(len(storage), 3)

    def test_tags(self):
        """``tags`` combines any extra tags with the level tag."""
        storage = self.get_storage()
        storage.level = 0
        add_level_messages(storage)
        storage.add(constants.INFO, 'A generic info message', extra_tags=None)
        tags = [msg.tags for msg in storage]
        self.assertEqual(tags, ['info', '', 'extra-tag debug', 'warning', 'error', 'success', 'info'])

    def test_level_tag(self):
        """``level_tag`` excludes extra tags and is empty for the custom level."""
        storage = self.get_storage()
        storage.level = 0
        add_level_messages(storage)
        tags = [msg.level_tag for msg in storage]
        self.assertEqual(tags, ['info', '', 'debug', 'warning', 'error', 'success'])

    @override_settings(MESSAGE_TAGS={
        constants.INFO: 'info',
        constants.DEBUG: '',
        constants.WARNING: '',
        constants.ERROR: 'bad',
        29: 'custom',
    })
    def test_custom_tags(self):
        """``MESSAGE_TAGS`` overrides the tag reported for the levels it lists."""
        storage = self.get_storage()
        storage.level = 0
        add_level_messages(storage)
        tags = [msg.tags for msg in storage]
        self.assertEqual(tags, ['info', 'custom', 'extra-tag', '', 'bad', 'success'])