mirror of
https://github.com/django/django.git
synced 2025-10-24 06:06:09 +00:00
1392 lines
64 KiB
Python
1392 lines
64 KiB
Python
import datetime
|
|
import importlib
|
|
import io
|
|
import os
|
|
import sys
|
|
from unittest import mock
|
|
|
|
from django.apps import apps
|
|
from django.core.management import CommandError, call_command
|
|
from django.db import (
|
|
ConnectionHandler, DatabaseError, connection, connections, models,
|
|
)
|
|
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
|
|
from django.db.migrations.exceptions import InconsistentMigrationHistory
|
|
from django.db.migrations.recorder import MigrationRecorder
|
|
from django.test import override_settings
|
|
|
|
from .models import UnicodeModel, UnserializableModel
|
|
from .routers import TestRouter
|
|
from .test_base import MigrationTestBase
|
|
|
|
|
|
class MigrateTests(MigrationTestBase):
|
|
"""
|
|
Tests running the migrate command.
|
|
"""
|
|
multi_db = True
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
|
|
def test_migrate(self):
|
|
"""
|
|
Tests basic usage of the migrate command.
|
|
"""
|
|
# No tables are created
|
|
self.assertTableNotExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
self.assertTableNotExists("migrations_book")
|
|
# Run the migrations to 0001 only
|
|
call_command("migrate", "migrations", "0001", verbosity=0)
|
|
# The correct tables exist
|
|
self.assertTableExists("migrations_author")
|
|
self.assertTableExists("migrations_tribble")
|
|
self.assertTableNotExists("migrations_book")
|
|
# Run migrations all the way
|
|
call_command("migrate", verbosity=0)
|
|
# The correct tables exist
|
|
self.assertTableExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
self.assertTableExists("migrations_book")
|
|
# Unmigrate everything
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
# Tables are gone
|
|
self.assertTableNotExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
self.assertTableNotExists("migrations_book")
|
|
|
|
@override_settings(INSTALLED_APPS=[
|
|
'django.contrib.auth',
|
|
'django.contrib.contenttypes',
|
|
'migrations.migrations_test_apps.migrated_app',
|
|
])
|
|
def test_migrate_with_system_checks(self):
|
|
out = io.StringIO()
|
|
call_command('migrate', skip_checks=False, no_color=True, stdout=out)
|
|
self.assertIn('Apply all migrations: migrated_app', out.getvalue())
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_initial_false"})
|
|
def test_migrate_initial_false(self):
|
|
"""
|
|
`Migration.initial = False` skips fake-initial detection.
|
|
"""
|
|
# Make sure no tables are created
|
|
self.assertTableNotExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
# Run the migrations to 0001 only
|
|
call_command("migrate", "migrations", "0001", verbosity=0)
|
|
# Fake rollback
|
|
call_command("migrate", "migrations", "zero", fake=True, verbosity=0)
|
|
# Make sure fake-initial detection does not run
|
|
with self.assertRaises(DatabaseError):
|
|
call_command("migrate", "migrations", "0001", fake_initial=True, verbosity=0)
|
|
|
|
call_command("migrate", "migrations", "0001", fake=True, verbosity=0)
|
|
# Real rollback
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
# Make sure it's all gone
|
|
self.assertTableNotExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
self.assertTableNotExists("migrations_book")
|
|
|
|
@override_settings(
|
|
MIGRATION_MODULES={"migrations": "migrations.test_migrations"},
|
|
DATABASE_ROUTERS=['migrations.routers.TestRouter'],
|
|
)
|
|
def test_migrate_fake_initial(self):
|
|
"""
|
|
--fake-initial only works if all tables created in the initial
|
|
migration of an app exists. Database routers must be obeyed when doing
|
|
that check.
|
|
"""
|
|
# Make sure no tables are created
|
|
for db in connections:
|
|
self.assertTableNotExists("migrations_author", using=db)
|
|
self.assertTableNotExists("migrations_tribble", using=db)
|
|
# Run the migrations to 0001 only
|
|
call_command("migrate", "migrations", "0001", verbosity=0)
|
|
call_command("migrate", "migrations", "0001", verbosity=0, database="other")
|
|
# Make sure the right tables exist
|
|
self.assertTableExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
# Also check the "other" database
|
|
self.assertTableNotExists("migrations_author", using="other")
|
|
self.assertTableExists("migrations_tribble", using="other")
|
|
|
|
# Fake a roll-back
|
|
call_command("migrate", "migrations", "zero", fake=True, verbosity=0)
|
|
call_command("migrate", "migrations", "zero", fake=True, verbosity=0, database="other")
|
|
# Make sure the tables still exist
|
|
self.assertTableExists("migrations_author")
|
|
self.assertTableExists("migrations_tribble", using="other")
|
|
# Try to run initial migration
|
|
with self.assertRaises(DatabaseError):
|
|
call_command("migrate", "migrations", "0001", verbosity=0)
|
|
# Run initial migration with an explicit --fake-initial
|
|
out = io.StringIO()
|
|
with mock.patch('django.core.management.color.supports_color', lambda *args: False):
|
|
call_command("migrate", "migrations", "0001", fake_initial=True, stdout=out, verbosity=1)
|
|
call_command("migrate", "migrations", "0001", fake_initial=True, verbosity=0, database="other")
|
|
self.assertIn(
|
|
"migrations.0001_initial... faked",
|
|
out.getvalue().lower()
|
|
)
|
|
# Run migrations all the way
|
|
call_command("migrate", verbosity=0)
|
|
call_command("migrate", verbosity=0, database="other")
|
|
# Make sure the right tables exist
|
|
self.assertTableExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
self.assertTableExists("migrations_book")
|
|
self.assertTableNotExists("migrations_author", using="other")
|
|
self.assertTableNotExists("migrations_tribble", using="other")
|
|
self.assertTableNotExists("migrations_book", using="other")
|
|
# Fake a roll-back
|
|
call_command("migrate", "migrations", "zero", fake=True, verbosity=0)
|
|
call_command("migrate", "migrations", "zero", fake=True, verbosity=0, database="other")
|
|
# Make sure the tables still exist
|
|
self.assertTableExists("migrations_author")
|
|
self.assertTableNotExists("migrations_tribble")
|
|
self.assertTableExists("migrations_book")
|
|
# Try to run initial migration
|
|
with self.assertRaises(DatabaseError):
|
|
call_command("migrate", "migrations", verbosity=0)
|
|
# Run initial migration with an explicit --fake-initial
|
|
with self.assertRaises(DatabaseError):
|
|
# Fails because "migrations_tribble" does not exist but needs to in
|
|
# order to make --fake-initial work.
|
|
call_command("migrate", "migrations", fake_initial=True, verbosity=0)
|
|
# Fake a apply
|
|
call_command("migrate", "migrations", fake=True, verbosity=0)
|
|
call_command("migrate", "migrations", fake=True, verbosity=0, database="other")
|
|
# Unmigrate everything
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
call_command("migrate", "migrations", "zero", verbosity=0, database="other")
|
|
# Make sure it's all gone
|
|
for db in connections:
|
|
self.assertTableNotExists("migrations_author", using=db)
|
|
self.assertTableNotExists("migrations_tribble", using=db)
|
|
self.assertTableNotExists("migrations_book", using=db)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_fake_split_initial"})
|
|
def test_migrate_fake_split_initial(self):
|
|
"""
|
|
Split initial migrations can be faked with --fake-initial.
|
|
"""
|
|
call_command("migrate", "migrations", "0002", verbosity=0)
|
|
call_command("migrate", "migrations", "zero", fake=True, verbosity=0)
|
|
out = io.StringIO()
|
|
with mock.patch('django.core.management.color.supports_color', lambda *args: False):
|
|
call_command("migrate", "migrations", "0002", fake_initial=True, stdout=out, verbosity=1)
|
|
value = out.getvalue().lower()
|
|
self.assertIn("migrations.0001_initial... faked", value)
|
|
self.assertIn("migrations.0002_second... faked", value)
|
|
# Fake an apply
|
|
call_command("migrate", "migrations", fake=True, verbosity=0)
|
|
# Unmigrate everything
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_conflict"})
|
|
def test_migrate_conflict_exit(self):
|
|
"""
|
|
migrate exits if it detects a conflict.
|
|
"""
|
|
with self.assertRaisesMessage(CommandError, "Conflicting migrations detected"):
|
|
call_command("migrate", "migrations")
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
|
|
def test_showmigrations_list(self):
|
|
"""
|
|
showmigrations --list displays migrations and whether or not they're
|
|
applied.
|
|
"""
|
|
out = io.StringIO()
|
|
with mock.patch('django.core.management.color.supports_color', lambda *args: True):
|
|
call_command("showmigrations", format='list', stdout=out, verbosity=0, no_color=False)
|
|
self.assertEqual(
|
|
'\x1b[1mmigrations\n\x1b[0m'
|
|
' [ ] 0001_initial\n'
|
|
' [ ] 0002_second\n',
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
call_command("migrate", "migrations", "0001", verbosity=0)
|
|
|
|
out = io.StringIO()
|
|
# Giving the explicit app_label tests for selective `show_list` in the command
|
|
call_command("showmigrations", "migrations", format='list', stdout=out, verbosity=0, no_color=True)
|
|
self.assertEqual(
|
|
'migrations\n'
|
|
' [x] 0001_initial\n'
|
|
' [ ] 0002_second\n',
|
|
out.getvalue().lower()
|
|
)
|
|
# Cleanup by unmigrating everything
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_run_before"})
|
|
def test_showmigrations_plan(self):
|
|
"""
|
|
Tests --plan output of showmigrations command
|
|
"""
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out)
|
|
self.assertEqual(
|
|
"[ ] migrations.0001_initial\n"
|
|
"[ ] migrations.0003_third\n"
|
|
"[ ] migrations.0002_second\n",
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out, verbosity=2)
|
|
self.assertEqual(
|
|
"[ ] migrations.0001_initial\n"
|
|
"[ ] migrations.0003_third ... (migrations.0001_initial)\n"
|
|
"[ ] migrations.0002_second ... (migrations.0001_initial, migrations.0003_third)\n",
|
|
out.getvalue().lower()
|
|
)
|
|
call_command("migrate", "migrations", "0003", verbosity=0)
|
|
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out)
|
|
self.assertEqual(
|
|
"[x] migrations.0001_initial\n"
|
|
"[x] migrations.0003_third\n"
|
|
"[ ] migrations.0002_second\n",
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out, verbosity=2)
|
|
self.assertEqual(
|
|
"[x] migrations.0001_initial\n"
|
|
"[x] migrations.0003_third ... (migrations.0001_initial)\n"
|
|
"[ ] migrations.0002_second ... (migrations.0001_initial, migrations.0003_third)\n",
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
# Cleanup by unmigrating everything
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_empty"})
|
|
def test_showmigrations_plan_no_migrations(self):
|
|
"""
|
|
Tests --plan output of showmigrations command without migrations
|
|
"""
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out)
|
|
self.assertEqual("", out.getvalue().lower())
|
|
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out, verbosity=2)
|
|
self.assertEqual("", out.getvalue().lower())
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed_complex"})
|
|
def test_showmigrations_plan_squashed(self):
|
|
"""
|
|
Tests --plan output of showmigrations command with squashed migrations.
|
|
"""
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out)
|
|
self.assertEqual(
|
|
"[ ] migrations.1_auto\n"
|
|
"[ ] migrations.2_auto\n"
|
|
"[ ] migrations.3_squashed_5\n"
|
|
"[ ] migrations.6_auto\n"
|
|
"[ ] migrations.7_auto\n",
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out, verbosity=2)
|
|
self.assertEqual(
|
|
"[ ] migrations.1_auto\n"
|
|
"[ ] migrations.2_auto ... (migrations.1_auto)\n"
|
|
"[ ] migrations.3_squashed_5 ... (migrations.2_auto)\n"
|
|
"[ ] migrations.6_auto ... (migrations.3_squashed_5)\n"
|
|
"[ ] migrations.7_auto ... (migrations.6_auto)\n",
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
call_command("migrate", "migrations", "3_squashed_5", verbosity=0)
|
|
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out)
|
|
self.assertEqual(
|
|
"[x] migrations.1_auto\n"
|
|
"[x] migrations.2_auto\n"
|
|
"[x] migrations.3_squashed_5\n"
|
|
"[ ] migrations.6_auto\n"
|
|
"[ ] migrations.7_auto\n",
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
out = io.StringIO()
|
|
call_command("showmigrations", format='plan', stdout=out, verbosity=2)
|
|
self.assertEqual(
|
|
"[x] migrations.1_auto\n"
|
|
"[x] migrations.2_auto ... (migrations.1_auto)\n"
|
|
"[x] migrations.3_squashed_5 ... (migrations.2_auto)\n"
|
|
"[ ] migrations.6_auto ... (migrations.3_squashed_5)\n"
|
|
"[ ] migrations.7_auto ... (migrations.6_auto)\n",
|
|
out.getvalue().lower()
|
|
)
|
|
|
|
@override_settings(INSTALLED_APPS=[
|
|
'migrations.migrations_test_apps.mutate_state_b',
|
|
'migrations.migrations_test_apps.alter_fk.author_app',
|
|
'migrations.migrations_test_apps.alter_fk.book_app',
|
|
])
|
|
def test_showmigrations_plan_single_app_label(self):
|
|
"""
|
|
`showmigrations --plan app_label` output with a single app_label.
|
|
"""
|
|
# Single app with no dependencies on other apps.
|
|
out = io.StringIO()
|
|
call_command('showmigrations', 'mutate_state_b', format='plan', stdout=out)
|
|
self.assertEqual(
|
|
'[ ] mutate_state_b.0001_initial\n'
|
|
'[ ] mutate_state_b.0002_add_field\n',
|
|
out.getvalue()
|
|
)
|
|
# Single app with dependencies.
|
|
out = io.StringIO()
|
|
call_command('showmigrations', 'author_app', format='plan', stdout=out)
|
|
self.assertEqual(
|
|
'[ ] author_app.0001_initial\n'
|
|
'[ ] book_app.0001_initial\n'
|
|
'[ ] author_app.0002_alter_id\n',
|
|
out.getvalue()
|
|
)
|
|
# Some migrations already applied.
|
|
call_command('migrate', 'author_app', '0001', verbosity=0)
|
|
out = io.StringIO()
|
|
call_command('showmigrations', 'author_app', format='plan', stdout=out)
|
|
self.assertEqual(
|
|
'[X] author_app.0001_initial\n'
|
|
'[ ] book_app.0001_initial\n'
|
|
'[ ] author_app.0002_alter_id\n',
|
|
out.getvalue()
|
|
)
|
|
# Cleanup by unmigrating author_app.
|
|
call_command('migrate', 'author_app', 'zero', verbosity=0)
|
|
|
|
@override_settings(INSTALLED_APPS=[
|
|
'migrations.migrations_test_apps.mutate_state_b',
|
|
'migrations.migrations_test_apps.alter_fk.author_app',
|
|
'migrations.migrations_test_apps.alter_fk.book_app',
|
|
])
|
|
def test_showmigrations_plan_multiple_app_labels(self):
|
|
"""
|
|
`showmigrations --plan app_label` output with multiple app_labels.
|
|
"""
|
|
# Multiple apps: author_app depends on book_app; mutate_state_b doesn't
|
|
# depend on other apps.
|
|
out = io.StringIO()
|
|
call_command('showmigrations', 'mutate_state_b', 'author_app', format='plan', stdout=out)
|
|
self.assertEqual(
|
|
'[ ] author_app.0001_initial\n'
|
|
'[ ] book_app.0001_initial\n'
|
|
'[ ] author_app.0002_alter_id\n'
|
|
'[ ] mutate_state_b.0001_initial\n'
|
|
'[ ] mutate_state_b.0002_add_field\n',
|
|
out.getvalue()
|
|
)
|
|
# Multiple apps: args order shouldn't matter (the same result is
|
|
# expected as above).
|
|
out = io.StringIO()
|
|
call_command('showmigrations', 'author_app', 'mutate_state_b', format='plan', stdout=out)
|
|
self.assertEqual(
|
|
'[ ] author_app.0001_initial\n'
|
|
'[ ] book_app.0001_initial\n'
|
|
'[ ] author_app.0002_alter_id\n'
|
|
'[ ] mutate_state_b.0001_initial\n'
|
|
'[ ] mutate_state_b.0002_add_field\n',
|
|
out.getvalue()
|
|
)
|
|
|
|
@override_settings(INSTALLED_APPS=['migrations.migrations_test_apps.unmigrated_app'])
|
|
def test_showmigrations_plan_app_label_error(self):
|
|
"""
|
|
`showmigrations --plan app_label` raises an error when no app or
|
|
no migrations are present in provided app labels.
|
|
"""
|
|
# App with no migrations.
|
|
with self.assertRaisesMessage(CommandError, 'No migrations present for: unmigrated_app'):
|
|
call_command('showmigrations', 'unmigrated_app', format='plan')
|
|
# Nonexistent app (wrong app label).
|
|
with self.assertRaisesMessage(CommandError, 'No migrations present for: nonexistent_app'):
|
|
call_command('showmigrations', 'nonexistent_app', format='plan')
|
|
# Multiple nonexistent apps; input order shouldn't matter.
|
|
with self.assertRaisesMessage(CommandError, 'No migrations present for: nonexistent_app1, nonexistent_app2'):
|
|
call_command('showmigrations', 'nonexistent_app1', 'nonexistent_app2', format='plan')
|
|
with self.assertRaisesMessage(CommandError, 'No migrations present for: nonexistent_app1, nonexistent_app2'):
|
|
call_command('showmigrations', 'nonexistent_app2', 'nonexistent_app1', format='plan')
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
|
|
def test_sqlmigrate_forwards(self):
|
|
"""
|
|
sqlmigrate outputs forward looking SQL.
|
|
"""
|
|
out = io.StringIO()
|
|
call_command("sqlmigrate", "migrations", "0001", stdout=out)
|
|
output = out.getvalue().lower()
|
|
|
|
index_tx_start = output.find(connection.ops.start_transaction_sql().lower())
|
|
index_op_desc_author = output.find('-- create model author')
|
|
index_create_table = output.find('create table')
|
|
index_op_desc_tribble = output.find('-- create model tribble')
|
|
index_op_desc_unique_together = output.find('-- alter unique_together')
|
|
index_tx_end = output.find(connection.ops.end_transaction_sql().lower())
|
|
|
|
self.assertGreater(index_tx_start, -1, "Transaction start not found")
|
|
self.assertGreater(
|
|
index_op_desc_author, index_tx_start,
|
|
"Operation description (author) not found or found before transaction start"
|
|
)
|
|
self.assertGreater(
|
|
index_create_table, index_op_desc_author,
|
|
"CREATE TABLE not found or found before operation description (author)"
|
|
)
|
|
self.assertGreater(
|
|
index_op_desc_tribble, index_create_table,
|
|
"Operation description (tribble) not found or found before CREATE TABLE (author)"
|
|
)
|
|
self.assertGreater(
|
|
index_op_desc_unique_together, index_op_desc_tribble,
|
|
"Operation description (unique_together) not found or found before operation description (tribble)"
|
|
)
|
|
self.assertGreater(
|
|
index_tx_end, index_op_desc_unique_together,
|
|
"Transaction end not found or found before operation description (unique_together)"
|
|
)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
|
|
def test_sqlmigrate_backwards(self):
|
|
"""
|
|
sqlmigrate outputs reverse looking SQL.
|
|
"""
|
|
# Cannot generate the reverse SQL unless we've applied the migration.
|
|
call_command("migrate", "migrations", verbosity=0)
|
|
|
|
out = io.StringIO()
|
|
call_command("sqlmigrate", "migrations", "0001", stdout=out, backwards=True)
|
|
output = out.getvalue().lower()
|
|
|
|
index_tx_start = output.find(connection.ops.start_transaction_sql().lower())
|
|
index_op_desc_unique_together = output.find('-- alter unique_together')
|
|
index_op_desc_tribble = output.find('-- create model tribble')
|
|
index_op_desc_author = output.find('-- create model author')
|
|
index_drop_table = output.rfind('drop table')
|
|
index_tx_end = output.find(connection.ops.end_transaction_sql().lower())
|
|
|
|
self.assertGreater(index_tx_start, -1, "Transaction start not found")
|
|
self.assertGreater(
|
|
index_op_desc_unique_together, index_tx_start,
|
|
"Operation description (unique_together) not found or found before transaction start"
|
|
)
|
|
self.assertGreater(
|
|
index_op_desc_tribble, index_op_desc_unique_together,
|
|
"Operation description (tribble) not found or found before operation description (unique_together)"
|
|
)
|
|
self.assertGreater(
|
|
index_op_desc_author, index_op_desc_tribble,
|
|
"Operation description (author) not found or found before operation description (tribble)"
|
|
)
|
|
|
|
self.assertGreater(
|
|
index_drop_table, index_op_desc_author,
|
|
"DROP TABLE not found or found before operation description (author)"
|
|
)
|
|
self.assertGreater(
|
|
index_tx_end, index_op_desc_unique_together,
|
|
"Transaction end not found or found before DROP TABLE"
|
|
)
|
|
|
|
# Cleanup by unmigrating everything
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_non_atomic"})
|
|
def test_sqlmigrate_for_non_atomic_migration(self):
|
|
"""
|
|
Transaction wrappers aren't shown for non-atomic migrations.
|
|
"""
|
|
out = io.StringIO()
|
|
call_command("sqlmigrate", "migrations", "0001", stdout=out)
|
|
output = out.getvalue().lower()
|
|
queries = [q.strip() for q in output.splitlines()]
|
|
if connection.ops.start_transaction_sql():
|
|
self.assertNotIn(connection.ops.start_transaction_sql().lower(), queries)
|
|
self.assertNotIn(connection.ops.end_transaction_sql().lower(), queries)
|
|
|
|
@override_settings(
|
|
INSTALLED_APPS=[
|
|
"migrations.migrations_test_apps.migrated_app",
|
|
"migrations.migrations_test_apps.migrated_unapplied_app",
|
|
"migrations.migrations_test_apps.unmigrated_app",
|
|
],
|
|
)
|
|
def test_regression_22823_unmigrated_fk_to_migrated_model(self):
|
|
"""
|
|
Assuming you have 3 apps, `A`, `B`, and `C`, such that:
|
|
|
|
* `A` has migrations
|
|
* `B` has a migration we want to apply
|
|
* `C` has no migrations, but has an FK to `A`
|
|
|
|
When we try to migrate "B", an exception occurs because the
|
|
"B" was not included in the ProjectState that is used to detect
|
|
soft-applied migrations (#22823).
|
|
"""
|
|
call_command("migrate", "migrated_unapplied_app", stdout=io.StringIO())
|
|
|
|
# unmigrated_app.SillyModel has a foreign key to 'migrations.Tribble',
|
|
# but that model is only defined in a migration, so the global app
|
|
# registry never sees it and the reference is left dangling. Remove it
|
|
# to avoid problems in subsequent tests.
|
|
del apps._pending_operations[('migrations', 'tribble')]
|
|
|
|
@override_settings(INSTALLED_APPS=['migrations.migrations_test_apps.unmigrated_app_syncdb'])
|
|
def test_migrate_syncdb_deferred_sql_executed_with_schemaeditor(self):
|
|
"""
|
|
For an app without migrations, editor.execute() is used for executing
|
|
the syncdb deferred SQL.
|
|
"""
|
|
with mock.patch.object(BaseDatabaseSchemaEditor, 'execute') as execute:
|
|
call_command('migrate', run_syncdb=True, verbosity=0)
|
|
create_table_count = len([call for call in execute.mock_calls if 'CREATE TABLE' in str(call)])
|
|
self.assertEqual(create_table_count, 2)
|
|
# There's at least one deferred SQL for creating the foreign key
|
|
# index.
|
|
self.assertGreater(len(execute.mock_calls), 2)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"})
|
|
def test_migrate_record_replaced(self):
|
|
"""
|
|
Running a single squashed migration should record all of the original
|
|
replaced migrations as run.
|
|
"""
|
|
recorder = MigrationRecorder(connection)
|
|
out = io.StringIO()
|
|
call_command("migrate", "migrations", verbosity=0)
|
|
call_command("showmigrations", "migrations", stdout=out, no_color=True)
|
|
self.assertEqual(
|
|
'migrations\n'
|
|
' [x] 0001_squashed_0002 (2 squashed migrations)\n',
|
|
out.getvalue().lower()
|
|
)
|
|
applied_migrations = recorder.applied_migrations()
|
|
self.assertIn(("migrations", "0001_initial"), applied_migrations)
|
|
self.assertIn(("migrations", "0002_second"), applied_migrations)
|
|
self.assertIn(("migrations", "0001_squashed_0002"), applied_migrations)
|
|
# Rollback changes
|
|
call_command("migrate", "migrations", "zero", verbosity=0)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"})
|
|
def test_migrate_record_squashed(self):
|
|
"""
|
|
Running migrate for a squashed migration should record as run
|
|
if all of the replaced migrations have been run (#25231).
|
|
"""
|
|
recorder = MigrationRecorder(connection)
|
|
recorder.record_applied("migrations", "0001_initial")
|
|
recorder.record_applied("migrations", "0002_second")
|
|
out = io.StringIO()
|
|
call_command("migrate", "migrations", verbosity=0)
|
|
call_command("showmigrations", "migrations", stdout=out, no_color=True)
|
|
self.assertEqual(
|
|
'migrations\n'
|
|
' [x] 0001_squashed_0002 (2 squashed migrations)\n',
|
|
out.getvalue().lower()
|
|
)
|
|
self.assertIn(
|
|
("migrations", "0001_squashed_0002"),
|
|
recorder.applied_migrations()
|
|
)
|
|
# No changes were actually applied so there is nothing to rollback
|
|
|
|
@override_settings(MIGRATION_MODULES={'migrations': 'migrations.test_migrations'})
|
|
def test_migrate_inconsistent_history(self):
|
|
"""
|
|
Running migrate with some migrations applied before their dependencies
|
|
should not be allowed.
|
|
"""
|
|
recorder = MigrationRecorder(connection)
|
|
recorder.record_applied("migrations", "0002_second")
|
|
msg = "Migration migrations.0002_second is applied before its dependency migrations.0001_initial"
|
|
with self.assertRaisesMessage(InconsistentMigrationHistory, msg):
|
|
call_command("migrate")
|
|
applied_migrations = recorder.applied_migrations()
|
|
self.assertNotIn(("migrations", "0001_initial"), applied_migrations)
|
|
|
|
|
|
class MakeMigrationsTests(MigrationTestBase):
|
|
"""
|
|
Tests running the makemigrations command.
|
|
"""
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self._old_models = apps.app_configs['migrations'].models.copy()
|
|
|
|
def tearDown(self):
|
|
apps.app_configs['migrations'].models = self._old_models
|
|
apps.all_models['migrations'] = self._old_models
|
|
apps.clear_cache()
|
|
super().tearDown()
|
|
|
|
def test_files_content(self):
|
|
self.assertTableNotExists("migrations_unicodemodel")
|
|
apps.register_model('migrations', UnicodeModel)
|
|
with self.temporary_migration_module() as migration_dir:
|
|
call_command("makemigrations", "migrations", verbosity=0)
|
|
|
|
# Check for empty __init__.py file in migrations folder
|
|
init_file = os.path.join(migration_dir, "__init__.py")
|
|
self.assertTrue(os.path.exists(init_file))
|
|
|
|
with open(init_file, 'r') as fp:
|
|
content = fp.read()
|
|
self.assertEqual(content, '')
|
|
|
|
# Check for existing 0001_initial.py file in migration folder
|
|
initial_file = os.path.join(migration_dir, "0001_initial.py")
|
|
self.assertTrue(os.path.exists(initial_file))
|
|
|
|
with open(initial_file, 'r', encoding='utf-8') as fp:
|
|
content = fp.read()
|
|
self.assertIn('migrations.CreateModel', content)
|
|
self.assertIn('initial = True', content)
|
|
|
|
self.assertIn('úñí©óðé µóðéø', content) # Meta.verbose_name
|
|
self.assertIn('úñí©óðé µóðéøß', content) # Meta.verbose_name_plural
|
|
self.assertIn('ÚÑÍ¢ÓÐÉ', content) # title.verbose_name
|
|
self.assertIn('“Ðjáñgó”', content) # title.default
|
|
|
|
def test_makemigrations_order(self):
|
|
"""
|
|
makemigrations should recognize number-only migrations (0001.py).
|
|
"""
|
|
module = 'migrations.test_migrations_order'
|
|
with self.temporary_migration_module(module=module) as migration_dir:
|
|
if hasattr(importlib, 'invalidate_caches'):
|
|
# importlib caches os.listdir() on some platforms like macOS
|
|
# (#23850).
|
|
importlib.invalidate_caches()
|
|
call_command('makemigrations', 'migrations', '--empty', '-n', 'a', '-v', '0')
|
|
self.assertTrue(os.path.exists(os.path.join(migration_dir, '0002_a.py')))
|
|
|
|
def test_makemigrations_empty_connections(self):
|
|
empty_connections = ConnectionHandler({'default': {}})
|
|
with mock.patch('django.core.management.commands.makemigrations.connections', new=empty_connections):
|
|
# with no apps
|
|
out = io.StringIO()
|
|
call_command('makemigrations', stdout=out)
|
|
self.assertIn('No changes detected', out.getvalue())
|
|
# with an app
|
|
with self.temporary_migration_module() as migration_dir:
|
|
call_command('makemigrations', 'migrations', verbosity=0)
|
|
init_file = os.path.join(migration_dir, '__init__.py')
|
|
self.assertTrue(os.path.exists(init_file))
|
|
|
|
@override_settings(INSTALLED_APPS=['migrations', 'migrations2'])
|
|
def test_makemigrations_consistency_checks_respect_routers(self):
|
|
"""
|
|
The history consistency checks in makemigrations respect
|
|
settings.DATABASE_ROUTERS.
|
|
"""
|
|
def patched_has_table(migration_recorder):
|
|
if migration_recorder.connection is connections['other']:
|
|
raise Exception('Other connection')
|
|
else:
|
|
return mock.DEFAULT
|
|
|
|
self.assertTableNotExists('migrations_unicodemodel')
|
|
apps.register_model('migrations', UnicodeModel)
|
|
with mock.patch.object(
|
|
MigrationRecorder, 'has_table',
|
|
autospec=True, side_effect=patched_has_table) as has_table:
|
|
with self.temporary_migration_module() as migration_dir:
|
|
call_command("makemigrations", "migrations", verbosity=0)
|
|
initial_file = os.path.join(migration_dir, "0001_initial.py")
|
|
self.assertTrue(os.path.exists(initial_file))
|
|
self.assertEqual(has_table.call_count, 1) # 'default' is checked
|
|
|
|
# Router says not to migrate 'other' so consistency shouldn't
|
|
# be checked.
|
|
with self.settings(DATABASE_ROUTERS=['migrations.routers.TestRouter']):
|
|
call_command('makemigrations', 'migrations', verbosity=0)
|
|
self.assertEqual(has_table.call_count, 2) # 'default' again
|
|
|
|
# With a router that doesn't prohibit migrating 'other',
|
|
# consistency is checked.
|
|
with self.settings(DATABASE_ROUTERS=['migrations.routers.EmptyRouter']):
|
|
with self.assertRaisesMessage(Exception, 'Other connection'):
|
|
call_command('makemigrations', 'migrations', verbosity=0)
|
|
self.assertEqual(has_table.call_count, 4) # 'default' and 'other'
|
|
|
|
# With a router that doesn't allow migrating on any database,
|
|
# no consistency checks are made.
|
|
with self.settings(DATABASE_ROUTERS=['migrations.routers.TestRouter']):
|
|
with mock.patch.object(TestRouter, 'allow_migrate', return_value=False) as allow_migrate:
|
|
call_command('makemigrations', 'migrations', verbosity=0)
|
|
allow_migrate.assert_any_call('other', 'migrations', model_name='UnicodeModel')
|
|
# allow_migrate() is called with the correct arguments.
|
|
self.assertGreater(len(allow_migrate.mock_calls), 0)
|
|
for mock_call in allow_migrate.mock_calls:
|
|
_, call_args, call_kwargs = mock_call
|
|
connection_alias, app_name = call_args
|
|
self.assertIn(connection_alias, ['default', 'other'])
|
|
# Raises an error if invalid app_name/model_name occurs.
|
|
apps.get_app_config(app_name).get_model(call_kwargs['model_name'])
|
|
self.assertEqual(has_table.call_count, 4)
|
|
|
|
def test_failing_migration(self):
|
|
# If a migration fails to serialize, it shouldn't generate an empty file. #21280
|
|
apps.register_model('migrations', UnserializableModel)
|
|
|
|
with self.temporary_migration_module() as migration_dir:
|
|
with self.assertRaisesMessage(ValueError, 'Cannot serialize'):
|
|
call_command("makemigrations", "migrations", verbosity=0)
|
|
|
|
initial_file = os.path.join(migration_dir, "0001_initial.py")
|
|
self.assertFalse(os.path.exists(initial_file))
|
|
|
|
def test_makemigrations_conflict_exit(self):
|
|
"""
|
|
makemigrations exits if it detects a conflict.
|
|
"""
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict"):
|
|
with self.assertRaises(CommandError) as context:
|
|
call_command("makemigrations")
|
|
exception_message = str(context.exception)
|
|
self.assertIn(
|
|
'Conflicting migrations detected; multiple leaf nodes '
|
|
'in the migration graph:',
|
|
exception_message
|
|
)
|
|
self.assertIn('0002_second', exception_message)
|
|
self.assertIn('0002_conflicting_second', exception_message)
|
|
self.assertIn('in migrations', exception_message)
|
|
self.assertIn("To fix them run 'python manage.py makemigrations --merge'", exception_message)
|
|
|
|
def test_makemigrations_merge_no_conflict(self):
|
|
"""
|
|
makemigrations exits if in merge mode with no conflicts.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations"):
|
|
call_command("makemigrations", merge=True, stdout=out)
|
|
self.assertIn("No conflicts detected to merge.", out.getvalue())
|
|
|
|
def test_makemigrations_no_app_sys_exit(self):
|
|
"""makemigrations exits if a nonexistent app is specified."""
|
|
err = io.StringIO()
|
|
with self.assertRaises(SystemExit):
|
|
call_command("makemigrations", "this_app_does_not_exist", stderr=err)
|
|
self.assertIn("'this_app_does_not_exist' could not be found.", err.getvalue())
|
|
|
|
def test_makemigrations_empty_no_app_specified(self):
|
|
"""
|
|
makemigrations exits if no app is specified with 'empty' mode.
|
|
"""
|
|
msg = 'You must supply at least one app label when using --empty.'
|
|
with self.assertRaisesMessage(CommandError, msg):
|
|
call_command("makemigrations", empty=True)
|
|
|
|
def test_makemigrations_empty_migration(self):
|
|
"""
|
|
makemigrations properly constructs an empty migration.
|
|
"""
|
|
with self.temporary_migration_module() as migration_dir:
|
|
call_command("makemigrations", "migrations", empty=True, verbosity=0)
|
|
|
|
# Check for existing 0001_initial.py file in migration folder
|
|
initial_file = os.path.join(migration_dir, "0001_initial.py")
|
|
self.assertTrue(os.path.exists(initial_file))
|
|
|
|
with open(initial_file, 'r', encoding='utf-8') as fp:
|
|
content = fp.read()
|
|
|
|
# Remove all whitespace to check for empty dependencies and operations
|
|
content = content.replace(' ', '')
|
|
self.assertIn('dependencies=[\n]', content)
|
|
self.assertIn('operations=[\n]', content)
|
|
|
|
@override_settings(MIGRATION_MODULES={"migrations": None})
|
|
def test_makemigrations_disabled_migrations_for_app(self):
|
|
"""
|
|
makemigrations raises a nice error when migrations are disabled for an
|
|
app.
|
|
"""
|
|
msg = (
|
|
"Django can't create migrations for app 'migrations' because migrations "
|
|
"have been disabled via the MIGRATION_MODULES setting."
|
|
)
|
|
with self.assertRaisesMessage(ValueError, msg):
|
|
call_command("makemigrations", "migrations", empty=True, verbosity=0)
|
|
|
|
def test_makemigrations_no_changes_no_apps(self):
|
|
"""
|
|
makemigrations exits when there are no changes and no apps are specified.
|
|
"""
|
|
out = io.StringIO()
|
|
call_command("makemigrations", stdout=out)
|
|
self.assertIn("No changes detected", out.getvalue())
|
|
|
|
def test_makemigrations_no_changes(self):
|
|
"""
|
|
makemigrations exits when there are no changes to an app.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_changes"):
|
|
call_command("makemigrations", "migrations", stdout=out)
|
|
self.assertIn("No changes detected in app 'migrations'", out.getvalue())
|
|
|
|
def test_makemigrations_no_apps_initial(self):
|
|
"""
|
|
makemigrations should detect initial is needed on empty migration
|
|
modules if no app provided.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_empty"):
|
|
call_command("makemigrations", stdout=out)
|
|
self.assertIn("0001_initial.py", out.getvalue())
|
|
|
|
def test_makemigrations_migrations_announce(self):
|
|
"""
|
|
makemigrations announces the migration at the default verbosity level.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module():
|
|
call_command("makemigrations", "migrations", stdout=out)
|
|
self.assertIn("Migrations for 'migrations'", out.getvalue())
|
|
|
|
def test_makemigrations_no_common_ancestor(self):
|
|
"""
|
|
makemigrations fails to merge migrations with no common ancestor.
|
|
"""
|
|
with self.assertRaises(ValueError) as context:
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_ancestor"):
|
|
call_command("makemigrations", "migrations", merge=True)
|
|
exception_message = str(context.exception)
|
|
self.assertIn("Could not find common ancestor of", exception_message)
|
|
self.assertIn("0002_second", exception_message)
|
|
self.assertIn("0002_conflicting_second", exception_message)
|
|
|
|
def test_makemigrations_interactive_reject(self):
|
|
"""
|
|
makemigrations enters and exits interactive mode properly.
|
|
"""
|
|
# Monkeypatch interactive questioner to auto reject
|
|
with mock.patch('builtins.input', mock.Mock(return_value='N')):
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict") as migration_dir:
|
|
call_command("makemigrations", "migrations", name="merge", merge=True, interactive=True, verbosity=0)
|
|
merge_file = os.path.join(migration_dir, '0003_merge.py')
|
|
self.assertFalse(os.path.exists(merge_file))
|
|
|
|
def test_makemigrations_interactive_accept(self):
|
|
"""
|
|
makemigrations enters interactive mode and merges properly.
|
|
"""
|
|
# Monkeypatch interactive questioner to auto accept
|
|
with mock.patch('builtins.input', mock.Mock(return_value='y')):
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict") as migration_dir:
|
|
call_command("makemigrations", "migrations", name="merge", merge=True, interactive=True, stdout=out)
|
|
merge_file = os.path.join(migration_dir, '0003_merge.py')
|
|
self.assertTrue(os.path.exists(merge_file))
|
|
self.assertIn("Created new merge migration", out.getvalue())
|
|
|
|
@mock.patch('django.db.migrations.utils.datetime')
|
|
def test_makemigrations_default_merge_name(self, mock_datetime):
|
|
mock_datetime.datetime.now.return_value = datetime.datetime(2016, 1, 2, 3, 4)
|
|
with mock.patch('builtins.input', mock.Mock(return_value='y')):
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict") as migration_dir:
|
|
call_command("makemigrations", "migrations", merge=True, interactive=True, stdout=out)
|
|
merge_file = os.path.join(migration_dir, '0003_merge_20160102_0304.py')
|
|
self.assertTrue(os.path.exists(merge_file))
|
|
self.assertIn("Created new merge migration", out.getvalue())
|
|
|
|
def test_makemigrations_non_interactive_not_null_addition(self):
|
|
"""
|
|
Non-interactive makemigrations fails when a default is missing on a
|
|
new not-null field.
|
|
"""
|
|
class SillyModel(models.Model):
|
|
silly_field = models.BooleanField(default=False)
|
|
silly_int = models.IntegerField()
|
|
|
|
class Meta:
|
|
app_label = "migrations"
|
|
|
|
out = io.StringIO()
|
|
with self.assertRaises(SystemExit):
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_default"):
|
|
call_command("makemigrations", "migrations", interactive=False, stdout=out)
|
|
|
|
def test_makemigrations_non_interactive_not_null_alteration(self):
|
|
"""
|
|
Non-interactive makemigrations fails when a default is missing on a
|
|
field changed to not-null.
|
|
"""
|
|
class Author(models.Model):
|
|
name = models.CharField(max_length=255)
|
|
slug = models.SlugField()
|
|
age = models.IntegerField(default=0)
|
|
|
|
class Meta:
|
|
app_label = "migrations"
|
|
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations"):
|
|
call_command("makemigrations", "migrations", interactive=False, stdout=out)
|
|
self.assertIn("Alter field slug on author", out.getvalue())
|
|
|
|
def test_makemigrations_non_interactive_no_model_rename(self):
|
|
"""
|
|
makemigrations adds and removes a possible model rename in
|
|
non-interactive mode.
|
|
"""
|
|
class RenamedModel(models.Model):
|
|
silly_field = models.BooleanField(default=False)
|
|
|
|
class Meta:
|
|
app_label = "migrations"
|
|
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_default"):
|
|
call_command("makemigrations", "migrations", interactive=False, stdout=out)
|
|
self.assertIn("Delete model SillyModel", out.getvalue())
|
|
self.assertIn("Create model RenamedModel", out.getvalue())
|
|
|
|
def test_makemigrations_non_interactive_no_field_rename(self):
|
|
"""
|
|
makemigrations adds and removes a possible field rename in
|
|
non-interactive mode.
|
|
"""
|
|
class SillyModel(models.Model):
|
|
silly_rename = models.BooleanField(default=False)
|
|
|
|
class Meta:
|
|
app_label = "migrations"
|
|
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_default"):
|
|
call_command("makemigrations", "migrations", interactive=False, stdout=out)
|
|
self.assertIn("Remove field silly_field from sillymodel", out.getvalue())
|
|
self.assertIn("Add field silly_rename to sillymodel", out.getvalue())
|
|
|
|
def test_makemigrations_handle_merge(self):
|
|
"""
|
|
makemigrations properly merges the conflicting migrations with --noinput.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict") as migration_dir:
|
|
call_command("makemigrations", "migrations", name="merge", merge=True, interactive=False, stdout=out)
|
|
merge_file = os.path.join(migration_dir, '0003_merge.py')
|
|
self.assertTrue(os.path.exists(merge_file))
|
|
output = out.getvalue()
|
|
self.assertIn("Merging migrations", output)
|
|
self.assertIn("Branch 0002_second", output)
|
|
self.assertIn("Branch 0002_conflicting_second", output)
|
|
self.assertIn("Created new merge migration", output)
|
|
|
|
def test_makemigration_merge_dry_run(self):
|
|
"""
|
|
makemigrations respects --dry-run option when fixing migration
|
|
conflicts (#24427).
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict") as migration_dir:
|
|
call_command(
|
|
"makemigrations", "migrations", name="merge", dry_run=True,
|
|
merge=True, interactive=False, stdout=out,
|
|
)
|
|
merge_file = os.path.join(migration_dir, '0003_merge.py')
|
|
self.assertFalse(os.path.exists(merge_file))
|
|
output = out.getvalue()
|
|
self.assertIn("Merging migrations", output)
|
|
self.assertIn("Branch 0002_second", output)
|
|
self.assertIn("Branch 0002_conflicting_second", output)
|
|
self.assertNotIn("Created new merge migration", output)
|
|
|
|
def test_makemigration_merge_dry_run_verbosity_3(self):
|
|
"""
|
|
`makemigrations --merge --dry-run` writes the merge migration file to
|
|
stdout with `verbosity == 3` (#24427).
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict") as migration_dir:
|
|
call_command(
|
|
"makemigrations", "migrations", name="merge", dry_run=True,
|
|
merge=True, interactive=False, stdout=out, verbosity=3,
|
|
)
|
|
merge_file = os.path.join(migration_dir, '0003_merge.py')
|
|
self.assertFalse(os.path.exists(merge_file))
|
|
output = out.getvalue()
|
|
self.assertIn("Merging migrations", output)
|
|
self.assertIn("Branch 0002_second", output)
|
|
self.assertIn("Branch 0002_conflicting_second", output)
|
|
self.assertNotIn("Created new merge migration", output)
|
|
|
|
# Additional output caused by verbosity 3
|
|
# The complete merge migration file that would be written
|
|
self.assertIn("class Migration(migrations.Migration):", output)
|
|
self.assertIn("dependencies = [", output)
|
|
self.assertIn("('migrations', '0002_second')", output)
|
|
self.assertIn("('migrations', '0002_conflicting_second')", output)
|
|
self.assertIn("operations = [", output)
|
|
self.assertIn("]", output)
|
|
|
|
def test_makemigrations_dry_run(self):
|
|
"""
|
|
`makemigrations --dry-run` should not ask for defaults.
|
|
"""
|
|
class SillyModel(models.Model):
|
|
silly_field = models.BooleanField(default=False)
|
|
silly_date = models.DateField() # Added field without a default
|
|
|
|
class Meta:
|
|
app_label = "migrations"
|
|
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_default"):
|
|
call_command("makemigrations", "migrations", dry_run=True, stdout=out)
|
|
# Output the expected changes directly, without asking for defaults
|
|
self.assertIn("Add field silly_date to sillymodel", out.getvalue())
|
|
|
|
def test_makemigrations_dry_run_verbosity_3(self):
|
|
"""
|
|
Allow `makemigrations --dry-run` to output the migrations file to
|
|
stdout (with verbosity == 3).
|
|
"""
|
|
class SillyModel(models.Model):
|
|
silly_field = models.BooleanField(default=False)
|
|
silly_char = models.CharField(default="")
|
|
|
|
class Meta:
|
|
app_label = "migrations"
|
|
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_default"):
|
|
call_command("makemigrations", "migrations", dry_run=True, stdout=out, verbosity=3)
|
|
|
|
# Normal --dry-run output
|
|
self.assertIn("- Add field silly_char to sillymodel", out.getvalue())
|
|
|
|
# Additional output caused by verbosity 3
|
|
# The complete migrations file that would be written
|
|
self.assertIn("class Migration(migrations.Migration):", out.getvalue())
|
|
self.assertIn("dependencies = [", out.getvalue())
|
|
self.assertIn("('migrations', '0001_initial'),", out.getvalue())
|
|
self.assertIn("migrations.AddField(", out.getvalue())
|
|
self.assertIn("model_name='sillymodel',", out.getvalue())
|
|
self.assertIn("name='silly_char',", out.getvalue())
|
|
|
|
def test_makemigrations_migrations_modules_path_not_exist(self):
|
|
"""
|
|
makemigrations creates migrations when specifying a custom location
|
|
for migration files using MIGRATION_MODULES if the custom path
|
|
doesn't already exist.
|
|
"""
|
|
class SillyModel(models.Model):
|
|
silly_field = models.BooleanField(default=False)
|
|
|
|
class Meta:
|
|
app_label = "migrations"
|
|
|
|
out = io.StringIO()
|
|
migration_module = "migrations.test_migrations_path_doesnt_exist.foo.bar"
|
|
with self.temporary_migration_module(module=migration_module) as migration_dir:
|
|
call_command("makemigrations", "migrations", stdout=out)
|
|
|
|
# Migrations file is actually created in the expected path.
|
|
initial_file = os.path.join(migration_dir, "0001_initial.py")
|
|
self.assertTrue(os.path.exists(initial_file))
|
|
|
|
# Command output indicates the migration is created.
|
|
self.assertIn(" - Create model SillyModel", out.getvalue())
|
|
|
|
def test_makemigrations_interactive_by_default(self):
|
|
"""
|
|
The user is prompted to merge by default if there are conflicts and
|
|
merge is True. Answer negative to differentiate it from behavior when
|
|
--noinput is specified.
|
|
"""
|
|
# Monkeypatch interactive questioner to auto reject
|
|
out = io.StringIO()
|
|
with mock.patch('builtins.input', mock.Mock(return_value='N')):
|
|
with self.temporary_migration_module(module="migrations.test_migrations_conflict") as migration_dir:
|
|
call_command("makemigrations", "migrations", name="merge", merge=True, stdout=out)
|
|
merge_file = os.path.join(migration_dir, '0003_merge.py')
|
|
# This will fail if interactive is False by default
|
|
self.assertFalse(os.path.exists(merge_file))
|
|
self.assertNotIn("Created new merge migration", out.getvalue())
|
|
|
|
@override_settings(
|
|
INSTALLED_APPS=[
|
|
"migrations",
|
|
"migrations.migrations_test_apps.unspecified_app_with_conflict"])
|
|
def test_makemigrations_unspecified_app_with_conflict_no_merge(self):
|
|
"""
|
|
makemigrations does not raise a CommandError when an unspecified app
|
|
has conflicting migrations.
|
|
"""
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_changes"):
|
|
call_command("makemigrations", "migrations", merge=False, verbosity=0)
|
|
|
|
@override_settings(
|
|
INSTALLED_APPS=[
|
|
"migrations.migrations_test_apps.migrated_app",
|
|
"migrations.migrations_test_apps.unspecified_app_with_conflict"])
|
|
def test_makemigrations_unspecified_app_with_conflict_merge(self):
|
|
"""
|
|
makemigrations does not create a merge for an unspecified app even if
|
|
it has conflicting migrations.
|
|
"""
|
|
# Monkeypatch interactive questioner to auto accept
|
|
with mock.patch('builtins.input', mock.Mock(return_value='y')):
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(app_label="migrated_app") as migration_dir:
|
|
call_command("makemigrations", "migrated_app", name="merge", merge=True, interactive=True, stdout=out)
|
|
merge_file = os.path.join(migration_dir, '0003_merge.py')
|
|
self.assertFalse(os.path.exists(merge_file))
|
|
self.assertIn("No conflicts detected to merge.", out.getvalue())
|
|
|
|
@override_settings(
|
|
INSTALLED_APPS=[
|
|
"migrations.migrations_test_apps.migrated_app",
|
|
"migrations.migrations_test_apps.conflicting_app_with_dependencies"])
|
|
def test_makemigrations_merge_dont_output_dependency_operations(self):
|
|
"""
|
|
makemigrations --merge does not output any operations from apps that
|
|
don't belong to a given app.
|
|
"""
|
|
# Monkeypatch interactive questioner to auto accept
|
|
with mock.patch('builtins.input', mock.Mock(return_value='N')):
|
|
out = io.StringIO()
|
|
with mock.patch('django.core.management.color.supports_color', lambda *args: False):
|
|
call_command(
|
|
"makemigrations", "conflicting_app_with_dependencies",
|
|
merge=True, interactive=True, stdout=out
|
|
)
|
|
val = out.getvalue().lower()
|
|
self.assertIn('merging conflicting_app_with_dependencies\n', val)
|
|
self.assertIn(
|
|
' branch 0002_conflicting_second\n'
|
|
' - create model something\n',
|
|
val
|
|
)
|
|
self.assertIn(
|
|
' branch 0002_second\n'
|
|
' - delete model tribble\n'
|
|
' - remove field silly_field from author\n'
|
|
' - add field rating to author\n'
|
|
' - create model book\n',
|
|
val
|
|
)
|
|
|
|
def test_makemigrations_with_custom_name(self):
|
|
"""
|
|
makemigrations --name generate a custom migration name.
|
|
"""
|
|
with self.temporary_migration_module() as migration_dir:
|
|
|
|
def cmd(migration_count, migration_name, *args):
|
|
call_command("makemigrations", "migrations", "--verbosity", "0", "--name", migration_name, *args)
|
|
migration_file = os.path.join(migration_dir, "%s_%s.py" % (migration_count, migration_name))
|
|
# Check for existing migration file in migration folder
|
|
self.assertTrue(os.path.exists(migration_file))
|
|
with open(migration_file, "r", encoding="utf-8") as fp:
|
|
content = fp.read()
|
|
content = content.replace(" ", "")
|
|
return content
|
|
|
|
# generate an initial migration
|
|
migration_name_0001 = "my_initial_migration"
|
|
content = cmd("0001", migration_name_0001)
|
|
self.assertIn("dependencies=[\n]", content)
|
|
|
|
# importlib caches os.listdir() on some platforms like macOS
|
|
# (#23850).
|
|
if hasattr(importlib, 'invalidate_caches'):
|
|
importlib.invalidate_caches()
|
|
|
|
# generate an empty migration
|
|
migration_name_0002 = "my_custom_migration"
|
|
content = cmd("0002", migration_name_0002, "--empty")
|
|
self.assertIn("dependencies=[\n('migrations','0001_%s'),\n]" % migration_name_0001, content)
|
|
self.assertIn("operations=[\n]", content)
|
|
|
|
def test_makemigrations_check(self):
|
|
"""
|
|
makemigrations --check should exit with a non-zero status when
|
|
there are changes to an app requiring migrations.
|
|
"""
|
|
with self.temporary_migration_module():
|
|
with self.assertRaises(SystemExit):
|
|
call_command("makemigrations", "--check", "migrations", verbosity=0)
|
|
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_changes"):
|
|
call_command("makemigrations", "--check", "migrations", verbosity=0)
|
|
|
|
def test_makemigrations_migration_path_output(self):
|
|
"""
|
|
makemigrations should print the relative paths to the migrations unless
|
|
they are outside of the current tree, in which case the absolute path
|
|
should be shown.
|
|
"""
|
|
out = io.StringIO()
|
|
apps.register_model('migrations', UnicodeModel)
|
|
with self.temporary_migration_module() as migration_dir:
|
|
call_command("makemigrations", "migrations", stdout=out)
|
|
self.assertIn(os.path.join(migration_dir, '0001_initial.py'), out.getvalue())
|
|
|
|
def test_makemigrations_migration_path_output_valueerror(self):
|
|
"""
|
|
makemigrations prints the absolute path if os.path.relpath() raises a
|
|
ValueError when it's impossible to obtain a relative path, e.g. on
|
|
Windows if Django is installed on a different drive than where the
|
|
migration files are created.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module() as migration_dir:
|
|
with mock.patch('os.path.relpath', side_effect=ValueError):
|
|
call_command('makemigrations', 'migrations', stdout=out)
|
|
self.assertIn(os.path.join(migration_dir, '0001_initial.py'), out.getvalue())
|
|
|
|
def test_makemigrations_inconsistent_history(self):
|
|
"""
|
|
makemigrations should raise InconsistentMigrationHistory exception if
|
|
there are some migrations applied before their dependencies.
|
|
"""
|
|
recorder = MigrationRecorder(connection)
|
|
recorder.record_applied('migrations', '0002_second')
|
|
msg = "Migration migrations.0002_second is applied before its dependency migrations.0001_initial"
|
|
with self.temporary_migration_module(module="migrations.test_migrations"):
|
|
with self.assertRaisesMessage(InconsistentMigrationHistory, msg):
|
|
call_command("makemigrations")
|
|
|
|
@mock.patch('builtins.input', return_value='1')
|
|
@mock.patch('django.db.migrations.questioner.sys.stdin', mock.MagicMock(encoding=sys.getdefaultencoding()))
|
|
def test_makemigrations_auto_now_add_interactive(self, *args):
|
|
"""
|
|
makemigrations prompts the user when adding auto_now_add to an existing
|
|
model.
|
|
"""
|
|
class Entry(models.Model):
|
|
title = models.CharField(max_length=255)
|
|
creation_date = models.DateTimeField(auto_now_add=True)
|
|
|
|
class Meta:
|
|
app_label = 'migrations'
|
|
|
|
# Monkeypatch interactive questioner to auto accept
|
|
with mock.patch('django.db.migrations.questioner.sys.stdout', new_callable=io.StringIO) as prompt_stdout:
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module='migrations.test_auto_now_add'):
|
|
call_command('makemigrations', 'migrations', interactive=True, stdout=out)
|
|
output = out.getvalue()
|
|
prompt_output = prompt_stdout.getvalue()
|
|
self.assertIn("You can accept the default 'timezone.now' by pressing 'Enter'", prompt_output)
|
|
self.assertIn("Add field creation_date to entry", output)
|
|
|
|
|
|
class SquashMigrationsTests(MigrationTestBase):
|
|
"""
|
|
Tests running the squashmigrations command.
|
|
"""
|
|
|
|
def test_squashmigrations_squashes(self):
|
|
"""
|
|
squashmigrations squashes migrations.
|
|
"""
|
|
with self.temporary_migration_module(module="migrations.test_migrations") as migration_dir:
|
|
call_command("squashmigrations", "migrations", "0002", interactive=False, verbosity=0)
|
|
|
|
squashed_migration_file = os.path.join(migration_dir, "0001_squashed_0002_second.py")
|
|
self.assertTrue(os.path.exists(squashed_migration_file))
|
|
|
|
def test_squashmigrations_initial_attribute(self):
|
|
with self.temporary_migration_module(module="migrations.test_migrations") as migration_dir:
|
|
call_command("squashmigrations", "migrations", "0002", interactive=False, verbosity=0)
|
|
|
|
squashed_migration_file = os.path.join(migration_dir, "0001_squashed_0002_second.py")
|
|
with open(squashed_migration_file, "r", encoding="utf-8") as fp:
|
|
content = fp.read()
|
|
self.assertIn("initial = True", content)
|
|
|
|
def test_squashmigrations_optimizes(self):
|
|
"""
|
|
squashmigrations optimizes operations.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations"):
|
|
call_command("squashmigrations", "migrations", "0002", interactive=False, verbosity=1, stdout=out)
|
|
self.assertIn("Optimized from 8 operations to 3 operations.", out.getvalue())
|
|
|
|
def test_ticket_23799_squashmigrations_no_optimize(self):
|
|
"""
|
|
squashmigrations --no-optimize doesn't optimize operations.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations"):
|
|
call_command("squashmigrations", "migrations", "0002",
|
|
interactive=False, verbosity=1, no_optimize=True, stdout=out)
|
|
self.assertIn("Skipping optimization", out.getvalue())
|
|
|
|
def test_squashmigrations_valid_start(self):
|
|
"""
|
|
squashmigrations accepts a starting migration.
|
|
"""
|
|
out = io.StringIO()
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_changes") as migration_dir:
|
|
call_command("squashmigrations", "migrations", "0002", "0003",
|
|
interactive=False, verbosity=1, stdout=out)
|
|
|
|
squashed_migration_file = os.path.join(migration_dir, "0002_second_squashed_0003_third.py")
|
|
with open(squashed_migration_file, "r", encoding="utf-8") as fp:
|
|
content = fp.read()
|
|
self.assertIn(" ('migrations', '0001_initial')", content)
|
|
self.assertNotIn("initial = True", content)
|
|
out = out.getvalue()
|
|
self.assertNotIn(" - 0001_initial", out)
|
|
self.assertIn(" - 0002_second", out)
|
|
self.assertIn(" - 0003_third", out)
|
|
|
|
def test_squashmigrations_invalid_start(self):
|
|
"""
|
|
squashmigrations doesn't accept a starting migration after the ending migration.
|
|
"""
|
|
with self.temporary_migration_module(module="migrations.test_migrations_no_changes"):
|
|
msg = (
|
|
"The migration 'migrations.0003_third' cannot be found. Maybe "
|
|
"it comes after the migration 'migrations.0002_second'"
|
|
)
|
|
with self.assertRaisesMessage(CommandError, msg):
|
|
call_command("squashmigrations", "migrations", "0003", "0002", interactive=False, verbosity=0)
|
|
|
|
def test_squashed_name_with_start_migration_name(self):
|
|
"""--squashed-name specifies the new migration's name."""
|
|
squashed_name = 'squashed_name'
|
|
with self.temporary_migration_module(module='migrations.test_migrations') as migration_dir:
|
|
call_command(
|
|
'squashmigrations', 'migrations', '0001', '0002',
|
|
squashed_name=squashed_name, interactive=False, verbosity=0,
|
|
)
|
|
squashed_migration_file = os.path.join(migration_dir, '0001_%s.py' % squashed_name)
|
|
self.assertTrue(os.path.exists(squashed_migration_file))
|
|
|
|
def test_squashed_name_without_start_migration_name(self):
|
|
"""--squashed-name also works if a start migration is omitted."""
|
|
squashed_name = 'squashed_name'
|
|
with self.temporary_migration_module(module="migrations.test_migrations") as migration_dir:
|
|
call_command(
|
|
'squashmigrations', 'migrations', '0001',
|
|
squashed_name=squashed_name, interactive=False, verbosity=0,
|
|
)
|
|
squashed_migration_file = os.path.join(migration_dir, '0001_%s.py' % squashed_name)
|
|
self.assertTrue(os.path.exists(squashed_migration_file))
|