mirror of
				https://github.com/django/django.git
				synced 2025-10-24 22:26:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			383 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			383 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from django.db import DatabaseError, connection
 | |
| from django.db.models import Index
 | |
| from django.test import TransactionTestCase, skipUnlessDBFeature
 | |
| 
 | |
| from .models import (
 | |
|     Article,
 | |
|     ArticleReporter,
 | |
|     CheckConstraintModel,
 | |
|     City,
 | |
|     Comment,
 | |
|     Country,
 | |
|     District,
 | |
|     Reporter,
 | |
|     UniqueConstraintConditionModel,
 | |
| )
 | |
| 
 | |
| 
 | |
| class IntrospectionTests(TransactionTestCase):
 | |
| 
 | |
|     available_apps = ["introspection"]
 | |
| 
 | |
|     def test_table_names(self):
 | |
|         tl = connection.introspection.table_names()
 | |
|         self.assertEqual(tl, sorted(tl))
 | |
|         self.assertIn(
 | |
|             Reporter._meta.db_table,
 | |
|             tl,
 | |
|             "'%s' isn't in table_list()." % Reporter._meta.db_table,
 | |
|         )
 | |
|         self.assertIn(
 | |
|             Article._meta.db_table,
 | |
|             tl,
 | |
|             "'%s' isn't in table_list()." % Article._meta.db_table,
 | |
|         )
 | |
| 
 | |
|     def test_django_table_names(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute("CREATE TABLE django_ixn_test_table (id INTEGER);")
 | |
|             tl = connection.introspection.django_table_names()
 | |
|             cursor.execute("DROP TABLE django_ixn_test_table;")
 | |
|             self.assertNotIn(
 | |
|                 "django_ixn_test_table",
 | |
|                 tl,
 | |
|                 "django_table_names() returned a non-Django table",
 | |
|             )
 | |
| 
 | |
|     def test_django_table_names_retval_type(self):
 | |
|         # Table name is a list #15216
 | |
|         tl = connection.introspection.django_table_names(only_existing=True)
 | |
|         self.assertIs(type(tl), list)
 | |
|         tl = connection.introspection.django_table_names(only_existing=False)
 | |
|         self.assertIs(type(tl), list)
 | |
| 
 | |
|     def test_table_names_with_views(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             try:
 | |
|                 cursor.execute(
 | |
|                     "CREATE VIEW introspection_article_view AS SELECT headline "
 | |
|                     "from introspection_article;"
 | |
|                 )
 | |
|             except DatabaseError as e:
 | |
|                 if "insufficient privileges" in str(e):
 | |
|                     self.fail("The test user has no CREATE VIEW privileges")
 | |
|                 else:
 | |
|                     raise
 | |
|         try:
 | |
|             self.assertIn(
 | |
|                 "introspection_article_view",
 | |
|                 connection.introspection.table_names(include_views=True),
 | |
|             )
 | |
|             self.assertNotIn(
 | |
|                 "introspection_article_view", connection.introspection.table_names()
 | |
|             )
 | |
|         finally:
 | |
|             with connection.cursor() as cursor:
 | |
|                 cursor.execute("DROP VIEW introspection_article_view")
 | |
| 
 | |
|     def test_unmanaged_through_model(self):
 | |
|         tables = connection.introspection.django_table_names()
 | |
|         self.assertNotIn(ArticleReporter._meta.db_table, tables)
 | |
| 
 | |
|     def test_installed_models(self):
 | |
|         tables = [Article._meta.db_table, Reporter._meta.db_table]
 | |
|         models = connection.introspection.installed_models(tables)
 | |
|         self.assertEqual(models, {Article, Reporter})
 | |
| 
 | |
|     def test_sequence_list(self):
 | |
|         sequences = connection.introspection.sequence_list()
 | |
|         reporter_seqs = [
 | |
|             seq for seq in sequences if seq["table"] == Reporter._meta.db_table
 | |
|         ]
 | |
|         self.assertEqual(
 | |
|             len(reporter_seqs), 1, "Reporter sequence not found in sequence_list()"
 | |
|         )
 | |
|         self.assertEqual(reporter_seqs[0]["column"], "id")
 | |
| 
 | |
|     def test_get_table_description_names(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             desc = connection.introspection.get_table_description(
 | |
|                 cursor, Reporter._meta.db_table
 | |
|             )
 | |
|         self.assertEqual(
 | |
|             [r[0] for r in desc], [f.column for f in Reporter._meta.fields]
 | |
|         )
 | |
| 
 | |
|     def test_get_table_description_types(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             desc = connection.introspection.get_table_description(
 | |
|                 cursor, Reporter._meta.db_table
 | |
|             )
 | |
|         self.assertEqual(
 | |
|             [connection.introspection.get_field_type(r[1], r) for r in desc],
 | |
|             [
 | |
|                 connection.features.introspected_field_types[field]
 | |
|                 for field in (
 | |
|                     "AutoField",
 | |
|                     "CharField",
 | |
|                     "CharField",
 | |
|                     "CharField",
 | |
|                     "BigIntegerField",
 | |
|                     "BinaryField",
 | |
|                     "SmallIntegerField",
 | |
|                     "DurationField",
 | |
|                 )
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|     def test_get_table_description_col_lengths(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             desc = connection.introspection.get_table_description(
 | |
|                 cursor, Reporter._meta.db_table
 | |
|             )
 | |
|         self.assertEqual(
 | |
|             [
 | |
|                 r[3]
 | |
|                 for r in desc
 | |
|                 if connection.introspection.get_field_type(r[1], r) == "CharField"
 | |
|             ],
 | |
|             [30, 30, 254],
 | |
|         )
 | |
| 
 | |
|     def test_get_table_description_nullable(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             desc = connection.introspection.get_table_description(
 | |
|                 cursor, Reporter._meta.db_table
 | |
|             )
 | |
|         nullable_by_backend = connection.features.interprets_empty_strings_as_nulls
 | |
|         self.assertEqual(
 | |
|             [r[6] for r in desc],
 | |
|             [
 | |
|                 False,
 | |
|                 nullable_by_backend,
 | |
|                 nullable_by_backend,
 | |
|                 nullable_by_backend,
 | |
|                 True,
 | |
|                 True,
 | |
|                 False,
 | |
|                 False,
 | |
|             ],
 | |
|         )
 | |
| 
 | |
|     def test_bigautofield(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             desc = connection.introspection.get_table_description(
 | |
|                 cursor, City._meta.db_table
 | |
|             )
 | |
|         self.assertIn(
 | |
|             connection.features.introspected_field_types["BigAutoField"],
 | |
|             [connection.introspection.get_field_type(r[1], r) for r in desc],
 | |
|         )
 | |
| 
 | |
|     def test_smallautofield(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             desc = connection.introspection.get_table_description(
 | |
|                 cursor, Country._meta.db_table
 | |
|             )
 | |
|         self.assertIn(
 | |
|             connection.features.introspected_field_types["SmallAutoField"],
 | |
|             [connection.introspection.get_field_type(r[1], r) for r in desc],
 | |
|         )
 | |
| 
 | |
|     # Regression test for #9991 - 'real' types in postgres
 | |
|     @skipUnlessDBFeature("has_real_datatype")
 | |
|     def test_postgresql_real_type(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             cursor.execute("CREATE TABLE django_ixn_real_test_table (number REAL);")
 | |
|             desc = connection.introspection.get_table_description(
 | |
|                 cursor, "django_ixn_real_test_table"
 | |
|             )
 | |
|             cursor.execute("DROP TABLE django_ixn_real_test_table;")
 | |
|         self.assertEqual(
 | |
|             connection.introspection.get_field_type(desc[0][1], desc[0]), "FloatField"
 | |
|         )
 | |
| 
 | |
|     @skipUnlessDBFeature("can_introspect_foreign_keys")
 | |
|     def test_get_relations(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             relations = connection.introspection.get_relations(
 | |
|                 cursor, Article._meta.db_table
 | |
|             )
 | |
| 
 | |
|         # That's {field_name: (field_name_other_table, other_table)}
 | |
|         expected_relations = {
 | |
|             "reporter_id": ("id", Reporter._meta.db_table),
 | |
|             "response_to_id": ("id", Article._meta.db_table),
 | |
|         }
 | |
|         self.assertEqual(relations, expected_relations)
 | |
| 
 | |
|         # Removing a field shouldn't disturb get_relations (#17785)
 | |
|         body = Article._meta.get_field("body")
 | |
|         with connection.schema_editor() as editor:
 | |
|             editor.remove_field(Article, body)
 | |
|         with connection.cursor() as cursor:
 | |
|             relations = connection.introspection.get_relations(
 | |
|                 cursor, Article._meta.db_table
 | |
|             )
 | |
|         with connection.schema_editor() as editor:
 | |
|             editor.add_field(Article, body)
 | |
|         self.assertEqual(relations, expected_relations)
 | |
| 
 | |
|     def test_get_primary_key_column(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             primary_key_column = connection.introspection.get_primary_key_column(
 | |
|                 cursor, Article._meta.db_table
 | |
|             )
 | |
|             pk_fk_column = connection.introspection.get_primary_key_column(
 | |
|                 cursor, District._meta.db_table
 | |
|             )
 | |
|         self.assertEqual(primary_key_column, "id")
 | |
|         self.assertEqual(pk_fk_column, "city_id")
 | |
| 
 | |
|     def test_get_constraints_index_types(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             constraints = connection.introspection.get_constraints(
 | |
|                 cursor, Article._meta.db_table
 | |
|             )
 | |
|         index = {}
 | |
|         index2 = {}
 | |
|         for val in constraints.values():
 | |
|             if val["columns"] == ["headline", "pub_date"]:
 | |
|                 index = val
 | |
|             if val["columns"] == [
 | |
|                 "headline",
 | |
|                 "response_to_id",
 | |
|                 "pub_date",
 | |
|                 "reporter_id",
 | |
|             ]:
 | |
|                 index2 = val
 | |
|         self.assertEqual(index["type"], Index.suffix)
 | |
|         self.assertEqual(index2["type"], Index.suffix)
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_index_column_ordering")
 | |
|     def test_get_constraints_indexes_orders(self):
 | |
|         """
 | |
|         Indexes have the 'orders' key with a list of 'ASC'/'DESC' values.
 | |
|         """
 | |
|         with connection.cursor() as cursor:
 | |
|             constraints = connection.introspection.get_constraints(
 | |
|                 cursor, Article._meta.db_table
 | |
|             )
 | |
|         indexes_verified = 0
 | |
|         expected_columns = [
 | |
|             ["headline", "pub_date"],
 | |
|             ["headline", "response_to_id", "pub_date", "reporter_id"],
 | |
|         ]
 | |
|         if connection.features.indexes_foreign_keys:
 | |
|             expected_columns += [
 | |
|                 ["reporter_id"],
 | |
|                 ["response_to_id"],
 | |
|             ]
 | |
|         for val in constraints.values():
 | |
|             if val["index"] and not (val["primary_key"] or val["unique"]):
 | |
|                 self.assertIn(val["columns"], expected_columns)
 | |
|                 self.assertEqual(val["orders"], ["ASC"] * len(val["columns"]))
 | |
|                 indexes_verified += 1
 | |
|         self.assertEqual(indexes_verified, len(expected_columns))
 | |
| 
 | |
|     @skipUnlessDBFeature("supports_index_column_ordering", "supports_partial_indexes")
 | |
|     def test_get_constraints_unique_indexes_orders(self):
 | |
|         with connection.cursor() as cursor:
 | |
|             constraints = connection.introspection.get_constraints(
 | |
|                 cursor,
 | |
|                 UniqueConstraintConditionModel._meta.db_table,
 | |
|             )
 | |
|         self.assertIn("cond_name_without_color_uniq", constraints)
 | |
|         constraint = constraints["cond_name_without_color_uniq"]
 | |
|         self.assertIs(constraint["unique"], True)
 | |
|         self.assertEqual(constraint["columns"], ["name"])
 | |
|         self.assertEqual(constraint["orders"], ["ASC"])
 | |
| 
 | |
|     def test_get_constraints(self):
 | |
|         def assertDetails(
 | |
|             details,
 | |
|             cols,
 | |
|             primary_key=False,
 | |
|             unique=False,
 | |
|             index=False,
 | |
|             check=False,
 | |
|             foreign_key=None,
 | |
|         ):
 | |
|             # Different backends have different values for same constraints:
 | |
|             #               PRIMARY KEY     UNIQUE CONSTRAINT    UNIQUE INDEX
 | |
|             # MySQL      pk=1 uniq=1 idx=1  pk=0 uniq=1 idx=1  pk=0 uniq=1 idx=1
 | |
|             # PostgreSQL pk=1 uniq=1 idx=0  pk=0 uniq=1 idx=0  pk=0 uniq=1 idx=1
 | |
|             # SQLite     pk=1 uniq=0 idx=0  pk=0 uniq=1 idx=0  pk=0 uniq=1 idx=1
 | |
|             if details["primary_key"]:
 | |
|                 details["unique"] = True
 | |
|             if details["unique"]:
 | |
|                 details["index"] = False
 | |
|             self.assertEqual(details["columns"], cols)
 | |
|             self.assertEqual(details["primary_key"], primary_key)
 | |
|             self.assertEqual(details["unique"], unique)
 | |
|             self.assertEqual(details["index"], index)
 | |
|             self.assertEqual(details["check"], check)
 | |
|             self.assertEqual(details["foreign_key"], foreign_key)
 | |
| 
 | |
|         # Test custom constraints
 | |
|         custom_constraints = {
 | |
|             "article_email_pub_date_uniq",
 | |
|             "email_pub_date_idx",
 | |
|         }
 | |
|         with connection.cursor() as cursor:
 | |
|             constraints = connection.introspection.get_constraints(
 | |
|                 cursor, Comment._meta.db_table
 | |
|             )
 | |
|             if (
 | |
|                 connection.features.supports_column_check_constraints
 | |
|                 and connection.features.can_introspect_check_constraints
 | |
|             ):
 | |
|                 constraints.update(
 | |
|                     connection.introspection.get_constraints(
 | |
|                         cursor, CheckConstraintModel._meta.db_table
 | |
|                     )
 | |
|                 )
 | |
|                 custom_constraints.add("up_votes_gte_0_check")
 | |
|                 assertDetails(
 | |
|                     constraints["up_votes_gte_0_check"], ["up_votes"], check=True
 | |
|                 )
 | |
|         assertDetails(
 | |
|             constraints["article_email_pub_date_uniq"],
 | |
|             ["article_id", "email", "pub_date"],
 | |
|             unique=True,
 | |
|         )
 | |
|         assertDetails(
 | |
|             constraints["email_pub_date_idx"], ["email", "pub_date"], index=True
 | |
|         )
 | |
|         # Test field constraints
 | |
|         field_constraints = set()
 | |
|         for name, details in constraints.items():
 | |
|             if name in custom_constraints:
 | |
|                 continue
 | |
|             elif details["columns"] == ["up_votes"] and details["check"]:
 | |
|                 assertDetails(details, ["up_votes"], check=True)
 | |
|                 field_constraints.add(name)
 | |
|             elif details["columns"] == ["voting_number"] and details["check"]:
 | |
|                 assertDetails(details, ["voting_number"], check=True)
 | |
|                 field_constraints.add(name)
 | |
|             elif details["columns"] == ["ref"] and details["unique"]:
 | |
|                 assertDetails(details, ["ref"], unique=True)
 | |
|                 field_constraints.add(name)
 | |
|             elif details["columns"] == ["voting_number"] and details["unique"]:
 | |
|                 assertDetails(details, ["voting_number"], unique=True)
 | |
|                 field_constraints.add(name)
 | |
|             elif details["columns"] == ["article_id"] and details["index"]:
 | |
|                 assertDetails(details, ["article_id"], index=True)
 | |
|                 field_constraints.add(name)
 | |
|             elif details["columns"] == ["id"] and details["primary_key"]:
 | |
|                 assertDetails(details, ["id"], primary_key=True, unique=True)
 | |
|                 field_constraints.add(name)
 | |
|             elif details["columns"] == ["article_id"] and details["foreign_key"]:
 | |
|                 assertDetails(
 | |
|                     details, ["article_id"], foreign_key=("introspection_article", "id")
 | |
|                 )
 | |
|                 field_constraints.add(name)
 | |
|             elif details["check"]:
 | |
|                 # Some databases (e.g. Oracle) include additional check
 | |
|                 # constraints.
 | |
|                 field_constraints.add(name)
 | |
|         # All constraints are accounted for.
 | |
|         self.assertEqual(
 | |
|             constraints.keys() ^ (custom_constraints | field_constraints), set()
 | |
|         )
 |