mirror of
https://github.com/django/django.git
synced 2024-11-20 08:24:58 +00:00
7aacde84f2
enter_transaction_management() was nearly always followed by managed(). In three places it wasn't, but they will all be refactored eventually. The "forced" keyword argument avoids introducing behavior changes until then. This is mostly backwards-compatible, except, of course, for managed itself. There's a minor difference in _enter_transaction_management: the top self.transaction_state now contains the new 'managed' state rather than the previous one. Django doesn't access self.transaction_state in _enter_transaction_management.
368 lines
14 KiB
Python
368 lines
14 KiB
Python
from __future__ import absolute_import
|
|
|
|
from django.db import connection, connections, transaction, DEFAULT_DB_ALIAS, DatabaseError
|
|
from django.db.transaction import commit_on_success, commit_manually, TransactionManagementError
|
|
from django.test import TransactionTestCase, skipUnlessDBFeature
|
|
from django.test.utils import override_settings
|
|
from django.utils.unittest import skipIf, skipUnless
|
|
|
|
from .models import Mod, M2mA, M2mB
|
|
|
|
|
|
class TestTransactionClosing(TransactionTestCase):
    """
    Regression tests for #9964: a transaction must be closed when it is
    supposed to be, and must not stay pending once the operations that ran
    inside it are done.
    """

    def test_raw_committed_on_success(self):
        """
        A transaction that only executes raw SQL is still committed by the
        commit_on_success decorator.
        """
        @commit_on_success
        def raw_sql():
            "Insert one row through a raw cursor, wrapped in commit_on_success"
            cur = connection.cursor()
            cur.execute("INSERT into transactions_regress_mod (fld) values (18)")

        raw_sql()
        # If the decorator failed to commit, this rollback discards the row
        # and the assertions below fail.
        transaction.rollback()
        self.assertEqual(Mod.objects.count(), 1)
        # The surviving row must carry the value that was inserted.
        stored = Mod.objects.all()[0]
        self.assertEqual(stored.fld, 18)

    def test_commit_manually_enforced(self):
        """
        Under commit_manually even a "read-only" transaction has to be closed
        (committed or rolled back); leaving it pending is reported as an
        error.
        """
        @commit_manually
        def non_comitter():
            "Run a read-only query in a managed transaction and never close it"
            Mod.objects.count()

        with self.assertRaises(TransactionManagementError):
            non_comitter()

    def test_commit_manually_commit_ok(self):
        """
        Under commit_manually, a transaction that has been committed is
        accepted by the transaction management machinery.
        """
        @commit_manually
        def committer():
            """
            Query the database, then commit.
            """
            Mod.objects.count()
            transaction.commit()

        try:
            committer()
        except TransactionManagementError:
            self.fail("Commit did not clear the transaction state")

    def test_commit_manually_rollback_ok(self):
        """
        Under commit_manually, a transaction that has been rolled back is
        accepted by the transaction management machinery.
        """
        @commit_manually
        def roller_back():
            """
            Query the database, then roll back.
            """
            Mod.objects.count()
            transaction.rollback()

        try:
            roller_back()
        except TransactionManagementError:
            self.fail("Rollback did not clear the transaction state")

    def test_commit_manually_enforced_after_commit(self):
        """
        Under commit_manually, an operation performed after a commit opens a
        new transaction, and that one must be closed as well.
        """
        @commit_manually
        def fake_committer():
            "Query, commit, query again -- the last query is left pending"
            Mod.objects.count()
            transaction.commit()
            Mod.objects.count()

        with self.assertRaises(TransactionManagementError):
            fake_committer()

    @skipUnlessDBFeature('supports_transactions')
    def test_reuse_cursor_reference(self):
        """
        Transaction closure is enforced even when every query goes through
        one cursor object obtained up front. This demonstrates why marking
        the transaction dirty only when a cursor is fetched from the
        connection would be wrong.
        """
        @commit_on_success
        def reuse_cursor_ref():
            """
            Obtain a cursor, insert a row, roll back to close the
            transaction, then insert again through the very same cursor
            object (which starts a new transaction). Because this runs under
            commit_on_success, the second insert should end up committed.
            """
            shared_cursor = connection.cursor()
            shared_cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
            transaction.rollback()
            shared_cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")

        reuse_cursor_ref()
        # If the decorator failed to commit, this rollback discards the row
        # and the assertions below fail.
        transaction.rollback()
        self.assertEqual(Mod.objects.count(), 1)
        stored = Mod.objects.all()[0]
        self.assertEqual(stored.fld, 2)

    def test_failing_query_transaction_closed(self):
        """
        Under commit_on_success the transaction is rolled back even when the
        first database-modifying operation fails.

        Prompted by http://code.djangoproject.com/ticket/6669 and based on the
        sample code posted there: before Django 1.3, a transaction was only
        marked "dirty" by save() after the object had been written to the
        database successfully.
        """
        from django.contrib.auth.models import User

        @transaction.commit_on_success
        def create_system_user():
            "Create the 'system' user inside a transaction"
            new_user = User.objects.create_user(
                username='system',
                password='iamr00t',
                email='root@SITENAME.com')
            # Redundant; only ensures the user id was read back from the DB.
            Mod.objects.create(fld=new_user.pk)

        # First call succeeds and creates the user.
        create_system_user()

        with self.assertRaises(DatabaseError):
            # Second call tries to create the same user again and therefore
            # violates a unique constraint.
            create_system_user()

        # Read from the database. This is harmless provided the failed
        # transaction above was really closed.
        User.objects.all()[0]

    @override_settings(DEBUG=True)
    def test_failing_query_transaction_closed_debug(self):
        """
        Regression for #6669: rerun the test above with DEBUG=True.
        """
        self.test_failing_query_transaction_closed()
|
|
|
|
@skipIf(connection.vendor == 'sqlite' and
        (connection.settings_dict['NAME'] == ':memory:' or
         not connection.settings_dict['NAME']),
        'Test uses multiple connections, but in-memory sqlite does not support this')
class TestNewConnection(TransactionTestCase):
    """
    Check that new connections don't have special behaviour.

    Each test runs against a freshly constructed backend object that
    temporarily replaces the default connection (see setUp / tearDown).
    """
    def setUp(self):
        """
        Swap the default connection for a brand-new backend instance.

        The new backend is built from a copy of the current backend's
        settings; if the OPTIONS dict has an 'autocommit' key it is forced to
        False. The original backend is remembered so tearDown can restore it.
        """
        self._old_backend = connections[DEFAULT_DB_ALIAS]
        settings = self._old_backend.settings_dict.copy()
        # Copy OPTIONS too, so the original backend's dict is never mutated.
        opts = settings['OPTIONS'].copy()
        if 'autocommit' in opts:
            opts['autocommit'] = False
        settings['OPTIONS'] = opts
        new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS)
        connections[DEFAULT_DB_ALIAS] = new_backend

    def tearDown(self):
        """
        Abort and close the temporary connection, then restore the original.

        The close/restore steps live in ``finally`` so they run even when
        abort() raises. An abort() failure is left to propagate so the test
        run reports it, rather than dropping into an interactive debugger
        (which would hang unattended runs).
        """
        try:
            connections[DEFAULT_DB_ALIAS].abort()
        finally:
            connections[DEFAULT_DB_ALIAS].close()
            connections[DEFAULT_DB_ALIAS] = self._old_backend

    def test_commit(self):
        """
        Users are allowed to commit and rollback connections.
        """
        # The starting value is False, not None.
        self.assertIs(connection._dirty, False)
        # Evaluating a queryset marks the connection dirty ...
        list(Mod.objects.all())
        self.assertTrue(connection.is_dirty())
        # ... and commit() clears the flag again.
        connection.commit()
        self.assertFalse(connection.is_dirty())
        # Same cycle, this time closed with rollback().
        list(Mod.objects.all())
        self.assertTrue(connection.is_dirty())
        connection.rollback()
        self.assertFalse(connection.is_dirty())

    def test_enter_exit_management(self):
        """
        Entering and immediately leaving transaction management leaves the
        connection's _dirty flag at the value it had beforehand.
        """
        orig_dirty = connection._dirty
        connection.enter_transaction_management()
        connection.leave_transaction_management()
        self.assertEqual(orig_dirty, connection._dirty)

    def test_commit_unless_managed(self):
        """
        Outside transaction management, commit_unless_managed() clears the
        dirty flag, both after a raw insert and after a later read.
        """
        cursor = connection.cursor()
        cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
        connection.commit_unless_managed()
        self.assertFalse(connection.is_dirty())
        # The inserted row is visible; reading it dirties the connection.
        self.assertEqual(len(Mod.objects.all()), 1)
        self.assertTrue(connection.is_dirty())
        connection.commit_unless_managed()
        self.assertFalse(connection.is_dirty())

    def test_commit_unless_managed_in_managed(self):
        """
        Between enter_transaction_management() and
        leave_transaction_management(), commit_unless_managed() leaves the
        connection dirty, so a following rollback() still discards the
        insert. After leaving, commit_unless_managed() clears the flag again.
        """
        cursor = connection.cursor()
        connection.enter_transaction_management()
        cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
        # Still dirty after this call: nothing was committed.
        connection.commit_unless_managed()
        self.assertTrue(connection.is_dirty())
        connection.rollback()
        self.assertFalse(connection.is_dirty())
        # The insert was rolled back, so the table is empty.
        self.assertEqual(len(Mod.objects.all()), 0)
        # Close the transaction opened by the read above before leaving.
        connection.commit()
        connection.leave_transaction_management()
        self.assertFalse(connection.is_dirty())
        self.assertEqual(len(Mod.objects.all()), 0)
        self.assertTrue(connection.is_dirty())
        connection.commit_unless_managed()
        self.assertFalse(connection.is_dirty())
        self.assertEqual(len(Mod.objects.all()), 0)
|
|
|
|
|
|
@skipUnless(connection.vendor == 'postgresql',
            "This test only valid for PostgreSQL")
class TestPostgresAutocommitAndIsolation(TransactionTestCase):
    """
    Verify that psycopg2's autocommit mode and isolation level are put back
    after transaction management has been entered and left.
    Refs #16047, #18130.
    """
    def setUp(self):
        """
        Record the psycopg2 constants the tests compare against and install
        a fresh backend configured with autocommit on and a serializable
        isolation level in place of the default connection.
        """
        from psycopg2.extensions import (ISOLATION_LEVEL_AUTOCOMMIT,
                                         ISOLATION_LEVEL_SERIALIZABLE,
                                         TRANSACTION_STATUS_IDLE)
        self._autocommit = ISOLATION_LEVEL_AUTOCOMMIT
        self._serializable = ISOLATION_LEVEL_SERIALIZABLE
        self._idle = TRANSACTION_STATUS_IDLE

        # A clean backend with autocommit = True is required, which takes a
        # little setup: clone the current settings and override OPTIONS.
        self._old_backend = connections[DEFAULT_DB_ALIAS]
        backend_settings = self._old_backend.settings_dict.copy()
        backend_options = backend_settings['OPTIONS'].copy()
        backend_options.update(
            autocommit=True,
            isolation_level=ISOLATION_LEVEL_SERIALIZABLE)
        backend_settings['OPTIONS'] = backend_options
        backend_class = self._old_backend.__class__
        connections[DEFAULT_DB_ALIAS] = backend_class(backend_settings,
                                                      DEFAULT_DB_ALIAS)

    def tearDown(self):
        """
        Abort and close the temporary connection, then reinstate the
        original backend; the close/restore steps run even if abort() raises.
        """
        try:
            connections[DEFAULT_DB_ALIAS].abort()
        finally:
            connections[DEFAULT_DB_ALIAS].close()
            connections[DEFAULT_DB_ALIAS] = self._old_backend

    def test_initial_autocommit_state(self):
        """
        The fresh connection reports autocommit and the autocommit level.
        """
        self.assertTrue(connection.features.uses_autocommit)
        self.assertEqual(connection.isolation_level, self._autocommit)

    def test_transaction_management(self):
        """
        Entering transaction management switches to the serializable level;
        leaving it switches back to autocommit.
        """
        transaction.enter_transaction_management()
        self.assertEqual(connection.isolation_level, self._serializable)

        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._autocommit)

    def test_transaction_stacking(self):
        """
        With nested enter/leave calls the level stays serializable until the
        outermost leave, which returns it to autocommit.
        """
        # Two nested enters: serializable after each of them.
        for _ in range(2):
            transaction.enter_transaction_management()
            self.assertEqual(connection.isolation_level, self._serializable)

        # Leaving the inner level keeps the serializable level ...
        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._serializable)

        # ... leaving the outer level goes back to autocommit.
        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._autocommit)

    def test_enter_autocommit(self):
        """
        Entering an unmanaged (autocommit) level inside a managed one clears
        the dirty flag and leaves the psycopg2 connection idle; each leave
        then restores the level of the enclosing block.
        """
        transaction.enter_transaction_management()
        self.assertEqual(connection.isolation_level, self._serializable)
        list(Mod.objects.all())
        self.assertTrue(transaction.is_dirty())
        # Enter autocommit mode again.
        transaction.enter_transaction_management(False)
        self.assertFalse(transaction.is_dirty())
        raw_status = connection.connection.get_transaction_status()
        self.assertEqual(raw_status, self._idle)
        list(Mod.objects.all())
        self.assertFalse(transaction.is_dirty())
        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._serializable)
        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._autocommit)
|
|
|
|
|
|
class TestManyToManyAddTransaction(TransactionTestCase):
    def test_manyrelated_add_commit(self):
        "Regression test for https://code.djangoproject.com/ticket/16818"
        owner = M2mA.objects.create()
        related = M2mB.objects.create(fld=10)
        owner.others.add(related)

        # This is a TransactionTestCase and the transaction behavior is still
        # the default "autocommit", so the rollback below should be a no-op.
        # If it undoes the add() instead, the bulk insert was not
        # auto-committed -- which is the bug under test.
        transaction.rollback()
        self.assertEqual(owner.others.count(), 1)
|
|
|
|
|
|
class SavepointTest(TransactionTestCase):

    @skipUnlessDBFeature('uses_savepoints')
    def test_savepoint_commit(self):
        """
        An update made after a savepoint is kept once that savepoint is
        committed.
        """
        @commit_manually
        def work():
            "Create a row, update it inside a committed savepoint, re-read it"
            created = Mod.objects.create(fld=1)
            row_pk = created.pk
            savepoint_id = transaction.savepoint()
            Mod.objects.filter(pk=row_pk).update(fld=10)
            transaction.savepoint_commit(savepoint_id)
            reloaded = Mod.objects.get(pk=row_pk)
            # Close the manual transaction before asserting.
            transaction.commit()
            self.assertEqual(reloaded.fld, 10)

        work()

    @skipIf(connection.vendor == 'mysql' and
            connection.features._mysql_storage_engine == 'MyISAM',
            "MyISAM MySQL storage engine doesn't support savepoints")
    @skipUnlessDBFeature('uses_savepoints')
    def test_savepoint_rollback(self):
        """
        An update made after a savepoint is discarded once that savepoint is
        rolled back.
        """
        @commit_manually
        def work():
            "Create a row, update it inside a rolled-back savepoint, re-read it"
            created = Mod.objects.create(fld=1)
            row_pk = created.pk
            savepoint_id = transaction.savepoint()
            Mod.objects.filter(pk=row_pk).update(fld=20)
            transaction.savepoint_rollback(savepoint_id)
            reloaded = Mod.objects.get(pk=row_pk)
            # Close the manual transaction before asserting.
            transaction.commit()
            self.assertEqual(reloaded.fld, 1)

        work()
|