1
0
mirror of https://github.com/django/django.git synced 2025-10-29 08:36:09 +00:00

Fixed #7539, #13067 -- Added on_delete argument to ForeignKey to control cascade behavior. Also refactored deletion for efficiency and code clarity. Many thanks to Johannes Dollinger and Michael Glassford for extensive work on the patch, and to Alex Gaynor, Russell Keith-Magee, and Jacob Kaplan-Moss for review.

git-svn-id: http://code.djangoproject.com/svn/django/trunk@14507 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Carl Meyer
2010-11-09 16:46:42 +00:00
parent 3ba3294c6b
commit 616b30227d
28 changed files with 850 additions and 608 deletions

View File

@@ -1,135 +1,253 @@
from django.db.models import sql
from django.db.models.loading import cache
from django.db.models.query import CollectedObjects
from django.db.models.query_utils import CyclicDependency
from django.test import TestCase
from django.db import models, IntegrityError
from django.test import TestCase, skipUnlessDBFeature, skipIfDBFeature
from models import A, B, C, D, E, F
from modeltests.delete.models import (R, RChild, S, T, U, A, M, MR, MRNull,
create_a, get_default_r, User, Avatar, HiddenUser, HiddenUserProfile)
class DeleteTests(TestCase):
def clear_rel_obj_caches(self, *models):
for m in models:
if hasattr(m._meta, '_related_objects_cache'):
del m._meta._related_objects_cache
def order_models(self, *models):
cache.app_models["delete"].keyOrder = models
class OnDeleteTests(TestCase):
def setUp(self):
self.order_models("a", "b", "c", "d", "e", "f")
self.clear_rel_obj_caches(A, B, C, D, E, F)
self.DEFAULT = get_default_r()
def tearDown(self):
self.order_models("a", "b", "c", "d", "e", "f")
self.clear_rel_obj_caches(A, B, C, D, E, F)
def test_auto(self):
a = create_a('auto')
a.auto.delete()
self.assertFalse(A.objects.filter(name='auto').exists())
def test_collected_objects(self):
g = CollectedObjects()
self.assertFalse(g.add("key1", 1, "item1", None))
self.assertEqual(g["key1"], {1: "item1"})
def test_auto_nullable(self):
a = create_a('auto_nullable')
a.auto_nullable.delete()
self.assertFalse(A.objects.filter(name='auto_nullable').exists())
self.assertFalse(g.add("key2", 1, "item1", "key1"))
self.assertFalse(g.add("key2", 2, "item2", "key1"))
def test_setvalue(self):
a = create_a('setvalue')
a.setvalue.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(self.DEFAULT, a.setvalue)
self.assertEqual(g["key2"], {1: "item1", 2: "item2"})
def test_setnull(self):
a = create_a('setnull')
a.setnull.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(None, a.setnull)
self.assertFalse(g.add("key3", 1, "item1", "key1"))
self.assertTrue(g.add("key3", 1, "item1", "key2"))
self.assertEqual(g.ordered_keys(), ["key3", "key2", "key1"])
def test_setdefault(self):
a = create_a('setdefault')
a.setdefault.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(self.DEFAULT, a.setdefault)
self.assertTrue(g.add("key2", 1, "item1", "key3"))
self.assertRaises(CyclicDependency, g.ordered_keys)
def test_setdefault_none(self):
a = create_a('setdefault_none')
a.setdefault_none.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(None, a.setdefault_none)
def test_delete(self):
## Second, test the usage of CollectedObjects by Model.delete()
def test_cascade(self):
a = create_a('cascade')
a.cascade.delete()
self.assertFalse(A.objects.filter(name='cascade').exists())
# Due to the way that transactions work in the test harness, doing
# m.delete() here can work but fail in a real situation, since it may
# delete all objects, but not in the right order. So we manually check
# that the order of deletion is correct.
def test_cascade_nullable(self):
a = create_a('cascade_nullable')
a.cascade_nullable.delete()
self.assertFalse(A.objects.filter(name='cascade_nullable').exists())
# Also, it is possible that the order is correct 'accidentally', due
# solely to order of imports etc. To check this, we set the order that
# 'get_models()' will retrieve to a known 'nice' order, and then try
# again with a known 'tricky' order. Slightly naughty access to
# internals here :-)
def test_protect(self):
a = create_a('protect')
self.assertRaises(IntegrityError, a.protect.delete)
# If implementation changes, then the tests may need to be simplified:
# - remove the lines that set the .keyOrder and clear the related
# object caches
# - remove the second set of tests (with a2, b2 etc)
def test_do_nothing(self):
# Testing DO_NOTHING is a bit harder: It would raise IntegrityError for a normal model,
# so we connect to pre_delete and set the fk to a known value.
replacement_r = R.objects.create()
def check_do_nothing(sender, **kwargs):
obj = kwargs['instance']
obj.donothing_set.update(donothing=replacement_r)
models.signals.pre_delete.connect(check_do_nothing)
a = create_a('do_nothing')
a.donothing.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(replacement_r, a.donothing)
models.signals.pre_delete.disconnect(check_do_nothing)
a1 = A.objects.create()
b1 = B.objects.create(a=a1)
c1 = C.objects.create(b=b1)
d1 = D.objects.create(c=c1, a=a1)
def test_inheritance_cascade_up(self):
child = RChild.objects.create()
child.delete()
self.assertFalse(R.objects.filter(pk=child.pk).exists())
o = CollectedObjects()
a1._collect_sub_objects(o)
self.assertEqual(o.keys(), [D, C, B, A])
a1.delete()
def test_inheritance_cascade_down(self):
child = RChild.objects.create()
parent = child.r_ptr
parent.delete()
self.assertFalse(RChild.objects.filter(pk=child.pk).exists())
# Same again with a known bad order
self.order_models("d", "c", "b", "a")
self.clear_rel_obj_caches(A, B, C, D)
def test_cascade_from_child(self):
a = create_a('child')
a.child.delete()
self.assertFalse(A.objects.filter(name='child').exists())
self.assertFalse(R.objects.filter(pk=a.child_id).exists())
a2 = A.objects.create()
b2 = B.objects.create(a=a2)
c2 = C.objects.create(b=b2)
d2 = D.objects.create(c=c2, a=a2)
def test_cascade_from_parent(self):
a = create_a('child')
R.objects.get(pk=a.child_id).delete()
self.assertFalse(A.objects.filter(name='child').exists())
self.assertFalse(RChild.objects.filter(pk=a.child_id).exists())
o = CollectedObjects()
a2._collect_sub_objects(o)
self.assertEqual(o.keys(), [D, C, B, A])
a2.delete()
def test_setnull_from_child(self):
a = create_a('child_setnull')
a.child_setnull.delete()
self.assertFalse(R.objects.filter(pk=a.child_setnull_id).exists())
def test_collected_objects_null(self):
g = CollectedObjects()
self.assertFalse(g.add("key1", 1, "item1", None))
self.assertFalse(g.add("key2", 1, "item1", "key1", nullable=True))
self.assertTrue(g.add("key1", 1, "item1", "key2"))
self.assertEqual(g.ordered_keys(), ["key1", "key2"])
a = A.objects.get(pk=a.pk)
self.assertEqual(None, a.child_setnull)
def test_delete_nullable(self):
e1 = E.objects.create()
f1 = F.objects.create(e=e1)
e1.f = f1
e1.save()
def test_setnull_from_parent(self):
a = create_a('child_setnull')
R.objects.get(pk=a.child_setnull_id).delete()
self.assertFalse(RChild.objects.filter(pk=a.child_setnull_id).exists())
# Since E.f is nullable, we should delete F first (after nulling out
# the E.f field), then E.
a = A.objects.get(pk=a.pk)
self.assertEqual(None, a.child_setnull)
o = CollectedObjects()
e1._collect_sub_objects(o)
self.assertEqual(o.keys(), [F, E])
def test_o2o_setnull(self):
a = create_a('o2o_setnull')
a.o2o_setnull.delete()
a = A.objects.get(pk=a.pk)
self.assertEqual(None, a.o2o_setnull)
# temporarily replace the UpdateQuery class to verify that E.f is
# actually nulled out first
logged = []
class LoggingUpdateQuery(sql.UpdateQuery):
def clear_related(self, related_field, pk_list, using):
logged.append(related_field.name)
return super(LoggingUpdateQuery, self).clear_related(related_field, pk_list, using)
original = sql.UpdateQuery
sql.UpdateQuery = LoggingUpdateQuery
class DeletionTests(TestCase):
def test_m2m(self):
m = M.objects.create()
r = R.objects.create()
MR.objects.create(m=m, r=r)
r.delete()
self.assertFalse(MR.objects.exists())
e1.delete()
self.assertEqual(logged, ["f"])
logged = []
r = R.objects.create()
MR.objects.create(m=m, r=r)
m.delete()
self.assertFalse(MR.objects.exists())
e2 = E.objects.create()
f2 = F.objects.create(e=e2)
e2.f = f2
e2.save()
m = M.objects.create()
r = R.objects.create()
m.m2m.add(r)
r.delete()
through = M._meta.get_field('m2m').rel.through
self.assertFalse(through.objects.exists())
# Same deal as before, though we are starting from the other object.
o = CollectedObjects()
f2._collect_sub_objects(o)
self.assertEqual(o.keys(), [F, E])
f2.delete()
self.assertEqual(logged, ["f"])
logged = []
r = R.objects.create()
m.m2m.add(r)
m.delete()
self.assertFalse(through.objects.exists())
sql.UpdateQuery = original
m = M.objects.create()
r = R.objects.create()
MRNull.objects.create(m=m, r=r)
r.delete()
self.assertFalse(not MRNull.objects.exists())
self.assertFalse(m.m2m_through_null.exists())
def test_bulk(self):
from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE
s = S.objects.create(r=R.objects.create())
for i in xrange(2*GET_ITERATOR_CHUNK_SIZE):
T.objects.create(s=s)
# 1 (select related `T` instances)
# + 1 (select related `U` instances)
# + 2 (delete `T` instances in batches)
# + 1 (delete `s`)
self.assertNumQueries(5, s.delete)
self.assertFalse(S.objects.exists())
def test_instance_update(self):
deleted = []
related_setnull_sets = []
def pre_delete(sender, **kwargs):
obj = kwargs['instance']
deleted.append(obj)
if isinstance(obj, R):
related_setnull_sets.append(list(a.pk for a in obj.setnull_set.all()))
models.signals.pre_delete.connect(pre_delete)
a = create_a('update_setnull')
a.setnull.delete()
a = create_a('update_cascade')
a.cascade.delete()
for obj in deleted:
self.assertEqual(None, obj.pk)
for pk_list in related_setnull_sets:
for a in A.objects.filter(id__in=pk_list):
self.assertEqual(None, a.setnull)
models.signals.pre_delete.disconnect(pre_delete)
def test_deletion_order(self):
pre_delete_order = []
post_delete_order = []
def log_post_delete(sender, **kwargs):
pre_delete_order.append((sender, kwargs['instance'].pk))
def log_pre_delete(sender, **kwargs):
post_delete_order.append((sender, kwargs['instance'].pk))
models.signals.post_delete.connect(log_post_delete)
models.signals.pre_delete.connect(log_pre_delete)
r = R.objects.create(pk=1)
s1 = S.objects.create(pk=1, r=r)
s2 = S.objects.create(pk=2, r=r)
t1 = T.objects.create(pk=1, s=s1)
t2 = T.objects.create(pk=2, s=s2)
r.delete()
self.assertEqual(
pre_delete_order, [(T, 2), (T, 1), (S, 2), (S, 1), (R, 1)]
)
self.assertEqual(
post_delete_order, [(T, 1), (T, 2), (S, 1), (S, 2), (R, 1)]
)
models.signals.post_delete.disconnect(log_post_delete)
models.signals.post_delete.disconnect(log_pre_delete)
@skipUnlessDBFeature("can_defer_constraint_checks")
def test_can_defer_constraint_checks(self):
u = User.objects.create(
avatar=Avatar.objects.create()
)
a = Avatar.objects.get(pk=u.avatar_id)
# 1 query to find the users for the avatar.
# 1 query to delete the user
# 1 query to delete the avatar
# The important thing is that when we can defer constraint checks there
# is no need to do an UPDATE on User.avatar to null it out.
self.assertNumQueries(3, a.delete)
self.assertFalse(User.objects.exists())
self.assertFalse(Avatar.objects.exists())
@skipIfDBFeature("can_defer_constraint_checks")
def test_cannot_defer_constraint_checks(self):
u = User.objects.create(
avatar=Avatar.objects.create()
)
a = Avatar.objects.get(pk=u.avatar_id)
# 1 query to find the users for the avatar.
# 1 query to delete the user
# 1 query to null out user.avatar, because we can't defer the constraint
# 1 query to delete the avatar
self.assertNumQueries(4, a.delete)
self.assertFalse(User.objects.exists())
self.assertFalse(Avatar.objects.exists())
def test_hidden_related(self):
r = R.objects.create()
h = HiddenUser.objects.create(r=r)
p = HiddenUserProfile.objects.create(user=h)
r.delete()
self.assertEqual(HiddenUserProfile.objects.count(), 0)