mirror of
https://github.com/django/django.git
synced 2025-01-05 07:55:47 +00:00
Fixed #28884 -- Fixed crash on SQLite when renaming a field in a model referenced by a ManyToManyField.
Introspected database constraints instead of relying on _meta.related_objects
to determine whether or not a table or a column is referenced on rename
operations.
This has the side effect of ignoring both db_constraint=False and virtual
fields such as GenericRelation which aren't backend by database level
constraints and thus shouldn't prevent the rename operations from being
performed in a transaction.
Regression in 095c1aaa89
.
Thanks Tim for the additional tests and edits, and Mariusz for the review.
This commit is contained in:
parent
f3a98224e6
commit
9f7772e098
@ -199,6 +199,22 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
|
|||||||
'pk': field[5], # undocumented
|
'pk': field[5], # undocumented
|
||||||
} for field in cursor.fetchall()]
|
} for field in cursor.fetchall()]
|
||||||
|
|
||||||
|
def _get_foreign_key_constraints(self, cursor, table_name):
|
||||||
|
constraints = {}
|
||||||
|
cursor.execute('PRAGMA foreign_key_list(%s)' % self.connection.ops.quote_name(table_name))
|
||||||
|
for row in cursor.fetchall():
|
||||||
|
# Remaining on_update/on_delete/match values are of no interest.
|
||||||
|
id_, _, table, from_, to = row[:5]
|
||||||
|
constraints['fk_%d' % id_] = {
|
||||||
|
'columns': [from_],
|
||||||
|
'primary_key': False,
|
||||||
|
'unique': False,
|
||||||
|
'foreign_key': (table, to),
|
||||||
|
'check': False,
|
||||||
|
'index': False,
|
||||||
|
}
|
||||||
|
return constraints
|
||||||
|
|
||||||
def get_constraints(self, cursor, table_name):
|
def get_constraints(self, cursor, table_name):
|
||||||
"""
|
"""
|
||||||
Retrieve any constraints or keys (unique, pk, fk, check, index) across
|
Retrieve any constraints or keys (unique, pk, fk, check, index) across
|
||||||
@ -253,17 +269,5 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
|
|||||||
"check": False,
|
"check": False,
|
||||||
"index": False,
|
"index": False,
|
||||||
}
|
}
|
||||||
# Get foreign keys
|
constraints.update(self._get_foreign_key_constraints(cursor, table_name))
|
||||||
cursor.execute('PRAGMA foreign_key_list(%s)' % self.connection.ops.quote_name(table_name))
|
|
||||||
for row in cursor.fetchall():
|
|
||||||
# Remaining on_update/on_delete/match values are of no interest here
|
|
||||||
id_, seq, table, from_, to = row[:5]
|
|
||||||
constraints['fk_%d' % id_] = {
|
|
||||||
'columns': [from_],
|
|
||||||
'primary_key': False,
|
|
||||||
'unique': False,
|
|
||||||
'foreign_key': (table, to),
|
|
||||||
'check': False,
|
|
||||||
'index': False,
|
|
||||||
}
|
|
||||||
return constraints
|
return constraints
|
||||||
|
@ -55,8 +55,27 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||||||
else:
|
else:
|
||||||
raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
|
raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
|
||||||
|
|
||||||
|
def _is_referenced_by_fk_constraint(self, table_name, column_name=None, ignore_self=False):
|
||||||
|
"""
|
||||||
|
Return whether or not the provided table name is referenced by another
|
||||||
|
one. If `column_name` is specified, only references pointing to that
|
||||||
|
column are considered. If `ignore_self` is True, self-referential
|
||||||
|
constraints are ignored.
|
||||||
|
"""
|
||||||
|
with self.connection.cursor() as cursor:
|
||||||
|
for other_table in self.connection.introspection.get_table_list(cursor):
|
||||||
|
if ignore_self and other_table.name == table_name:
|
||||||
|
continue
|
||||||
|
constraints = self.connection.introspection._get_foreign_key_constraints(cursor, other_table.name)
|
||||||
|
for constraint in constraints.values():
|
||||||
|
constraint_table, constraint_column = constraint['foreign_key']
|
||||||
|
if (constraint_table == table_name and
|
||||||
|
(column_name is None or constraint_column == column_name)):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def alter_db_table(self, model, old_db_table, new_db_table, disable_constraints=True):
|
def alter_db_table(self, model, old_db_table, new_db_table, disable_constraints=True):
|
||||||
if model._meta.related_objects and disable_constraints:
|
if disable_constraints and self._is_referenced_by_fk_constraint(old_db_table):
|
||||||
if self.connection.in_atomic_block:
|
if self.connection.in_atomic_block:
|
||||||
raise NotSupportedError((
|
raise NotSupportedError((
|
||||||
'Renaming the %r table while in a transaction is not '
|
'Renaming the %r table while in a transaction is not '
|
||||||
@ -71,8 +90,10 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||||||
|
|
||||||
def alter_field(self, model, old_field, new_field, strict=False):
|
def alter_field(self, model, old_field, new_field, strict=False):
|
||||||
old_field_name = old_field.name
|
old_field_name = old_field.name
|
||||||
|
table_name = model._meta.db_table
|
||||||
|
_, old_column_name = old_field.get_attname_column()
|
||||||
if (new_field.name != old_field_name and
|
if (new_field.name != old_field_name and
|
||||||
any(r.field_name == old_field.name for r in model._meta.related_objects)):
|
self._is_referenced_by_fk_constraint(table_name, old_column_name, ignore_self=True)):
|
||||||
if self.connection.in_atomic_block:
|
if self.connection.in_atomic_block:
|
||||||
raise NotSupportedError((
|
raise NotSupportedError((
|
||||||
'Renaming the %r.%r column while in a transaction is not '
|
'Renaming the %r.%r column while in a transaction is not '
|
||||||
@ -87,9 +108,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
|
|||||||
with self.connection.cursor() as cursor:
|
with self.connection.cursor() as cursor:
|
||||||
schema_version = cursor.execute('PRAGMA schema_version').fetchone()[0]
|
schema_version = cursor.execute('PRAGMA schema_version').fetchone()[0]
|
||||||
cursor.execute('PRAGMA writable_schema = 1')
|
cursor.execute('PRAGMA writable_schema = 1')
|
||||||
table_name = model._meta.db_table
|
|
||||||
references_template = ' REFERENCES "%s" ("%%s") ' % table_name
|
references_template = ' REFERENCES "%s" ("%%s") ' % table_name
|
||||||
old_column_name = old_field.get_attname_column()[1]
|
|
||||||
new_column_name = new_field.get_attname_column()[1]
|
new_column_name = new_field.get_attname_column()[1]
|
||||||
search = references_template % old_column_name
|
search = references_template % old_column_name
|
||||||
replacement = references_template % new_column_name
|
replacement = references_template % new_column_name
|
||||||
|
@ -34,3 +34,6 @@ Bugfixes
|
|||||||
|
|
||||||
* Fixed crash when coercing a translatable URL pattern to ``str``
|
* Fixed crash when coercing a translatable URL pattern to ``str``
|
||||||
(:ticket:`28947`).
|
(:ticket:`28947`).
|
||||||
|
|
||||||
|
* Fixed crash on SQLite when renaming a field in a model referenced by a
|
||||||
|
``ManyToManyField`` (:ticket:`28884`).
|
||||||
|
@ -722,7 +722,7 @@ class OperationTests(OperationTestBase):
|
|||||||
|
|
||||||
project_state = self.apply_operations(app_label, project_state, operations=[
|
project_state = self.apply_operations(app_label, project_state, operations=[
|
||||||
migrations.RenameModel("Pony", "Pony2"),
|
migrations.RenameModel("Pony", "Pony2"),
|
||||||
])
|
], atomic=connection.features.supports_atomic_references_rename)
|
||||||
Pony = project_state.apps.get_model(app_label, "Pony2")
|
Pony = project_state.apps.get_model(app_label, "Pony2")
|
||||||
Rider = project_state.apps.get_model(app_label, "Rider")
|
Rider = project_state.apps.get_model(app_label, "Rider")
|
||||||
pony = Pony.objects.create()
|
pony = Pony.objects.create()
|
||||||
@ -1231,12 +1231,13 @@ class OperationTests(OperationTestBase):
|
|||||||
second_state = first_state.clone()
|
second_state = first_state.clone()
|
||||||
operation = migrations.AlterModelTable(name='pony', table=None)
|
operation = migrations.AlterModelTable(name='pony', table=None)
|
||||||
operation.state_forwards(app_label, second_state)
|
operation.state_forwards(app_label, second_state)
|
||||||
with connection.schema_editor() as editor:
|
atomic_rename = connection.features.supports_atomic_references_rename
|
||||||
|
with connection.schema_editor(atomic=atomic_rename) as editor:
|
||||||
operation.database_forwards(app_label, editor, first_state, second_state)
|
operation.database_forwards(app_label, editor, first_state, second_state)
|
||||||
self.assertTableExists(new_m2m_table)
|
self.assertTableExists(new_m2m_table)
|
||||||
self.assertTableNotExists(original_m2m_table)
|
self.assertTableNotExists(original_m2m_table)
|
||||||
# And test reversal
|
# And test reversal
|
||||||
with connection.schema_editor() as editor:
|
with connection.schema_editor(atomic=atomic_rename) as editor:
|
||||||
operation.database_backwards(app_label, editor, second_state, first_state)
|
operation.database_backwards(app_label, editor, second_state, first_state)
|
||||||
self.assertTableExists(original_m2m_table)
|
self.assertTableExists(original_m2m_table)
|
||||||
self.assertTableNotExists(new_m2m_table)
|
self.assertTableNotExists(new_m2m_table)
|
||||||
|
@ -61,6 +61,9 @@ class SchemaTests(TransactionTestCase):
|
|||||||
# local_models should contain test dependent model classes that will be
|
# local_models should contain test dependent model classes that will be
|
||||||
# automatically removed from the app cache on test tear down.
|
# automatically removed from the app cache on test tear down.
|
||||||
self.local_models = []
|
self.local_models = []
|
||||||
|
# isolated_local_models contains models that are in test methods
|
||||||
|
# decorated with @isolate_apps.
|
||||||
|
self.isolated_local_models = []
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
# Delete any tables made for our models
|
# Delete any tables made for our models
|
||||||
@ -75,6 +78,10 @@ class SchemaTests(TransactionTestCase):
|
|||||||
if through and through._meta.auto_created:
|
if through and through._meta.auto_created:
|
||||||
del new_apps.all_models['schema'][through._meta.model_name]
|
del new_apps.all_models['schema'][through._meta.model_name]
|
||||||
del new_apps.all_models['schema'][model._meta.model_name]
|
del new_apps.all_models['schema'][model._meta.model_name]
|
||||||
|
if self.isolated_local_models:
|
||||||
|
with connection.schema_editor() as editor:
|
||||||
|
for model in self.isolated_local_models:
|
||||||
|
editor.delete_model(model)
|
||||||
|
|
||||||
def delete_tables(self):
|
def delete_tables(self):
|
||||||
"Deletes all model tables for our models for a clean test environment"
|
"Deletes all model tables for our models for a clean test environment"
|
||||||
@ -1420,6 +1427,38 @@ class SchemaTests(TransactionTestCase):
|
|||||||
def test_m2m_repoint_inherited(self):
|
def test_m2m_repoint_inherited(self):
|
||||||
self._test_m2m_repoint(InheritedManyToManyField)
|
self._test_m2m_repoint(InheritedManyToManyField)
|
||||||
|
|
||||||
|
@isolate_apps('schema')
|
||||||
|
def test_m2m_rename_field_in_target_model(self):
|
||||||
|
class TagM2MTest(Model):
|
||||||
|
title = CharField(max_length=255)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
app_label = 'schema'
|
||||||
|
|
||||||
|
class LocalM2M(Model):
|
||||||
|
tags = ManyToManyField(TagM2MTest)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
app_label = 'schema'
|
||||||
|
|
||||||
|
# Create the tables.
|
||||||
|
with connection.schema_editor() as editor:
|
||||||
|
editor.create_model(LocalM2M)
|
||||||
|
editor.create_model(TagM2MTest)
|
||||||
|
# Ensure the m2m table is there.
|
||||||
|
self.assertEqual(len(self.column_classes(LocalM2M)), 1)
|
||||||
|
# Alter a field in TagM2MTest.
|
||||||
|
old_field = TagM2MTest._meta.get_field('title')
|
||||||
|
new_field = CharField(max_length=254)
|
||||||
|
new_field.contribute_to_class(TagM2MTest, 'title1')
|
||||||
|
# @isolate_apps() and inner models are needed to have the model
|
||||||
|
# relations populated, otherwise this doesn't act as a regression test.
|
||||||
|
self.assertEqual(len(new_field.model._meta.related_objects), 1)
|
||||||
|
with connection.schema_editor() as editor:
|
||||||
|
editor.alter_field(TagM2MTest, old_field, new_field, strict=True)
|
||||||
|
# Ensure the m2m table is still there.
|
||||||
|
self.assertEqual(len(self.column_classes(LocalM2M)), 1)
|
||||||
|
|
||||||
@skipUnlessDBFeature('supports_column_check_constraints')
|
@skipUnlessDBFeature('supports_column_check_constraints')
|
||||||
def test_check_constraints(self):
|
def test_check_constraints(self):
|
||||||
"""
|
"""
|
||||||
@ -2371,6 +2410,7 @@ class SchemaTests(TransactionTestCase):
|
|||||||
new_field.set_attributes_from_name('id')
|
new_field.set_attributes_from_name('id')
|
||||||
with connection.schema_editor() as editor:
|
with connection.schema_editor() as editor:
|
||||||
editor.alter_field(Node, old_field, new_field, strict=True)
|
editor.alter_field(Node, old_field, new_field, strict=True)
|
||||||
|
self.assertForeignKeyExists(Node, 'parent_id', Node._meta.db_table)
|
||||||
|
|
||||||
@mock.patch('django.db.backends.base.schema.datetime')
|
@mock.patch('django.db.backends.base.schema.datetime')
|
||||||
@mock.patch('django.db.backends.base.schema.timezone')
|
@mock.patch('django.db.backends.base.schema.timezone')
|
||||||
@ -2479,7 +2519,8 @@ class SchemaTests(TransactionTestCase):
|
|||||||
doc.students.add(student)
|
doc.students.add(student)
|
||||||
|
|
||||||
def test_rename_table_renames_deferred_sql_references(self):
|
def test_rename_table_renames_deferred_sql_references(self):
|
||||||
with connection.schema_editor() as editor:
|
atomic_rename = connection.features.supports_atomic_references_rename
|
||||||
|
with connection.schema_editor(atomic=atomic_rename) as editor:
|
||||||
editor.create_model(Author)
|
editor.create_model(Author)
|
||||||
editor.create_model(Book)
|
editor.create_model(Book)
|
||||||
editor.alter_db_table(Author, 'schema_author', 'schema_renamed_author')
|
editor.alter_db_table(Author, 'schema_author', 'schema_renamed_author')
|
||||||
@ -2506,3 +2547,60 @@ class SchemaTests(TransactionTestCase):
|
|||||||
for statement in editor.deferred_sql:
|
for statement in editor.deferred_sql:
|
||||||
self.assertIs(statement.references_column('book', 'title'), False)
|
self.assertIs(statement.references_column('book', 'title'), False)
|
||||||
self.assertIs(statement.references_column('book', 'author_id'), False)
|
self.assertIs(statement.references_column('book', 'author_id'), False)
|
||||||
|
|
||||||
|
@isolate_apps('schema')
|
||||||
|
def test_referenced_field_without_constraint_rename_inside_atomic_block(self):
|
||||||
|
"""
|
||||||
|
Foreign keys without database level constraint don't prevent the field
|
||||||
|
they reference from being renamed in an atomic block.
|
||||||
|
"""
|
||||||
|
class Foo(Model):
|
||||||
|
field = CharField(max_length=255, unique=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
app_label = 'schema'
|
||||||
|
|
||||||
|
class Bar(Model):
|
||||||
|
foo = ForeignKey(Foo, CASCADE, to_field='field', db_constraint=False)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
app_label = 'schema'
|
||||||
|
|
||||||
|
self.isolated_local_models = [Foo, Bar]
|
||||||
|
with connection.schema_editor() as editor:
|
||||||
|
editor.create_model(Foo)
|
||||||
|
editor.create_model(Bar)
|
||||||
|
|
||||||
|
new_field = CharField(max_length=255, unique=True)
|
||||||
|
new_field.set_attributes_from_name('renamed')
|
||||||
|
with connection.schema_editor(atomic=True) as editor:
|
||||||
|
editor.alter_field(Foo, Foo._meta.get_field('field'), new_field)
|
||||||
|
|
||||||
|
@isolate_apps('schema')
|
||||||
|
def test_referenced_table_without_constraint_rename_inside_atomic_block(self):
|
||||||
|
"""
|
||||||
|
Foreign keys without database level constraint don't prevent the table
|
||||||
|
they reference from being renamed in an atomic block.
|
||||||
|
"""
|
||||||
|
class Foo(Model):
|
||||||
|
field = CharField(max_length=255, unique=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
app_label = 'schema'
|
||||||
|
|
||||||
|
class Bar(Model):
|
||||||
|
foo = ForeignKey(Foo, CASCADE, to_field='field', db_constraint=False)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
app_label = 'schema'
|
||||||
|
|
||||||
|
self.isolated_local_models = [Foo, Bar]
|
||||||
|
with connection.schema_editor() as editor:
|
||||||
|
editor.create_model(Foo)
|
||||||
|
editor.create_model(Bar)
|
||||||
|
|
||||||
|
new_field = CharField(max_length=255, unique=True)
|
||||||
|
new_field.set_attributes_from_name('renamed')
|
||||||
|
with connection.schema_editor(atomic=True) as editor:
|
||||||
|
editor.alter_db_table(Foo, Foo._meta.db_table, 'renamed_table')
|
||||||
|
Foo._meta.db_table = 'renamed_table'
|
||||||
|
Loading…
Reference in New Issue
Block a user