1
0
mirror of https://github.com/django/django.git synced 2025-01-24 09:09:20 +00:00
Hannes Ljungberg 4d99375b46 Fixed #32453 -- Added introspection of unique constraint field ordering on SQLite.
Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
2021-02-17 10:59:17 +01:00

305 lines
15 KiB
Python

from unittest import mock, skipUnless
from django.db import DatabaseError, connection
from django.db.models import Index
from django.test import TransactionTestCase, skipUnlessDBFeature
from .models import (
Article, ArticleReporter, CheckConstraintModel, City, Comment, Country,
District, Reporter, UniqueConstraintConditionModel,
)
class IntrospectionTests(TransactionTestCase):
available_apps = ['introspection']
def test_table_names(self):
tl = connection.introspection.table_names()
self.assertEqual(tl, sorted(tl))
self.assertIn(Reporter._meta.db_table, tl, "'%s' isn't in table_list()." % Reporter._meta.db_table)
self.assertIn(Article._meta.db_table, tl, "'%s' isn't in table_list()." % Article._meta.db_table)
def test_django_table_names(self):
with connection.cursor() as cursor:
cursor.execute('CREATE TABLE django_ixn_test_table (id INTEGER);')
tl = connection.introspection.django_table_names()
cursor.execute("DROP TABLE django_ixn_test_table;")
self.assertNotIn('django_ixn_test_table', tl,
"django_table_names() returned a non-Django table")
def test_django_table_names_retval_type(self):
# Table name is a list #15216
tl = connection.introspection.django_table_names(only_existing=True)
self.assertIs(type(tl), list)
tl = connection.introspection.django_table_names(only_existing=False)
self.assertIs(type(tl), list)
def test_table_names_with_views(self):
with connection.cursor() as cursor:
try:
cursor.execute(
'CREATE VIEW introspection_article_view AS SELECT headline '
'from introspection_article;')
except DatabaseError as e:
if 'insufficient privileges' in str(e):
self.fail("The test user has no CREATE VIEW privileges")
else:
raise
try:
self.assertIn('introspection_article_view', connection.introspection.table_names(include_views=True))
self.assertNotIn('introspection_article_view', connection.introspection.table_names())
finally:
with connection.cursor() as cursor:
cursor.execute('DROP VIEW introspection_article_view')
def test_unmanaged_through_model(self):
tables = connection.introspection.django_table_names()
self.assertNotIn(ArticleReporter._meta.db_table, tables)
def test_installed_models(self):
tables = [Article._meta.db_table, Reporter._meta.db_table]
models = connection.introspection.installed_models(tables)
self.assertEqual(models, {Article, Reporter})
def test_sequence_list(self):
sequences = connection.introspection.sequence_list()
reporter_seqs = [seq for seq in sequences if seq['table'] == Reporter._meta.db_table]
self.assertEqual(len(reporter_seqs), 1, 'Reporter sequence not found in sequence_list()')
self.assertEqual(reporter_seqs[0]['column'], 'id')
def test_get_table_description_names(self):
with connection.cursor() as cursor:
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
self.assertEqual([r[0] for r in desc],
[f.column for f in Reporter._meta.fields])
def test_get_table_description_types(self):
with connection.cursor() as cursor:
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
self.assertEqual(
[connection.introspection.get_field_type(r[1], r) for r in desc],
[
connection.features.introspected_field_types[field] for field in (
'AutoField', 'CharField', 'CharField', 'CharField',
'BigIntegerField', 'BinaryField', 'SmallIntegerField',
'DurationField',
)
],
)
def test_get_table_description_col_lengths(self):
with connection.cursor() as cursor:
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
self.assertEqual(
[r[3] for r in desc if connection.introspection.get_field_type(r[1], r) == 'CharField'],
[30, 30, 254]
)
def test_get_table_description_nullable(self):
with connection.cursor() as cursor:
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
nullable_by_backend = connection.features.interprets_empty_strings_as_nulls
self.assertEqual(
[r[6] for r in desc],
[False, nullable_by_backend, nullable_by_backend, nullable_by_backend, True, True, False, False]
)
def test_bigautofield(self):
with connection.cursor() as cursor:
desc = connection.introspection.get_table_description(cursor, City._meta.db_table)
self.assertIn(
connection.features.introspected_field_types['BigAutoField'],
[connection.introspection.get_field_type(r[1], r) for r in desc],
)
def test_smallautofield(self):
with connection.cursor() as cursor:
desc = connection.introspection.get_table_description(cursor, Country._meta.db_table)
self.assertIn(
connection.features.introspected_field_types['SmallAutoField'],
[connection.introspection.get_field_type(r[1], r) for r in desc],
)
# Regression test for #9991 - 'real' types in postgres
@skipUnlessDBFeature('has_real_datatype')
def test_postgresql_real_type(self):
with connection.cursor() as cursor:
cursor.execute("CREATE TABLE django_ixn_real_test_table (number REAL);")
desc = connection.introspection.get_table_description(cursor, 'django_ixn_real_test_table')
cursor.execute('DROP TABLE django_ixn_real_test_table;')
self.assertEqual(connection.introspection.get_field_type(desc[0][1], desc[0]), 'FloatField')
@skipUnlessDBFeature('can_introspect_foreign_keys')
def test_get_relations(self):
with connection.cursor() as cursor:
relations = connection.introspection.get_relations(cursor, Article._meta.db_table)
# That's {field_name: (field_name_other_table, other_table)}
expected_relations = {
'reporter_id': ('id', Reporter._meta.db_table),
'response_to_id': ('id', Article._meta.db_table),
}
self.assertEqual(relations, expected_relations)
# Removing a field shouldn't disturb get_relations (#17785)
body = Article._meta.get_field('body')
with connection.schema_editor() as editor:
editor.remove_field(Article, body)
with connection.cursor() as cursor:
relations = connection.introspection.get_relations(cursor, Article._meta.db_table)
with connection.schema_editor() as editor:
editor.add_field(Article, body)
self.assertEqual(relations, expected_relations)
@skipUnless(connection.vendor == 'sqlite', "This is an sqlite-specific issue")
def test_get_relations_alt_format(self):
"""
With SQLite, foreign keys can be added with different syntaxes and
formatting.
"""
create_table_statements = [
"CREATE TABLE track(id, art_id INTEGER, FOREIGN KEY(art_id) REFERENCES {}(id));",
"CREATE TABLE track(id, art_id INTEGER, FOREIGN KEY (art_id) REFERENCES {}(id));"
]
for statement in create_table_statements:
with connection.cursor() as cursor:
cursor.fetchone = mock.Mock(return_value=[statement.format(Article._meta.db_table), 'table'])
relations = connection.introspection.get_relations(cursor, 'mocked_table')
self.assertEqual(relations, {'art_id': ('id', Article._meta.db_table)})
@skipUnlessDBFeature('can_introspect_foreign_keys')
def test_get_key_columns(self):
with connection.cursor() as cursor:
key_columns = connection.introspection.get_key_columns(cursor, Article._meta.db_table)
self.assertEqual(set(key_columns), {
('reporter_id', Reporter._meta.db_table, 'id'),
('response_to_id', Article._meta.db_table, 'id'),
})
def test_get_primary_key_column(self):
with connection.cursor() as cursor:
primary_key_column = connection.introspection.get_primary_key_column(cursor, Article._meta.db_table)
pk_fk_column = connection.introspection.get_primary_key_column(cursor, District._meta.db_table)
self.assertEqual(primary_key_column, 'id')
self.assertEqual(pk_fk_column, 'city_id')
def test_get_constraints_index_types(self):
with connection.cursor() as cursor:
constraints = connection.introspection.get_constraints(cursor, Article._meta.db_table)
index = {}
index2 = {}
for val in constraints.values():
if val['columns'] == ['headline', 'pub_date']:
index = val
if val['columns'] == ['headline', 'response_to_id', 'pub_date', 'reporter_id']:
index2 = val
self.assertEqual(index['type'], Index.suffix)
self.assertEqual(index2['type'], Index.suffix)
@skipUnlessDBFeature('supports_index_column_ordering')
def test_get_constraints_indexes_orders(self):
"""
Indexes have the 'orders' key with a list of 'ASC'/'DESC' values.
"""
with connection.cursor() as cursor:
constraints = connection.introspection.get_constraints(cursor, Article._meta.db_table)
indexes_verified = 0
expected_columns = [
['headline', 'pub_date'],
['headline', 'response_to_id', 'pub_date', 'reporter_id'],
]
if connection.features.indexes_foreign_keys:
expected_columns += [
['reporter_id'],
['response_to_id'],
]
for val in constraints.values():
if val['index'] and not (val['primary_key'] or val['unique']):
self.assertIn(val['columns'], expected_columns)
self.assertEqual(val['orders'], ['ASC'] * len(val['columns']))
indexes_verified += 1
self.assertEqual(indexes_verified, len(expected_columns))
@skipUnlessDBFeature('supports_index_column_ordering', 'supports_partial_indexes')
def test_get_constraints_unique_indexes_orders(self):
with connection.cursor() as cursor:
constraints = connection.introspection.get_constraints(
cursor,
UniqueConstraintConditionModel._meta.db_table,
)
self.assertIn('cond_name_without_color_uniq', constraints)
constraint = constraints['cond_name_without_color_uniq']
self.assertIs(constraint['unique'], True)
self.assertEqual(constraint['columns'], ['name'])
self.assertEqual(constraint['orders'], ['ASC'])
def test_get_constraints(self):
def assertDetails(details, cols, primary_key=False, unique=False, index=False, check=False, foreign_key=None):
# Different backends have different values for same constraints:
# PRIMARY KEY UNIQUE CONSTRAINT UNIQUE INDEX
# MySQL pk=1 uniq=1 idx=1 pk=0 uniq=1 idx=1 pk=0 uniq=1 idx=1
# PostgreSQL pk=1 uniq=1 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1
# SQLite pk=1 uniq=0 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1
if details['primary_key']:
details['unique'] = True
if details['unique']:
details['index'] = False
self.assertEqual(details['columns'], cols)
self.assertEqual(details['primary_key'], primary_key)
self.assertEqual(details['unique'], unique)
self.assertEqual(details['index'], index)
self.assertEqual(details['check'], check)
self.assertEqual(details['foreign_key'], foreign_key)
# Test custom constraints
custom_constraints = {
'article_email_pub_date_uniq',
'email_pub_date_idx',
}
with connection.cursor() as cursor:
constraints = connection.introspection.get_constraints(cursor, Comment._meta.db_table)
if (
connection.features.supports_column_check_constraints and
connection.features.can_introspect_check_constraints
):
constraints.update(
connection.introspection.get_constraints(cursor, CheckConstraintModel._meta.db_table)
)
custom_constraints.add('up_votes_gte_0_check')
assertDetails(constraints['up_votes_gte_0_check'], ['up_votes'], check=True)
assertDetails(constraints['article_email_pub_date_uniq'], ['article_id', 'email', 'pub_date'], unique=True)
assertDetails(constraints['email_pub_date_idx'], ['email', 'pub_date'], index=True)
# Test field constraints
field_constraints = set()
for name, details in constraints.items():
if name in custom_constraints:
continue
elif details['columns'] == ['up_votes'] and details['check']:
assertDetails(details, ['up_votes'], check=True)
field_constraints.add(name)
elif details['columns'] == ['voting_number'] and details['check']:
assertDetails(details, ['voting_number'], check=True)
field_constraints.add(name)
elif details['columns'] == ['ref'] and details['unique']:
assertDetails(details, ['ref'], unique=True)
field_constraints.add(name)
elif details['columns'] == ['voting_number'] and details['unique']:
assertDetails(details, ['voting_number'], unique=True)
field_constraints.add(name)
elif details['columns'] == ['article_id'] and details['index']:
assertDetails(details, ['article_id'], index=True)
field_constraints.add(name)
elif details['columns'] == ['id'] and details['primary_key']:
assertDetails(details, ['id'], primary_key=True, unique=True)
field_constraints.add(name)
elif details['columns'] == ['article_id'] and details['foreign_key']:
assertDetails(details, ['article_id'], foreign_key=('introspection_article', 'id'))
field_constraints.add(name)
elif details['check']:
# Some databases (e.g. Oracle) include additional check
# constraints.
field_constraints.add(name)
# All constraints are accounted for.
self.assertEqual(constraints.keys() ^ (custom_constraints | field_constraints), set())