mirror of
				https://github.com/django/django.git
				synced 2025-10-29 08:36:09 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			237 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			237 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import asyncio
 | |
| import sys
 | |
| import threading
 | |
| from pathlib import Path
 | |
| from unittest import skipIf
 | |
| 
 | |
| from asgiref.sync import SyncToAsync
 | |
| from asgiref.testing import ApplicationCommunicator
 | |
| 
 | |
| from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler
 | |
| from django.core.asgi import get_asgi_application
 | |
| from django.core.signals import request_finished, request_started
 | |
| from django.db import close_old_connections
 | |
| from django.test import (
 | |
|     AsyncRequestFactory, SimpleTestCase, modify_settings, override_settings,
 | |
| )
 | |
| from django.utils.http import http_date
 | |
| 
 | |
| from .urls import test_filename
 | |
| 
 | |
| TEST_STATIC_ROOT = Path(__file__).parent / 'project' / 'static'  # static fixture dir; used as STATIC_ROOT/STATICFILES_DIRS in test_static_file_response
 | |
| 
 | |
| 
 | |
@skipIf(
    sys.platform == 'win32' and (3, 8, 0) < sys.version_info < (3, 8, 1),
    'https://bugs.python.org/issue38563',
)
@override_settings(ROOT_URLCONF='asgi.urls')
class ASGITest(SimpleTestCase):
    """
    Drive the ASGI application returned by get_asgi_application() through
    asgiref's ApplicationCommunicator and check the messages it emits.

    Every test builds an ASGI scope with AsyncRequestFactory._base_scope(),
    sends a single input event and inspects the 'http.response.start' and
    'http.response.body' output events.
    """
    async_request_factory = AsyncRequestFactory()

    def setUp(self):
        # Detach close_old_connections from request_started for the duration
        # of each test; tearDown() attaches it again.
        # NOTE(review): presumably needed because SimpleTestCase does not
        # allow database access -- confirm.
        request_started.disconnect(close_old_connections)

    def tearDown(self):
        # Restore the receiver removed in setUp().
        request_started.connect(close_old_connections)

    async def test_get_asgi_application(self):
        """
        get_asgi_application() returns a functioning ASGI callable.
        """
        application = get_asgi_application()
        # Construct HTTP request.
        scope = self.async_request_factory._base_scope(path='/')
        communicator = ApplicationCommunicator(application, scope)
        await communicator.send_input({'type': 'http.request'})
        # Read the response.
        response_start = await communicator.receive_output()
        self.assertEqual(response_start['type'], 'http.response.start')
        self.assertEqual(response_start['status'], 200)
        # Headers are compared as a set: their order is not significant.
        self.assertEqual(
            set(response_start['headers']),
            {
                (b'Content-Length', b'12'),
                (b'Content-Type', b'text/html; charset=utf-8'),
            },
        )
        response_body = await communicator.receive_output()
        self.assertEqual(response_body['type'], 'http.response.body')
        self.assertEqual(response_body['body'], b'Hello World!')

    async def test_file_response(self):
        """
        Makes sure that FileResponse works over ASGI.
        """
        application = get_asgi_application()
        # Construct HTTP request.
        scope = self.async_request_factory._base_scope(path='/file/')
        communicator = ApplicationCommunicator(application, scope)
        await communicator.send_input({'type': 'http.request'})
        # Get the file content.
        with open(test_filename, 'rb') as test_file:
            test_file_contents = test_file.read()
        # Read the response.
        response_start = await communicator.receive_output()
        self.assertEqual(response_start['type'], 'http.response.start')
        self.assertEqual(response_start['status'], 200)
        self.assertEqual(
            set(response_start['headers']),
            {
                (b'Content-Length', str(len(test_file_contents)).encode('ascii')),
                # The expected MIME type for the .py file differs on Windows.
                (b'Content-Type', b'text/plain' if sys.platform == 'win32' else b'text/x-python'),
                (b'Content-Disposition', b'inline; filename="urls.py"'),
            },
        )
        response_body = await communicator.receive_output()
        self.assertEqual(response_body['type'], 'http.response.body')
        self.assertEqual(response_body['body'], test_file_contents)
        # Allow response.close() to finish.
        await communicator.wait()

    @modify_settings(INSTALLED_APPS={'append': 'django.contrib.staticfiles'})
    @override_settings(
        STATIC_URL='/static/',
        STATIC_ROOT=TEST_STATIC_ROOT,
        STATICFILES_DIRS=[TEST_STATIC_ROOT],
        STATICFILES_FINDERS=[
            'django.contrib.staticfiles.finders.FileSystemFinder',
        ],
    )
    async def test_static_file_response(self):
        """
        ASGIStaticFilesHandler wrapping the ASGI application serves
        TEST_STATIC_ROOT / 'file.txt' for a request to /static/file.txt.
        """
        application = ASGIStaticFilesHandler(get_asgi_application())
        # Construct HTTP request.
        scope = self.async_request_factory._base_scope(path='/static/file.txt')
        communicator = ApplicationCommunicator(application, scope)
        await communicator.send_input({'type': 'http.request'})
        # Get the file content.
        file_path = TEST_STATIC_ROOT / 'file.txt'
        with open(file_path, 'rb') as test_file:
            test_file_contents = test_file.read()
        # Read the response.
        # The file's mtime is needed for the expected Last-Modified header.
        stat = file_path.stat()
        response_start = await communicator.receive_output()
        self.assertEqual(response_start['type'], 'http.response.start')
        self.assertEqual(response_start['status'], 200)
        self.assertEqual(
            set(response_start['headers']),
            {
                (b'Content-Length', str(len(test_file_contents)).encode('ascii')),
                (b'Content-Type', b'text/plain'),
                (b'Content-Disposition', b'inline; filename="file.txt"'),
                (b'Last-Modified', http_date(stat.st_mtime).encode('ascii')),
            },
        )
        response_body = await communicator.receive_output()
        self.assertEqual(response_body['type'], 'http.response.body')
        self.assertEqual(response_body['body'], test_file_contents)
        # Allow response.close() to finish.
        await communicator.wait()

    async def test_headers(self):
        """
        Request headers given in the scope reach the /meta/ view; the two
        'referer' values show up in the body joined by a comma.
        """
        application = get_asgi_application()
        communicator = ApplicationCommunicator(
            application,
            self.async_request_factory._base_scope(
                path='/meta/',
                headers=[
                    [b'content-type', b'text/plain; charset=utf-8'],
                    [b'content-length', b'77'],
                    # The same header name is sent twice on purpose.
                    [b'referer', b'Scotland'],
                    [b'referer', b'Wales'],
                ],
            ),
        )
        await communicator.send_input({'type': 'http.request'})
        response_start = await communicator.receive_output()
        self.assertEqual(response_start['type'], 'http.response.start')
        self.assertEqual(response_start['status'], 200)
        self.assertEqual(
            set(response_start['headers']),
            {
                (b'Content-Length', b'19'),
                (b'Content-Type', b'text/plain; charset=utf-8'),
            },
        )
        response_body = await communicator.receive_output()
        self.assertEqual(response_body['type'], 'http.response.body')
        self.assertEqual(response_body['body'], b'From Scotland,Wales')

    async def test_get_query_string(self):
        """
        A query string in the scope is honored whether it is given as bytes
        or as str.
        """
        application = get_asgi_application()
        for query_string in (b'name=Andrew', 'name=Andrew'):
            with self.subTest(query_string=query_string):
                scope = self.async_request_factory._base_scope(
                    path='/',
                    query_string=query_string,
                )
                communicator = ApplicationCommunicator(application, scope)
                await communicator.send_input({'type': 'http.request'})
                response_start = await communicator.receive_output()
                self.assertEqual(response_start['type'], 'http.response.start')
                self.assertEqual(response_start['status'], 200)
                response_body = await communicator.receive_output()
                self.assertEqual(response_body['type'], 'http.response.body')
                self.assertEqual(response_body['body'], b'Hello Andrew!')

    async def test_disconnect(self):
        """
        When the only input is 'http.disconnect', no output is produced:
        waiting for one times out.
        """
        application = get_asgi_application()
        scope = self.async_request_factory._base_scope(path='/')
        communicator = ApplicationCommunicator(application, scope)
        await communicator.send_input({'type': 'http.disconnect'})
        with self.assertRaises(asyncio.TimeoutError):
            await communicator.receive_output()

    async def test_wrong_connection_type(self):
        """
        A scope whose type is 'other' makes the application raise ValueError.
        """
        application = get_asgi_application()
        scope = self.async_request_factory._base_scope(path='/', type='other')
        communicator = ApplicationCommunicator(application, scope)
        await communicator.send_input({'type': 'http.request'})
        msg = 'Django can only handle ASGI/HTTP connections, not other.'
        # The application's exception surfaces when output is requested.
        with self.assertRaisesMessage(ValueError, msg):
            await communicator.receive_output()

    async def test_non_unicode_query_string(self):
        """
        A query string that is not valid UTF-8 (b'\\xff') yields a 400
        response with an empty body.
        """
        application = get_asgi_application()
        scope = self.async_request_factory._base_scope(path='/', query_string=b'\xff')
        communicator = ApplicationCommunicator(application, scope)
        await communicator.send_input({'type': 'http.request'})
        response_start = await communicator.receive_output()
        self.assertEqual(response_start['type'], 'http.response.start')
        self.assertEqual(response_start['status'], 400)
        response_body = await communicator.receive_output()
        self.assertEqual(response_body['type'], 'http.response.body')
        self.assertEqual(response_body['body'], b'')

    async def test_request_lifecycle_signals_dispatched_with_thread_sensitive(self):
        """
        The request_started and request_finished receivers both run on the
        thread owned by SyncToAsync.single_thread_executor.
        """
        class SignalHandler:
            """Track threads handler is dispatched on."""

            def __init__(self):
                # Per-instance list, so recorded threads are never shared
                # between handler instances.
                self.threads = []

            def __call__(self, **kwargs):
                self.threads.append(threading.current_thread())

        signal_handler = SignalHandler()
        # Register the disconnects as cleanups right away so the receivers
        # are removed from the global signals even if an assertion below
        # fails.
        request_started.connect(signal_handler)
        self.addCleanup(request_started.disconnect, signal_handler)
        request_finished.connect(signal_handler)
        self.addCleanup(request_finished.disconnect, signal_handler)

        # Perform a basic request.
        application = get_asgi_application()
        scope = self.async_request_factory._base_scope(path='/')
        communicator = ApplicationCommunicator(application, scope)
        await communicator.send_input({'type': 'http.request'})
        response_start = await communicator.receive_output()
        self.assertEqual(response_start['type'], 'http.response.start')
        self.assertEqual(response_start['status'], 200)
        response_body = await communicator.receive_output()
        self.assertEqual(response_body['type'], 'http.response.body')
        self.assertEqual(response_body['body'], b'Hello World!')
        # Give response.close() time to finish.
        await communicator.wait()

        # At this point, AsyncToSync does not have a current executor. Thus
        # SyncToAsync falls-back to .single_thread_executor.
        target_thread = next(iter(SyncToAsync.single_thread_executor._threads))
        # Exactly two dispatches are expected: one per signal. The unpacking
        # fails if the handler ran any other number of times.
        request_started_thread, request_finished_thread = signal_handler.threads
        self.assertEqual(request_started_thread, target_thread)
        self.assertEqual(request_finished_thread, target_thread)
 |