1
0
mirror of https://github.com/django/django.git synced 2025-01-17 22:02:54 +00:00

112 lines
4.9 KiB
Python
Raw Normal View History

from __future__ import unicode_literals
import re
import unittest
from django.apps import apps
from django.core.management.color import no_style
from django.core.management.sql import (sql_create, sql_delete, sql_indexes,
sql_destroy_indexes, sql_all)
from django.db import connections, DEFAULT_DB_ALIAS
from django.test import TestCase, ignore_warnings, override_settings
from django.utils import six
from django.utils.deprecation import RemovedInDjango20Warning
# See also initial_sql_regress for 'custom_sql_for_model' tests
class SQLCommandsTestCase(TestCase):
"""Tests for several functions in django/core/management/sql.py"""
def count_ddl(self, output, cmd):
return len([o for o in output if o.startswith(cmd)])
def test_sql_create(self):
app_config = apps.get_app_config('commands_sql')
output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
tables = set()
create_table_re = re.compile(r'^create table .(?P<table>[\w_]+).*', re.IGNORECASE)
reference_re = re.compile(r'.* references .(?P<table>[\w_]+).*', re.IGNORECASE)
for statement in output:
create_table = create_table_re.match(statement)
if create_table:
# Lower since Oracle's table names are upper cased.
tables.add(create_table.group('table').lower())
continue
reference = reference_re.match(statement)
if reference:
# Lower since Oracle's table names are upper cased.
table = reference.group('table').lower()
self.assertIn(
table, tables, "The table %s is referenced before its creation." % table
)
self.assertEqual(tables, {
'commands_sql_comment', 'commands_sql_book', 'commands_sql_book_comments'
})
@unittest.skipUnless('PositiveIntegerField' in connections[DEFAULT_DB_ALIAS].data_type_check_constraints, 'Backend does not have checks.')
def test_sql_create_check(self):
"""Regression test for #23416 -- Check that db_params['check'] is respected."""
app_config = apps.get_app_config('commands_sql')
output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
success = False
for statement in output:
if 'CHECK' in statement:
success = True
if not success:
self.fail("'CHECK' not found in output %s" % output)
def test_sql_delete(self):
app_config = apps.get_app_config('commands_sql')
output = sql_delete(app_config, no_style(), connections[DEFAULT_DB_ALIAS], close_connection=False)
drop_tables = [o for o in output if o.startswith('DROP TABLE')]
self.assertEqual(len(drop_tables), 3)
# Lower so that Oracle's upper case tbl names wont break
sql = drop_tables[-1].lower()
six.assertRegex(self, sql, r'^drop table .commands_sql_comment.*')
@ignore_warnings(category=RemovedInDjango20Warning)
def test_sql_indexes(self):
app_config = apps.get_app_config('commands_sql')
output = sql_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
# Number of indexes is backend-dependent
self.assertTrue(1 <= self.count_ddl(output, 'CREATE INDEX') <= 4)
def test_sql_destroy_indexes(self):
app_config = apps.get_app_config('commands_sql')
output = sql_destroy_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
# Number of indexes is backend-dependent
self.assertTrue(1 <= self.count_ddl(output, 'DROP INDEX') <= 4)
@ignore_warnings(category=RemovedInDjango20Warning)
def test_sql_all(self):
app_config = apps.get_app_config('commands_sql')
output = sql_all(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
self.assertEqual(self.count_ddl(output, 'CREATE TABLE'), 3)
# Number of indexes is backend-dependent
self.assertTrue(1 <= self.count_ddl(output, 'CREATE INDEX') <= 4)
class TestRouter(object):
def allow_migrate(self, db, model):
return False
@override_settings(DATABASE_ROUTERS=[TestRouter()])
class SQLCommandsRouterTestCase(TestCase):
def test_router_honored(self):
app_config = apps.get_app_config('commands_sql')
for sql_command in (sql_all, sql_create, sql_delete, sql_indexes, sql_destroy_indexes):
if sql_command is sql_delete:
output = sql_command(app_config, no_style(), connections[DEFAULT_DB_ALIAS], close_connection=False)
# "App creates no tables in the database. Nothing to do."
expected_output = 1
else:
output = sql_command(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
expected_output = 0
self.assertEqual(len(output), expected_output,
"%s command is not honoring routers" % sql_command.__name__)