diff --git a/django/contrib/auth/apps.py b/django/contrib/auth/apps.py index ad6f816809..eeba5b7e13 100644 --- a/django/contrib/auth/apps.py +++ b/django/contrib/auth/apps.py @@ -1,12 +1,12 @@ from django.apps import AppConfig from django.core import checks from django.db.models.query_utils import DeferredAttribute -from django.db.models.signals import post_migrate +from django.db.models.signals import post_migrate, pre_migrate from django.utils.translation import gettext_lazy as _ from . import get_user_model from .checks import check_middleware, check_models_permissions, check_user_model -from .management import create_permissions +from .management import create_permissions, rename_permissions from .signals import user_logged_in @@ -20,6 +20,11 @@ class AuthConfig(AppConfig): create_permissions, dispatch_uid="django.contrib.auth.management.create_permissions", ) + pre_migrate.connect( + rename_permissions, + dispatch_uid="django.contrib.auth.management.rename_permissions", + ) + last_login_field = getattr(get_user_model(), "last_login", None) # Register the handler only if UserModel.last_login is a field. if isinstance(last_login_field, DeferredAttribute): diff --git a/django/contrib/auth/management/__init__.py b/django/contrib/auth/management/__init__.py index e816357b2b..40ada52e29 100644 --- a/django/contrib/auth/management/__init__.py +++ b/django/contrib/auth/management/__init__.py @@ -9,7 +9,9 @@ from django.apps import apps as global_apps from django.contrib.auth import get_permission_codename from django.contrib.contenttypes.management import create_contenttypes from django.core import exceptions -from django.db import DEFAULT_DB_ALIAS, router +from django.db import DEFAULT_DB_ALIAS, migrations, router, transaction +from django.db.utils import IntegrityError +from django.utils.text import camel_case_to_spaces def _get_all_permissions(opts): @@ -108,6 +110,84 @@ def create_permissions( print("Adding permission '%s'" % perm) +class RenamePermission(migrations.RunPython): + def __init__(self, app_label, old_model, new_model): + self.app_label = app_label + self.old_model = old_model + self.new_model = new_model + super(RenamePermission, self).__init__( + self.rename_forward, self.rename_backward + ) + + def _rename(self, apps, schema_editor, old_model, new_model): + ContentType = apps.get_model("contenttypes", "ContentType") + # Use the live Permission model instead of the frozen one, since frozen + # models do not retain foreign key constraints. + from django.contrib.auth.models import Permission + + db = schema_editor.connection.alias + ctypes = ContentType.objects.filter( + app_label=self.app_label, model__icontains=old_model.lower() + ) + for permission in Permission.objects.filter( + content_type_id__in=ctypes.values("id") + ): + prefix = permission.codename.split("_")[0] + default_verbose_name = camel_case_to_spaces(new_model) + + new_codename = f"{prefix}_{new_model.lower()}" + new_name = f"Can {prefix} {default_verbose_name}" + + if permission.codename != new_codename or permission.name != new_name: + permission.codename = new_codename + permission.name = new_name + try: + with transaction.atomic(using=db): + permission.save(update_fields={"name", "codename"}) + except IntegrityError: + pass + + def rename_forward(self, apps, schema_editor): + self._rename(apps, schema_editor, self.old_model, self.new_model) + + def rename_backward(self, apps, schema_editor): + self._rename(apps, schema_editor, self.new_model, self.old_model) + + +def rename_permissions( + plan, + verbosity=2, + interactive=True, + using=DEFAULT_DB_ALIAS, + apps=global_apps, + **kwargs, +): + """ + Insert a `RenamePermissionType` operation after every planned `RenameModel` + operation. + """ + try: + Permission = apps.get_model("auth", "Permission") + except LookupError: + return + else: + if not router.allow_migrate_model(using, Permission): + return + + for migration, backward in plan: + inserts = [] + for index, operation in enumerate(migration.operations): + if isinstance(operation, migrations.RenameModel): + operation = RenamePermission( + migration.app_label, + operation.old_name, + operation.new_name, + ) + inserts.append((index + 1, operation)) + for inserted, (index, operation) in enumerate(inserts): + migration.operations.insert(inserted + index, operation) + + def get_system_username(): """ Return the current system user's username, or an empty string if the diff --git a/tests/auth_tests/operations_migrations/0001_initial.py b/tests/auth_tests/operations_migrations/0001_initial.py new file mode 100644 index 0000000000..49a475653c --- /dev/null +++ b/tests/auth_tests/operations_migrations/0001_initial.py @@ -0,0 +1,14 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + operations = [ + migrations.CreateModel( + name="OldModel", + fields=[ + ("id", models.AutoField(primary_key=True)), + ], + ), + ] diff --git a/tests/auth_tests/operations_migrations/0002_rename_oldmodel_to_newmodel.py b/tests/auth_tests/operations_migrations/0002_rename_oldmodel_to_newmodel.py new file mode 100644 index 0000000000..a23761e8fd --- /dev/null +++ b/tests/auth_tests/operations_migrations/0002_rename_oldmodel_to_newmodel.py @@ -0,0 +1,14 @@ +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("auth_tests", "0001_initial"), + ] + + operations = [ + migrations.RenameModel( + old_name="OldModel", + new_name="NewModel", + ), + ] diff --git a/tests/auth_tests/operations_migrations/__init__.py b/tests/auth_tests/operations_migrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/auth_tests/test_management.py b/tests/auth_tests/test_management.py index 0701ac2d68..7e8d01cbce 100644 --- a/tests/auth_tests/test_management.py +++ b/tests/auth_tests/test_management.py @@ -7,15 +7,21 @@ from io import StringIO from unittest import mock from django.apps import apps +from django.conf import settings from django.contrib.auth import get_permission_codename, management -from django.contrib.auth.management import create_permissions, get_default_username +from django.contrib.auth.management import ( + RenamePermission, + create_permissions, + get_default_username, +) from django.contrib.auth.management.commands import changepassword, createsuperuser from django.contrib.auth.models import Group, Permission, User from django.contrib.contenttypes.models import ContentType from django.core.management import call_command from django.core.management.base import CommandError -from django.db import migrations +from django.db import migrations, models from django.test import TestCase, override_settings +from django.test.testcases import TransactionTestCase from django.utils.translation import gettext_lazy as _ from .models import ( @@ -1528,6 +1534,174 @@ class CreatePermissionsTests(TestCase): ) +@override_settings( + MIGRATION_MODULES=dict( + settings.MIGRATION_MODULES, + auth_tests="auth_tests.operations_migrations", + ), +) +class PermissionRenameOperationsTests(TransactionTestCase): + available_apps = [ + "django.contrib.contenttypes", + "django.contrib.auth", + "auth_tests", + ] + + def setUp(self): + app_config = apps.get_app_config("auth_tests") + models.signals.post_migrate.connect( + self.assertOperationsInjected, sender=app_config + ) + self.addCleanup( + models.signals.post_migrate.disconnect, + self.assertOperationsInjected, + sender=app_config, + ) + + def assertOperationsInjected(self, plan, **kwargs): + for migration, _backward in plan: + operations = iter(migration.operations) + for operation in operations: + if isinstance(operation, migrations.RenameModel): + next_operation = next(operations) + self.assertIsInstance(next_operation, RenamePermission) + self.assertEqual(next_operation.app_label, migration.app_label) + self.assertEqual(next_operation.old_model, operation.old_name) + self.assertEqual(next_operation.new_model, operation.new_name) + + def test_permission_rename(self): + ct = ContentType.objects.create(app_label="auth_tests", model="oldmodel") + actions = ["add", "change", "delete", "view"] + for action in actions: + Permission.objects.create( + codename=f"{action}_oldmodel", + name=f"Can {action} old model", + content_type=ct, + ) + + call_command("migrate", "auth_tests", verbosity=0) + for action in actions: + self.assertFalse( + Permission.objects.filter(codename=f"{action}_oldmodel").exists() + ) + self.assertTrue( + Permission.objects.filter(codename=f"{action}_newmodel").exists() + ) + + call_command( + "migrate", + "auth_tests", + "zero", + database="default", + interactive=False, + verbosity=0, + ) + + for action in actions: + self.assertTrue( + Permission.objects.filter(codename=f"{action}_oldmodel").exists() + ) + self.assertFalse( + Permission.objects.filter(codename=f"{action}_newmodel").exists() + ) + + @mock.patch( + "django.db.router.allow_migrate_model", + return_value=False, + ) + def test_rename_skipped_if_router_disallows(self, _): + ct = ContentType.objects.create(app_label="auth_tests", model="oldmodel") + Permission.objects.create( + codename="change_oldmodel", + name="Can change old model", + content_type=ct, + ) + # The rename operation should not be there when disallowed by router. + app_config = apps.get_app_config("auth_tests") + models.signals.post_migrate.disconnect( + self.assertOperationsInjected, sender=app_config + ) + + call_command( + "migrate", + "auth_tests", + database="default", + interactive=False, + verbosity=0, + ) + self.assertTrue(Permission.objects.filter(codename="change_oldmodel").exists()) + self.assertFalse(Permission.objects.filter(codename="change_newmodel").exists()) + + call_command( + "migrate", + "auth_tests", + "zero", + database="default", + interactive=False, + verbosity=0, + ) + + def test_rename_backward_does_nothing_if_no_permissions(self): + Permission.objects.filter(content_type__app_label="auth_tests").delete() + + call_command( + "migrate", + "auth_tests", + "zero", + database="default", + interactive=False, + verbosity=0, + ) + self.assertFalse( + Permission.objects.filter( + codename__in=["change_oldmodel", "change_newmodel"] + ).exists() + ) + + def test_rename_permission_conflict(self): + ct = ContentType.objects.create(app_label="auth_tests", model="oldmodel") + Permission.objects.create( + codename="change_newmodel", + name="Can change new model", + content_type=ct, + ) + Permission.objects.create( + codename="change_oldmodel", + name="Can change old model", + content_type=ct, + ) + + call_command( + "migrate", + "auth_tests", + database="default", + interactive=False, + verbosity=0, + ) + self.assertTrue( + Permission.objects.filter( + codename="change_oldmodel", + name="Can change old model", + ).exists() + ) + self.assertEqual( + Permission.objects.filter( + codename="change_newmodel", + name="Can change new model", + ).count(), + 1, + ) + + call_command( + "migrate", + "auth_tests", + "zero", + database="default", + interactive=False, + verbosity=0, + ) + + class DefaultDBRouter: """Route all writes to default."""