1
0
mirror of https://github.com/django/django.git synced 2024-11-18 07:26:04 +00:00
django/tests/transaction_hooks/tests.py
Tim Graham 6e4c6281db Reverted "Fixed #27818 -- Replaced try/except/pass with contextlib.suppress()."
This reverts commit 550cb3a365
because try/except performs better.
2017-09-07 08:16:21 -04:00

236 lines
6.8 KiB
Python

from django.db import connection, transaction
from django.test import TransactionTestCase, skipUnlessDBFeature
from .models import Thing
class ForcedError(Exception):
pass
class TestConnectionOnCommit(TransactionTestCase):
"""
Tests for transaction.on_commit().
Creation/checking of database objects in parallel with callback tracking is
to verify that the behavior of the two match in all tested cases.
"""
available_apps = ['transaction_hooks']
def setUp(self):
self.notified = []
def notify(self, id_):
if id_ == 'error':
raise ForcedError()
self.notified.append(id_)
def do(self, num):
"""Create a Thing instance and notify about it."""
Thing.objects.create(num=num)
transaction.on_commit(lambda: self.notify(num))
def assertDone(self, nums):
self.assertNotified(nums)
self.assertEqual(sorted(t.num for t in Thing.objects.all()), sorted(nums))
def assertNotified(self, nums):
self.assertEqual(self.notified, nums)
def test_executes_immediately_if_no_transaction(self):
self.do(1)
self.assertDone([1])
def test_delays_execution_until_after_transaction_commit(self):
with transaction.atomic():
self.do(1)
self.assertNotified([])
self.assertDone([1])
def test_does_not_execute_if_transaction_rolled_back(self):
try:
with transaction.atomic():
self.do(1)
raise ForcedError()
except ForcedError:
pass
self.assertDone([])
def test_executes_only_after_final_transaction_committed(self):
with transaction.atomic():
with transaction.atomic():
self.do(1)
self.assertNotified([])
self.assertNotified([])
self.assertDone([1])
def test_discards_hooks_from_rolled_back_savepoint(self):
with transaction.atomic():
# one successful savepoint
with transaction.atomic():
self.do(1)
# one failed savepoint
try:
with transaction.atomic():
self.do(2)
raise ForcedError()
except ForcedError:
pass
# another successful savepoint
with transaction.atomic():
self.do(3)
# only hooks registered during successful savepoints execute
self.assertDone([1, 3])
def test_no_hooks_run_from_failed_transaction(self):
"""If outer transaction fails, no hooks from within it run."""
try:
with transaction.atomic():
with transaction.atomic():
self.do(1)
raise ForcedError()
except ForcedError:
pass
self.assertDone([])
def test_inner_savepoint_rolled_back_with_outer(self):
with transaction.atomic():
try:
with transaction.atomic():
with transaction.atomic():
self.do(1)
raise ForcedError()
except ForcedError:
pass
self.do(2)
self.assertDone([2])
def test_no_savepoints_atomic_merged_with_outer(self):
with transaction.atomic():
with transaction.atomic():
self.do(1)
try:
with transaction.atomic(savepoint=False):
raise ForcedError()
except ForcedError:
pass
self.assertDone([])
def test_inner_savepoint_does_not_affect_outer(self):
with transaction.atomic():
with transaction.atomic():
self.do(1)
try:
with transaction.atomic():
raise ForcedError()
except ForcedError:
pass
self.assertDone([1])
def test_runs_hooks_in_order_registered(self):
with transaction.atomic():
self.do(1)
with transaction.atomic():
self.do(2)
self.do(3)
self.assertDone([1, 2, 3])
def test_hooks_cleared_after_successful_commit(self):
with transaction.atomic():
self.do(1)
with transaction.atomic():
self.do(2)
self.assertDone([1, 2]) # not [1, 1, 2]
def test_hooks_cleared_after_rollback(self):
try:
with transaction.atomic():
self.do(1)
raise ForcedError()
except ForcedError:
pass
with transaction.atomic():
self.do(2)
self.assertDone([2])
@skipUnlessDBFeature('test_db_allows_multiple_connections')
def test_hooks_cleared_on_reconnect(self):
with transaction.atomic():
self.do(1)
connection.close()
connection.connect()
with transaction.atomic():
self.do(2)
self.assertDone([2])
def test_error_in_hook_doesnt_prevent_clearing_hooks(self):
try:
with transaction.atomic():
transaction.on_commit(lambda: self.notify('error'))
except ForcedError:
pass
with transaction.atomic():
self.do(1)
self.assertDone([1])
def test_db_query_in_hook(self):
with transaction.atomic():
Thing.objects.create(num=1)
transaction.on_commit(
lambda: [self.notify(t.num) for t in Thing.objects.all()]
)
self.assertDone([1])
def test_transaction_in_hook(self):
def on_commit():
with transaction.atomic():
t = Thing.objects.create(num=1)
self.notify(t.num)
with transaction.atomic():
transaction.on_commit(on_commit)
self.assertDone([1])
def test_hook_in_hook(self):
def on_commit(i, add_hook):
with transaction.atomic():
if add_hook:
transaction.on_commit(lambda: on_commit(i + 10, False))
t = Thing.objects.create(num=i)
self.notify(t.num)
with transaction.atomic():
transaction.on_commit(lambda: on_commit(1, True))
transaction.on_commit(lambda: on_commit(2, True))
self.assertDone([1, 11, 2, 12])
def test_raises_exception_non_autocommit_mode(self):
def should_never_be_called():
raise AssertionError('this function should never be called')
try:
connection.set_autocommit(False)
msg = 'on_commit() cannot be used in manual transaction management'
with self.assertRaisesMessage(transaction.TransactionManagementError, msg):
transaction.on_commit(should_never_be_called)
finally:
connection.set_autocommit(True)