1
0
mirror of https://github.com/django/django.git synced 2025-09-10 11:09:12 +00:00

Fixed #27489 -- Renamed permissions upon model renaming in migrations.

This commit is contained in:
Artyom Kotovskiy 2025-04-20 18:17:47 -04:00 committed by Sarah Boyce
parent 4187da258f
commit f02b49d2f3
6 changed files with 292 additions and 5 deletions

View File

@ -1,12 +1,12 @@
from django.apps import AppConfig from django.apps import AppConfig
from django.core import checks from django.core import checks
from django.db.models.query_utils import DeferredAttribute from django.db.models.query_utils import DeferredAttribute
from django.db.models.signals import post_migrate from django.db.models.signals import post_migrate, pre_migrate
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from . import get_user_model from . import get_user_model
from .checks import check_middleware, check_models_permissions, check_user_model from .checks import check_middleware, check_models_permissions, check_user_model
from .management import create_permissions from .management import create_permissions, rename_permissions
from .signals import user_logged_in from .signals import user_logged_in
@ -20,6 +20,11 @@ class AuthConfig(AppConfig):
create_permissions, create_permissions,
dispatch_uid="django.contrib.auth.management.create_permissions", dispatch_uid="django.contrib.auth.management.create_permissions",
) )
pre_migrate.connect(
rename_permissions,
dispatch_uid="django.contrib.auth.management.rename_permissions",
)
last_login_field = getattr(get_user_model(), "last_login", None) last_login_field = getattr(get_user_model(), "last_login", None)
# Register the handler only if UserModel.last_login is a field. # Register the handler only if UserModel.last_login is a field.
if isinstance(last_login_field, DeferredAttribute): if isinstance(last_login_field, DeferredAttribute):

View File

@ -9,7 +9,9 @@ from django.apps import apps as global_apps
from django.contrib.auth import get_permission_codename from django.contrib.auth import get_permission_codename
from django.contrib.contenttypes.management import create_contenttypes from django.contrib.contenttypes.management import create_contenttypes
from django.core import exceptions from django.core import exceptions
from django.db import DEFAULT_DB_ALIAS, router from django.db import DEFAULT_DB_ALIAS, migrations, router, transaction
from django.db.utils import IntegrityError
from django.utils.text import camel_case_to_spaces
def _get_all_permissions(opts): def _get_all_permissions(opts):
@ -108,6 +110,84 @@ def create_permissions(
print("Adding permission '%s'" % perm) print("Adding permission '%s'" % perm)
class RenamePermission(migrations.RunPython):
def __init__(self, app_label, old_model, new_model):
self.app_label = app_label
self.old_model = old_model
self.new_model = new_model
super(RenamePermission, self).__init__(
self.rename_forward, self.rename_backward
)
def _rename(self, apps, schema_editor, old_model, new_model):
ContentType = apps.get_model("contenttypes", "ContentType")
# Use the live Permission model instead of the frozen one, since frozen
# models do not retain foreign key constraints.
from django.contrib.auth.models import Permission
db = schema_editor.connection.alias
ctypes = ContentType.objects.filter(
app_label=self.app_label, model__icontains=old_model.lower()
)
for permission in Permission.objects.filter(
content_type_id__in=ctypes.values("id")
):
prefix = permission.codename.split("_")[0]
default_verbose_name = camel_case_to_spaces(new_model)
new_codename = f"{prefix}_{new_model.lower()}"
new_name = f"Can {prefix} {default_verbose_name}"
if permission.codename != new_codename or permission.name != new_name:
permission.codename = new_codename
permission.name = new_name
try:
with transaction.atomic(using=db):
permission.save(update_fields={"name", "codename"})
except IntegrityError:
pass
def rename_forward(self, apps, schema_editor):
self._rename(apps, schema_editor, self.old_model, self.new_model)
def rename_backward(self, apps, schema_editor):
self._rename(apps, schema_editor, self.new_model, self.old_model)
def rename_permissions(
plan,
verbosity=2,
interactive=True,
using=DEFAULT_DB_ALIAS,
apps=global_apps,
**kwargs,
):
"""
Insert a `RenamePermissionType` operation after every planned `RenameModel`
operation.
"""
try:
Permission = apps.get_model("auth", "Permission")
except LookupError:
return
else:
if not router.allow_migrate_model(using, Permission):
return
for migration, backward in plan:
inserts = []
for index, operation in enumerate(migration.operations):
if isinstance(operation, migrations.RenameModel):
operation = RenamePermission(
migration.app_label,
operation.old_name,
operation.new_name,
)
inserts.append((index + 1, operation))
for inserted, (index, operation) in enumerate(inserts):
migration.operations.insert(inserted + index, operation)
def get_system_username(): def get_system_username():
""" """
Return the current system user's username, or an empty string if the Return the current system user's username, or an empty string if the

View File

@ -0,0 +1,14 @@
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
operations = [
migrations.CreateModel(
name="OldModel",
fields=[
("id", models.AutoField(primary_key=True)),
],
),
]

View File

@ -0,0 +1,14 @@
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("auth_tests", "0001_initial"),
]
operations = [
migrations.RenameModel(
old_name="OldModel",
new_name="NewModel",
),
]

View File

@ -7,15 +7,21 @@ from io import StringIO
from unittest import mock from unittest import mock
from django.apps import apps from django.apps import apps
from django.conf import settings
from django.contrib.auth import get_permission_codename, management from django.contrib.auth import get_permission_codename, management
from django.contrib.auth.management import create_permissions, get_default_username from django.contrib.auth.management import (
RenamePermission,
create_permissions,
get_default_username,
)
from django.contrib.auth.management.commands import changepassword, createsuperuser from django.contrib.auth.management.commands import changepassword, createsuperuser
from django.contrib.auth.models import Group, Permission, User from django.contrib.auth.models import Group, Permission, User
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.core.management import call_command from django.core.management import call_command
from django.core.management.base import CommandError from django.core.management.base import CommandError
from django.db import migrations from django.db import migrations, models
from django.test import TestCase, override_settings from django.test import TestCase, override_settings
from django.test.testcases import TransactionTestCase
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from .models import ( from .models import (
@ -1528,6 +1534,174 @@ class CreatePermissionsTests(TestCase):
) )
@override_settings(
MIGRATION_MODULES=dict(
settings.MIGRATION_MODULES,
auth_tests="auth_tests.operations_migrations",
),
)
class PermissionRenameOperationsTests(TransactionTestCase):
available_apps = [
"django.contrib.contenttypes",
"django.contrib.auth",
"auth_tests",
]
def setUp(self):
app_config = apps.get_app_config("auth_tests")
models.signals.post_migrate.connect(
self.assertOperationsInjected, sender=app_config
)
self.addCleanup(
models.signals.post_migrate.disconnect,
self.assertOperationsInjected,
sender=app_config,
)
def assertOperationsInjected(self, plan, **kwargs):
for migration, _backward in plan:
operations = iter(migration.operations)
for operation in operations:
if isinstance(operation, migrations.RenameModel):
next_operation = next(operations)
self.assertIsInstance(next_operation, RenamePermission)
self.assertEqual(next_operation.app_label, migration.app_label)
self.assertEqual(next_operation.old_model, operation.old_name)
self.assertEqual(next_operation.new_model, operation.new_name)
def test_permission_rename(self):
ct = ContentType.objects.create(app_label="auth_tests", model="oldmodel")
actions = ["add", "change", "delete", "view"]
for action in actions:
Permission.objects.create(
codename=f"{action}_oldmodel",
name=f"Can {action} old model",
content_type=ct,
)
call_command("migrate", "auth_tests", verbosity=0)
for action in actions:
self.assertFalse(
Permission.objects.filter(codename=f"{action}_oldmodel").exists()
)
self.assertTrue(
Permission.objects.filter(codename=f"{action}_newmodel").exists()
)
call_command(
"migrate",
"auth_tests",
"zero",
database="default",
interactive=False,
verbosity=0,
)
for action in actions:
self.assertTrue(
Permission.objects.filter(codename=f"{action}_oldmodel").exists()
)
self.assertFalse(
Permission.objects.filter(codename=f"{action}_newmodel").exists()
)
@mock.patch(
"django.db.router.allow_migrate_model",
return_value=False,
)
def test_rename_skipped_if_router_disallows(self, _):
ct = ContentType.objects.create(app_label="auth_tests", model="oldmodel")
Permission.objects.create(
codename="change_oldmodel",
name="Can change old model",
content_type=ct,
)
# The rename operation should not be there when disallowed by router.
app_config = apps.get_app_config("auth_tests")
models.signals.post_migrate.disconnect(
self.assertOperationsInjected, sender=app_config
)
call_command(
"migrate",
"auth_tests",
database="default",
interactive=False,
verbosity=0,
)
self.assertTrue(Permission.objects.filter(codename="change_oldmodel").exists())
self.assertFalse(Permission.objects.filter(codename="change_newmodel").exists())
call_command(
"migrate",
"auth_tests",
"zero",
database="default",
interactive=False,
verbosity=0,
)
def test_rename_backward_does_nothing_if_no_permissions(self):
Permission.objects.filter(content_type__app_label="auth_tests").delete()
call_command(
"migrate",
"auth_tests",
"zero",
database="default",
interactive=False,
verbosity=0,
)
self.assertFalse(
Permission.objects.filter(
codename__in=["change_oldmodel", "change_newmodel"]
).exists()
)
def test_rename_permission_conflict(self):
ct = ContentType.objects.create(app_label="auth_tests", model="oldmodel")
Permission.objects.create(
codename="change_newmodel",
name="Can change new model",
content_type=ct,
)
Permission.objects.create(
codename="change_oldmodel",
name="Can change old model",
content_type=ct,
)
call_command(
"migrate",
"auth_tests",
database="default",
interactive=False,
verbosity=0,
)
self.assertTrue(
Permission.objects.filter(
codename="change_oldmodel",
name="Can change old model",
).exists()
)
self.assertEqual(
Permission.objects.filter(
codename="change_newmodel",
name="Can change new model",
).count(),
1,
)
call_command(
"migrate",
"auth_tests",
"zero",
database="default",
interactive=False,
verbosity=0,
)
class DefaultDBRouter: class DefaultDBRouter:
"""Route all writes to default.""" """Route all writes to default."""