mirror of
				https://github.com/django/django.git
				synced 2025-10-24 22:26:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			175 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			175 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| 
 | |
| try:
 | |
|     import sqlparse
 | |
| except ImportError:
 | |
|     sqlparse = None
 | |
| 
 | |
| from django.db import migrations, models, connection
 | |
| from django.db.migrations.state import ProjectState
 | |
| from django.test import override_settings
 | |
| 
 | |
| from .test_operations import OperationTestBase
 | |
| 
 | |
| 
 | |
| class AgnosticRouter(object):
 | |
|     """
 | |
|     A router that doesn't have an opinion regarding migrating.
 | |
|     """
 | |
|     def allow_migrate(self, db, model, **hints):
 | |
|         return None
 | |
| 
 | |
| 
 | |
| class MigrateNothingRouter(object):
 | |
|     """
 | |
|     A router that doesn't allow migrating.
 | |
|     """
 | |
|     def allow_migrate(self, db, model, **hints):
 | |
|         return False
 | |
| 
 | |
| 
 | |
| class MigrateEverythingRouter(object):
 | |
|     """
 | |
|     A router that always allows migrating.
 | |
|     """
 | |
|     def allow_migrate(self, db, model, **hints):
 | |
|         return True
 | |
| 
 | |
| 
 | |
| class MigrateWhenFooRouter(object):
 | |
|     """
 | |
|     A router that allows migrating depending on a hint.
 | |
|     """
 | |
|     def allow_migrate(self, db, model, **hints):
 | |
|         return hints.get('foo', False)
 | |
| 
 | |
| 
 | |
| class MultiDBOperationTests(OperationTestBase):
 | |
|     multi_db = True
 | |
| 
 | |
|     def _test_create_model(self, app_label, should_run):
 | |
|         """
 | |
|         Tests that CreateModel honours multi-db settings.
 | |
|         """
 | |
|         operation = migrations.CreateModel(
 | |
|             "Pony",
 | |
|             [("id", models.AutoField(primary_key=True))],
 | |
|         )
 | |
|         # Test the state alteration
 | |
|         project_state = ProjectState()
 | |
|         new_state = project_state.clone()
 | |
|         operation.state_forwards(app_label, new_state)
 | |
|         # Test the database alteration
 | |
|         self.assertTableNotExists("%s_pony" % app_label)
 | |
|         with connection.schema_editor() as editor:
 | |
|             operation.database_forwards(app_label, editor, project_state, new_state)
 | |
|         if should_run:
 | |
|             self.assertTableExists("%s_pony" % app_label)
 | |
|         else:
 | |
|             self.assertTableNotExists("%s_pony" % app_label)
 | |
|         # And test reversal
 | |
|         with connection.schema_editor() as editor:
 | |
|             operation.database_backwards(app_label, editor, new_state, project_state)
 | |
|         self.assertTableNotExists("%s_pony" % app_label)
 | |
| 
 | |
|     @override_settings(DATABASE_ROUTERS=[AgnosticRouter()])
 | |
|     def test_create_model(self):
 | |
|         """
 | |
|         Test when router doesn't have an opinion (i.e. CreateModel should run).
 | |
|         """
 | |
|         self._test_create_model("test_mltdb_crmo", should_run=True)
 | |
| 
 | |
|     @override_settings(DATABASE_ROUTERS=[MigrateNothingRouter()])
 | |
|     def test_create_model2(self):
 | |
|         """
 | |
|         Test when router returns False (i.e. CreateModel shouldn't run).
 | |
|         """
 | |
|         self._test_create_model("test_mltdb_crmo2", should_run=False)
 | |
| 
 | |
|     @override_settings(DATABASE_ROUTERS=[MigrateEverythingRouter()])
 | |
|     def test_create_model3(self):
 | |
|         """
 | |
|         Test when router returns True (i.e. CreateModel should run).
 | |
|         """
 | |
|         self._test_create_model("test_mltdb_crmo3", should_run=True)
 | |
| 
 | |
|     def test_create_model4(self):
 | |
|         """
 | |
|         Test multiple routers.
 | |
|         """
 | |
|         with override_settings(DATABASE_ROUTERS=[AgnosticRouter(), AgnosticRouter()]):
 | |
|             self._test_create_model("test_mltdb_crmo4", should_run=True)
 | |
|         with override_settings(DATABASE_ROUTERS=[MigrateNothingRouter(), MigrateEverythingRouter()]):
 | |
|             self._test_create_model("test_mltdb_crmo4", should_run=False)
 | |
|         with override_settings(DATABASE_ROUTERS=[MigrateEverythingRouter(), MigrateNothingRouter()]):
 | |
|             self._test_create_model("test_mltdb_crmo4", should_run=True)
 | |
| 
 | |
|     def _test_run_sql(self, app_label, should_run, hints=None):
 | |
|         with override_settings(DATABASE_ROUTERS=[MigrateEverythingRouter()]):
 | |
|             project_state = self.set_up_test_model(app_label)
 | |
| 
 | |
|         sql = """
 | |
|         INSERT INTO {0}_pony (pink, weight) VALUES (1, 3.55);
 | |
|         INSERT INTO {0}_pony (pink, weight) VALUES (3, 5.0);
 | |
|         """.format(app_label)
 | |
| 
 | |
|         operation = migrations.RunSQL(sql, hints=hints or {})
 | |
|         # Test the state alteration does nothing
 | |
|         new_state = project_state.clone()
 | |
|         operation.state_forwards(app_label, new_state)
 | |
|         self.assertEqual(new_state, project_state)
 | |
|         # Test the database alteration
 | |
|         self.assertEqual(project_state.apps.get_model(app_label, "Pony").objects.count(), 0)
 | |
|         with connection.schema_editor() as editor:
 | |
|             operation.database_forwards(app_label, editor, project_state, new_state)
 | |
|         Pony = project_state.apps.get_model(app_label, "Pony")
 | |
|         if should_run:
 | |
|             self.assertEqual(Pony.objects.count(), 2)
 | |
|         else:
 | |
|             self.assertEqual(Pony.objects.count(), 0)
 | |
| 
 | |
|     @unittest.skipIf(sqlparse is None and connection.features.requires_sqlparse_for_splitting, "Missing sqlparse")
 | |
|     @override_settings(DATABASE_ROUTERS=[MigrateNothingRouter()])
 | |
|     def test_run_sql(self):
 | |
|         self._test_run_sql("test_mltdb_runsql", should_run=False)
 | |
| 
 | |
|     @unittest.skipIf(sqlparse is None and connection.features.requires_sqlparse_for_splitting, "Missing sqlparse")
 | |
|     @override_settings(DATABASE_ROUTERS=[MigrateWhenFooRouter()])
 | |
|     def test_run_sql2(self):
 | |
|         self._test_run_sql("test_mltdb_runsql2", should_run=False)
 | |
|         self._test_run_sql("test_mltdb_runsql2", should_run=True, hints={'foo': True})
 | |
| 
 | |
|     def _test_run_python(self, app_label, should_run, hints=None):
 | |
|         with override_settings(DATABASE_ROUTERS=[MigrateEverythingRouter()]):
 | |
|             project_state = self.set_up_test_model(app_label)
 | |
| 
 | |
|         # Create the operation
 | |
|         def inner_method(models, schema_editor):
 | |
|             Pony = models.get_model(app_label, "Pony")
 | |
|             Pony.objects.create(pink=1, weight=3.55)
 | |
|             Pony.objects.create(weight=5)
 | |
| 
 | |
|         operation = migrations.RunPython(inner_method, hints=hints or {})
 | |
|         # Test the state alteration does nothing
 | |
|         new_state = project_state.clone()
 | |
|         operation.state_forwards(app_label, new_state)
 | |
|         self.assertEqual(new_state, project_state)
 | |
|         # Test the database alteration
 | |
|         self.assertEqual(project_state.apps.get_model(app_label, "Pony").objects.count(), 0)
 | |
|         with connection.schema_editor() as editor:
 | |
|             operation.database_forwards(app_label, editor, project_state, new_state)
 | |
|         Pony = project_state.apps.get_model(app_label, "Pony")
 | |
|         if should_run:
 | |
|             self.assertEqual(Pony.objects.count(), 2)
 | |
|         else:
 | |
|             self.assertEqual(Pony.objects.count(), 0)
 | |
| 
 | |
|     @override_settings(DATABASE_ROUTERS=[MigrateNothingRouter()])
 | |
|     def test_run_python(self):
 | |
|         self._test_run_python("test_mltdb_runpython", should_run=False)
 | |
| 
 | |
|     @override_settings(DATABASE_ROUTERS=[MigrateWhenFooRouter()])
 | |
|     def test_run_python2(self):
 | |
|         self._test_run_python("test_mltdb_runpython2", should_run=False)
 | |
|         self._test_run_python("test_mltdb_runpython2", should_run=True, hints={'foo': True})
 |