1
0
mirror of https://github.com/django/django.git synced 2025-11-07 07:15:35 +00:00

Fixed #36661 -- Added introspection of database-level delete options.

This commit is contained in:
Mariusz Felisiak
2025-10-31 14:33:27 +01:00
committed by GitHub
parent 6019147229
commit 05ba1a9228
12 changed files with 185 additions and 31 deletions

View File

@@ -4,6 +4,7 @@ import re
from django.core.management.base import BaseCommand, CommandError
from django.db import DEFAULT_DB_ALIAS, connections
from django.db.models.constants import LOOKUP_SEP
from django.db.models.deletion import DatabaseOnDelete
class Command(BaseCommand):
@@ -163,7 +164,9 @@ class Command(BaseCommand):
extra_params["unique"] = True
if is_relation:
ref_db_column, ref_db_table = relations[column_name]
ref_db_column, ref_db_table, db_on_delete = relations[
column_name
]
if extra_params.pop("unique", False) or extra_params.get(
"primary_key"
):
@@ -191,6 +194,8 @@ class Command(BaseCommand):
model_name.lower(),
att_name,
)
if db_on_delete and isinstance(db_on_delete, DatabaseOnDelete):
extra_params["on_delete"] = f"models.{db_on_delete}"
used_relations.add(rel_to)
else:
# Calling `get_field_type` to get the field type string
@@ -227,8 +232,12 @@ class Command(BaseCommand):
"" if "." in field_type else "models.",
field_type,
)
on_delete_qualname = extra_params.pop("on_delete", None)
if field_type.startswith(("ForeignKey(", "OneToOneField(")):
field_desc += ", models.DO_NOTHING"
if on_delete_qualname:
field_desc += f", {on_delete_qualname}"
else:
field_desc += ", models.DO_NOTHING"
# Add comment.
if connection.features.supports_comments and row.comment:

View File

@@ -1,5 +1,7 @@
from collections import namedtuple
from django.db.models import DB_CASCADE, DB_SET_DEFAULT, DB_SET_NULL, DO_NOTHING
# Structure returned by DatabaseIntrospection.get_table_list()
TableInfo = namedtuple("TableInfo", ["name", "type"])
@@ -15,6 +17,13 @@ class BaseDatabaseIntrospection:
"""Encapsulate backend-specific introspection utilities."""
data_types_reverse = {}
on_delete_types = {
"CASCADE": DB_CASCADE,
"NO ACTION": DO_NOTHING,
"SET DEFAULT": DB_SET_DEFAULT,
"SET NULL": DB_SET_NULL,
# DB_RESTRICT - "RESTRICT" is not supported.
}
def __init__(self, connection):
self.connection = connection
@@ -169,8 +178,11 @@ class BaseDatabaseIntrospection:
def get_relations(self, cursor, table_name):
"""
Return a dictionary of {field_name: (field_name_other_table,
other_table)} representing all foreign keys in the given table.
Return a dictionary of
{
field_name: (field_name_other_table, other_table, db_on_delete)
}
representing all foreign keys in the given table.
"""
raise NotImplementedError(
"subclasses of BaseDatabaseIntrospection may require a "

View File

@@ -334,6 +334,7 @@ class DatabaseWrapper(BaseDatabaseWrapper):
for column_name, (
referenced_column_name,
referenced_table_name,
_,
) in relations.items():
cursor.execute(
"""

View File

@@ -196,24 +196,36 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
def get_relations(self, cursor, table_name):
"""
Return a dictionary of {field_name: (field_name_other_table,
other_table)} representing all foreign keys in the given table.
Return a dictionary of
{
field_name: (field_name_other_table, other_table, db_on_delete)
}
representing all foreign keys in the given table.
"""
cursor.execute(
"""
SELECT column_name, referenced_column_name, referenced_table_name
FROM information_schema.key_column_usage
WHERE table_name = %s
AND table_schema = DATABASE()
AND referenced_table_schema = DATABASE()
AND referenced_table_name IS NOT NULL
AND referenced_column_name IS NOT NULL
SELECT
kcu.column_name,
kcu.referenced_column_name,
kcu.referenced_table_name,
rc.delete_rule
FROM
information_schema.key_column_usage kcu
JOIN
information_schema.referential_constraints rc
ON rc.constraint_name = kcu.constraint_name
AND rc.constraint_schema = kcu.constraint_schema
WHERE kcu.table_name = %s
AND kcu.table_schema = DATABASE()
AND kcu.referenced_table_schema = DATABASE()
AND kcu.referenced_table_name IS NOT NULL
AND kcu.referenced_column_name IS NOT NULL
""",
[table_name],
)
return {
field_name: (other_field, other_table)
for field_name, other_field, other_table in cursor.fetchall()
field_name: (other_field, other_table, self.on_delete_types.get(on_delete))
for field_name, other_field, other_table, on_delete in cursor.fetchall()
}
def get_storage_engine(self, cursor, table_name):

View File

@@ -254,13 +254,16 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
def get_relations(self, cursor, table_name):
"""
Return a dictionary of {field_name: (field_name_other_table,
other_table)} representing all foreign keys in the given table.
Return a dictionary of
{
field_name: (field_name_other_table, other_table, db_on_delete)
}
representing all foreign keys in the given table.
"""
table_name = table_name.upper()
cursor.execute(
"""
SELECT ca.column_name, cb.table_name, cb.column_name
SELECT ca.column_name, cb.table_name, cb.column_name, user_constraints.delete_rule
FROM user_constraints, USER_CONS_COLUMNS ca, USER_CONS_COLUMNS cb
WHERE user_constraints.table_name = %s AND
user_constraints.constraint_name = ca.constraint_name AND
@@ -273,8 +276,14 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
self.identifier_converter(field_name): (
self.identifier_converter(rel_field_name),
self.identifier_converter(rel_table_name),
self.on_delete_types.get(on_delete),
)
for field_name, rel_table_name, rel_field_name in cursor.fetchall()
for (
field_name,
rel_table_name,
rel_field_name,
on_delete,
) in cursor.fetchall()
}
def get_primary_key_columns(self, cursor, table_name):

View File

@@ -3,7 +3,7 @@ from collections import namedtuple
from django.db.backends.base.introspection import BaseDatabaseIntrospection
from django.db.backends.base.introspection import FieldInfo as BaseFieldInfo
from django.db.backends.base.introspection import TableInfo as BaseTableInfo
from django.db.models import Index
from django.db.models import DB_CASCADE, DB_SET_DEFAULT, DB_SET_NULL, DO_NOTHING, Index
FieldInfo = namedtuple("FieldInfo", [*BaseFieldInfo._fields, "is_autofield", "comment"])
TableInfo = namedtuple("TableInfo", [*BaseTableInfo._fields, "comment"])
@@ -38,6 +38,14 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
ignored_tables = []
on_delete_types = {
"a": DO_NOTHING,
"c": DB_CASCADE,
"d": DB_SET_DEFAULT,
"n": DB_SET_NULL,
# DB_RESTRICT - "r" is not supported.
}
def get_field_type(self, data_type, description):
field_type = super().get_field_type(data_type, description)
if description.is_autofield or (
@@ -154,12 +162,15 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
def get_relations(self, cursor, table_name):
"""
Return a dictionary of {field_name: (field_name_other_table,
other_table)} representing all foreign keys in the given table.
Return a dictionary of
{
field_name: (field_name_other_table, other_table, db_on_delete)
}
representing all foreign keys in the given table.
"""
cursor.execute(
"""
SELECT a1.attname, c2.relname, a2.attname
SELECT a1.attname, c2.relname, a2.attname, con.confdeltype
FROM pg_constraint con
LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
@@ -175,7 +186,10 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
""",
[table_name],
)
return {row[0]: (row[2], row[1]) for row in cursor.fetchall()}
return {
row[0]: (row[2], row[1], self.on_delete_types.get(row[3]))
for row in cursor.fetchall()
}
def get_constraints(self, cursor, table_name):
"""

View File

@@ -153,20 +153,27 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
def get_relations(self, cursor, table_name):
"""
Return a dictionary of {column_name: (ref_column_name, ref_table_name)}
Return a dictionary of
{column_name: (ref_column_name, ref_table_name, db_on_delete)}
representing all foreign keys in the given table.
"""
cursor.execute(
"PRAGMA foreign_key_list(%s)" % self.connection.ops.quote_name(table_name)
)
return {
column_name: (ref_column_name, ref_table_name)
column_name: (
ref_column_name,
ref_table_name,
self.on_delete_types.get(on_delete),
)
for (
_,
_,
ref_table_name,
column_name,
ref_column_name,
_,
on_delete,
*_,
) in cursor.fetchall()
}
@@ -407,7 +414,10 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
"check": False,
"index": False,
}
for index, (column_name, (ref_column_name, ref_table_name)) in relations
for index, (
column_name,
(ref_column_name, ref_table_name, _),
) in relations
}
)
return constraints

View File

@@ -314,6 +314,11 @@ backends.
database has native support for ``DurationField``, override this method to
simply return the value.
* The ``DatabaseIntrospection.get_relations()`` should now return a dictionary
with 3-tuples containing (``field_name_other_table``, ``other_table``,
``db_on_delete``) as values. ``db_on_delete`` is one of the database-level
delete options e.g. :attr:`~django.db.models.DB_CASCADE`.
:mod:`django.contrib.gis`
-------------------------

View File

@@ -161,3 +161,11 @@ class CompositePKModel(models.Model):
pk = models.CompositePrimaryKey("column_1", "column_2")
column_1 = models.IntegerField()
column_2 = models.IntegerField()
class DbOnDeleteModel(models.Model):
fk_do_nothing = models.ForeignKey(UniqueTogether, on_delete=models.DO_NOTHING)
fk_db_cascade = models.ForeignKey(ColumnTypes, on_delete=models.DB_CASCADE)
fk_set_null = models.ForeignKey(
DigitsInColumnName, on_delete=models.DB_SET_NULL, null=True
)

View File

@@ -299,6 +299,27 @@ class InspectDBTestCase(TestCase):
out.getvalue(),
)
@skipUnlessDBFeature("can_introspect_foreign_keys")
def test_foreign_key_db_on_delete(self):
out = StringIO()
call_command("inspectdb", "inspectdb_dbondeletemodel", stdout=out)
output = out.getvalue()
self.assertIn(
"fk_do_nothing = models.ForeignKey('InspectdbUniquetogether', "
"models.DO_NOTHING)",
output,
)
self.assertIn(
"fk_db_cascade = models.ForeignKey('InspectdbColumntypes', "
"models.DB_CASCADE)",
output,
)
self.assertIn(
"fk_set_null = models.ForeignKey('InspectdbDigitsincolumnname', "
"models.DB_SET_NULL, blank=True, null=True)",
output,
)
def test_digits_column_name_introspection(self):
"""
Introspection of column names consist/start with digits (#16536/#17676)

View File

@@ -110,3 +110,18 @@ class DbCommentModel(models.Model):
class Meta:
db_table_comment = "Custom table comment"
required_db_features = {"supports_comments"}
class DbOnDeleteModel(models.Model):
fk_do_nothing = models.ForeignKey(Country, on_delete=models.DO_NOTHING)
fk_db_cascade = models.ForeignKey(City, on_delete=models.DB_CASCADE)
fk_set_null = models.ForeignKey(Reporter, on_delete=models.DB_SET_NULL, null=True)
class DbOnDeleteSetDefaultModel(models.Model):
fk_db_set_default = models.ForeignKey(
Country, on_delete=models.DB_SET_DEFAULT, db_default=models.Value(1)
)
class Meta:
required_db_features = {"supports_on_delete_db_default"}

View File

@@ -1,5 +1,5 @@
from django.db import DatabaseError, connection
from django.db.models import Index
from django.db.models import DB_CASCADE, DB_SET_DEFAULT, DB_SET_NULL, DO_NOTHING, Index
from django.test import TransactionTestCase, skipUnlessDBFeature
from .models import (
@@ -10,6 +10,8 @@ from .models import (
Comment,
Country,
DbCommentModel,
DbOnDeleteModel,
DbOnDeleteSetDefaultModel,
District,
Reporter,
UniqueConstraintConditionModel,
@@ -219,10 +221,14 @@ class IntrospectionTests(TransactionTestCase):
cursor, Article._meta.db_table
)
# That's {field_name: (field_name_other_table, other_table)}
if connection.vendor == "mysql" and connection.mysql_is_mariadb:
no_db_on_delete = None
else:
no_db_on_delete = DO_NOTHING
# {field_name: (field_name_other_table, other_table, db_on_delete)}
expected_relations = {
"reporter_id": ("id", Reporter._meta.db_table),
"response_to_id": ("id", Article._meta.db_table),
"reporter_id": ("id", Reporter._meta.db_table, no_db_on_delete),
"response_to_id": ("id", Article._meta.db_table, no_db_on_delete),
}
self.assertEqual(relations, expected_relations)
@@ -238,6 +244,38 @@ class IntrospectionTests(TransactionTestCase):
editor.add_field(Article, body)
self.assertEqual(relations, expected_relations)
@skipUnlessDBFeature("can_introspect_foreign_keys")
def test_get_relations_db_on_delete(self):
with connection.cursor() as cursor:
relations = connection.introspection.get_relations(
cursor, DbOnDeleteModel._meta.db_table
)
if connection.vendor == "mysql" and connection.mysql_is_mariadb:
no_db_on_delete = None
else:
no_db_on_delete = DO_NOTHING
# {field_name: (field_name_other_table, other_table, db_on_delete)}
expected_relations = {
"fk_db_cascade_id": ("id", City._meta.db_table, DB_CASCADE),
"fk_do_nothing_id": ("id", Country._meta.db_table, no_db_on_delete),
"fk_set_null_id": ("id", Reporter._meta.db_table, DB_SET_NULL),
}
self.assertEqual(relations, expected_relations)
@skipUnlessDBFeature("can_introspect_foreign_keys", "supports_on_delete_db_default")
def test_get_relations_db_on_delete_default(self):
with connection.cursor() as cursor:
relations = connection.introspection.get_relations(
cursor, DbOnDeleteSetDefaultModel._meta.db_table
)
# {field_name: (field_name_other_table, other_table, db_on_delete)}
expected_relations = {
"fk_db_set_default_id": ("id", Country._meta.db_table, DB_SET_DEFAULT),
}
self.assertEqual(relations, expected_relations)
def test_get_primary_key_column(self):
with connection.cursor() as cursor:
primary_key_column = connection.introspection.get_primary_key_column(