1
0
mirror of https://github.com/django/django.git synced 2025-01-07 08:56:32 +00:00
django/tests/asgi/urls.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

71 lines
1.6 KiB
Python
Raw Normal View History

import asyncio
import threading
import time
from django.http import FileResponse, HttpResponse, StreamingHttpResponse
from django.urls import path
from django.views.decorators.csrf import csrf_exempt
def hello(request):
    """Return a plain greeting for the ``name`` query parameter.

    A missing or empty ``name`` falls back to ``"World"``.
    """
    who = request.GET.get("name")
    if not who:
        who = "World"
    return HttpResponse("Hello %s!" % who)
def hello_with_delay(request):
    """Return the same greeting as a ``name`` lookup, after blocking for 1 second.

    A missing or empty ``name`` query parameter falls back to ``"World"``.
    """
    requested = request.GET.get("name")
    name = requested if requested else "World"
    # Blocking sleep: this view deliberately stalls before responding.
    time.sleep(1)
    return HttpResponse(f"Hello {name}!")
def hello_meta(request):
    """Echo selected request metadata back to the client.

    The body is ``"From <referer>"``, where ``<referer>`` is the
    ``HTTP_REFERER`` entry of ``request.META`` or an empty string when that
    entry is missing or empty. The response's content type is taken from the
    ``CONTENT_TYPE`` entry of ``request.META`` (``None`` when absent).
    """
    # Apply the fallback to the header value itself. ``%`` binds tighter than
    # ``or``, so ``"From %s" % value or ""`` would format first and render a
    # missing header as "From None".
    referer = request.META.get("HTTP_REFERER") or ""
    return HttpResponse(
        "From %s" % referer,
        content_type=request.META.get("CONTENT_TYPE"),
    )
def sync_waiter(request):
    """Record the calling thread, wait on the shared barrier, then greet.

    State lives on the function object itself (``active_threads``, ``lock``
    and ``barrier`` attributes). The response is whatever ``hello`` returns
    for the same request.
    """
    this_thread = threading.current_thread()
    with sync_waiter.lock:
        sync_waiter.active_threads.add(this_thread)
    # Wait after releasing the lock so another caller can register itself
    # while this one is parked on the barrier; the wait gives up after 0.5 s.
    sync_waiter.barrier.wait(timeout=0.5)
    return hello(request)
@csrf_exempt
def post_echo(request):
    """Echo the request body when the ``echo`` query parameter is truthy.

    Without a truthy ``echo`` parameter the response is an empty 204.
    """
    if not request.GET.get("echo"):
        return HttpResponse(status=204)
    return HttpResponse(request.body)
# Shared state for sync_waiter, stored as attributes on the function object.
# Threads that have entered sync_waiter; each call adds its current thread.
sync_waiter.active_threads = set()
# Held by sync_waiter while it adds to active_threads.
sync_waiter.lock = threading.Lock()
# Two-party barrier that sync_waiter waits on (0.5 s timeout) before responding.
sync_waiter.barrier = threading.Barrier(2)
async def streaming_inner(sleep_time):
    """Asynchronously yield two byte chunks separated by a pause.

    Yields ``b"first\\n"``, awaits ``asyncio.sleep(sleep_time)``, then yields
    ``b"last\\n"``.
    """
    opening, closing = b"first\n", b"last\n"
    yield opening
    # Non-blocking pause between the two chunks.
    await asyncio.sleep(sleep_time)
    yield closing
async def streaming_view(request):
    """Stream the chunks from ``streaming_inner`` as the response body.

    The pause between chunks comes from the ``sleep`` query parameter. No
    default is supplied: the value is read with a plain subscript and passed
    to ``float()``, so a missing or non-numeric value raises.
    """
    pause = float(request.GET["sleep"])
    chunks = streaming_inner(pause)
    return StreamingHttpResponse(chunks)
# Path of this module; the "file/" route below opens it and serves its bytes.
test_filename = __file__

# Route table mapping each URL path to one of the views above.
urlpatterns = [
    path("", hello),
    # Opens test_filename in binary mode on every request and hands the open
    # file to FileResponse.
    path("file/", lambda x: FileResponse(open(test_filename, "rb"))),
    path("meta/", hello_meta),
    path("post/", post_echo),
    path("wait/", sync_waiter),
    path("delayed_hello/", hello_with_delay),
    path("streaming/", streaming_view),
]