mirror of
				https://github.com/django/django.git
				synced 2025-10-26 15:16:09 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			118 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			118 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| from contextlib import contextmanager
 | |
| from unittest import mock
 | |
| 
 | |
| from django.core.exceptions import ImproperlyConfigured
 | |
| from django.db import NotSupportedError, connection
 | |
| from django.test import TestCase, override_settings
 | |
| 
 | |
| 
 | |
| @contextmanager
 | |
| def get_connection():
 | |
|     new_connection = connection.copy()
 | |
|     yield new_connection
 | |
|     new_connection.close()
 | |
| 
 | |
| 
 | |
| @override_settings(DEBUG=True)
 | |
| @unittest.skipUnless(connection.vendor == "mysql", "MySQL tests")
 | |
| class IsolationLevelTests(TestCase):
 | |
|     read_committed = "read committed"
 | |
|     repeatable_read = "repeatable read"
 | |
|     isolation_values = {
 | |
|         level: level.upper() for level in (read_committed, repeatable_read)
 | |
|     }
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         super().setUpClass()
 | |
|         configured_isolation_level = (
 | |
|             connection.isolation_level or cls.isolation_values[cls.repeatable_read]
 | |
|         )
 | |
|         cls.configured_isolation_level = configured_isolation_level.upper()
 | |
|         cls.other_isolation_level = (
 | |
|             cls.read_committed
 | |
|             if configured_isolation_level != cls.isolation_values[cls.read_committed]
 | |
|             else cls.repeatable_read
 | |
|         )
 | |
| 
 | |
|     @staticmethod
 | |
|     def get_isolation_level(connection):
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute(
 | |
|                 "SHOW VARIABLES "
 | |
|                 "WHERE variable_name IN ('transaction_isolation', 'tx_isolation')"
 | |
|             )
 | |
|             return cursor.fetchone()[1].replace("-", " ")
 | |
| 
 | |
|     def test_auto_is_null_auto_config(self):
 | |
|         query = "set sql_auto_is_null = 0"
 | |
|         connection.init_connection_state()
 | |
|         last_query = connection.queries[-1]["sql"].lower()
 | |
|         if connection.features.is_sql_auto_is_null_enabled:
 | |
|             self.assertIn(query, last_query)
 | |
|         else:
 | |
|             self.assertNotIn(query, last_query)
 | |
| 
 | |
|     def test_connect_isolation_level(self):
 | |
|         self.assertEqual(
 | |
|             self.get_isolation_level(connection), self.configured_isolation_level
 | |
|         )
 | |
| 
 | |
|     def test_setting_isolation_level(self):
 | |
|         with get_connection() as new_connection:
 | |
|             new_connection.settings_dict["OPTIONS"][
 | |
|                 "isolation_level"
 | |
|             ] = self.other_isolation_level
 | |
|             self.assertEqual(
 | |
|                 self.get_isolation_level(new_connection),
 | |
|                 self.isolation_values[self.other_isolation_level],
 | |
|             )
 | |
| 
 | |
|     def test_uppercase_isolation_level(self):
 | |
|         # Upper case values are also accepted in 'isolation_level'.
 | |
|         with get_connection() as new_connection:
 | |
|             new_connection.settings_dict["OPTIONS"][
 | |
|                 "isolation_level"
 | |
|             ] = self.other_isolation_level.upper()
 | |
|             self.assertEqual(
 | |
|                 self.get_isolation_level(new_connection),
 | |
|                 self.isolation_values[self.other_isolation_level],
 | |
|             )
 | |
| 
 | |
|     def test_default_isolation_level(self):
 | |
|         # If not specified in settings, the default is read committed.
 | |
|         with get_connection() as new_connection:
 | |
|             new_connection.settings_dict["OPTIONS"].pop("isolation_level", None)
 | |
|             self.assertEqual(
 | |
|                 self.get_isolation_level(new_connection),
 | |
|                 self.isolation_values[self.read_committed],
 | |
|             )
 | |
| 
 | |
|     def test_isolation_level_validation(self):
 | |
|         new_connection = connection.copy()
 | |
|         new_connection.settings_dict["OPTIONS"]["isolation_level"] = "xxx"
 | |
|         msg = (
 | |
|             "Invalid transaction isolation level 'xxx' specified.\n"
 | |
|             "Use one of 'read committed', 'read uncommitted', "
 | |
|             "'repeatable read', 'serializable', or None."
 | |
|         )
 | |
|         with self.assertRaisesMessage(ImproperlyConfigured, msg):
 | |
|             new_connection.cursor()
 | |
| 
 | |
| 
 | |
| @unittest.skipUnless(connection.vendor == "mysql", "MySQL tests")
 | |
| class Tests(TestCase):
 | |
|     @mock.patch.object(connection, "get_database_version")
 | |
|     def test_check_database_version_supported(self, mocked_get_database_version):
 | |
|         if connection.mysql_is_mariadb:
 | |
|             mocked_get_database_version.return_value = (10, 4)
 | |
|             msg = "MariaDB 10.5 or later is required (found 10.4)."
 | |
|         else:
 | |
|             mocked_get_database_version.return_value = (8, 0, 4)
 | |
|             msg = "MySQL 8.0.11 or later is required (found 8.0.4)."
 | |
| 
 | |
|         with self.assertRaisesMessage(NotSupportedError, msg):
 | |
|             connection.check_database_version_supported()
 | |
|         self.assertTrue(mocked_get_database_version.called)
 |