1
0
mirror of https://github.com/django/django.git synced 2025-10-26 15:16:09 +00:00
Files
django/tests/bulk_create/tests.py
Natalia d6a8e5f5e1 Fixed failing bulk_create test raising IntegrityError when run in reverse.
When running the `bulk_create` tests with Postgres settings and
`--reverse`, the following IntegrityError was raised in
bulk_create.tests.BulkCreateTransactionTests.test_objs_with_and_without_pk:

django.db.utils.IntegrityError: duplicate key value violates unique
constraint "bulk_create_country_pkey"
DETAIL:  Key (id)=(1) already exists.

This branch fixes this by ensuring the ID is unique since DB sequences
are not resetted between tests.
2025-08-21 12:25:57 -03:00

918 lines
35 KiB
Python

from datetime import datetime
from math import ceil
from operator import attrgetter
from django.core.exceptions import FieldDoesNotExist
from django.db import (
IntegrityError,
NotSupportedError,
OperationalError,
ProgrammingError,
connection,
)
from django.db.models import FileField, Value
from django.db.models.functions import Lower, Now
from django.test import (
TestCase,
TransactionTestCase,
skipIfDBFeature,
skipUnlessDBFeature,
)
from django.test.utils import CaptureQueriesContext
from django.utils import timezone
from .models import (
BigAutoFieldModel,
Country,
DbDefaultModel,
DbDefaultPrimaryKey,
FieldsWithDbColumns,
NoFields,
NullableFields,
Pizzeria,
ProxyCountry,
ProxyMultiCountry,
ProxyMultiProxyCountry,
ProxyProxyCountry,
RelatedModel,
Restaurant,
SmallAutoFieldModel,
State,
TwoFields,
UpsertConflict,
)
class BulkCreateTests(TestCase):
def setUp(self):
self.data = [
Country(name="United States of America", iso_two_letter="US"),
Country(name="The Netherlands", iso_two_letter="NL"),
Country(name="Germany", iso_two_letter="DE"),
Country(name="Czech Republic", iso_two_letter="CZ"),
]
def test_simple(self):
created = Country.objects.bulk_create(self.data)
self.assertEqual(created, self.data)
self.assertQuerySetEqual(
Country.objects.order_by("-name"),
[
"United States of America",
"The Netherlands",
"Germany",
"Czech Republic",
],
attrgetter("name"),
)
created = Country.objects.bulk_create([])
self.assertEqual(created, [])
self.assertEqual(Country.objects.count(), 4)
@skipUnlessDBFeature("has_bulk_insert")
def test_efficiency(self):
with self.assertNumQueries(1):
Country.objects.bulk_create(self.data)
@skipUnlessDBFeature("has_bulk_insert")
def test_long_non_ascii_text(self):
"""
Inserting non-ASCII values with a length in the range 2001 to 4000
characters, i.e. 4002 to 8000 bytes, must be set as a CLOB on Oracle
(#22144).
"""
Country.objects.bulk_create([Country(description="Ж" * 3000)])
self.assertEqual(Country.objects.count(), 1)
@skipUnlessDBFeature("has_bulk_insert")
def test_long_and_short_text(self):
Country.objects.bulk_create(
[
Country(description="a" * 4001, iso_two_letter="A"),
Country(description="a", iso_two_letter="B"),
Country(description="Ж" * 2001, iso_two_letter="C"),
Country(description="Ж", iso_two_letter="D"),
]
)
self.assertEqual(Country.objects.count(), 4)
def test_multi_table_inheritance_unsupported(self):
expected_message = "Can't bulk create a multi-table inherited model"
with self.assertRaisesMessage(ValueError, expected_message):
Pizzeria.objects.bulk_create(
[
Pizzeria(name="The Art of Pizza"),
]
)
with self.assertRaisesMessage(ValueError, expected_message):
ProxyMultiCountry.objects.bulk_create(
[
ProxyMultiCountry(name="Fillory", iso_two_letter="FL"),
]
)
with self.assertRaisesMessage(ValueError, expected_message):
ProxyMultiProxyCountry.objects.bulk_create(
[
ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"),
]
)
def test_proxy_inheritance_supported(self):
ProxyCountry.objects.bulk_create(
[
ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
Country(name="Tortall", iso_two_letter="TA"),
]
)
self.assertQuerySetEqual(
ProxyCountry.objects.all(),
{"Qwghlm", "Tortall"},
attrgetter("name"),
ordered=False,
)
ProxyProxyCountry.objects.bulk_create(
[
ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"),
]
)
self.assertQuerySetEqual(
ProxyProxyCountry.objects.all(),
{
"Qwghlm",
"Tortall",
"Netherlands",
},
attrgetter("name"),
ordered=False,
)
def test_non_auto_increment_pk(self):
State.objects.bulk_create(
[State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
)
self.assertQuerySetEqual(
State.objects.order_by("two_letter_code"),
[
"CA",
"IL",
"ME",
"NY",
],
attrgetter("two_letter_code"),
)
@skipUnlessDBFeature("has_bulk_insert")
def test_non_auto_increment_pk_efficiency(self):
with self.assertNumQueries(1):
State.objects.bulk_create(
[State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
)
self.assertQuerySetEqual(
State.objects.order_by("two_letter_code"),
[
"CA",
"IL",
"ME",
"NY",
],
attrgetter("two_letter_code"),
)
@skipIfDBFeature("allows_auto_pk_0")
def test_zero_as_autoval(self):
"""
Zero as id for AutoField should raise exception in MySQL, because MySQL
does not allow zero for automatic primary key if the
NO_AUTO_VALUE_ON_ZERO SQL mode is not enabled.
"""
valid_country = Country(name="Germany", iso_two_letter="DE")
invalid_country = Country(id=0, name="Poland", iso_two_letter="PL")
msg = "The database backend does not accept 0 as a value for AutoField."
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create([valid_country, invalid_country])
def test_batch_same_vals(self):
# SQLite had a problem where all the same-valued models were
# collapsed to one insert.
Restaurant.objects.bulk_create([Restaurant(name="foo") for i in range(0, 2)])
self.assertEqual(Restaurant.objects.count(), 2)
def test_large_batch(self):
TwoFields.objects.bulk_create(
[TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
)
self.assertEqual(TwoFields.objects.count(), 1001)
self.assertEqual(
TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(), 101
)
self.assertEqual(TwoFields.objects.filter(f2__gte=901).count(), 101)
@skipUnlessDBFeature("has_bulk_insert")
def test_large_single_field_batch(self):
# SQLite had a problem with more than 500 UNIONed selects in single
# query.
Restaurant.objects.bulk_create([Restaurant() for i in range(0, 501)])
@skipUnlessDBFeature("has_bulk_insert")
def test_large_batch_efficiency(self):
with CaptureQueriesContext(connection) as ctx:
TwoFields.objects.bulk_create(
[TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
)
self.assertLess(len(ctx), 10)
def test_large_batch_mixed(self):
"""
Test inserting a large batch with objects having primary key set
mixed together with objects without PK set.
"""
TwoFields.objects.bulk_create(
[
TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
for i in range(100000, 101000)
]
)
self.assertEqual(TwoFields.objects.count(), 1000)
# We can't assume much about the ID's created, except that the above
# created IDs must exist.
id_range = range(100000, 101000, 2)
self.assertEqual(TwoFields.objects.filter(id__in=id_range).count(), 500)
self.assertEqual(TwoFields.objects.exclude(id__in=id_range).count(), 500)
@skipUnlessDBFeature("has_bulk_insert")
def test_large_batch_mixed_efficiency(self):
"""
Test inserting a large batch with objects having primary key set
mixed together with objects without PK set.
"""
with CaptureQueriesContext(connection) as ctx:
TwoFields.objects.bulk_create(
[
TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
for i in range(100000, 101000)
]
)
self.assertLess(len(ctx), 10)
def test_explicit_batch_size(self):
objs = [TwoFields(f1=i, f2=i) for i in range(0, 4)]
num_objs = len(objs)
TwoFields.objects.bulk_create(objs, batch_size=1)
self.assertEqual(TwoFields.objects.count(), num_objs)
TwoFields.objects.all().delete()
TwoFields.objects.bulk_create(objs, batch_size=2)
self.assertEqual(TwoFields.objects.count(), num_objs)
TwoFields.objects.all().delete()
TwoFields.objects.bulk_create(objs, batch_size=3)
self.assertEqual(TwoFields.objects.count(), num_objs)
TwoFields.objects.all().delete()
TwoFields.objects.bulk_create(objs, batch_size=num_objs)
self.assertEqual(TwoFields.objects.count(), num_objs)
def test_empty_model(self):
NoFields.objects.bulk_create([NoFields() for i in range(2)])
self.assertEqual(NoFields.objects.count(), 2)
@skipUnlessDBFeature("has_bulk_insert")
def test_explicit_batch_size_efficiency(self):
objs = [TwoFields(f1=i, f2=i) for i in range(0, 100)]
with self.assertNumQueries(2):
TwoFields.objects.bulk_create(objs, 50)
TwoFields.objects.all().delete()
with self.assertNumQueries(1):
TwoFields.objects.bulk_create(objs, len(objs))
@skipUnlessDBFeature("has_bulk_insert")
def test_explicit_batch_size_respects_max_batch_size(self):
objs = [Country(name=f"Country {i}") for i in range(1000)]
fields = ["name", "iso_two_letter", "description"]
max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1)
with self.assertNumQueries(ceil(len(objs) / max_batch_size)):
Country.objects.bulk_create(objs, batch_size=max_batch_size + 1)
@skipUnlessDBFeature("has_bulk_insert")
def test_max_batch_size(self):
objs = [Country(name=f"Country {i}") for i in range(1000)]
fields = ["name", "iso_two_letter", "description"]
max_batch_size = connection.ops.bulk_batch_size(fields, objs)
with self.assertNumQueries(ceil(len(objs) / max_batch_size)):
Country.objects.bulk_create(objs)
@skipUnlessDBFeature("has_bulk_insert")
def test_bulk_insert_expressions(self):
Restaurant.objects.bulk_create(
[
Restaurant(name="Sam's Shake Shack"),
Restaurant(name=Lower(Value("Betty's Beetroot Bar"))),
]
)
bbb = Restaurant.objects.filter(name="betty's beetroot bar")
self.assertEqual(bbb.count(), 1)
@skipUnlessDBFeature("has_bulk_insert")
def test_bulk_insert_now(self):
NullableFields.objects.bulk_create(
[
NullableFields(datetime_field=Now()),
NullableFields(datetime_field=Now()),
]
)
self.assertEqual(
NullableFields.objects.filter(datetime_field__isnull=False).count(),
2,
)
@skipUnlessDBFeature("has_bulk_insert")
def test_bulk_insert_nullable_fields(self):
fk_to_auto_fields = {
"auto_field": NoFields.objects.create(),
"small_auto_field": SmallAutoFieldModel.objects.create(),
"big_auto_field": BigAutoFieldModel.objects.create(),
}
# NULL can be mixed with other values in nullable fields
nullable_fields = [
field for field in NullableFields._meta.get_fields() if field.name != "id"
]
NullableFields.objects.bulk_create(
[
NullableFields(**{**fk_to_auto_fields, field.name: None})
for field in nullable_fields
]
)
self.assertEqual(NullableFields.objects.count(), len(nullable_fields))
for field in nullable_fields:
with self.subTest(field=field):
field_value = "" if isinstance(field, FileField) else None
self.assertEqual(
NullableFields.objects.filter(**{field.name: field_value}).count(),
1,
)
@skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_set_pk_and_insert_single_item(self):
with self.assertNumQueries(1):
countries = Country.objects.bulk_create([self.data[0]])
self.assertEqual(len(countries), 1)
self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])
@skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_set_pk_and_query_efficiency(self):
with self.assertNumQueries(1):
countries = Country.objects.bulk_create(self.data)
self.assertEqual(len(countries), 4)
self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])
self.assertEqual(Country.objects.get(pk=countries[1].pk), countries[1])
self.assertEqual(Country.objects.get(pk=countries[2].pk), countries[2])
self.assertEqual(Country.objects.get(pk=countries[3].pk), countries[3])
@skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_set_state(self):
country_nl = Country(name="Netherlands", iso_two_letter="NL")
country_be = Country(name="Belgium", iso_two_letter="BE")
Country.objects.bulk_create([country_nl])
country_be.save()
# Objects save via bulk_create() and save() should have equal state.
self.assertEqual(country_nl._state.adding, country_be._state.adding)
self.assertEqual(country_nl._state.db, country_be._state.db)
def test_set_state_with_pk_specified(self):
state_ca = State(two_letter_code="CA")
state_ny = State(two_letter_code="NY")
State.objects.bulk_create([state_ca])
state_ny.save()
# Objects save via bulk_create() and save() should have equal state.
self.assertEqual(state_ca._state.adding, state_ny._state.adding)
self.assertEqual(state_ca._state.db, state_ny._state.db)
@skipIfDBFeature("supports_ignore_conflicts")
def test_ignore_conflicts_value_error(self):
message = "This database backend does not support ignoring conflicts."
with self.assertRaisesMessage(NotSupportedError, message):
TwoFields.objects.bulk_create(self.data, ignore_conflicts=True)
@skipUnlessDBFeature("supports_ignore_conflicts")
def test_ignore_conflicts_ignore(self):
data = [
TwoFields(f1=1, f2=1),
TwoFields(f1=2, f2=2),
TwoFields(f1=3, f2=3),
]
TwoFields.objects.bulk_create(data)
self.assertEqual(TwoFields.objects.count(), 3)
# With ignore_conflicts=True, conflicts are ignored.
conflicting_objects = [
TwoFields(f1=2, f2=2),
TwoFields(f1=3, f2=3),
]
TwoFields.objects.bulk_create([conflicting_objects[0]], ignore_conflicts=True)
TwoFields.objects.bulk_create(conflicting_objects, ignore_conflicts=True)
self.assertEqual(TwoFields.objects.count(), 3)
self.assertIsNone(conflicting_objects[0].pk)
self.assertIsNone(conflicting_objects[1].pk)
# New objects are created and conflicts are ignored.
new_object = TwoFields(f1=4, f2=4)
TwoFields.objects.bulk_create(
conflicting_objects + [new_object], ignore_conflicts=True
)
self.assertEqual(TwoFields.objects.count(), 4)
self.assertIsNone(new_object.pk)
# Without ignore_conflicts=True, there's a problem.
with self.assertRaises(IntegrityError):
TwoFields.objects.bulk_create(conflicting_objects)
def test_nullable_fk_after_parent(self):
parent = NoFields()
child = NullableFields(auto_field=parent, integer_field=88)
parent.save()
NullableFields.objects.bulk_create([child])
child = NullableFields.objects.get(integer_field=88)
self.assertEqual(child.auto_field, parent)
@skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_nullable_fk_after_parent_bulk_create(self):
parent = NoFields()
child = NullableFields(auto_field=parent, integer_field=88)
NoFields.objects.bulk_create([parent])
NullableFields.objects.bulk_create([child])
child = NullableFields.objects.get(integer_field=88)
self.assertEqual(child.auto_field, parent)
def test_unsaved_parent(self):
parent = NoFields()
msg = (
"bulk_create() prohibited to prevent data loss due to unsaved "
"related object 'auto_field'."
)
with self.assertRaisesMessage(ValueError, msg):
NullableFields.objects.bulk_create([NullableFields(auto_field=parent)])
def test_invalid_batch_size_exception(self):
msg = "Batch size must be a positive integer."
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create([], batch_size=-1)
@skipIfDBFeature("supports_update_conflicts")
def test_update_conflicts_unsupported(self):
msg = "This database backend does not support updating conflicts."
with self.assertRaisesMessage(NotSupportedError, msg):
Country.objects.bulk_create(self.data, update_conflicts=True)
@skipUnlessDBFeature("supports_ignore_conflicts", "supports_update_conflicts")
def test_ignore_update_conflicts_exclusive(self):
msg = "ignore_conflicts and update_conflicts are mutually exclusive"
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(
self.data,
ignore_conflicts=True,
update_conflicts=True,
)
@skipUnlessDBFeature("supports_update_conflicts")
def test_update_conflicts_no_update_fields(self):
msg = (
"Fields that will be updated when a row insertion fails on "
"conflicts must be provided."
)
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(self.data, update_conflicts=True)
@skipUnlessDBFeature("supports_update_conflicts")
@skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_unique_field_unsupported(self):
msg = (
"This database backend does not support updating conflicts with "
"specifying unique fields that can trigger the upsert."
)
with self.assertRaisesMessage(NotSupportedError, msg):
TwoFields.objects.bulk_create(
[TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
update_conflicts=True,
update_fields=["f2"],
unique_fields=["f1"],
)
@skipUnlessDBFeature("supports_update_conflicts")
def test_update_conflicts_nonexistent_update_fields(self):
unique_fields = None
if connection.features.supports_update_conflicts_with_target:
unique_fields = ["f1"]
msg = "TwoFields has no field named 'nonexistent'"
with self.assertRaisesMessage(FieldDoesNotExist, msg):
TwoFields.objects.bulk_create(
[TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
update_conflicts=True,
update_fields=["nonexistent"],
unique_fields=unique_fields,
)
@skipUnlessDBFeature(
"supports_update_conflicts",
"supports_update_conflicts_with_target",
)
def test_update_conflicts_unique_fields_required(self):
msg = "Unique fields that can trigger the upsert must be provided."
with self.assertRaisesMessage(ValueError, msg):
TwoFields.objects.bulk_create(
[TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
update_conflicts=True,
update_fields=["f1"],
)
@skipUnlessDBFeature(
"supports_update_conflicts",
"supports_update_conflicts_with_target",
)
def test_update_conflicts_invalid_update_fields(self):
msg = "bulk_create() can only be used with concrete fields in update_fields."
# Reverse one-to-one relationship.
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(
self.data,
update_conflicts=True,
update_fields=["relatedmodel"],
unique_fields=["pk"],
)
# Many-to-many relationship.
with self.assertRaisesMessage(ValueError, msg):
RelatedModel.objects.bulk_create(
[RelatedModel(country=self.data[0])],
update_conflicts=True,
update_fields=["big_auto_fields"],
unique_fields=["country"],
)
@skipUnlessDBFeature(
"supports_update_conflicts",
"supports_update_conflicts_with_target",
)
def test_update_conflicts_pk_in_update_fields(self):
msg = "bulk_create() cannot be used with primary keys in update_fields."
with self.assertRaisesMessage(ValueError, msg):
BigAutoFieldModel.objects.bulk_create(
[BigAutoFieldModel()],
update_conflicts=True,
update_fields=["id"],
unique_fields=["id"],
)
@skipUnlessDBFeature(
"supports_update_conflicts",
"supports_update_conflicts_with_target",
)
def test_update_conflicts_invalid_unique_fields(self):
msg = "bulk_create() can only be used with concrete fields in unique_fields."
# Reverse one-to-one relationship.
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(
self.data,
update_conflicts=True,
update_fields=["name"],
unique_fields=["relatedmodel"],
)
# Many-to-many relationship.
with self.assertRaisesMessage(ValueError, msg):
RelatedModel.objects.bulk_create(
[RelatedModel(country=self.data[0])],
update_conflicts=True,
update_fields=["name"],
unique_fields=["big_auto_fields"],
)
def _test_update_conflicts_two_fields(self, unique_fields):
TwoFields.objects.bulk_create(
[
TwoFields(f1=1, f2=1, name="a"),
TwoFields(f1=2, f2=2, name="b"),
]
)
self.assertEqual(TwoFields.objects.count(), 2)
conflicting_objects = [
TwoFields(f1=1, f2=1, name="c"),
TwoFields(f1=2, f2=2, name="d"),
]
results = TwoFields.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
unique_fields=unique_fields,
update_fields=["name"],
)
self.assertEqual(len(results), len(conflicting_objects))
if connection.features.can_return_rows_from_bulk_insert:
for instance in results:
self.assertIsNotNone(instance.pk)
self.assertEqual(TwoFields.objects.count(), 2)
self.assertCountEqual(
TwoFields.objects.values("f1", "f2", "name"),
[
{"f1": 1, "f2": 1, "name": "c"},
{"f1": 2, "f2": 2, "name": "d"},
],
)
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_two_fields_unique_fields_first(self):
self._test_update_conflicts_two_fields(["f1"])
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_two_fields_unique_fields_second(self):
self._test_update_conflicts_two_fields(["f2"])
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_unique_fields_pk(self):
TwoFields.objects.bulk_create(
[
TwoFields(f1=1, f2=1, name="a"),
TwoFields(f1=2, f2=2, name="b"),
]
)
obj1 = TwoFields.objects.get(f1=1)
obj2 = TwoFields.objects.get(f1=2)
conflicting_objects = [
TwoFields(pk=obj1.pk, f1=3, f2=3, name="c"),
TwoFields(pk=obj2.pk, f1=4, f2=4, name="d"),
]
results = TwoFields.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
unique_fields=["pk"],
update_fields=["name"],
)
self.assertEqual(len(results), len(conflicting_objects))
if connection.features.can_return_rows_from_bulk_insert:
for instance in results:
self.assertIsNotNone(instance.pk)
self.assertEqual(TwoFields.objects.count(), 2)
self.assertCountEqual(
TwoFields.objects.values("f1", "f2", "name"),
[
{"f1": 1, "f2": 1, "name": "c"},
{"f1": 2, "f2": 2, "name": "d"},
],
)
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_two_fields_unique_fields_both(self):
with self.assertRaises((OperationalError, ProgrammingError)):
self._test_update_conflicts_two_fields(["f1", "f2"])
@skipUnlessDBFeature("supports_update_conflicts")
@skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_two_fields_no_unique_fields(self):
self._test_update_conflicts_two_fields([])
def _test_update_conflicts_unique_two_fields(self, unique_fields):
Country.objects.bulk_create(self.data)
self.assertEqual(Country.objects.count(), 4)
new_data = [
# Conflicting countries.
Country(
name="Germany",
iso_two_letter="DE",
description=("Germany is a country in Central Europe."),
),
Country(
name="Czech Republic",
iso_two_letter="CZ",
description=(
"The Czech Republic is a landlocked country in Central Europe."
),
),
# New countries.
Country(name="Australia", iso_two_letter="AU"),
Country(
name="Japan",
iso_two_letter="JP",
description=("Japan is an island country in East Asia."),
),
]
results = Country.objects.bulk_create(
new_data,
update_conflicts=True,
update_fields=["description"],
unique_fields=unique_fields,
)
self.assertEqual(len(results), len(new_data))
if connection.features.can_return_rows_from_bulk_insert:
for instance in results:
self.assertIsNotNone(instance.pk)
self.assertEqual(Country.objects.count(), 6)
self.assertCountEqual(
Country.objects.values("iso_two_letter", "description"),
[
{"iso_two_letter": "US", "description": ""},
{"iso_two_letter": "NL", "description": ""},
{
"iso_two_letter": "DE",
"description": ("Germany is a country in Central Europe."),
},
{
"iso_two_letter": "CZ",
"description": (
"The Czech Republic is a landlocked country in Central Europe."
),
},
{"iso_two_letter": "AU", "description": ""},
{
"iso_two_letter": "JP",
"description": ("Japan is an island country in East Asia."),
},
],
)
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_unique_two_fields_unique_fields_both(self):
self._test_update_conflicts_unique_two_fields(["iso_two_letter", "name"])
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_unique_two_fields_unique_fields_one(self):
with self.assertRaises((OperationalError, ProgrammingError)):
self._test_update_conflicts_unique_two_fields(["iso_two_letter"])
@skipUnlessDBFeature("supports_update_conflicts")
@skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self):
self._test_update_conflicts_unique_two_fields([])
def _test_update_conflicts(self, unique_fields):
UpsertConflict.objects.bulk_create(
[
UpsertConflict(number=1, rank=1, name="John"),
UpsertConflict(number=2, rank=2, name="Mary"),
UpsertConflict(number=3, rank=3, name="Hannah"),
]
)
self.assertEqual(UpsertConflict.objects.count(), 3)
conflicting_objects = [
UpsertConflict(number=1, rank=4, name="Steve"),
UpsertConflict(number=2, rank=2, name="Olivia"),
UpsertConflict(number=3, rank=1, name="Hannah"),
]
results = UpsertConflict.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
update_fields=["name", "rank"],
unique_fields=unique_fields,
)
self.assertEqual(len(results), len(conflicting_objects))
if connection.features.can_return_rows_from_bulk_insert:
for instance in results:
self.assertIsNotNone(instance.pk)
self.assertEqual(UpsertConflict.objects.count(), 3)
self.assertCountEqual(
UpsertConflict.objects.values("number", "rank", "name"),
[
{"number": 1, "rank": 4, "name": "Steve"},
{"number": 2, "rank": 2, "name": "Olivia"},
{"number": 3, "rank": 1, "name": "Hannah"},
],
)
results = UpsertConflict.objects.bulk_create(
conflicting_objects + [UpsertConflict(number=4, rank=4, name="Mark")],
update_conflicts=True,
update_fields=["name", "rank"],
unique_fields=unique_fields,
)
self.assertEqual(len(results), 4)
if connection.features.can_return_rows_from_bulk_insert:
for instance in results:
self.assertIsNotNone(instance.pk)
self.assertEqual(UpsertConflict.objects.count(), 4)
self.assertCountEqual(
UpsertConflict.objects.values("number", "rank", "name"),
[
{"number": 1, "rank": 4, "name": "Steve"},
{"number": 2, "rank": 2, "name": "Olivia"},
{"number": 3, "rank": 1, "name": "Hannah"},
{"number": 4, "rank": 4, "name": "Mark"},
],
)
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_unique_fields(self):
self._test_update_conflicts(unique_fields=["number"])
@skipUnlessDBFeature("supports_update_conflicts")
@skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_no_unique_fields(self):
self._test_update_conflicts([])
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_unique_fields_update_fields_db_column(self):
FieldsWithDbColumns.objects.bulk_create(
[
FieldsWithDbColumns(rank=1, name="a"),
FieldsWithDbColumns(rank=2, name="b"),
]
)
self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
conflicting_objects = [
FieldsWithDbColumns(rank=1, name="c"),
FieldsWithDbColumns(rank=2, name="d"),
]
results = FieldsWithDbColumns.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
unique_fields=["rank"],
update_fields=["name"],
)
self.assertEqual(len(results), len(conflicting_objects))
if connection.features.can_return_rows_from_bulk_insert:
for instance in results:
self.assertIsNotNone(instance.pk)
self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
self.assertCountEqual(
FieldsWithDbColumns.objects.values("rank", "name"),
[
{"rank": 1, "name": "c"},
{"rank": 2, "name": "d"},
],
)
@skipUnlessDBFeature("supports_expression_defaults")
def test_db_default_field_excluded(self):
# created_at is excluded when no db_default override is provided.
with self.assertNumQueries(1) as ctx:
DbDefaultModel.objects.bulk_create(
[DbDefaultModel(name="foo"), DbDefaultModel(name="bar")]
)
created_at_quoted_name = connection.ops.quote_name("created_at")
self.assertEqual(
ctx[0]["sql"].count(created_at_quoted_name),
1 if connection.features.can_return_rows_from_bulk_insert else 0,
)
# created_at is included when a db_default override is provided.
with self.assertNumQueries(1) as ctx:
DbDefaultModel.objects.bulk_create(
[
DbDefaultModel(name="foo", created_at=timezone.now()),
DbDefaultModel(name="bar"),
]
)
self.assertEqual(
ctx[0]["sql"].count(created_at_quoted_name),
2 if connection.features.can_return_rows_from_bulk_insert else 1,
)
@skipUnlessDBFeature(
"can_return_rows_from_bulk_insert", "supports_expression_defaults"
)
def test_db_default_primary_key(self):
(obj,) = DbDefaultPrimaryKey.objects.bulk_create([DbDefaultPrimaryKey()])
self.assertIsInstance(obj.id, datetime)
@skipUnlessDBFeature("supports_transactions", "has_bulk_insert")
class BulkCreateTransactionTests(TransactionTestCase):
available_apps = ["bulk_create"]
def test_no_unnecessary_transaction(self):
with self.assertNumQueries(1):
Country.objects.bulk_create(
[Country(id=1, name="France", iso_two_letter="FR")]
)
with self.assertNumQueries(1):
Country.objects.bulk_create([Country(name="Canada", iso_two_letter="CA")])
def test_objs_with_and_without_pk(self):
with self.assertNumQueries(4):
Country.objects.bulk_create(
[
Country(id=10, name="France", iso_two_letter="FR"),
Country(name="Canada", iso_two_letter="CA"),
]
)
def test_multiple_batches(self):
with self.assertNumQueries(4):
Country.objects.bulk_create(
[
Country(name="France", iso_two_letter="FR"),
Country(name="Canada", iso_two_letter="CA"),
],
batch_size=1,
)