diff --git a/django/db/backends/mysql/creation.py b/django/db/backends/mysql/creation.py index c53b9f8335..546fb6a629 100644 --- a/django/db/backends/mysql/creation.py +++ b/django/db/backends/mysql/creation.py @@ -17,24 +17,45 @@ class DatabaseCreation(BaseDatabaseCreation): suffix.append('COLLATE %s' % test_settings['COLLATION']) return ' '.join(suffix) + def _execute_create_test_db(self, cursor, parameters, keepdb=False): + try: + if keepdb: + # If the database should be kept, add "IF NOT EXISTS" to avoid + # "database exists" error, also temporarily disable "database + # exists" warning. + cursor.execute(''' + SET @_tmp_sql_notes := @@sql_notes, sql_notes = 0; + CREATE DATABASE IF NOT EXISTS %(dbname)s %(suffix)s; + SET sql_notes = @_tmp_sql_notes; + ''' % parameters) + else: + super()._execute_create_test_db(cursor, parameters, keepdb) + except Exception as e: + if len(e.args) < 1 or e.args[0] != 1007: + # All errors except "database exists" (1007) cancel tests. + sys.stderr.write('Got an error creating the test database: %s\n' % e) + sys.exit(2) + else: + raise e + def _clone_test_db(self, number, verbosity, keepdb=False): - qn = self.connection.ops.quote_name source_database_name = self.connection.settings_dict['NAME'] target_database_name = self.get_test_db_clone_settings(number)['NAME'] - + test_db_params = { + 'dbname': self.connection.ops.quote_name(target_database_name), + 'suffix': self.sql_table_creation_suffix(), + } with self._nodb_connection.cursor() as cursor: try: - cursor.execute("CREATE DATABASE %s" % qn(target_database_name)) + self._execute_create_test_db(cursor, test_db_params, keepdb) except Exception: - if keepdb: - return try: if verbosity >= 1: print("Destroying old test database for alias %s..." % ( self._get_database_display_str(verbosity, target_database_name), )) - cursor.execute("DROP DATABASE %s" % qn(target_database_name)) - cursor.execute("CREATE DATABASE %s" % qn(target_database_name)) + cursor.execute('DROP DATABASE %(dbname)s' % test_db_params) + self._execute_create_test_db(cursor, test_db_params, keepdb) except Exception as e: sys.stderr.write("Got an error recreating the test database: %s\n" % e) sys.exit(2) diff --git a/tests/backends/test_creation.py b/tests/backends/test_creation.py index f96102117b..2ecd03ce2d 100644 --- a/tests/backends/test_creation.py +++ b/tests/backends/test_creation.py @@ -8,6 +8,8 @@ from django.db import DEFAULT_DB_ALIAS, connection, connections from django.db.backends.base.creation import ( TEST_DATABASE_PREFIX, BaseDatabaseCreation, ) +from django.db.backends.mysql.creation import \ + DatabaseCreation as MySQLDatabaseCreation from django.db.backends.oracle.creation import \ DatabaseCreation as OracleDatabaseCreation from django.db.backends.postgresql.creation import \ @@ -191,3 +193,39 @@ class OracleDatabaseCreationTests(TestCase): creation._create_test_db(verbosity=0, keepdb=False) with self.assertRaises(SystemExit): creation._create_test_db(verbosity=0, keepdb=True) + + +@unittest.skipUnless(connection.vendor == 'mysql', "MySQL specific tests") +class MySQLDatabaseCreationTests(SimpleTestCase): + + def _execute_raise_database_exists(self, cursor, parameters, keepdb=False): + raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname']) + + def _execute_raise_access_denied(self, cursor, parameters, keepdb=False): + raise DatabaseError(1044, "Access denied for user") + + def patch_test_db_creation(self, execute_create_test_db): + return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db) + + @mock.patch('sys.stdout', new_callable=StringIO) + @mock.patch('sys.stderr', new_callable=StringIO) + def test_create_test_db_database_exists(self, *mocked_objects): + # Simulate test database creation raising "database exists" + creation = MySQLDatabaseCreation(connection) + with self.patch_test_db_creation(self._execute_raise_database_exists): + with mock.patch('builtins.input', return_value='no'): + with self.assertRaises(SystemExit): + # SystemExit is raised if the user answers "no" to the + # prompt asking if it's okay to delete the test database. + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) + # "Database exists" shouldn't appear when keepdb is on + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) + + @mock.patch('sys.stdout', new_callable=StringIO) + @mock.patch('sys.stderr', new_callable=StringIO) + def test_create_test_db_unexpected_error(self, *mocked_objects): + # Simulate test database creation raising unexpected error + creation = MySQLDatabaseCreation(connection) + with self.patch_test_db_creation(self._execute_raise_access_denied): + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)