1
0
mirror of https://github.com/django/django.git synced 2025-03-25 08:40:45 +00:00

Fixed #35945 -- Added async interface to Paginator.

This commit is contained in:
wookkl 2025-03-12 00:53:04 +09:00 committed by Sarah Boyce
parent 0ebea6e5c0
commit 2ae3044d9d
6 changed files with 784 additions and 116 deletions

View File

@ -500,6 +500,7 @@ answer newbie questions, and generally made Django that much better:
Jens Page
Jensen Cochran <jensen.cochran@gmail.com>
Jeong-Min Lee <falsetru@gmail.com>
Jeong-Wook Lee <devwookkl@gmail.com>
Jérémie Blaser <blaserje@gmail.com>
Jeremy Bowman <https://github.com/jmbowman>
Jeremy Carbaugh <jcarbaugh@gmail.com>

View File

@ -1,8 +1,11 @@
import collections.abc
import inspect
import warnings
from asyncio import iscoroutinefunction
from math import ceil
from asgiref.sync import sync_to_async
from django.utils.functional import cached_property
from django.utils.inspect import method_has_no_args
from django.utils.translation import gettext_lazy as _
@ -24,7 +27,7 @@ class EmptyPage(InvalidPage):
pass
class Paginator:
class BasePaginator:
# Translators: String used to replace omitted page numbers in elided page
# range generated by paginators, e.g. [1, 2, '…', 5, 6, 7, '…', 9, 10].
ELLIPSIS = _("")
@ -53,11 +56,74 @@ class Paginator:
else self.default_error_messages | error_messages
)
def __iter__(self):
for page_number in self.page_range:
yield self.page(page_number)
def _check_object_list_is_ordered(self):
"""
Warn if self.object_list is unordered (typically a QuerySet).
"""
ordered = getattr(self.object_list, "ordered", None)
if ordered is not None and not ordered:
obj_list_repr = (
"{} {}".format(
self.object_list.model, self.object_list.__class__.__name__
)
if hasattr(self.object_list, "model")
else "{!r}".format(self.object_list)
)
warnings.warn(
"Pagination may yield inconsistent results with an unordered "
"object_list: {}.".format(obj_list_repr),
UnorderedObjectListWarning,
stacklevel=3,
)
def validate_number(self, number):
def _get_elided_page_range(
self, number, num_pages, page_range, on_each_side=3, on_ends=2
):
"""
Return a 1-based range of pages with some values elided.
If the page range is larger than a given size, the whole range is not
provided and a compact form is returned instead, e.g. for a paginator
with 50 pages, if page 43 were the current page, the output, with the
default arguments, would be:
1, 2, , 40, 41, 42, 43, 44, 45, 46, , 49, 50.
"""
if num_pages <= (on_each_side + on_ends) * 2:
for page in page_range:
yield page
return
if number > (1 + on_each_side + on_ends) + 1:
for page in range(1, on_ends + 1):
yield page
yield self.ELLIPSIS
for page in range(number - on_each_side, number + 1):
yield page
else:
for page in range(1, number + 1):
yield page
if number < (num_pages - on_each_side - on_ends) - 1:
for page in range(number + 1, number + on_each_side + 1):
yield page
yield self.ELLIPSIS
for page in range(num_pages - on_ends + 1, num_pages + 1):
yield page
else:
for page in range(number + 1, num_pages + 1):
yield page
def _get_page(self, *args, **kwargs):
"""
Return an instance of a single page.
This hook can be used by subclasses to use an alternative to the
standard :cls:`Page` object.
"""
return Page(*args, **kwargs)
def _validate_number(self, number, num_pages):
"""Validate the given 1-based page number."""
try:
if isinstance(number, float) and not number.is_integer():
@ -67,10 +133,19 @@ class Paginator:
raise PageNotAnInteger(self.error_messages["invalid_page"])
if number < 1:
raise EmptyPage(self.error_messages["min_page"])
if number > self.num_pages:
if number > num_pages:
raise EmptyPage(self.error_messages["no_results"])
return number
class Paginator(BasePaginator):
def __iter__(self):
for page_number in self.page_range:
yield self.page(page_number)
def validate_number(self, number):
return self._validate_number(number, self.num_pages)
def get_page(self, number):
"""
Return a valid page, even if the page argument isn't a number or isn't
@ -93,15 +168,6 @@ class Paginator:
top = self.count
return self._get_page(self.object_list[bottom:top], number, self)
def _get_page(self, *args, **kwargs):
"""
Return an instance of a single page.
This hook can be used by subclasses to use an alternative to the
standard :cls:`Page` object.
"""
return Page(*args, **kwargs)
@cached_property
def count(self):
"""Return the total number of objects, across all pages."""
@ -126,56 +192,105 @@ class Paginator:
"""
return range(1, self.num_pages + 1)
def _check_object_list_is_ordered(self):
"""
Warn if self.object_list is unordered (typically a QuerySet).
"""
ordered = getattr(self.object_list, "ordered", None)
if ordered is not None and not ordered:
obj_list_repr = (
"{} {}".format(
self.object_list.model, self.object_list.__class__.__name__
)
if hasattr(self.object_list, "model")
else "{!r}".format(self.object_list)
)
warnings.warn(
"Pagination may yield inconsistent results with an unordered "
"object_list: {}.".format(obj_list_repr),
UnorderedObjectListWarning,
stacklevel=3,
)
def get_elided_page_range(self, number=1, *, on_each_side=3, on_ends=2):
"""
Return a 1-based range of pages with some values elided.
If the page range is larger than a given size, the whole range is not
provided and a compact form is returned instead, e.g. for a paginator
with 50 pages, if page 43 were the current page, the output, with the
default arguments, would be:
1, 2, , 40, 41, 42, 43, 44, 45, 46, , 49, 50.
"""
number = self.validate_number(number)
yield from self._get_elided_page_range(
number, self.num_pages, self.page_range, on_each_side, on_ends
)
if self.num_pages <= (on_each_side + on_ends) * 2:
yield from self.page_range
return
if number > (1 + on_each_side + on_ends) + 1:
yield from range(1, on_ends + 1)
yield self.ELLIPSIS
yield from range(number - on_each_side, number + 1)
class AsyncPaginator(BasePaginator):
def __init__(
self,
object_list,
per_page,
orphans=0,
allow_empty_first_page=True,
error_messages=None,
):
super().__init__(
object_list, per_page, orphans, allow_empty_first_page, error_messages
)
self._cache_acount = None
self._cache_anum_pages = None
async def __aiter__(self):
page_range = await self.apage_range()
for page_number in page_range:
yield await self.apage(page_number)
async def avalidate_number(self, number):
num_pages = await self.anum_pages()
return self._validate_number(number, num_pages)
async def aget_page(self, number):
"""See Paginator.get_page()."""
try:
number = await self.avalidate_number(number)
except PageNotAnInteger:
number = 1
except EmptyPage:
number = await self.anum_pages()
return await self.apage(number)
async def apage(self, number):
"""See Paginator.page()."""
number = await self.avalidate_number(number)
bottom = (number - 1) * self.per_page
top = bottom + self.per_page
count = await self.acount()
if top + self.orphans >= count:
top = count
return self._get_page(self.object_list[bottom:top], number, self)
def _get_page(self, *args, **kwargs):
return AsyncPage(*args, **kwargs)
async def acount(self):
"""See Paginator.count()."""
if self._cache_acount is not None:
return self._cache_acount
c = getattr(self.object_list, "acount", None)
if (
iscoroutinefunction(c)
and not inspect.isbuiltin(c)
and method_has_no_args(c)
):
count = await c()
else:
yield from range(1, number + 1)
count = len(self.object_list)
if number < (self.num_pages - on_each_side - on_ends) - 1:
yield from range(number + 1, number + on_each_side + 1)
yield self.ELLIPSIS
yield from range(self.num_pages - on_ends + 1, self.num_pages + 1)
else:
yield from range(number + 1, self.num_pages + 1)
self._cache_acount = count
return count
async def anum_pages(self):
"""See Paginator.num_pages()."""
if self._cache_anum_pages is not None:
return self._cache_anum_pages
count = await self.acount()
if count == 0 and not self.allow_empty_first_page:
self._cache_anum_pages = 0
return self._cache_anum_pages
hits = max(1, count - self.orphans)
num_pages = ceil(hits / self.per_page)
self._cache_anum_pages = num_pages
return num_pages
async def apage_range(self):
"""See Paginator.page_range()"""
num_pages = await self.anum_pages()
return range(1, num_pages + 1)
async def aget_elided_page_range(self, number=1, *, on_each_side=3, on_ends=2):
number = await self.avalidate_number(number)
num_pages = await self.anum_pages()
page_range = await self.apage_range()
for page in self._get_elided_page_range(
number, num_pages, page_range, on_each_side, on_ends
):
yield page
class Page(collections.abc.Sequence):
@ -236,3 +351,96 @@ class Page(collections.abc.Sequence):
if self.number == self.paginator.num_pages:
return self.paginator.count
return self.number * self.paginator.per_page
class AsyncPage:
def __init__(self, object_list, number, paginator):
self.object_list = object_list
self.number = number
self.paginator = paginator
def __repr__(self):
return "<Async Page %s>" % self.number
async def __aiter__(self):
if hasattr(self.object_list, "__aiter__"):
async for obj in self.object_list:
yield obj
else:
for obj in self.object_list:
yield obj
def __len__(self):
if not isinstance(self.object_list, list):
raise TypeError(
"AsyncPage.aget_object_list() must be awaited before calling len()."
)
return len(self.object_list)
def __reversed__(self):
if not isinstance(self.object_list, list):
raise TypeError(
"AsyncPage.aget_object_list() "
"must be awaited before calling reversed()."
)
return reversed(self.object_list)
def __getitem__(self, index):
if not isinstance(index, (int, slice)):
raise TypeError(
"AsyncPage indices must be integers or slices, not %s."
% type(index).__name__
)
if not isinstance(self.object_list, list):
raise TypeError(
"AsyncPage.aget_object_list() must be awaited before using indexing."
)
return self.object_list[index]
async def aget_object_list(self):
"""
Returns self.object_list as a list.
This method must be awaited before AsyncPage can be
treated as a sequence of self.object_list.
"""
if not isinstance(self.object_list, list):
if hasattr(self.object_list, "__aiter__"):
self.object_list = [obj async for obj in self.object_list]
else:
self.object_list = await sync_to_async(list)(self.object_list)
return self.object_list
async def ahas_next(self):
num_pages = await self.paginator.anum_pages()
return self.number < num_pages
async def ahas_previous(self):
return self.number > 1
async def ahas_other_pages(self):
has_previous = await self.ahas_previous()
has_next = await self.ahas_next()
return has_previous or has_next
async def anext_page_number(self):
return await self.paginator.avalidate_number(self.number + 1)
async def aprevious_page_number(self):
return await self.paginator.avalidate_number(self.number - 1)
async def astart_index(self):
"""See Page.start_index()."""
count = await self.paginator.acount()
if count == 0:
return 0
return (self.paginator.per_page * (self.number - 1)) + 1
async def aend_index(self):
"""See Page.end_index()."""
num_pages = await self.paginator.anum_pages()
if self.number == num_pages:
return await self.paginator.acount()
return self.number * self.paginator.per_page

View File

@ -161,6 +161,30 @@ Attributes
A 1-based range iterator of page numbers, e.g. yielding ``[1, 2, 3, 4]``.
``AsyncPaginator`` class
========================
.. versionadded:: 6.0
.. class:: AsyncPaginator(object_list, per_page, orphans=0, allow_empty_first_page=True, error_messages=None)
Asynchronous version of :class:`Paginator`.
``AsyncPaginator`` has the same attributes and signatures as
:class:`Paginator`, with the following exceptions:
* The attribute :attr:`.Paginator.count` is supported as an asynchronous
method ``AsyncPaginator.acount()``.
* The attribute :attr:`.Paginator.num_pages` is supported as an
asynchronous method ``AsyncPaginator.anum_pages()``.
* The attribute :attr:`.Paginator.page_range` is supported as an
asynchronous method ``AsyncPaginator.apage_range()``.
``AsyncPaginator`` has asynchronous versions of the same methods as
:class:`Paginator`, using an ``a`` prefix - for example, use
``await async_paginator.aget_page(number)`` rather than
``paginator.get_page(number)``.
``Page`` class
==============
@ -226,6 +250,28 @@ Attributes
The associated :class:`Paginator` object.
``AsyncPage`` class
===================
.. versionadded:: 6.0
.. class:: AsyncPage(object_list, number, paginator)
Asynchronous version of :class:`Page`.
``AsyncPage`` has the same attributes and signatures as :class:`Page`, as
well as asynchronous versions of all the same methods, using an ``a``
prefix - for example, use ``await async_page.ahas_next()`` rather than
``page.has_next()``.
``AsyncPage`` has the following additional method:
.. method:: AsyncPage.aget_object_list()
Returns ``AsyncPage.object_list`` as a list. This method must be
awaited before ``AsyncPage`` can be treated as a sequence of
``AsyncPage.object_list``.
Exceptions
==========

View File

@ -199,6 +199,14 @@ Models
:ref:`a forced update <ref-models-force-insert>` results in no affected rows,
instead of a generic :exc:`django.db.DatabaseError`.
Pagination
~~~~~~~~~~
* The new :class:`~django.core.paginator.AsyncPaginator` and
:class:`~django.core.paginator.AsyncPage` provide async implementations of
:class:`~django.core.paginator.Paginator` and
:class:`~django.core.paginator.Page` respectively.
Requests and Responses
~~~~~~~~~~~~~~~~~~~~~~

View File

@ -1,4 +1,4 @@
from django.core.paginator import Page, Paginator
from django.core.paginator import AsyncPage, AsyncPaginator, Page, Paginator
class ValidAdjacentNumsPage(Page):
@ -16,3 +16,20 @@ class ValidAdjacentNumsPage(Page):
class ValidAdjacentNumsPaginator(Paginator):
def _get_page(self, *args, **kwargs):
return ValidAdjacentNumsPage(*args, **kwargs)
class AsyncValidAdjacentNumsPage(AsyncPage):
async def anext_page_number(self):
if not await self.ahas_next():
return None
return await super().anext_page_number()
async def aprevious_page_number(self):
if not await self.ahas_previous():
return None
return await super().aprevious_page_number()
class AsyncValidAdjacentNumsPaginator(AsyncPaginator):
def _get_page(self, *args, **kwargs):
return AsyncValidAdjacentNumsPage(*args, **kwargs)

View File

@ -1,9 +1,12 @@
import collections.abc
import inspect
import unittest.mock
import warnings
from datetime import datetime
from django.core.paginator import (
AsyncPaginator,
BasePaginator,
EmptyPage,
InvalidPage,
PageNotAnInteger,
@ -12,7 +15,7 @@ from django.core.paginator import (
)
from django.test import SimpleTestCase, TestCase
from .custom import ValidAdjacentNumsPaginator
from .custom import AsyncValidAdjacentNumsPaginator, ValidAdjacentNumsPaginator
from .models import Article
@ -32,6 +35,13 @@ class PaginationTests(SimpleTestCase):
self.check_attribute("num_pages", paginator, num_pages, params)
self.check_attribute("page_range", paginator, page_range, params, coerce=list)
async def check_paginator_async(self, params, output):
"""See check_paginator."""
count, num_pages, page_range = output
paginator = AsyncPaginator(*params)
await self.check_attribute_async("acount", paginator, count, params)
await self.check_attribute_async("anum_pages", paginator, num_pages, params)
def check_attribute(self, name, paginator, expected, params, coerce=None):
"""
Helper method that checks a single attribute and gives a nice error
@ -47,14 +57,21 @@ class PaginationTests(SimpleTestCase):
% (name, expected, got, params),
)
def test_paginator(self):
"""
Tests the paginator attributes using varying inputs.
"""
async def check_attribute_async(self, name, paginator, expected, params):
"""See check_attribute."""
got = getattr(paginator, name)
self.assertEqual(
expected,
await got(),
"For '%s', expected %s but got %s. Paginator parameters were: %s"
% (name, expected, got, params),
)
def get_test_cases_for_test_paginator(self):
nine = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ten = nine + [10]
eleven = ten + [11]
tests = (
return (
# Each item is 2-tuple:
# First tuple is Paginator parameters - object_list, per_page,
# orphans, and allow_empty_first_page.
@ -111,9 +128,17 @@ class PaginationTests(SimpleTestCase):
((ten, 4, "1", False), (10, 3, [1, 2, 3])),
((ten, 4, "1", False), (10, 3, [1, 2, 3])),
)
def test_paginator(self):
tests = self.get_test_cases_for_test_paginator()
for params, output in tests:
self.check_paginator(params, output)
async def test_paginator_async(self):
tests = self.get_test_cases_for_test_paginator()
for params, output in tests:
await self.check_paginator_async(params, output)
def test_invalid_page_number(self):
"""
Invalid page numbers result in the correct exception being raised.
@ -128,6 +153,12 @@ class PaginationTests(SimpleTestCase):
with self.assertRaises(PageNotAnInteger):
paginator.validate_number(1.2)
async def test_invalid_apage_number_async(self):
"""See test_invalid_page_number."""
paginator = AsyncPaginator([1, 2, 3], 2)
with self.assertRaises(InvalidPage):
await paginator.apage(3)
def test_error_messages(self):
error_messages = {
"invalid_page": "Wrong page number",
@ -186,6 +217,27 @@ class PaginationTests(SimpleTestCase):
self.assertEqual(5, paginator.num_pages)
self.assertEqual([1, 2, 3, 4, 5], list(paginator.page_range))
async def test_paginate_misc_classes_async(self):
class CountContainer:
async def acount(self):
return 42
# AsyncPaginator can be passed other objects with an acount() method.
paginator = AsyncPaginator(CountContainer(), 10)
self.assertEqual(42, await paginator.acount())
self.assertEqual(5, await paginator.anum_pages())
self.assertEqual([1, 2, 3, 4, 5], list(await paginator.apage_range()))
# AsyncPaginator can be passed other objects that implement __len__.
class LenContainer:
def __len__(self):
return 42
paginator = AsyncPaginator(LenContainer(), 10)
self.assertEqual(42, await paginator.acount())
self.assertEqual(5, await paginator.anum_pages())
self.assertEqual([1, 2, 3, 4, 5], list(await paginator.apage_range()))
def test_count_does_not_silence_attribute_error(self):
class AttributeErrorContainer:
def count(self):
@ -194,6 +246,14 @@ class PaginationTests(SimpleTestCase):
with self.assertRaisesMessage(AttributeError, "abc"):
Paginator(AttributeErrorContainer(), 10).count
async def test_acount_does_not_silence_attribute_error_async(self):
class AttributeErrorContainer:
async def acount(self):
raise AttributeError("abc")
with self.assertRaisesMessage(AttributeError, "abc"):
await AsyncPaginator(AttributeErrorContainer(), 10).acount()
def test_count_does_not_silence_type_error(self):
class TypeErrorContainer:
def count(self):
@ -202,6 +262,14 @@ class PaginationTests(SimpleTestCase):
with self.assertRaisesMessage(TypeError, "abc"):
Paginator(TypeErrorContainer(), 10).count
async def test_acount_does_not_silence_type_error_async(self):
class TypeErrorContainer:
async def acount(self):
raise TypeError("abc")
with self.assertRaisesMessage(TypeError, "abc"):
await AsyncPaginator(TypeErrorContainer(), 10).acount()
def check_indexes(self, params, page_num, indexes):
"""
Helper method that instantiates a Paginator object from the passed
@ -227,12 +295,30 @@ class PaginationTests(SimpleTestCase):
msg % ("end index", page_num, end, page.end_index(), params),
)
def test_page_indexes(self):
"""
Paginator pages have the correct start and end indexes.
"""
async def check_indexes_async(self, params, page_num, indexes):
"""See check_indexes."""
paginator = AsyncPaginator(*params)
if page_num == "first":
page_num = 1
elif page_num == "last":
page_num = await paginator.anum_pages()
page = await paginator.apage(page_num)
start, end = indexes
msg = "For %s of page %s, expected %s but got %s. Paginator parameters were: %s"
self.assertEqual(
start,
await page.astart_index(),
msg % ("start index", page_num, start, await page.astart_index(), params),
)
self.assertEqual(
end,
await page.aend_index(),
msg % ("end index", page_num, end, await page.aend_index(), params),
)
def get_test_cases_for_test_page_indexes(self):
ten = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
tests = (
return (
# Each item is 3-tuple:
# First tuple is Paginator parameters - object_list, per_page,
# orphans, and allow_empty_first_page.
@ -265,6 +351,12 @@ class PaginationTests(SimpleTestCase):
(([], 4, 1, True), (0, 0), (0, 0)),
(([], 4, 2, True), (0, 0), (0, 0)),
)
def test_page_indexes(self):
"""
Paginator pages have the correct start and end indexes.
"""
tests = self.get_test_cases_for_test_page_indexes()
for params, first, last in tests:
self.check_indexes(params, "first", first)
self.check_indexes(params, "last", last)
@ -277,6 +369,21 @@ class PaginationTests(SimpleTestCase):
with self.assertRaises(EmptyPage):
self.check_indexes(([], 4, 2, False), 1, None)
async def test_page_indexes_async(self):
"""See test_page_indexes"""
tests = self.get_test_cases_for_test_page_indexes()
for params, first, last in tests:
await self.check_indexes_async(params, "first", first)
await self.check_indexes_async(params, "last", last)
# When no items and no empty first page, we should get EmptyPage error.
with self.assertRaises(EmptyPage):
await self.check_indexes_async(([], 4, 0, False), 1, None)
with self.assertRaises(EmptyPage):
await self.check_indexes_async(([], 4, 1, False), 1, None)
with self.assertRaises(EmptyPage):
await self.check_indexes_async(([], 4, 2, False), 1, None)
def test_page_sequence(self):
"""
A paginator page acts like a standard sequence.
@ -289,6 +396,16 @@ class PaginationTests(SimpleTestCase):
self.assertEqual("".join(page2), "fghijk")
self.assertEqual("".join(reversed(page2)), "kjihgf")
async def test_page_sequence_async(self):
eleven = "abcdefghijk"
page2 = await AsyncPaginator(eleven, per_page=5, orphans=1).apage(2)
await page2.aget_object_list()
self.assertEqual(len(page2), 6)
self.assertIn("k", page2)
self.assertNotIn("a", page2)
self.assertEqual("".join(page2), "fghijk")
self.assertEqual("".join(reversed(page2)), "kjihgf")
def test_get_page_hook(self):
"""
A Paginator subclass can use the ``_get_page`` hook to
@ -303,6 +420,20 @@ class PaginationTests(SimpleTestCase):
self.assertEqual(page2.previous_page_number(), 1)
self.assertIsNone(page2.next_page_number())
async def test_get_page_hook_async(self):
"""
An AsyncPaginator subclass can use the ``_get_page`` hook to
return an alternative to the standard AsyncPage class.
"""
eleven = "abcdefghijk"
paginator = AsyncValidAdjacentNumsPaginator(eleven, per_page=6)
page1 = await paginator.apage(1)
page2 = await paginator.apage(2)
self.assertIsNone(await page1.aprevious_page_number())
self.assertEqual(await page1.anext_page_number(), 2)
self.assertEqual(await page2.aprevious_page_number(), 1)
self.assertIsNone(await page2.anext_page_number())
def test_page_range_iterator(self):
"""
Paginator.page_range should be an iterator.
@ -323,6 +454,20 @@ class PaginationTests(SimpleTestCase):
# Non-integer page returns the first page.
self.assertEqual(paginator.get_page(None).number, 1)
async def test_aget_page_async(self):
"""
AsyncPaginator.aget_page() returns a valid page even with invalid page
arguments.
"""
paginator = AsyncPaginator([1, 2, 3], 2)
page = await paginator.aget_page(1)
self.assertEqual(page.number, 1)
self.assertEqual(page.object_list, [1, 2])
# An empty page returns the last page.
self.assertEqual((await paginator.aget_page(3)).number, 2)
# Non-integer page returns the first page.
self.assertEqual((await paginator.aget_page(None)).number, 1)
def test_get_page_empty_object_list(self):
"""Paginator.get_page() with an empty object_list."""
paginator = Paginator([], 2)
@ -332,6 +477,15 @@ class PaginationTests(SimpleTestCase):
# Non-integer page returns the first page.
self.assertEqual(paginator.get_page(None).number, 1)
async def test_aget_page_empty_object_list_async(self):
"""AsyncPaginator.aget_page() with an empty object_list."""
paginator = AsyncPaginator([], 2)
# An empty page returns the last page.
self.assertEqual((await paginator.aget_page(1)).number, 1)
self.assertEqual((await paginator.aget_page(2)).number, 1)
# Non-integer page returns the first page.
self.assertEqual((await paginator.aget_page(None)).number, 1)
def test_get_page_empty_object_list_and_allow_empty_first_page_false(self):
"""
Paginator.get_page() raises EmptyPage if allow_empty_first_page=False
@ -341,6 +495,17 @@ class PaginationTests(SimpleTestCase):
with self.assertRaises(EmptyPage):
paginator.get_page(1)
async def test_aget_page_empty_obj_list_and_allow_empty_first_page_false_async(
self,
):
"""
AsyncPaginator.aget_page() raises EmptyPage if allow_empty_first_page=False
and object_list is empty.
"""
paginator = AsyncPaginator([], 2, allow_empty_first_page=False)
with self.assertRaises(EmptyPage):
await paginator.aget_page(1)
def test_paginator_iteration(self):
paginator = Paginator([1, 2, 3], 2)
page_iterator = iter(paginator)
@ -353,6 +518,66 @@ class PaginationTests(SimpleTestCase):
["<Page 1 of 2>", "<Page 2 of 2>"],
)
async def test_paginator_iteration_async(self):
paginator = AsyncPaginator([1, 2, 3], 2)
page_iterator = aiter(paginator)
for page, expected in enumerate(([1, 2], [3]), start=1):
with self.subTest(page=page):
async_page = await anext(page_iterator)
self.assertEqual(expected, [obj async for obj in async_page])
self.assertEqual(
[str(page) async for page in aiter(paginator)],
["<Async Page 1>", "<Async Page 2>"],
)
def get_test_cases_for_test_get_elided_page_range(self):
ELLIPSIS = Paginator.ELLIPSIS
return [
# on_each_side=2, on_ends=1
(1, 2, 1, [1, 2, 3, ELLIPSIS, 50]),
(4, 2, 1, [1, 2, 3, 4, 5, 6, ELLIPSIS, 50]),
(5, 2, 1, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 50]),
(6, 2, 1, [1, ELLIPSIS, 4, 5, 6, 7, 8, ELLIPSIS, 50]),
(45, 2, 1, [1, ELLIPSIS, 43, 44, 45, 46, 47, ELLIPSIS, 50]),
(46, 2, 1, [1, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]),
(47, 2, 1, [1, ELLIPSIS, 45, 46, 47, 48, 49, 50]),
(50, 2, 1, [1, ELLIPSIS, 48, 49, 50]),
# on_each_side=1, on_ends=3
(1, 1, 3, [1, 2, ELLIPSIS, 48, 49, 50]),
(5, 1, 3, [1, 2, 3, 4, 5, 6, ELLIPSIS, 48, 49, 50]),
(6, 1, 3, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 48, 49, 50]),
(7, 1, 3, [1, 2, 3, ELLIPSIS, 6, 7, 8, ELLIPSIS, 48, 49, 50]),
(44, 1, 3, [1, 2, 3, ELLIPSIS, 43, 44, 45, ELLIPSIS, 48, 49, 50]),
(45, 1, 3, [1, 2, 3, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]),
(46, 1, 3, [1, 2, 3, ELLIPSIS, 45, 46, 47, 48, 49, 50]),
(50, 1, 3, [1, 2, 3, ELLIPSIS, 49, 50]),
# on_each_side=4, on_ends=0
(1, 4, 0, [1, 2, 3, 4, 5, ELLIPSIS]),
(5, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, ELLIPSIS]),
(6, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ELLIPSIS]),
(7, 4, 0, [ELLIPSIS, 3, 4, 5, 6, 7, 8, 9, 10, 11, ELLIPSIS]),
(44, 4, 0, [ELLIPSIS, 40, 41, 42, 43, 44, 45, 46, 47, 48, ELLIPSIS]),
(45, 4, 0, [ELLIPSIS, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]),
(46, 4, 0, [ELLIPSIS, 42, 43, 44, 45, 46, 47, 48, 49, 50]),
(50, 4, 0, [ELLIPSIS, 46, 47, 48, 49, 50]),
# on_each_side=0, on_ends=1
(1, 0, 1, [1, ELLIPSIS, 50]),
(2, 0, 1, [1, 2, ELLIPSIS, 50]),
(3, 0, 1, [1, 2, 3, ELLIPSIS, 50]),
(4, 0, 1, [1, ELLIPSIS, 4, ELLIPSIS, 50]),
(47, 0, 1, [1, ELLIPSIS, 47, ELLIPSIS, 50]),
(48, 0, 1, [1, ELLIPSIS, 48, 49, 50]),
(49, 0, 1, [1, ELLIPSIS, 49, 50]),
(50, 0, 1, [1, ELLIPSIS, 50]),
# on_each_side=0, on_ends=0
(1, 0, 0, [1, ELLIPSIS]),
(2, 0, 0, [1, 2, ELLIPSIS]),
(3, 0, 0, [ELLIPSIS, 3, ELLIPSIS]),
(48, 0, 0, [ELLIPSIS, 48, ELLIPSIS]),
(49, 0, 0, [ELLIPSIS, 49, 50]),
(50, 0, 0, [ELLIPSIS, 50]),
]
def test_get_elided_page_range(self):
# Paginator.validate_number() must be called:
paginator = Paginator([1, 2, 3], 2)
@ -426,51 +651,7 @@ class PaginationTests(SimpleTestCase):
self.assertIn(ELLIPSIS, page_range)
# Range should be elided if enough pages when using custom arguments:
tests = [
# on_each_side=2, on_ends=1
(1, 2, 1, [1, 2, 3, ELLIPSIS, 50]),
(4, 2, 1, [1, 2, 3, 4, 5, 6, ELLIPSIS, 50]),
(5, 2, 1, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 50]),
(6, 2, 1, [1, ELLIPSIS, 4, 5, 6, 7, 8, ELLIPSIS, 50]),
(45, 2, 1, [1, ELLIPSIS, 43, 44, 45, 46, 47, ELLIPSIS, 50]),
(46, 2, 1, [1, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]),
(47, 2, 1, [1, ELLIPSIS, 45, 46, 47, 48, 49, 50]),
(50, 2, 1, [1, ELLIPSIS, 48, 49, 50]),
# on_each_side=1, on_ends=3
(1, 1, 3, [1, 2, ELLIPSIS, 48, 49, 50]),
(5, 1, 3, [1, 2, 3, 4, 5, 6, ELLIPSIS, 48, 49, 50]),
(6, 1, 3, [1, 2, 3, 4, 5, 6, 7, ELLIPSIS, 48, 49, 50]),
(7, 1, 3, [1, 2, 3, ELLIPSIS, 6, 7, 8, ELLIPSIS, 48, 49, 50]),
(44, 1, 3, [1, 2, 3, ELLIPSIS, 43, 44, 45, ELLIPSIS, 48, 49, 50]),
(45, 1, 3, [1, 2, 3, ELLIPSIS, 44, 45, 46, 47, 48, 49, 50]),
(46, 1, 3, [1, 2, 3, ELLIPSIS, 45, 46, 47, 48, 49, 50]),
(50, 1, 3, [1, 2, 3, ELLIPSIS, 49, 50]),
# on_each_side=4, on_ends=0
(1, 4, 0, [1, 2, 3, 4, 5, ELLIPSIS]),
(5, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, ELLIPSIS]),
(6, 4, 0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ELLIPSIS]),
(7, 4, 0, [ELLIPSIS, 3, 4, 5, 6, 7, 8, 9, 10, 11, ELLIPSIS]),
(44, 4, 0, [ELLIPSIS, 40, 41, 42, 43, 44, 45, 46, 47, 48, ELLIPSIS]),
(45, 4, 0, [ELLIPSIS, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]),
(46, 4, 0, [ELLIPSIS, 42, 43, 44, 45, 46, 47, 48, 49, 50]),
(50, 4, 0, [ELLIPSIS, 46, 47, 48, 49, 50]),
# on_each_side=0, on_ends=1
(1, 0, 1, [1, ELLIPSIS, 50]),
(2, 0, 1, [1, 2, ELLIPSIS, 50]),
(3, 0, 1, [1, 2, 3, ELLIPSIS, 50]),
(4, 0, 1, [1, ELLIPSIS, 4, ELLIPSIS, 50]),
(47, 0, 1, [1, ELLIPSIS, 47, ELLIPSIS, 50]),
(48, 0, 1, [1, ELLIPSIS, 48, 49, 50]),
(49, 0, 1, [1, ELLIPSIS, 49, 50]),
(50, 0, 1, [1, ELLIPSIS, 50]),
# on_each_side=0, on_ends=0
(1, 0, 0, [1, ELLIPSIS]),
(2, 0, 0, [1, 2, ELLIPSIS]),
(3, 0, 0, [ELLIPSIS, 3, ELLIPSIS]),
(48, 0, 0, [ELLIPSIS, 48, ELLIPSIS]),
(49, 0, 0, [ELLIPSIS, 49, 50]),
(50, 0, 0, [ELLIPSIS, 50]),
]
tests = self.get_test_cases_for_test_get_elided_page_range()
paginator = Paginator(range(5000), 100)
for number, on_each_side, on_ends, expected in tests:
with self.subTest(
@ -484,6 +665,94 @@ class PaginationTests(SimpleTestCase):
self.assertIsInstance(page_range, collections.abc.Generator)
self.assertEqual(list(page_range), expected)
async def test_aget_elided_page_range_async(self):
# AsyncPaginator.avalidate_number() must be called:
paginator = AsyncPaginator([1, 2, 3], 2)
with unittest.mock.patch.object(paginator, "avalidate_number") as mock:
mock.assert_not_called()
[p async for p in paginator.aget_elided_page_range(2)]
mock.assert_called_with(2)
ELLIPSIS = Paginator.ELLIPSIS
# Range is not elided if not enough pages when using default arguments:
paginator = AsyncPaginator(range(10 * 100), 100)
page_range = paginator.aget_elided_page_range(1)
self.assertIsInstance(page_range, collections.abc.AsyncGenerator)
self.assertNotIn(ELLIPSIS, [p async for p in page_range])
paginator = AsyncPaginator(range(10 * 100 + 1), 100)
self.assertIsInstance(page_range, collections.abc.AsyncGenerator)
page_range = paginator.aget_elided_page_range(1)
self.assertIn(ELLIPSIS, [p async for p in page_range])
# Range should be elided if enough pages when using default arguments:
tests = [
# on_each_side=3, on_ends=2
(1, [1, 2, 3, 4, ELLIPSIS, 49, 50]),
(6, [1, 2, 3, 4, 5, 6, 7, 8, 9, ELLIPSIS, 49, 50]),
(7, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ELLIPSIS, 49, 50]),
(8, [1, 2, ELLIPSIS, 5, 6, 7, 8, 9, 10, 11, ELLIPSIS, 49, 50]),
(43, [1, 2, ELLIPSIS, 40, 41, 42, 43, 44, 45, 46, ELLIPSIS, 49, 50]),
(44, [1, 2, ELLIPSIS, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]),
(45, [1, 2, ELLIPSIS, 42, 43, 44, 45, 46, 47, 48, 49, 50]),
(50, [1, 2, ELLIPSIS, 47, 48, 49, 50]),
]
paginator = AsyncPaginator(range(5000), 100)
for number, expected in tests:
with self.subTest(number=number):
page_range = paginator.aget_elided_page_range(number)
self.assertIsInstance(page_range, collections.abc.AsyncGenerator)
self.assertEqual([p async for p in page_range], expected)
# Range is not elided if not enough pages when using custom arguments:
tests = [
(6, 2, 1, 1),
(8, 1, 3, 1),
(8, 4, 0, 1),
(4, 1, 1, 1),
# When on_each_side and on_ends are both <= 1 but not both == 1 it
# is a special case where the range is not elided until an extra
# page is added.
(2, 0, 1, 2),
(2, 1, 0, 2),
(1, 0, 0, 2),
]
for pages, on_each_side, on_ends, elided_after in tests:
for offset in range(elided_after + 1):
with self.subTest(
pages=pages,
offset=elided_after,
on_each_side=on_each_side,
on_ends=on_ends,
):
paginator = AsyncPaginator(range((pages + offset) * 100), 100)
page_range = paginator.aget_elided_page_range(
1,
on_each_side=on_each_side,
on_ends=on_ends,
)
self.assertIsInstance(page_range, collections.abc.AsyncGenerator)
page_list = [p async for p in page_range]
if offset < elided_after:
self.assertNotIn(ELLIPSIS, page_list)
else:
self.assertIn(ELLIPSIS, page_list)
# Range should be elided if enough pages when using custom arguments:
tests = self.get_test_cases_for_test_get_elided_page_range()
paginator = AsyncPaginator(range(5000), 100)
for number, on_each_side, on_ends, expected in tests:
with self.subTest(
number=number, on_each_side=on_each_side, on_ends=on_ends
):
page_range = paginator.aget_elided_page_range(
number,
on_each_side=on_each_side,
on_ends=on_ends,
)
self.assertIsInstance(page_range, collections.abc.AsyncGenerator)
self.assertEqual([p async for p in page_range], expected)
class ModelPaginationTests(TestCase):
"""
@ -513,6 +782,21 @@ class ModelPaginationTests(TestCase):
self.assertEqual(1, p.start_index())
self.assertEqual(5, p.end_index())
async def test_first_page_async(self):
paginator = AsyncPaginator(Article.objects.order_by("id"), 5)
p = await paginator.apage(1)
self.assertEqual("<Async Page 1>", str(p))
object_list = await p.aget_object_list()
self.assertSequenceEqual(object_list, self.articles[:5])
self.assertTrue(await p.ahas_next())
self.assertFalse(await p.ahas_previous())
self.assertTrue(await p.ahas_other_pages())
self.assertEqual(2, await p.anext_page_number())
with self.assertRaises(InvalidPage):
await p.aprevious_page_number()
self.assertEqual(1, await p.astart_index())
self.assertEqual(5, await p.aend_index())
def test_last_page(self):
paginator = Paginator(Article.objects.order_by("id"), 5)
p = paginator.page(2)
@ -527,6 +811,21 @@ class ModelPaginationTests(TestCase):
self.assertEqual(6, p.start_index())
self.assertEqual(9, p.end_index())
async def test_last_page_async(self):
paginator = AsyncPaginator(Article.objects.order_by("id"), 5)
p = await paginator.apage(2)
self.assertEqual("<Async Page 2>", str(p))
object_list = await p.aget_object_list()
self.assertSequenceEqual(object_list, self.articles[5:])
self.assertFalse(await p.ahas_next())
self.assertTrue(await p.ahas_previous())
self.assertTrue(await p.ahas_other_pages())
with self.assertRaises(InvalidPage):
await p.anext_page_number()
self.assertEqual(1, await p.aprevious_page_number())
self.assertEqual(6, await p.astart_index())
self.assertEqual(9, await p.aend_index())
def test_page_getitem(self):
"""
Tests proper behavior of a paginator page __getitem__ (queryset
@ -551,6 +850,24 @@ class ModelPaginationTests(TestCase):
# After __getitem__ is called, object_list is a list
self.assertIsInstance(p.object_list, list)
async def test_page_getitem_async(self):
paginator = AsyncPaginator(Article.objects.order_by("id"), 5)
p = await paginator.apage(1)
msg = "AsyncPage indices must be integers or slices, not str."
with self.assertRaisesMessage(TypeError, msg):
p["has_previous"]
self.assertIsNone(p.object_list._result_cache)
self.assertNotIsInstance(p.object_list, list)
await p.aget_object_list()
self.assertEqual(p[0], self.articles[0])
self.assertSequenceEqual(p[slice(2)], self.articles[:2])
self.assertIsInstance(p.object_list, list)
def test_paginating_unordered_queryset_raises_warning(self):
msg = (
"Pagination may yield inconsistent results with an unordered "
@ -562,11 +879,27 @@ class ModelPaginationTests(TestCase):
# is appropriate).
self.assertEqual(cm.filename, __file__)
async def test_paginating_unordered_queryset_raises_warning_async(self):
msg = (
"Pagination may yield inconsistent results with an unordered "
"object_list: <class 'pagination.models.Article'> QuerySet."
)
with self.assertWarnsMessage(UnorderedObjectListWarning, msg) as cm:
AsyncPaginator(Article.objects.all(), 5)
# The warning points at the BasePaginator caller.
# The reason is that the UnorderedObjectListWarning occurs in BasePaginator.
self.assertEqual(cm.filename, inspect.getfile(BasePaginator))
def test_paginating_empty_queryset_does_not_warn(self):
with warnings.catch_warnings(record=True) as recorded:
Paginator(Article.objects.none(), 5)
self.assertEqual(len(recorded), 0)
async def test_paginating_empty_queryset_does_not_warn_async(self):
with warnings.catch_warnings(record=True) as recorded:
AsyncPaginator(Article.objects.none(), 5)
self.assertEqual(len(recorded), 0)
def test_paginating_unordered_object_list_raises_warning(self):
"""
Unordered object list warning with an object that has an ordered
@ -583,3 +916,58 @@ class ModelPaginationTests(TestCase):
)
with self.assertWarnsMessage(UnorderedObjectListWarning, msg):
Paginator(object_list, 5)
async def test_paginating_unordered_object_list_raises_warning_async(self):
"""
See test_paginating_unordered_object_list_raises_warning.
"""
class ObjectList:
ordered = False
object_list = ObjectList()
msg = (
"Pagination may yield inconsistent results with an unordered "
"object_list: {!r}.".format(object_list)
)
with self.assertWarnsMessage(UnorderedObjectListWarning, msg):
AsyncPaginator(object_list, 5)
async def test_async_page_object_list_raises_type_error_before_await(self):
paginator = AsyncPaginator(Article.objects.order_by("id"), 5)
p = await paginator.apage(1)
with self.subTest(func="len"):
msg = "AsyncPage.aget_object_list() must be awaited before calling len()."
with self.assertRaisesMessage(TypeError, msg):
len(p)
with self.subTest(func="reversed"):
msg = (
"AsyncPage.aget_object_list() must be awaited before calling "
"reversed()."
)
with self.assertRaisesMessage(TypeError, msg):
reversed(p)
with self.subTest(func="index"):
msg = "AsyncPage.aget_object_list() must be awaited before using indexing."
with self.assertRaisesMessage(TypeError, msg):
p[0]
async def test_async_page_aiteration(self):
paginator = AsyncPaginator(Article.objects.order_by("id"), 5)
p = await paginator.apage(1)
object_list = [obj async for obj in p]
self.assertEqual(len(object_list), 5)
async def test_aget_object_list(self):
paginator = AsyncPaginator(Article.objects.order_by("id"), 5)
p = await paginator.apage(1)
# object_list queryset is converted to list.
first_called_objs = await p.aget_object_list()
self.assertIsInstance(first_called_objs, list)
# It returns the same list that was converted on the first call.
second_called_objs = await p.aget_object_list()
self.assertEqual(id(first_called_objs), id(second_called_objs))