mirror of
				https://github.com/django/django.git
				synced 2025-10-31 01:25:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			231 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			231 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from io import BytesIO
 | |
| from socketserver import ThreadingMixIn
 | |
| 
 | |
| from django.core.handlers.wsgi import WSGIRequest
 | |
| from django.core.servers.basehttp import WSGIRequestHandler, WSGIServer
 | |
| from django.test import SimpleTestCase
 | |
| from django.test.client import RequestFactory
 | |
| from django.test.utils import captured_stderr
 | |
| 
 | |
| 
 | |
class Stub(ThreadingMixIn):
    """Bare-bones stand-in whose attributes come from constructor keywords.

    The tests use it to fake both the request (socket-like) object and the
    server object handed to WSGIRequestHandler.
    """

    def __init__(self, **kwargs):
        # Expose every keyword argument as an instance attribute.
        vars(self).update(kwargs)

    def sendall(self, data):
        """Write ``data`` to the file object returned by ``makefile("wb")``."""
        output = self.makefile("wb")
        output.write(data)
 | |
| 
 | |
| 
 | |
class UnclosableBytesIO(BytesIO):
    """A BytesIO whose ``close()`` does nothing."""

    def close(self):
        # WSGIRequestHandler closes the output file once it is done with it.
        # Ignoring that call keeps the buffer open so the tests can still
        # read back what was written.
        return None
 | |
| 
 | |
| 
 | |
class WSGIRequestHandlerTestCase(SimpleTestCase):
    """Tests for WSGIRequestHandler logging and raw request handling."""

    request_factory = RequestFactory()

    def _run_request(self, raw_request, app):
        """Run ``raw_request`` through WSGIRequestHandler and return its output.

        ``raw_request`` is the bytes the handler reads from the fake socket
        and ``app`` is the WSGI application returned by the stub server's
        ``get_app()``. Everything the handler wrote is returned as a list of
        lines (bytes).
        """
        rfile = BytesIO(raw_request)
        wfile = UnclosableBytesIO()

        def makefile(mode, *a, **kw):
            if mode == "rb":
                return rfile
            elif mode == "wb":
                return wfile

        request = Stub(makefile=makefile)
        server = Stub(base_environ={}, get_app=lambda: app)

        # Prevent logging from appearing in test output.
        with self.assertLogs("django.server", "INFO"):
            # Instantiating a handler runs the request as a side effect.
            WSGIRequestHandler(request, "192.168.0.2", server)

        wfile.seek(0)
        return wfile.readlines()

    def test_log_message(self):
        """Each status code is logged at the level matching its class."""
        request = WSGIRequest(self.request_factory.get("/").environ)
        request.makefile = lambda *args, **kwargs: BytesIO()
        handler = WSGIRequestHandler(request, "192.168.0.2", None)
        level_status_codes = {
            "info": [200, 301, 304],
            "warning": [400, 403, 404],
            "error": [500, 503],
        }
        for level, status_codes in level_status_codes.items():
            for status_code in status_codes:
                # Report every failing status code rather than stopping at
                # the first one.
                with self.subTest(level=level, status_code=status_code):
                    # The correct level gets the message.
                    with self.assertLogs("django.server", level.upper()) as cm:
                        handler.log_message("GET %s %s", "A", str(status_code))
                    self.assertIn("GET A %d" % status_code, cm.output[0])
                    # Incorrect levels don't have any messages.
                    for wrong_level in level_status_codes:
                        if wrong_level != level:
                            with self.assertLogs("django.server", "INFO") as cm:
                                handler.log_message(
                                    "GET %s %s", "A", str(status_code)
                                )
                            self.assertNotEqual(
                                cm.records[0].levelname, wrong_level.upper()
                            )

    def test_https(self):
        """A request starting with "\\x16\\x03" logs the HTTPS-not-supported error."""
        request = WSGIRequest(self.request_factory.get("/").environ)
        request.makefile = lambda *args, **kwargs: BytesIO()

        handler = WSGIRequestHandler(request, "192.168.0.2", None)

        # NOTE(review): "\x16\x03" presumably mimics the first bytes of a TLS
        # handshake -- confirm against the handler implementation.
        with self.assertLogs("django.server", "ERROR") as cm:
            handler.log_message("GET %s %s", "\x16\x03", "4")
        self.assertEqual(
            "You're accessing the development server over HTTPS, "
            "but it only supports HTTP.",
            cm.records[0].getMessage(),
        )

    def test_strips_underscore_headers(self):
        """WSGIRequestHandler ignores headers containing underscores.

        This follows the lead of nginx and Apache 2.4, and is to avoid
        ambiguity between dashes and underscores in mapping to WSGI environ,
        which can have security implications.
        """

        def test_app(environ, start_response):
            """A WSGI app that just reflects its HTTP environ."""
            start_response("200 OK", [])
            http_environ_items = sorted(
                "%s:%s" % (k, v) for k, v in environ.items() if k.startswith("HTTP_")
            )
            yield (",".join(http_environ_items)).encode()

        raw_request = (
            b"GET / HTTP/1.0\r\n"
            b"Some-Header: good\r\n"
            b"Some_Header: bad\r\n"
            b"Other_Header: bad\r\n"
        )
        body = self._run_request(raw_request, test_app)[-1]

        # Only the dash-separated header reaches the WSGI environ.
        self.assertEqual(body, b"HTTP_SOME_HEADER:good")

    def test_no_body_returned_for_head_requests(self):
        """GET responses carry the body; HEAD responses carry none."""
        hello_world_body = b"<!DOCTYPE html><html><body>Hello World</body></html>"
        content_length = len(hello_world_body)

        def test_app(environ, start_response):
            """A WSGI app that returns a hello world."""
            start_response("200 OK", [])
            return [hello_world_body]

        lines = self._run_request(b"GET / HTTP/1.0\r\n", test_app)
        # The body is returned in a GET response.
        self.assertEqual(lines[-1], hello_world_body)
        self.assertIn(f"Content-Length: {content_length}\r\n".encode(), lines)
        self.assertNotIn(b"Connection: close\r\n", lines)

        lines = self._run_request(b"HEAD / HTTP/1.0\r\n", test_app)
        # The body is not returned in a HEAD response.
        self.assertEqual(lines[-1], b"\r\n")
        self.assertIs(
            any(line.startswith(b"Content-Length:") for line in lines), False
        )
        self.assertNotIn(b"Connection: close\r\n", lines)

    def test_non_zero_content_length_set_head_request(self):
        """An app-supplied non-zero Content-Length survives a HEAD request."""
        hello_world_body = b"<!DOCTYPE html><html><body>Hello World</body></html>"
        content_length = len(hello_world_body)

        def test_app(environ, start_response):
            """
            A WSGI app that returns a hello world with non-zero Content-Length.
            """
            start_response("200 OK", [("Content-length", str(content_length))])
            return [hello_world_body]

        lines = self._run_request(b"HEAD / HTTP/1.0\r\n", test_app)
        # The body is not returned in a HEAD response.
        self.assertEqual(lines[-1], b"\r\n")
        # Non-zero Content-Length is not removed.
        self.assertEqual(lines[-2], f"Content-length: {content_length}\r\n".encode())
        self.assertNotIn(b"Connection: close\r\n", lines)
 | |
| 
 | |
| 
 | |
class WSGIServerTestCase(SimpleTestCase):
    """Tests for WSGIServer error handling."""

    request_factory = RequestFactory()

    def test_broken_pipe_errors(self):
        """WSGIServer handles broken pipe errors."""
        request = WSGIRequest(self.request_factory.get("/").environ)
        client_address = ("192.168.2.0", 8080)
        msg = f"- Broken pipe from {client_address}"
        tests = [
            BrokenPipeError,
            ConnectionAbortedError,
            ConnectionResetError,
        ]
        for exception in tests:
            with self.subTest(exception=exception):
                # Build the server before entering the try block: if the
                # constructor raises there is nothing to close, and the
                # finally clause must not touch an unbound name or the
                # server left over from a previous iteration.
                server = WSGIServer(("localhost", 0), WSGIRequestHandler)
                try:
                    try:
                        raise exception()
                    except Exception:
                        # handle_error() is called while the exception is
                        # still being handled.
                        # NOTE(review): presumably it inspects the active
                        # exception via sys.exc_info() -- confirm.
                        with captured_stderr() as err:
                            with self.assertLogs("django.server", "INFO") as cm:
                                server.handle_error(request, client_address)
                        # Nothing is written to stderr; the event is only
                        # logged.
                        self.assertEqual(err.getvalue(), "")
                        self.assertEqual(cm.records[0].getMessage(), msg)
                finally:
                    server.server_close()
 |