diff --git a/django/conf/global_settings.py b/django/conf/global_settings.py index 77f2b83e68..72f84dd6af 100644 --- a/django/conf/global_settings.py +++ b/django/conf/global_settings.py @@ -672,3 +672,8 @@ SECURE_CSP_REPORT_ONLY = {} # HTTPS as the default protocol in urlize and urlizetrunc when no protocol is # provided. Set to True to assume HTTPS during the Django 6.x release cycle. URLIZE_ASSUME_HTTPS = False + +######### +# TASKS # +######### +TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}} diff --git a/django/tasks/__init__.py b/django/tasks/__init__.py new file mode 100644 index 0000000000..ae7ae8dd7c --- /dev/null +++ b/django/tasks/__init__.py @@ -0,0 +1,45 @@ +from django.utils.connection import BaseConnectionHandler, ConnectionProxy +from django.utils.module_loading import import_string + +from . import checks, signals # NOQA +from .base import ( + DEFAULT_TASK_BACKEND_ALIAS, + DEFAULT_TASK_QUEUE_NAME, + Task, + TaskContext, + TaskResult, + TaskResultStatus, + task, +) +from .exceptions import InvalidTaskBackend + +__all__ = [ + "DEFAULT_TASK_BACKEND_ALIAS", + "DEFAULT_TASK_QUEUE_NAME", + "default_task_backend", + "task", + "task_backends", + "Task", + "TaskContext", + "TaskResult", + "TaskResultStatus", +] + + +class TaskBackendHandler(BaseConnectionHandler): + settings_name = "TASKS" + exception_class = InvalidTaskBackend + + def create_connection(self, alias): + params = self.settings[alias] + backend = params["BACKEND"] + try: + backend_cls = import_string(backend) + except ImportError as e: + raise InvalidTaskBackend(f"Could not find backend '{backend}': {e}") from e + return backend_cls(alias=alias, params=params) + + +task_backends = TaskBackendHandler() + +default_task_backend = ConnectionProxy(task_backends, DEFAULT_TASK_BACKEND_ALIAS) diff --git a/django/tasks/backends/__init__.py b/django/tasks/backends/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/django/tasks/backends/base.py b/django/tasks/backends/base.py new file mode 100644 index 0000000000..32ae10018d --- /dev/null +++ b/django/tasks/backends/base.py @@ -0,0 +1,138 @@ +from abc import ABCMeta, abstractmethod +from inspect import iscoroutinefunction + +from asgiref.sync import sync_to_async + +from django.conf import settings +from django.core import checks +from django.db import connections +from django.tasks import DEFAULT_TASK_QUEUE_NAME +from django.tasks.base import ( + DEFAULT_TASK_PRIORITY, + TASK_MAX_PRIORITY, + TASK_MIN_PRIORITY, + Task, +) +from django.tasks.exceptions import InvalidTask +from django.utils import timezone +from django.utils.inspect import get_func_args, is_module_level_function + + +class BaseTaskBackend(metaclass=ABCMeta): + task_class = Task + + # Does the backend support Tasks to be enqueued with the run_after + # attribute? + supports_defer = False + + # Does the backend support coroutines to be enqueued? + supports_async_task = False + + # Does the backend support results being retrieved (from any + # thread/process)? + supports_get_result = False + + # Does the backend support executing Tasks in a given + # priority order? + supports_priority = False + + def __init__(self, alias, params): + self.alias = alias + self.queues = set(params.get("QUEUES", [DEFAULT_TASK_QUEUE_NAME])) + self.enqueue_on_commit = bool(params.get("ENQUEUE_ON_COMMIT", True)) + self.options = params.get("OPTIONS", {}) + + def _get_enqueue_on_commit_for_task(self, task): + return ( + task.enqueue_on_commit + if task.enqueue_on_commit is not None + else self.enqueue_on_commit + ) + + def validate_task(self, task): + """ + Determine whether the provided Task can be executed by the backend. + """ + if not is_module_level_function(task.func): + raise InvalidTask("Task function must be defined at a module level.") + + if not self.supports_async_task and iscoroutinefunction(task.func): + raise InvalidTask("Backend does not support async Tasks.") + + task_func_args = get_func_args(task.func) + if task.takes_context and ( + not task_func_args or task_func_args[0] != "context" + ): + raise InvalidTask( + "Task takes context but does not have a first argument of 'context'." + ) + + if not self.supports_priority and task.priority != DEFAULT_TASK_PRIORITY: + raise InvalidTask("Backend does not support setting priority of tasks.") + if ( + task.priority < TASK_MIN_PRIORITY + or task.priority > TASK_MAX_PRIORITY + or int(task.priority) != task.priority + ): + raise InvalidTask( + f"priority must be a whole number between {TASK_MIN_PRIORITY} and " + f"{TASK_MAX_PRIORITY}." + ) + + if not self.supports_defer and task.run_after is not None: + raise InvalidTask("Backend does not support run_after.") + + if ( + settings.USE_TZ + and task.run_after is not None + and not timezone.is_aware(task.run_after) + ): + raise InvalidTask("run_after must be an aware datetime.") + + if self.queues and task.queue_name not in self.queues: + raise InvalidTask(f"Queue '{task.queue_name}' is not valid for backend.") + + @abstractmethod + def enqueue(self, task, args, kwargs): + """Queue up a task to be executed.""" + + async def aenqueue(self, task, args, kwargs): + """Queue up a task function (or coroutine) to be executed.""" + return await sync_to_async(self.enqueue, thread_sensitive=True)( + task=task, args=args, kwargs=kwargs + ) + + def get_result(self, result_id): + """ + Retrieve a task result by id. + + Raise TaskResultDoesNotExist if such result does not exist. + """ + raise NotImplementedError( + "This backend does not support retrieving or refreshing results." + ) + + async def aget_result(self, result_id): + """See get_result().""" + return await sync_to_async(self.get_result, thread_sensitive=True)( + result_id=result_id + ) + + def check(self, **kwargs): + if self.enqueue_on_commit and not connections._settings: + yield checks.Error( + "ENQUEUE_ON_COMMIT cannot be used when no databases are configured.", + hint="Set ENQUEUE_ON_COMMIT to False", + id="tasks.E001", + ) + + elif ( + self.enqueue_on_commit + and not connections["default"].features.supports_transactions + ): + yield checks.Error( + "ENQUEUE_ON_COMMIT cannot be used on a database which doesn't support " + "transactions.", + hint="Set ENQUEUE_ON_COMMIT to False", + id="tasks.E002", + ) diff --git a/django/tasks/backends/dummy.py b/django/tasks/backends/dummy.py new file mode 100644 index 0000000000..93bb8f3ee4 --- /dev/null +++ b/django/tasks/backends/dummy.py @@ -0,0 +1,69 @@ +from copy import deepcopy +from functools import partial + +from django.db import transaction +from django.tasks.base import TaskResult, TaskResultStatus +from django.tasks.exceptions import TaskResultDoesNotExist +from django.tasks.signals import task_enqueued +from django.utils import timezone +from django.utils.crypto import get_random_string + +from .base import BaseTaskBackend + + +class DummyBackend(BaseTaskBackend): + supports_defer = True + supports_async_task = True + supports_priority = True + + def __init__(self, alias, params): + super().__init__(alias, params) + self.results = [] + + def _store_result(self, result): + object.__setattr__(result, "enqueued_at", timezone.now()) + self.results.append(result) + task_enqueued.send(type(self), task_result=result) + + def enqueue(self, task, args, kwargs): + self.validate_task(task) + + result = TaskResult( + task=task, + id=get_random_string(32), + status=TaskResultStatus.READY, + enqueued_at=None, + started_at=None, + last_attempted_at=None, + finished_at=None, + args=args, + kwargs=kwargs, + backend=self.alias, + errors=[], + worker_ids=[], + ) + + if self._get_enqueue_on_commit_for_task(task) is not False: + transaction.on_commit(partial(self._store_result, result)) + else: + self._store_result(result) + + # Copy the task to prevent mutation issues. + return deepcopy(result) + + def get_result(self, result_id): + # Results are only scoped to the current thread, hence + # supports_get_result is False. + try: + return next(result for result in self.results if result.id == result_id) + except StopIteration: + raise TaskResultDoesNotExist(result_id) from None + + async def aget_result(self, result_id): + try: + return next(result for result in self.results if result.id == result_id) + except StopIteration: + raise TaskResultDoesNotExist(result_id) from None + + def clear(self): + self.results.clear() diff --git a/django/tasks/backends/immediate.py b/django/tasks/backends/immediate.py new file mode 100644 index 0000000000..06b94d18ab --- /dev/null +++ b/django/tasks/backends/immediate.py @@ -0,0 +1,100 @@ +import logging +from functools import partial +from traceback import format_exception + +from django.db import transaction +from django.tasks.base import TaskContext, TaskError, TaskResult, TaskResultStatus +from django.tasks.signals import task_enqueued, task_finished, task_started +from django.utils import timezone +from django.utils.crypto import get_random_string +from django.utils.json import normalize_json + +from .base import BaseTaskBackend + +logger = logging.getLogger(__name__) + + +class ImmediateBackend(BaseTaskBackend): + supports_async_task = True + supports_priority = True + + def __init__(self, alias, params): + super().__init__(alias, params) + self.worker_id = get_random_string(32) + + def _execute_task(self, task_result): + """ + Execute the Task for the given TaskResult, mutating it with the + outcome. + """ + object.__setattr__(task_result, "enqueued_at", timezone.now()) + task_enqueued.send(type(self), task_result=task_result) + + task = task_result.task + task_start_time = timezone.now() + object.__setattr__(task_result, "status", TaskResultStatus.RUNNING) + object.__setattr__(task_result, "started_at", task_start_time) + object.__setattr__(task_result, "last_attempted_at", task_start_time) + task_result.worker_ids.append(self.worker_id) + task_started.send(sender=type(self), task_result=task_result) + + try: + if task.takes_context: + raw_return_value = task.call( + TaskContext(task_result=task_result), + *task_result.args, + **task_result.kwargs, + ) + else: + raw_return_value = task.call(*task_result.args, **task_result.kwargs) + + object.__setattr__( + task_result, + "_return_value", + normalize_json(raw_return_value), + ) + except KeyboardInterrupt: + # If the user tried to terminate, let them + raise + except BaseException as e: + object.__setattr__(task_result, "finished_at", timezone.now()) + exception_type = type(e) + task_result.errors.append( + TaskError( + exception_class_path=( + f"{exception_type.__module__}.{exception_type.__qualname__}" + ), + traceback="".join(format_exception(e)), + ) + ) + object.__setattr__(task_result, "status", TaskResultStatus.FAILED) + task_finished.send(type(self), task_result=task_result) + else: + object.__setattr__(task_result, "finished_at", timezone.now()) + object.__setattr__(task_result, "status", TaskResultStatus.SUCCESSFUL) + task_finished.send(type(self), task_result=task_result) + + def enqueue(self, task, args, kwargs): + self.validate_task(task) + + task_result = TaskResult( + task=task, + id=get_random_string(32), + status=TaskResultStatus.READY, + enqueued_at=None, + started_at=None, + last_attempted_at=None, + finished_at=None, + args=args, + kwargs=kwargs, + backend=self.alias, + errors=[], + worker_ids=[], + ) + + if self._get_enqueue_on_commit_for_task(task) is not False: + transaction.on_commit(partial(self._execute_task, task_result)) + else: + self._execute_task(task_result) + + return task_result diff --git a/django/tasks/base.py b/django/tasks/base.py new file mode 100644 index 0000000000..905dbef597 --- /dev/null +++ b/django/tasks/base.py @@ -0,0 +1,253 @@ +from dataclasses import dataclass, field, replace +from datetime import datetime +from inspect import isclass, iscoroutinefunction +from typing import Any, Callable, Dict, Optional + +from asgiref.sync import async_to_sync, sync_to_async + +from django.db.models.enums import TextChoices +from django.utils.json import normalize_json +from django.utils.module_loading import import_string +from django.utils.translation import pgettext_lazy + +from .exceptions import TaskResultMismatch + +DEFAULT_TASK_BACKEND_ALIAS = "default" +DEFAULT_TASK_PRIORITY = 0 +DEFAULT_TASK_QUEUE_NAME = "default" +TASK_MAX_PRIORITY = 100 +TASK_MIN_PRIORITY = -100 +TASK_REFRESH_ATTRS = { + "errors", + "_return_value", + "finished_at", + "started_at", + "last_attempted_at", + "status", + "enqueued_at", + "worker_ids", +} + + +class TaskResultStatus(TextChoices): + # The Task has just been enqueued, or is ready to be executed again. + READY = ("READY", pgettext_lazy("Task", "Ready")) + # The Task is currently running. + RUNNING = ("RUNNING", pgettext_lazy("Task", "Running")) + # The Task raised an exception during execution, or was unable to start. + FAILED = ("FAILED", pgettext_lazy("Task", "Failed")) + # The Task has finished running successfully. + SUCCESSFUL = ("SUCCESSFUL", pgettext_lazy("Task", "Successful")) + + +@dataclass(frozen=True, slots=True, kw_only=True) +class Task: + priority: int + func: Callable # The Task function. + backend: str + queue_name: str + run_after: Optional[datetime] # The earliest this Task will run. + + # Whether the Task will be enqueued when the current transaction commits, + # immediately, or whatever the backend decides. + enqueue_on_commit: Optional[bool] + + # Whether the Task receives the Task context when executed. + takes_context: bool = False + + def __post_init__(self): + self.get_backend().validate_task(self) + + @property + def name(self): + return self.func.__name__ + + def using( + self, + *, + priority=None, + queue_name=None, + run_after=None, + backend=None, + ): + """Create a new Task with modified defaults.""" + + changes = {} + if priority is not None: + changes["priority"] = priority + if queue_name is not None: + changes["queue_name"] = queue_name + if run_after is not None: + changes["run_after"] = run_after + if backend is not None: + changes["backend"] = backend + return replace(self, **changes) + + def enqueue(self, *args, **kwargs): + """Queue up the Task to be executed.""" + return self.get_backend().enqueue(self, args, kwargs) + + async def aenqueue(self, *args, **kwargs): + """Queue up the Task to be executed.""" + return await self.get_backend().aenqueue(self, args, kwargs) + + def get_result(self, result_id): + """ + Retrieve a task result by id. + + Raise TaskResultDoesNotExist if such result does not exist, or raise + TaskResultMismatch if the result exists but belongs to another Task. + """ + result = self.get_backend().get_result(result_id) + if result.task.func != self.func: + raise TaskResultMismatch( + f"Task does not match (received {result.task.module_path!r})" + ) + return result + + async def aget_result(self, result_id): + """See get_result().""" + result = await self.get_backend().aget_result(result_id) + if result.task.func != self.func: + raise TaskResultMismatch( + f"Task does not match (received {result.task.module_path!r})" + ) + return result + + def call(self, *args, **kwargs): + if iscoroutinefunction(self.func): + return async_to_sync(self.func)(*args, **kwargs) + return self.func(*args, **kwargs) + + async def acall(self, *args, **kwargs): + if iscoroutinefunction(self.func): + return await self.func(*args, **kwargs) + return await sync_to_async(self.func)(*args, **kwargs) + + def get_backend(self): + from . import task_backends + + return task_backends[self.backend] + + @property + def module_path(self): + return f"{self.func.__module__}.{self.func.__qualname__}" + + +def task( + function=None, + *, + priority=DEFAULT_TASK_PRIORITY, + queue_name=DEFAULT_TASK_QUEUE_NAME, + backend=DEFAULT_TASK_BACKEND_ALIAS, + enqueue_on_commit=None, + takes_context=False, +): + from . import task_backends + + def wrapper(f): + return task_backends[backend].task_class( + priority=priority, + func=f, + queue_name=queue_name, + backend=backend, + enqueue_on_commit=enqueue_on_commit, + takes_context=takes_context, + run_after=None, + ) + + if function: + return wrapper(function) + return wrapper + + +@dataclass(frozen=True, slots=True, kw_only=True) +class TaskError: + exception_class_path: str + traceback: str + + @property + def exception_class(self): + # Lazy resolve the exception class. + exception_class = import_string(self.exception_class_path) + + if not isclass(exception_class) or not issubclass( + exception_class, BaseException + ): + raise ValueError( + f"{self.exception_class_path!r} does not reference a valid exception." + ) + return exception_class + + +@dataclass(frozen=True, slots=True, kw_only=True) +class TaskResult: + task: Task + + id: str # Unique identifier for the task result. + status: TaskResultStatus + enqueued_at: Optional[datetime] # Time the task was enqueued. + started_at: Optional[datetime] # Time the task was started. + finished_at: Optional[datetime] # Time the task was finished. + + # Time the task was last attempted to be run. + last_attempted_at: Optional[datetime] + + args: list # Arguments to pass to the task function. + kwargs: Dict[str, Any] # Keyword arguments to pass to the task function. + backend: str + errors: list[TaskError] # Errors raised when running the task. + worker_ids: list[str] # Workers which have processed the task. + + _return_value: Optional[Any] = field(init=False, default=None) + + def __post_init__(self): + object.__setattr__(self, "args", normalize_json(self.args)) + object.__setattr__(self, "kwargs", normalize_json(self.kwargs)) + + @property + def return_value(self): + """ + The return value of the task. + + If the task didn't succeed, an exception is raised. + This is to distinguish against the task returning None. + """ + if self.status == TaskResultStatus.SUCCESSFUL: + return self._return_value + elif self.status == TaskResultStatus.FAILED: + raise ValueError("Task failed") + else: + raise ValueError("Task has not finished yet") + + @property + def is_finished(self): + return self.status in {TaskResultStatus.FAILED, TaskResultStatus.SUCCESSFUL} + + @property + def attempts(self): + return len(self.worker_ids) + + def refresh(self): + """Reload the cached task data from the task store.""" + refreshed_task = self.task.get_backend().get_result(self.id) + + for attr in TASK_REFRESH_ATTRS: + object.__setattr__(self, attr, getattr(refreshed_task, attr)) + + async def arefresh(self): + """ + Reload the cached task data from the task store + """ + refreshed_task = await self.task.get_backend().aget_result(self.id) + for attr in TASK_REFRESH_ATTRS: + object.__setattr__(self, attr, getattr(refreshed_task, attr)) + + +@dataclass(frozen=True, slots=True, kw_only=True) +class TaskContext: + task_result: TaskResult + + @property + def attempt(self): + return self.task_result.attempts diff --git a/django/tasks/checks.py b/django/tasks/checks.py new file mode 100644 index 0000000000..f0b26c5b0e --- /dev/null +++ b/django/tasks/checks.py @@ -0,0 +1,11 @@ +from django.core import checks + + +@checks.register +def check_tasks(app_configs=None, **kwargs): + """Checks all registered Task backends.""" + + from . import task_backends + + for backend in task_backends.all(): + yield from backend.check() diff --git a/django/tasks/exceptions.py b/django/tasks/exceptions.py new file mode 100644 index 0000000000..91bd8f4cd5 --- /dev/null +++ b/django/tasks/exceptions.py @@ -0,0 +1,21 @@ +from django.core.exceptions import ImproperlyConfigured + + +class TaskException(Exception): + """Base class for task-related exceptions. Do not raise directly.""" + + +class InvalidTask(TaskException): + """The provided Task is invalid.""" + + +class InvalidTaskBackend(ImproperlyConfigured): + """The provided Task backend is invalid.""" + + +class TaskResultDoesNotExist(TaskException): + """The requested TaskResult does not exist.""" + + +class TaskResultMismatch(TaskException): + """The requested TaskResult is invalid.""" diff --git a/django/tasks/signals.py b/django/tasks/signals.py new file mode 100644 index 0000000000..288fe08e32 --- /dev/null +++ b/django/tasks/signals.py @@ -0,0 +1,64 @@ +import logging +import sys + +from asgiref.local import Local + +from django.core.signals import setting_changed +from django.dispatch import Signal, receiver + +from .base import TaskResultStatus + +logger = logging.getLogger("django.tasks") + +task_enqueued = Signal() +task_finished = Signal() +task_started = Signal() + + +@receiver(setting_changed) +def clear_tasks_handlers(*, setting, **kwargs): + """Reset the connection handler whenever the settings change.""" + if setting == "TASKS": + from . import task_backends + + task_backends._settings = task_backends.settings = ( + task_backends.configure_settings(None) + ) + task_backends._connections = Local() + + +@receiver(task_enqueued) +def log_task_enqueued(sender, task_result, **kwargs): + logger.debug( + "Task id=%s path=%s enqueued backend=%s", + task_result.id, + task_result.task.module_path, + task_result.backend, + ) + + +@receiver(task_started) +def log_task_started(sender, task_result, **kwargs): + logger.info( + "Task id=%s path=%s state=%s", + task_result.id, + task_result.task.module_path, + task_result.status, + ) + + +@receiver(task_finished) +def log_task_finished(sender, task_result, **kwargs): + logger.log( + ( + logging.ERROR + if task_result.status == TaskResultStatus.FAILED + else logging.INFO + ), + "Task id=%s path=%s state=%s", + task_result.id, + task_result.task.module_path, + task_result.status, + # Signal is sent inside exception handlers, so exc_info() is available. + exc_info=sys.exc_info(), + ) diff --git a/django/utils/inspect.py b/django/utils/inspect.py index 4e065f0347..31f3cf994b 100644 --- a/django/utils/inspect.py +++ b/django/utils/inspect.py @@ -74,3 +74,13 @@ def method_has_no_args(meth): def func_supports_parameter(func, name): return any(param.name == name for param in _get_callable_parameters(func)) + + +def is_module_level_function(func): + if not inspect.isfunction(func) or inspect.isbuiltin(func): + return False + + if "" in func.__qualname__: + return False + + return True diff --git a/django/utils/json.py b/django/utils/json.py new file mode 100644 index 0000000000..e7389b70ed --- /dev/null +++ b/django/utils/json.py @@ -0,0 +1,19 @@ +from collections.abc import Mapping, Sequence + + +def normalize_json(obj): + """Recursively normalize an object into JSON-compatible types.""" + match obj: + case Mapping(): + return {normalize_json(k): normalize_json(v) for k, v in obj.items()} + case bytes(): + try: + return obj.decode("utf-8") + except UnicodeDecodeError: + raise ValueError(f"Unsupported value: {type(obj)}") + case str() | int() | float() | bool() | None: + return obj + case Sequence(): # str and bytes were already handled. + return [normalize_json(v) for v in obj] + case _: # Other types can't be serialized to JSON + raise TypeError(f"Unsupported type: {type(obj)}") diff --git a/docs/ref/checks.txt b/docs/ref/checks.txt index e1ea5bc753..138db8708e 100644 --- a/docs/ref/checks.txt +++ b/docs/ref/checks.txt @@ -597,6 +597,14 @@ Signals a lazy reference to the sender ``.``, but app ```` isn't installed or doesn't provide model ````. +Tasks +----- + +* **tasks.E001**: ``ENQUEUE_ON_COMMIT`` cannot be used when no databases are + configured. +* **tasks.E002**: ``ENQUEUE_ON_COMMIT`` cannot be used on a database which + doesn't support transactions. + Templates --------- diff --git a/docs/ref/index.txt b/docs/ref/index.txt index 3741b82aad..af32b131cb 100644 --- a/docs/ref/index.txt +++ b/docs/ref/index.txt @@ -26,6 +26,7 @@ API Reference schema-editor settings signals + tasks templates/index template-response unicode diff --git a/docs/ref/settings.txt b/docs/ref/settings.txt index c16547e72a..54957a726a 100644 --- a/docs/ref/settings.txt +++ b/docs/ref/settings.txt @@ -2766,6 +2766,82 @@ backend definition in :setting:`STORAGES`. Defining this setting overrides the default value and is *not* merged with it. +.. setting:: TASKS + +``TASKS`` +--------- + +.. versionadded:: 6.0 + +Default:: + + { + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + } + } + +A dictionary containing the settings for all Task backends to be used with +Django. It is a nested dictionary whose contents maps backend aliases to a +dictionary containing the options for each backend. + +The :setting:`TASKS` setting must configure a ``default`` backend; any number +of additional backends may also be specified. Depending on which backend is +used, other options may be required. The following options are available as +standard. + +.. setting:: TASKS-BACKEND + +``BACKEND`` +~~~~~~~~~~~ + +Default: ``''`` (Empty string) + +The Tasks backend to use. The built-in backends are: + +* ``'django.tasks.backends.dummy.DummyBackend'`` +* ``'django.tasks.backends.immediate.ImmediateBackend'`` + +You can use a backend that doesn't ship with Django by setting +:setting:`BACKEND ` to a fully-qualified path of a backend +class (i.e. ``mypackage.backends.whatever.WhateverBackend``). + +.. setting:: TASKS-ENQUEUE_ON_COMMIT + +``ENQUEUE_ON_COMMIT`` +~~~~~~~~~~~~~~~~~~~~~ + +Default: ``True`` + +Whether to enqueue a Task only after the current transaction, if any, commits +successfully, instead of enqueueing immediately. + +This can also be configured on a per-Task basis. + +See :ref:`Task transactions ` for more information. + +.. setting:: TASKS-QUEUES + +``QUEUES`` +~~~~~~~~~~ + +Default: ``["default"]`` + +Specify the queue names supported by the backend. This can be used to ensure +Tasks aren't enqueued to queues which do not exist. + +To disable queue name validation, set to an empty list (``[]``). + +.. setting:: TASKS-OPTIONS + +``OPTIONS`` +~~~~~~~~~~~ + +Default: ``{}`` + +Extra parameters to pass to the Task backend. Available parameters vary +depending on the Task backend. + .. setting:: TEMPLATES ``TEMPLATES`` diff --git a/docs/ref/signals.txt b/docs/ref/signals.txt index 6dc5122f96..82b92e12c2 100644 --- a/docs/ref/signals.txt +++ b/docs/ref/signals.txt @@ -703,3 +703,60 @@ Arguments sent with this signal: The database connection that was opened. This can be used in a multiple-database configuration to differentiate connection signals from different databases. + +Tasks signals +============= + +.. versionadded:: 6.0 + +Signals sent by the :doc:`tasks ` framework. + +``task_enqueued`` +----------------- + +.. data:: django.tasks.signals.task_enqueued + :module: + +Sent once a Task has been enqueued. If +:attr:`django.tasks.Task.enqueue_on_commit` is set, the signal is only sent +once the transaction commits successfully. + +Arguments sent with this signal: + +``sender`` + The backend class which the Task was enqueued on to. + +``task_result`` + The enqueued :class:`TaskResult `. + +``task_started`` +---------------- + +.. data:: django.tasks.signals.task_started + :module: + +Sent when a Task has started executing. + +Arguments sent with this signal: + +``sender`` + The backend class which the Task was enqueued on to. + +``task_result`` + The started :class:`TaskResult `. + +``task_finished`` +----------------- + +.. data:: django.tasks.signals.task_finished + :module: + +Sent once a Task has finished executing, successfully or otherwise. + +Arguments sent with this signal: + +``sender`` + The backend class which the Task was enqueued on to. + +``task_result`` + The finished :class:`TaskResult `. diff --git a/docs/ref/tasks.txt b/docs/ref/tasks.txt new file mode 100644 index 0000000000..3134243d40 --- /dev/null +++ b/docs/ref/tasks.txt @@ -0,0 +1,444 @@ +===== +Tasks +===== + +.. versionadded:: 6.0 + +.. module:: django.tasks + :synopsis: Django's built-in background Task system. + +Task definition +=============== + +The ``task`` decorator +---------------------- + +.. function:: task(*, priority=0, queue_name="default", backend="default", enqueue_on_commit=None, takes_context=False) + + The ``@task`` decorator defines a :class:`Task` instance. This has the + following optional arguments: + + * ``priority``: Sets the :attr:`~Task.priority` of the ``Task``. Defaults + to 0. + * ``queue_name``: Sets the :attr:`~Task.queue_name` of the ``Task``. + Defaults to ``"default"``. + * ``backend``: Sets the :attr:`~Task.backend` of the ``Task``. Defaults to + ``"default"``. + * ``enqueue_on_commit``: Sets :attr:`~Task.enqueue_on_commit` for the + ``Task``. Defaults to ``None``. + * ``takes_context``: Controls whether the ``Task`` function accepts a + :class:`TaskContext`. Defaults to ``False``. See :ref:`Task context + ` for details. + + If the defined ``Task`` is not valid according to the backend, + :exc:`~django.tasks.exceptions.InvalidTask` is raised. + + See :ref:`defining tasks ` for usage examples. + +``Task`` +-------- + +.. class:: Task + + Represents a Task to be run in the background. Tasks should be defined + using the :func:`task` decorator. + + Attributes of ``Task`` cannot be modified. See :ref:`modifying Tasks + ` for details. + + .. attribute:: Task.priority + + The priority of the ``Task``. Priorities must be between -100 and 100, + where larger numbers are higher priority, and will be run sooner. + + The backend must have :attr:`.supports_priority` set to ``True`` to use + this feature. + + .. attribute:: Task.backend + + The alias of the backend the ``Task`` should be enqueued to. This must + match a backend defined in :setting:`BACKEND `. + + .. attribute:: Task.queue_name + + The name of the queue the ``Task`` will be enqueued on to. Defaults to + ``"default"``. This must match a queue defined in + :setting:`QUEUES `, unless + :setting:`QUEUES ` is set to ``[]``. + + .. attribute:: Task.run_after + + The earliest time the ``Task`` will be executed. This can be a + :class:`timedelta `, which is used relative to the + current time, a timezone-aware :class:`datetime `, + or ``None`` if not constrained. Defaults to ``None``. + + The backend must have :attr:`.supports_defer` set to ``True`` to use + this feature. Otherwise, + :exc:`~django.tasks.exceptions.InvalidTask` is raised. + + .. attribute:: Task.enqueue_on_commit + + Whether the ``Task`` should be enqueued when the transaction commits + successfully, or immediately. Defaults to :setting:`ENQUEUE_ON_COMMIT + ` for the backend. + + See :ref:`Task transactions ` for more information. + + .. attribute:: Task.name + + The name of the function decorated with :func:`task`. This name is not + necessarily unique. + + .. method:: Task.using(*, priority=None, backend=None, queue_name=None, run_after=None) + + Creates a new ``Task`` with modified defaults. The existing ``Task`` is + left unchanged. + + ``using`` allows modifying the following attributes: + + * :attr:`priority ` + * :attr:`backend ` + * :attr:`queue_name ` + * :attr:`run_after ` + + See :ref:`modifying Tasks ` for usage examples. + + .. method:: Task.enqueue(*args, **kwargs) + + Enqueues the ``Task`` to the ``Task`` backend for later execution. + + Arguments are passed to the ``Task``'s function after a round-trip + through a :func:`json.dumps`/:func:`json.loads` cycle. Hence, all + arguments must be JSON-serializable and preserve their type after the + round-trip. + + If the ``Task`` is not valid according to the backend, + :exc:`~django.tasks.exceptions.InvalidTask` is raised. + + See :ref:`enqueueing Tasks ` for usage examples. + + .. method:: Task.aenqueue(*args, **kwargs) + + The ``async`` variant of :meth:`enqueue `. + + .. method:: Task.get_result(result_id) + + Retrieves a result by its id. + + If the result does not exist, :exc:`TaskResultDoesNotExist + ` is raised. If the + result is not the same type as the current Task, + :exc:`TaskResultMismatch ` + is raised. If the backend does not support ``get_result()``, + :exc:`NotImplementedError` is raised. + + .. method:: Task.aget_result(*args, **kwargs) + + The ``async`` variant of :meth:`get_result `. + +Task context +============ + +.. class:: TaskContext + + Contains context for the running :class:`Task`. Context only passed to a + ``Task`` if it was defined with ``takes_context=True``. + + Attributes of ``TaskContext`` cannot be modified. + + .. attribute:: TaskContext.task_result + + The :class:`TaskResult` currently being run. + + .. attribute:: TaskContext.attempt + + The number of the current execution attempts for this Task, starting at + 1. + +Task results +============ + +.. class:: TaskResultStatus + + An Enum representing the status of a :class:`TaskResult`. + + .. attribute:: TaskResultStatus.READY + + The :class:`Task` has just been enqueued, or is ready to be executed + again. + + .. attribute:: TaskResultStatus.RUNNING + + The :class:`Task` is currently being executed. + + .. attribute:: TaskResultStatus.FAILED + + The :class:`Task` raised an exception during execution, or was unable + to start. + + .. attribute:: TaskResultStatus.SUCCESSFUL + + The :class:`Task` has finished executing successfully. + +.. class:: TaskResult + + The ``TaskResult`` stores the information about a specific execution of a + :class:`Task`. + + Attributes of ``TaskResult`` cannot be modified. + + .. attribute:: TaskResult.task + + The :class:`Task` the result was enqueued for. + + .. attribute:: TaskResult.id + + A unique identifier for the result, which can be passed to + :meth:`Task.get_result`. + + The format of the id will depend on the backend being used. Task result + ids are always strings less than 64 characters. + + See :ref:`Task results ` for more details. + + .. attribute:: TaskResult.status + + The :class:`status ` of the result. + + .. attribute:: TaskResult.enqueued_at + + The time when the ``Task`` was enqueued. + + If :attr:`Task.enqueue_on_commit` was set, this is the time the + transaction committed. + + .. attribute:: TaskResult.started_at + + The time when the ``Task`` began execution, on its first attempt. + + .. attribute:: TaskResult.last_attempted_at + + The time when the most recent ``Task`` run began execution. + + .. attribute:: TaskResult.finished_at + + The time when the ``Task`` finished execution, whether it failed or + succeeded. + + .. attribute:: TaskResult.backend + + The backend the result is from. + + .. attribute:: TaskResult.errors + + A list of :class:`TaskError` instances for the errors raised as part of + each execution of the Task. + + .. attribute:: TaskResult.return_value + + The return value from the ``Task`` function. + + If the ``Task`` did not finish successfully, :exc:`ValueError` is + raised. + + See :ref:`return values ` for usage examples. + + .. method:: TaskResult.refresh + + Refresh the result's attributes from the queue store. + + .. method:: TaskResult.arefresh + + The ``async`` variant of :meth:`TaskResult.refresh`. + + .. attribute:: TaskResult.is_finished + + Whether the ``Task`` has finished (successfully or not). + + .. attribute:: TaskResult.attempts + + The number of times the Task has been run. + + If the task is currently running, it does not count as an attempt. + + .. attribute:: TaskResult.worker_ids + + The ids of the workers which have executed the Task. + + +Task errors +----------- + +.. class:: TaskError + + Contains information about the error raised during the execution of a + ``Task``. + + .. attribute:: TaskError.traceback + + The traceback (as a string) from the raised exception when the ``Task`` + failed. + + .. attribute:: TaskError.exception_class + + The exception class raised when executing the ``Task``. + +Backends +======== + +Base backend +------------ + +.. module:: django.tasks.backends.base + +.. class:: BaseTaskBackend + + ``BaseTaskBackend`` is the parent class for all Task backends. + + .. attribute:: BaseTaskBackend.options + + A dictionary of extra parameters for the Task backend. These are + provided using the :setting:`OPTIONS ` setting. + + .. method:: BaseTaskBackend.enqueue(task, args, kwargs) + + Task backends which subclass ``BaseTaskBackend`` should implement this + method as a minimum. + + When implemented, ``enqueue()`` enqueues the ``task``, a :class:`.Task` + instance, for later execution. ``args`` are the positional arguments + and ``kwargs`` are the keyword arguments to be passed to the ``task``. + Returns a :class:`~django.tasks.TaskResult`. + + .. method:: BaseTaskBackend.aenqueue(task, args, kwargs) + + The ``async`` variant of :meth:`BaseTaskBackend.enqueue`. + + .. method:: BaseTaskBackend.get_result(result_id) + + Retrieve a result by its id. If the result does not exist, + :exc:`TaskResultDoesNotExist + ` is raised. + + If the backend does not support ``get_result()``, + :exc:`NotImplementedError` is raised. + + .. method:: BaseTaskBackend.aget_result(result_id) + + The ``async`` variant of :meth:`BaseTaskBackend.get_result`. + + .. method:: BaseTaskBackend.validate_task(task) + + Validates whether the provided ``Task`` is able to be enqueued using + the backend. If the Task is not valid, + :exc:`InvalidTask ` + is raised. + +Feature flags +~~~~~~~~~~~~~ + +Some backends may not support all features Django provides. It's possible to +identify the supported functionality of a backend, and potentially change +behavior accordingly. + +.. attribute:: BaseTaskBackend.supports_defer + + Whether the backend supports enqueueing Tasks to be executed after a + specific time using the :attr:`~django.tasks.Task.run_after` attribute. + +.. attribute:: BaseTaskBackend.supports_async_task + + Whether the backend supports enqueueing async functions (coroutines). + +.. attribute:: BaseTaskBackend.supports_get_result + + Whether the backend supports retrieving ``Task`` results from another + thread after they have been enqueued. + +.. attribute:: BaseTaskBackend.supports_priority + + Whether the backend supports executing Tasks as ordered by their + :attr:`~django.tasks.Task.priority`. + +The below table notes which of the :ref:`built-in backends +` support which features: + +============================ ======================= =========================== +Feature :class:`.DummyBackend` :class:`.ImmediateBackend` +============================ ======================= =========================== +:attr:`.supports_defer` Yes No +:attr:`.supports_async_task` Yes Yes +:attr:`.supports_get_result` No No [#fnimmediateresult]_ +:attr:`.supports_priority` Yes [#fndummypriority]_ Yes [#fnimmediatepriority]_ +============================ ======================= =========================== + +.. _task-available-backends: + +Available backends +------------------ + +Immediate backend +~~~~~~~~~~~~~~~~~ + +.. module:: django.tasks.backends.immediate + +.. class:: ImmediateBackend + + The :ref:`immediate backend ` executes Tasks + immediately, rather than in the background. + +Dummy backend +~~~~~~~~~~~~~ + +.. module:: django.tasks.backends.dummy + +.. class:: DummyBackend + + The :ref:`dummy backend ` does not execute enqueued + Tasks. Instead, it stores task results for later inspection. + + .. attribute:: DummyBackend.results + + A list of results for the enqueued Tasks, in the order they were + enqueued. + + .. method:: DummyBackend.clear + + Clears the list of stored results. + +Exceptions +========== + +.. module:: django.tasks.exceptions + +.. exception:: InvalidTask + + Raised when the :class:`.Task` attempting to be enqueued + is invalid. + +.. exception:: InvalidTaskBackend + + Raised when the requested :class:`.BaseTaskBackend` is invalid. + +.. exception:: TaskResultDoesNotExist + + Raised by :meth:`~django.tasks.backends.base.BaseTaskBackend.get_result` + when the provided ``result_id`` does not exist. + +.. exception:: TaskResultMismatch + + Raised by :meth:`~django.tasks.Task.get_result` when the provided + ``result_id`` is for a different Task than the current Task. + +.. rubric:: Footnotes +.. [#fnimmediateresult] The :class:`.ImmediateBackend` doesn't officially + support ``get_result()``, despite implementing the API, since the result + cannot be retrieved from a different thread. +.. [#fndummypriority] The :class:`.DummyBackend` has ``supports_priority=True`` + so that it can be used as a drop-in replacement in tests. Since this + backend never executes Tasks, the ``priority`` value has no effect. +.. [#fnimmediatepriority] The :class:`.ImmediateBackend` has + ``supports_priority=True`` so that it can be used as a drop-in replacement + in tests. Because Tasks run as soon as they are scheduled, the ``priority`` + value has no effect. diff --git a/docs/releases/6.0.txt b/docs/releases/6.0.txt index fba0935a2b..8f0bf321a5 100644 --- a/docs/releases/6.0.txt +++ b/docs/releases/6.0.txt @@ -112,6 +112,45 @@ A `migration guide`_ is available if you're updating from the .. _migration guide: https://github.com/carltongibson/django-template-partials/blob/main/Migration.md +Background Tasks +---------------- + +Django now includes a built-in Tasks framework for running code outside the +HTTP request–response cycle. This enables offloading work, such as sending +emails or processing data, to background workers. + +Tasks are defined using the :func:`~django.tasks.task` decorator:: + + from django.core.mail import send_mail + from django.tasks import task + + + @task + def email_users(emails, subject, message): + return send_mail(subject, message, None, emails) + +Once defined, tasks can be enqueued through a configured backend:: + + email_users.enqueue( + emails=["user@example.com"], + subject="You have a message", + message="Hello there!", + ) + +Backends are configured via the :setting:`TASKS` setting. Django provides +two built-in backends, primarily for development and testing: + +* :class:`~django.tasks.backends.immediate.ImmediateBackend`: executes tasks + immediately in the same process. +* :class:`~django.tasks.backends.dummy.DummyBackend`: stores tasks without + running them, leaving results in the + :attr:`~django.tasks.TaskResultStatus.READY` state. + +Django only handles task creation and queuing; it does not provide a worker +mechanism to run tasks. Execution must be managed by external infrastructure, +such as a separate process or service. See :doc:`/topics/tasks` for an +overview, and the :doc:`Tasks reference ` for API details. + Minor features -------------- diff --git a/docs/spelling_wordlist b/docs/spelling_wordlist index 864b99f84a..2898f85d5b 100644 --- a/docs/spelling_wordlist +++ b/docs/spelling_wordlist @@ -152,6 +152,7 @@ editability encodings Endian Enero +enqueueing enum environ esque diff --git a/docs/topics/index.txt b/docs/topics/index.txt index 4f837c81e2..59484d9799 100644 --- a/docs/topics/index.txt +++ b/docs/topics/index.txt @@ -33,3 +33,4 @@ Introductions to all the key parts of Django you'll need to know: checks external-packages async + tasks diff --git a/docs/topics/tasks.txt b/docs/topics/tasks.txt new file mode 100644 index 0000000000..17c233d595 --- /dev/null +++ b/docs/topics/tasks.txt @@ -0,0 +1,438 @@ +======================== +Django's Tasks framework +======================== + +.. versionadded:: 6.0 + +For a web application, there's often more than just turning HTTP requests into +HTTP responses. For some functionality, it may be beneficial to run code +outside the request-response cycle. + +That's where background Tasks come in. + +Background Tasks can offload work to be run outside the request-response cycle, +to be run elsewhere, potentially at a later date. This keeps requests fast, +reduces latency, and improves the user experience. For example, a user +shouldn't have to wait for an email to send before their page finishes loading. + +Django's new Tasks framework makes it easy to define and enqueue such work. It +does not provide a worker mechanism to run Tasks. The actual execution must be +handled by infrastructure outside Django, such as a separate process or +service. + +Background Task fundamentals +============================ + +When work needs to be done in the background, Django creates a ``Task``, which +is stored in the Queue Store. This ``Task`` contains all the metadata needed to +execute it, as well as a unique identifier for Django to retrieve the result +later. + +A Worker will look at the Queue Store for new Tasks to run. When a new Task is +added, a Worker claims the Task, executes it, and saves the status and result +back to the Queue Store. These workers run outside the request-response +lifecycle. + +.. _configuring-a-task-backend: + +Configuring a Task backend +========================== + +The Task backend determines how and where Tasks are stored for execution and +how they are executed. Different Task backends have different characteristics +and configuration options, which may impact the performance and reliability of +your application. Django comes with a number of :ref:`built-in backends +`. Django does not provide a generic way to execute +Tasks, only enqueue them. + +Task backends are configured using the :setting:`TASKS` setting in your +settings file. Whilst most applications will only need a single backend, +multiple are supported. + +.. _immediate-task-backend: + +Immediate execution +------------------- + +This is the default backend if another is not specified in your settings file. +The :class:`.ImmediateBackend` runs enqueued Tasks immediately, rather than in +the background. This allows background Task functionality to be slowly added to +an application, before the required infrastructure is available. + +To use it, set :setting:`BACKEND ` to +``"django.tasks.backends.immediate.ImmediateBackend"``:: + + TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}} + +The :class:`.ImmediateBackend` may also be useful in tests, to bypass the need +to run a real background worker in your tests. + +.. admonition:: ``ImmediateBackend`` and ``ENQUEUE_ON_COMMIT`` + + When :setting:`ENQUEUE_ON_COMMIT ` is ``False``, + the Task will be executed within the same transaction it was enqueued in. + + See :ref:`Task transactions ` for more information. + +.. _dummy-task-backend: + +Dummy backend +------------- + +The :class:`.DummyBackend` doesn't execute enqueued Tasks at all, instead +storing results for later use. Task results will forever remain in the +:attr:`~django.tasks.TaskResultStatus.READY` state. + +This backend is not intended for use in production - it is provided as a +convenience that can be used during development and testing. + +To use it, set :setting:`BACKEND ` to +``"django.tasks.backends.dummy.DummyBackend"``:: + + TASKS = {"default": {"BACKEND": "django.tasks.backends.dummy.DummyBackend"}} + +The results for enqueued Tasks can be retrieved from the backend's +:attr:`~django.tasks.backends.dummy.DummyBackend.results` attribute: + +.. code-block:: pycon + + >>> from django.tasks import default_task_backend + >>> my_task.enqueue() + >>> len(default_task_backend.results) + 1 + +Stored results can be cleared using the +:meth:`~django.tasks.backends.dummy.DummyBackend.clear` method: + +.. code-block:: pycon + + >>> default_task_backend.clear() + >>> len(default_task_backend.results) + 0 + +Using a custom backend +---------------------- + +While Django includes support for a number of Task backends out-of-the-box, +sometimes you might want to customize the Task backend. To use an external Task +backend with Django, use the Python import path as the :setting:`BACKEND +` of the :setting:`TASKS` setting, like so:: + + TASKS = { + "default": { + "BACKEND": "path.to.backend", + } + } + +A Task backend is a class that inherits +:class:`~django.tasks.backends.base.BaseTaskBackend`. At a minimum, it must +implement :meth:`.BaseTaskBackend.enqueue`. If you're building your own +backend, you can use the built-in Task backends as reference implementations. +You'll find the code in the :source:`django/tasks/backends/` directory of the +Django source. + +Asynchronous support +-------------------- + +Django has developing support for asynchronous Task backends. + +:class:`~django.tasks.backends.base.BaseTaskBackend` has async variants of all +base methods. By convention, the asynchronous versions of all methods are +prefixed with ``a``. The arguments for both variants are the same. + +Retrieving backends +------------------- + +Backends can be retrieved using the ``task_backends`` connection handler:: + + from django.tasks import task_backends + + task_backends["default"] # The default backend + task_backends["reserve"] # Another backend + +The "default" backend is available as ``default_task_backend``:: + + from django.tasks import default_task_backend + +.. _defining-tasks: + +Defining Tasks +============== + +Tasks are defined using the :meth:`django.tasks.task` decorator on a +module-level function:: + + from django.core.mail import send_mail + from django.tasks import task + + + @task + def email_users(emails, subject, message): + return send_mail( + subject=subject, message=message, from_email=None, recipient_list=emails + ) + + +The return value of the decorator is a :class:`~django.tasks.Task` instance. + +:class:`~django.tasks.Task` attributes can be customized via the ``@task`` +decorator arguments:: + + from django.core.mail import send_mail + from django.tasks import task + + + @task(priority=2, queue_name="emails", enqueue_on_commit=True) + def email_users(emails, subject, message): + return send_mail( + subject=subject, message=message, from_email=None, recipient_list=emails + ) + +By convention, Tasks are defined in a ``tasks.py`` file, however this is not +enforced. + +.. _task-context: + +Task context +------------ + +Sometimes, the running ``Task`` may need to know context about how it was +enqueued, and how it is being executed. This can be accessed by taking a +``context`` argument, which is an instance of +:class:`~django.tasks.TaskContext`. + +To receive the Task context as an argument to your Task function, pass +``takes_context`` when defining it:: + + import logging + from django.core.mail import send_mail + from django.tasks import task + + + logger = logging.getLogger(__name__) + + + @task(takes_context=True) + def email_users(context, emails, subject, message): + logger.debug( + f"Attempt {context.attempt} to send user email. Task result id: {context.task_result.id}." + ) + return send_mail( + subject=subject, message=message, from_email=None, recipient_list=emails + ) + +.. _modifying-tasks: + +Modifying Tasks +--------------- + +Before enqueueing Tasks, it may be necessary to modify certain parameters of +the Task. For example, to give it a higher priority than it would normally. + +A ``Task`` instance cannot be modified directly. Instead, a modified instance +can be created with the :meth:`~django.tasks.Task.using` method, leaving the +original as-is. For example: + +.. code-block:: pycon + + >>> email_users.priority + 0 + >>> email_users.using(priority=10).priority + 10 + +.. _enqueueing-tasks: + +Enqueueing Tasks +================ + +To add the Task to the queue store, so it will be executed, call the +:meth:`~django.tasks.Task.enqueue` method on it. If the Task takes arguments, +these can be passed as-is. For example:: + + result = email_users.enqueue( + emails=["user@example.com"], + subject="You have a message", + message="Hello there!", + ) + +This returns a :class:`~django.tasks.TaskResult`, which can be used to retrieve +the result of the Task once it has finished executing. + +To enqueue Tasks in an ``async`` context, :meth:`~django.tasks.Task.aenqueue` +is available as an ``async`` variant of :meth:`~django.tasks.Task.enqueue`. + +Because both Task arguments and return values are serialized to JSON, they must +be JSON-serializable: + +.. code-block:: pycon + + >>> process_data.enqueue(datetime.now()) + Traceback (most recent call last): + ... + TypeError: Object of type datetime is not JSON serializable + +Arguments must also be able to round-trip through a :func:`json.dumps`/ +:func:`json.loads` cycle without changing type. For example, consider this +Task:: + + @task() + def double_dictionary(key): + return {key: key * 2} + +With the ``ImmediateBackend`` configured as the default backend: + +.. code-block:: pycon + + >>> result = double_dictionary.enqueue((1, 2, 3)) + >>> result.status + FAILED + >>> result.errors[0].traceback + Traceback (most recent call last): + ... + TypeError: unhashable type: 'list' + +The ``double_dictionary`` Task fails because after the JSON round-trip the +tuple ``(1, 2, 3)`` becomes the list ``[1, 2, 3]``, which cannot be used as a +dictionary key. + +In general, complex objects such as model instances, or built-in types like +``datetime`` and ``tuple`` cannot be used in Tasks without additional +conversion. + +.. _task-transactions: + +Transactions +------------ + +By default, Tasks are enqueued after the current database transaction (if there +is one) commits successfully (using :meth:`transaction.on_commit() +`), rather than enqueueing immediately. For +most backends, Tasks are run in a separate process, using a different database +connection. Without waiting for the transaction to commit, workers could start +to process a Task which uses objects it can't access yet. + +This behavior can be changed by changing the :setting:`TASKS-ENQUEUE_ON_COMMIT` +setting for the backend, or for a specific Task using the ``enqueue_on_commit`` +parameter. + +For example, consider this simplified example:: + + @task + def my_task(): + Thing.objects.get() + + + with transaction.atomic(): + Thing.objects.create() + my_task.enqueue() + + +If :setting:`ENQUEUE_ON_COMMIT ` is ``False``, then it +is possible for ``my_task`` to run before the ``Thing`` is committed to the +database, and the Task won't be able to see the created object within your +transaction. + +.. _task-results: + +Task results +============ + +When enqueueing a ``Task``, you receive a :class:`~django.tasks.TaskResult`, +however it's likely useful to retrieve the result from somewhere else (for +example another request or another Task). + +Each ``TaskResult`` has a unique :attr:`~django.tasks.TaskResult.id`, which can +be used to identify and retrieve the result once the code which enqueued the +Task has finished. + +The :meth:`~django.tasks.Task.get_result` method can retrieve a result based on +its ``id``:: + + # Later, somewhere else... + result = email_users.get_result(result_id) + +To retrieve a ``TaskResult``, regardless of which kind of ``Task`` it was from, +use the :meth:`~django.tasks.Task.get_result` method on the backend:: + + from django.tasks import default_task_backend + + result = default_task_backend.get_result(result_id) + +To retrieve results in an ``async`` context, +:meth:`~django.tasks.Task.aget_result` is available as an ``async`` variant of +:meth:`~django.tasks.Task.get_result` on both the backend and ``Task``. + +Some backends, such as the built-in ``ImmediateBackend`` do not support +``get_result()``. Calling ``get_result()`` on these backends will +raise :exc:`NotImplementedError`. + +Updating results +---------------- + +A ``TaskResult`` contains the status of a Task's execution at the point it was +retrieved. If the Task finishes after :meth:`~django.tasks.Task.get_result` is +called, it will not update. + +To refresh the values, call the :meth:`django.tasks.TaskResult.refresh` +method: + +.. code-block:: pycon + + >>> result.status + RUNNING + >>> result.refresh() # or await result.arefresh() + >>> result.status + SUCCESSFUL + +.. _task-return-values: + +Return values +------------- + +If your Task function returns something, it can be retrieved from the +:attr:`django.tasks.TaskResult.return_value` attribute: + +.. code-block:: pycon + + >>> result.status + SUCCESSFUL + >>> result.return_value + 42 + +If the Task has not finished executing, or has failed, :exc:`ValueError` is +raised. + +.. code-block:: pycon + + >>> result.status + RUNNING + >>> result.return_value + Traceback (most recent call last): + ... + ValueError: Task has not finished yet + +Errors +------ + +If the Task doesn't succeed, and instead raises an exception, either as part of +the Task or as part of running it, the exception and traceback are saved to the +:attr:`django.tasks.TaskResult.errors` list. + +Each entry in ``errors`` is a :class:`~django.tasks.TaskError` containing +information about error raised during the execution: + +.. code-block:: pycon + + >>> result.errors[0].exception_class + + +Note that this is just the type of exception, and contains no other values. The +traceback information is reduced to a string which you can use to help +debugging: + +.. code-block:: pycon + + >>> result.errors[0].traceback + Traceback (most recent call last): + ... + TypeError: Object of type datetime is not JSON serializable diff --git a/tests/tasks/__init__.py b/tests/tasks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/tasks/tasks.py b/tests/tasks/tasks.py new file mode 100644 index 0000000000..3959660ab9 --- /dev/null +++ b/tests/tasks/tasks.py @@ -0,0 +1,88 @@ +import time + +from django.tasks import TaskContext, task + + +@task() +def noop_task(*args, **kwargs): + return None + + +@task +def noop_task_from_bare_decorator(*args, **kwargs): + return None + + +@task() +async def noop_task_async(*args, **kwargs): + return None + + +@task() +def calculate_meaning_of_life(): + return 42 + + +@task() +def failing_task_value_error(): + raise ValueError("This Task failed due to ValueError") + + +@task() +def failing_task_system_exit(): + raise SystemExit("This Task failed due to SystemExit") + + +@task() +def failing_task_keyboard_interrupt(): + raise KeyboardInterrupt("This Task failed due to KeyboardInterrupt") + + +@task() +def complex_exception(): + raise ValueError(ValueError("This task failed")) + + +@task() +def complex_return_value(): + # Return something which isn't JSON serializable nor picklable. + return lambda: True + + +@task() +def exit_task(): + exit(1) + + +@task(enqueue_on_commit=True) +def enqueue_on_commit_task(): + pass + + +@task(enqueue_on_commit=False) +def never_enqueue_on_commit_task(): + pass + + +@task() +def hang(): + """ + Do nothing for 5 minutes + """ + time.sleep(300) + + +@task() +def sleep_for(seconds): + time.sleep(seconds) + + +@task(takes_context=True) +def get_task_id(context): + return context.task_result.id + + +@task(takes_context=True) +def test_context(context, attempt): + assert isinstance(context, TaskContext) + assert context.attempt == attempt diff --git a/tests/tasks/test_custom_backend.py b/tests/tasks/test_custom_backend.py new file mode 100644 index 0000000000..83a302a183 --- /dev/null +++ b/tests/tasks/test_custom_backend.py @@ -0,0 +1,71 @@ +import logging +from unittest import mock + +from django.tasks import default_task_backend, task_backends +from django.tasks.backends.base import BaseTaskBackend +from django.tasks.exceptions import InvalidTask +from django.test import SimpleTestCase, override_settings + +from . import tasks as test_tasks + + +class CustomBackend(BaseTaskBackend): + def __init__(self, alias, params): + super().__init__(alias, params) + self.prefix = self.options.get("prefix", "") + + def enqueue(self, *args, **kwargs): + logger = logging.getLogger(__name__) + logger.info(f"{self.prefix}Task enqueued.") + + +class CustomBackendNoEnqueue(BaseTaskBackend): + pass + + +@override_settings( + TASKS={ + "default": { + "BACKEND": f"{CustomBackend.__module__}.{CustomBackend.__qualname__}", + "ENQUEUE_ON_COMMIT": False, + "OPTIONS": {"prefix": "PREFIX: "}, + }, + "no_enqueue": { + "BACKEND": f"{CustomBackendNoEnqueue.__module__}." + f"{CustomBackendNoEnqueue.__qualname__}", + }, + } +) +class CustomBackendTestCase(SimpleTestCase): + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], CustomBackend) + self.assertEqual(default_task_backend.alias, "default") + self.assertEqual(default_task_backend.options, {"prefix": "PREFIX: "}) + + @mock.patch.multiple(CustomBackend, supports_async_task=False) + def test_enqueue_async_task_on_non_async_backend(self): + with self.assertRaisesMessage( + InvalidTask, "Backend does not support async Tasks." + ): + default_task_backend.validate_task(test_tasks.noop_task_async) + + def test_backend_does_not_support_priority(self): + with self.assertRaisesMessage( + InvalidTask, "Backend does not support setting priority of tasks." + ): + test_tasks.noop_task.using(priority=10) + + def test_options(self): + with self.assertLogs(__name__, level="INFO") as captured_logs: + test_tasks.noop_task.enqueue() + self.assertEqual(len(captured_logs.output), 1) + self.assertIn("PREFIX: Task enqueued", captured_logs.output[0]) + + def test_no_enqueue(self): + with self.assertRaisesMessage( + TypeError, + "Can't instantiate abstract class CustomBackendNoEnqueue " + "without an implementation for abstract method 'enqueue'", + ): + test_tasks.noop_task.using(backend="no_enqueue") diff --git a/tests/tasks/test_dummy_backend.py b/tests/tasks/test_dummy_backend.py new file mode 100644 index 0000000000..27205f6ab3 --- /dev/null +++ b/tests/tasks/test_dummy_backend.py @@ -0,0 +1,337 @@ +from typing import cast +from unittest import mock + +from django.db import transaction +from django.db.utils import ConnectionHandler +from django.tasks import TaskResultStatus, default_task_backend, task_backends +from django.tasks.backends.dummy import DummyBackend +from django.tasks.base import Task +from django.tasks.exceptions import InvalidTask, TaskResultDoesNotExist +from django.test import ( + SimpleTestCase, + TransactionTestCase, + override_settings, + skipIfDBFeature, +) + +from . import tasks as test_tasks + + +@override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": [], + "ENQUEUE_ON_COMMIT": False, + } + } +) +class DummyBackendTestCase(SimpleTestCase): + def setUp(self): + default_task_backend.clear() + + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], DummyBackend) + self.assertEqual(default_task_backend.alias, "default") + self.assertEqual(default_task_backend.options, {}) + + def test_enqueue_task(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = cast(Task, task).enqueue(1, two=3) + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertIs(result.is_finished, False) + self.assertIsNone(result.started_at) + self.assertIsNone(result.last_attempted_at) + self.assertIsNone(result.finished_at) + with self.assertRaisesMessage(ValueError, "Task has not finished yet"): + result.return_value + self.assertEqual(result.task, task) + self.assertEqual(result.args, [1]) + self.assertEqual(result.kwargs, {"two": 3}) + self.assertEqual(result.attempts, 0) + + self.assertIn(result, default_task_backend.results) + + async def test_enqueue_task_async(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = await cast(Task, task).aenqueue() + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertIs(result.is_finished, False) + self.assertIsNone(result.started_at) + self.assertIsNone(result.last_attempted_at) + self.assertIsNone(result.finished_at) + with self.assertRaisesMessage(ValueError, "Task has not finished yet"): + result.return_value + self.assertEqual(result.task, task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + self.assertEqual(result.attempts, 0) + + self.assertIn(result, default_task_backend.results) + + def test_get_result(self): + result = default_task_backend.enqueue(test_tasks.noop_task, (), {}) + + new_result = default_task_backend.get_result(result.id) + + self.assertEqual(result, new_result) + + async def test_get_result_async(self): + result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {}) + + new_result = await default_task_backend.aget_result(result.id) + + self.assertEqual(result, new_result) + + def test_refresh_result(self): + result = default_task_backend.enqueue( + test_tasks.calculate_meaning_of_life, (), {} + ) + + enqueued_result = default_task_backend.results[0] + object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL) + + self.assertEqual(result.status, TaskResultStatus.READY) + result.refresh() + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + async def test_refresh_result_async(self): + result = await default_task_backend.aenqueue( + test_tasks.calculate_meaning_of_life, (), {} + ) + + enqueued_result = default_task_backend.results[0] + object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL) + + self.assertEqual(result.status, TaskResultStatus.READY) + await result.arefresh() + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + async def test_get_missing_result(self): + with self.assertRaises(TaskResultDoesNotExist): + default_task_backend.get_result("123") + + with self.assertRaises(TaskResultDoesNotExist): + await default_task_backend.aget_result("123") + + def test_enqueue_on_commit(self): + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + def test_enqueue_logs(self): + with self.assertLogs("django.tasks", level="DEBUG") as captured_logs: + result = test_tasks.noop_task.enqueue() + + self.assertEqual(len(captured_logs.output), 1) + self.assertIn("enqueued", captured_logs.output[0]) + self.assertIn(result.id, captured_logs.output[0]) + + def test_errors(self): + result = test_tasks.noop_task.enqueue() + self.assertEqual(result.errors, []) + + def test_validate_disallowed_async_task(self): + with mock.patch.multiple(default_task_backend, supports_async_task=False): + with self.assertRaisesMessage( + InvalidTask, "Backend does not support async Tasks." + ): + default_task_backend.validate_task(test_tasks.noop_task_async) + + def test_check(self): + errors = list(default_task_backend.check()) + self.assertEqual(len(errors), 0, errors) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + @mock.patch("django.tasks.backends.base.connections", ConnectionHandler({})) + def test_enqueue_on_commit_with_no_databases(self): + self.assertIn( + "tasks.E001", {error.id for error in default_task_backend.check()} + ) + + def test_takes_context(self): + result = test_tasks.get_task_id.enqueue() + self.assertEqual(result.status, TaskResultStatus.READY) + + def test_clear(self): + result = test_tasks.noop_task.enqueue() + + default_task_backend.get_result(result.id) + + default_task_backend.clear() + + with self.assertRaisesMessage(TaskResultDoesNotExist, result.id): + default_task_backend.get_result(result.id) + + def test_validate_on_enqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + task_with_custom_queue_name.enqueue() + + async def test_validate_on_aenqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + await task_with_custom_queue_name.aenqueue() + + +class DummyBackendTransactionTestCase(TransactionTestCase): + available_apps = [] + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + def test_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + test_tasks.noop_task.enqueue() + + self.assertEqual(len(default_task_backend.results), 0) + + self.assertEqual(len(default_task_backend.results), 1) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_doesnt_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + False, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNotNone(result.enqueued_at) + + self.assertEqual(len(default_task_backend.results), 1) + + self.assertEqual(len(default_task_backend.results), 1) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + } + } + ) + def test_wait_until_transaction_by_default(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNone(result.enqueued_at) + + self.assertEqual(len(default_task_backend.results), 0) + + self.assertEqual(len(default_task_backend.results), 1) + self.assertIsNone(result.enqueued_at) + result.refresh() + self.assertIsNotNone(result.enqueued_at) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_task_specific_enqueue_on_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + with transaction.atomic(): + result = test_tasks.enqueue_on_commit_task.enqueue() + + self.assertIsNone(result.enqueued_at) + + self.assertEqual(len(default_task_backend.results), 0) + + self.assertEqual(len(default_task_backend.results), 1) + self.assertIsNone(result.enqueued_at) + result.refresh() + self.assertIsNotNone(result.enqueued_at) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + @skipIfDBFeature("supports_transactions") + def test_enqueue_on_commit_with_no_transactions(self): + self.assertIn( + "tasks.E002", {error.id for error in default_task_backend.check()} + ) diff --git a/tests/tasks/test_immediate_backend.py b/tests/tasks/test_immediate_backend.py new file mode 100644 index 0000000000..01e2841aa7 --- /dev/null +++ b/tests/tasks/test_immediate_backend.py @@ -0,0 +1,387 @@ +from django.db import transaction +from django.tasks import TaskResultStatus, default_task_backend, task_backends +from django.tasks.backends.immediate import ImmediateBackend +from django.tasks.exceptions import InvalidTask +from django.test import SimpleTestCase, TransactionTestCase, override_settings +from django.utils import timezone + +from . import tasks as test_tasks + + +@override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "QUEUES": [], + "ENQUEUE_ON_COMMIT": False, + } + } +) +class ImmediateBackendTestCase(SimpleTestCase): + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], ImmediateBackend) + self.assertEqual(default_task_backend.alias, "default") + self.assertEqual(default_task_backend.options, {}) + + def test_enqueue_task(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = task.enqueue(1, two=3) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertIs(result.is_finished, True) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertIsNone(result.return_value) + self.assertEqual(result.task, task) + self.assertEqual(result.args, [1]) + self.assertEqual(result.kwargs, {"two": 3}) + self.assertEqual(result.attempts, 1) + + async def test_enqueue_task_async(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = await task.aenqueue() + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertIs(result.is_finished, True) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertIsNone(result.return_value) + self.assertEqual(result.task, task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + self.assertEqual(result.attempts, 1) + + def test_catches_exception(self): + test_data = [ + ( + test_tasks.failing_task_value_error, # Task function. + ValueError, # Expected exception. + "This Task failed due to ValueError", # Expected message. + ), + ( + test_tasks.failing_task_system_exit, + SystemExit, + "This Task failed due to SystemExit", + ), + ] + for task, exception, message in test_data: + with ( + self.subTest(task), + self.assertLogs("django.tasks", level="ERROR") as captured_logs, + ): + result = task.enqueue() + + self.assertEqual(len(captured_logs.output), 1) + self.assertIn(message, captured_logs.output[0]) + + self.assertEqual(result.status, TaskResultStatus.FAILED) + with self.assertRaisesMessage(ValueError, "Task failed"): + result.return_value + self.assertIs(result.is_finished, True) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertEqual(result.errors[0].exception_class, exception) + traceback = result.errors[0].traceback + self.assertIs( + traceback + and traceback.endswith(f"{exception.__name__}: {message}\n"), + True, + traceback, + ) + self.assertEqual(result.task, task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + def test_throws_keyboard_interrupt(self): + with self.assertRaises(KeyboardInterrupt): + with self.assertNoLogs("django.tasks", level="ERROR"): + default_task_backend.enqueue( + test_tasks.failing_task_keyboard_interrupt, [], {} + ) + + def test_complex_exception(self): + with self.assertLogs("django.tasks", level="ERROR"): + result = test_tasks.complex_exception.enqueue() + + self.assertEqual(result.status, TaskResultStatus.FAILED) + with self.assertRaisesMessage(ValueError, "Task failed"): + result.return_value + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + + self.assertIsNone(result._return_value) + self.assertEqual(result.errors[0].exception_class, ValueError) + self.assertIn( + 'ValueError(ValueError("This task failed"))', result.errors[0].traceback + ) + + self.assertEqual(result.task, test_tasks.complex_exception) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + def test_complex_return_value(self): + with self.assertLogs("django.tasks", level="ERROR"): + result = test_tasks.complex_return_value.enqueue() + + self.assertEqual(result.status, TaskResultStatus.FAILED) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertIsNone(result._return_value) + self.assertEqual(result.errors[0].exception_class, TypeError) + self.assertIn("Unsupported type", result.errors[0].traceback) + + def test_result(self): + result = default_task_backend.enqueue( + test_tasks.calculate_meaning_of_life, [], {} + ) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertEqual(result.return_value, 42) + + async def test_result_async(self): + result = await default_task_backend.aenqueue( + test_tasks.calculate_meaning_of_life, [], {} + ) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertEqual(result.return_value, 42) + + async def test_cannot_get_result(self): + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + default_task_backend.get_result("123") + + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + await default_task_backend.aget_result(123) + + async def test_cannot_refresh_result(self): + result = await default_task_backend.aenqueue( + test_tasks.calculate_meaning_of_life, (), {} + ) + + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + await result.arefresh() + + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + result.refresh() + + def test_cannot_pass_run_after(self): + with self.assertRaisesMessage( + InvalidTask, + "Backend does not support run_after.", + ): + default_task_backend.validate_task( + test_tasks.failing_task_value_error.using(run_after=timezone.now()) + ) + + def test_enqueue_on_commit(self): + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + def test_enqueue_logs(self): + with self.assertLogs("django.tasks", level="DEBUG") as captured_logs: + result = test_tasks.noop_task.enqueue() + + self.assertEqual(len(captured_logs.output), 3) + + self.assertIn("enqueued", captured_logs.output[0]) + self.assertIn(result.id, captured_logs.output[0]) + + self.assertIn("state=RUNNING", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + + self.assertIn("state=SUCCESSFUL", captured_logs.output[2]) + self.assertIn(result.id, captured_logs.output[2]) + + def test_failed_logs(self): + with self.assertLogs("django.tasks", level="DEBUG") as captured_logs: + result = test_tasks.failing_task_value_error.enqueue() + + self.assertEqual(len(captured_logs.output), 3) + self.assertIn("state=RUNNING", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + + self.assertIn("state=FAILED", captured_logs.output[2]) + self.assertIn(result.id, captured_logs.output[2]) + + def test_takes_context(self): + result = test_tasks.get_task_id.enqueue() + + self.assertEqual(result.return_value, result.id) + + def test_context(self): + result = test_tasks.test_context.enqueue(1) + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + def test_validate_on_enqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + task_with_custom_queue_name.enqueue() + + async def test_validate_on_aenqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + await task_with_custom_queue_name.aenqueue() + + +class ImmediateBackendTransactionTestCase(TransactionTestCase): + available_apps = [] + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + def test_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNone(result.enqueued_at) + self.assertEqual(result.attempts, 0) + self.assertEqual(result.status, TaskResultStatus.READY) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertIsNotNone(result.enqueued_at) + self.assertEqual(result.attempts, 1) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_doesnt_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + False, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNotNone(result.enqueued_at) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + } + } + ) + def test_wait_until_transaction_by_default(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNone(result.enqueued_at) + self.assertEqual(result.status, TaskResultStatus.READY) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_task_specific_enqueue_on_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + with transaction.atomic(): + result = test_tasks.enqueue_on_commit_task.enqueue() + + self.assertIsNone(result.enqueued_at) + self.assertEqual(result.status, TaskResultStatus.READY) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) diff --git a/tests/tasks/test_tasks.py b/tests/tasks/test_tasks.py new file mode 100644 index 0000000000..2c11d9657d --- /dev/null +++ b/tests/tasks/test_tasks.py @@ -0,0 +1,316 @@ +import dataclasses +from datetime import datetime + +from django.tasks import ( + DEFAULT_TASK_QUEUE_NAME, + TaskResultStatus, + default_task_backend, + task, + task_backends, +) +from django.tasks.backends.dummy import DummyBackend +from django.tasks.backends.immediate import ImmediateBackend +from django.tasks.base import TASK_MAX_PRIORITY, TASK_MIN_PRIORITY, Task +from django.tasks.exceptions import ( + InvalidTask, + InvalidTaskBackend, + TaskResultDoesNotExist, + TaskResultMismatch, +) +from django.test import SimpleTestCase, override_settings +from django.utils import timezone +from django.utils.module_loading import import_string + +from . import tasks as test_tasks + + +@override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": ["default", "queue_1"], + "ENQUEUE_ON_COMMIT": False, + }, + "immediate": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": False, + "QUEUES": [], + }, + "missing": {"BACKEND": "does.not.exist"}, + }, + USE_TZ=True, +) +class TaskTestCase(SimpleTestCase): + def setUp(self): + default_task_backend.clear() + + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], DummyBackend) + + def test_task_decorator(self): + self.assertIsInstance(test_tasks.noop_task, Task) + self.assertIsInstance(test_tasks.noop_task_async, Task) + self.assertIsInstance(test_tasks.noop_task_from_bare_decorator, Task) + + def test_enqueue_task(self): + result = test_tasks.noop_task.enqueue() + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertEqual(result.task, test_tasks.noop_task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + self.assertEqual(default_task_backend.results, [result]) + + async def test_enqueue_task_async(self): + result = await test_tasks.noop_task.aenqueue() + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertEqual(result.task, test_tasks.noop_task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + self.assertEqual(default_task_backend.results, [result]) + + def test_enqueue_with_invalid_argument(self): + with self.assertRaisesMessage(TypeError, "Unsupported type"): + test_tasks.noop_task.enqueue(datetime.now()) + + async def test_aenqueue_with_invalid_argument(self): + with self.assertRaisesMessage(TypeError, "Unsupported type"): + await test_tasks.noop_task.aenqueue(datetime.now()) + + def test_using_priority(self): + self.assertEqual(test_tasks.noop_task.priority, 0) + self.assertEqual(test_tasks.noop_task.using(priority=1).priority, 1) + self.assertEqual(test_tasks.noop_task.priority, 0) + + def test_using_queue_name(self): + self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME) + self.assertEqual( + test_tasks.noop_task.using(queue_name="queue_1").queue_name, "queue_1" + ) + self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME) + + def test_using_run_after(self): + now = timezone.now() + + self.assertIsNone(test_tasks.noop_task.run_after) + self.assertEqual(test_tasks.noop_task.using(run_after=now).run_after, now) + self.assertIsNone(test_tasks.noop_task.run_after) + + def test_using_unknown_backend(self): + self.assertEqual(test_tasks.noop_task.backend, "default") + + with self.assertRaisesMessage( + InvalidTaskBackend, "The connection 'unknown' doesn't exist." + ): + test_tasks.noop_task.using(backend="unknown") + + def test_using_missing_backend(self): + self.assertEqual(test_tasks.noop_task.backend, "default") + + with self.assertRaisesMessage( + InvalidTaskBackend, + "Could not find backend 'does.not.exist': No module named 'does'", + ): + test_tasks.noop_task.using(backend="missing") + + def test_using_creates_new_instance(self): + new_task = test_tasks.noop_task.using() + + self.assertEqual(new_task, test_tasks.noop_task) + self.assertIsNot(new_task, test_tasks.noop_task) + + def test_chained_using(self): + now = timezone.now() + + run_after_task = test_tasks.noop_task.using(run_after=now) + self.assertEqual(run_after_task.run_after, now) + + priority_task = run_after_task.using(priority=10) + self.assertEqual(priority_task.priority, 10) + self.assertEqual(priority_task.run_after, now) + + self.assertEqual(run_after_task.priority, 0) + + async def test_refresh_result(self): + result = await test_tasks.noop_task.aenqueue() + + original_result = dataclasses.asdict(result) + + result.refresh() + + self.assertEqual(dataclasses.asdict(result), original_result) + + await result.arefresh() + + self.assertEqual(dataclasses.asdict(result), original_result) + + def test_naive_datetime(self): + with self.assertRaisesMessage( + InvalidTask, "run_after must be an aware datetime." + ): + test_tasks.noop_task.using(run_after=datetime.now()) + + def test_invalid_priority(self): + with self.assertRaisesMessage( + InvalidTask, + f"priority must be a whole number between {TASK_MIN_PRIORITY} and " + f"{TASK_MAX_PRIORITY}.", + ): + test_tasks.noop_task.using(priority=-101) + + with self.assertRaisesMessage( + InvalidTask, + f"priority must be a whole number between {TASK_MIN_PRIORITY} and " + f"{TASK_MAX_PRIORITY}.", + ): + test_tasks.noop_task.using(priority=101) + + with self.assertRaisesMessage( + InvalidTask, + f"priority must be a whole number between {TASK_MIN_PRIORITY} and " + f"{TASK_MAX_PRIORITY}.", + ): + test_tasks.noop_task.using(priority=3.1) + + test_tasks.noop_task.using(priority=100) + test_tasks.noop_task.using(priority=-100) + test_tasks.noop_task.using(priority=0) + + def test_unknown_queue_name(self): + with self.assertRaisesMessage( + InvalidTask, "Queue 'queue-2' is not valid for backend." + ): + test_tasks.noop_task.using(queue_name="queue-2") + # Validation is bypassed when the backend QUEUES is an empty list. + self.assertEqual( + test_tasks.noop_task.using( + queue_name="queue-2", backend="immediate" + ).queue_name, + "queue-2", + ) + + def test_call_task(self): + self.assertEqual(test_tasks.calculate_meaning_of_life.call(), 42) + + async def test_call_task_async(self): + self.assertEqual(await test_tasks.calculate_meaning_of_life.acall(), 42) + + async def test_call_async_task(self): + self.assertIsNone(await test_tasks.noop_task_async.acall()) + + def test_call_async_task_sync(self): + self.assertIsNone(test_tasks.noop_task_async.call()) + + def test_get_result(self): + result = default_task_backend.enqueue(test_tasks.noop_task, (), {}) + + new_result = test_tasks.noop_task.get_result(result.id) + + self.assertEqual(result, new_result) + + async def test_get_result_async(self): + result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {}) + + new_result = await test_tasks.noop_task.aget_result(result.id) + + self.assertEqual(result, new_result) + + async def test_get_missing_result(self): + with self.assertRaises(TaskResultDoesNotExist): + test_tasks.noop_task.get_result("123") + + with self.assertRaises(TaskResultDoesNotExist): + await test_tasks.noop_task.aget_result("123") + + def test_get_incorrect_result(self): + result = default_task_backend.enqueue(test_tasks.noop_task_async, (), {}) + with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"): + test_tasks.noop_task.get_result(result.id) + + async def test_get_incorrect_result_async(self): + result = await default_task_backend.aenqueue(test_tasks.noop_task_async, (), {}) + with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"): + await test_tasks.noop_task.aget_result(result.id) + + def test_invalid_function(self): + for invalid_function in [any, self.test_invalid_function]: + with self.subTest(invalid_function): + with self.assertRaisesMessage( + InvalidTask, + "Task function must be defined at a module level.", + ): + task()(invalid_function) + + def test_get_backend(self): + self.assertEqual(test_tasks.noop_task.backend, "default") + self.assertIsInstance(test_tasks.noop_task.get_backend(), DummyBackend) + + immediate_task = test_tasks.noop_task.using(backend="immediate") + self.assertEqual(immediate_task.backend, "immediate") + self.assertIsInstance(immediate_task.get_backend(), ImmediateBackend) + + def test_name(self): + self.assertEqual(test_tasks.noop_task.name, "noop_task") + self.assertEqual(test_tasks.noop_task_async.name, "noop_task_async") + + def test_module_path(self): + self.assertEqual(test_tasks.noop_task.module_path, "tasks.tasks.noop_task") + self.assertEqual( + test_tasks.noop_task_async.module_path, "tasks.tasks.noop_task_async" + ) + + self.assertIs( + import_string(test_tasks.noop_task.module_path), test_tasks.noop_task + ) + self.assertIs( + import_string(test_tasks.noop_task_async.module_path), + test_tasks.noop_task_async, + ) + + @override_settings(TASKS={}) + def test_no_backends(self): + with self.assertRaises(InvalidTaskBackend): + test_tasks.noop_task.enqueue() + + def test_task_error_invalid_exception(self): + with self.assertLogs("django.tasks"): + immediate_task = test_tasks.failing_task_value_error.using( + backend="immediate" + ).enqueue() + + self.assertEqual(len(immediate_task.errors), 1) + + object.__setattr__( + immediate_task.errors[0], "exception_class_path", "subprocess.run" + ) + + with self.assertRaisesMessage( + ValueError, "'subprocess.run' does not reference a valid exception." + ): + immediate_task.errors[0].exception_class + + def test_task_error_unknown_module(self): + with self.assertLogs("django.tasks"): + immediate_task = test_tasks.failing_task_value_error.using( + backend="immediate" + ).enqueue() + + self.assertEqual(len(immediate_task.errors), 1) + + object.__setattr__( + immediate_task.errors[0], "exception_class_path", "does.not.exist" + ) + + with self.assertRaises(ImportError): + immediate_task.errors[0].exception_class + + def test_takes_context_without_taking_context(self): + with self.assertRaisesMessage( + InvalidTask, + "Task takes context but does not have a first argument of 'context'.", + ): + task(takes_context=True)(test_tasks.calculate_meaning_of_life.func) diff --git a/tests/utils_tests/test_inspect.py b/tests/utils_tests/test_inspect.py index b8359c2508..38ea35ecfb 100644 --- a/tests/utils_tests/test_inspect.py +++ b/tests/utils_tests/test_inspect.py @@ -1,5 +1,7 @@ +import subprocess import unittest +from django.shortcuts import aget_object_or_404 from django.utils import inspect @@ -100,3 +102,50 @@ class TestInspectMethods(unittest.TestCase): self.assertIs(inspect.func_accepts_kwargs(Person().just_args), False) self.assertIs(inspect.func_accepts_kwargs(Person.all_kinds), True) self.assertIs(inspect.func_accepts_kwargs(Person().just_args), False) + + +class IsModuleLevelFunctionTestCase(unittest.TestCase): + @classmethod + def _class_method(cls) -> None: + return None + + @staticmethod + def _static_method() -> None: + return None + + def test_builtin(self): + self.assertIs(inspect.is_module_level_function(any), False) + self.assertIs(inspect.is_module_level_function(isinstance), False) + + def test_from_module(self): + self.assertIs(inspect.is_module_level_function(subprocess.run), True) + self.assertIs(inspect.is_module_level_function(subprocess.check_output), True) + self.assertIs( + inspect.is_module_level_function(inspect.is_module_level_function), True + ) + + def test_private_function(self): + def private_function(): + pass + + self.assertIs(inspect.is_module_level_function(private_function), False) + + def test_coroutine(self): + self.assertIs(inspect.is_module_level_function(aget_object_or_404), True) + + def test_method(self): + self.assertIs(inspect.is_module_level_function(self.test_method), False) + self.assertIs(inspect.is_module_level_function(self.setUp), False) + + def test_unbound_method(self): + self.assertIs( + inspect.is_module_level_function(self.__class__.test_unbound_method), True + ) + self.assertIs(inspect.is_module_level_function(self.__class__.setUp), True) + + def test_lambda(self): + self.assertIs(inspect.is_module_level_function(lambda: True), False) + + def test_class_and_static_method(self): + self.assertIs(inspect.is_module_level_function(self._static_method), True) + self.assertIs(inspect.is_module_level_function(self._class_method), False) diff --git a/tests/utils_tests/test_json.py b/tests/utils_tests/test_json.py new file mode 100644 index 0000000000..9d137149ed --- /dev/null +++ b/tests/utils_tests/test_json.py @@ -0,0 +1,46 @@ +import json +from collections import UserList, defaultdict +from datetime import datetime +from decimal import Decimal + +from django.test import SimpleTestCase +from django.utils.json import normalize_json + + +class JSONNormalizeTestCase(SimpleTestCase): + def test_converts_json_types(self): + for test_case, expected in [ + (None, "null"), + (True, "true"), + (False, "false"), + (2, "2"), + (3.0, "3.0"), + (1e23 + 1, "1e+23"), + ("1", '"1"'), + (b"hello", '"hello"'), + ([], "[]"), + (UserList([1, 2]), "[1, 2]"), + ({}, "{}"), + ({1: "a"}, '{"1": "a"}'), + ({"foo": (1, 2, 3)}, '{"foo": [1, 2, 3]}'), + (defaultdict(list), "{}"), + (float("nan"), "NaN"), + (float("inf"), "Infinity"), + (float("-inf"), "-Infinity"), + ]: + with self.subTest(test_case): + normalized = normalize_json(test_case) + # Ensure that the normalized result is serializable. + self.assertEqual(json.dumps(normalized), expected) + + def test_bytes_decode_error(self): + with self.assertRaisesMessage(ValueError, "Unsupported value"): + normalize_json(b"\xff") + + def test_encode_error(self): + for test_case in [self, any, object(), datetime.now(), set(), Decimal("3.42")]: + with ( + self.subTest(test_case), + self.assertRaisesMessage(TypeError, "Unsupported type"), + ): + normalize_json(test_case)