from math import ceil
from operator import attrgetter

from django.core.exceptions import FieldDoesNotExist
from django.db import (
    IntegrityError,
    NotSupportedError,
    OperationalError,
    ProgrammingError,
    connection,
)
from django.db.models import FileField, Value
from django.db.models.functions import Lower, Now
from django.test import (
    TestCase,
    override_settings,
    skipIfDBFeature,
    skipUnlessDBFeature,
)

from .models import (
    BigAutoFieldModel,
    Country,
    FieldsWithDbColumns,
    NoFields,
    NullableFields,
    Pizzeria,
    ProxyCountry,
    ProxyMultiCountry,
    ProxyMultiProxyCountry,
    ProxyProxyCountry,
    RelatedModel,
    Restaurant,
    SmallAutoFieldModel,
    State,
    TwoFields,
    UpsertConflict,
)


class BulkCreateTests(TestCase):
    def setUp(self):
        self.data = [
            Country(name="United States of America", iso_two_letter="US"),
            Country(name="The Netherlands", iso_two_letter="NL"),
            Country(name="Germany", iso_two_letter="DE"),
            Country(name="Czech Republic", iso_two_letter="CZ"),
        ]

    def test_simple(self):
        created = Country.objects.bulk_create(self.data)
        self.assertEqual(created, self.data)
        self.assertQuerySetEqual(
            Country.objects.order_by("-name"),
            [
                "United States of America",
                "The Netherlands",
                "Germany",
                "Czech Republic",
            ],
            attrgetter("name"),
        )

        created = Country.objects.bulk_create([])
        self.assertEqual(created, [])
        self.assertEqual(Country.objects.count(), 4)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_efficiency(self):
        with self.assertNumQueries(1):
            Country.objects.bulk_create(self.data)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_long_non_ascii_text(self):
        """
        Inserting non-ASCII values with a length in the range 2001 to 4000
        characters, i.e. 4002 to 8000 bytes, must be set as a CLOB on Oracle
        (#22144).
        """
        Country.objects.bulk_create([Country(description="Ж" * 3000)])
        self.assertEqual(Country.objects.count(), 1)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_long_and_short_text(self):
        Country.objects.bulk_create(
            [
                Country(description="a" * 4001, iso_two_letter="A"),
                Country(description="a", iso_two_letter="B"),
                Country(description="Ж" * 2001, iso_two_letter="C"),
                Country(description="Ж", iso_two_letter="D"),
            ]
        )
        self.assertEqual(Country.objects.count(), 4)

    def test_multi_table_inheritance_unsupported(self):
        expected_message = "Can't bulk create a multi-table inherited model"
        with self.assertRaisesMessage(ValueError, expected_message):
            Pizzeria.objects.bulk_create(
                [
                    Pizzeria(name="The Art of Pizza"),
                ]
            )
        with self.assertRaisesMessage(ValueError, expected_message):
            ProxyMultiCountry.objects.bulk_create(
                [
                    ProxyMultiCountry(name="Fillory", iso_two_letter="FL"),
                ]
            )
        with self.assertRaisesMessage(ValueError, expected_message):
            ProxyMultiProxyCountry.objects.bulk_create(
                [
                    ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"),
                ]
            )

    def test_proxy_inheritance_supported(self):
        ProxyCountry.objects.bulk_create(
            [
                ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
                Country(name="Tortall", iso_two_letter="TA"),
            ]
        )
        self.assertQuerySetEqual(
            ProxyCountry.objects.all(),
            {"Qwghlm", "Tortall"},
            attrgetter("name"),
            ordered=False,
        )

        ProxyProxyCountry.objects.bulk_create(
            [
                ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"),
            ]
        )
        self.assertQuerySetEqual(
            ProxyProxyCountry.objects.all(),
            {
                "Qwghlm",
                "Tortall",
                "Netherlands",
            },
            attrgetter("name"),
            ordered=False,
        )

    def test_non_auto_increment_pk(self):
        State.objects.bulk_create(
            [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
        )
        self.assertQuerySetEqual(
            State.objects.order_by("two_letter_code"),
            [
                "CA",
                "IL",
                "ME",
                "NY",
            ],
            attrgetter("two_letter_code"),
        )

    @skipUnlessDBFeature("has_bulk_insert")
    def test_non_auto_increment_pk_efficiency(self):
        with self.assertNumQueries(1):
            State.objects.bulk_create(
                [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
            )
        self.assertQuerySetEqual(
            State.objects.order_by("two_letter_code"),
            [
                "CA",
                "IL",
                "ME",
                "NY",
            ],
            attrgetter("two_letter_code"),
        )

    @skipIfDBFeature("allows_auto_pk_0")
    def test_zero_as_autoval(self):
        """
        Zero as id for AutoField should raise exception in MySQL, because MySQL
        does not allow zero for automatic primary key if the
        NO_AUTO_VALUE_ON_ZERO SQL mode is not enabled.
        """
        valid_country = Country(name="Germany", iso_two_letter="DE")
        invalid_country = Country(id=0, name="Poland", iso_two_letter="PL")
        msg = "The database backend does not accept 0 as a value for AutoField."
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create([valid_country, invalid_country])

    def test_batch_same_vals(self):
        # SQLite had a problem where all the same-valued models were
        # collapsed to one insert.
        Restaurant.objects.bulk_create([Restaurant(name="foo") for i in range(0, 2)])
        self.assertEqual(Restaurant.objects.count(), 2)

    def test_large_batch(self):
        TwoFields.objects.bulk_create(
            [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
        )
        self.assertEqual(TwoFields.objects.count(), 1001)
        self.assertEqual(
            TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(), 101
        )
        self.assertEqual(TwoFields.objects.filter(f2__gte=901).count(), 101)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_large_single_field_batch(self):
        # SQLite had a problem with more than 500 UNIONed selects in single
        # query.
        Restaurant.objects.bulk_create([Restaurant() for i in range(0, 501)])

    @skipUnlessDBFeature("has_bulk_insert")
    def test_large_batch_efficiency(self):
        with override_settings(DEBUG=True):
            connection.queries_log.clear()
            TwoFields.objects.bulk_create(
                [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
            )
            self.assertLess(len(connection.queries), 10)

    def test_large_batch_mixed(self):
        """
        Test inserting a large batch with objects having primary key set
        mixed together with objects without PK set.
        """
        TwoFields.objects.bulk_create(
            [
                TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
                for i in range(100000, 101000)
            ]
        )
        self.assertEqual(TwoFields.objects.count(), 1000)
        # We can't assume much about the ID's created, except that the above
        # created IDs must exist.
        id_range = range(100000, 101000, 2)
        self.assertEqual(TwoFields.objects.filter(id__in=id_range).count(), 500)
        self.assertEqual(TwoFields.objects.exclude(id__in=id_range).count(), 500)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_large_batch_mixed_efficiency(self):
        """
        Test inserting a large batch with objects having primary key set
        mixed together with objects without PK set.
        """
        with override_settings(DEBUG=True):
            connection.queries_log.clear()
            TwoFields.objects.bulk_create(
                [
                    TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
                    for i in range(100000, 101000)
                ]
            )
            self.assertLess(len(connection.queries), 10)

    def test_explicit_batch_size(self):
        objs = [TwoFields(f1=i, f2=i) for i in range(0, 4)]
        num_objs = len(objs)
        TwoFields.objects.bulk_create(objs, batch_size=1)
        self.assertEqual(TwoFields.objects.count(), num_objs)
        TwoFields.objects.all().delete()
        TwoFields.objects.bulk_create(objs, batch_size=2)
        self.assertEqual(TwoFields.objects.count(), num_objs)
        TwoFields.objects.all().delete()
        TwoFields.objects.bulk_create(objs, batch_size=3)
        self.assertEqual(TwoFields.objects.count(), num_objs)
        TwoFields.objects.all().delete()
        TwoFields.objects.bulk_create(objs, batch_size=num_objs)
        self.assertEqual(TwoFields.objects.count(), num_objs)

    def test_empty_model(self):
        NoFields.objects.bulk_create([NoFields() for i in range(2)])
        self.assertEqual(NoFields.objects.count(), 2)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_explicit_batch_size_efficiency(self):
        objs = [TwoFields(f1=i, f2=i) for i in range(0, 100)]
        with self.assertNumQueries(2):
            TwoFields.objects.bulk_create(objs, 50)
        TwoFields.objects.all().delete()
        with self.assertNumQueries(1):
            TwoFields.objects.bulk_create(objs, len(objs))

    @skipUnlessDBFeature("has_bulk_insert")
    def test_explicit_batch_size_respects_max_batch_size(self):
        objs = [Country(name=f"Country {i}") for i in range(1000)]
        fields = ["name", "iso_two_letter", "description"]
        max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1)
        with self.assertNumQueries(ceil(len(objs) / max_batch_size)):
            Country.objects.bulk_create(objs, batch_size=max_batch_size + 1)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_bulk_insert_expressions(self):
        Restaurant.objects.bulk_create(
            [
                Restaurant(name="Sam's Shake Shack"),
                Restaurant(name=Lower(Value("Betty's Beetroot Bar"))),
            ]
        )
        bbb = Restaurant.objects.filter(name="betty's beetroot bar")
        self.assertEqual(bbb.count(), 1)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_bulk_insert_now(self):
        NullableFields.objects.bulk_create(
            [
                NullableFields(datetime_field=Now()),
                NullableFields(datetime_field=Now()),
            ]
        )
        self.assertEqual(
            NullableFields.objects.filter(datetime_field__isnull=False).count(),
            2,
        )

    @skipUnlessDBFeature("has_bulk_insert")
    def test_bulk_insert_nullable_fields(self):
        fk_to_auto_fields = {
            "auto_field": NoFields.objects.create(),
            "small_auto_field": SmallAutoFieldModel.objects.create(),
            "big_auto_field": BigAutoFieldModel.objects.create(),
        }
        # NULL can be mixed with other values in nullable fields
        nullable_fields = [
            field for field in NullableFields._meta.get_fields() if field.name != "id"
        ]
        NullableFields.objects.bulk_create(
            [
                NullableFields(**{**fk_to_auto_fields, field.name: None})
                for field in nullable_fields
            ]
        )
        self.assertEqual(NullableFields.objects.count(), len(nullable_fields))
        for field in nullable_fields:
            with self.subTest(field=field):
                field_value = "" if isinstance(field, FileField) else None
                self.assertEqual(
                    NullableFields.objects.filter(**{field.name: field_value}).count(),
                    1,
                )

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_set_pk_and_insert_single_item(self):
        with self.assertNumQueries(1):
            countries = Country.objects.bulk_create([self.data[0]])
        self.assertEqual(len(countries), 1)
        self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_set_pk_and_query_efficiency(self):
        with self.assertNumQueries(1):
            countries = Country.objects.bulk_create(self.data)
        self.assertEqual(len(countries), 4)
        self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])
        self.assertEqual(Country.objects.get(pk=countries[1].pk), countries[1])
        self.assertEqual(Country.objects.get(pk=countries[2].pk), countries[2])
        self.assertEqual(Country.objects.get(pk=countries[3].pk), countries[3])

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_set_state(self):
        country_nl = Country(name="Netherlands", iso_two_letter="NL")
        country_be = Country(name="Belgium", iso_two_letter="BE")
        Country.objects.bulk_create([country_nl])
        country_be.save()
        # Objects save via bulk_create() and save() should have equal state.
        self.assertEqual(country_nl._state.adding, country_be._state.adding)
        self.assertEqual(country_nl._state.db, country_be._state.db)

    def test_set_state_with_pk_specified(self):
        state_ca = State(two_letter_code="CA")
        state_ny = State(two_letter_code="NY")
        State.objects.bulk_create([state_ca])
        state_ny.save()
        # Objects save via bulk_create() and save() should have equal state.
        self.assertEqual(state_ca._state.adding, state_ny._state.adding)
        self.assertEqual(state_ca._state.db, state_ny._state.db)

    @skipIfDBFeature("supports_ignore_conflicts")
    def test_ignore_conflicts_value_error(self):
        message = "This database backend does not support ignoring conflicts."
        with self.assertRaisesMessage(NotSupportedError, message):
            TwoFields.objects.bulk_create(self.data, ignore_conflicts=True)

    @skipUnlessDBFeature("supports_ignore_conflicts")
    def test_ignore_conflicts_ignore(self):
        data = [
            TwoFields(f1=1, f2=1),
            TwoFields(f1=2, f2=2),
            TwoFields(f1=3, f2=3),
        ]
        TwoFields.objects.bulk_create(data)
        self.assertEqual(TwoFields.objects.count(), 3)
        # With ignore_conflicts=True, conflicts are ignored.
        conflicting_objects = [
            TwoFields(f1=2, f2=2),
            TwoFields(f1=3, f2=3),
        ]
        TwoFields.objects.bulk_create([conflicting_objects[0]], ignore_conflicts=True)
        TwoFields.objects.bulk_create(conflicting_objects, ignore_conflicts=True)
        self.assertEqual(TwoFields.objects.count(), 3)
        self.assertIsNone(conflicting_objects[0].pk)
        self.assertIsNone(conflicting_objects[1].pk)
        # New objects are created and conflicts are ignored.
        new_object = TwoFields(f1=4, f2=4)
        TwoFields.objects.bulk_create(
            conflicting_objects + [new_object], ignore_conflicts=True
        )
        self.assertEqual(TwoFields.objects.count(), 4)
        self.assertIsNone(new_object.pk)
        # Without ignore_conflicts=True, there's a problem.
        with self.assertRaises(IntegrityError):
            TwoFields.objects.bulk_create(conflicting_objects)

    def test_nullable_fk_after_parent(self):
        parent = NoFields()
        child = NullableFields(auto_field=parent, integer_field=88)
        parent.save()
        NullableFields.objects.bulk_create([child])
        child = NullableFields.objects.get(integer_field=88)
        self.assertEqual(child.auto_field, parent)

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_nullable_fk_after_parent_bulk_create(self):
        parent = NoFields()
        child = NullableFields(auto_field=parent, integer_field=88)
        NoFields.objects.bulk_create([parent])
        NullableFields.objects.bulk_create([child])
        child = NullableFields.objects.get(integer_field=88)
        self.assertEqual(child.auto_field, parent)

    def test_unsaved_parent(self):
        parent = NoFields()
        msg = (
            "bulk_create() prohibited to prevent data loss due to unsaved "
            "related object 'auto_field'."
        )
        with self.assertRaisesMessage(ValueError, msg):
            NullableFields.objects.bulk_create([NullableFields(auto_field=parent)])

    def test_invalid_batch_size_exception(self):
        msg = "Batch size must be a positive integer."
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create([], batch_size=-1)

    @skipIfDBFeature("supports_update_conflicts")
    def test_update_conflicts_unsupported(self):
        msg = "This database backend does not support updating conflicts."
        with self.assertRaisesMessage(NotSupportedError, msg):
            Country.objects.bulk_create(self.data, update_conflicts=True)

    @skipUnlessDBFeature("supports_ignore_conflicts", "supports_update_conflicts")
    def test_ignore_update_conflicts_exclusive(self):
        msg = "ignore_conflicts and update_conflicts are mutually exclusive"
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(
                self.data,
                ignore_conflicts=True,
                update_conflicts=True,
            )

    @skipUnlessDBFeature("supports_update_conflicts")
    def test_update_conflicts_no_update_fields(self):
        msg = (
            "Fields that will be updated when a row insertion fails on "
            "conflicts must be provided."
        )
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(self.data, update_conflicts=True)

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_unique_field_unsupported(self):
        msg = (
            "This database backend does not support updating conflicts with "
            "specifying unique fields that can trigger the upsert."
        )
        with self.assertRaisesMessage(NotSupportedError, msg):
            TwoFields.objects.bulk_create(
                [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
                update_conflicts=True,
                update_fields=["f2"],
                unique_fields=["f1"],
            )

    @skipUnlessDBFeature("supports_update_conflicts")
    def test_update_conflicts_nonexistent_update_fields(self):
        unique_fields = None
        if connection.features.supports_update_conflicts_with_target:
            unique_fields = ["f1"]
        msg = "TwoFields has no field named 'nonexistent'"
        with self.assertRaisesMessage(FieldDoesNotExist, msg):
            TwoFields.objects.bulk_create(
                [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
                update_conflicts=True,
                update_fields=["nonexistent"],
                unique_fields=unique_fields,
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_unique_fields_required(self):
        msg = "Unique fields that can trigger the upsert must be provided."
        with self.assertRaisesMessage(ValueError, msg):
            TwoFields.objects.bulk_create(
                [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
                update_conflicts=True,
                update_fields=["f1"],
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_invalid_update_fields(self):
        msg = "bulk_create() can only be used with concrete fields in update_fields."
        # Reverse one-to-one relationship.
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(
                self.data,
                update_conflicts=True,
                update_fields=["relatedmodel"],
                unique_fields=["pk"],
            )
        # Many-to-many relationship.
        with self.assertRaisesMessage(ValueError, msg):
            RelatedModel.objects.bulk_create(
                [RelatedModel(country=self.data[0])],
                update_conflicts=True,
                update_fields=["big_auto_fields"],
                unique_fields=["country"],
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_pk_in_update_fields(self):
        msg = "bulk_create() cannot be used with primary keys in update_fields."
        with self.assertRaisesMessage(ValueError, msg):
            BigAutoFieldModel.objects.bulk_create(
                [BigAutoFieldModel()],
                update_conflicts=True,
                update_fields=["id"],
                unique_fields=["id"],
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_invalid_unique_fields(self):
        msg = "bulk_create() can only be used with concrete fields in unique_fields."
        # Reverse one-to-one relationship.
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(
                self.data,
                update_conflicts=True,
                update_fields=["name"],
                unique_fields=["relatedmodel"],
            )
        # Many-to-many relationship.
        with self.assertRaisesMessage(ValueError, msg):
            RelatedModel.objects.bulk_create(
                [RelatedModel(country=self.data[0])],
                update_conflicts=True,
                update_fields=["name"],
                unique_fields=["big_auto_fields"],
            )

    def _test_update_conflicts_two_fields(self, unique_fields):
        TwoFields.objects.bulk_create(
            [
                TwoFields(f1=1, f2=1, name="a"),
                TwoFields(f1=2, f2=2, name="b"),
            ]
        )
        self.assertEqual(TwoFields.objects.count(), 2)

        conflicting_objects = [
            TwoFields(f1=1, f2=1, name="c"),
            TwoFields(f1=2, f2=2, name="d"),
        ]
        results = TwoFields.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            unique_fields=unique_fields,
            update_fields=["name"],
        )
        self.assertEqual(len(results), len(conflicting_objects))
        if connection.features.can_return_rows_from_bulk_insert:
            for instance in results:
                self.assertIsNotNone(instance.pk)
        self.assertEqual(TwoFields.objects.count(), 2)
        self.assertCountEqual(
            TwoFields.objects.values("f1", "f2", "name"),
            [
                {"f1": 1, "f2": 1, "name": "c"},
                {"f1": 2, "f2": 2, "name": "d"},
            ],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_two_fields_unique_fields_first(self):
        self._test_update_conflicts_two_fields(["f1"])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_two_fields_unique_fields_second(self):
        self._test_update_conflicts_two_fields(["f2"])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_fields_pk(self):
        TwoFields.objects.bulk_create(
            [
                TwoFields(f1=1, f2=1, name="a"),
                TwoFields(f1=2, f2=2, name="b"),
            ]
        )

        obj1 = TwoFields.objects.get(f1=1)
        obj2 = TwoFields.objects.get(f1=2)
        conflicting_objects = [
            TwoFields(pk=obj1.pk, f1=3, f2=3, name="c"),
            TwoFields(pk=obj2.pk, f1=4, f2=4, name="d"),
        ]
        results = TwoFields.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            unique_fields=["pk"],
            update_fields=["name"],
        )
        self.assertEqual(len(results), len(conflicting_objects))
        if connection.features.can_return_rows_from_bulk_insert:
            for instance in results:
                self.assertIsNotNone(instance.pk)
        self.assertEqual(TwoFields.objects.count(), 2)
        self.assertCountEqual(
            TwoFields.objects.values("f1", "f2", "name"),
            [
                {"f1": 1, "f2": 1, "name": "c"},
                {"f1": 2, "f2": 2, "name": "d"},
            ],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_two_fields_unique_fields_both(self):
        with self.assertRaises((OperationalError, ProgrammingError)):
            self._test_update_conflicts_two_fields(["f1", "f2"])

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_two_fields_no_unique_fields(self):
        self._test_update_conflicts_two_fields([])

    def _test_update_conflicts_unique_two_fields(self, unique_fields):
        Country.objects.bulk_create(self.data)
        self.assertEqual(Country.objects.count(), 4)

        new_data = [
            # Conflicting countries.
            Country(
                name="Germany",
                iso_two_letter="DE",
                description=("Germany is a country in Central Europe."),
            ),
            Country(
                name="Czech Republic",
                iso_two_letter="CZ",
                description=(
                    "The Czech Republic is a landlocked country in Central Europe."
                ),
            ),
            # New countries.
            Country(name="Australia", iso_two_letter="AU"),
            Country(
                name="Japan",
                iso_two_letter="JP",
                description=("Japan is an island country in East Asia."),
            ),
        ]
        results = Country.objects.bulk_create(
            new_data,
            update_conflicts=True,
            update_fields=["description"],
            unique_fields=unique_fields,
        )
        self.assertEqual(len(results), len(new_data))
        if connection.features.can_return_rows_from_bulk_insert:
            for instance in results:
                self.assertIsNotNone(instance.pk)
        self.assertEqual(Country.objects.count(), 6)
        self.assertCountEqual(
            Country.objects.values("iso_two_letter", "description"),
            [
                {"iso_two_letter": "US", "description": ""},
                {"iso_two_letter": "NL", "description": ""},
                {
                    "iso_two_letter": "DE",
                    "description": ("Germany is a country in Central Europe."),
                },
                {
                    "iso_two_letter": "CZ",
                    "description": (
                        "The Czech Republic is a landlocked country in Central Europe."
                    ),
                },
                {"iso_two_letter": "AU", "description": ""},
                {
                    "iso_two_letter": "JP",
                    "description": ("Japan is an island country in East Asia."),
                },
            ],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_two_fields_unique_fields_both(self):
        self._test_update_conflicts_unique_two_fields(["iso_two_letter", "name"])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_two_fields_unique_fields_one(self):
        with self.assertRaises((OperationalError, ProgrammingError)):
            self._test_update_conflicts_unique_two_fields(["iso_two_letter"])

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self):
        self._test_update_conflicts_unique_two_fields([])

    def _test_update_conflicts(self, unique_fields):
        UpsertConflict.objects.bulk_create(
            [
                UpsertConflict(number=1, rank=1, name="John"),
                UpsertConflict(number=2, rank=2, name="Mary"),
                UpsertConflict(number=3, rank=3, name="Hannah"),
            ]
        )
        self.assertEqual(UpsertConflict.objects.count(), 3)

        conflicting_objects = [
            UpsertConflict(number=1, rank=4, name="Steve"),
            UpsertConflict(number=2, rank=2, name="Olivia"),
            UpsertConflict(number=3, rank=1, name="Hannah"),
        ]
        results = UpsertConflict.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            update_fields=["name", "rank"],
            unique_fields=unique_fields,
        )
        self.assertEqual(len(results), len(conflicting_objects))
        if connection.features.can_return_rows_from_bulk_insert:
            for instance in results:
                self.assertIsNotNone(instance.pk)
        self.assertEqual(UpsertConflict.objects.count(), 3)
        self.assertCountEqual(
            UpsertConflict.objects.values("number", "rank", "name"),
            [
                {"number": 1, "rank": 4, "name": "Steve"},
                {"number": 2, "rank": 2, "name": "Olivia"},
                {"number": 3, "rank": 1, "name": "Hannah"},
            ],
        )

        results = UpsertConflict.objects.bulk_create(
            conflicting_objects + [UpsertConflict(number=4, rank=4, name="Mark")],
            update_conflicts=True,
            update_fields=["name", "rank"],
            unique_fields=unique_fields,
        )
        self.assertEqual(len(results), 4)
        if connection.features.can_return_rows_from_bulk_insert:
            for instance in results:
                self.assertIsNotNone(instance.pk)
        self.assertEqual(UpsertConflict.objects.count(), 4)
        self.assertCountEqual(
            UpsertConflict.objects.values("number", "rank", "name"),
            [
                {"number": 1, "rank": 4, "name": "Steve"},
                {"number": 2, "rank": 2, "name": "Olivia"},
                {"number": 3, "rank": 1, "name": "Hannah"},
                {"number": 4, "rank": 4, "name": "Mark"},
            ],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_fields(self):
        self._test_update_conflicts(unique_fields=["number"])

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_no_unique_fields(self):
        self._test_update_conflicts([])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_fields_update_fields_db_column(self):
        FieldsWithDbColumns.objects.bulk_create(
            [
                FieldsWithDbColumns(rank=1, name="a"),
                FieldsWithDbColumns(rank=2, name="b"),
            ]
        )
        self.assertEqual(FieldsWithDbColumns.objects.count(), 2)

        conflicting_objects = [
            FieldsWithDbColumns(rank=1, name="c"),
            FieldsWithDbColumns(rank=2, name="d"),
        ]
        results = FieldsWithDbColumns.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            unique_fields=["rank"],
            update_fields=["name"],
        )
        self.assertEqual(len(results), len(conflicting_objects))
        if connection.features.can_return_rows_from_bulk_insert:
            for instance in results:
                self.assertIsNotNone(instance.pk)
        self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
        self.assertCountEqual(
            FieldsWithDbColumns.objects.values("rank", "name"),
            [
                {"rank": 1, "name": "c"},
                {"rank": 2, "name": "d"},
            ],
        )