1
0
mirror of https://github.com/django/django.git synced 2025-01-03 23:16:41 +00:00
django/tests/utils_tests/test_autoreload.py
Tom Forbes 480492fe70 Fixed #30523 -- Fixed updating file modification times on seen files in auto-reloader when using StatReloader.
Previously we updated the file mtimes if the file has not been seen
before - i.e on the first iteration of the loop.

If the mtime has been changed we triggered the notify_file_changed()
method which in all cases except the translations will result in the
process being terminated. To be strictly correct we need to update the
mtime for either branch of the conditional.

Regression in 6754bffa2b.
2019-05-29 09:41:24 +02:00

689 lines
29 KiB
Python

import contextlib
import os
import py_compile
import shutil
import sys
import tempfile
import threading
import time
import types
import weakref
import zipfile
from importlib import import_module
from pathlib import Path
from unittest import mock, skip, skipIf
from django.apps.registry import Apps
from django.test import SimpleTestCase
from django.test.utils import extend_sys_path
from django.utils import autoreload
from django.utils.autoreload import WatchmanUnavailable
from .utils import on_macos_with_hfs
class TestIterModulesAndFiles(SimpleTestCase):
def import_and_cleanup(self, name):
import_module(name)
self.addCleanup(lambda: sys.path_importer_cache.clear())
self.addCleanup(lambda: sys.modules.pop(name, None))
def clear_autoreload_caches(self):
autoreload.iter_modules_and_files.cache_clear()
def assertFileFound(self, filename):
# Some temp directories are symlinks. Python resolves these fully while
# importing.
resolved_filename = filename.resolve()
self.clear_autoreload_caches()
# Test uncached access
self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
# Test cached access
self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
def assertFileNotFound(self, filename):
resolved_filename = filename.resolve()
self.clear_autoreload_caches()
# Test uncached access
self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
# Test cached access
self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
def temporary_file(self, filename):
dirname = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, dirname)
return Path(dirname) / filename
def test_paths_are_pathlib_instances(self):
for filename in autoreload.iter_all_python_module_files():
self.assertIsInstance(filename, Path)
def test_file_added(self):
"""
When a file is added, it's returned by iter_all_python_module_files().
"""
filename = self.temporary_file('test_deleted_removed_module.py')
filename.touch()
with extend_sys_path(str(filename.parent)):
self.import_and_cleanup('test_deleted_removed_module')
self.assertFileFound(filename.absolute())
def test_check_errors(self):
"""
When a file containing an error is imported in a function wrapped by
check_errors(), gen_filenames() returns it.
"""
filename = self.temporary_file('test_syntax_error.py')
filename.write_text("Ceci n'est pas du Python.")
with extend_sys_path(str(filename.parent)):
with self.assertRaises(SyntaxError):
autoreload.check_errors(import_module)('test_syntax_error')
self.assertFileFound(filename)
def test_check_errors_catches_all_exceptions(self):
"""
Since Python may raise arbitrary exceptions when importing code,
check_errors() must catch Exception, not just some subclasses.
"""
filename = self.temporary_file('test_exception.py')
filename.write_text('raise Exception')
with extend_sys_path(str(filename.parent)):
with self.assertRaises(Exception):
autoreload.check_errors(import_module)('test_exception')
self.assertFileFound(filename)
def test_zip_reload(self):
"""
Modules imported from zipped files have their archive location included
in the result.
"""
zip_file = self.temporary_file('zip_import.zip')
with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf:
zipf.writestr('test_zipped_file.py', '')
with extend_sys_path(str(zip_file)):
self.import_and_cleanup('test_zipped_file')
self.assertFileFound(zip_file)
def test_bytecode_conversion_to_source(self):
""".pyc and .pyo files are included in the files list."""
filename = self.temporary_file('test_compiled.py')
filename.touch()
compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc'))))
filename.unlink()
with extend_sys_path(str(compiled_file.parent)):
self.import_and_cleanup('test_compiled')
self.assertFileFound(compiled_file)
def test_weakref_in_sys_module(self):
"""iter_all_python_module_file() ignores weakref modules."""
time_proxy = weakref.proxy(time)
sys.modules['time_proxy'] = time_proxy
self.addCleanup(lambda: sys.modules.pop('time_proxy', None))
list(autoreload.iter_all_python_module_files()) # No crash.
def test_module_without_spec(self):
module = types.ModuleType('test_module')
del module.__spec__
self.assertEqual(autoreload.iter_modules_and_files((module,), frozenset()), frozenset())
def test_main_module_is_resolved(self):
main_module = sys.modules['__main__']
self.assertFileFound(Path(main_module.__file__))
class TestCommonRoots(SimpleTestCase):
def test_common_roots(self):
paths = (
Path('/first/second'),
Path('/first/second/third'),
Path('/first/'),
Path('/root/first/'),
)
results = autoreload.common_roots(paths)
self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')])
class TestSysPathDirectories(SimpleTestCase):
def setUp(self):
self._directory = tempfile.TemporaryDirectory()
self.directory = Path(self._directory.name).resolve().absolute()
self.file = self.directory / 'test'
self.file.touch()
def tearDown(self):
self._directory.cleanup()
def test_sys_paths_with_directories(self):
with extend_sys_path(str(self.file)):
paths = list(autoreload.sys_path_directories())
self.assertIn(self.file.parent, paths)
def test_sys_paths_non_existing(self):
nonexistent_file = Path(self.directory.name) / 'does_not_exist'
with extend_sys_path(str(nonexistent_file)):
paths = list(autoreload.sys_path_directories())
self.assertNotIn(nonexistent_file, paths)
self.assertNotIn(nonexistent_file.parent, paths)
def test_sys_paths_absolute(self):
paths = list(autoreload.sys_path_directories())
self.assertTrue(all(p.is_absolute() for p in paths))
def test_sys_paths_directories(self):
with extend_sys_path(str(self.directory)):
paths = list(autoreload.sys_path_directories())
self.assertIn(self.directory, paths)
class GetReloaderTests(SimpleTestCase):
@mock.patch('django.utils.autoreload.WatchmanReloader')
def test_watchman_unavailable(self, mocked_watchman):
mocked_watchman.check_availability.side_effect = WatchmanUnavailable
self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)
@mock.patch.object(autoreload.WatchmanReloader, 'check_availability')
def test_watchman_available(self, mocked_available):
# If WatchmanUnavailable isn't raised, Watchman will be chosen.
mocked_available.return_value = None
result = autoreload.get_reloader()
self.assertIsInstance(result, autoreload.WatchmanReloader)
class RunWithReloaderTests(SimpleTestCase):
@mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
@mock.patch('django.utils.autoreload.get_reloader')
def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
mocked_get_reloader.side_effect = KeyboardInterrupt()
autoreload.run_with_reloader(lambda: None) # No exception
@mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'})
@mock.patch('django.utils.autoreload.restart_with_reloader')
def test_calls_sys_exit(self, mocked_restart_reloader):
mocked_restart_reloader.return_value = 1
with self.assertRaises(SystemExit) as exc:
autoreload.run_with_reloader(lambda: None)
self.assertEqual(exc.exception.code, 1)
@mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
@mock.patch('django.utils.autoreload.start_django')
@mock.patch('django.utils.autoreload.get_reloader')
def test_calls_start_django(self, mocked_reloader, mocked_start_django):
mocked_reloader.return_value = mock.sentinel.RELOADER
autoreload.run_with_reloader(mock.sentinel.METHOD)
self.assertEqual(mocked_start_django.call_count, 1)
self.assertSequenceEqual(
mocked_start_django.call_args[0],
[mock.sentinel.RELOADER, mock.sentinel.METHOD]
)
class StartDjangoTests(SimpleTestCase):
@mock.patch('django.utils.autoreload.StatReloader')
def test_watchman_becomes_unavailable(self, mocked_stat):
mocked_stat.should_stop.return_value = True
fake_reloader = mock.MagicMock()
fake_reloader.should_stop = False
fake_reloader.run.side_effect = autoreload.WatchmanUnavailable()
autoreload.start_django(fake_reloader, lambda: None)
self.assertEqual(mocked_stat.call_count, 1)
@mock.patch('django.utils.autoreload.ensure_echo_on')
def test_echo_on_called(self, mocked_echo):
fake_reloader = mock.MagicMock()
autoreload.start_django(fake_reloader, lambda: None)
self.assertEqual(mocked_echo.call_count, 1)
@mock.patch('django.utils.autoreload.check_errors')
def test_check_errors_called(self, mocked_check_errors):
fake_method = mock.MagicMock(return_value=None)
fake_reloader = mock.MagicMock()
autoreload.start_django(fake_reloader, fake_method)
self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])
@mock.patch('threading.Thread')
@mock.patch('django.utils.autoreload.check_errors')
def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
fake_reloader = mock.MagicMock()
fake_main_func = mock.MagicMock()
fake_thread = mock.MagicMock()
mocked_check_errors.return_value = fake_main_func
mocked_thread.return_value = fake_thread
autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
self.assertEqual(mocked_thread.call_count, 1)
self.assertEqual(
mocked_thread.call_args[1],
{'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}, 'name': 'django-main-thread'}
)
self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True])
self.assertTrue(fake_thread.start.called)
class TestCheckErrors(SimpleTestCase):
def test_mutates_error_files(self):
fake_method = mock.MagicMock(side_effect=RuntimeError())
wrapped = autoreload.check_errors(fake_method)
with mock.patch.object(autoreload, '_error_files') as mocked_error_files:
with self.assertRaises(RuntimeError):
wrapped()
self.assertEqual(mocked_error_files.append.call_count, 1)
class TestRaiseLastException(SimpleTestCase):
@mock.patch('django.utils.autoreload._exception', None)
def test_no_exception(self):
# Should raise no exception if _exception is None
autoreload.raise_last_exception()
def test_raises_exception(self):
class MyException(Exception):
pass
# Create an exception
try:
raise MyException('Test Message')
except MyException:
exc_info = sys.exc_info()
with mock.patch('django.utils.autoreload._exception', exc_info):
with self.assertRaisesMessage(MyException, 'Test Message'):
autoreload.raise_last_exception()
def test_raises_custom_exception(self):
class MyException(Exception):
def __init__(self, msg, extra_context):
super().__init__(msg)
self.extra_context = extra_context
# Create an exception.
try:
raise MyException('Test Message', 'extra context')
except MyException:
exc_info = sys.exc_info()
with mock.patch('django.utils.autoreload._exception', exc_info):
with self.assertRaisesMessage(MyException, 'Test Message'):
autoreload.raise_last_exception()
def test_raises_exception_with_context(self):
try:
raise Exception(2)
except Exception as e:
try:
raise Exception(1) from e
except Exception:
exc_info = sys.exc_info()
with mock.patch('django.utils.autoreload._exception', exc_info):
with self.assertRaises(Exception) as cm:
autoreload.raise_last_exception()
self.assertEqual(cm.exception.args[0], 1)
self.assertEqual(cm.exception.__cause__.args[0], 2)
class RestartWithReloaderTests(SimpleTestCase):
executable = '/usr/bin/python'
def patch_autoreload(self, argv):
patch_call = mock.patch('django.utils.autoreload.subprocess.call', return_value=0)
patches = [
mock.patch('django.utils.autoreload.sys.argv', argv),
mock.patch('django.utils.autoreload.sys.executable', self.executable),
mock.patch('django.utils.autoreload.sys.warnoptions', ['all']),
]
for p in patches:
p.start()
self.addCleanup(p.stop)
mock_call = patch_call.start()
self.addCleanup(patch_call.stop)
return mock_call
def test_manage_py(self):
argv = ['./manage.py', 'runserver']
mock_call = self.patch_autoreload(argv)
autoreload.restart_with_reloader()
self.assertEqual(mock_call.call_count, 1)
self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall'] + argv)
def test_python_m_django(self):
main = '/usr/lib/pythonX.Y/site-packages/django/__main__.py'
argv = [main, 'runserver']
mock_call = self.patch_autoreload(argv)
with mock.patch('django.__main__.__file__', main):
autoreload.restart_with_reloader()
self.assertEqual(mock_call.call_count, 1)
self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:])
class ReloaderTests(SimpleTestCase):
RELOADER_CLS = None
def setUp(self):
self._tempdir = tempfile.TemporaryDirectory()
self.tempdir = Path(self._tempdir.name).resolve().absolute()
self.existing_file = self.ensure_file(self.tempdir / 'test.py')
self.nonexistent_file = (self.tempdir / 'does_not_exist.py').absolute()
self.reloader = self.RELOADER_CLS()
def tearDown(self):
self._tempdir.cleanup()
self.reloader.stop()
def ensure_file(self, path):
path.parent.mkdir(exist_ok=True, parents=True)
path.touch()
# On Linux and Windows updating the mtime of a file using touch() will set a timestamp
# value that is in the past, as the time value for the last kernel tick is used rather
# than getting the correct absolute time.
# To make testing simpler set the mtime to be the observed time when this function is
# called.
self.set_mtime(path, time.time())
return path.absolute()
def set_mtime(self, fp, value):
os.utime(str(fp), (value, value))
def increment_mtime(self, fp, by=1):
current_time = time.time()
self.set_mtime(fp, current_time + by)
@contextlib.contextmanager
def tick_twice(self):
ticker = self.reloader.tick()
next(ticker)
yield
next(ticker)
class IntegrationTests:
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_file(self, mocked_modules, notify_mock):
self.reloader.watch_file(self.existing_file)
with self.tick_twice():
self.increment_mtime(self.existing_file)
self.assertEqual(notify_mock.call_count, 1)
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_glob(self, mocked_modules, notify_mock):
non_py_file = self.ensure_file(self.tempdir / 'non_py_file')
self.reloader.watch_dir(self.tempdir, '*.py')
with self.tick_twice():
self.increment_mtime(non_py_file)
self.increment_mtime(self.existing_file)
self.assertEqual(notify_mock.call_count, 1)
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_multiple_globs(self, mocked_modules, notify_mock):
self.ensure_file(self.tempdir / 'x.test')
self.reloader.watch_dir(self.tempdir, '*.py')
self.reloader.watch_dir(self.tempdir, '*.test')
with self.tick_twice():
self.increment_mtime(self.existing_file)
self.assertEqual(notify_mock.call_count, 1)
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_overlapping_globs(self, mocked_modules, notify_mock):
self.reloader.watch_dir(self.tempdir, '*.py')
self.reloader.watch_dir(self.tempdir, '*.p*')
with self.tick_twice():
self.increment_mtime(self.existing_file)
self.assertEqual(notify_mock.call_count, 1)
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_glob_recursive(self, mocked_modules, notify_mock):
non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file')
py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
self.reloader.watch_dir(self.tempdir, '**/*.py')
with self.tick_twice():
self.increment_mtime(non_py_file)
self.increment_mtime(py_file)
self.assertEqual(notify_mock.call_count, 1)
self.assertCountEqual(notify_mock.call_args[0], [py_file])
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt')
py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
self.reloader.watch_dir(self.tempdir, '**/*.txt')
self.reloader.watch_dir(self.tempdir, '**/*.py')
with self.tick_twice():
self.increment_mtime(non_py_file)
self.increment_mtime(py_file)
self.assertEqual(notify_mock.call_count, 2)
self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)])
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_nested_glob_recursive(self, mocked_modules, notify_mock):
inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
self.reloader.watch_dir(self.tempdir, '**/*.py')
self.reloader.watch_dir(inner_py_file.parent, '**/*.py')
with self.tick_twice():
self.increment_mtime(inner_py_file)
self.assertEqual(notify_mock.call_count, 1)
self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
self.reloader.watch_dir(self.tempdir, '**/*.p*')
self.reloader.watch_dir(self.tempdir, '**/*.py*')
with self.tick_twice():
self.increment_mtime(py_file)
self.assertEqual(notify_mock.call_count, 1)
self.assertCountEqual(notify_mock.call_args[0], [py_file])
class BaseReloaderTests(ReloaderTests):
RELOADER_CLS = autoreload.BaseReloader
def test_watch_without_absolute(self):
with self.assertRaisesMessage(ValueError, 'test.py must be absolute.'):
self.reloader.watch_file('test.py')
def test_watch_with_single_file(self):
self.reloader.watch_file(self.existing_file)
watched_files = list(self.reloader.watched_files())
self.assertIn(self.existing_file, watched_files)
def test_watch_with_glob(self):
self.reloader.watch_dir(self.tempdir, '*.py')
watched_files = list(self.reloader.watched_files())
self.assertIn(self.existing_file, watched_files)
def test_watch_files_with_recursive_glob(self):
inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py')
self.reloader.watch_dir(self.tempdir, '**/*.py')
watched_files = list(self.reloader.watched_files())
self.assertIn(self.existing_file, watched_files)
self.assertIn(inner_file, watched_files)
def test_run_loop_catches_stopiteration(self):
def mocked_tick():
yield
with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
self.reloader.run_loop()
self.assertEqual(tick.call_count, 1)
def test_run_loop_stop_and_return(self):
def mocked_tick(*args):
yield
self.reloader.stop()
return # Raises StopIteration
with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
self.reloader.run_loop()
self.assertEqual(tick.call_count, 1)
def test_wait_for_apps_ready_checks_for_exception(self):
app_reg = Apps()
app_reg.ready_event.set()
# thread.is_alive() is False if it's not started.
dead_thread = threading.Thread()
self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))
def test_wait_for_apps_ready_without_exception(self):
app_reg = Apps()
app_reg.ready_event.set()
thread = mock.MagicMock()
thread.is_alive.return_value = True
self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))
def skip_unless_watchman_available():
try:
autoreload.WatchmanReloader.check_availability()
except WatchmanUnavailable as e:
return skip('Watchman unavailable: %s' % e)
return lambda func: func
@skip_unless_watchman_available()
class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
RELOADER_CLS = autoreload.WatchmanReloader
def setUp(self):
super().setUp()
# Shorten the timeout to speed up tests.
self.reloader.client_timeout = 0.1
def test_watch_glob_ignores_non_existing_directories_two_levels(self):
with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*'])
self.assertFalse(mocked_subscribe.called)
def test_watch_glob_uses_existing_parent_directories(self):
with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*'])
self.assertSequenceEqual(
mocked_subscribe.call_args[0],
[
self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir,
['anyof', ['match', 'does_not_exist/*', 'wholename']]
]
)
def test_watch_glob_multiple_patterns(self):
with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
self.reloader._watch_glob(self.tempdir, ['*', '*.py'])
self.assertSequenceEqual(
mocked_subscribe.call_args[0],
[
self.tempdir, 'glob:%s' % self.tempdir,
['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']]
]
)
def test_watched_roots_contains_files(self):
paths = self.reloader.watched_roots([self.existing_file])
self.assertIn(self.existing_file.parent, paths)
def test_watched_roots_contains_directory_globs(self):
self.reloader.watch_dir(self.tempdir, '*.py')
paths = self.reloader.watched_roots([])
self.assertIn(self.tempdir, paths)
def test_watched_roots_contains_sys_path(self):
with extend_sys_path(str(self.tempdir)):
paths = self.reloader.watched_roots([])
self.assertIn(self.tempdir, paths)
def test_check_server_status(self):
self.assertTrue(self.reloader.check_server_status())
def test_check_server_status_raises_error(self):
with mock.patch.object(self.reloader.client, 'query') as mocked_query:
mocked_query.side_effect = Exception()
with self.assertRaises(autoreload.WatchmanUnavailable):
self.reloader.check_server_status()
@mock.patch('pywatchman.client')
def test_check_availability(self, mocked_client):
mocked_client().capabilityCheck.side_effect = Exception()
with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'):
self.RELOADER_CLS.check_availability()
@mock.patch('pywatchman.client')
def test_check_availability_lower_version(self, mocked_client):
mocked_client().capabilityCheck.return_value = {'version': '4.8.10'}
with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'):
self.RELOADER_CLS.check_availability()
def test_pywatchman_not_available(self):
with mock.patch.object(autoreload, 'pywatchman') as mocked:
mocked.__bool__.return_value = False
with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'):
self.RELOADER_CLS.check_availability()
def test_update_watches_raises_exceptions(self):
class TestException(Exception):
pass
with mock.patch.object(self.reloader, '_update_watches') as mocked_watches:
with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status:
mocked_watches.side_effect = TestException()
mocked_server_status.return_value = True
with self.assertRaises(TestException):
self.reloader.update_watches()
self.assertIsInstance(mocked_server_status.call_args[0][0], TestException)
@mock.patch.dict(os.environ, {'DJANGO_WATCHMAN_TIMEOUT': '10'})
def test_setting_timeout_from_environment_variable(self):
self.assertEqual(self.RELOADER_CLS.client_timeout, 10)
@skipIf(on_macos_with_hfs(), "These tests do not work with HFS+ as a filesystem")
class StatReloaderTests(ReloaderTests, IntegrationTests):
RELOADER_CLS = autoreload.StatReloader
def setUp(self):
super().setUp()
# Shorten the sleep time to speed up tests.
self.reloader.SLEEP_TIME = 0.01
@mock.patch('django.utils.autoreload.StatReloader.notify_file_changed')
def test_tick_does_not_trigger_twice(self, mock_notify_file_changed):
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
ticker = self.reloader.tick()
next(ticker)
self.increment_mtime(self.existing_file)
next(ticker)
next(ticker)
self.assertEqual(mock_notify_file_changed.call_count, 1)
def test_snapshot_files_ignores_missing_files(self):
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistent_file]):
self.assertEqual(dict(self.reloader.snapshot_files()), {})
def test_snapshot_files_updates(self):
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
snapshot1 = dict(self.reloader.snapshot_files())
self.assertIn(self.existing_file, snapshot1)
self.increment_mtime(self.existing_file)
snapshot2 = dict(self.reloader.snapshot_files())
self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file])
def test_snapshot_files_with_duplicates(self):
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file, self.existing_file]):
snapshot = list(self.reloader.snapshot_files())
self.assertEqual(len(snapshot), 1)
self.assertEqual(snapshot[0][0], self.existing_file)