diff --git a/django/http/request.py b/django/http/request.py index 986d4eee89..62ed1d58f0 100644 --- a/django/http/request.py +++ b/django/http/request.py @@ -31,9 +31,7 @@ from django.utils.http import is_same_domain, parse_header_parameters from django.utils.regex_helper import _lazy_re_compile RAISE_ERROR = object() -host_validation_re = _lazy_re_compile( - r"^([a-z0-9.-]+|\[[a-f0-9]*:[a-f0-9.:]+\])(?::([0-9]+))?$" -) +host_validation_re = _lazy_re_compile(r"^([a-z0-9.-]+|\[[a-f0-9]*:[a-f0-9\.:]+\])(?::(\d+))?$") class UnreadablePostError(OSError): @@ -145,38 +143,48 @@ class HttpRequest: else: self.encoding = self.content_params["charset"] - def _get_raw_host(self): - """ - Return the HTTP host using the environment or request headers. Skip - allowed hosts protection, so may return an insecure host. - """ - # We try three options, in order of decreasing preference. - if settings.USE_X_FORWARDED_HOST and ("HTTP_X_FORWARDED_HOST" in self.META): - host = self.META["HTTP_X_FORWARDED_HOST"] - elif "HTTP_HOST" in self.META: - host = self.META["HTTP_HOST"] + @cached_property + def _parsed_host_header(self): + use_x_fw_host = settings.USE_X_FORWARDED_HOST + use_x_fw_port = settings.USE_X_FORWARDED_PORT + + port_in_x_fw_host = False + default_port = ('443' if self.is_secure() else '80') + + if use_x_fw_host and 'HTTP_X_FORWARDED_HOST' in self.META: + host, port = _parse_host_header(self.META['HTTP_X_FORWARDED_HOST']) + port_in_x_fw_host = port != '' + elif 'HTTP_HOST' in self.META: + host, port = _parse_host_header(self.META['HTTP_HOST']) else: # Reconstruct the host using the algorithm from PEP 333. 
- host = self.META["SERVER_NAME"] - server_port = self.get_port() - if server_port != ("443" if self.is_secure() else "80"): - host = "%s:%s" % (host, server_port) - return host + host, port = self.META['SERVER_NAME'], str(self.META['SERVER_PORT']) + if port == default_port: + port = '' + + if use_x_fw_port and 'HTTP_X_FORWARDED_PORT' in self.META: + if port_in_x_fw_host: + raise ImproperlyConfigured('HTTP_X_FORWARDED_HOST contains a port number ' + 'and USE_X_FORWARDED_PORT is set to True') + port = self.META['HTTP_X_FORWARDED_PORT'] + + reconstructed = '%s:%s' % (host, port) if port else host + return host, port or default_port, reconstructed def get_host(self): """Return the HTTP host using the environment or request headers.""" - host = self._get_raw_host() + _, _, host_header = self._parsed_host_header # Allow variants of localhost if ALLOWED_HOSTS is empty and DEBUG=True. allowed_hosts = settings.ALLOWED_HOSTS if settings.DEBUG and not allowed_hosts: allowed_hosts = [".localhost", "127.0.0.1", "[::1]"] - domain, port = split_domain_port(host) + domain, port = split_domain_port(host_header) if domain and validate_host(domain, allowed_hosts): - return host + return host_header else: - msg = "Invalid HTTP_HOST header: %r." % host + msg = "Invalid HTTP_HOST header: %r." % host_header if domain: msg += " You may need to add %r to ALLOWED_HOSTS." 
% domain else: @@ -187,11 +195,8 @@ class HttpRequest: def get_port(self): """Return the port number for the request as a string.""" - if settings.USE_X_FORWARDED_PORT and "HTTP_X_FORWARDED_PORT" in self.META: - port = self.META["HTTP_X_FORWARDED_PORT"] - else: - port = self.META["SERVER_PORT"] - return str(port) + _, port, _ = self._parsed_host_header + return port def get_full_path(self, force_append_slash=False): return self._get_full_path(self.path, force_append_slash) @@ -236,6 +241,18 @@ class HttpRequest: raise return value + def get_raw_uri(self): + """ + Return an absolute URI from variables available in this request. Skip + allowed hosts protection, so may return insecure URI. + """ + _, _, host_header = self._parsed_host_header + return '{scheme}://{host}{path}'.format( + scheme=self.scheme, + host=host_header, + path=self.get_full_path(), + ) + def build_absolute_uri(self, location=None): """ Build an absolute URI from the location and the variables available in @@ -763,6 +780,20 @@ def bytes_to_text(s, encoding): return s +def _parse_host_header(host_header): + """ + Returns a (domain, port) tuple for a given host. + + Neither domain name nor port are validated. + """ + + if host_header[-1] == ']': + # It's an IPv6 address without a port. + return host_header, '' + bits = host_header.rsplit(':', 1) + return tuple(bits) if len(bits) == 2 else (bits[0], '') + + def split_domain_port(host): """ Return a (domain, port) tuple from a given host. @@ -770,11 +801,17 @@ def split_domain_port(host): Returned domain is lowercased. If the host is invalid, the domain will be empty. """ - if match := host_validation_re.fullmatch(host.lower()): - domain, port = match.groups(default="") - # Remove a trailing dot (if present) from the domain. 
- return domain.removesuffix("."), port - return "", "" + host = host.lower() + + host_match = host_validation_re.match(host) + if not host_match: + return '', '' + + domain, port = host_match.groups() + port = port or '' + # Remove a trailing dot (if present) from the domain. + domain = domain[:-1] if domain.endswith('.') else domain + return domain, port def validate_host(host, allowed_hosts): diff --git a/tests/csrf_tests/tests.py b/tests/csrf_tests/tests.py index 956cff11d9..b2a48c2e65 100644 --- a/tests/csrf_tests/tests.py +++ b/tests/csrf_tests/tests.py @@ -831,8 +831,8 @@ class CsrfViewMiddlewareTestMixin(CsrfFunctionTestMixin): def _test_https_good_referer_matches_cookie_domain(self): req = self._get_POST_request_with_token() req._is_secure_override = True - req.META["HTTP_REFERER"] = "https://foo.example.com/" - req.META["SERVER_PORT"] = "443" + req.META['HTTP_HOST'] = 'www.example.com' + req.META['HTTP_REFERER'] = 'https://foo.example.com/' mw = CsrfViewMiddleware(post_form_view) mw.process_request(req) response = mw.process_view(req, post_form_view, (), {}) @@ -841,9 +841,8 @@ class CsrfViewMiddlewareTestMixin(CsrfFunctionTestMixin): def _test_https_good_referer_matches_cookie_domain_with_different_port(self): req = self._get_POST_request_with_token() req._is_secure_override = True - req.META["HTTP_HOST"] = "www.example.com" - req.META["HTTP_REFERER"] = "https://foo.example.com:4443/" - req.META["SERVER_PORT"] = "4443" + req.META['HTTP_HOST'] = 'www.example.com:4443' + req.META['HTTP_REFERER'] = 'https://foo.example.com:4443/' mw = CsrfViewMiddleware(post_form_view) mw.process_request(req) response = mw.process_view(req, post_form_view, (), {}) diff --git a/tests/requests/tests.py b/tests/requests/tests.py new file mode 100644 index 0000000000..b6b1db4273 --- /dev/null +++ b/tests/requests/tests.py @@ -0,0 +1,1053 @@ +from io import BytesIO +from itertools import chain +from urllib.parse import urlencode + +from django.core.exceptions import 
DisallowedHost, ImproperlyConfigured +from django.core.handlers.wsgi import LimitedStream, WSGIRequest +from django.http import HttpRequest, RawPostDataException, UnreadablePostError +from django.http.multipartparser import MultiPartParserError +from django.http.request import HttpHeaders, split_domain_port +from django.test import RequestFactory, SimpleTestCase, override_settings +from django.test.client import FakePayload + + +class RequestsTests(SimpleTestCase): + def test_httprequest(self): + request = HttpRequest() + self.assertEqual(list(request.GET), []) + self.assertEqual(list(request.POST), []) + self.assertEqual(list(request.COOKIES), []) + self.assertEqual(list(request.META), []) + + # .GET and .POST should be QueryDicts + self.assertEqual(request.GET.urlencode(), '') + self.assertEqual(request.POST.urlencode(), '') + + # and FILES should be MultiValueDict + self.assertEqual(request.FILES.getlist('foo'), []) + + self.assertIsNone(request.content_type) + self.assertIsNone(request.content_params) + + def test_httprequest_full_path(self): + request = HttpRequest() + request.path = '/;some/?awful/=path/foo:bar/' + request.path_info = '/prefix' + request.path + request.META['QUERY_STRING'] = ';some=query&+query=string' + expected = '/%3Bsome/%3Fawful/%3Dpath/foo:bar/?;some=query&+query=string' + self.assertEqual(request.get_full_path(), expected) + self.assertEqual(request.get_full_path_info(), '/prefix' + expected) + + def test_httprequest_full_path_with_query_string_and_fragment(self): + request = HttpRequest() + request.path = '/foo#bar' + request.path_info = '/prefix' + request.path + request.META['QUERY_STRING'] = 'baz#quux' + self.assertEqual(request.get_full_path(), '/foo%23bar?baz#quux') + self.assertEqual(request.get_full_path_info(), '/prefix/foo%23bar?baz#quux') + + def test_httprequest_repr(self): + request = HttpRequest() + request.path = '/somepath/' + request.method = 'GET' + request.GET = {'get-key': 'get-value'} + request.POST = {'post-key': 
'post-value'} + request.COOKIES = {'post-key': 'post-value'} + request.META = {'post-key': 'post-value'} + self.assertEqual(repr(request), "") + + def test_httprequest_repr_invalid_method_and_path(self): + request = HttpRequest() + self.assertEqual(repr(request), "") + request = HttpRequest() + request.method = "GET" + self.assertEqual(repr(request), "") + request = HttpRequest() + request.path = "" + self.assertEqual(repr(request), "") + + def test_wsgirequest(self): + request = WSGIRequest({ + 'PATH_INFO': 'bogus', + 'REQUEST_METHOD': 'bogus', + 'CONTENT_TYPE': 'text/html; charset=utf8', + 'wsgi.input': BytesIO(b''), + }) + self.assertEqual(list(request.GET), []) + self.assertEqual(list(request.POST), []) + self.assertEqual(list(request.COOKIES), []) + self.assertEqual( + set(request.META), + {'PATH_INFO', 'REQUEST_METHOD', 'SCRIPT_NAME', 'CONTENT_TYPE', 'wsgi.input'} + ) + self.assertEqual(request.META['PATH_INFO'], 'bogus') + self.assertEqual(request.META['REQUEST_METHOD'], 'bogus') + self.assertEqual(request.META['SCRIPT_NAME'], '') + self.assertEqual(request.content_type, 'text/html') + self.assertEqual(request.content_params, {'charset': 'utf8'}) + + def test_wsgirequest_with_script_name(self): + """ + The request's path is correctly assembled, regardless of whether or + not the SCRIPT_NAME has a trailing slash (#20169). 
+ """ + # With trailing slash + request = WSGIRequest({ + 'PATH_INFO': '/somepath/', + 'SCRIPT_NAME': '/PREFIX/', + 'REQUEST_METHOD': 'get', + 'wsgi.input': BytesIO(b''), + }) + self.assertEqual(request.path, '/PREFIX/somepath/') + # Without trailing slash + request = WSGIRequest({ + 'PATH_INFO': '/somepath/', + 'SCRIPT_NAME': '/PREFIX', + 'REQUEST_METHOD': 'get', + 'wsgi.input': BytesIO(b''), + }) + self.assertEqual(request.path, '/PREFIX/somepath/') + + def test_wsgirequest_script_url_double_slashes(self): + """ + WSGI squashes multiple successive slashes in PATH_INFO, WSGIRequest + should take that into account when populating request.path and + request.META['SCRIPT_NAME'] (#17133). + """ + request = WSGIRequest({ + 'SCRIPT_URL': '/mst/milestones//accounts/login//help', + 'PATH_INFO': '/milestones/accounts/login/help', + 'REQUEST_METHOD': 'get', + 'wsgi.input': BytesIO(b''), + }) + self.assertEqual(request.path, '/mst/milestones/accounts/login/help') + self.assertEqual(request.META['SCRIPT_NAME'], '/mst') + + def test_wsgirequest_with_force_script_name(self): + """ + The FORCE_SCRIPT_NAME setting takes precedence over the request's + SCRIPT_NAME environment parameter (#20169). + """ + with override_settings(FORCE_SCRIPT_NAME='/FORCED_PREFIX/'): + request = WSGIRequest({ + 'PATH_INFO': '/somepath/', + 'SCRIPT_NAME': '/PREFIX/', + 'REQUEST_METHOD': 'get', + 'wsgi.input': BytesIO(b''), + }) + self.assertEqual(request.path, '/FORCED_PREFIX/somepath/') + + def test_wsgirequest_path_with_force_script_name_trailing_slash(self): + """ + The request's path is correctly assembled, regardless of whether or not + the FORCE_SCRIPT_NAME setting has a trailing slash (#20169). 
+ """ + # With trailing slash + with override_settings(FORCE_SCRIPT_NAME='/FORCED_PREFIX/'): + request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')}) + self.assertEqual(request.path, '/FORCED_PREFIX/somepath/') + # Without trailing slash + with override_settings(FORCE_SCRIPT_NAME='/FORCED_PREFIX'): + request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')}) + self.assertEqual(request.path, '/FORCED_PREFIX/somepath/') + + def test_wsgirequest_repr(self): + request = WSGIRequest({'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')}) + self.assertEqual(repr(request), "") + request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')}) + request.GET = {'get-key': 'get-value'} + request.POST = {'post-key': 'post-value'} + request.COOKIES = {'post-key': 'post-value'} + request.META = {'post-key': 'post-value'} + self.assertEqual(repr(request), "") + + def test_wsgirequest_path_info(self): + def wsgi_str(path_info, encoding='utf-8'): + path_info = path_info.encode(encoding) # Actual URL sent by the browser (bytestring) + path_info = path_info.decode('iso-8859-1') # Value in the WSGI environ dict (native string) + return path_info + # Regression for #19468 + request = WSGIRequest({'PATH_INFO': wsgi_str("/سلام/"), 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')}) + self.assertEqual(request.path, "/سلام/") + + # The URL may be incorrectly encoded in a non-UTF-8 encoding (#26971) + request = WSGIRequest({ + 'PATH_INFO': wsgi_str("/café/", encoding='iso-8859-1'), + 'REQUEST_METHOD': 'get', + 'wsgi.input': BytesIO(b''), + }) + # Since it's impossible to decide the (wrong) encoding of the URL, it's + # left percent-encoded in the path. 
+ self.assertEqual(request.path, "/caf%E9/") + + def test_limited_stream(self): + # Read all of a limited stream + stream = LimitedStream(BytesIO(b'test'), 2) + self.assertEqual(stream.read(), b'te') + # Reading again returns nothing. + self.assertEqual(stream.read(), b'') + + # Read a number of characters greater than the stream has to offer + stream = LimitedStream(BytesIO(b'test'), 2) + self.assertEqual(stream.read(5), b'te') + # Reading again returns nothing. + self.assertEqual(stream.readline(5), b'') + + # Read sequentially from a stream + stream = LimitedStream(BytesIO(b'12345678'), 8) + self.assertEqual(stream.read(5), b'12345') + self.assertEqual(stream.read(5), b'678') + # Reading again returns nothing. + self.assertEqual(stream.readline(5), b'') + + # Read lines from a stream + stream = LimitedStream(BytesIO(b'1234\n5678\nabcd\nefgh\nijkl'), 24) + # Read a full line, unconditionally + self.assertEqual(stream.readline(), b'1234\n') + # Read a number of characters less than a line + self.assertEqual(stream.readline(2), b'56') + # Read the rest of the partial line + self.assertEqual(stream.readline(), b'78\n') + # Read a full line, with a character limit greater than the line length + self.assertEqual(stream.readline(6), b'abcd\n') + # Read the next line, deliberately terminated at the line end + self.assertEqual(stream.readline(4), b'efgh') + # Read the next line... just the line end + self.assertEqual(stream.readline(), b'\n') + # Read everything else. 
+ self.assertEqual(stream.readline(), b'ijkl') + + # Regression for #15018 + # If a stream contains a newline, but the provided length + # is less than the number of provided characters, the newline + # doesn't reset the available character count + stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9) + self.assertEqual(stream.readline(10), b'1234\n') + self.assertEqual(stream.readline(3), b'abc') + # Now expire the available characters + self.assertEqual(stream.readline(3), b'd') + # Reading again returns nothing. + self.assertEqual(stream.readline(2), b'') + + # Same test, but with read, not readline. + stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9) + self.assertEqual(stream.read(6), b'1234\na') + self.assertEqual(stream.read(2), b'bc') + self.assertEqual(stream.read(2), b'd') + self.assertEqual(stream.read(2), b'') + self.assertEqual(stream.read(), b'') + + def test_stream(self): + payload = FakePayload('name=value') + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload}, + ) + self.assertEqual(request.read(), b'name=value') + + def test_read_after_value(self): + """ + Reading from request is allowed after accessing request contents as + POST or body. + """ + payload = FakePayload('name=value') + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + self.assertEqual(request.POST, {'name': ['value']}) + self.assertEqual(request.body, b'name=value') + self.assertEqual(request.read(), b'name=value') + + def test_value_after_read(self): + """ + Construction of POST or body is not allowed after reading + from request. 
+ """ + payload = FakePayload('name=value') + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + self.assertEqual(request.read(2), b'na') + with self.assertRaises(RawPostDataException): + request.body + self.assertEqual(request.POST, {}) + + def test_non_ascii_POST(self): + payload = FakePayload(urlencode({'key': 'España'})) + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_LENGTH': len(payload), + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'wsgi.input': payload, + }) + self.assertEqual(request.POST, {'key': ['España']}) + + def test_alternate_charset_POST(self): + """ + Test a POST with non-utf-8 payload encoding. + """ + payload = FakePayload(urlencode({'key': 'España'.encode('latin-1')})) + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_LENGTH': len(payload), + 'CONTENT_TYPE': 'application/x-www-form-urlencoded; charset=iso-8859-1', + 'wsgi.input': payload, + }) + self.assertEqual(request.POST, {'key': ['España']}) + + def test_body_after_POST_multipart_form_data(self): + """ + Reading body after parsing multipart/form-data is not allowed + """ + # Because multipart is used for large amounts of data i.e. file uploads, + # we don't want the data held in memory twice, and we don't want to + # silence the error by setting body = '' either. 
+ payload = FakePayload("\r\n".join([ + '--boundary', + 'Content-Disposition: form-data; name="name"', + '', + 'value', + '--boundary--' + ''])) + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; boundary=boundary', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + self.assertEqual(request.POST, {'name': ['value']}) + with self.assertRaises(RawPostDataException): + request.body + + def test_body_after_POST_multipart_related(self): + """ + Reading body after parsing multipart that isn't form-data is allowed + """ + # Ticket #9054 + # There are cases in which the multipart data is related instead of + # being a binary upload, in which case it should still be accessible + # via body. + payload_data = b"\r\n".join([ + b'--boundary', + b'Content-ID: id; name="name"', + b'', + b'value', + b'--boundary--' + b'']) + payload = FakePayload(payload_data) + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/related; boundary=boundary', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + self.assertEqual(request.POST, {}) + self.assertEqual(request.body, payload_data) + + def test_POST_multipart_with_content_length_zero(self): + """ + Multipart POST requests with Content-Length >= 0 are valid and need to be handled. + """ + # According to: + # https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.13 + # Every request.POST with Content-Length >= 0 is a valid request, + # this test ensures that we handle Content-Length == 0. 
+ payload = FakePayload("\r\n".join([ + '--boundary', + 'Content-Disposition: form-data; name="name"', + '', + 'value', + '--boundary--' + ''])) + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; boundary=boundary', + 'CONTENT_LENGTH': 0, + 'wsgi.input': payload, + }) + self.assertEqual(request.POST, {}) + + def test_POST_binary_only(self): + payload = b'\r\n\x01\x00\x00\x00ab\x00\x00\xcd\xcc,@' + environ = { + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/octet-stream', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': BytesIO(payload), + } + request = WSGIRequest(environ) + self.assertEqual(request.POST, {}) + self.assertEqual(request.FILES, {}) + self.assertEqual(request.body, payload) + + # Same test without specifying content-type + environ.update({'CONTENT_TYPE': '', 'wsgi.input': BytesIO(payload)}) + request = WSGIRequest(environ) + self.assertEqual(request.POST, {}) + self.assertEqual(request.FILES, {}) + self.assertEqual(request.body, payload) + + def test_read_by_lines(self): + payload = FakePayload('name=value') + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + self.assertEqual(list(request), [b'name=value']) + + def test_POST_after_body_read(self): + """ + POST should be populated even if body is read first + """ + payload = FakePayload('name=value') + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + request.body # evaluate + self.assertEqual(request.POST, {'name': ['value']}) + + def test_POST_after_body_read_and_stream_read(self): + """ + POST should be populated even if body is read first, and then + the stream is read second. 
+ """ + payload = FakePayload('name=value') + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + request.body # evaluate + self.assertEqual(request.read(1), b'n') + self.assertEqual(request.POST, {'name': ['value']}) + + def test_POST_after_body_read_and_stream_read_multipart(self): + """ + POST should be populated even if body is read first, and then + the stream is read second. Using multipart/form-data instead of urlencoded. + """ + payload = FakePayload("\r\n".join([ + '--boundary', + 'Content-Disposition: form-data; name="name"', + '', + 'value', + '--boundary--' + ''])) + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; boundary=boundary', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + request.body # evaluate + # Consume enough data to mess up the parsing: + self.assertEqual(request.read(13), b'--boundary\r\nC') + self.assertEqual(request.POST, {'name': ['value']}) + + def test_POST_immutable_for_multipart(self): + """ + MultiPartParser.parse() leaves request.POST immutable. 
+ """ + payload = FakePayload("\r\n".join([ + '--boundary', + 'Content-Disposition: form-data; name="name"', + '', + 'value', + '--boundary--', + ])) + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; boundary=boundary', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + self.assertFalse(request.POST._mutable) + + def test_multipart_without_boundary(self): + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data;', + 'CONTENT_LENGTH': 0, + 'wsgi.input': FakePayload(), + }) + with self.assertRaisesMessage(MultiPartParserError, 'Invalid boundary in multipart: None'): + request.POST + + def test_multipart_non_ascii_content_type(self): + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; boundary = \xe0', + 'CONTENT_LENGTH': 0, + 'wsgi.input': FakePayload(), + }) + msg = 'Invalid non-ASCII Content-Type in multipart: multipart/form-data; boundary = à' + with self.assertRaisesMessage(MultiPartParserError, msg): + request.POST + + def test_POST_connection_error(self): + """ + If wsgi.input.read() raises an exception while trying to read() the + POST, the exception is identifiable (not a generic OSError). 
+ """ + class ExplodingBytesIO(BytesIO): + def read(self, len=0): + raise OSError('kaboom!') + + payload = b'name=value' + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': ExplodingBytesIO(payload), + }) + with self.assertRaises(UnreadablePostError): + request.body + + def test_set_encoding_clears_POST(self): + payload = FakePayload('name=Hello Günter') + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'application/x-www-form-urlencoded', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': payload, + }) + self.assertEqual(request.POST, {'name': ['Hello Günter']}) + request.encoding = 'iso-8859-16' + self.assertEqual(request.POST, {'name': ['Hello GĂŒnter']}) + + def test_set_encoding_clears_GET(self): + request = WSGIRequest({ + 'REQUEST_METHOD': 'GET', + 'wsgi.input': '', + 'QUERY_STRING': 'name=Hello%20G%C3%BCnter', + }) + self.assertEqual(request.GET, {'name': ['Hello Günter']}) + request.encoding = 'iso-8859-16' + self.assertEqual(request.GET, {'name': ['Hello G\u0102\u0152nter']}) + + def test_FILES_connection_error(self): + """ + If wsgi.input.read() raises an exception while trying to read() the + FILES, the exception is identifiable (not a generic OSError). 
+ """ + class ExplodingBytesIO(BytesIO): + def read(self, len=0): + raise OSError('kaboom!') + + payload = b'x' + request = WSGIRequest({ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': 'multipart/form-data; boundary=foo_', + 'CONTENT_LENGTH': len(payload), + 'wsgi.input': ExplodingBytesIO(payload), + }) + with self.assertRaises(UnreadablePostError): + request.FILES + + @override_settings(ALLOWED_HOSTS=['example.com']) + def test_get_raw_uri(self): + factory = RequestFactory(HTTP_HOST='evil.com') + request = factory.get('////absolute-uri') + self.assertEqual(request.get_raw_uri(), 'http://evil.com//absolute-uri') + + request = factory.get('/?foo=bar') + self.assertEqual(request.get_raw_uri(), 'http://evil.com/?foo=bar') + + request = factory.get('/path/with:colons') + self.assertEqual(request.get_raw_uri(), 'http://evil.com/path/with:colons') + + +class HostValidationTests(SimpleTestCase): + poisoned_hosts = [ + 'example.com@evil.tld', + 'example.com:dr.frankenstein@evil.tld', + 'example.com:dr.frankenstein@evil.tld:80', + 'example.com:80/badpath', + 'example.com: recovermypassword.com', + ] + + @override_settings( + USE_X_FORWARDED_HOST=False, + ALLOWED_HOSTS=[ + 'forward.com', 'example.com', 'internal.com', '12.34.56.78', + '[2001:19f0:feee::dead:beef:cafe]', 'xn--4ca9at.com', + '.multitenant.com', 'INSENSITIVE.com', '[::ffff:169.254.169.254]', + ]) + def test_http_get_host(self): + # Check if X_FORWARDED_HOST is provided. + request = HttpRequest() + request.META = { + 'HTTP_X_FORWARDED_HOST': 'forward.com', + 'HTTP_HOST': 'example.com', + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + } + # X_FORWARDED_HOST is ignored. + self.assertEqual(request.get_host(), 'example.com') + + # Check if X_FORWARDED_HOST isn't provided. + request = HttpRequest() + request.META = { + 'HTTP_HOST': 'example.com', + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + } + self.assertEqual(request.get_host(), 'example.com') + + # Check if HTTP_HOST isn't provided. 
+ request = HttpRequest() + request.META = { + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + } + self.assertEqual(request.get_host(), 'internal.com') + + # Check if HTTP_HOST isn't provided, and we're on a nonstandard port + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 8042, + } + self.assertEqual(request.get_host(), 'internal.com:8042') + + legit_hosts = [ + 'example.com', + 'example.com:80', + '12.34.56.78', + '12.34.56.78:443', + '[2001:19f0:feee::dead:beef:cafe]', + '[2001:19f0:feee::dead:beef:cafe]:8080', + 'xn--4ca9at.com', # Punycode for öäü.com + 'anything.multitenant.com', + 'multitenant.com', + 'insensitive.com', + 'example.com.', + 'example.com.:80', + '[::ffff:169.254.169.254]', + ] + + for host in legit_hosts: + request = HttpRequest() + request.META = { + 'HTTP_HOST': host, + } + request.get_host() + + # Poisoned host headers are rejected as suspicious + for host in chain(self.poisoned_hosts, ['other.com', 'example.com..']): + with self.assertRaises(DisallowedHost): + request = HttpRequest() + request.META = { + 'HTTP_HOST': host, + } + request.get_host() + + @override_settings(USE_X_FORWARDED_HOST=True, ALLOWED_HOSTS=['*']) + def test_http_get_host_with_x_forwarded_host(self): + # Check if X_FORWARDED_HOST is provided. + request = HttpRequest() + request.META = { + 'HTTP_X_FORWARDED_HOST': 'forward.com', + 'HTTP_HOST': 'example.com', + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + } + # X_FORWARDED_HOST is obeyed. + self.assertEqual(request.get_host(), 'forward.com') + + # Check if X_FORWARDED_HOST is provided with a port. + request = HttpRequest() + request.META = { + 'HTTP_X_FORWARDED_HOST': 'forward.com:8080', + 'HTTP_HOST': 'example.com', + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + } + # X_FORWARDED_HOST is obeyed. + self.assertEqual(request.get_host(), 'forward.com:8080') + + # Check if X_FORWARDED_HOST isn't provided. 
+ request = HttpRequest() + request.META = { + 'HTTP_HOST': 'example.com', + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + } + self.assertEqual(request.get_host(), 'example.com') + + # Check if HTTP_HOST isn't provided. + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + } + self.assertEqual(request.get_host(), 'internal.com') + + # Check if HTTP_HOST isn't provided, and we're on a nonstandard port + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 8042, + } + self.assertEqual(request.get_host(), 'internal.com:8042') + + # Poisoned host headers are rejected as suspicious + legit_hosts = [ + 'example.com', + 'example.com:80', + '12.34.56.78', + '12.34.56.78:443', + '[2001:19f0:feee::dead:beef:cafe]', + '[2001:19f0:feee::dead:beef:cafe]:8080', + 'xn--4ca9at.com', # Punycode for öäü.com + ] + + for host in legit_hosts: + request = HttpRequest() + request.META = { + 'HTTP_HOST': host, + } + request.get_host() + + for host in self.poisoned_hosts: + with self.assertRaises(DisallowedHost): + request = HttpRequest() + request.META = { + 'HTTP_HOST': host, + } + request.get_host() + + @override_settings( + USE_X_FORWARDED_PORT=True, + USE_X_FORWARDED_HOST=False, + ALLOWED_HOSTS=['*']) + def test_get_host_with_use_x_forwarded_port_and_http_host(self): + request = HttpRequest() + request.META = { + 'HTTP_HOST': 'original.com', + 'SERVER_PORT': '8000', + 'HTTP_X_FORWARDED_HOST': 'example.com', + 'HTTP_X_FORWARDED_PORT': '8080', + } + # Should NOT use the X-Forwarded-Port header + self.assertEqual(request.get_host(), 'original.com:8080') + self.assertEqual(request.get_port(), '8080') + + @override_settings(USE_X_FORWARDED_PORT=False) + def test_get_port(self): + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + 'HTTP_X_FORWARDED_PORT': '80', + } + # Shouldn't use the X-Forwarded-Port header + self.assertEqual(request.get_port(), 
'8080') + + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + } + self.assertEqual(request.get_port(), '8080') + + @override_settings(USE_X_FORWARDED_PORT=True) + def test_get_port_with_x_forwarded_port(self): + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + 'HTTP_X_FORWARDED_PORT': '80', + } + # Should use the X-Forwarded-Port header + self.assertEqual(request.get_port(), '80') + + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + } + self.assertEqual(request.get_port(), '8080') + + @override_settings(USE_X_FORWARDED_HOST=True) + def test_get_port_with_x_forwarded_host(self): + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + 'HTTP_X_FORWARDED_HOST': 'example.com:8000', + } + # Should use the X-Forwarded-Host header + self.assertEqual(request.get_port(), '8000') + + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + } + self.assertEqual(request.get_port(), '8080') + + @override_settings(USE_X_FORWARDED_HOST=True) + def test_get_port_with_x_forwarded_host_ipv6(self): + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + 'HTTP_X_FORWARDED_HOST': '[2001:19f0:feee::dead:beef:cafe]:8000', + } + # Should use the X-Forwarded-Host header + self.assertEqual(request.get_port(), '8000') + + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + } + self.assertEqual(request.get_port(), '8080') + + @override_settings(USE_X_FORWARDED_HOST=True, + USE_X_FORWARDED_PORT=True) + def test_get_port_with_x_forwarded_host_and_port(self): + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + 'HTTP_X_FORWARDED_HOST': 'example.com', + 'HTTP_X_FORWARDED_PORT': '8010', + } + # Should use the 
X-Forwarded-Port header + self.assertEqual(request.get_port(), '8010') + + request = HttpRequest() + request.META = { + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '8080', + } + self.assertEqual(request.get_port(), '8080') + + @override_settings( + USE_X_FORWARDED_PORT=True, + USE_X_FORWARDED_HOST=True, + ALLOWED_HOSTS=['*']) + def test_get_host_with_x_forwarded_port(self): + request = HttpRequest() + request.META = { + 'SERVER_PORT': '80', + 'HTTP_X_FORWARDED_HOST': 'example.com', + 'HTTP_X_FORWARDED_PORT': '8080', + } + # Should use the X-Forwarded-Port header + self.assertEqual(request.get_host(), 'example.com:8080') + + @override_settings( + USE_X_FORWARDED_PORT=True, + USE_X_FORWARDED_HOST=True, + ALLOWED_HOSTS=['*']) + def test_get_host_with_use_x_forwarded_port_and_port_in_host(self): + request = HttpRequest() + request.META = { + 'SERVER_PORT': '80', + 'HTTP_X_FORWARDED_HOST': 'example.com:8081', + 'HTTP_X_FORWARDED_PORT': '8080', + } + # A port in X-Forwarded-Host conflicts with USE_X_FORWARDED_PORT + with self.assertRaises(ImproperlyConfigured): + request.get_host() + + @override_settings(DEBUG=True, ALLOWED_HOSTS=[]) + def test_host_validation_in_debug_mode(self): + """ + If ALLOWED_HOSTS is empty and DEBUG is True, variants of localhost are + allowed. + """ + valid_hosts = ['localhost', 'subdomain.localhost', '127.0.0.1', '[::1]'] + for host in valid_hosts: + request = HttpRequest() + request.META = {'HTTP_HOST': host} + self.assertEqual(request.get_host(), host) + + # Other hostnames raise a DisallowedHost. + with self.assertRaises(DisallowedHost): + request = HttpRequest() + request.META = {'HTTP_HOST': 'example.com'} + request.get_host() + + @override_settings(ALLOWED_HOSTS=[]) + def test_get_host_suggestion_of_allowed_host(self): + """get_host() makes helpful suggestions if a valid-looking host is not in ALLOWED_HOSTS.""" + msg_invalid_host = "Invalid HTTP_HOST header: %r." + msg_suggestion = msg_invalid_host + " You may need to add %r to ALLOWED_HOSTS." 
+ msg_suggestion2 = msg_invalid_host + " The domain name provided is not valid according to RFC 1034/1035" + + for host in [ # Valid-looking hosts + 'example.com', + '12.34.56.78', + '[2001:19f0:feee::dead:beef:cafe]', + 'xn--4ca9at.com', # Punycode for öäü.com + ]: + request = HttpRequest() + request.META = {'HTTP_HOST': host} + with self.assertRaisesMessage(DisallowedHost, msg_suggestion % (host, host)): + request.get_host() + + for domain, port in [ # Valid-looking hosts with a port number + ('example.com', 80), + ('12.34.56.78', 443), + ('[2001:19f0:feee::dead:beef:cafe]', 8080), + ]: + host = '%s:%s' % (domain, port) + request = HttpRequest() + request.META = {'HTTP_HOST': host} + with self.assertRaisesMessage(DisallowedHost, msg_suggestion % (host, domain)): + request.get_host() + + for host in self.poisoned_hosts: + request = HttpRequest() + request.META = {'HTTP_HOST': host} + with self.assertRaisesMessage(DisallowedHost, msg_invalid_host % host): + request.get_host() + + request = HttpRequest() + request.META = {'HTTP_HOST': "invalid_hostname.com"} + with self.assertRaisesMessage(DisallowedHost, msg_suggestion2 % "invalid_hostname.com"): + request.get_host() + + def test_split_domain_port_removes_trailing_dot(self): + domain, port = split_domain_port('example.com.:8080') + self.assertEqual(domain, 'example.com') + self.assertEqual(port, '8080') + + +class BuildAbsoluteURITests(SimpleTestCase): + factory = RequestFactory() + + def test_absolute_url(self): + request = HttpRequest() + url = 'https://www.example.com/asdf' + self.assertEqual(request.build_absolute_uri(location=url), url) + + def test_host_retrieval(self): + request = HttpRequest() + request.get_host = lambda: 'www.example.com' + request.path = '' + self.assertEqual( + request.build_absolute_uri(location='/path/with:colons'), + 'http://www.example.com/path/with:colons' + ) + + def test_request_path_begins_with_two_slashes(self): + # //// creates a request with a path beginning with // + request 
= self.factory.get('////absolute-uri') + tests = ( + # location isn't provided + (None, 'http://testserver//absolute-uri'), + # An absolute URL + ('http://example.com/?foo=bar', 'http://example.com/?foo=bar'), + # A scheme-relative URL + ('//example.com/?foo=bar', 'http://example.com/?foo=bar'), + # Relative URLs + ('/foo/bar/', 'http://testserver/foo/bar/'), + ('/foo/./bar/', 'http://testserver/foo/bar/'), + ('/foo/../bar/', 'http://testserver/bar/'), + ('///foo/bar/', 'http://testserver/foo/bar/'), + ) + for location, expected_url in tests: + with self.subTest(location=location): + self.assertEqual(request.build_absolute_uri(location=location), expected_url) + + +class RequestHeadersTests(SimpleTestCase): + ENVIRON = { + # Non-headers are ignored. + 'PATH_INFO': '/somepath/', + 'REQUEST_METHOD': 'get', + 'wsgi.input': BytesIO(b''), + 'SERVER_NAME': 'internal.com', + 'SERVER_PORT': 80, + # These non-HTTP prefixed headers are included. + 'CONTENT_TYPE': 'text/html', + 'CONTENT_LENGTH': '100', + # All HTTP-prefixed headers are included. 
+ 'HTTP_ACCEPT': '*', + 'HTTP_HOST': 'example.com', + 'HTTP_USER_AGENT': 'python-requests/1.2.0', + } + + def test_base_request_headers(self): + request = HttpRequest() + request.META = self.ENVIRON + self.assertEqual(dict(request.headers), { + 'Content-Type': 'text/html', + 'Content-Length': '100', + 'Accept': '*', + 'Host': 'example.com', + 'User-Agent': 'python-requests/1.2.0', + }) + + def test_wsgi_request_headers(self): + request = WSGIRequest(self.ENVIRON) + self.assertEqual(dict(request.headers), { + 'Content-Type': 'text/html', + 'Content-Length': '100', + 'Accept': '*', + 'Host': 'example.com', + 'User-Agent': 'python-requests/1.2.0', + }) + + def test_wsgi_request_headers_getitem(self): + request = WSGIRequest(self.ENVIRON) + self.assertEqual(request.headers['User-Agent'], 'python-requests/1.2.0') + self.assertEqual(request.headers['user-agent'], 'python-requests/1.2.0') + self.assertEqual(request.headers['user_agent'], 'python-requests/1.2.0') + self.assertEqual(request.headers['Content-Type'], 'text/html') + self.assertEqual(request.headers['Content-Length'], '100') + + def test_wsgi_request_headers_get(self): + request = WSGIRequest(self.ENVIRON) + self.assertEqual(request.headers.get('User-Agent'), 'python-requests/1.2.0') + self.assertEqual(request.headers.get('user-agent'), 'python-requests/1.2.0') + self.assertEqual(request.headers.get('Content-Type'), 'text/html') + self.assertEqual(request.headers.get('Content-Length'), '100') + + +class HttpHeadersTests(SimpleTestCase): + def test_basic(self): + environ = { + 'CONTENT_TYPE': 'text/html', + 'CONTENT_LENGTH': '100', + 'HTTP_HOST': 'example.com', + } + headers = HttpHeaders(environ) + self.assertEqual(sorted(headers), ['Content-Length', 'Content-Type', 'Host']) + self.assertEqual(headers, { + 'Content-Type': 'text/html', + 'Content-Length': '100', + 'Host': 'example.com', + }) + + def test_parse_header_name(self): + tests = ( + ('PATH_INFO', None), + ('HTTP_ACCEPT', 'Accept'), + 
('HTTP_USER_AGENT', 'User-Agent'), + ('HTTP_X_FORWARDED_PROTO', 'X-Forwarded-Proto'), + ('CONTENT_TYPE', 'Content-Type'), + ('CONTENT_LENGTH', 'Content-Length'), + ) + for header, expected in tests: + with self.subTest(header=header): + self.assertEqual(HttpHeaders.parse_header_name(header), expected) diff --git a/tests/sites_tests/tests.py b/tests/sites_tests/tests.py index 4f5b07ee8f..b8716227d7 100644 --- a/tests/sites_tests/tests.py +++ b/tests/sites_tests/tests.py @@ -106,37 +106,42 @@ class SitesFrameworkTests(TestCase): @override_settings(SITE_ID=None, ALLOWED_HOSTS=["example.com", "example.net"]) def test_get_current_site_no_site_id_and_handle_port_fallback(self): - request = HttpRequest() s1 = self.site s2 = Site.objects.create(domain="example.com:80", name="example.com:80") # Host header without port - request.META = {"HTTP_HOST": "example.com"} + request = HttpRequest() + request.META = {'HTTP_HOST': 'example.com'} site = get_current_site(request) self.assertEqual(site, s1) # Host header with port - match, no fallback without port - request.META = {"HTTP_HOST": "example.com:80"} + request = HttpRequest() + request.META = {'HTTP_HOST': 'example.com:80'} site = get_current_site(request) self.assertEqual(site, s2) # Host header with port - no match, fallback without port - request.META = {"HTTP_HOST": "example.com:81"} + request = HttpRequest() + request.META = {'HTTP_HOST': 'example.com:81'} site = get_current_site(request) self.assertEqual(site, s1) # Host header with non-matching domain - request.META = {"HTTP_HOST": "example.net"} + request = HttpRequest() + request.META = {'HTTP_HOST': 'example.net'} with self.assertRaises(ObjectDoesNotExist): get_current_site(request) # Ensure domain for RequestSite always matches host header - with self.modify_settings(INSTALLED_APPS={"remove": "django.contrib.sites"}): - request.META = {"HTTP_HOST": "example.com"} + with self.modify_settings(INSTALLED_APPS={'remove': 'django.contrib.sites'}): + request = 
HttpRequest() + request.META = {'HTTP_HOST': 'example.com'} site = get_current_site(request) self.assertEqual(site.name, "example.com") - request.META = {"HTTP_HOST": "example.com:80"} + request = HttpRequest() + request.META = {'HTTP_HOST': 'example.com:80'} site = get_current_site(request) self.assertEqual(site.name, "example.com:80")