Fixed #16281 -- Fixed ContentType.get_object_for_this_type() in a multiple database setup.

This commit is contained in:
Ben Cail 2023-11-15 14:32:03 -05:00 committed by Mariusz Felisiak
parent a47de0d6cd
commit 02a600ff67
6 changed files with 97 additions and 6 deletions

View File

@ -280,7 +280,9 @@ class GenericForeignKey(FieldCacheMixin):
if ct_id is not None:
ct = self.get_content_type(id=ct_id, using=instance._state.db)
try:
rel_obj = ct.get_object_for_this_type(pk=pk_val)
rel_obj = ct.get_object_for_this_type(
using=instance._state.db, pk=pk_val
)
except ObjectDoesNotExist:
pass
self.set_cached_value(instance, rel_obj)

View File

@ -174,20 +174,20 @@ class ContentType(models.Model):
except LookupError:
return None
def get_object_for_this_type(self, **kwargs):
def get_object_for_this_type(self, using=None, **kwargs):
"""
Return an object of this type for the keyword arguments given.
Basically, this is a proxy around this object_type's get_object() model
method. The ObjectNotExist exception, if thrown, will not be caught,
so code that calls this method should catch it.
"""
return self.model_class()._base_manager.using(self._state.db).get(**kwargs)
return self.model_class()._base_manager.using(using).get(**kwargs)
def get_all_objects_for_this_type(self, **kwargs):
"""
Return all objects of this type for the keyword arguments given.
"""
return self.model_class()._base_manager.using(self._state.db).filter(**kwargs)
return self.model_class()._base_manager.filter(**kwargs)
def natural_key(self):
return (self.app_label, self.model)

View File

@ -106,13 +106,18 @@ methods that allow you to get from a
:class:`~django.contrib.contenttypes.models.ContentType` instance to the
model it represents, or to retrieve objects from that model:
.. method:: ContentType.get_object_for_this_type(**kwargs)
.. method:: ContentType.get_object_for_this_type(using=None, **kwargs)
Takes a set of valid :ref:`lookup arguments <field-lookups-intro>` for the
model the :class:`~django.contrib.contenttypes.models.ContentType`
represents, and does
:meth:`a get() lookup <django.db.models.query.QuerySet.get>`
on that model, returning the corresponding object.
on that model, returning the corresponding object. The ``using`` argument
can be used to specify a different database than the default one.
.. versionchanged:: 5.1
The ``using`` argument was added.
.. method:: ContentType.model_class()

View File

@ -83,6 +83,9 @@ class Book(models.Model):
def __str__(self):
return self.name
def get_absolute_url(self):
return f"/books/{self.id}/"
class Promo(models.Model):
name = models.CharField(max_length=100, verbose_name="¿Name?")

View File

@ -2,6 +2,8 @@ from unittest import mock
from django.contrib import admin
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponse
from django.test import TestCase, override_settings
from django.urls import path, reverse
@ -23,8 +25,15 @@ class Router:
site = admin.AdminSite(name="test_adminsite")
site.register(Book)
def book(request, book_id):
b = Book.objects.get(id=book_id)
return HttpResponse(b.title)
urlpatterns = [
path("admin/", site.urls),
path("books/<book_id>/", book),
]
@ -88,3 +97,47 @@ class MultiDatabaseTests(TestCase):
{"post": "yes"},
)
mock.atomic.assert_called_with(using=db)
class ViewOnSiteRouter:
def db_for_read(self, model, instance=None, **hints):
if model._meta.app_label in {"auth", "sessions", "contenttypes"}:
return "default"
return "other"
def db_for_write(self, model, **hints):
if model._meta.app_label in {"auth", "sessions", "contenttypes"}:
return "default"
return "other"
def allow_relation(self, obj1, obj2, **hints):
return obj1._state.db in {"default", "other"} and obj2._state.db in {
"default",
"other",
}
def allow_migrate(self, db, app_label, **hints):
return True
@override_settings(ROOT_URLCONF=__name__, DATABASE_ROUTERS=[ViewOnSiteRouter()])
class ViewOnSiteTests(TestCase):
databases = {"default", "other"}
def test_contenttype_in_separate_db(self):
ContentType.objects.using("other").all().delete()
book = Book.objects.using("other").create(name="other book")
user = User.objects.create_superuser(
username="super", password="secret", email="super@example.com"
)
book_type = ContentType.objects.get(app_label="admin_views", model="book")
self.client.force_login(user)
shortcut_url = reverse("admin:view_on_site", args=(book_type.pk, book.id))
response = self.client.get(shortcut_url, follow=False)
self.assertEqual(response.status_code, 302)
self.assertRegex(
response.url, f"http://(testserver|example.com)/books/{book.id}/"
)

View File

@ -1302,6 +1302,34 @@ class QueryTestCase(TestCase):
title="Dive into Water", published=datetime.date(2009, 5, 4), extra_arg=True
)
@override_settings(DATABASE_ROUTERS=["multiple_database.tests.TestRouter"])
def test_contenttype_in_separate_db(self):
ContentType.objects.using("other").all().delete()
book_other = Book.objects.using("other").create(
title="Test title other", published=datetime.date(2009, 5, 4)
)
book_default = Book.objects.using("default").create(
title="Test title default", published=datetime.date(2009, 5, 4)
)
book_type = ContentType.objects.using("default").get(
app_label="multiple_database", model="book"
)
book = book_type.get_object_for_this_type(title=book_other.title)
self.assertEqual(book, book_other)
book = book_type.get_object_for_this_type(using="other", title=book_other.title)
self.assertEqual(book, book_other)
with self.assertRaises(Book.DoesNotExist):
book_type.get_object_for_this_type(title=book_default.title)
book = book_type.get_object_for_this_type(
using="default", title=book_default.title
)
self.assertEqual(book, book_default)
all_books = book_type.get_all_objects_for_this_type()
self.assertCountEqual(all_books, [book_other])
class ConnectionRouterTestCase(SimpleTestCase):
@override_settings(