1
0
mirror of https://github.com/django/django.git synced 2025-01-25 01:30:48 +00:00
Simon Charette 64f9776bc4 Refs #29004 -- Prevented inspectdb tests from flushing all tables data.
This is a costly operation on most database backends.
2018-12-24 15:32:39 -05:00

399 lines
19 KiB
Python

import re
from io import StringIO
from unittest import mock, skipUnless
from django.core.management import call_command
from django.db import connection
from django.db.backends.base.introspection import TableInfo
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
from .models import PeopleMoreData
def inspectdb_tables_only(table_name):
    """
    Limit introspection to tables created for models of this app.
    Some databases such as Oracle are extremely slow at introspection.

    Return True when ``table_name`` starts with the ``inspectdb_`` prefix.
    """
    return table_name.startswith('inspectdb_')
def special_table_only(table_name):
    """Return True when ``table_name`` starts with ``inspectdb_special``."""
    return table_name.startswith('inspectdb_special')
class InspectDBTestCase(TestCase):
    """Tests of the model source generated by the ``inspectdb`` command."""

    # NOTE(review): several expected strings in this class contain a single
    # space where the command's output may contain a run of spaces (line
    # indentation, the gap before an inline '#'). They are reproduced as
    # found; confirm them against inspectdb's actual output -- TODO confirm.

    # Captures the inside of a generated ``unique_together = (...)`` line.
    unique_re = re.compile(r'.*unique_together = \((.+),\).*')

    def test_stealth_table_name_filter_option(self):
        """Tables rejected by the table_name_filter option aren't inspected."""
        out = StringIO()
        call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
        error_message = "inspectdb has examined a table that should have been filtered out."
        # contrib.contenttypes is one of the apps always installed when running
        # the Django test suite, check that one of its tables hasn't been
        # inspected
        self.assertNotIn("class DjangoContentType(models.Model):", out.getvalue(), msg=error_message)

    def test_table_option(self):
        """
        inspectdb can inspect a subset of tables by passing the table names as
        arguments.
        """
        out = StringIO()
        call_command('inspectdb', 'inspectdb_people', stdout=out)
        output = out.getvalue()
        self.assertIn('class InspectdbPeople(models.Model):', output)
        self.assertNotIn("InspectdbPeopledata", output)

    def make_field_type_asserter(self):
        """
        Call inspectdb and return a function to validate a field type in its
        output.

        The returned ``assertFieldType(name, definition)`` looks up the line
        ``<name> = models...`` in the output generated for the
        ``inspectdb_columntypes`` table and compares everything after the
        ``=`` with ``definition``.
        """
        out = StringIO()
        call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
        output = out.getvalue()

        def assertFieldType(name, definition):
            match = re.search(r'^\s*%s = (models.*)$' % re.escape(name), output, re.MULTILINE)
            # Report a missing field as a test failure with a readable message
            # instead of an AttributeError raised on a None match.
            self.assertIsNotNone(match, msg="Field %r was not found in the inspectdb output." % name)
            self.assertEqual(definition, match.group(1))

        return assertFieldType

    def test_field_types(self):
        """Test introspection of various Django field types"""
        assertFieldType = self.make_field_type_asserter()

        # Inspecting Oracle DB doesn't produce correct results (#19884):
        # - it reports fields as blank=True when they aren't.
        if not connection.features.interprets_empty_strings_as_nulls:
            assertFieldType('char_field', "models.CharField(max_length=10)")
            assertFieldType('null_char_field', "models.CharField(max_length=10, blank=True, null=True)")
            assertFieldType('email_field', "models.CharField(max_length=254)")
            assertFieldType('file_field', "models.CharField(max_length=100)")
            assertFieldType('file_path_field', "models.CharField(max_length=100)")
            assertFieldType('slug_field', "models.CharField(max_length=50)")
            assertFieldType('text_field', "models.TextField()")
            assertFieldType('url_field', "models.CharField(max_length=200)")
        assertFieldType('date_field', "models.DateField()")
        assertFieldType('date_time_field', "models.DateTimeField()")
        if connection.features.can_introspect_ip_address_field:
            assertFieldType('gen_ip_address_field', "models.GenericIPAddressField()")
        elif not connection.features.interprets_empty_strings_as_nulls:
            assertFieldType('gen_ip_address_field', "models.CharField(max_length=39)")
        if connection.features.can_introspect_time_field:
            assertFieldType('time_field', "models.TimeField()")
        if connection.features.has_native_uuid_field:
            assertFieldType('uuid_field', "models.UUIDField()")
        elif not connection.features.interprets_empty_strings_as_nulls:
            assertFieldType('uuid_field', "models.CharField(max_length=32)")

    def test_number_field_types(self):
        """Test introspection of various Django numeric field types"""
        assertFieldType = self.make_field_type_asserter()

        if not connection.features.can_introspect_autofield:
            assertFieldType('id', "models.IntegerField(primary_key=True) # AutoField?")

        if connection.features.can_introspect_big_integer_field:
            assertFieldType('big_int_field', "models.BigIntegerField()")
        else:
            assertFieldType('big_int_field', "models.IntegerField()")

        bool_field_type = connection.features.introspected_boolean_field_type
        assertFieldType('bool_field', "models.{}()".format(bool_field_type))
        assertFieldType('null_bool_field', 'models.{}(blank=True, null=True)'.format(bool_field_type))

        if connection.features.can_introspect_decimal_field:
            assertFieldType('decimal_field', "models.DecimalField(max_digits=6, decimal_places=1)")
        else:  # Guessed arguments on SQLite, see #5014
            assertFieldType('decimal_field', "models.DecimalField(max_digits=10, decimal_places=5) "
                                             "# max_digits and decimal_places have been guessed, "
                                             "as this database handles decimal fields as float")

        assertFieldType('float_field', "models.FloatField()")
        assertFieldType('int_field', "models.IntegerField()")

        if connection.features.can_introspect_positive_integer_field:
            assertFieldType('pos_int_field', "models.PositiveIntegerField()")
        else:
            assertFieldType('pos_int_field', "models.IntegerField()")

        if connection.features.can_introspect_positive_integer_field:
            if connection.features.can_introspect_small_integer_field:
                assertFieldType('pos_small_int_field', "models.PositiveSmallIntegerField()")
            else:
                assertFieldType('pos_small_int_field', "models.PositiveIntegerField()")
        else:
            if connection.features.can_introspect_small_integer_field:
                assertFieldType('pos_small_int_field', "models.SmallIntegerField()")
            else:
                assertFieldType('pos_small_int_field', "models.IntegerField()")

        if connection.features.can_introspect_small_integer_field:
            assertFieldType('small_int_field', "models.SmallIntegerField()")
        else:
            assertFieldType('small_int_field', "models.IntegerField()")

    @skipUnlessDBFeature('can_introspect_foreign_keys')
    def test_attribute_name_not_python_keyword(self):
        """Generated attribute names must not be Python keywords."""
        out = StringIO()
        call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
        output = out.getvalue()
        error_message = "inspectdb generated an attribute name which is a Python keyword"
        # Recursive foreign keys should be set to 'self'
        self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
        self.assertNotIn(
            "from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
            output,
            msg=error_message,
        )
        # As InspectdbPeople model is defined after InspectdbMessage, it should be quoted
        self.assertIn(
            "from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, db_column='from_id')",
            output,
        )
        self.assertIn(
            "people_pk = models.ForeignKey(InspectdbPeople, models.DO_NOTHING, primary_key=True)",
            output,
        )
        self.assertIn(
            "people_unique = models.ForeignKey(InspectdbPeople, models.DO_NOTHING, unique=True)",
            output,
        )

    def test_digits_column_name_introspection(self):
        """Introspection of column names consist/start with digits (#16536/#17676)"""
        out = StringIO()
        call_command('inspectdb', 'inspectdb_digitsincolumnname', stdout=out)
        output = out.getvalue()
        error_message = "inspectdb generated a model field name which is a number"
        self.assertNotIn(" 123 = models.CharField", output, msg=error_message)
        self.assertIn("number_123 = models.CharField", output)

        error_message = "inspectdb generated a model field name which starts with a digit"
        self.assertNotIn(" 4extra = models.CharField", output, msg=error_message)
        self.assertIn("number_4extra = models.CharField", output)

        self.assertNotIn(" 45extra = models.CharField", output, msg=error_message)
        self.assertIn("number_45extra = models.CharField", output)

    def test_special_column_name_introspection(self):
        """
        Introspection of column names containing special characters,
        unsuitable for Python identifiers
        """
        out = StringIO()
        call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
        output = out.getvalue()
        # The expected db_column values are built from the backend's own
        # conversion of the identifier 'Field'.
        base_name = connection.introspection.identifier_converter('Field')
        self.assertIn("field = models.IntegerField()", output)
        self.assertIn("field_field = models.IntegerField(db_column='%s_')" % base_name, output)
        self.assertIn("field_field_0 = models.IntegerField(db_column='%s__')" % base_name, output)
        self.assertIn("field_field_1 = models.IntegerField(db_column='__field')", output)
        self.assertIn("prc_x = models.IntegerField(db_column='prc(%) x')", output)
        self.assertIn("tamaño = models.IntegerField()", output)

    def test_table_name_introspection(self):
        """
        Introspection of table names containing special characters,
        unsuitable for Python identifiers
        """
        out = StringIO()
        call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
        output = out.getvalue()
        self.assertIn("class InspectdbSpecialTableName(models.Model):", output)

    def test_managed_models(self):
        """By default the command generates models with `Meta.managed = False` (#14305)"""
        out = StringIO()
        call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
        output = out.getvalue()
        # With longMessage disabled, the custom msg replaces the default
        # failure text (which would include the whole generated output).
        self.longMessage = False
        self.assertIn(" managed = False", output, msg='inspectdb should generate unmanaged models.')

    def test_unique_together_meta(self):
        """The generated Meta contains a single unique_together tuple."""
        out = StringIO()
        call_command('inspectdb', 'inspectdb_uniquetogether', stdout=out)
        output = out.getvalue()
        self.assertIn(" unique_together = (('", output)
        unique_together_match = re.findall(self.unique_re, output)
        # There should be one unique_together tuple.
        self.assertEqual(len(unique_together_match), 1)
        fields = unique_together_match[0]
        # Fields with db_column = field name.
        self.assertIn("('field1', 'field2')", fields)
        # Fields from columns whose names are Python keywords.
        # NOTE(review): this repeats the assertion above, so the keyword-named
        # column case is presumably not exercised; the intended pair should be
        # taken from the model's unique_together definition -- TODO confirm.
        self.assertIn("('field1', 'field2')", fields)
        # Fields whose names normalize to the same Python field name and hence
        # are given an integer suffix.
        self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)

    @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
    def test_unsupported_unique_together(self):
        """Unsupported index types (COALESCE here) are skipped."""
        with connection.cursor() as c:
            c.execute(
                'CREATE UNIQUE INDEX Findex ON %s '
                '(id, people_unique_id, COALESCE(message_id, -1))' % PeopleMoreData._meta.db_table
            )
        try:
            out = StringIO()
            call_command(
                'inspectdb',
                table_name_filter=lambda tn: tn.startswith(PeopleMoreData._meta.db_table),
                stdout=out,
            )
            output = out.getvalue()
            self.assertIn('# A unique constraint could not be introspected.', output)
            self.assertEqual(re.findall(self.unique_re, output), ["('id', 'people_unique')"])
        finally:
            # Always drop the index created above, even if an assertion failed.
            with connection.cursor() as c:
                c.execute('DROP INDEX Findex')

    @skipUnless(connection.vendor == 'sqlite',
                "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test")
    def test_custom_fields(self):
        """
        Introspection of columns with a custom field (#21090)
        """
        out = StringIO()
        custom_data_types_reverse = {
            'text': 'myfields.TextField',
            'bigint': 'BigIntegerField',
        }
        # patch.object() puts the previous mapping back on exit, including
        # when call_command() raises.
        with mock.patch.object(connection.introspection, 'data_types_reverse', custom_data_types_reverse):
            call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
        output = out.getvalue()
        self.assertIn("text_field = myfields.TextField()", output)
        self.assertIn("big_int_field = models.BigIntegerField()", output)

    def test_introspection_errors(self):
        """
        Introspection errors should not crash the command, and the error should
        be visible in the output.
        """
        out = StringIO()
        # Make the table list report a single table named 'nonexistent'.
        with mock.patch('django.db.connection.introspection.get_table_list',
                        return_value=[TableInfo(name='nonexistent', type='t')]):
            call_command('inspectdb', stdout=out)
        output = out.getvalue()
        self.assertIn("# Unable to inspect table 'nonexistent'", output)
        # The error message depends on the backend
        self.assertIn("# The error was:", output)
class InspectDBTransactionalTests(TransactionTestCase):
    """
    inspectdb tests that first create database objects (views, partitions,
    foreign tables) with raw SQL and drop them again afterwards.
    """
    available_apps = ['inspectdb']

    # NOTE(review): the expected 'managed = False # Created from ...' strings
    # below contain a single space before '#'; they are reproduced as found.
    # Confirm the spacing against inspectdb's actual output -- TODO confirm.

    def test_include_views(self):
        """inspectdb --include-views creates models for database views."""
        with connection.cursor() as cursor:
            cursor.execute(
                'CREATE VIEW inspectdb_people_view AS '
                'SELECT id, name FROM inspectdb_people'
            )
        view_model = 'class InspectdbPeopleView(models.Model):'
        view_managed = 'managed = False # Created from a view.'
        try:
            out = StringIO()
            call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
            no_views_output = out.getvalue()
            self.assertNotIn(view_model, no_views_output)
            self.assertNotIn(view_managed, no_views_output)
            # Use a fresh buffer so the assertions below only look at the
            # output of the second run.
            out = StringIO()
            call_command('inspectdb', table_name_filter=inspectdb_tables_only, include_views=True, stdout=out)
            with_views_output = out.getvalue()
            self.assertIn(view_model, with_views_output)
            self.assertIn(view_managed, with_views_output)
        finally:
            with connection.cursor() as cursor:
                cursor.execute('DROP VIEW inspectdb_people_view')

    @skipUnlessDBFeature('can_introspect_materialized_views')
    def test_include_materialized_views(self):
        """inspectdb --include-views creates models for materialized views."""
        with connection.cursor() as cursor:
            cursor.execute(
                'CREATE MATERIALIZED VIEW inspectdb_people_materialized AS '
                'SELECT id, name FROM inspectdb_people'
            )
        view_model = 'class InspectdbPeopleMaterialized(models.Model):'
        view_managed = 'managed = False # Created from a view.'
        try:
            out = StringIO()
            call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
            no_views_output = out.getvalue()
            self.assertNotIn(view_model, no_views_output)
            self.assertNotIn(view_managed, no_views_output)
            # Use a fresh buffer so the assertions below only look at the
            # output of the second run.
            out = StringIO()
            call_command('inspectdb', table_name_filter=inspectdb_tables_only, include_views=True, stdout=out)
            with_views_output = out.getvalue()
            self.assertIn(view_model, with_views_output)
            self.assertIn(view_managed, with_views_output)
        finally:
            with connection.cursor() as cursor:
                cursor.execute('DROP MATERIALIZED VIEW inspectdb_people_materialized')

    @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
    @skipUnlessDBFeature('supports_table_partitions')
    def test_include_partitions(self):
        """inspectdb --include-partitions creates models for partitions."""
        with connection.cursor() as cursor:
            cursor.execute('''\
CREATE TABLE inspectdb_partition_parent (name text not null)
PARTITION BY LIST (left(upper(name), 1))
''')
            cursor.execute('''\
CREATE TABLE inspectdb_partition_child
PARTITION OF inspectdb_partition_parent
FOR VALUES IN ('A', 'B', 'C')
''')
        partition_model_parent = 'class InspectdbPartitionParent(models.Model):'
        partition_model_child = 'class InspectdbPartitionChild(models.Model):'
        partition_managed = 'managed = False # Created from a partition.'
        try:
            out = StringIO()
            call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
            no_partitions_output = out.getvalue()
            self.assertIn(partition_model_parent, no_partitions_output)
            self.assertNotIn(partition_model_child, no_partitions_output)
            self.assertNotIn(partition_managed, no_partitions_output)
            # Use a fresh buffer: with a shared one, the parent-model check
            # below would pass on the first run's output alone.
            out = StringIO()
            call_command('inspectdb', table_name_filter=inspectdb_tables_only, include_partitions=True, stdout=out)
            with_partitions_output = out.getvalue()
            self.assertIn(partition_model_parent, with_partitions_output)
            self.assertIn(partition_model_child, with_partitions_output)
            self.assertIn(partition_managed, with_partitions_output)
        finally:
            with connection.cursor() as cursor:
                cursor.execute('DROP TABLE IF EXISTS inspectdb_partition_child')
                cursor.execute('DROP TABLE IF EXISTS inspectdb_partition_parent')

    @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
    def test_foreign_data_wrapper(self):
        """inspectdb creates an unmanaged model for a foreign table."""
        with connection.cursor() as cursor:
            cursor.execute('CREATE EXTENSION IF NOT EXISTS file_fdw')
            cursor.execute('CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw')
            cursor.execute('''\
CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
petal_length real,
petal_width real,
sepal_length real,
sepal_width real
) SERVER inspectdb_server OPTIONS (
filename '/dev/null'
)
''')
        out = StringIO()
        foreign_table_model = 'class InspectdbIrisForeignTable(models.Model):'
        foreign_table_managed = 'managed = False'
        try:
            call_command('inspectdb', stdout=out)
            output = out.getvalue()
            self.assertIn(foreign_table_model, output)
            self.assertIn(foreign_table_managed, output)
        finally:
            # Drop in reverse order of creation: table, server, extension.
            with connection.cursor() as cursor:
                cursor.execute('DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table')
                cursor.execute('DROP SERVER IF EXISTS inspectdb_server')
                cursor.execute('DROP EXTENSION IF EXISTS file_fdw')