1
0
mirror of https://github.com/django/django.git synced 2025-01-05 16:06:07 +00:00
django/tests/get_or_create/tests.py

487 lines
18 KiB
Python

from __future__ import unicode_literals
import time
import traceback
from datetime import date, datetime, timedelta
from threading import Thread
from django.db import DatabaseError, IntegrityError, connection
from django.test import (
TestCase, TransactionTestCase, ignore_warnings, skipUnlessDBFeature,
)
from django.utils.encoding import DjangoUnicodeDecodeError
from .models import (
Author, Book, DefaultPerson, ManualPrimaryKeyTest, Person, Profile,
Publisher, Tag, Thing,
)
class GetOrCreateTests(TestCase):
def setUp(self):
self.lennon = Person.objects.create(
first_name='John', last_name='Lennon', birthday=date(1940, 10, 9)
)
def test_get_or_create_method_with_get(self):
created = Person.objects.get_or_create(
first_name="John", last_name="Lennon", defaults={
"birthday": date(1940, 10, 9)
}
)[1]
self.assertFalse(created)
self.assertEqual(Person.objects.count(), 1)
def test_get_or_create_method_with_create(self):
created = Person.objects.get_or_create(
first_name='George', last_name='Harrison', defaults={
'birthday': date(1943, 2, 25)
}
)[1]
self.assertTrue(created)
self.assertEqual(Person.objects.count(), 2)
def test_get_or_create_redundant_instance(self):
"""
If we execute the exact same statement twice, the second time,
it won't create a Person.
"""
Person.objects.get_or_create(
first_name='George', last_name='Harrison', defaults={
'birthday': date(1943, 2, 25)
}
)
created = Person.objects.get_or_create(
first_name='George', last_name='Harrison', defaults={
'birthday': date(1943, 2, 25)
}
)[1]
self.assertFalse(created)
self.assertEqual(Person.objects.count(), 2)
def test_get_or_create_invalid_params(self):
"""
If you don't specify a value or default value for all required
fields, you will get an error.
"""
with self.assertRaises(IntegrityError):
Person.objects.get_or_create(first_name="Tom", last_name="Smith")
def test_get_or_create_on_related_manager(self):
p = Publisher.objects.create(name="Acme Publishing")
# Create a book through the publisher.
book, created = p.books.get_or_create(name="The Book of Ed & Fred")
self.assertTrue(created)
# The publisher should have one book.
self.assertEqual(p.books.count(), 1)
# Try get_or_create again, this time nothing should be created.
book, created = p.books.get_or_create(name="The Book of Ed & Fred")
self.assertFalse(created)
# And the publisher should still have one book.
self.assertEqual(p.books.count(), 1)
# Add an author to the book.
ed, created = book.authors.get_or_create(name="Ed")
self.assertTrue(created)
# The book should have one author.
self.assertEqual(book.authors.count(), 1)
# Try get_or_create again, this time nothing should be created.
ed, created = book.authors.get_or_create(name="Ed")
self.assertFalse(created)
# And the book should still have one author.
self.assertEqual(book.authors.count(), 1)
# Add a second author to the book.
fred, created = book.authors.get_or_create(name="Fred")
self.assertTrue(created)
# The book should have two authors now.
self.assertEqual(book.authors.count(), 2)
# Create an Author not tied to any books.
Author.objects.create(name="Ted")
# There should be three Authors in total. The book object should have two.
self.assertEqual(Author.objects.count(), 3)
self.assertEqual(book.authors.count(), 2)
# Try creating a book through an author.
_, created = ed.books.get_or_create(name="Ed's Recipes", publisher=p)
self.assertTrue(created)
# Now Ed has two Books, Fred just one.
self.assertEqual(ed.books.count(), 2)
self.assertEqual(fred.books.count(), 1)
# Use the publisher's primary key value instead of a model instance.
_, created = ed.books.get_or_create(name='The Great Book of Ed', publisher_id=p.id)
self.assertTrue(created)
# Try get_or_create again, this time nothing should be created.
_, created = ed.books.get_or_create(name='The Great Book of Ed', publisher_id=p.id)
self.assertFalse(created)
# The publisher should have three books.
self.assertEqual(p.books.count(), 3)
def test_defaults_exact(self):
"""
If you have a field named defaults and want to use it as an exact
lookup, you need to use 'defaults__exact'.
"""
obj, created = Person.objects.get_or_create(
first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
'birthday': date(1943, 2, 25),
'defaults': 'testing',
}
)
self.assertTrue(created)
self.assertEqual(obj.defaults, 'testing')
obj2, created = Person.objects.get_or_create(
first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
'birthday': date(1943, 2, 25),
'defaults': 'testing',
}
)
self.assertFalse(created)
self.assertEqual(obj, obj2)
def test_callable_defaults(self):
"""
Callables in `defaults` are evaluated if the instance is created.
"""
obj, created = Person.objects.get_or_create(
first_name="George",
defaults={"last_name": "Harrison", "birthday": lambda: date(1943, 2, 25)},
)
self.assertTrue(created)
self.assertEqual(date(1943, 2, 25), obj.birthday)
def test_callable_defaults_not_called(self):
def raise_exception():
raise AssertionError
obj, created = Person.objects.get_or_create(
first_name="John", last_name="Lennon",
defaults={"birthday": lambda: raise_exception()},
)
class GetOrCreateTestsWithManualPKs(TestCase):
def setUp(self):
self.first_pk = ManualPrimaryKeyTest.objects.create(id=1, data="Original")
def test_create_with_duplicate_primary_key(self):
"""
If you specify an existing primary key, but different other fields,
then you will get an error and data will not be updated.
"""
with self.assertRaises(IntegrityError):
ManualPrimaryKeyTest.objects.get_or_create(id=1, data="Different")
self.assertEqual(ManualPrimaryKeyTest.objects.get(id=1).data, "Original")
def test_get_or_create_raises_IntegrityError_plus_traceback(self):
"""
get_or_create should raise IntegrityErrors with the full traceback.
This is tested by checking that a known method call is in the traceback.
We cannot use assertRaises here because we need to inspect
the actual traceback. Refs #16340.
"""
try:
ManualPrimaryKeyTest.objects.get_or_create(id=1, data="Different")
except IntegrityError:
formatted_traceback = traceback.format_exc()
self.assertIn(str('obj.save'), formatted_traceback)
# MySQL emits a warning when broken data is saved
@ignore_warnings(module='django.db.backends.mysql.base')
def test_savepoint_rollback(self):
"""
Regression test for #20463: the database connection should still be
usable after a DataError or ProgrammingError in .get_or_create().
"""
try:
Person.objects.get_or_create(
birthday=date(1970, 1, 1),
defaults={'first_name': b"\xff", 'last_name': b"\xff"})
except (DatabaseError, DjangoUnicodeDecodeError):
Person.objects.create(
first_name="Bob", last_name="Ross", birthday=date(1950, 1, 1))
else:
self.skipTest("This backend accepts broken utf-8.")
def test_get_or_create_empty(self):
"""
If all the attributes on a model have defaults, get_or_create() doesn't
require any arguments.
"""
DefaultPerson.objects.get_or_create()
class GetOrCreateTransactionTests(TransactionTestCase):
available_apps = ['get_or_create']
def test_get_or_create_integrityerror(self):
"""
Regression test for #15117. Requires a TransactionTestCase on
databases that delay integrity checks until the end of transactions,
otherwise the exception is never raised.
"""
try:
Profile.objects.get_or_create(person=Person(id=1))
except IntegrityError:
pass
else:
self.skipTest("This backend does not support integrity checks.")
class GetOrCreateThroughManyToMany(TestCase):
def test_get_get_or_create(self):
tag = Tag.objects.create(text='foo')
a_thing = Thing.objects.create(name='a')
a_thing.tags.add(tag)
obj, created = a_thing.tags.get_or_create(text='foo')
self.assertFalse(created)
self.assertEqual(obj.pk, tag.pk)
def test_create_get_or_create(self):
a_thing = Thing.objects.create(name='a')
obj, created = a_thing.tags.get_or_create(text='foo')
self.assertTrue(created)
self.assertEqual(obj.text, 'foo')
self.assertIn(obj, a_thing.tags.all())
def test_something(self):
Tag.objects.create(text='foo')
a_thing = Thing.objects.create(name='a')
with self.assertRaises(IntegrityError):
a_thing.tags.get_or_create(text='foo')
class UpdateOrCreateTests(TestCase):
def test_update(self):
Person.objects.create(
first_name='John', last_name='Lennon', birthday=date(1940, 10, 9)
)
p, created = Person.objects.update_or_create(
first_name='John', last_name='Lennon', defaults={
'birthday': date(1940, 10, 10)
}
)
self.assertFalse(created)
self.assertEqual(p.first_name, 'John')
self.assertEqual(p.last_name, 'Lennon')
self.assertEqual(p.birthday, date(1940, 10, 10))
def test_create(self):
p, created = Person.objects.update_or_create(
first_name='John', last_name='Lennon', defaults={
'birthday': date(1940, 10, 10)
}
)
self.assertTrue(created)
self.assertEqual(p.first_name, 'John')
self.assertEqual(p.last_name, 'Lennon')
self.assertEqual(p.birthday, date(1940, 10, 10))
def test_create_twice(self):
params = {
'first_name': 'John',
'last_name': 'Lennon',
'birthday': date(1940, 10, 10),
}
Person.objects.update_or_create(**params)
# If we execute the exact same statement, it won't create a Person.
p, created = Person.objects.update_or_create(**params)
self.assertFalse(created)
def test_integrity(self):
"""
If you don't specify a value or default value for all required
fields, you will get an error.
"""
with self.assertRaises(IntegrityError):
Person.objects.update_or_create(first_name="Tom", last_name="Smith")
def test_manual_primary_key_test(self):
"""
If you specify an existing primary key, but different other fields,
then you will get an error and data will not be updated.
"""
ManualPrimaryKeyTest.objects.create(id=1, data="Original")
with self.assertRaises(IntegrityError):
ManualPrimaryKeyTest.objects.update_or_create(id=1, data="Different")
self.assertEqual(ManualPrimaryKeyTest.objects.get(id=1).data, "Original")
def test_error_contains_full_traceback(self):
"""
update_or_create should raise IntegrityErrors with the full traceback.
This is tested by checking that a known method call is in the traceback.
We cannot use assertRaises/assertRaises here because we need to inspect
the actual traceback. Refs #16340.
"""
try:
ManualPrimaryKeyTest.objects.update_or_create(id=1, data="Different")
except IntegrityError:
formatted_traceback = traceback.format_exc()
self.assertIn('obj.save', formatted_traceback)
def test_create_with_related_manager(self):
"""
Should be able to use update_or_create from the related manager to
create a book. Refs #23611.
"""
p = Publisher.objects.create(name="Acme Publishing")
book, created = p.books.update_or_create(name="The Book of Ed & Fred")
self.assertTrue(created)
self.assertEqual(p.books.count(), 1)
def test_update_with_related_manager(self):
"""
Should be able to use update_or_create from the related manager to
update a book. Refs #23611.
"""
p = Publisher.objects.create(name="Acme Publishing")
book = Book.objects.create(name="The Book of Ed & Fred", publisher=p)
self.assertEqual(p.books.count(), 1)
name = "The Book of Django"
book, created = p.books.update_or_create(defaults={'name': name}, id=book.id)
self.assertFalse(created)
self.assertEqual(book.name, name)
self.assertEqual(p.books.count(), 1)
def test_create_with_many(self):
"""
Should be able to use update_or_create from the m2m related manager to
create a book. Refs #23611.
"""
p = Publisher.objects.create(name="Acme Publishing")
author = Author.objects.create(name="Ted")
book, created = author.books.update_or_create(name="The Book of Ed & Fred", publisher=p)
self.assertTrue(created)
self.assertEqual(author.books.count(), 1)
def test_update_with_many(self):
"""
Should be able to use update_or_create from the m2m related manager to
update a book. Refs #23611.
"""
p = Publisher.objects.create(name="Acme Publishing")
author = Author.objects.create(name="Ted")
book = Book.objects.create(name="The Book of Ed & Fred", publisher=p)
book.authors.add(author)
self.assertEqual(author.books.count(), 1)
name = "The Book of Django"
book, created = author.books.update_or_create(defaults={'name': name}, id=book.id)
self.assertFalse(created)
self.assertEqual(book.name, name)
self.assertEqual(author.books.count(), 1)
def test_defaults_exact(self):
"""
If you have a field named defaults and want to use it as an exact
lookup, you need to use 'defaults__exact'.
"""
obj, created = Person.objects.update_or_create(
first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
'birthday': date(1943, 2, 25),
'defaults': 'testing',
}
)
self.assertTrue(created)
self.assertEqual(obj.defaults, 'testing')
obj, created = Person.objects.update_or_create(
first_name='George', last_name='Harrison', defaults__exact='testing', defaults={
'birthday': date(1943, 2, 25),
'defaults': 'another testing',
}
)
self.assertFalse(created)
self.assertEqual(obj.defaults, 'another testing')
def test_create_callable_default(self):
obj, created = Person.objects.update_or_create(
first_name='George', last_name='Harrison',
defaults={'birthday': lambda: date(1943, 2, 25)},
)
self.assertIs(created, True)
self.assertEqual(obj.birthday, date(1943, 2, 25))
def test_update_callable_default(self):
Person.objects.update_or_create(
first_name='George', last_name='Harrison', birthday=date(1942, 2, 25),
)
obj, created = Person.objects.update_or_create(
first_name='George',
defaults={'last_name': lambda: 'NotHarrison'},
)
self.assertIs(created, False)
self.assertEqual(obj.last_name, 'NotHarrison')
class UpdateOrCreateTransactionTests(TransactionTestCase):
available_apps = ['get_or_create']
@skipUnlessDBFeature('has_select_for_update')
@skipUnlessDBFeature('supports_transactions')
def test_updates_in_transaction(self):
"""
Objects are selected and updated in a transaction to avoid race
conditions. This test forces update_or_create() to hold the lock
in another thread for a relatively long time so that it can update
while it holds the lock. The updated field isn't a field in 'defaults',
so update_or_create() shouldn't have an effect on it.
"""
lock_status = {'has_grabbed_lock': False}
def birthday_sleep():
lock_status['has_grabbed_lock'] = True
time.sleep(0.5)
return date(1940, 10, 10)
def update_birthday_slowly():
Person.objects.update_or_create(
first_name='John', defaults={'birthday': birthday_sleep}
)
# Avoid leaking connection for Oracle
connection.close()
def lock_wait():
# timeout after ~0.5 seconds
for i in range(20):
time.sleep(0.025)
if lock_status['has_grabbed_lock']:
return True
return False
Person.objects.create(first_name='John', last_name='Lennon', birthday=date(1940, 10, 9))
# update_or_create in a separate thread
t = Thread(target=update_birthday_slowly)
before_start = datetime.now()
t.start()
if not lock_wait():
self.skipTest('Database took too long to lock the row')
# Update during lock
Person.objects.filter(first_name='John').update(last_name='NotLennon')
after_update = datetime.now()
# Wait for thread to finish
t.join()
# The update remains and it blocked.
updated_person = Person.objects.get(first_name='John')
self.assertGreater(after_update - before_start, timedelta(seconds=0.5))
self.assertEqual(updated_person.last_name, 'NotLennon')