mirror of
https://github.com/django/django.git
synced 2025-01-09 18:06:34 +00:00
58cc91275a
Co-authored-by: Natalia <124304+nessita@users.noreply.github.com>
563 lines
21 KiB
Python
563 lines
21 KiB
Python
import os
|
|
import sys
|
|
from argparse import ArgumentDefaultsHelpFormatter
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
|
|
from admin_scripts.tests import AdminScriptTestCase
|
|
|
|
from django.apps import apps
|
|
from django.core import management
|
|
from django.core.checks import Tags
|
|
from django.core.management import BaseCommand, CommandError, find_commands
|
|
from django.core.management.utils import (
|
|
find_command,
|
|
get_random_secret_key,
|
|
is_ignored_path,
|
|
normalize_path_patterns,
|
|
popen_wrapper,
|
|
run_formatters,
|
|
)
|
|
from django.db import connection
|
|
from django.test import SimpleTestCase, override_settings
|
|
from django.test.utils import captured_stderr, extend_sys_path
|
|
from django.utils import translation
|
|
|
|
from .management.commands import dance
|
|
from .utils import AssertFormatterFailureCaughtContext
|
|
|
|
|
|
# A minimal set of apps to avoid system checks running on all apps.
|
|
@override_settings(
|
|
INSTALLED_APPS=[
|
|
"django.contrib.auth",
|
|
"django.contrib.contenttypes",
|
|
"user_commands",
|
|
],
|
|
)
|
|
class CommandTests(SimpleTestCase):
|
|
def test_command(self):
|
|
out = StringIO()
|
|
management.call_command("dance", stdout=out)
|
|
self.assertIn("I don't feel like dancing Rock'n'Roll.\n", out.getvalue())
|
|
|
|
def test_command_style(self):
|
|
out = StringIO()
|
|
management.call_command("dance", style="Jive", stdout=out)
|
|
self.assertIn("I don't feel like dancing Jive.\n", out.getvalue())
|
|
# Passing options as arguments also works (thanks argparse)
|
|
management.call_command("dance", "--style", "Jive", stdout=out)
|
|
self.assertIn("I don't feel like dancing Jive.\n", out.getvalue())
|
|
|
|
def test_language_preserved(self):
|
|
with translation.override("fr"):
|
|
management.call_command("dance", verbosity=0)
|
|
self.assertEqual(translation.get_language(), "fr")
|
|
|
|
def test_explode(self):
|
|
"""An unknown command raises CommandError"""
|
|
with self.assertRaisesMessage(CommandError, "Unknown command: 'explode'"):
|
|
management.call_command(("explode",))
|
|
|
|
def test_system_exit(self):
|
|
"""Exception raised in a command should raise CommandError with
|
|
call_command, but SystemExit when run from command line
|
|
"""
|
|
with self.assertRaises(CommandError) as cm:
|
|
management.call_command("dance", example="raise")
|
|
self.assertEqual(cm.exception.returncode, 3)
|
|
dance.Command.requires_system_checks = []
|
|
try:
|
|
with captured_stderr() as stderr, self.assertRaises(SystemExit) as cm:
|
|
management.ManagementUtility(
|
|
["manage.py", "dance", "--example=raise"]
|
|
).execute()
|
|
self.assertEqual(cm.exception.code, 3)
|
|
finally:
|
|
dance.Command.requires_system_checks = "__all__"
|
|
self.assertIn("CommandError", stderr.getvalue())
|
|
|
|
def test_no_translations_deactivate_translations(self):
|
|
"""
|
|
When the Command handle method is decorated with @no_translations,
|
|
translations are deactivated inside the command.
|
|
"""
|
|
current_locale = translation.get_language()
|
|
with translation.override("pl"):
|
|
result = management.call_command("no_translations")
|
|
self.assertIsNone(result)
|
|
self.assertEqual(translation.get_language(), current_locale)
|
|
|
|
def test_find_command_without_PATH(self):
|
|
"""
|
|
find_command should still work when the PATH environment variable
|
|
doesn't exist (#22256).
|
|
"""
|
|
current_path = os.environ.pop("PATH", None)
|
|
|
|
try:
|
|
self.assertIsNone(find_command("_missing_"))
|
|
finally:
|
|
if current_path is not None:
|
|
os.environ["PATH"] = current_path
|
|
|
|
def test_discover_commands_in_eggs(self):
|
|
"""
|
|
Management commands can also be loaded from Python eggs.
|
|
"""
|
|
egg_dir = "%s/eggs" % os.path.dirname(__file__)
|
|
egg_name = "%s/basic.egg" % egg_dir
|
|
with extend_sys_path(egg_name):
|
|
with self.settings(INSTALLED_APPS=["commandegg"]):
|
|
cmds = find_commands(
|
|
os.path.join(apps.get_app_config("commandegg").path, "management")
|
|
)
|
|
self.assertEqual(cmds, ["eggcommand"])
|
|
|
|
def test_call_command_option_parsing(self):
|
|
"""
|
|
When passing the long option name to call_command, the available option
|
|
key is the option dest name (#22985).
|
|
"""
|
|
out = StringIO()
|
|
management.call_command("dance", stdout=out, opt_3=True)
|
|
self.assertIn("option3", out.getvalue())
|
|
self.assertNotIn("opt_3", out.getvalue())
|
|
self.assertNotIn("opt-3", out.getvalue())
|
|
|
|
def test_call_command_option_parsing_non_string_arg(self):
|
|
"""
|
|
It should be possible to pass non-string arguments to call_command.
|
|
"""
|
|
out = StringIO()
|
|
management.call_command("dance", 1, verbosity=0, stdout=out)
|
|
self.assertIn("You passed 1 as a positional argument.", out.getvalue())
|
|
|
|
def test_calling_a_command_with_only_empty_parameter_should_ends_gracefully(self):
|
|
out = StringIO()
|
|
management.call_command("hal", "--empty", stdout=out)
|
|
self.assertEqual(out.getvalue(), "\nDave, I can't do that.\n")
|
|
|
|
def test_calling_command_with_app_labels_and_parameters_should_be_ok(self):
|
|
out = StringIO()
|
|
management.call_command("hal", "myapp", "--verbosity", "3", stdout=out)
|
|
self.assertIn(
|
|
"Dave, my mind is going. I can feel it. I can feel it.\n", out.getvalue()
|
|
)
|
|
|
|
def test_calling_command_with_parameters_and_app_labels_at_the_end_should_be_ok(
|
|
self,
|
|
):
|
|
out = StringIO()
|
|
management.call_command("hal", "--verbosity", "3", "myapp", stdout=out)
|
|
self.assertIn(
|
|
"Dave, my mind is going. I can feel it. I can feel it.\n", out.getvalue()
|
|
)
|
|
|
|
def test_calling_a_command_with_no_app_labels_and_parameters_raise_command_error(
|
|
self,
|
|
):
|
|
with self.assertRaises(CommandError):
|
|
management.call_command("hal")
|
|
|
|
def test_output_transaction(self):
|
|
output = management.call_command(
|
|
"transaction", stdout=StringIO(), no_color=True
|
|
)
|
|
self.assertTrue(
|
|
output.strip().startswith(connection.ops.start_transaction_sql())
|
|
)
|
|
self.assertTrue(output.strip().endswith(connection.ops.end_transaction_sql()))
|
|
|
|
def test_call_command_no_checks(self):
|
|
"""
|
|
By default, call_command should not trigger the check framework, unless
|
|
specifically asked.
|
|
"""
|
|
self.counter = 0
|
|
|
|
def patched_check(self_, **kwargs):
|
|
self.counter += 1
|
|
self.kwargs = kwargs
|
|
|
|
saved_check = BaseCommand.check
|
|
BaseCommand.check = patched_check
|
|
try:
|
|
management.call_command("dance", verbosity=0)
|
|
self.assertEqual(self.counter, 0)
|
|
management.call_command("dance", verbosity=0, skip_checks=False)
|
|
self.assertEqual(self.counter, 1)
|
|
self.assertEqual(self.kwargs, {})
|
|
finally:
|
|
BaseCommand.check = saved_check
|
|
|
|
def test_requires_system_checks_empty(self):
|
|
with mock.patch(
|
|
"django.core.management.base.BaseCommand.check"
|
|
) as mocked_check:
|
|
management.call_command("no_system_checks")
|
|
self.assertIs(mocked_check.called, False)
|
|
|
|
def test_requires_system_checks_specific(self):
|
|
with mock.patch(
|
|
"django.core.management.base.BaseCommand.check"
|
|
) as mocked_check:
|
|
management.call_command("specific_system_checks", skip_checks=False)
|
|
mocked_check.assert_called_once_with(tags=[Tags.staticfiles, Tags.models])
|
|
|
|
def test_requires_system_checks_invalid(self):
|
|
class Command(BaseCommand):
|
|
requires_system_checks = "x"
|
|
|
|
msg = "requires_system_checks must be a list or tuple."
|
|
with self.assertRaisesMessage(TypeError, msg):
|
|
Command()
|
|
|
|
def test_check_migrations(self):
|
|
requires_migrations_checks = dance.Command.requires_migrations_checks
|
|
self.assertIs(requires_migrations_checks, False)
|
|
try:
|
|
with mock.patch.object(BaseCommand, "check_migrations") as check_migrations:
|
|
management.call_command("dance", verbosity=0)
|
|
self.assertFalse(check_migrations.called)
|
|
dance.Command.requires_migrations_checks = True
|
|
management.call_command("dance", verbosity=0)
|
|
self.assertTrue(check_migrations.called)
|
|
finally:
|
|
dance.Command.requires_migrations_checks = requires_migrations_checks
|
|
|
|
def test_call_command_unrecognized_option(self):
|
|
msg = (
|
|
"Unknown option(s) for dance command: unrecognized. Valid options "
|
|
"are: example, force_color, help, integer, no_color, opt_3, "
|
|
"option3, pythonpath, settings, skip_checks, stderr, stdout, "
|
|
"style, traceback, verbosity, version."
|
|
)
|
|
with self.assertRaisesMessage(TypeError, msg):
|
|
management.call_command("dance", unrecognized=1)
|
|
|
|
msg = (
|
|
"Unknown option(s) for dance command: unrecognized, unrecognized2. "
|
|
"Valid options are: example, force_color, help, integer, no_color, "
|
|
"opt_3, option3, pythonpath, settings, skip_checks, stderr, "
|
|
"stdout, style, traceback, verbosity, version."
|
|
)
|
|
with self.assertRaisesMessage(TypeError, msg):
|
|
management.call_command("dance", unrecognized=1, unrecognized2=1)
|
|
|
|
def test_call_command_with_required_parameters_in_options(self):
|
|
out = StringIO()
|
|
management.call_command(
|
|
"required_option", need_me="foo", needme2="bar", stdout=out
|
|
)
|
|
self.assertIn("need_me", out.getvalue())
|
|
self.assertIn("needme2", out.getvalue())
|
|
|
|
def test_call_command_with_required_parameters_in_mixed_options(self):
|
|
out = StringIO()
|
|
management.call_command(
|
|
"required_option", "--need-me=foo", needme2="bar", stdout=out
|
|
)
|
|
self.assertIn("need_me", out.getvalue())
|
|
self.assertIn("needme2", out.getvalue())
|
|
|
|
def test_command_add_arguments_after_common_arguments(self):
|
|
out = StringIO()
|
|
management.call_command("common_args", stdout=out)
|
|
self.assertIn("Detected that --version already exists", out.getvalue())
|
|
|
|
def test_mutually_exclusive_group_required_options(self):
|
|
out = StringIO()
|
|
management.call_command("mutually_exclusive_required", foo_id=1, stdout=out)
|
|
self.assertIn("foo_id", out.getvalue())
|
|
management.call_command(
|
|
"mutually_exclusive_required", foo_name="foo", stdout=out
|
|
)
|
|
self.assertIn("foo_name", out.getvalue())
|
|
msg = (
|
|
"Error: one of the arguments --foo-id --foo-name --foo-list "
|
|
"--append_const --const --count --flag_false --flag_true is "
|
|
"required"
|
|
)
|
|
with self.assertRaisesMessage(CommandError, msg):
|
|
management.call_command("mutually_exclusive_required", stdout=out)
|
|
|
|
def test_mutually_exclusive_group_required_const_options(self):
|
|
tests = [
|
|
("append_const", [42]),
|
|
("const", 31),
|
|
("count", 1),
|
|
("flag_false", False),
|
|
("flag_true", True),
|
|
]
|
|
for arg, value in tests:
|
|
out = StringIO()
|
|
expected_output = "%s=%s" % (arg, value)
|
|
with self.subTest(arg=arg):
|
|
management.call_command(
|
|
"mutually_exclusive_required",
|
|
"--%s" % arg,
|
|
stdout=out,
|
|
)
|
|
self.assertIn(expected_output, out.getvalue())
|
|
out.truncate(0)
|
|
management.call_command(
|
|
"mutually_exclusive_required",
|
|
**{arg: value, "stdout": out},
|
|
)
|
|
self.assertIn(expected_output, out.getvalue())
|
|
|
|
def test_mutually_exclusive_group_required_with_same_dest_options(self):
|
|
tests = [
|
|
{"until": "2"},
|
|
{"for": "1", "until": "2"},
|
|
]
|
|
msg = (
|
|
"Cannot pass the dest 'until' that matches multiple arguments via "
|
|
"**options."
|
|
)
|
|
for options in tests:
|
|
with self.subTest(options=options):
|
|
with self.assertRaisesMessage(TypeError, msg):
|
|
management.call_command(
|
|
"mutually_exclusive_required_with_same_dest",
|
|
**options,
|
|
)
|
|
|
|
def test_mutually_exclusive_group_required_with_same_dest_args(self):
|
|
tests = [
|
|
("--until=1",),
|
|
("--until", 1),
|
|
("--for=1",),
|
|
("--for", 1),
|
|
]
|
|
for args in tests:
|
|
out = StringIO()
|
|
with self.subTest(options=args):
|
|
management.call_command(
|
|
"mutually_exclusive_required_with_same_dest",
|
|
*args,
|
|
stdout=out,
|
|
)
|
|
output = out.getvalue()
|
|
self.assertIn("until=1", output)
|
|
|
|
def test_required_list_option(self):
|
|
tests = [
|
|
(("--foo-list", [1, 2]), {}),
|
|
((), {"foo_list": [1, 2]}),
|
|
]
|
|
for command in ["mutually_exclusive_required", "required_list_option"]:
|
|
for args, kwargs in tests:
|
|
with self.subTest(command=command, args=args, kwargs=kwargs):
|
|
out = StringIO()
|
|
management.call_command(
|
|
command,
|
|
*args,
|
|
**kwargs,
|
|
stdout=out,
|
|
)
|
|
self.assertIn("foo_list=[1, 2]", out.getvalue())
|
|
|
|
def test_required_const_options(self):
|
|
args = {
|
|
"append_const": [42],
|
|
"const": 31,
|
|
"count": 1,
|
|
"flag_false": False,
|
|
"flag_true": True,
|
|
}
|
|
expected_output = "\n".join(
|
|
"%s=%s" % (arg, value) for arg, value in args.items()
|
|
)
|
|
out = StringIO()
|
|
management.call_command(
|
|
"required_constant_option",
|
|
"--append_const",
|
|
"--const",
|
|
"--count",
|
|
"--flag_false",
|
|
"--flag_true",
|
|
stdout=out,
|
|
)
|
|
self.assertIn(expected_output, out.getvalue())
|
|
out.truncate(0)
|
|
management.call_command("required_constant_option", **args, stdout=out)
|
|
self.assertIn(expected_output, out.getvalue())
|
|
|
|
def test_subparser(self):
|
|
out = StringIO()
|
|
management.call_command("subparser", "foo", 12, stdout=out)
|
|
self.assertIn("bar", out.getvalue())
|
|
|
|
def test_subparser_dest_args(self):
|
|
out = StringIO()
|
|
management.call_command("subparser_dest", "foo", bar=12, stdout=out)
|
|
self.assertIn("bar", out.getvalue())
|
|
|
|
def test_subparser_dest_required_args(self):
|
|
out = StringIO()
|
|
management.call_command(
|
|
"subparser_required", "foo_1", "foo_2", bar=12, stdout=out
|
|
)
|
|
self.assertIn("bar", out.getvalue())
|
|
|
|
def test_subparser_invalid_option(self):
|
|
msg = r"invalid choice: 'test' \(choose from '?foo'?\)"
|
|
with self.assertRaisesRegex(CommandError, msg):
|
|
management.call_command("subparser", "test", 12)
|
|
msg = "Error: the following arguments are required: subcommand"
|
|
with self.assertRaisesMessage(CommandError, msg):
|
|
management.call_command("subparser_dest", subcommand="foo", bar=12)
|
|
|
|
def test_create_parser_kwargs(self):
|
|
"""BaseCommand.create_parser() passes kwargs to CommandParser."""
|
|
epilog = "some epilog text"
|
|
parser = BaseCommand().create_parser(
|
|
"prog_name",
|
|
"subcommand",
|
|
epilog=epilog,
|
|
formatter_class=ArgumentDefaultsHelpFormatter,
|
|
)
|
|
self.assertEqual(parser.epilog, epilog)
|
|
self.assertEqual(parser.formatter_class, ArgumentDefaultsHelpFormatter)
|
|
|
|
def test_outputwrapper_flush(self):
|
|
out = StringIO()
|
|
with mock.patch.object(out, "flush") as mocked_flush:
|
|
management.call_command("outputwrapper", stdout=out)
|
|
self.assertIn("Working...", out.getvalue())
|
|
self.assertIs(mocked_flush.called, True)
|
|
|
|
|
|
class CommandRunTests(AdminScriptTestCase):
|
|
"""
|
|
Tests that need to run by simulating the command line, not by call_command.
|
|
"""
|
|
|
|
def test_script_prefix_set_in_commands(self):
|
|
self.write_settings(
|
|
"settings.py",
|
|
apps=["user_commands"],
|
|
sdict={
|
|
"ROOT_URLCONF": '"user_commands.urls"',
|
|
"FORCE_SCRIPT_NAME": '"/PREFIX/"',
|
|
},
|
|
)
|
|
out, err = self.run_manage(["reverse_url"])
|
|
self.assertNoOutput(err)
|
|
self.assertEqual(out.strip(), "/PREFIX/some/url/")
|
|
|
|
def test_disallowed_abbreviated_options(self):
|
|
"""
|
|
To avoid conflicts with custom options, commands don't allow
|
|
abbreviated forms of the --setting and --pythonpath options.
|
|
"""
|
|
self.write_settings("settings.py", apps=["user_commands"])
|
|
out, err = self.run_manage(["set_option", "--set", "foo"])
|
|
self.assertNoOutput(err)
|
|
self.assertEqual(out.strip(), "Set foo")
|
|
|
|
def test_skip_checks(self):
|
|
self.write_settings(
|
|
"settings.py",
|
|
apps=["django.contrib.staticfiles", "user_commands"],
|
|
sdict={
|
|
# (staticfiles.E001) The STATICFILES_DIRS setting is not a tuple or
|
|
# list.
|
|
"STATICFILES_DIRS": '"foo"',
|
|
},
|
|
)
|
|
out, err = self.run_manage(["set_option", "--skip-checks", "--set", "foo"])
|
|
self.assertNoOutput(err)
|
|
self.assertEqual(out.strip(), "Set foo")
|
|
|
|
def test_subparser_error_formatting(self):
|
|
self.write_settings("settings.py", apps=["user_commands"])
|
|
out, err = self.run_manage(["subparser", "foo", "twelve"])
|
|
self.maxDiff = None
|
|
self.assertNoOutput(out)
|
|
err_lines = err.splitlines()
|
|
self.assertEqual(len(err_lines), 2)
|
|
self.assertEqual(
|
|
err_lines[1],
|
|
"manage.py subparser foo: error: argument bar: invalid int value: 'twelve'",
|
|
)
|
|
|
|
def test_subparser_non_django_error_formatting(self):
|
|
self.write_settings("settings.py", apps=["user_commands"])
|
|
out, err = self.run_manage(["subparser_vanilla", "foo", "seven"])
|
|
self.assertNoOutput(out)
|
|
err_lines = err.splitlines()
|
|
self.assertEqual(len(err_lines), 2)
|
|
self.assertEqual(
|
|
err_lines[1],
|
|
"manage.py subparser_vanilla foo: error: argument bar: invalid int value: "
|
|
"'seven'",
|
|
)
|
|
|
|
|
|
class UtilsTests(SimpleTestCase):
|
|
def test_no_existent_external_program(self):
|
|
msg = "Error executing a_42_command_that_doesnt_exist_42"
|
|
with self.assertRaisesMessage(CommandError, msg):
|
|
popen_wrapper(["a_42_command_that_doesnt_exist_42"])
|
|
|
|
def test_get_random_secret_key(self):
|
|
key = get_random_secret_key()
|
|
self.assertEqual(len(key), 50)
|
|
for char in key:
|
|
self.assertIn(char, "abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*(-_=+)")
|
|
|
|
def test_is_ignored_path_true(self):
|
|
patterns = (
|
|
["foo/bar/baz"],
|
|
["baz"],
|
|
["foo/bar/baz"],
|
|
["*/baz"],
|
|
["*"],
|
|
["b?z"],
|
|
["[abc]az"],
|
|
["*/ba[!z]/baz"],
|
|
)
|
|
for ignore_patterns in patterns:
|
|
with self.subTest(ignore_patterns=ignore_patterns):
|
|
self.assertIs(
|
|
is_ignored_path("foo/bar/baz", ignore_patterns=ignore_patterns),
|
|
True,
|
|
)
|
|
|
|
def test_is_ignored_path_false(self):
|
|
self.assertIs(
|
|
is_ignored_path(
|
|
"foo/bar/baz", ignore_patterns=["foo/bar/bat", "bar", "flub/blub"]
|
|
),
|
|
False,
|
|
)
|
|
|
|
def test_normalize_path_patterns_truncates_wildcard_base(self):
|
|
expected = [os.path.normcase(p) for p in ["foo/bar", "bar/*/"]]
|
|
self.assertEqual(normalize_path_patterns(["foo/bar/*", "bar/*/"]), expected)
|
|
|
|
def test_run_formatters_handles_oserror_for_black_path(self):
|
|
cases = [
|
|
(FileNotFoundError, "nonexistent"),
|
|
(
|
|
OSError if sys.platform == "win32" else PermissionError,
|
|
str(Path(__file__).parent / "test_files" / "black"),
|
|
),
|
|
]
|
|
for exception, location in cases:
|
|
with (
|
|
self.subTest(exception.__qualname__),
|
|
AssertFormatterFailureCaughtContext(
|
|
self, shutil_which_result=location
|
|
) as ctx,
|
|
):
|
|
run_formatters([], stderr=ctx.stderr)
|
|
parsed_error = ctx.stderr.getvalue()
|
|
self.assertIn(exception.__qualname__, parsed_error)
|
|
if sys.platform != "win32":
|
|
self.assertIn(location, parsed_error)
|