mirror of
				https://github.com/django/django.git
				synced 2025-10-26 07:06:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			236 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			236 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from django.db import connection, transaction
 | |
| from django.test import TransactionTestCase, skipUnlessDBFeature
 | |
| 
 | |
| from .models import Thing
 | |
| 
 | |
| 
 | |
| class ForcedError(Exception):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class TestConnectionOnCommit(TransactionTestCase):
 | |
|     """
 | |
|     Tests for transaction.on_commit().
 | |
| 
 | |
|     Creation/checking of database objects in parallel with callback tracking is
 | |
|     to verify that the behavior of the two match in all tested cases.
 | |
|     """
 | |
|     available_apps = ['transaction_hooks']
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.notified = []
 | |
| 
 | |
|     def notify(self, id_):
 | |
|         if id_ == 'error':
 | |
|             raise ForcedError()
 | |
|         self.notified.append(id_)
 | |
| 
 | |
|     def do(self, num):
 | |
|         """Create a Thing instance and notify about it."""
 | |
|         Thing.objects.create(num=num)
 | |
|         transaction.on_commit(lambda: self.notify(num))
 | |
| 
 | |
|     def assertDone(self, nums):
 | |
|         self.assertNotified(nums)
 | |
|         self.assertEqual(sorted(t.num for t in Thing.objects.all()), sorted(nums))
 | |
| 
 | |
|     def assertNotified(self, nums):
 | |
|         self.assertEqual(self.notified, nums)
 | |
| 
 | |
|     def test_executes_immediately_if_no_transaction(self):
 | |
|         self.do(1)
 | |
|         self.assertDone([1])
 | |
| 
 | |
|     def test_delays_execution_until_after_transaction_commit(self):
 | |
|         with transaction.atomic():
 | |
|             self.do(1)
 | |
|             self.assertNotified([])
 | |
|         self.assertDone([1])
 | |
| 
 | |
|     def test_does_not_execute_if_transaction_rolled_back(self):
 | |
|         try:
 | |
|             with transaction.atomic():
 | |
|                 self.do(1)
 | |
|                 raise ForcedError()
 | |
|         except ForcedError:
 | |
|             pass
 | |
| 
 | |
|         self.assertDone([])
 | |
| 
 | |
|     def test_executes_only_after_final_transaction_committed(self):
 | |
|         with transaction.atomic():
 | |
|             with transaction.atomic():
 | |
|                 self.do(1)
 | |
|                 self.assertNotified([])
 | |
|             self.assertNotified([])
 | |
|         self.assertDone([1])
 | |
| 
 | |
|     def test_discards_hooks_from_rolled_back_savepoint(self):
 | |
|         with transaction.atomic():
 | |
|             # one successful savepoint
 | |
|             with transaction.atomic():
 | |
|                 self.do(1)
 | |
|             # one failed savepoint
 | |
|             try:
 | |
|                 with transaction.atomic():
 | |
|                     self.do(2)
 | |
|                     raise ForcedError()
 | |
|             except ForcedError:
 | |
|                 pass
 | |
|             # another successful savepoint
 | |
|             with transaction.atomic():
 | |
|                 self.do(3)
 | |
| 
 | |
|         # only hooks registered during successful savepoints execute
 | |
|         self.assertDone([1, 3])
 | |
| 
 | |
|     def test_no_hooks_run_from_failed_transaction(self):
 | |
|         """If outer transaction fails, no hooks from within it run."""
 | |
|         try:
 | |
|             with transaction.atomic():
 | |
|                 with transaction.atomic():
 | |
|                     self.do(1)
 | |
|                 raise ForcedError()
 | |
|         except ForcedError:
 | |
|             pass
 | |
| 
 | |
|         self.assertDone([])
 | |
| 
 | |
|     def test_inner_savepoint_rolled_back_with_outer(self):
 | |
|         with transaction.atomic():
 | |
|             try:
 | |
|                 with transaction.atomic():
 | |
|                     with transaction.atomic():
 | |
|                         self.do(1)
 | |
|                     raise ForcedError()
 | |
|             except ForcedError:
 | |
|                 pass
 | |
|             self.do(2)
 | |
| 
 | |
|         self.assertDone([2])
 | |
| 
 | |
|     def test_no_savepoints_atomic_merged_with_outer(self):
 | |
|         with transaction.atomic():
 | |
|             with transaction.atomic():
 | |
|                 self.do(1)
 | |
|                 try:
 | |
|                     with transaction.atomic(savepoint=False):
 | |
|                         raise ForcedError()
 | |
|                 except ForcedError:
 | |
|                     pass
 | |
| 
 | |
|         self.assertDone([])
 | |
| 
 | |
|     def test_inner_savepoint_does_not_affect_outer(self):
 | |
|         with transaction.atomic():
 | |
|             with transaction.atomic():
 | |
|                 self.do(1)
 | |
|                 try:
 | |
|                     with transaction.atomic():
 | |
|                         raise ForcedError()
 | |
|                 except ForcedError:
 | |
|                     pass
 | |
| 
 | |
|         self.assertDone([1])
 | |
| 
 | |
|     def test_runs_hooks_in_order_registered(self):
 | |
|         with transaction.atomic():
 | |
|             self.do(1)
 | |
|             with transaction.atomic():
 | |
|                 self.do(2)
 | |
|             self.do(3)
 | |
| 
 | |
|         self.assertDone([1, 2, 3])
 | |
| 
 | |
|     def test_hooks_cleared_after_successful_commit(self):
 | |
|         with transaction.atomic():
 | |
|             self.do(1)
 | |
|         with transaction.atomic():
 | |
|             self.do(2)
 | |
| 
 | |
|         self.assertDone([1, 2])  # not [1, 1, 2]
 | |
| 
 | |
|     def test_hooks_cleared_after_rollback(self):
 | |
|         try:
 | |
|             with transaction.atomic():
 | |
|                 self.do(1)
 | |
|                 raise ForcedError()
 | |
|         except ForcedError:
 | |
|             pass
 | |
| 
 | |
|         with transaction.atomic():
 | |
|             self.do(2)
 | |
| 
 | |
|         self.assertDone([2])
 | |
| 
 | |
|     @skipUnlessDBFeature('test_db_allows_multiple_connections')
 | |
|     def test_hooks_cleared_on_reconnect(self):
 | |
|         with transaction.atomic():
 | |
|             self.do(1)
 | |
|             connection.close()
 | |
| 
 | |
|         connection.connect()
 | |
| 
 | |
|         with transaction.atomic():
 | |
|             self.do(2)
 | |
| 
 | |
|         self.assertDone([2])
 | |
| 
 | |
|     def test_error_in_hook_doesnt_prevent_clearing_hooks(self):
 | |
|         try:
 | |
|             with transaction.atomic():
 | |
|                 transaction.on_commit(lambda: self.notify('error'))
 | |
|         except ForcedError:
 | |
|             pass
 | |
| 
 | |
|         with transaction.atomic():
 | |
|             self.do(1)
 | |
| 
 | |
|         self.assertDone([1])
 | |
| 
 | |
|     def test_db_query_in_hook(self):
 | |
|         with transaction.atomic():
 | |
|             Thing.objects.create(num=1)
 | |
|             transaction.on_commit(
 | |
|                 lambda: [self.notify(t.num) for t in Thing.objects.all()]
 | |
|             )
 | |
| 
 | |
|         self.assertDone([1])
 | |
| 
 | |
|     def test_transaction_in_hook(self):
 | |
|         def on_commit():
 | |
|             with transaction.atomic():
 | |
|                 t = Thing.objects.create(num=1)
 | |
|                 self.notify(t.num)
 | |
| 
 | |
|         with transaction.atomic():
 | |
|             transaction.on_commit(on_commit)
 | |
| 
 | |
|         self.assertDone([1])
 | |
| 
 | |
|     def test_hook_in_hook(self):
 | |
|         def on_commit(i, add_hook):
 | |
|             with transaction.atomic():
 | |
|                 if add_hook:
 | |
|                     transaction.on_commit(lambda: on_commit(i + 10, False))
 | |
|                 t = Thing.objects.create(num=i)
 | |
|                 self.notify(t.num)
 | |
| 
 | |
|         with transaction.atomic():
 | |
|             transaction.on_commit(lambda: on_commit(1, True))
 | |
|             transaction.on_commit(lambda: on_commit(2, True))
 | |
| 
 | |
|         self.assertDone([1, 11, 2, 12])
 | |
| 
 | |
|     def test_raises_exception_non_autocommit_mode(self):
 | |
|         def should_never_be_called():
 | |
|             raise AssertionError('this function should never be called')
 | |
| 
 | |
|         try:
 | |
|             connection.set_autocommit(False)
 | |
|             msg = 'on_commit() cannot be used in manual transaction management'
 | |
|             with self.assertRaisesMessage(transaction.TransactionManagementError, msg):
 | |
|                 transaction.on_commit(should_never_be_called)
 | |
|         finally:
 | |
|             connection.set_autocommit(True)
 |