mirror of
https://github.com/django/django.git
synced 2024-12-23 09:36:06 +00:00
92 lines
3.4 KiB
Python
92 lines
3.4 KiB
Python
import unittest
|
|
from contextlib import contextmanager
|
|
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.db import connection
|
|
from django.test import TestCase, override_settings
|
|
|
|
|
|
@contextmanager
|
|
def get_connection():
|
|
new_connection = connection.copy()
|
|
yield new_connection
|
|
new_connection.close()
|
|
|
|
|
|
@override_settings(DEBUG=True)
|
|
@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
|
|
class IsolationLevelTests(TestCase):
|
|
|
|
read_committed = 'read committed'
|
|
repeatable_read = 'repeatable read'
|
|
isolation_values = {
|
|
level: level.replace(' ', '-').upper()
|
|
for level in (read_committed, repeatable_read)
|
|
}
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
configured_isolation_level = connection.isolation_level or cls.isolation_values[cls.repeatable_read]
|
|
cls.configured_isolation_level = configured_isolation_level.upper()
|
|
cls.other_isolation_level = (
|
|
cls.read_committed
|
|
if configured_isolation_level != cls.isolation_values[cls.read_committed]
|
|
else cls.repeatable_read
|
|
)
|
|
|
|
@staticmethod
|
|
def get_isolation_level(connection):
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("SELECT @@session.tx_isolation")
|
|
return cursor.fetchone()[0]
|
|
|
|
def test_auto_is_null_auto_config(self):
|
|
query = 'set sql_auto_is_null = 0'
|
|
connection.init_connection_state()
|
|
last_query = connection.queries[-1]['sql'].lower()
|
|
if connection.features.is_sql_auto_is_null_enabled:
|
|
self.assertIn(query, last_query)
|
|
else:
|
|
self.assertNotIn(query, last_query)
|
|
|
|
def test_connect_isolation_level(self):
|
|
self.assertEqual(self.get_isolation_level(connection), self.configured_isolation_level)
|
|
|
|
def test_setting_isolation_level(self):
|
|
with get_connection() as new_connection:
|
|
new_connection.settings_dict['OPTIONS']['isolation_level'] = self.other_isolation_level
|
|
self.assertEqual(
|
|
self.get_isolation_level(new_connection),
|
|
self.isolation_values[self.other_isolation_level]
|
|
)
|
|
|
|
def test_uppercase_isolation_level(self):
|
|
# Upper case values are also accepted in 'isolation_level'.
|
|
with get_connection() as new_connection:
|
|
new_connection.settings_dict['OPTIONS']['isolation_level'] = self.other_isolation_level.upper()
|
|
self.assertEqual(
|
|
self.get_isolation_level(new_connection),
|
|
self.isolation_values[self.other_isolation_level]
|
|
)
|
|
|
|
def test_default_isolation_level(self):
|
|
# If not specified in settings, the default is read committed.
|
|
with get_connection() as new_connection:
|
|
new_connection.settings_dict['OPTIONS'].pop('isolation_level', None)
|
|
self.assertEqual(
|
|
self.get_isolation_level(new_connection),
|
|
self.isolation_values[self.read_committed]
|
|
)
|
|
|
|
def test_isolation_level_validation(self):
|
|
new_connection = connection.copy()
|
|
new_connection.settings_dict['OPTIONS']['isolation_level'] = 'xxx'
|
|
msg = (
|
|
"Invalid transaction isolation level 'xxx' specified.\n"
|
|
"Use one of 'read committed', 'read uncommitted', "
|
|
"'repeatable read', 'serializable', or None."
|
|
)
|
|
with self.assertRaisesMessage(ImproperlyConfigured, msg):
|
|
new_connection.cursor()
|