1
0
mirror of https://github.com/django/django.git synced 2024-12-30 13:05:45 +00:00
django/tests/get_or_create/tests.py
Michael Sanders 271542dad1 Fixed #29499 -- Fixed race condition in QuerySet.update_or_create().
A race condition happened when the object didn't already exist and
another process/thread created the object before update_or_create()
did and then attempted to update the object, also before update_or_create()
saved the object. The update by the other process/thread could be lost.
2018-08-02 17:07:48 -04:00

630 lines
25 KiB
Python

import time
import traceback
from datetime import date, datetime, timedelta
from threading import Thread
from django.core.exceptions import FieldError
from django.db import DatabaseError, IntegrityError, connection
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
from django.utils.functional import lazy
from .models import (
Author, Book, DefaultPerson, ManualPrimaryKeyTest, Person, Profile,
Publisher, Tag, Thing,
)
class GetOrCreateTests(TestCase):
    """Tests for QuerySet.get_or_create() on default and related managers."""

    def setUp(self):
        # Every test starts with a single Person row for John Lennon.
        self.lennon = Person.objects.create(
            first_name='John', last_name='Lennon', birthday=date(1940, 10, 9)
        )

    def test_get_or_create_method_with_get(self):
        """A lookup matching the existing row doesn't create a new one."""
        _, created = Person.objects.get_or_create(
            first_name="John", last_name="Lennon", defaults={
                "birthday": date(1940, 10, 9)
            }
        )
        self.assertFalse(created)
        self.assertEqual(Person.objects.count(), 1)

    def test_get_or_create_method_with_create(self):
        """A lookup matching no row creates one."""
        _, created = Person.objects.get_or_create(
            first_name='George', last_name='Harrison', defaults={
                'birthday': date(1943, 2, 25)
            }
        )
        self.assertTrue(created)
        self.assertEqual(Person.objects.count(), 2)

    def test_get_or_create_redundant_instance(self):
        """
        If we execute the exact same statement twice, the second time,
        it won't create a Person.
        """
        Person.objects.get_or_create(
            first_name='George', last_name='Harrison', defaults={
                'birthday': date(1943, 2, 25)
            }
        )
        _, created = Person.objects.get_or_create(
            first_name='George', last_name='Harrison', defaults={
                'birthday': date(1943, 2, 25)
            }
        )
        self.assertFalse(created)
        self.assertEqual(Person.objects.count(), 2)

    def test_get_or_create_invalid_params(self):
        """
        If you don't specify a value or default value for all required
        fields, you will get an error.
        """
        with self.assertRaises(IntegrityError):
            Person.objects.get_or_create(first_name="Tom", last_name="Smith")

    def test_get_or_create_with_pk_property(self):
        """
        Using the pk property of a model is allowed.
        """
        Thing.objects.get_or_create(pk=1)

    def test_get_or_create_with_model_property_defaults(self):
        """Using a property with a setter implemented is allowed."""
        t, _ = Thing.objects.get_or_create(defaults={'capitalized_name_property': 'annie'}, pk=1)
        self.assertEqual(t.name, 'Annie')

    def test_get_or_create_on_related_manager(self):
        """get_or_create() works through reverse FK and m2m related managers."""
        p = Publisher.objects.create(name="Acme Publishing")
        # Create a book through the publisher.
        book, created = p.books.get_or_create(name="The Book of Ed & Fred")
        self.assertTrue(created)
        # The publisher should have one book.
        self.assertEqual(p.books.count(), 1)

        # Try get_or_create again, this time nothing should be created.
        book, created = p.books.get_or_create(name="The Book of Ed & Fred")
        self.assertFalse(created)
        # And the publisher should still have one book.
        self.assertEqual(p.books.count(), 1)

        # Add an author to the book.
        ed, created = book.authors.get_or_create(name="Ed")
        self.assertTrue(created)
        # The book should have one author.
        self.assertEqual(book.authors.count(), 1)

        # Try get_or_create again, this time nothing should be created.
        ed, created = book.authors.get_or_create(name="Ed")
        self.assertFalse(created)
        # And the book should still have one author.
        self.assertEqual(book.authors.count(), 1)

        # Add a second author to the book.
        fred, created = book.authors.get_or_create(name="Fred")
        self.assertTrue(created)

        # The book should have two authors now.
        self.assertEqual(book.authors.count(), 2)

        # Create an Author not tied to any books.
        Author.objects.create(name="Ted")

        # There should be three Authors in total. The book object should have two.
        self.assertEqual(Author.objects.count(), 3)
        self.assertEqual(book.authors.count(), 2)

        # Try creating a book through an author.
        _, created = ed.books.get_or_create(name="Ed's Recipes", publisher=p)
        self.assertTrue(created)

        # Now Ed has two Books, Fred just one.
        self.assertEqual(ed.books.count(), 2)
        self.assertEqual(fred.books.count(), 1)

        # Use the publisher's primary key value instead of a model instance.
        _, created = ed.books.get_or_create(name='The Great Book of Ed', publisher_id=p.id)
        self.assertTrue(created)

        # Try get_or_create again, this time nothing should be created.
        _, created = ed.books.get_or_create(name='The Great Book of Ed', publisher_id=p.id)
        self.assertFalse(created)

        # The publisher should have three books.
        self.assertEqual(p.books.count(), 3)

    def test_defaults_exact(self):
        """
        If you have a field named defaults and want to use it as an exact
        lookup, you need to use 'defaults__exact'.
        """
        obj, created = Person.objects.get_or_create(
            first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
                'birthday': date(1943, 2, 25),
                'defaults': 'testing',
            }
        )
        self.assertTrue(created)
        self.assertEqual(obj.defaults, 'testing')
        obj2, created = Person.objects.get_or_create(
            first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
                'birthday': date(1943, 2, 25),
                'defaults': 'testing',
            }
        )
        self.assertFalse(created)
        self.assertEqual(obj, obj2)

    def test_callable_defaults(self):
        """
        Callables in `defaults` are evaluated if the instance is created.
        """
        obj, created = Person.objects.get_or_create(
            first_name="George",
            defaults={"last_name": "Harrison", "birthday": lambda: date(1943, 2, 25)},
        )
        self.assertTrue(created)
        self.assertEqual(date(1943, 2, 25), obj.birthday)

    def test_callable_defaults_not_called(self):
        """Callables in `defaults` aren't called if the instance exists."""
        def raise_exception():
            raise AssertionError

        _, created = Person.objects.get_or_create(
            first_name="John", last_name="Lennon",
            defaults={"birthday": raise_exception},
        )
        # setUp() already stored John Lennon, so the row is fetched and the
        # callable default (which would raise) must never have been invoked.
        self.assertFalse(created)

    def test_defaults_not_evaluated_unless_needed(self):
        """`defaults` aren't evaluated if the instance isn't created."""
        def raise_exception():
            raise AssertionError

        # The whole `defaults` argument is a lazy object that raises as soon
        # as it is evaluated.
        _, created = Person.objects.get_or_create(
            first_name='John', defaults=lazy(raise_exception, object)(),
        )
        self.assertFalse(created)
class GetOrCreateTestsWithManualPKs(TestCase):
    """get_or_create() behavior for models with manually assigned keys."""

    def setUp(self):
        # Every test starts with a row occupying primary key 1.
        self.first_pk = ManualPrimaryKeyTest.objects.create(id=1, data="Original")

    def test_create_with_duplicate_primary_key(self):
        """
        If you specify an existing primary key, but different other fields,
        then you will get an error and data will not be updated.
        """
        with self.assertRaises(IntegrityError):
            ManualPrimaryKeyTest.objects.get_or_create(id=1, data="Different")
        self.assertEqual(ManualPrimaryKeyTest.objects.get(id=1).data, "Original")

    def test_get_or_create_raises_IntegrityError_plus_traceback(self):
        """
        get_or_create should raise IntegrityErrors with the full traceback.
        This is tested by checking that a known method call is in the traceback.
        We cannot use assertRaises here because we need to inspect
        the actual traceback. Refs #16340.
        """
        try:
            ManualPrimaryKeyTest.objects.get_or_create(id=1, data="Different")
        except IntegrityError:
            formatted_traceback = traceback.format_exc()
        else:
            # Without this branch the test would pass without checking
            # anything whenever no exception is raised.
            self.fail('IntegrityError was not raised.')
        self.assertIn('obj.save', formatted_traceback)

    def test_savepoint_rollback(self):
        """
        The database connection is still usable after a DatabaseError in
        get_or_create() (#20463).
        """
        Tag.objects.create(text='foo')
        with self.assertRaises(DatabaseError):
            # pk 123456789 doesn't exist, so the tag object will be created.
            # Saving triggers a unique constraint violation on 'text'.
            Tag.objects.get_or_create(pk=123456789, defaults={'text': 'foo'})
        # Tag objects can be created after the error.
        Tag.objects.create(text='bar')

    def test_get_or_create_empty(self):
        """
        If all the attributes on a model have defaults, get_or_create() doesn't
        require any arguments.
        """
        DefaultPerson.objects.get_or_create()
class GetOrCreateTransactionTests(TransactionTestCase):
    """get_or_create() checks that need real (non-wrapped) transactions."""

    available_apps = ['get_or_create']

    def test_get_or_create_integrityerror(self):
        """
        Regression test for #15117. A TransactionTestCase is required because
        some databases defer integrity checks to the end of the transaction,
        in which case the exception would otherwise never be raised.
        """
        # Person(id=1) is never saved, so the Profile points at a person
        # that isn't in the database.
        unsaved_person = Person(id=1)
        try:
            Profile.objects.get_or_create(person=unsaved_person)
        except IntegrityError:
            # Expected outcome: the backend rejected the dangling reference.
            return
        self.skipTest("This backend does not support integrity checks.")
class GetOrCreateThroughManyToMany(TestCase):
    """get_or_create() called on a many-to-many related manager."""

    def test_get_get_or_create(self):
        """A tag already attached to the thing is fetched, not recreated."""
        existing_tag = Tag.objects.create(text='foo')
        thing = Thing.objects.create(name='a')
        thing.tags.add(existing_tag)

        fetched, was_created = thing.tags.get_or_create(text='foo')

        self.assertFalse(was_created)
        self.assertEqual(fetched.pk, existing_tag.pk)

    def test_create_get_or_create(self):
        """A missing tag is created and attached to the thing."""
        thing = Thing.objects.create(name='a')

        new_tag, was_created = thing.tags.get_or_create(text='foo')

        self.assertTrue(was_created)
        self.assertEqual(new_tag.text, 'foo')
        self.assertIn(new_tag, thing.tags.all())

    def test_something(self):
        """
        A tag with the same text that exists but isn't attached to the thing
        makes get_or_create() on the related manager raise IntegrityError.
        """
        # This tag is deliberately never added to thing.tags.
        Tag.objects.create(text='foo')
        thing = Thing.objects.create(name='a')
        with self.assertRaises(IntegrityError):
            thing.tags.get_or_create(text='foo')
class UpdateOrCreateTests(TestCase):
    """Tests for QuerySet.update_or_create() on default and related managers."""

    def test_update(self):
        """An existing row is updated with the values in `defaults`."""
        Person.objects.create(
            first_name='John', last_name='Lennon', birthday=date(1940, 10, 9)
        )
        p, created = Person.objects.update_or_create(
            first_name='John', last_name='Lennon', defaults={
                'birthday': date(1940, 10, 10)
            }
        )
        self.assertFalse(created)
        self.assertEqual(p.first_name, 'John')
        self.assertEqual(p.last_name, 'Lennon')
        self.assertEqual(p.birthday, date(1940, 10, 10))

    def test_create(self):
        """A row is created from the lookup and `defaults` if none matches."""
        p, created = Person.objects.update_or_create(
            first_name='John', last_name='Lennon', defaults={
                'birthday': date(1940, 10, 10)
            }
        )
        self.assertTrue(created)
        self.assertEqual(p.first_name, 'John')
        self.assertEqual(p.last_name, 'Lennon')
        self.assertEqual(p.birthday, date(1940, 10, 10))

    def test_create_twice(self):
        """Running the same call a second time doesn't create a second row."""
        params = {
            'first_name': 'John',
            'last_name': 'Lennon',
            'birthday': date(1940, 10, 10),
        }
        Person.objects.update_or_create(**params)
        # If we execute the exact same statement, it won't create a Person.
        _, created = Person.objects.update_or_create(**params)
        self.assertFalse(created)

    def test_integrity(self):
        """
        If you don't specify a value or default value for all required
        fields, you will get an error.
        """
        with self.assertRaises(IntegrityError):
            Person.objects.update_or_create(first_name="Tom", last_name="Smith")

    def test_manual_primary_key_test(self):
        """
        If you specify an existing primary key, but different other fields,
        then you will get an error and data will not be updated.
        """
        ManualPrimaryKeyTest.objects.create(id=1, data="Original")
        with self.assertRaises(IntegrityError):
            ManualPrimaryKeyTest.objects.update_or_create(id=1, data="Different")
        self.assertEqual(ManualPrimaryKeyTest.objects.get(id=1).data, "Original")

    def test_with_pk_property(self):
        """
        Using the pk property of a model is allowed.
        """
        Thing.objects.update_or_create(pk=1)

    def test_update_or_create_with_model_property_defaults(self):
        """Using a property with a setter implemented is allowed."""
        # This must exercise update_or_create(); get_or_create() has its own
        # equivalent test in GetOrCreateTests.
        t, _ = Thing.objects.update_or_create(defaults={'capitalized_name_property': 'annie'}, pk=1)
        self.assertEqual(t.name, 'Annie')

    def test_error_contains_full_traceback(self):
        """
        update_or_create should raise IntegrityErrors with the full traceback.
        This is tested by checking that a known method call is in the traceback.
        We cannot use assertRaises here because we need to inspect
        the actual traceback. Refs #16340.
        """
        # The primary key has to be taken already, otherwise the call below
        # simply creates the row and no IntegrityError is ever raised.
        ManualPrimaryKeyTest.objects.create(id=1, data="Original")
        try:
            ManualPrimaryKeyTest.objects.update_or_create(id=1, data="Different")
        except IntegrityError:
            formatted_traceback = traceback.format_exc()
        else:
            # Without this branch the test would pass without checking
            # anything whenever no exception is raised.
            self.fail('IntegrityError was not raised.')
        self.assertIn('obj.save', formatted_traceback)

    def test_create_with_related_manager(self):
        """
        Should be able to use update_or_create from the related manager to
        create a book. Refs #23611.
        """
        p = Publisher.objects.create(name="Acme Publishing")
        book, created = p.books.update_or_create(name="The Book of Ed & Fred")
        self.assertTrue(created)
        self.assertEqual(p.books.count(), 1)

    def test_update_with_related_manager(self):
        """
        Should be able to use update_or_create from the related manager to
        update a book. Refs #23611.
        """
        p = Publisher.objects.create(name="Acme Publishing")
        book = Book.objects.create(name="The Book of Ed & Fred", publisher=p)
        self.assertEqual(p.books.count(), 1)
        name = "The Book of Django"
        book, created = p.books.update_or_create(defaults={'name': name}, id=book.id)
        self.assertFalse(created)
        self.assertEqual(book.name, name)
        self.assertEqual(p.books.count(), 1)

    def test_create_with_many(self):
        """
        Should be able to use update_or_create from the m2m related manager to
        create a book. Refs #23611.
        """
        p = Publisher.objects.create(name="Acme Publishing")
        author = Author.objects.create(name="Ted")
        book, created = author.books.update_or_create(name="The Book of Ed & Fred", publisher=p)
        self.assertTrue(created)
        self.assertEqual(author.books.count(), 1)

    def test_update_with_many(self):
        """
        Should be able to use update_or_create from the m2m related manager to
        update a book. Refs #23611.
        """
        p = Publisher.objects.create(name="Acme Publishing")
        author = Author.objects.create(name="Ted")
        book = Book.objects.create(name="The Book of Ed & Fred", publisher=p)
        book.authors.add(author)
        self.assertEqual(author.books.count(), 1)
        name = "The Book of Django"
        book, created = author.books.update_or_create(defaults={'name': name}, id=book.id)
        self.assertFalse(created)
        self.assertEqual(book.name, name)
        self.assertEqual(author.books.count(), 1)

    def test_defaults_exact(self):
        """
        If you have a field named defaults and want to use it as an exact
        lookup, you need to use 'defaults__exact'.
        """
        obj, created = Person.objects.update_or_create(
            first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
                'birthday': date(1943, 2, 25),
                'defaults': 'testing',
            }
        )
        self.assertTrue(created)
        self.assertEqual(obj.defaults, 'testing')
        obj, created = Person.objects.update_or_create(
            first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
                'birthday': date(1943, 2, 25),
                'defaults': 'another testing',
            }
        )
        self.assertFalse(created)
        self.assertEqual(obj.defaults, 'another testing')

    def test_create_callable_default(self):
        """A callable in `defaults` is called when the row is created."""
        obj, created = Person.objects.update_or_create(
            first_name='George', last_name='Harrison',
            defaults={'birthday': lambda: date(1943, 2, 25)},
        )
        self.assertIs(created, True)
        self.assertEqual(obj.birthday, date(1943, 2, 25))

    def test_update_callable_default(self):
        """A callable in `defaults` is called when the row is updated."""
        Person.objects.update_or_create(
            first_name='George', last_name='Harrison', birthday=date(1942, 2, 25),
        )
        obj, created = Person.objects.update_or_create(
            first_name='George',
            defaults={'last_name': lambda: 'NotHarrison'},
        )
        self.assertIs(created, False)
        self.assertEqual(obj.last_name, 'NotHarrison')

    def test_defaults_not_evaluated_unless_needed(self):
        """`defaults` aren't evaluated if the instance isn't created."""
        Person.objects.create(
            first_name='John', last_name='Lennon', birthday=date(1940, 10, 9)
        )

        def raise_exception():
            raise AssertionError

        # NOTE(review): this calls get_or_create() although the class targets
        # update_or_create() -- confirm whether that is intentional before
        # changing it, since updating may legitimately evaluate `defaults`.
        _, created = Person.objects.get_or_create(
            first_name='John', defaults=lazy(raise_exception, object)(),
        )
        self.assertFalse(created)
class UpdateOrCreateTestsWithManualPKs(TestCase):
    """update_or_create() behavior for models with manually assigned keys."""

    def test_create_with_duplicate_primary_key(self):
        """
        Reusing an existing primary key together with different values for
        the other fields raises IntegrityError and leaves the stored row
        untouched.
        """
        rows = ManualPrimaryKeyTest.objects
        rows.create(id=1, data='Original')

        with self.assertRaises(IntegrityError):
            rows.update_or_create(id=1, data='Different')

        stored = rows.get(id=1)
        self.assertEqual(stored.data, 'Original')
class UpdateOrCreateTransactionTests(TransactionTestCase):
    """Concurrency tests for update_or_create() using a second thread."""

    available_apps = ['get_or_create']

    @skipUnlessDBFeature('has_select_for_update')
    @skipUnlessDBFeature('supports_transactions')
    def test_updates_in_transaction(self):
        """
        Objects are selected and updated in a transaction to avoid race
        conditions. This test forces update_or_create() to hold the lock
        in another thread for a relatively long time so that it can update
        while it holds the lock. The updated field isn't a field in 'defaults',
        so update_or_create() shouldn't have an effect on it.
        """
        lock_status = {'has_grabbed_lock': False}

        def birthday_sleep():
            # Callable default: signals the main thread, then stalls the
            # worker's update_or_create() call for half a second.
            lock_status['has_grabbed_lock'] = True
            time.sleep(0.5)
            return date(1940, 10, 10)

        def update_birthday_slowly():
            try:
                Person.objects.update_or_create(
                    first_name='John', defaults={'birthday': birthday_sleep}
                )
            finally:
                # Avoid leaking connection for Oracle, even when the call
                # above raises inside the worker thread.
                connection.close()

        def lock_wait():
            # timeout after ~0.5 seconds
            for _ in range(20):
                time.sleep(0.025)
                if lock_status['has_grabbed_lock']:
                    return True
            return False

        Person.objects.create(first_name='John', last_name='Lennon', birthday=date(1940, 10, 9))

        # update_or_create in a separate thread
        t = Thread(target=update_birthday_slowly)
        before_start = datetime.now()
        t.start()

        if not lock_wait():
            self.skipTest('Database took too long to lock the row')

        # Update during lock
        Person.objects.filter(first_name='John').update(last_name='NotLennon')
        after_update = datetime.now()

        # Wait for thread to finish
        t.join()

        # The update remains and it blocked.
        updated_person = Person.objects.get(first_name='John')
        self.assertGreater(after_update - before_start, timedelta(seconds=0.5))
        self.assertEqual(updated_person.last_name, 'NotLennon')

    @skipUnlessDBFeature('has_select_for_update')
    @skipUnlessDBFeature('supports_transactions')
    def test_creation_in_transaction(self):
        """
        Objects are selected and updated in a transaction to avoid race
        conditions. This test checks the behavior of update_or_create() when
        the object doesn't already exist, but another thread creates the
        object before update_or_create() does and then attempts to update the
        object, also before update_or_create(). It forces update_or_create() to
        hold the lock in another thread for a relatively long time so that it
        can update while it holds the lock. The updated field isn't a field in
        'defaults', so update_or_create() shouldn't have an effect on it.
        """
        lock_status = {'lock_count': 0}

        def birthday_sleep():
            # Callable default: counts each invocation so the main thread can
            # tell which stage the worker has reached, then stalls it.
            lock_status['lock_count'] += 1
            time.sleep(0.5)
            return date(1940, 10, 10)

        def update_birthday_slowly():
            try:
                Person.objects.update_or_create(first_name='John', defaults={'birthday': birthday_sleep})
            finally:
                # Avoid leaking connection for Oracle
                connection.close()

        def lock_wait(expected_lock_count):
            # timeout after ~0.5 seconds
            for _ in range(20):
                time.sleep(0.025)
                if lock_status['lock_count'] == expected_lock_count:
                    return True
            self.skipTest('Database took too long to lock the row')

        # update_or_create in a separate thread.
        t = Thread(target=update_birthday_slowly)
        before_start = datetime.now()
        t.start()
        lock_wait(1)
        # Create object *after* initial attempt by update_or_create to get obj
        # but before creation attempt.
        Person.objects.create(first_name='John', last_name='Lennon', birthday=date(1940, 10, 9))
        lock_wait(2)
        # At this point, the thread is pausing for 0.5 seconds, so now attempt
        # to modify object before update_or_create() calls save(). This should
        # be blocked until after the save().
        Person.objects.filter(first_name='John').update(last_name='NotLennon')
        after_update = datetime.now()
        # Wait for thread to finish
        t.join()
        # Check call to update_or_create() succeeded and the subsequent
        # (blocked) call to update().
        updated_person = Person.objects.get(first_name='John')
        self.assertEqual(updated_person.birthday, date(1940, 10, 10))  # set by update_or_create()
        self.assertEqual(updated_person.last_name, 'NotLennon')  # set by update()
        self.assertGreater(after_update - before_start, timedelta(seconds=1))
class InvalidCreateArgumentsTests(TransactionTestCase):
    """Unknown field names passed to get_or_create()/update_or_create()."""

    available_apps = ['get_or_create']
    # Expected error for an unknown name inside `defaults`.
    msg = "Invalid field name(s) for model Thing: 'nonexistent'."
    # Expected error for an unknown name used as a lookup keyword.
    bad_field_msg = "Cannot resolve keyword 'nonexistent' into field. Choices are: id, name, tags"

    def test_get_or_create_with_invalid_defaults(self):
        """get_or_create() rejects an unknown name in `defaults`."""
        bad_defaults = {'nonexistent': 'b'}
        with self.assertRaisesMessage(FieldError, self.msg):
            Thing.objects.get_or_create(name='a', defaults=bad_defaults)

    def test_get_or_create_with_invalid_kwargs(self):
        """get_or_create() rejects an unknown lookup keyword."""
        with self.assertRaisesMessage(FieldError, self.bad_field_msg):
            Thing.objects.get_or_create(name='a', nonexistent='b')

    def test_update_or_create_with_invalid_defaults(self):
        """update_or_create() rejects an unknown name in `defaults`."""
        bad_defaults = {'nonexistent': 'b'}
        with self.assertRaisesMessage(FieldError, self.msg):
            Thing.objects.update_or_create(name='a', defaults=bad_defaults)

    def test_update_or_create_with_invalid_kwargs(self):
        """update_or_create() rejects an unknown lookup keyword."""
        with self.assertRaisesMessage(FieldError, self.bad_field_msg):
            Thing.objects.update_or_create(name='a', nonexistent='b')

    def test_multiple_invalid_fields(self):
        """With bad names in both places, the lookup keyword error is raised."""
        bad_defaults = {'invalid': 'c'}
        with self.assertRaisesMessage(FieldError, self.bad_field_msg):
            Thing.objects.update_or_create(name='a', nonexistent='b', defaults=bad_defaults)

    def test_property_attribute_without_setter_defaults(self):
        """`name_in_all_caps` isn't accepted as a key in `defaults`."""
        expected = "Invalid field name(s) for model Thing: 'name_in_all_caps'"
        with self.assertRaisesMessage(FieldError, expected):
            Thing.objects.update_or_create(name='a', defaults={'name_in_all_caps': 'FRANK'})

    def test_property_attribute_without_setter_kwargs(self):
        """`name_in_all_caps` isn't accepted as a lookup keyword."""
        expected = "Cannot resolve keyword 'name_in_all_caps' into field. Choices are: id, name, tags"
        with self.assertRaisesMessage(FieldError, expected):
            Thing.objects.update_or_create(name_in_all_caps='FRANK', defaults={'name': 'Frank'})