mirror of
				https://github.com/django/django.git
				synced 2025-10-30 17:16:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			999 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			999 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from unittest import mock
 | |
| 
 | |
| from django.apps.registry import apps as global_apps
 | |
| from django.db import DatabaseError, connection, migrations, models
 | |
| from django.db.migrations.exceptions import InvalidMigrationPlan
 | |
| from django.db.migrations.executor import MigrationExecutor
 | |
| from django.db.migrations.graph import MigrationGraph
 | |
| from django.db.migrations.recorder import MigrationRecorder
 | |
| from django.db.migrations.state import ProjectState
 | |
| from django.test import (
 | |
|     SimpleTestCase,
 | |
|     modify_settings,
 | |
|     override_settings,
 | |
|     skipUnlessDBFeature,
 | |
| )
 | |
| from django.test.utils import isolate_lru_cache
 | |
| 
 | |
| from .test_base import MigrationTestBase
 | |
| 
 | |
| 
 | |
| @modify_settings(INSTALLED_APPS={"append": "migrations2"})
 | |
| class ExecutorTests(MigrationTestBase):
 | |
|     """
 | |
|     Tests the migration executor (full end-to-end running).
 | |
| 
 | |
|     Bear in mind that if these are failing you should fix the other
 | |
|     test failures first, as they may be propagating into here.
 | |
|     """
 | |
| 
 | |
|     available_apps = [
 | |
|         "migrations",
 | |
|         "migrations2",
 | |
|         "django.contrib.auth",
 | |
|         "django.contrib.contenttypes",
 | |
|     ]
 | |
| 
 | |
|     @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
 | |
|     def test_run(self):
 | |
|         """
 | |
|         Tests running a simple set of migrations.
 | |
|         """
 | |
|         executor = MigrationExecutor(connection)
 | |
|         # Let's look at the plan first and make sure it's up to scratch
 | |
|         plan = executor.migration_plan([("migrations", "0002_second")])
 | |
|         self.assertEqual(
 | |
|             plan,
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_initial"], False),
 | |
|                 (executor.loader.graph.nodes["migrations", "0002_second"], False),
 | |
|             ],
 | |
|         )
 | |
|         # Were the tables there before?
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_book")
 | |
|         # Alright, let's try running it
 | |
|         executor.migrate([("migrations", "0002_second")])
 | |
|         # Are the tables there now?
 | |
|         self.assertTableExists("migrations_author")
 | |
|         self.assertTableExists("migrations_book")
 | |
|         # Rebuild the graph to reflect the new DB state
 | |
|         executor.loader.build_graph()
 | |
|         # Alright, let's undo what we did
 | |
|         plan = executor.migration_plan([("migrations", None)])
 | |
|         self.assertEqual(
 | |
|             plan,
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0002_second"], True),
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_initial"], True),
 | |
|             ],
 | |
|         )
 | |
|         executor.migrate([("migrations", None)])
 | |
|         # Are the tables gone?
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_book")
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
 | |
|     )
 | |
|     def test_run_with_squashed(self):
 | |
|         """
 | |
|         Tests running a squashed migration from zero (should ignore what it replaces)
 | |
|         """
 | |
|         executor = MigrationExecutor(connection)
 | |
|         # Check our leaf node is the squashed one
 | |
|         leaves = [
 | |
|             key for key in executor.loader.graph.leaf_nodes() if key[0] == "migrations"
 | |
|         ]
 | |
|         self.assertEqual(leaves, [("migrations", "0001_squashed_0002")])
 | |
|         # Check the plan
 | |
|         plan = executor.migration_plan([("migrations", "0001_squashed_0002")])
 | |
|         self.assertEqual(
 | |
|             plan,
 | |
|             [
 | |
|                 (
 | |
|                     executor.loader.graph.nodes["migrations", "0001_squashed_0002"],
 | |
|                     False,
 | |
|                 ),
 | |
|             ],
 | |
|         )
 | |
|         # Were the tables there before?
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_book")
 | |
|         # Alright, let's try running it
 | |
|         executor.migrate([("migrations", "0001_squashed_0002")])
 | |
|         # Are the tables there now?
 | |
|         self.assertTableExists("migrations_author")
 | |
|         self.assertTableExists("migrations_book")
 | |
|         # Rebuild the graph to reflect the new DB state
 | |
|         executor.loader.build_graph()
 | |
|         # Alright, let's undo what we did. Should also just use squashed.
 | |
|         plan = executor.migration_plan([("migrations", None)])
 | |
|         self.assertEqual(
 | |
|             plan,
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_squashed_0002"], True),
 | |
|             ],
 | |
|         )
 | |
|         executor.migrate([("migrations", None)])
 | |
|         # Are the tables gone?
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_book")
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"},
 | |
|     )
 | |
|     def test_migrate_backward_to_squashed_migration(self):
 | |
|         executor = MigrationExecutor(connection)
 | |
|         try:
 | |
|             self.assertTableNotExists("migrations_author")
 | |
|             self.assertTableNotExists("migrations_book")
 | |
|             executor.migrate([("migrations", "0001_squashed_0002")])
 | |
|             self.assertTableExists("migrations_author")
 | |
|             self.assertTableExists("migrations_book")
 | |
|             executor.loader.build_graph()
 | |
|             # Migrate backward to a squashed migration.
 | |
|             executor.migrate([("migrations", "0001_initial")])
 | |
|             self.assertTableExists("migrations_author")
 | |
|             self.assertTableNotExists("migrations_book")
 | |
|         finally:
 | |
|             # Unmigrate everything.
 | |
|             executor = MigrationExecutor(connection)
 | |
|             executor.migrate([("migrations", None)])
 | |
|             self.assertTableNotExists("migrations_author")
 | |
|             self.assertTableNotExists("migrations_book")
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={"migrations": "migrations.test_migrations_non_atomic"}
 | |
|     )
 | |
|     def test_non_atomic_migration(self):
 | |
|         """
 | |
|         Applying a non-atomic migration works as expected.
 | |
|         """
 | |
|         executor = MigrationExecutor(connection)
 | |
|         with self.assertRaisesMessage(RuntimeError, "Abort migration"):
 | |
|             executor.migrate([("migrations", "0001_initial")])
 | |
|         self.assertTableExists("migrations_publisher")
 | |
|         migrations_apps = executor.loader.project_state(
 | |
|             ("migrations", "0001_initial")
 | |
|         ).apps
 | |
|         Publisher = migrations_apps.get_model("migrations", "Publisher")
 | |
|         self.assertTrue(Publisher.objects.exists())
 | |
|         self.assertTableNotExists("migrations_book")
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={"migrations": "migrations.test_migrations_atomic_operation"}
 | |
|     )
 | |
|     def test_atomic_operation_in_non_atomic_migration(self):
 | |
|         """
 | |
|         An atomic operation is properly rolled back inside a non-atomic
 | |
|         migration.
 | |
|         """
 | |
|         executor = MigrationExecutor(connection)
 | |
|         with self.assertRaisesMessage(RuntimeError, "Abort migration"):
 | |
|             executor.migrate([("migrations", "0001_initial")])
 | |
|         migrations_apps = executor.loader.project_state(
 | |
|             ("migrations", "0001_initial")
 | |
|         ).apps
 | |
|         Editor = migrations_apps.get_model("migrations", "Editor")
 | |
|         self.assertFalse(Editor.objects.exists())
 | |
|         # Record previous migration as successful.
 | |
|         executor.migrate([("migrations", "0001_initial")], fake=True)
 | |
|         # Rebuild the graph to reflect the new DB state.
 | |
|         executor.loader.build_graph()
 | |
|         # Migrating backwards is also atomic.
 | |
|         with self.assertRaisesMessage(RuntimeError, "Abort migration"):
 | |
|             executor.migrate([("migrations", None)])
 | |
|         self.assertFalse(Editor.objects.exists())
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={
 | |
|             "migrations": "migrations.test_migrations",
 | |
|             "migrations2": "migrations2.test_migrations_2",
 | |
|         }
 | |
|     )
 | |
|     def test_empty_plan(self):
 | |
|         """
 | |
|         Re-planning a full migration of a fully-migrated set doesn't
 | |
|         perform spurious unmigrations and remigrations.
 | |
| 
 | |
|         There was previously a bug where the executor just always performed the
 | |
|         backwards plan for applied migrations - which even for the most recent
 | |
|         migration in an app, might include other, dependent apps, and these
 | |
|         were being unmigrated.
 | |
|         """
 | |
|         # Make the initial plan, check it
 | |
|         executor = MigrationExecutor(connection)
 | |
|         plan = executor.migration_plan(
 | |
|             [
 | |
|                 ("migrations", "0002_second"),
 | |
|                 ("migrations2", "0001_initial"),
 | |
|             ]
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             plan,
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_initial"], False),
 | |
|                 (executor.loader.graph.nodes["migrations", "0002_second"], False),
 | |
|                 (executor.loader.graph.nodes["migrations2", "0001_initial"], False),
 | |
|             ],
 | |
|         )
 | |
|         # Fake-apply all migrations
 | |
|         executor.migrate(
 | |
|             [("migrations", "0002_second"), ("migrations2", "0001_initial")], fake=True
 | |
|         )
 | |
|         # Rebuild the graph to reflect the new DB state
 | |
|         executor.loader.build_graph()
 | |
|         # Now plan a second time and make sure it's empty
 | |
|         plan = executor.migration_plan(
 | |
|             [
 | |
|                 ("migrations", "0002_second"),
 | |
|                 ("migrations2", "0001_initial"),
 | |
|             ]
 | |
|         )
 | |
|         self.assertEqual(plan, [])
 | |
|         # The resulting state should include applied migrations.
 | |
|         state = executor.migrate(
 | |
|             [
 | |
|                 ("migrations", "0002_second"),
 | |
|                 ("migrations2", "0001_initial"),
 | |
|             ]
 | |
|         )
 | |
|         self.assertIn(("migrations", "book"), state.models)
 | |
|         self.assertIn(("migrations", "author"), state.models)
 | |
|         self.assertIn(("migrations2", "otherauthor"), state.models)
 | |
|         # Erase all the fake records
 | |
|         executor.recorder.record_unapplied("migrations2", "0001_initial")
 | |
|         executor.recorder.record_unapplied("migrations", "0002_second")
 | |
|         executor.recorder.record_unapplied("migrations", "0001_initial")
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={
 | |
|             "migrations": "migrations.test_migrations",
 | |
|             "migrations2": "migrations2.test_migrations_2_no_deps",
 | |
|         }
 | |
|     )
 | |
|     def test_mixed_plan_not_supported(self):
 | |
|         """
 | |
|         Although the MigrationExecutor interfaces allows for mixed migration
 | |
|         plans (combined forwards and backwards migrations) this is not
 | |
|         supported.
 | |
|         """
 | |
|         # Prepare for mixed plan
 | |
|         executor = MigrationExecutor(connection)
 | |
|         plan = executor.migration_plan([("migrations", "0002_second")])
 | |
|         self.assertEqual(
 | |
|             plan,
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_initial"], False),
 | |
|                 (executor.loader.graph.nodes["migrations", "0002_second"], False),
 | |
|             ],
 | |
|         )
 | |
|         executor.migrate(None, plan)
 | |
|         # Rebuild the graph to reflect the new DB state
 | |
|         executor.loader.build_graph()
 | |
|         self.assertIn(
 | |
|             ("migrations", "0001_initial"), executor.loader.applied_migrations
 | |
|         )
 | |
|         self.assertIn(("migrations", "0002_second"), executor.loader.applied_migrations)
 | |
|         self.assertNotIn(
 | |
|             ("migrations2", "0001_initial"), executor.loader.applied_migrations
 | |
|         )
 | |
| 
 | |
|         # Generate mixed plan
 | |
|         plan = executor.migration_plan(
 | |
|             [
 | |
|                 ("migrations", None),
 | |
|                 ("migrations2", "0001_initial"),
 | |
|             ]
 | |
|         )
 | |
|         msg = (
 | |
|             "Migration plans with both forwards and backwards migrations are "
 | |
|             "not supported. Please split your migration process into separate "
 | |
|             "plans of only forwards OR backwards migrations."
 | |
|         )
 | |
|         with self.assertRaisesMessage(InvalidMigrationPlan, msg) as cm:
 | |
|             executor.migrate(None, plan)
 | |
|         self.assertEqual(
 | |
|             cm.exception.args[1],
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0002_second"], True),
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_initial"], True),
 | |
|                 (executor.loader.graph.nodes["migrations2", "0001_initial"], False),
 | |
|             ],
 | |
|         )
 | |
|         # Rebuild the graph to reflect the new DB state
 | |
|         executor.loader.build_graph()
 | |
|         executor.migrate(
 | |
|             [
 | |
|                 ("migrations", None),
 | |
|                 ("migrations2", None),
 | |
|             ]
 | |
|         )
 | |
|         # Are the tables gone?
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_book")
 | |
|         self.assertTableNotExists("migrations2_otherauthor")
 | |
| 
 | |
|     @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
 | |
|     def test_soft_apply(self):
 | |
|         """
 | |
|         Tests detection of initial migrations already having been applied.
 | |
|         """
 | |
|         state = {"faked": None}
 | |
| 
 | |
|         def fake_storer(phase, migration=None, fake=None):
 | |
|             state["faked"] = fake
 | |
| 
 | |
|         executor = MigrationExecutor(connection, progress_callback=fake_storer)
 | |
|         # Were the tables there before?
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_tribble")
 | |
|         # Run it normally
 | |
|         self.assertEqual(
 | |
|             executor.migration_plan([("migrations", "0001_initial")]),
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_initial"], False),
 | |
|             ],
 | |
|         )
 | |
|         executor.migrate([("migrations", "0001_initial")])
 | |
|         # Are the tables there now?
 | |
|         self.assertTableExists("migrations_author")
 | |
|         self.assertTableExists("migrations_tribble")
 | |
|         # We shouldn't have faked that one
 | |
|         self.assertIs(state["faked"], False)
 | |
|         # Rebuild the graph to reflect the new DB state
 | |
|         executor.loader.build_graph()
 | |
|         # Fake-reverse that
 | |
|         executor.migrate([("migrations", None)], fake=True)
 | |
|         # Are the tables still there?
 | |
|         self.assertTableExists("migrations_author")
 | |
|         self.assertTableExists("migrations_tribble")
 | |
|         # Make sure that was faked
 | |
|         self.assertIs(state["faked"], True)
 | |
|         # Finally, migrate forwards; this should fake-apply our initial migration
 | |
|         executor.loader.build_graph()
 | |
|         self.assertEqual(
 | |
|             executor.migration_plan([("migrations", "0001_initial")]),
 | |
|             [
 | |
|                 (executor.loader.graph.nodes["migrations", "0001_initial"], False),
 | |
|             ],
 | |
|         )
 | |
|         # Applying the migration should raise a database level error
 | |
|         # because we haven't given the --fake-initial option
 | |
|         with self.assertRaises(DatabaseError):
 | |
|             executor.migrate([("migrations", "0001_initial")])
 | |
|         # Reset the faked state
 | |
|         state = {"faked": None}
 | |
|         # Allow faking of initial CreateModel operations
 | |
|         executor.migrate([("migrations", "0001_initial")], fake_initial=True)
 | |
|         self.assertIs(state["faked"], True)
 | |
|         # And migrate back to clean up the database
 | |
|         executor.loader.build_graph()
 | |
|         executor.migrate([("migrations", None)])
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_tribble")
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={
 | |
|             "migrations": "migrations.test_migrations_custom_user",
 | |
|             "django.contrib.auth": "django.contrib.auth.migrations",
 | |
|         },
 | |
|         AUTH_USER_MODEL="migrations.Author",
 | |
|     )
 | |
|     def test_custom_user(self):
 | |
|         """
 | |
|         Regression test for #22325 - references to a custom user model defined in the
 | |
|         same app are not resolved correctly.
 | |
|         """
 | |
|         with isolate_lru_cache(global_apps.get_swappable_settings_name):
 | |
|             executor = MigrationExecutor(connection)
 | |
|             self.assertTableNotExists("migrations_author")
 | |
|             self.assertTableNotExists("migrations_tribble")
 | |
|             # Migrate forwards
 | |
|             executor.migrate([("migrations", "0001_initial")])
 | |
|             self.assertTableExists("migrations_author")
 | |
|             self.assertTableExists("migrations_tribble")
 | |
|             # The soft-application detection works.
 | |
|             # Change table_names to not return auth_user during this as it
 | |
|             # wouldn't be there in a normal run, and ensure migrations.Author
 | |
|             # exists in the global app registry temporarily.
 | |
|             old_table_names = connection.introspection.table_names
 | |
|             connection.introspection.table_names = lambda c: [
 | |
|                 x for x in old_table_names(c) if x != "auth_user"
 | |
|             ]
 | |
|             migrations_apps = executor.loader.project_state(
 | |
|                 ("migrations", "0001_initial"),
 | |
|             ).apps
 | |
|             global_apps.get_app_config("migrations").models[
 | |
|                 "author"
 | |
|             ] = migrations_apps.get_model("migrations", "author")
 | |
|             try:
 | |
|                 migration = executor.loader.get_migration("auth", "0001_initial")
 | |
|                 self.assertIs(executor.detect_soft_applied(None, migration)[0], True)
 | |
|             finally:
 | |
|                 connection.introspection.table_names = old_table_names
 | |
|                 del global_apps.get_app_config("migrations").models["author"]
 | |
|                 # Migrate back to clean up the database.
 | |
|                 executor.loader.build_graph()
 | |
|                 executor.migrate([("migrations", None)])
 | |
|                 self.assertTableNotExists("migrations_author")
 | |
|                 self.assertTableNotExists("migrations_tribble")
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={
 | |
|             "migrations": "migrations.test_add_many_to_many_field_initial",
 | |
|         },
 | |
|     )
 | |
|     def test_detect_soft_applied_add_field_manytomanyfield(self):
 | |
|         """
 | |
|         executor.detect_soft_applied() detects ManyToManyField tables from an
 | |
|         AddField operation. This checks the case of AddField in a migration
 | |
|         with other operations (0001) and the case of AddField in its own
 | |
|         migration (0002).
 | |
|         """
 | |
|         tables = [
 | |
|             # from 0001
 | |
|             "migrations_project",
 | |
|             "migrations_task",
 | |
|             "migrations_project_tasks",
 | |
|             # from 0002
 | |
|             "migrations_task_projects",
 | |
|         ]
 | |
|         executor = MigrationExecutor(connection)
 | |
|         # Create the tables for 0001 but make it look like the migration hasn't
 | |
|         # been applied.
 | |
|         executor.migrate([("migrations", "0001_initial")])
 | |
|         executor.migrate([("migrations", None)], fake=True)
 | |
|         for table in tables[:3]:
 | |
|             self.assertTableExists(table)
 | |
|         # Table detection sees 0001 is applied but not 0002.
 | |
|         migration = executor.loader.get_migration("migrations", "0001_initial")
 | |
|         self.assertIs(executor.detect_soft_applied(None, migration)[0], True)
 | |
|         migration = executor.loader.get_migration("migrations", "0002_initial")
 | |
|         self.assertIs(executor.detect_soft_applied(None, migration)[0], False)
 | |
| 
 | |
|         # Create the tables for both migrations but make it look like neither
 | |
|         # has been applied.
 | |
|         executor.loader.build_graph()
 | |
|         executor.migrate([("migrations", "0001_initial")], fake=True)
 | |
|         executor.migrate([("migrations", "0002_initial")])
 | |
|         executor.loader.build_graph()
 | |
|         executor.migrate([("migrations", None)], fake=True)
 | |
|         # Table detection sees 0002 is applied.
 | |
|         migration = executor.loader.get_migration("migrations", "0002_initial")
 | |
|         self.assertIs(executor.detect_soft_applied(None, migration)[0], True)
 | |
| 
 | |
|         # Leave the tables for 0001 except the many-to-many table. That missing
 | |
|         # table should cause detect_soft_applied() to return False.
 | |
|         with connection.schema_editor() as editor:
 | |
|             for table in tables[2:]:
 | |
|                 editor.execute(editor.sql_delete_table % {"table": table})
 | |
|         migration = executor.loader.get_migration("migrations", "0001_initial")
 | |
|         self.assertIs(executor.detect_soft_applied(None, migration)[0], False)
 | |
| 
 | |
|         # Cleanup by removing the remaining tables.
 | |
|         with connection.schema_editor() as editor:
 | |
|             for table in tables[:2]:
 | |
|                 editor.execute(editor.sql_delete_table % {"table": table})
 | |
|         for table in tables:
 | |
|             self.assertTableNotExists(table)
 | |
| 
 | |
|     @override_settings(
 | |
|         INSTALLED_APPS=[
 | |
|             "migrations.migrations_test_apps.lookuperror_a",
 | |
|             "migrations.migrations_test_apps.lookuperror_b",
 | |
|             "migrations.migrations_test_apps.lookuperror_c",
 | |
|         ]
 | |
|     )
 | |
|     def test_unrelated_model_lookups_forwards(self):
 | |
|         """
 | |
|         #24123 - All models of apps already applied which are
 | |
|         unrelated to the first app being applied are part of the initial model
 | |
|         state.
 | |
|         """
 | |
|         try:
 | |
|             executor = MigrationExecutor(connection)
 | |
|             self.assertTableNotExists("lookuperror_a_a1")
 | |
|             self.assertTableNotExists("lookuperror_b_b1")
 | |
|             self.assertTableNotExists("lookuperror_c_c1")
 | |
|             executor.migrate([("lookuperror_b", "0003_b3")])
 | |
|             self.assertTableExists("lookuperror_b_b3")
 | |
|             # Rebuild the graph to reflect the new DB state
 | |
|             executor.loader.build_graph()
 | |
| 
 | |
|             # Migrate forwards -- This led to a lookup LookupErrors because
 | |
|             # lookuperror_b.B2 is already applied
 | |
|             executor.migrate(
 | |
|                 [
 | |
|                     ("lookuperror_a", "0004_a4"),
 | |
|                     ("lookuperror_c", "0003_c3"),
 | |
|                 ]
 | |
|             )
 | |
|             self.assertTableExists("lookuperror_a_a4")
 | |
|             self.assertTableExists("lookuperror_c_c3")
 | |
| 
 | |
|             # Rebuild the graph to reflect the new DB state
 | |
|             executor.loader.build_graph()
 | |
|         finally:
 | |
|             # Cleanup
 | |
|             executor.migrate(
 | |
|                 [
 | |
|                     ("lookuperror_a", None),
 | |
|                     ("lookuperror_b", None),
 | |
|                     ("lookuperror_c", None),
 | |
|                 ]
 | |
|             )
 | |
|             self.assertTableNotExists("lookuperror_a_a1")
 | |
|             self.assertTableNotExists("lookuperror_b_b1")
 | |
|             self.assertTableNotExists("lookuperror_c_c1")
 | |
| 
 | |
|     @override_settings(
 | |
|         INSTALLED_APPS=[
 | |
|             "migrations.migrations_test_apps.lookuperror_a",
 | |
|             "migrations.migrations_test_apps.lookuperror_b",
 | |
|             "migrations.migrations_test_apps.lookuperror_c",
 | |
|         ]
 | |
|     )
 | |
|     def test_unrelated_model_lookups_backwards(self):
 | |
|         """
 | |
|         #24123 - All models of apps being unapplied which are
 | |
|         unrelated to the first app being unapplied are part of the initial
 | |
|         model state.
 | |
|         """
 | |
|         try:
 | |
|             executor = MigrationExecutor(connection)
 | |
|             self.assertTableNotExists("lookuperror_a_a1")
 | |
|             self.assertTableNotExists("lookuperror_b_b1")
 | |
|             self.assertTableNotExists("lookuperror_c_c1")
 | |
|             executor.migrate(
 | |
|                 [
 | |
|                     ("lookuperror_a", "0004_a4"),
 | |
|                     ("lookuperror_b", "0003_b3"),
 | |
|                     ("lookuperror_c", "0003_c3"),
 | |
|                 ]
 | |
|             )
 | |
|             self.assertTableExists("lookuperror_b_b3")
 | |
|             self.assertTableExists("lookuperror_a_a4")
 | |
|             self.assertTableExists("lookuperror_c_c3")
 | |
|             # Rebuild the graph to reflect the new DB state
 | |
|             executor.loader.build_graph()
 | |
| 
 | |
|             # Migrate backwards -- This led to a lookup LookupErrors because
 | |
|             # lookuperror_b.B2 is not in the initial state (unrelated to app c)
 | |
|             executor.migrate([("lookuperror_a", None)])
 | |
| 
 | |
|             # Rebuild the graph to reflect the new DB state
 | |
|             executor.loader.build_graph()
 | |
|         finally:
 | |
|             # Cleanup
 | |
|             executor.migrate([("lookuperror_b", None), ("lookuperror_c", None)])
 | |
|             self.assertTableNotExists("lookuperror_a_a1")
 | |
|             self.assertTableNotExists("lookuperror_b_b1")
 | |
|             self.assertTableNotExists("lookuperror_c_c1")
 | |
| 
 | |
|     @override_settings(
 | |
|         INSTALLED_APPS=[
 | |
|             "migrations.migrations_test_apps.mutate_state_a",
 | |
|             "migrations.migrations_test_apps.mutate_state_b",
 | |
|         ]
 | |
|     )
 | |
|     def test_unrelated_applied_migrations_mutate_state(self):
 | |
|         """
 | |
|         #26647 - Unrelated applied migrations should be part of the final
 | |
|         state in both directions.
 | |
|         """
 | |
|         executor = MigrationExecutor(connection)
 | |
|         executor.migrate(
 | |
|             [
 | |
|                 ("mutate_state_b", "0002_add_field"),
 | |
|             ]
 | |
|         )
 | |
|         # Migrate forward.
 | |
|         executor.loader.build_graph()
 | |
|         state = executor.migrate(
 | |
|             [
 | |
|                 ("mutate_state_a", "0001_initial"),
 | |
|             ]
 | |
|         )
 | |
|         self.assertIn("added", state.models["mutate_state_b", "b"].fields)
 | |
|         executor.loader.build_graph()
 | |
|         # Migrate backward.
 | |
|         state = executor.migrate(
 | |
|             [
 | |
|                 ("mutate_state_a", None),
 | |
|             ]
 | |
|         )
 | |
|         self.assertIn("added", state.models["mutate_state_b", "b"].fields)
 | |
|         executor.migrate(
 | |
|             [
 | |
|                 ("mutate_state_b", None),
 | |
|             ]
 | |
|         )
 | |
| 
 | |
|     @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
 | |
|     def test_process_callback(self):
 | |
|         """
 | |
|         #24129 - Tests callback process
 | |
|         """
 | |
|         call_args_list = []
 | |
| 
 | |
|         def callback(*args):
 | |
|             call_args_list.append(args)
 | |
| 
 | |
|         executor = MigrationExecutor(connection, progress_callback=callback)
 | |
|         # Were the tables there before?
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_tribble")
 | |
|         executor.migrate(
 | |
|             [
 | |
|                 ("migrations", "0001_initial"),
 | |
|                 ("migrations", "0002_second"),
 | |
|             ]
 | |
|         )
 | |
|         # Rebuild the graph to reflect the new DB state
 | |
|         executor.loader.build_graph()
 | |
| 
 | |
|         executor.migrate(
 | |
|             [
 | |
|                 ("migrations", None),
 | |
|                 ("migrations", None),
 | |
|             ]
 | |
|         )
 | |
|         self.assertTableNotExists("migrations_author")
 | |
|         self.assertTableNotExists("migrations_tribble")
 | |
| 
 | |
|         migrations = executor.loader.graph.nodes
 | |
|         expected = [
 | |
|             ("render_start",),
 | |
|             ("render_success",),
 | |
|             ("apply_start", migrations["migrations", "0001_initial"], False),
 | |
|             ("apply_success", migrations["migrations", "0001_initial"], False),
 | |
|             ("apply_start", migrations["migrations", "0002_second"], False),
 | |
|             ("apply_success", migrations["migrations", "0002_second"], False),
 | |
|             ("render_start",),
 | |
|             ("render_success",),
 | |
|             ("unapply_start", migrations["migrations", "0002_second"], False),
 | |
|             ("unapply_success", migrations["migrations", "0002_second"], False),
 | |
|             ("unapply_start", migrations["migrations", "0001_initial"], False),
 | |
|             ("unapply_success", migrations["migrations", "0001_initial"], False),
 | |
|         ]
 | |
|         self.assertEqual(call_args_list, expected)
 | |
| 
 | |
|     @override_settings(
 | |
|         INSTALLED_APPS=[
 | |
|             "migrations.migrations_test_apps.alter_fk.author_app",
 | |
|             "migrations.migrations_test_apps.alter_fk.book_app",
 | |
|         ]
 | |
|     )
 | |
|     def test_alter_id_type_with_fk(self):
 | |
|         try:
 | |
|             executor = MigrationExecutor(connection)
 | |
|             self.assertTableNotExists("author_app_author")
 | |
|             self.assertTableNotExists("book_app_book")
 | |
|             # Apply initial migrations
 | |
|             executor.migrate(
 | |
|                 [
 | |
|                     ("author_app", "0001_initial"),
 | |
|                     ("book_app", "0001_initial"),
 | |
|                 ]
 | |
|             )
 | |
|             self.assertTableExists("author_app_author")
 | |
|             self.assertTableExists("book_app_book")
 | |
|             # Rebuild the graph to reflect the new DB state
 | |
|             executor.loader.build_graph()
 | |
| 
 | |
|             # Apply PK type alteration
 | |
|             executor.migrate([("author_app", "0002_alter_id")])
 | |
| 
 | |
|             # Rebuild the graph to reflect the new DB state
 | |
|             executor.loader.build_graph()
 | |
|         finally:
 | |
|             # We can't simply unapply the migrations here because there is no
 | |
|             # implicit cast from VARCHAR to INT on the database level.
 | |
|             with connection.schema_editor() as editor:
 | |
|                 editor.execute(editor.sql_delete_table % {"table": "book_app_book"})
 | |
|                 editor.execute(editor.sql_delete_table % {"table": "author_app_author"})
 | |
|             self.assertTableNotExists("author_app_author")
 | |
|             self.assertTableNotExists("book_app_book")
 | |
|             executor.migrate([("author_app", None)], fake=True)
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
 | |
|     )
 | |
|     def test_apply_all_replaced_marks_replacement_as_applied(self):
 | |
|         """
 | |
|         Applying all replaced migrations marks replacement as applied (#24628).
 | |
|         """
 | |
|         recorder = MigrationRecorder(connection)
 | |
|         # Place the database in a state where the replaced migrations are
 | |
|         # partially applied: 0001 is applied, 0002 is not.
 | |
|         recorder.record_applied("migrations", "0001_initial")
 | |
|         executor = MigrationExecutor(connection)
 | |
|         # Use fake because we don't actually have the first migration
 | |
|         # applied, so the second will fail. And there's no need to actually
 | |
|         # create/modify tables here, we're just testing the
 | |
|         # MigrationRecord, which works the same with or without fake.
 | |
|         executor.migrate([("migrations", "0002_second")], fake=True)
 | |
| 
 | |
|         # Because we've now applied 0001 and 0002 both, their squashed
 | |
|         # replacement should be marked as applied.
 | |
|         self.assertIn(
 | |
|             ("migrations", "0001_squashed_0002"),
 | |
|             recorder.applied_migrations(),
 | |
|         )
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
 | |
|     )
 | |
|     def test_migrate_marks_replacement_applied_even_if_it_did_nothing(self):
 | |
|         """
 | |
|         A new squash migration will be marked as applied even if all its
 | |
|         replaced migrations were previously already applied (#24628).
 | |
|         """
 | |
|         recorder = MigrationRecorder(connection)
 | |
|         # Record all replaced migrations as applied
 | |
|         recorder.record_applied("migrations", "0001_initial")
 | |
|         recorder.record_applied("migrations", "0002_second")
 | |
|         executor = MigrationExecutor(connection)
 | |
|         executor.migrate([("migrations", "0001_squashed_0002")])
 | |
| 
 | |
|         # Because 0001 and 0002 are both applied, even though this migrate run
 | |
|         # didn't apply anything new, their squashed replacement should be
 | |
|         # marked as applied.
 | |
|         self.assertIn(
 | |
|             ("migrations", "0001_squashed_0002"),
 | |
|             recorder.applied_migrations(),
 | |
|         )
 | |
| 
 | |
|     @override_settings(
 | |
|         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
 | |
|     )
 | |
|     def test_migrate_marks_replacement_unapplied(self):
 | |
|         executor = MigrationExecutor(connection)
 | |
|         executor.migrate([("migrations", "0001_squashed_0002")])
 | |
|         try:
 | |
|             self.assertIn(
 | |
|                 ("migrations", "0001_squashed_0002"),
 | |
|                 executor.recorder.applied_migrations(),
 | |
|             )
 | |
|         finally:
 | |
|             executor.loader.build_graph()
 | |
|             executor.migrate([("migrations", None)])
 | |
|             self.assertNotIn(
 | |
|                 ("migrations", "0001_squashed_0002"),
 | |
|                 executor.recorder.applied_migrations(),
 | |
|             )
 | |
| 
 | |
|     # When the feature is False, the operation and the record won't be
 | |
|     # performed in a transaction and the test will systematically pass.
 | |
|     @skipUnlessDBFeature("can_rollback_ddl")
 | |
|     def test_migrations_applied_and_recorded_atomically(self):
 | |
|         """Migrations are applied and recorded atomically."""
 | |
| 
 | |
|         class Migration(migrations.Migration):
 | |
|             operations = [
 | |
|                 migrations.CreateModel(
 | |
|                     "model",
 | |
|                     [
 | |
|                         ("id", models.AutoField(primary_key=True)),
 | |
|                     ],
 | |
|                 ),
 | |
|             ]
 | |
| 
 | |
|         executor = MigrationExecutor(connection)
 | |
|         with mock.patch(
 | |
|             "django.db.migrations.executor.MigrationExecutor.record_migration"
 | |
|         ) as record_migration:
 | |
|             record_migration.side_effect = RuntimeError("Recording migration failed.")
 | |
|             with self.assertRaisesMessage(RuntimeError, "Recording migration failed."):
 | |
|                 executor.apply_migration(
 | |
|                     ProjectState(),
 | |
|                     Migration("0001_initial", "record_migration"),
 | |
|                 )
 | |
|                 executor.migrate([("migrations", "0001_initial")])
 | |
|         # The migration isn't recorded as applied since it failed.
 | |
|         migration_recorder = MigrationRecorder(connection)
 | |
|         self.assertIs(
 | |
|             migration_recorder.migration_qs.filter(
 | |
|                 app="record_migration",
 | |
|                 name="0001_initial",
 | |
|             ).exists(),
 | |
|             False,
 | |
|         )
 | |
|         self.assertTableNotExists("record_migration_model")
 | |
| 
 | |
|     def test_migrations_not_applied_on_deferred_sql_failure(self):
 | |
|         """Migrations are not recorded if deferred SQL application fails."""
 | |
| 
 | |
|         class DeferredSQL:
 | |
|             def __str__(self):
 | |
|                 raise DatabaseError("Failed to apply deferred SQL")
 | |
| 
 | |
|         class Migration(migrations.Migration):
 | |
|             atomic = False
 | |
| 
 | |
|             def apply(self, project_state, schema_editor, collect_sql=False):
 | |
|                 schema_editor.deferred_sql.append(DeferredSQL())
 | |
| 
 | |
|         executor = MigrationExecutor(connection)
 | |
|         with self.assertRaisesMessage(DatabaseError, "Failed to apply deferred SQL"):
 | |
|             executor.apply_migration(
 | |
|                 ProjectState(),
 | |
|                 Migration("0001_initial", "deferred_sql"),
 | |
|             )
 | |
|         # The migration isn't recorded as applied since it failed.
 | |
|         migration_recorder = MigrationRecorder(connection)
 | |
|         self.assertIs(
 | |
|             migration_recorder.migration_qs.filter(
 | |
|                 app="deferred_sql",
 | |
|                 name="0001_initial",
 | |
|             ).exists(),
 | |
|             False,
 | |
|         )
 | |
| 
 | |
|     @mock.patch.object(MigrationRecorder, "has_table", return_value=False)
 | |
|     def test_migrate_skips_schema_creation(self, mocked_has_table):
 | |
|         """
 | |
|         The django_migrations table is not created if there are no migrations
 | |
|         to record.
 | |
|         """
 | |
|         executor = MigrationExecutor(connection)
 | |
|         # 0 queries, since the query for has_table is being mocked.
 | |
|         with self.assertNumQueries(0):
 | |
|             executor.migrate([], plan=[])
 | |
| 
 | |
| 
 | |
| class FakeLoader:
 | |
|     def __init__(self, graph, applied):
 | |
|         self.graph = graph
 | |
|         self.applied_migrations = applied
 | |
|         self.replace_migrations = True
 | |
| 
 | |
| 
 | |
| class FakeMigration:
 | |
|     """Really all we need is any object with a debug-useful repr."""
 | |
| 
 | |
|     def __init__(self, name):
 | |
|         self.name = name
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return "M<%s>" % self.name
 | |
| 
 | |
| 
 | |
| class ExecutorUnitTests(SimpleTestCase):
 | |
|     """(More) isolated unit tests for executor methods."""
 | |
| 
 | |
|     def test_minimize_rollbacks(self):
 | |
|         """
 | |
|         Minimize unnecessary rollbacks in connected apps.
 | |
| 
 | |
|         When you say "./manage.py migrate appA 0001", rather than migrating to
 | |
|         just after appA-0001 in the linearized migration plan (which could roll
 | |
|         back migrations in other apps that depend on appA 0001, but don't need
 | |
|         to be rolled back since we're not rolling back appA 0001), we migrate
 | |
|         to just before appA-0002.
 | |
|         """
 | |
|         a1_impl = FakeMigration("a1")
 | |
|         a1 = ("a", "1")
 | |
|         a2_impl = FakeMigration("a2")
 | |
|         a2 = ("a", "2")
 | |
|         b1_impl = FakeMigration("b1")
 | |
|         b1 = ("b", "1")
 | |
|         graph = MigrationGraph()
 | |
|         graph.add_node(a1, a1_impl)
 | |
|         graph.add_node(a2, a2_impl)
 | |
|         graph.add_node(b1, b1_impl)
 | |
|         graph.add_dependency(None, b1, a1)
 | |
|         graph.add_dependency(None, a2, a1)
 | |
| 
 | |
|         executor = MigrationExecutor(None)
 | |
|         executor.loader = FakeLoader(
 | |
|             graph,
 | |
|             {
 | |
|                 a1: a1_impl,
 | |
|                 b1: b1_impl,
 | |
|                 a2: a2_impl,
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         plan = executor.migration_plan({a1})
 | |
| 
 | |
|         self.assertEqual(plan, [(a2_impl, True)])
 | |
| 
 | |
|     def test_minimize_rollbacks_branchy(self):
 | |
|         r"""
 | |
|         Minimize rollbacks when target has multiple in-app children.
 | |
| 
 | |
|         a: 1 <---- 3 <--\
 | |
|               \ \- 2 <--- 4
 | |
|                \       \
 | |
|         b:      \- 1 <--- 2
 | |
|         """
 | |
|         a1_impl = FakeMigration("a1")
 | |
|         a1 = ("a", "1")
 | |
|         a2_impl = FakeMigration("a2")
 | |
|         a2 = ("a", "2")
 | |
|         a3_impl = FakeMigration("a3")
 | |
|         a3 = ("a", "3")
 | |
|         a4_impl = FakeMigration("a4")
 | |
|         a4 = ("a", "4")
 | |
|         b1_impl = FakeMigration("b1")
 | |
|         b1 = ("b", "1")
 | |
|         b2_impl = FakeMigration("b2")
 | |
|         b2 = ("b", "2")
 | |
|         graph = MigrationGraph()
 | |
|         graph.add_node(a1, a1_impl)
 | |
|         graph.add_node(a2, a2_impl)
 | |
|         graph.add_node(a3, a3_impl)
 | |
|         graph.add_node(a4, a4_impl)
 | |
|         graph.add_node(b1, b1_impl)
 | |
|         graph.add_node(b2, b2_impl)
 | |
|         graph.add_dependency(None, a2, a1)
 | |
|         graph.add_dependency(None, a3, a1)
 | |
|         graph.add_dependency(None, a4, a2)
 | |
|         graph.add_dependency(None, a4, a3)
 | |
|         graph.add_dependency(None, b2, b1)
 | |
|         graph.add_dependency(None, b1, a1)
 | |
|         graph.add_dependency(None, b2, a2)
 | |
| 
 | |
|         executor = MigrationExecutor(None)
 | |
|         executor.loader = FakeLoader(
 | |
|             graph,
 | |
|             {
 | |
|                 a1: a1_impl,
 | |
|                 b1: b1_impl,
 | |
|                 a2: a2_impl,
 | |
|                 b2: b2_impl,
 | |
|                 a3: a3_impl,
 | |
|                 a4: a4_impl,
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         plan = executor.migration_plan({a1})
 | |
| 
 | |
|         should_be_rolled_back = [b2_impl, a4_impl, a2_impl, a3_impl]
 | |
|         exp = [(m, True) for m in should_be_rolled_back]
 | |
|         self.assertEqual(plan, exp)
 | |
| 
 | |
|     def test_backwards_nothing_to_do(self):
 | |
|         r"""
 | |
|         If the current state satisfies the given target, do nothing.
 | |
| 
 | |
|         a: 1 <--- 2
 | |
|         b:    \- 1
 | |
|         c:     \- 1
 | |
| 
 | |
|         If a1 is applied already and a2 is not, and we're asked to migrate to
 | |
|         a1, don't apply or unapply b1 or c1, regardless of their current state.
 | |
|         """
 | |
|         a1_impl = FakeMigration("a1")
 | |
|         a1 = ("a", "1")
 | |
|         a2_impl = FakeMigration("a2")
 | |
|         a2 = ("a", "2")
 | |
|         b1_impl = FakeMigration("b1")
 | |
|         b1 = ("b", "1")
 | |
|         c1_impl = FakeMigration("c1")
 | |
|         c1 = ("c", "1")
 | |
|         graph = MigrationGraph()
 | |
|         graph.add_node(a1, a1_impl)
 | |
|         graph.add_node(a2, a2_impl)
 | |
|         graph.add_node(b1, b1_impl)
 | |
|         graph.add_node(c1, c1_impl)
 | |
|         graph.add_dependency(None, a2, a1)
 | |
|         graph.add_dependency(None, b1, a1)
 | |
|         graph.add_dependency(None, c1, a1)
 | |
| 
 | |
|         executor = MigrationExecutor(None)
 | |
|         executor.loader = FakeLoader(
 | |
|             graph,
 | |
|             {
 | |
|                 a1: a1_impl,
 | |
|                 b1: b1_impl,
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         plan = executor.migration_plan({a1})
 | |
| 
 | |
|         self.assertEqual(plan, [])
 |