1
0
mirror of https://github.com/django/django.git synced 2025-01-03 15:06:09 +00:00

Fixed #34799 -- Made MySQL introspection skip cross-database relations.

This commit is contained in:
John Whitman 2023-08-28 14:16:38 -04:00 committed by Mariusz Felisiak
parent 3f8dbe267d
commit 68a8996bdf
2 changed files with 65 additions and 0 deletions

View File

@ -198,6 +198,7 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
FROM information_schema.key_column_usage FROM information_schema.key_column_usage
WHERE table_name = %s WHERE table_name = %s
AND table_schema = DATABASE() AND table_schema = DATABASE()
AND referenced_table_schema = DATABASE()
AND referenced_table_name IS NOT NULL AND referenced_table_name IS NOT NULL
AND referenced_column_name IS NOT NULL AND referenced_column_name IS NOT NULL
""", """,
@ -257,6 +258,10 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
information_schema.table_constraints AS c information_schema.table_constraints AS c
WHERE WHERE
kc.table_schema = DATABASE() AND kc.table_schema = DATABASE() AND
(
kc.referenced_table_schema = DATABASE() OR
kc.referenced_table_schema IS NULL
) AND
c.table_schema = kc.table_schema AND c.table_schema = kc.table_schema AND
c.constraint_name = kc.constraint_name AND c.constraint_name = kc.constraint_name AND
c.constraint_type != 'CHECK' AND c.constraint_type != 'CHECK' AND

View File

@ -61,3 +61,63 @@ class StorageEngineTests(TestCase):
cursor.execute(drop_sql) cursor.execute(drop_sql)
with other_connection.cursor() as cursor: with other_connection.cursor() as cursor:
cursor.execute(drop_sql) cursor.execute(drop_sql)
@skipUnless(connection.vendor == "mysql", "MySQL specific SQL")
class TestCrossDatabaseRelations(TestCase):
databases = {"default", "other"}
def test_omit_cross_database_relations(self):
default_connection = connections["default"]
other_connection = connections["other"]
main_table = "cross_schema_get_relations_main_table"
main_table_quoted = default_connection.ops.quote_name(main_table)
other_schema_quoted = other_connection.ops.quote_name(
other_connection.settings_dict["NAME"]
)
rel_table = "cross_schema_get_relations_rel_table"
rel_table_quoted = other_connection.ops.quote_name(rel_table)
rel_column = "cross_schema_get_relations_rel_table_id"
rel_column_quoted = other_connection.ops.quote_name(rel_column)
try:
with other_connection.cursor() as other_cursor:
other_cursor.execute(
f"""
CREATE TABLE {rel_table_quoted} (
id integer AUTO_INCREMENT,
PRIMARY KEY (id)
)
"""
)
with default_connection.cursor() as default_cursor:
# Create table in the default schema with a cross-database
# relation.
default_cursor.execute(
f"""
CREATE TABLE {main_table_quoted} (
id integer AUTO_INCREMENT,
{rel_column_quoted} integer NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY ({rel_column_quoted})
REFERENCES {other_schema_quoted}.{rel_table_quoted}(id)
)
"""
)
relations = default_connection.introspection.get_relations(
default_cursor, main_table
)
constraints = default_connection.introspection.get_constraints(
default_cursor, main_table
)
self.assertEqual(len(relations), 0)
rel_column_fk_constraints = [
spec
for name, spec in constraints.items()
if spec["columns"] == [rel_column] and spec["foreign_key"] is not None
]
self.assertEqual(len(rel_column_fk_constraints), 0)
finally:
with default_connection.cursor() as default_cursor:
default_cursor.execute(f"DROP TABLE IF EXISTS {main_table_quoted}")
with other_connection.cursor() as other_cursor:
other_cursor.execute(f"DROP TABLE IF EXISTS {rel_table_quoted}")