mirror of
https://github.com/django/django.git
synced 2025-01-16 05:12:23 +00:00
a8b70d251d
Backport of 0ed7d155635da9f79d4dd67e4889087d3673c6da from master
200 lines
6.5 KiB
Python
200 lines
6.5 KiB
Python
import gc
|
|
import sys
|
|
import time
|
|
import unittest
|
|
import weakref
|
|
from types import TracebackType
|
|
|
|
from django.dispatch import Signal, receiver
|
|
|
|
if sys.platform.startswith('java'):
|
|
def garbage_collect():
|
|
# Some JVM GCs will execute finalizers in a different thread, meaning
|
|
# we need to wait for that to complete before we go on looking for the
|
|
# effects of that.
|
|
gc.collect()
|
|
time.sleep(0.1)
|
|
elif hasattr(sys, "pypy_version_info"):
|
|
def garbage_collect():
|
|
# Collecting weakreferences can take two collections on PyPy.
|
|
gc.collect()
|
|
gc.collect()
|
|
else:
|
|
def garbage_collect():
|
|
gc.collect()
|
|
|
|
|
|
def receiver_1_arg(val, **kwargs):
|
|
return val
|
|
|
|
|
|
class Callable(object):
|
|
def __call__(self, val, **kwargs):
|
|
return val
|
|
|
|
def a(self, val, **kwargs):
|
|
return val
|
|
|
|
a_signal = Signal(providing_args=["val"])
|
|
b_signal = Signal(providing_args=["val"])
|
|
c_signal = Signal(providing_args=["val"])
|
|
d_signal = Signal(providing_args=["val"], use_caching=True)
|
|
|
|
|
|
class DispatcherTests(unittest.TestCase):
|
|
"""Test suite for dispatcher (barely started)"""
|
|
|
|
def assertTestIsClean(self, signal):
|
|
"""Assert that everything has been cleaned up automatically"""
|
|
# Note that dead weakref cleanup happens as side effect of using
|
|
# the signal's receivers through the signals API. So, first do a
|
|
# call to an API method to force cleanup.
|
|
self.assertFalse(signal.has_listeners())
|
|
self.assertEqual(signal.receivers, [])
|
|
|
|
def test_exact(self):
|
|
a_signal.connect(receiver_1_arg, sender=self)
|
|
expected = [(receiver_1_arg, "test")]
|
|
result = a_signal.send(sender=self, val="test")
|
|
self.assertEqual(result, expected)
|
|
a_signal.disconnect(receiver_1_arg, sender=self)
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_ignored_sender(self):
|
|
a_signal.connect(receiver_1_arg)
|
|
expected = [(receiver_1_arg, "test")]
|
|
result = a_signal.send(sender=self, val="test")
|
|
self.assertEqual(result, expected)
|
|
a_signal.disconnect(receiver_1_arg)
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_garbage_collected(self):
|
|
a = Callable()
|
|
a_signal.connect(a.a, sender=self)
|
|
expected = []
|
|
del a
|
|
garbage_collect()
|
|
result = a_signal.send(sender=self, val="test")
|
|
self.assertEqual(result, expected)
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_cached_garbaged_collected(self):
|
|
"""
|
|
Make sure signal caching sender receivers don't prevent garbage
|
|
collection of senders.
|
|
"""
|
|
class sender:
|
|
pass
|
|
wref = weakref.ref(sender)
|
|
d_signal.connect(receiver_1_arg)
|
|
d_signal.send(sender, val='garbage')
|
|
del sender
|
|
garbage_collect()
|
|
try:
|
|
self.assertIsNone(wref())
|
|
finally:
|
|
# Disconnect after reference check since it flushes the tested cache.
|
|
d_signal.disconnect(receiver_1_arg)
|
|
|
|
def test_multiple_registration(self):
|
|
a = Callable()
|
|
a_signal.connect(a)
|
|
a_signal.connect(a)
|
|
a_signal.connect(a)
|
|
a_signal.connect(a)
|
|
a_signal.connect(a)
|
|
a_signal.connect(a)
|
|
result = a_signal.send(sender=self, val="test")
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(len(a_signal.receivers), 1)
|
|
del a
|
|
del result
|
|
garbage_collect()
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_uid_registration(self):
|
|
def uid_based_receiver_1(**kwargs):
|
|
pass
|
|
|
|
def uid_based_receiver_2(**kwargs):
|
|
pass
|
|
|
|
a_signal.connect(uid_based_receiver_1, dispatch_uid="uid")
|
|
a_signal.connect(uid_based_receiver_2, dispatch_uid="uid")
|
|
self.assertEqual(len(a_signal.receivers), 1)
|
|
a_signal.disconnect(dispatch_uid="uid")
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_robust(self):
|
|
"""Test the send_robust() function"""
|
|
def fails(val, **kwargs):
|
|
raise ValueError('this')
|
|
a_signal.connect(fails)
|
|
result = a_signal.send_robust(sender=self, val="test")
|
|
err = result[0][1]
|
|
self.assertIsInstance(err, ValueError)
|
|
self.assertEqual(err.args, ('this',))
|
|
self.assertTrue(hasattr(err, '__traceback__'))
|
|
self.assertTrue(isinstance(err.__traceback__, TracebackType))
|
|
a_signal.disconnect(fails)
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_disconnection(self):
|
|
receiver_1 = Callable()
|
|
receiver_2 = Callable()
|
|
receiver_3 = Callable()
|
|
a_signal.connect(receiver_1)
|
|
a_signal.connect(receiver_2)
|
|
a_signal.connect(receiver_3)
|
|
a_signal.disconnect(receiver_1)
|
|
del receiver_2
|
|
garbage_collect()
|
|
a_signal.disconnect(receiver_3)
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_values_returned_by_disconnection(self):
|
|
receiver_1 = Callable()
|
|
receiver_2 = Callable()
|
|
a_signal.connect(receiver_1)
|
|
receiver_1_disconnected = a_signal.disconnect(receiver_1)
|
|
receiver_2_disconnected = a_signal.disconnect(receiver_2)
|
|
self.assertTrue(receiver_1_disconnected)
|
|
self.assertFalse(receiver_2_disconnected)
|
|
self.assertTestIsClean(a_signal)
|
|
|
|
def test_has_listeners(self):
|
|
self.assertFalse(a_signal.has_listeners())
|
|
self.assertFalse(a_signal.has_listeners(sender=object()))
|
|
receiver_1 = Callable()
|
|
a_signal.connect(receiver_1)
|
|
self.assertTrue(a_signal.has_listeners())
|
|
self.assertTrue(a_signal.has_listeners(sender=object()))
|
|
a_signal.disconnect(receiver_1)
|
|
self.assertFalse(a_signal.has_listeners())
|
|
self.assertFalse(a_signal.has_listeners(sender=object()))
|
|
|
|
|
|
class ReceiverTestCase(unittest.TestCase):
|
|
"""
|
|
Test suite for receiver.
|
|
"""
|
|
def test_receiver_single_signal(self):
|
|
@receiver(a_signal)
|
|
def f(val, **kwargs):
|
|
self.state = val
|
|
self.state = False
|
|
a_signal.send(sender=self, val=True)
|
|
self.assertTrue(self.state)
|
|
|
|
def test_receiver_signal_list(self):
|
|
@receiver([a_signal, b_signal, c_signal])
|
|
def f(val, **kwargs):
|
|
self.state.append(val)
|
|
self.state = []
|
|
a_signal.send(sender=self, val='a')
|
|
c_signal.send(sender=self, val='c')
|
|
b_signal.send(sender=self, val='b')
|
|
self.assertIn('a', self.state)
|
|
self.assertIn('b', self.state)
|
|
self.assertIn('c', self.state)
|