1
0
mirror of https://github.com/django/django.git synced 2025-06-08 21:19:13 +00:00

Fixed #36060 -- Prevented IntegrityError in bulk_create() with order_with_respect_to.

This commit is contained in:
myoungjinGo-BE 2025-05-16 23:56:20 +09:00 committed by Sarah Boyce
parent 90429625a8
commit 953095d1e6
3 changed files with 182 additions and 2 deletions

View File

@ -55,11 +55,15 @@ class GenericForeignKey(FieldCacheMixin, Field):
attname, column = super().get_attname_column()
return attname, None
@cached_property
def ct_field_attname(self):
return self.model._meta.get_field(self.ct_field).attname
def get_filter_kwargs_for_object(self, obj):
"""See corresponding method on Field"""
return {
self.fk_field: getattr(obj, self.fk_field),
self.ct_field: getattr(obj, self.ct_field),
self.ct_field_attname: getattr(obj, self.ct_field_attname),
}
def get_forward_related_filter(self, obj):

View File

@ -5,6 +5,7 @@ The main QuerySet implementation. This provides the public API for the ORM.
import copy
import operator
import warnings
from functools import reduce
from itertools import chain, islice
from asgiref.sync import sync_to_async
@ -20,7 +21,7 @@ from django.db import (
router,
transaction,
)
from django.db.models import AutoField, DateField, DateTimeField, Field, sql
from django.db.models import AutoField, DateField, DateTimeField, Field, Max, sql
from django.db.models.constants import LOOKUP_SEP, OnConflict
from django.db.models.deletion import Collector
from django.db.models.expressions import Case, DatabaseDefault, F, Value, When
@ -800,6 +801,7 @@ class QuerySet(AltersData):
objs = list(objs)
objs_with_pk, objs_without_pk = self._prepare_for_bulk_create(objs)
with transaction.atomic(using=self.db, savepoint=False):
self._handle_order_with_respect_to(objs)
if objs_with_pk:
returned_columns = self._batched_insert(
objs_with_pk,
@ -840,6 +842,37 @@ class QuerySet(AltersData):
return objs
def _handle_order_with_respect_to(self, objs):
if objs and (order_wrt := self.model._meta.order_with_respect_to):
get_filter_kwargs_for_object = order_wrt.get_filter_kwargs_for_object
attnames = list(get_filter_kwargs_for_object(objs[0]))
group_keys = set()
obj_groups = []
for obj in objs:
group_key = tuple(get_filter_kwargs_for_object(obj).values())
group_keys.add(group_key)
obj_groups.append((obj, group_key))
filters = [
Q.create(list(zip(attnames, group_key))) for group_key in group_keys
]
next_orders = (
self.model._base_manager.using(self.db)
.filter(reduce(operator.or_, filters))
.values_list(*attnames)
.annotate(_order__max=Max("_order") + 1)
)
# Create mapping of group values to max order.
group_next_orders = dict.fromkeys(group_keys, 0)
group_next_orders.update(
(tuple(group_key), next_order) for *group_key, next_order in next_orders
)
# Assign _order values to new objects.
for obj, group_key in obj_groups:
if getattr(obj, "_order", None) is None:
group_next_order = group_next_orders[group_key]
obj._order = group_next_order
group_next_orders[group_key] += 1
bulk_create.alters_data = True
async def abulk_create(

View File

@ -126,3 +126,146 @@ class BaseOrderWithRespectToTests:
),
):
self.q1.set_answer_order([3, 1, 2, 4])
def test_bulk_create_with_empty_parent(self):
"""
bulk_create() should properly set _order when parent has no existing children.
"""
question = self.Question.objects.create(text="Test Question")
answers = [self.Answer(question=question, text=f"Answer {i}") for i in range(3)]
answer0, answer1, answer2 = self.Answer.objects.bulk_create(answers)
self.assertEqual(answer0._order, 0)
self.assertEqual(answer1._order, 1)
self.assertEqual(answer2._order, 2)
def test_bulk_create_with_existing_children(self):
"""
bulk_create() should continue _order sequence from existing children.
"""
question = self.Question.objects.create(text="Test Question")
self.Answer.objects.create(question=question, text="Existing 0")
self.Answer.objects.create(question=question, text="Existing 1")
new_answers = [
self.Answer(question=question, text=f"New Answer {i}") for i in range(2)
]
answer2, answer3 = self.Answer.objects.bulk_create(new_answers)
self.assertEqual(answer2._order, 2)
self.assertEqual(answer3._order, 3)
def test_bulk_create_multiple_parents(self):
"""
bulk_create() should maintain separate _order sequences for different parents.
"""
question0 = self.Question.objects.create(text="Question 0")
question1 = self.Question.objects.create(text="Question 1")
answers = [
self.Answer(question=question0, text="Q0 Answer 0"),
self.Answer(question=question1, text="Q1 Answer 0"),
self.Answer(question=question0, text="Q0 Answer 1"),
self.Answer(question=question1, text="Q1 Answer 1"),
]
created_answers = self.Answer.objects.bulk_create(answers)
answer_q0_0, answer_q1_0, answer_q0_1, answer_q1_1 = created_answers
self.assertEqual(answer_q0_0._order, 0)
self.assertEqual(answer_q0_1._order, 1)
self.assertEqual(answer_q1_0._order, 0)
self.assertEqual(answer_q1_1._order, 1)
def test_bulk_create_mixed_scenario(self):
"""
The _order field should be correctly set for new Answer objects based
on the count of existing Answers for each related Question.
"""
question0 = self.Question.objects.create(text="Question 0")
question1 = self.Question.objects.create(text="Question 1")
self.Answer.objects.create(question=question1, text="Q1 Existing 0")
self.Answer.objects.create(question=question1, text="Q1 Existing 1")
new_answers = [
self.Answer(question=question0, text="Q0 New 0"),
self.Answer(question=question1, text="Q1 New 0"),
self.Answer(question=question0, text="Q0 New 1"),
]
created_answers = self.Answer.objects.bulk_create(new_answers)
answer_q0_0, answer_q1_2, answer_q0_1 = created_answers
self.assertEqual(answer_q0_0._order, 0)
self.assertEqual(answer_q0_1._order, 1)
self.assertEqual(answer_q1_2._order, 2)
def test_bulk_create_respects_mixed_manual_order(self):
"""
bulk_create() should assign _order automatically only for instances
where it is not manually set. Mixed objects with and without _order
should result in expected final order values.
"""
question_a = self.Question.objects.create(text="Question A")
question_b = self.Question.objects.create(text="Question B")
# Existing answers to push initial _order forward.
self.Answer.objects.create(question=question_a, text="Q-A Existing 0")
self.Answer.objects.create(question=question_b, text="Q-B Existing 0")
self.Answer.objects.create(question=question_b, text="Q-B Existing 1")
answers = [
self.Answer(question=question_a, text="Q-A Manual 4", _order=4),
self.Answer(question=question_b, text="Q-B Auto 2"),
self.Answer(question=question_a, text="Q-A Auto"),
self.Answer(question=question_b, text="Q-B Manual 10", _order=10),
self.Answer(question=question_a, text="Q-A Manual 7", _order=7),
self.Answer(question=question_b, text="Q-B Auto 3"),
]
created_answers = self.Answer.objects.bulk_create(answers)
(
qa_manual_4,
qb_auto_2,
qa_auto,
qb_manual_10,
qa_manual_7,
qb_auto_3,
) = created_answers
# Manual values should stay untouched.
self.assertEqual(qa_manual_4._order, 4)
self.assertEqual(qb_manual_10._order, 10)
self.assertEqual(qa_manual_7._order, 7)
# Existing max was 0 → auto should get _order=1.
self.assertEqual(qa_auto._order, 1)
# Existing max was 1 → next auto gets 2, then 3 (manual 10 is skipped).
self.assertEqual(qb_auto_2._order, 2)
self.assertEqual(qb_auto_3._order, 3)
def test_bulk_create_allows_duplicate_order_values(self):
"""
bulk_create() should allow duplicate _order values if the model
does not enforce uniqueness on the _order field.
"""
question = self.Question.objects.create(text="Duplicated Test")
# Existing answer to set initial _order=0.
self.Answer.objects.create(question=question, text="Existing Answer")
# Two manually set _order=1 and one auto (which may also be assigned 1).
answers = [
self.Answer(question=question, text="Manual Order 1", _order=1),
self.Answer(question=question, text="Auto Order 1"),
self.Answer(question=question, text="Auto Order 2"),
self.Answer(question=question, text="Manual Order 1 Duplicate", _order=1),
]
created_answers = self.Answer.objects.bulk_create(answers)
manual_1, auto_1, auto_2, manual_2 = created_answers
# Manual values are as assigned, even if duplicated.
self.assertEqual(manual_1._order, 1)
self.assertEqual(manual_2._order, 1)
# Auto-assigned orders may also use 1 or any value, depending on implementation.
# If no collision logic, they may overlap with manual values.
self.assertEqual(auto_1._order, 1)
self.assertEqual(auto_2._order, 2)