import unittest
from contextlib import contextmanager

from django.core.exceptions import ImproperlyConfigured
from django.db import connection
from django.test import TestCase, override_settings


@contextmanager
def get_connection():
    new_connection = connection.copy()
    yield new_connection
    new_connection.close()


@override_settings(DEBUG=True)
@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
class IsolationLevelTests(TestCase):

    read_committed = 'read committed'
    repeatable_read = 'repeatable read'
    isolation_values = {
        level: level.upper()
        for level in (read_committed, repeatable_read)
    }

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        configured_isolation_level = connection.isolation_level or cls.isolation_values[cls.repeatable_read]
        cls.configured_isolation_level = configured_isolation_level.upper()
        cls.other_isolation_level = (
            cls.read_committed
            if configured_isolation_level != cls.isolation_values[cls.read_committed]
            else cls.repeatable_read
        )

    @staticmethod
    def get_isolation_level(connection):
        with connection.cursor() as cursor:
            cursor.execute("SHOW VARIABLES WHERE variable_name IN ('transaction_isolation', 'tx_isolation')")
            return cursor.fetchone()[1].replace('-', ' ')

    def test_auto_is_null_auto_config(self):
        query = 'set sql_auto_is_null = 0'
        connection.init_connection_state()
        last_query = connection.queries[-1]['sql'].lower()
        if connection.features.is_sql_auto_is_null_enabled:
            self.assertIn(query, last_query)
        else:
            self.assertNotIn(query, last_query)

    def test_connect_isolation_level(self):
        self.assertEqual(self.get_isolation_level(connection), self.configured_isolation_level)

    def test_setting_isolation_level(self):
        with get_connection() as new_connection:
            new_connection.settings_dict['OPTIONS']['isolation_level'] = self.other_isolation_level
            self.assertEqual(
                self.get_isolation_level(new_connection),
                self.isolation_values[self.other_isolation_level]
            )

    def test_uppercase_isolation_level(self):
        # Upper case values are also accepted in 'isolation_level'.
        with get_connection() as new_connection:
            new_connection.settings_dict['OPTIONS']['isolation_level'] = self.other_isolation_level.upper()
            self.assertEqual(
                self.get_isolation_level(new_connection),
                self.isolation_values[self.other_isolation_level]
            )

    def test_default_isolation_level(self):
        # If not specified in settings, the default is read committed.
        with get_connection() as new_connection:
            new_connection.settings_dict['OPTIONS'].pop('isolation_level', None)
            self.assertEqual(
                self.get_isolation_level(new_connection),
                self.isolation_values[self.read_committed]
            )

    def test_isolation_level_validation(self):
        new_connection = connection.copy()
        new_connection.settings_dict['OPTIONS']['isolation_level'] = 'xxx'
        msg = (
            "Invalid transaction isolation level 'xxx' specified.\n"
            "Use one of 'read committed', 'read uncommitted', "
            "'repeatable read', 'serializable', or None."
        )
        with self.assertRaisesMessage(ImproperlyConfigured, msg):
            new_connection.cursor()