mirror of
				https://github.com/django/django.git
				synced 2025-10-24 22:26:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			671 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			671 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import re
 | |
| from io import StringIO
 | |
| from unittest import mock, skipUnless
 | |
| 
 | |
| from django.core.management import call_command
 | |
| from django.core.management.commands import inspectdb
 | |
| from django.db import connection
 | |
| from django.db.backends.base.introspection import TableInfo
 | |
| from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
 | |
| 
 | |
| from .models import PeopleMoreData, test_collation
 | |
| 
 | |
| 
 | |
| def inspectdb_tables_only(table_name):
 | |
|     """
 | |
|     Limit introspection to tables created for models of this app.
 | |
|     Some databases such as Oracle are extremely slow at introspection.
 | |
|     """
 | |
|     return table_name.startswith("inspectdb_")
 | |
| 
 | |
| 
 | |
| def inspectdb_views_only(table_name):
 | |
|     return table_name.startswith("inspectdb_") and table_name.endswith(
 | |
|         ("_materialized", "_view")
 | |
|     )
 | |
| 
 | |
| 
 | |
| def special_table_only(table_name):
 | |
|     return table_name.startswith("inspectdb_special")
 | |
| 
 | |
| 
 | |
| class InspectDBTestCase(TestCase):
 | |
|     unique_re = re.compile(r".*unique_together = \((.+),\).*")
 | |
| 
 | |
|     def test_stealth_table_name_filter_option(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
 | |
|         error_message = (
 | |
|             "inspectdb has examined a table that should have been filtered out."
 | |
|         )
 | |
|         # contrib.contenttypes is one of the apps always installed when running
 | |
|         # the Django test suite, check that one of its tables hasn't been
 | |
|         # inspected
 | |
|         self.assertNotIn(
 | |
|             "class DjangoContentType(models.Model):", out.getvalue(), msg=error_message
 | |
|         )
 | |
| 
 | |
|     def test_table_option(self):
 | |
|         """
 | |
|         inspectdb can inspect a subset of tables by passing the table names as
 | |
|         arguments.
 | |
|         """
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_people", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         self.assertIn("class InspectdbPeople(models.Model):", output)
 | |
|         self.assertNotIn("InspectdbPeopledata", output)
 | |
| 
 | |
|     def make_field_type_asserter(self):
 | |
|         """
 | |
|         Call inspectdb and return a function to validate a field type in its
 | |
|         output.
 | |
|         """
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_columntypes", stdout=out)
 | |
|         output = out.getvalue()
 | |
| 
 | |
|         def assertFieldType(name, definition):
 | |
|             out_def = re.search(r"^\s*%s = (models.*)$" % name, output, re.MULTILINE)[1]
 | |
|             self.assertEqual(definition, out_def)
 | |
| 
 | |
|         return assertFieldType
 | |
| 
 | |
|     def test_field_types(self):
 | |
|         """Test introspection of various Django field types"""
 | |
|         assertFieldType = self.make_field_type_asserter()
 | |
|         introspected_field_types = connection.features.introspected_field_types
 | |
|         char_field_type = introspected_field_types["CharField"]
 | |
|         # Inspecting Oracle DB doesn't produce correct results (#19884):
 | |
|         # - it reports fields as blank=True when they aren't.
 | |
|         if (
 | |
|             not connection.features.interprets_empty_strings_as_nulls
 | |
|             and char_field_type == "CharField"
 | |
|         ):
 | |
|             assertFieldType("char_field", "models.CharField(max_length=10)")
 | |
|             assertFieldType(
 | |
|                 "null_char_field",
 | |
|                 "models.CharField(max_length=10, blank=True, null=True)",
 | |
|             )
 | |
|             assertFieldType("email_field", "models.CharField(max_length=254)")
 | |
|             assertFieldType("file_field", "models.CharField(max_length=100)")
 | |
|             assertFieldType("file_path_field", "models.CharField(max_length=100)")
 | |
|             assertFieldType("slug_field", "models.CharField(max_length=50)")
 | |
|             assertFieldType("text_field", "models.TextField()")
 | |
|             assertFieldType("url_field", "models.CharField(max_length=200)")
 | |
|         if char_field_type == "TextField":
 | |
|             assertFieldType("char_field", "models.TextField()")
 | |
|             assertFieldType(
 | |
|                 "null_char_field", "models.TextField(blank=True, null=True)"
 | |
|             )
 | |
|             assertFieldType("email_field", "models.TextField()")
 | |
|             assertFieldType("file_field", "models.TextField()")
 | |
|             assertFieldType("file_path_field", "models.TextField()")
 | |
|             assertFieldType("slug_field", "models.TextField()")
 | |
|             assertFieldType("text_field", "models.TextField()")
 | |
|             assertFieldType("url_field", "models.TextField()")
 | |
|         assertFieldType("date_field", "models.DateField()")
 | |
|         assertFieldType("date_time_field", "models.DateTimeField()")
 | |
|         if introspected_field_types["GenericIPAddressField"] == "GenericIPAddressField":
 | |
|             assertFieldType("gen_ip_address_field", "models.GenericIPAddressField()")
 | |
|         elif not connection.features.interprets_empty_strings_as_nulls:
 | |
|             assertFieldType("gen_ip_address_field", "models.CharField(max_length=39)")
 | |
|         assertFieldType(
 | |
|             "time_field", "models.%s()" % introspected_field_types["TimeField"]
 | |
|         )
 | |
|         if connection.features.has_native_uuid_field:
 | |
|             assertFieldType("uuid_field", "models.UUIDField()")
 | |
|         elif not connection.features.interprets_empty_strings_as_nulls:
 | |
|             assertFieldType("uuid_field", "models.CharField(max_length=32)")
 | |
| 
 | |
|     @skipUnlessDBFeature("can_introspect_json_field", "supports_json_field")
 | |
|     def test_json_field(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_jsonfieldcolumntype", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         if not connection.features.interprets_empty_strings_as_nulls:
 | |
|             self.assertIn("json_field = models.JSONField()", output)
 | |
|         self.assertIn(
 | |
|             "null_json_field = models.JSONField(blank=True, null=True)", output
 | |
|         )
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_comments")
 | |
|     def test_db_comments(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_dbcomment", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         integer_field_type = connection.features.introspected_field_types[
 | |
|             "IntegerField"
 | |
|         ]
 | |
|         self.assertIn(
 | |
|             f"rank = models.{integer_field_type}("
 | |
|             f"db_comment=\"'Rank' column comment\")",
 | |
|             output,
 | |
|         )
 | |
|         self.assertIn(
 | |
|             "        db_table_comment = 'Custom table comment'",
 | |
|             output,
 | |
|         )
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_collation_on_charfield")
 | |
|     @skipUnless(test_collation, "Language collations are not supported.")
 | |
|     def test_char_field_db_collation(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_charfielddbcollation", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         if not connection.features.interprets_empty_strings_as_nulls:
 | |
|             self.assertIn(
 | |
|                 "char_field = models.CharField(max_length=10, "
 | |
|                 "db_collation='%s')" % test_collation,
 | |
|                 output,
 | |
|             )
 | |
|         else:
 | |
|             self.assertIn(
 | |
|                 "char_field = models.CharField(max_length=10, "
 | |
|                 "db_collation='%s', blank=True, null=True)" % test_collation,
 | |
|                 output,
 | |
|             )
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_collation_on_textfield")
 | |
|     @skipUnless(test_collation, "Language collations are not supported.")
 | |
|     def test_text_field_db_collation(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_textfielddbcollation", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         if not connection.features.interprets_empty_strings_as_nulls:
 | |
|             self.assertIn(
 | |
|                 "text_field = models.TextField(db_collation='%s')" % test_collation,
 | |
|                 output,
 | |
|             )
 | |
|         else:
 | |
|             self.assertIn(
 | |
|                 "text_field = models.TextField(db_collation='%s, blank=True, "
 | |
|                 "null=True)" % test_collation,
 | |
|                 output,
 | |
|             )
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_unlimited_charfield")
 | |
|     def test_char_field_unlimited(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_charfieldunlimited", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         self.assertIn("char_field = models.CharField()", output)
 | |
| 
 | |
|     def test_number_field_types(self):
 | |
|         """Test introspection of various Django field types"""
 | |
|         assertFieldType = self.make_field_type_asserter()
 | |
|         introspected_field_types = connection.features.introspected_field_types
 | |
| 
 | |
|         auto_field_type = connection.features.introspected_field_types["AutoField"]
 | |
|         if auto_field_type != "AutoField":
 | |
|             assertFieldType(
 | |
|                 "id", "models.%s(primary_key=True)  # AutoField?" % auto_field_type
 | |
|             )
 | |
| 
 | |
|         assertFieldType(
 | |
|             "big_int_field", "models.%s()" % introspected_field_types["BigIntegerField"]
 | |
|         )
 | |
| 
 | |
|         bool_field_type = introspected_field_types["BooleanField"]
 | |
|         assertFieldType("bool_field", "models.{}()".format(bool_field_type))
 | |
|         assertFieldType(
 | |
|             "null_bool_field",
 | |
|             "models.{}(blank=True, null=True)".format(bool_field_type),
 | |
|         )
 | |
| 
 | |
|         if connection.vendor != "sqlite":
 | |
|             assertFieldType(
 | |
|                 "decimal_field", "models.DecimalField(max_digits=6, decimal_places=1)"
 | |
|             )
 | |
|         else:  # Guessed arguments on SQLite, see #5014
 | |
|             assertFieldType(
 | |
|                 "decimal_field",
 | |
|                 "models.DecimalField(max_digits=10, decimal_places=5)  "
 | |
|                 "# max_digits and decimal_places have been guessed, "
 | |
|                 "as this database handles decimal fields as float",
 | |
|             )
 | |
| 
 | |
|         assertFieldType("float_field", "models.FloatField()")
 | |
|         assertFieldType(
 | |
|             "int_field", "models.%s()" % introspected_field_types["IntegerField"]
 | |
|         )
 | |
|         assertFieldType(
 | |
|             "pos_int_field",
 | |
|             "models.%s()" % introspected_field_types["PositiveIntegerField"],
 | |
|         )
 | |
|         assertFieldType(
 | |
|             "pos_big_int_field",
 | |
|             "models.%s()" % introspected_field_types["PositiveBigIntegerField"],
 | |
|         )
 | |
|         assertFieldType(
 | |
|             "pos_small_int_field",
 | |
|             "models.%s()" % introspected_field_types["PositiveSmallIntegerField"],
 | |
|         )
 | |
|         assertFieldType(
 | |
|             "small_int_field",
 | |
|             "models.%s()" % introspected_field_types["SmallIntegerField"],
 | |
|         )
 | |
| 
 | |
|     @skipUnlessDBFeature("can_introspect_foreign_keys")
 | |
|     def test_attribute_name_not_python_keyword(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
 | |
|         output = out.getvalue()
 | |
|         error_message = (
 | |
|             "inspectdb generated an attribute name which is a Python keyword"
 | |
|         )
 | |
|         # Recursive foreign keys should be set to 'self'
 | |
|         self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
 | |
|         self.assertNotIn(
 | |
|             "from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
 | |
|             output,
 | |
|             msg=error_message,
 | |
|         )
 | |
|         # As InspectdbPeople model is defined after InspectdbMessage, it should
 | |
|         # be quoted.
 | |
|         self.assertIn(
 | |
|             "from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
 | |
|             "db_column='from_id')",
 | |
|             output,
 | |
|         )
 | |
|         self.assertIn(
 | |
|             "people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, "
 | |
|             "primary_key=True)",
 | |
|             output,
 | |
|         )
 | |
|         self.assertIn(
 | |
|             "people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)",
 | |
|             output,
 | |
|         )
 | |
| 
 | |
|     @skipUnlessDBFeature("can_introspect_foreign_keys")
 | |
|     def test_foreign_key_to_field(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_foreignkeytofield", stdout=out)
 | |
|         self.assertIn(
 | |
|             "to_field_fk = models.ForeignKey('InspectdbPeoplemoredata', "
 | |
|             "models.DO_NOTHING, to_field='people_unique_id')",
 | |
|             out.getvalue(),
 | |
|         )
 | |
| 
 | |
|     def test_digits_column_name_introspection(self):
 | |
|         """Introspection of column names consist/start with digits (#16536/#17676)"""
 | |
|         char_field_type = connection.features.introspected_field_types["CharField"]
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_digitsincolumnname", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         error_message = "inspectdb generated a model field name which is a number"
 | |
|         self.assertNotIn(
 | |
|             "    123 = models.%s" % char_field_type, output, msg=error_message
 | |
|         )
 | |
|         self.assertIn("number_123 = models.%s" % char_field_type, output)
 | |
| 
 | |
|         error_message = (
 | |
|             "inspectdb generated a model field name which starts with a digit"
 | |
|         )
 | |
|         self.assertNotIn(
 | |
|             "    4extra = models.%s" % char_field_type, output, msg=error_message
 | |
|         )
 | |
|         self.assertIn("number_4extra = models.%s" % char_field_type, output)
 | |
| 
 | |
|         self.assertNotIn(
 | |
|             "    45extra = models.%s" % char_field_type, output, msg=error_message
 | |
|         )
 | |
|         self.assertIn("number_45extra = models.%s" % char_field_type, output)
 | |
| 
 | |
|     def test_special_column_name_introspection(self):
 | |
|         """
 | |
|         Introspection of column names containing special characters,
 | |
|         unsuitable for Python identifiers
 | |
|         """
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
 | |
|         output = out.getvalue()
 | |
|         base_name = connection.introspection.identifier_converter("Field")
 | |
|         integer_field_type = connection.features.introspected_field_types[
 | |
|             "IntegerField"
 | |
|         ]
 | |
|         self.assertIn("field = models.%s()" % integer_field_type, output)
 | |
|         self.assertIn(
 | |
|             "field_field = models.%s(db_column='%s_')"
 | |
|             % (integer_field_type, base_name),
 | |
|             output,
 | |
|         )
 | |
|         self.assertIn(
 | |
|             "field_field_0 = models.%s(db_column='%s__')"
 | |
|             % (integer_field_type, base_name),
 | |
|             output,
 | |
|         )
 | |
|         self.assertIn(
 | |
|             "field_field_1 = models.%s(db_column='__field')" % integer_field_type,
 | |
|             output,
 | |
|         )
 | |
|         self.assertIn(
 | |
|             "prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type), output
 | |
|         )
 | |
|         self.assertIn("tamaño = models.%s()" % integer_field_type, output)
 | |
| 
 | |
|     def test_table_name_introspection(self):
 | |
|         """
 | |
|         Introspection of table names containing special characters,
 | |
|         unsuitable for Python identifiers
 | |
|         """
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
 | |
|         output = out.getvalue()
 | |
|         self.assertIn("class InspectdbSpecialTableName(models.Model):", output)
 | |
| 
 | |
|     def test_custom_normalize_table_name(self):
 | |
|         def pascal_case_table_only(table_name):
 | |
|             return table_name.startswith("inspectdb_pascal")
 | |
| 
 | |
|         class MyCommand(inspectdb.Command):
 | |
|             def normalize_table_name(self, table_name):
 | |
|                 normalized_name = table_name.split(".")[1]
 | |
|                 if connection.features.ignores_table_name_case:
 | |
|                     normalized_name = normalized_name.lower()
 | |
|                 return normalized_name
 | |
| 
 | |
|         out = StringIO()
 | |
|         call_command(MyCommand(), table_name_filter=pascal_case_table_only, stdout=out)
 | |
|         if connection.features.ignores_table_name_case:
 | |
|             expected_model_name = "pascalcase"
 | |
|         else:
 | |
|             expected_model_name = "PascalCase"
 | |
|         self.assertIn(f"class {expected_model_name}(models.Model):", out.getvalue())
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_expression_indexes")
 | |
|     def test_table_with_func_unique_constraint(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_funcuniqueconstraint", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         self.assertIn("class InspectdbFuncuniqueconstraint(models.Model):", output)
 | |
| 
 | |
|     def test_managed_models(self):
 | |
|         """
 | |
|         By default the command generates models with `Meta.managed = False`.
 | |
|         """
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_columntypes", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         self.longMessage = False
 | |
|         self.assertIn(
 | |
|             "        managed = False",
 | |
|             output,
 | |
|             msg="inspectdb should generate unmanaged models.",
 | |
|         )
 | |
| 
 | |
|     def test_unique_together_meta(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_uniquetogether", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         self.assertIn("    unique_together = (('", output)
 | |
|         unique_together_match = self.unique_re.findall(output)
 | |
|         # There should be one unique_together tuple.
 | |
|         self.assertEqual(len(unique_together_match), 1)
 | |
|         fields = unique_together_match[0]
 | |
|         # Fields with db_column = field name.
 | |
|         self.assertIn("('field1', 'field2')", fields)
 | |
|         # Fields from columns whose names are Python keywords.
 | |
|         self.assertIn("('field1', 'field2')", fields)
 | |
|         # Fields whose names normalize to the same Python field name and hence
 | |
|         # are given an integer suffix.
 | |
|         self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)
 | |
| 
 | |
|     @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
 | |
|     def test_unsupported_unique_together(self):
 | |
|         """Unsupported index types (COALESCE here) are skipped."""
 | |
|         with connection.cursor() as c:
 | |
|             c.execute(
 | |
|                 "CREATE UNIQUE INDEX Findex ON %s "
 | |
|                 "(id, people_unique_id, COALESCE(message_id, -1))"
 | |
|                 % PeopleMoreData._meta.db_table
 | |
|             )
 | |
|         try:
 | |
|             out = StringIO()
 | |
|             call_command(
 | |
|                 "inspectdb",
 | |
|                 table_name_filter=lambda tn: tn.startswith(
 | |
|                     PeopleMoreData._meta.db_table
 | |
|                 ),
 | |
|                 stdout=out,
 | |
|             )
 | |
|             output = out.getvalue()
 | |
|             self.assertIn("# A unique constraint could not be introspected.", output)
 | |
|             self.assertEqual(
 | |
|                 self.unique_re.findall(output), ["('id', 'people_unique')"]
 | |
|             )
 | |
|         finally:
 | |
|             with connection.cursor() as c:
 | |
|                 c.execute("DROP INDEX Findex")
 | |
| 
 | |
|     @skipUnless(
 | |
|         connection.vendor == "sqlite",
 | |
|         "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test",
 | |
|     )
 | |
|     def test_custom_fields(self):
 | |
|         """
 | |
|         Introspection of columns with a custom field (#21090)
 | |
|         """
 | |
|         out = StringIO()
 | |
|         with mock.patch(
 | |
|             "django.db.connection.introspection.data_types_reverse."
 | |
|             "base_data_types_reverse",
 | |
|             {
 | |
|                 "text": "myfields.TextField",
 | |
|                 "bigint": "BigIntegerField",
 | |
|             },
 | |
|         ):
 | |
|             call_command("inspectdb", "inspectdb_columntypes", stdout=out)
 | |
|             output = out.getvalue()
 | |
|             self.assertIn("text_field = myfields.TextField()", output)
 | |
|             self.assertIn("big_int_field = models.BigIntegerField()", output)
 | |
| 
 | |
|     def test_introspection_errors(self):
 | |
|         """
 | |
|         Introspection errors should not crash the command, and the error should
 | |
|         be visible in the output.
 | |
|         """
 | |
|         out = StringIO()
 | |
|         with mock.patch(
 | |
|             "django.db.connection.introspection.get_table_list",
 | |
|             return_value=[TableInfo(name="nonexistent", type="t")],
 | |
|         ):
 | |
|             call_command("inspectdb", stdout=out)
 | |
|         output = out.getvalue()
 | |
|         self.assertIn("# Unable to inspect table 'nonexistent'", output)
 | |
|         # The error message depends on the backend
 | |
|         self.assertIn("# The error was:", output)
 | |
| 
 | |
|     def test_same_relations(self):
 | |
|         out = StringIO()
 | |
|         call_command("inspectdb", "inspectdb_message", stdout=out)
 | |
|         self.assertIn(
 | |
|             "author = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
 | |
|             "related_name='inspectdbmessage_author_set')",
 | |
|             out.getvalue(),
 | |
|         )
 | |
| 
 | |
| 
 | |
| class InspectDBTransactionalTests(TransactionTestCase):
 | |
|     available_apps = ["inspectdb"]
 | |
| 
 | |
|     def test_include_views(self):
 | |
|         """inspectdb --include-views creates models for database views."""
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute(
 | |
|                 "CREATE VIEW inspectdb_people_view AS "
 | |
|                 "SELECT id, name FROM inspectdb_people"
 | |
|             )
 | |
|         out = StringIO()
 | |
|         view_model = "class InspectdbPeopleView(models.Model):"
 | |
|         view_managed = "managed = False  # Created from a view."
 | |
|         try:
 | |
|             call_command(
 | |
|                 "inspectdb",
 | |
|                 table_name_filter=inspectdb_views_only,
 | |
|                 stdout=out,
 | |
|             )
 | |
|             no_views_output = out.getvalue()
 | |
|             self.assertNotIn(view_model, no_views_output)
 | |
|             self.assertNotIn(view_managed, no_views_output)
 | |
|             call_command(
 | |
|                 "inspectdb",
 | |
|                 table_name_filter=inspectdb_views_only,
 | |
|                 include_views=True,
 | |
|                 stdout=out,
 | |
|             )
 | |
|             with_views_output = out.getvalue()
 | |
|             self.assertIn(view_model, with_views_output)
 | |
|             self.assertIn(view_managed, with_views_output)
 | |
|         finally:
 | |
|             with connection.cursor() as cursor:
 | |
|                 cursor.execute("DROP VIEW inspectdb_people_view")
 | |
| 
 | |
|     @skipUnlessDBFeature("can_introspect_materialized_views")
 | |
|     def test_include_materialized_views(self):
 | |
|         """inspectdb --include-views creates models for materialized views."""
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute(
 | |
|                 "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
 | |
|                 "SELECT id, name FROM inspectdb_people"
 | |
|             )
 | |
|         out = StringIO()
 | |
|         view_model = "class InspectdbPeopleMaterialized(models.Model):"
 | |
|         view_managed = "managed = False  # Created from a view."
 | |
|         try:
 | |
|             call_command(
 | |
|                 "inspectdb",
 | |
|                 table_name_filter=inspectdb_views_only,
 | |
|                 stdout=out,
 | |
|             )
 | |
|             no_views_output = out.getvalue()
 | |
|             self.assertNotIn(view_model, no_views_output)
 | |
|             self.assertNotIn(view_managed, no_views_output)
 | |
|             call_command(
 | |
|                 "inspectdb",
 | |
|                 table_name_filter=inspectdb_views_only,
 | |
|                 include_views=True,
 | |
|                 stdout=out,
 | |
|             )
 | |
|             with_views_output = out.getvalue()
 | |
|             self.assertIn(view_model, with_views_output)
 | |
|             self.assertIn(view_managed, with_views_output)
 | |
|         finally:
 | |
|             with connection.cursor() as cursor:
 | |
|                 cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized")
 | |
| 
 | |
|     @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
 | |
|     def test_include_partitions(self):
 | |
|         """inspectdb --include-partitions creates models for partitions."""
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute(
 | |
|                 """\
 | |
|                 CREATE TABLE inspectdb_partition_parent (name text not null)
 | |
|                 PARTITION BY LIST (left(upper(name), 1))
 | |
|             """
 | |
|             )
 | |
|             cursor.execute(
 | |
|                 """\
 | |
|                 CREATE TABLE inspectdb_partition_child
 | |
|                 PARTITION OF inspectdb_partition_parent
 | |
|                 FOR VALUES IN ('A', 'B', 'C')
 | |
|             """
 | |
|             )
 | |
|         out = StringIO()
 | |
|         partition_model_parent = "class InspectdbPartitionParent(models.Model):"
 | |
|         partition_model_child = "class InspectdbPartitionChild(models.Model):"
 | |
|         partition_managed = "managed = False  # Created from a partition."
 | |
|         try:
 | |
|             call_command(
 | |
|                 "inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
 | |
|             )
 | |
|             no_partitions_output = out.getvalue()
 | |
|             self.assertIn(partition_model_parent, no_partitions_output)
 | |
|             self.assertNotIn(partition_model_child, no_partitions_output)
 | |
|             self.assertNotIn(partition_managed, no_partitions_output)
 | |
|             call_command(
 | |
|                 "inspectdb",
 | |
|                 table_name_filter=inspectdb_tables_only,
 | |
|                 include_partitions=True,
 | |
|                 stdout=out,
 | |
|             )
 | |
|             with_partitions_output = out.getvalue()
 | |
|             self.assertIn(partition_model_parent, with_partitions_output)
 | |
|             self.assertIn(partition_model_child, with_partitions_output)
 | |
|             self.assertIn(partition_managed, with_partitions_output)
 | |
|         finally:
 | |
|             with connection.cursor() as cursor:
 | |
|                 cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
 | |
|                 cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
 | |
| 
 | |
|     @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
 | |
|     def test_foreign_data_wrapper(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
 | |
|             cursor.execute(
 | |
|                 "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
 | |
|             )
 | |
|             cursor.execute(
 | |
|                 """
 | |
|                 CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
 | |
|                     petal_length real,
 | |
|                     petal_width real,
 | |
|                     sepal_length real,
 | |
|                     sepal_width real
 | |
|                 ) SERVER inspectdb_server OPTIONS (
 | |
|                     program 'echo 1,2,3,4',
 | |
|                     format 'csv'
 | |
|                 )
 | |
|                 """
 | |
|             )
 | |
|         out = StringIO()
 | |
|         foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
 | |
|         foreign_table_managed = "managed = False"
 | |
|         try:
 | |
|             call_command(
 | |
|                 "inspectdb",
 | |
|                 table_name_filter=inspectdb_tables_only,
 | |
|                 stdout=out,
 | |
|             )
 | |
|             output = out.getvalue()
 | |
|             self.assertIn(foreign_table_model, output)
 | |
|             self.assertIn(foreign_table_managed, output)
 | |
|         finally:
 | |
|             with connection.cursor() as cursor:
 | |
|                 cursor.execute(
 | |
|                     "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
 | |
|                 )
 | |
|                 cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
 | |
|                 cursor.execute("DROP EXTENSION IF EXISTS file_fdw")
 | |
| 
 | |
|     @skipUnlessDBFeature("create_test_table_with_composite_primary_key")
 | |
|     def test_composite_primary_key(self):
 | |
|         table_name = "test_table_composite_pk"
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute(
 | |
|                 connection.features.create_test_table_with_composite_primary_key
 | |
|             )
 | |
|         out = StringIO()
 | |
|         if connection.vendor == "sqlite":
 | |
|             field_type = connection.features.introspected_field_types["AutoField"]
 | |
|         else:
 | |
|             field_type = connection.features.introspected_field_types["IntegerField"]
 | |
|         try:
 | |
|             call_command("inspectdb", table_name, stdout=out)
 | |
|             output = out.getvalue()
 | |
|             self.assertIn(
 | |
|                 f"column_1 = models.{field_type}(primary_key=True)  # The composite "
 | |
|                 f"primary key (column_1, column_2) found, that is not supported. The "
 | |
|                 f"first column is selected.",
 | |
|                 output,
 | |
|             )
 | |
|             self.assertIn(
 | |
|                 "column_2 = models.%s()"
 | |
|                 % connection.features.introspected_field_types["IntegerField"],
 | |
|                 output,
 | |
|             )
 | |
|         finally:
 | |
|             with connection.cursor() as cursor:
 | |
|                 cursor.execute("DROP TABLE %s" % table_name)
 |