From e83a88566a71a2353cebc35992c110be0f8628af Mon Sep 17 00:00:00 2001 From: Jon Janzen Date: Sat, 7 Nov 2020 13:19:20 +0300 Subject: [PATCH] Fixed #32172 -- Adapted signals to allow async handlers. co-authored-by: kozzztik co-authored-by: Carlton Gibson --- django/core/handlers/asgi.py | 4 +- django/dispatch/dispatcher.py | 233 +++++++++++++++++++++++++++--- django/test/client.py | 4 +- django/test/signals.py | 2 +- docs/releases/5.0.txt | 4 +- docs/topics/async.txt | 2 + docs/topics/signals.txt | 65 ++++++++- tests/postgres_tests/test_apps.py | 6 +- tests/signals/tests.py | 93 ++++++++++++ 9 files changed, 370 insertions(+), 43 deletions(-) diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py index 39e2abe5a9..569157b277 100644 --- a/django/core/handlers/asgi.py +++ b/django/core/handlers/asgi.py @@ -170,9 +170,7 @@ class ASGIHandler(base.BaseHandler): return # Request is complete and can be served. set_script_prefix(self.get_script_prefix(scope)) - await sync_to_async(signals.request_started.send, thread_sensitive=True)( - sender=self.__class__, scope=scope - ) + await signals.request_started.asend(sender=self.__class__, scope=scope) # Get the request and check for basic issues. request, error_response = self.create_request(scope, body_file) if request is None: diff --git a/django/dispatch/dispatcher.py b/django/dispatch/dispatcher.py index 86eb1c3b20..26ef09ce49 100644 --- a/django/dispatch/dispatcher.py +++ b/django/dispatch/dispatcher.py @@ -1,7 +1,10 @@ +import asyncio import logging import threading import weakref +from asgiref.sync import async_to_sync, iscoroutinefunction, sync_to_async + from django.utils.inspect import func_accepts_kwargs logger = logging.getLogger("django.dispatch") @@ -52,7 +55,8 @@ class Signal: receiver A function or an instance method which is to receive signals. - Receivers must be hashable objects. + Receivers must be hashable objects. Receivers can be + asynchronous. If weak is True, then receiver must be weak referenceable. @@ -94,6 +98,8 @@ class Signal: else: lookup_key = (_make_id(receiver), _make_id(sender)) + is_async = iscoroutinefunction(receiver) + if weak: ref = weakref.ref receiver_object = receiver @@ -106,8 +112,8 @@ class Signal: with self.lock: self._clear_dead_receivers() - if not any(r_key == lookup_key for r_key, _ in self.receivers): - self.receivers.append((lookup_key, receiver)) + if not any(r_key == lookup_key for r_key, _, _ in self.receivers): + self.receivers.append((lookup_key, receiver, is_async)) self.sender_receivers_cache.clear() def disconnect(self, receiver=None, sender=None, dispatch_uid=None): @@ -138,7 +144,7 @@ class Signal: with self.lock: self._clear_dead_receivers() for index in range(len(self.receivers)): - (r_key, _) = self.receivers[index] + r_key, *_ = self.receivers[index] if r_key == lookup_key: disconnected = True del self.receivers[index] @@ -147,7 +153,8 @@ class Signal: return disconnected def has_listeners(self, sender=None): - return bool(self._live_receivers(sender)) + sync_receivers, async_receivers = self._live_receivers(sender) + return bool(sync_receivers) or bool(async_receivers) def send(self, sender, **named): """ @@ -157,6 +164,10 @@ class Signal: terminating the dispatch loop. So it's possible that all receivers won't be called if an error is raised. + If any receivers are asynchronous, they are called after all the + synchronous receivers via a single call to async_to_sync(). They are + also executed concurrently with asyncio.gather(). + Arguments: sender @@ -172,16 +183,97 @@ class Signal: or self.sender_receivers_cache.get(sender) is NO_RECEIVERS ): return [] + responses = [] + sync_receivers, async_receivers = self._live_receivers(sender) + for receiver in sync_receivers: + response = receiver(signal=self, sender=sender, **named) + responses.append((receiver, response)) + if async_receivers: - return [ - (receiver, receiver(signal=self, sender=sender, **named)) - for receiver in self._live_receivers(sender) - ] + async def asend(): + async_responses = await asyncio.gather( + *( + receiver(signal=self, sender=sender, **named) + for receiver in async_receivers + ) + ) + return zip(async_receivers, async_responses) + + responses.extend(async_to_sync(asend)()) + return responses + + async def asend(self, sender, **named): + """ + Send signal from sender to all connected receivers in async mode. + + All sync receivers will be wrapped by sync_to_async() + If any receiver raises an error, the error propagates back through + send, terminating the dispatch loop. So it's possible that all + receivers won't be called if an error is raised. + + If any receivers are synchronous, they are grouped and called behind a + sync_to_async() adaption before executing any asynchronous receivers. + + If any receivers are asynchronous, they are grouped and executed + concurrently with asyncio.gather(). + + Arguments: + + sender + The sender of the signal. Either a specific object or None. + + named + Named arguments which will be passed to receivers. + + Return a list of tuple pairs [(receiver, response), ...]. + """ + if ( + not self.receivers + or self.sender_receivers_cache.get(sender) is NO_RECEIVERS + ): + return [] + sync_receivers, async_receivers = self._live_receivers(sender) + if sync_receivers: + + @sync_to_async + def sync_send(): + responses = [] + for receiver in sync_receivers: + response = receiver(signal=self, sender=sender, **named) + responses.append((receiver, response)) + return responses + + else: + sync_send = list + + responses, async_responses = await asyncio.gather( + sync_send(), + asyncio.gather( + *( + receiver(signal=self, sender=sender, **named) + for receiver in async_receivers + ) + ), + ) + responses.extend(zip(async_receivers, async_responses)) + return responses + + def _log_robust_failure(self, receiver, err): + logger.error( + "Error calling %s in Signal.send_robust() (%s)", + receiver.__qualname__, + err, + exc_info=err, + ) def send_robust(self, sender, **named): """ Send signal from sender to all connected receivers catching errors. + If any receivers are asynchronous, they are called after all the + synchronous receivers via a single call to async_to_sync(). They are + also executed concurrently with asyncio.gather(). + Arguments: sender @@ -206,19 +298,105 @@ class Signal: # Call each receiver with whatever arguments it can accept. # Return a list of tuple pairs [(receiver, response), ... ]. responses = [] - for receiver in self._live_receivers(sender): + sync_receivers, async_receivers = self._live_receivers(sender) + for receiver in sync_receivers: try: response = receiver(signal=self, sender=sender, **named) except Exception as err: - logger.error( - "Error calling %s in Signal.send_robust() (%s)", - receiver.__qualname__, - err, - exc_info=err, - ) + self._log_robust_failure(receiver, err) responses.append((receiver, err)) else: responses.append((receiver, response)) + if async_receivers: + + async def asend_and_wrap_exception(receiver): + try: + response = await receiver(signal=self, sender=sender, **named) + except Exception as err: + self._log_robust_failure(receiver, err) + return err + return response + + async def asend(): + async_responses = await asyncio.gather( + *( + asend_and_wrap_exception(receiver) + for receiver in async_receivers + ) + ) + return zip(async_receivers, async_responses) + + responses.extend(async_to_sync(asend)()) + return responses + + async def asend_robust(self, sender, **named): + """ + Send signal from sender to all connected receivers catching errors. + + If any receivers are synchronous, they are grouped and called behind a + sync_to_async() adaption before executing any asynchronous receivers. + + If any receivers are asynchronous, they are grouped and executed + concurrently with asyncio.gather. + + Arguments: + + sender + The sender of the signal. Can be any Python object (normally one + registered with a connect if you actually want something to + occur). + + named + Named arguments which will be passed to receivers. + + Return a list of tuple pairs [(receiver, response), ... ]. + + If any receiver raises an error (specifically any subclass of + Exception), return the error instance as the result for that receiver. + """ + if ( + not self.receivers + or self.sender_receivers_cache.get(sender) is NO_RECEIVERS + ): + return [] + + # Call each receiver with whatever arguments it can accept. + # Return a list of tuple pairs [(receiver, response), ... ]. + sync_receivers, async_receivers = self._live_receivers(sender) + + if sync_receivers: + + @sync_to_async + def sync_send(): + responses = [] + for receiver in sync_receivers: + try: + response = receiver(signal=self, sender=sender, **named) + except Exception as err: + self._log_robust_failure(receiver, err) + responses.append((receiver, err)) + else: + responses.append((receiver, response)) + return responses + + else: + sync_send = list + + async def asend_and_wrap_exception(receiver): + try: + response = await receiver(signal=self, sender=sender, **named) + except Exception as err: + self._log_robust_failure(receiver, err) + return err + return response + + responses, async_responses = await asyncio.gather( + sync_send(), + asyncio.gather( + *(asend_and_wrap_exception(receiver) for receiver in async_receivers), + ), + ) + responses.extend(zip(async_receivers, async_responses)) return responses def _clear_dead_receivers(self): @@ -244,31 +422,38 @@ class Signal: # We could end up here with NO_RECEIVERS even if we do check this case in # .send() prior to calling _live_receivers() due to concurrent .send() call. if receivers is NO_RECEIVERS: - return [] + return [], [] if receivers is None: with self.lock: self._clear_dead_receivers() senderkey = _make_id(sender) receivers = [] - for (receiverkey, r_senderkey), receiver in self.receivers: + for (_receiverkey, r_senderkey), receiver, is_async in self.receivers: if r_senderkey == NONE_ID or r_senderkey == senderkey: - receivers.append(receiver) + receivers.append((receiver, is_async)) if self.use_caching: if not receivers: self.sender_receivers_cache[sender] = NO_RECEIVERS else: # Note, we must cache the weakref versions. self.sender_receivers_cache[sender] = receivers - non_weak_receivers = [] - for receiver in receivers: + non_weak_sync_receivers = [] + non_weak_async_receivers = [] + for receiver, is_async in receivers: if isinstance(receiver, weakref.ReferenceType): # Dereference the weak reference. receiver = receiver() if receiver is not None: - non_weak_receivers.append(receiver) + if is_async: + non_weak_async_receivers.append(receiver) + else: + non_weak_sync_receivers.append(receiver) else: - non_weak_receivers.append(receiver) - return non_weak_receivers + if is_async: + non_weak_async_receivers.append(receiver) + else: + non_weak_sync_receivers.append(receiver) + return non_weak_sync_receivers, non_weak_async_receivers def _remove_receiver(self, receiver=None): # Mark that the self.receivers list has dead weakrefs. If so, we will diff --git a/django/test/client.py b/django/test/client.py index cf63265faa..6e1dbb5be5 100644 --- a/django/test/client.py +++ b/django/test/client.py @@ -219,9 +219,7 @@ class AsyncClientHandler(BaseHandler): body_file = FakePayload("") request_started.disconnect(close_old_connections) - await sync_to_async(request_started.send, thread_sensitive=False)( - sender=self.__class__, scope=scope - ) + await request_started.asend(sender=self.__class__, scope=scope) request_started.connect(close_old_connections) # Wrap FakePayload body_file to allow large read() in test environment. request = ASGIRequest(scope, LimitedStream(body_file, len(body_file))) diff --git a/django/test/signals.py b/django/test/signals.py index 94a5161e82..c16f4aa5ee 100644 --- a/django/test/signals.py +++ b/django/test/signals.py @@ -183,7 +183,7 @@ def complex_setting_changed(*, enter, setting, **kwargs): # this stacklevel shows the line containing the override_settings call. warnings.warn( f"Overriding setting {setting} can lead to unexpected behavior.", - stacklevel=6, + stacklevel=5, ) diff --git a/docs/releases/5.0.txt b/docs/releases/5.0.txt index c7546338ef..83cb6284a5 100644 --- a/docs/releases/5.0.txt +++ b/docs/releases/5.0.txt @@ -218,7 +218,9 @@ Serialization Signals ~~~~~~~ -* ... +* The new :meth:`.Signal.asend` and :meth:`.Signal.asend_robust` methods allow + asynchronous signal dispatch. Signal receivers may be synchronous or + asynchronous, and will be automatically adapted to the correct calling style. Templates ~~~~~~~~~ diff --git a/docs/topics/async.txt b/docs/topics/async.txt index 2541ab8e05..96220b97c1 100644 --- a/docs/topics/async.txt +++ b/docs/topics/async.txt @@ -108,6 +108,8 @@ synchronous function and call it using :func:`sync_to_async`. Asynchronous model and related manager interfaces were added. +.. _async_performance: + Performance ----------- diff --git a/docs/topics/signals.txt b/docs/topics/signals.txt index 601634c309..a4d973ebb4 100644 --- a/docs/topics/signals.txt +++ b/docs/topics/signals.txt @@ -96,6 +96,21 @@ This would be wrong -- in fact, Django will throw an error if you do so. That's because at any point arguments could get added to the signal and your receiver must be able to handle those new arguments. +Receivers may also be asynchronous functions, with the same signature but +declared using ``async def``:: + + async def my_callback(sender, **kwargs): + await asyncio.sleep(5) + print("Request finished!") + +Signals can be sent either synchronously or asynchronously, and receivers will +automatically be adapted to the correct call-style. See :ref:`sending signals +` for more information. + +.. versionchanged:: 5.0 + + Support for asynchronous receivers was added. + .. _connecting-receiver-functions: Connecting receiver functions @@ -248,18 +263,26 @@ For example:: This declares a ``pizza_done`` signal. +.. _sending-signals: + Sending signals --------------- -There are two ways to send signals in Django. +There are two ways to send signals synchronously in Django. .. method:: Signal.send(sender, **kwargs) .. method:: Signal.send_robust(sender, **kwargs) -To send a signal, call either :meth:`Signal.send` (all built-in signals use -this) or :meth:`Signal.send_robust`. You must provide the ``sender`` argument -(which is a class most of the time) and may provide as many other keyword -arguments as you like. +Signals may also be sent asynchronously. + +.. method:: Signal.asend(sender, **kwargs) +.. method:: Signal.asend_robust(sender, **kwargs) + +To send a signal, call either :meth:`Signal.send`, :meth:`Signal.send_robust`, +:meth:`await Signal.asend()`, or +:meth:`await Signal.asend_robust() `. You must provide the +``sender`` argument (which is a class most of the time) and may provide as many +other keyword arguments as you like. For example, here's how sending our ``pizza_done`` signal might look:: @@ -270,9 +293,8 @@ For example, here's how sending our ``pizza_done`` signal might look:: pizza_done.send(sender=self.__class__, toppings=toppings, size=size) ... -Both ``send()`` and ``send_robust()`` return a list of tuple pairs -``[(receiver, response), ... ]``, representing the list of called receiver -functions and their response values. +All four methods return a list of tuple pairs ``[(receiver, response), ...]``, +representing the list of called receiver functions and their response values. ``send()`` differs from ``send_robust()`` in how exceptions raised by receiver functions are handled. ``send()`` does *not* catch any exceptions raised by @@ -286,6 +308,33 @@ error instance is returned in the tuple pair for the receiver that raised the er The tracebacks are present on the ``__traceback__`` attribute of the errors returned when calling ``send_robust()``. +``asend()`` is similar as ``send()``, but it is coroutine that must be +awaited:: + + async def asend_pizza(self, toppings, size): + await pizza_done.asend(sender=self.__class__, toppings=toppings, size=size) + ... + +Whether synchronous or asynchronous, receivers will be correctly adapted to +whether ``send()`` or ``asend()`` is used. Synchronous receivers will be +called using :func:`~.sync_to_async` when invoked via ``asend()``. Asynchronous +receivers will be called using :func:`~.async_to_sync` when invoked via +``sync()``. Similar to the :ref:`case for middleware `, +there is a small performance cost to adapting receivers in this way. Note that +in order to reduce the number of sync/async calling-style switches within a +``send()`` or ``asend()`` call, the receivers are grouped by whether or not +they are async before being called. This means that an asynchronous receiver +registered before a synchronous receiver may be executed after the synchronous +receiver. In addition, async receivers are executed concurrently using +``asyncio.gather()``. + +All built-in signals, except those in the async request-response cycle, are +dispatched using :meth:`Signal.send`. + +.. versionchanged:: 5.0 + + Support for asynchronous signals was added. + Disconnecting signals ===================== diff --git a/tests/postgres_tests/test_apps.py b/tests/postgres_tests/test_apps.py index d9fb962251..3fdd7c3faf 100644 --- a/tests/postgres_tests/test_apps.py +++ b/tests/postgres_tests/test_apps.py @@ -31,14 +31,14 @@ class PostgresConfigTests(TestCase): from django.contrib.postgres.signals import register_type_handlers self.assertNotIn( - register_type_handlers, connection_created._live_receivers(None) + register_type_handlers, connection_created._live_receivers(None)[0] ) with modify_settings(INSTALLED_APPS={"append": "django.contrib.postgres"}): self.assertIn( - register_type_handlers, connection_created._live_receivers(None) + register_type_handlers, connection_created._live_receivers(None)[0] ) self.assertNotIn( - register_type_handlers, connection_created._live_receivers(None) + register_type_handlers, connection_created._live_receivers(None)[0] ) def test_register_serializer_for_migrations(self): diff --git a/tests/signals/tests.py b/tests/signals/tests.py index 0385033b07..0f161eeeb1 100644 --- a/tests/signals/tests.py +++ b/tests/signals/tests.py @@ -1,5 +1,7 @@ +import asyncio from unittest import mock +from django import dispatch from django.apps.registry import Apps from django.db import models from django.db.models import signals @@ -530,3 +532,94 @@ class LazyModelRefTests(BaseSignalSetup, SimpleTestCase): apps2 = Apps() signals.post_init.connect(self.receiver, sender=Book, apps=apps2) self.assertEqual(list(apps2._pending_operations), []) + + +class SyncHandler: + param = 0 + + def __call__(self, **kwargs): + self.param += 1 + return self.param + + +class AsyncHandler: + _is_coroutine = asyncio.coroutines._is_coroutine + param = 0 + + async def __call__(self, **kwargs): + self.param += 1 + return self.param + + +class AsyncReceiversTests(SimpleTestCase): + async def test_asend(self): + sync_handler = SyncHandler() + async_handler = AsyncHandler() + signal = dispatch.Signal() + signal.connect(sync_handler) + signal.connect(async_handler) + result = await signal.asend(self.__class__) + self.assertEqual(result, [(sync_handler, 1), (async_handler, 1)]) + + def test_send(self): + sync_handler = SyncHandler() + async_handler = AsyncHandler() + signal = dispatch.Signal() + signal.connect(sync_handler) + signal.connect(async_handler) + result = signal.send(self.__class__) + self.assertEqual(result, [(sync_handler, 1), (async_handler, 1)]) + + def test_send_robust(self): + class ReceiverException(Exception): + pass + + receiver_exception = ReceiverException() + + async def failing_async_handler(**kwargs): + raise receiver_exception + + sync_handler = SyncHandler() + async_handler = AsyncHandler() + signal = dispatch.Signal() + signal.connect(failing_async_handler) + signal.connect(async_handler) + signal.connect(sync_handler) + result = signal.send_robust(self.__class__) + # The ordering here is different than the order that signals were + # connected in. + self.assertEqual( + result, + [ + (sync_handler, 1), + (failing_async_handler, receiver_exception), + (async_handler, 1), + ], + ) + + async def test_asend_robust(self): + class ReceiverException(Exception): + pass + + receiver_exception = ReceiverException() + + async def failing_async_handler(**kwargs): + raise receiver_exception + + sync_handler = SyncHandler() + async_handler = AsyncHandler() + signal = dispatch.Signal() + signal.connect(failing_async_handler) + signal.connect(async_handler) + signal.connect(sync_handler) + result = await signal.asend_robust(self.__class__) + # The ordering here is different than the order that signals were + # connected in. + self.assertEqual( + result, + [ + (sync_handler, 1), + (failing_async_handler, receiver_exception), + (async_handler, 1), + ], + )