mirror of
https://github.com/django/django.git
synced 2025-01-10 02:16:08 +00:00
20848bcf39
Signed-off-by: cuinix <915115094@qq.com>
417 lines
15 KiB
Python
417 lines
15 KiB
Python
from datetime import date
|
|
from decimal import Decimal
|
|
|
|
from django.core.exceptions import FieldDoesNotExist
|
|
from django.db.models.query import RawQuerySet
|
|
from django.test import TestCase, skipUnlessDBFeature
|
|
|
|
from .models import (
|
|
Author,
|
|
Book,
|
|
BookFkAsPk,
|
|
Coffee,
|
|
FriendlyAuthor,
|
|
MixedCaseIDColumn,
|
|
Reviewer,
|
|
)
|
|
|
|
|
|
class RawQueryTests(TestCase):
|
|
@classmethod
def setUpTestData(cls):
    """Create the authors, books, coffees and reviewers shared by the tests."""
    new_author = Author.objects.create
    new_book = Book.objects.create

    # Authors come first because the books below reference them.
    cls.a1 = new_author(first_name="Joe", last_name="Smith", dob=date(1950, 9, 20))
    cls.a2 = new_author(first_name="Jill", last_name="Doe", dob=date(1920, 4, 2))
    cls.a3 = new_author(first_name="Bob", last_name="Smith", dob=date(1986, 1, 25))
    cls.a4 = new_author(first_name="Bill", last_name="Jones", dob=date(1932, 5, 10))

    cls.b1 = new_book(
        title="The awesome book",
        author=cls.a1,
        paperback=False,
        opening_line=(
            "It was a bright cold day in April and the clocks were striking "
            "thirteen."
        ),
    )
    cls.b2 = new_book(
        title="The horrible book",
        author=cls.a1,
        paperback=True,
        opening_line=(
            "On an evening in the latter part of May a middle-aged man "
            "was walking homeward from Shaston to the village of Marlott, "
            "in the adjoining Vale of Blakemore, or Blackmoor."
        ),
    )
    cls.b3 = new_book(
        title="Another awesome book",
        author=cls.a1,
        paperback=False,
        opening_line="A squat gray building of only thirty-four stories.",
    )
    cls.b4 = new_book(
        title="Some other book",
        author=cls.a3,
        paperback=True,
        opening_line="It was the day my grandmother exploded.",
    )

    cls.c1 = Coffee.objects.create(brand="dunkin doughnuts")
    cls.c2 = Coffee.objects.create(brand="starbucks")

    cls.r1 = Reviewer.objects.create()
    cls.r2 = Reviewer.objects.create()
    # Only the first reviewer is linked to any books.
    cls.r1.reviewed.add(cls.b2, cls.b3, cls.b4)
|
|
|
|
def assertSuccessfulRawQuery(
    self,
    model,
    query,
    expected_results,
    expected_annotations=(),
    params=(),
    translations=None,
):
    """
    Execute the passed query against the passed model and check the output.

    The raw query is run through ``model.objects.raw()`` with the given
    ``params`` and ``translations``. The materialized rows are then checked
    with ``assertProcessed()`` against ``expected_results`` and with
    ``assertAnnotations()`` against ``expected_annotations``.
    """
    # ``params`` defaults to an immutable tuple rather than a list so that no
    # mutable object is shared between calls.
    results = list(
        model.objects.raw(query, params=params, translations=translations)
    )
    self.assertProcessed(model, results, expected_results, expected_annotations)
    self.assertAnnotations(results, expected_annotations)
|
|
|
|
def assertProcessed(self, model, results, orig, expected_annotations=()):
    """
    Check raw query results against the expected objects, row by row.

    Both sequences must have the same length. For every concrete field of
    ``model``, the value and the type of the value must match between the
    raw row and the expected object at the same position.
    """
    self.assertEqual(len(results), len(orig))
    for position, raw_row in enumerate(results):
        expected_row = orig[position]
        # Each expected annotation is applied to the expected object.
        for pair in expected_annotations:
            setattr(expected_row, *pair)

        for field in model._meta.fields:
            attname = field.attname
            actual = getattr(raw_row, attname)
            wanted = getattr(expected_row, attname)
            # All values on the model are equal
            self.assertEqual(actual, wanted)
            # This includes checking that they are the same type
            self.assertEqual(type(actual), type(wanted))
|
|
|
|
def assertNoAnnotations(self, results):
    """
    Delegate to assertAnnotations() with an empty set of expected
    annotations.
    """
    nothing_expected = ()
    self.assertAnnotations(results, nothing_expected)
|
|
|
|
def assertAnnotations(self, results, expected_annotations):
    """
    Check that each result carries the annotation expected at its position.

    ``expected_annotations`` is a sequence of ``(name, value)`` pairs indexed
    in parallel with ``results``. Nothing is checked when it is empty.
    """
    if not expected_annotations:
        return
    for position, row in enumerate(results):
        name, wanted = expected_annotations[position]
        self.assertTrue(hasattr(row, name))
        self.assertEqual(getattr(row, name), wanted)
|
|
|
|
def test_rawqueryset_repr(self):
    """repr() of a RawQuerySet and of its query both embed the raw SQL."""
    raw_qs = RawQuerySet(raw_query="SELECT * FROM raw_query_author")
    expected_by_object = (
        (raw_qs, "<RawQuerySet: SELECT * FROM raw_query_author>"),
        (raw_qs.query, "<RawQuery: SELECT * FROM raw_query_author>"),
    )
    for obj, expected in expected_by_object:
        self.assertEqual(repr(obj), expected)
|
|
|
|
def test_simple_raw_query(self):
    """A plain ``SELECT *`` raw query is checked against Author.objects.all()."""
    self.assertSuccessfulRawQuery(
        Author, "SELECT * FROM raw_query_author", Author.objects.all()
    )
|
|
|
|
def test_raw_query_lazy(self):
    """
    A raw queryset has no cursor until it is iterated, i.e. the query is
    only executed lazily.
    """
    raw_authors = Author.objects.raw("SELECT * FROM raw_query_author")
    # Nothing has been executed yet.
    self.assertIsNone(raw_authors.query.cursor)
    # Materializing the results forces execution.
    list(raw_authors)
    self.assertIsNotNone(raw_authors.query.cursor)
|
|
|
|
def test_FK_raw_query(self):
    """A simple raw query works for a model that has a foreign key."""
    self.assertSuccessfulRawQuery(
        Book, "SELECT * FROM raw_query_book", Book.objects.all()
    )
|
|
|
|
def test_db_column_handler(self):
    """
    A simple raw query works for a model that has a field with db_column
    defined.
    """
    self.assertSuccessfulRawQuery(
        Coffee, "SELECT * FROM raw_query_coffee", Coffee.objects.all()
    )
|
|
|
|
def test_pk_with_mixed_case_db_column(self):
    """A raw query works when the model's pk db_column is mixed case."""
    self.assertSuccessfulRawQuery(
        MixedCaseIDColumn,
        "SELECT * FROM raw_query_mixedcaseidcolumn",
        MixedCaseIDColumn.objects.all(),
    )
|
|
|
|
def test_order_handler(self):
    """The selected columns may be listed in any order in the raw query."""
    column_orders = [
        "dob, last_name, first_name, id",
        "last_name, dob, first_name, id",
        "first_name, last_name, dob, id",
    ]
    for columns in column_orders:
        sql = "SELECT %s FROM raw_query_author" % columns
        self.assertSuccessfulRawQuery(Author, sql, Author.objects.all())
|
|
|
|
def test_translations(self):
    """
    Aliased result columns are mapped back onto model fields through the
    optional translations argument.
    """
    sql = (
        "SELECT first_name AS first, last_name AS last, dob, id "
        "FROM raw_query_author"
    )
    column_map = {"first": "first_name", "last": "last_name"}
    self.assertSuccessfulRawQuery(
        Author, sql, Author.objects.all(), translations=column_map
    )
|
|
|
|
def test_params(self):
    """A positional ``%s`` parameter narrows the raw query to one author."""
    expected = Author.objects.all()[2]
    raw_qs = Author.objects.raw(
        "SELECT * FROM raw_query_author WHERE first_name = %s",
        params=[expected.first_name],
    )
    rows = list(raw_qs)
    self.assertProcessed(Author, rows, [expected])
    self.assertNoAnnotations(rows)
    self.assertEqual(len(rows), 1)
    self.assertIsInstance(repr(raw_qs), str)
|
|
|
|
def test_params_none(self):
    """With params=None, a single ``%`` in the SQL is accepted as-is."""
    raw_qs = Author.objects.raw(
        "SELECT * FROM raw_query_author WHERE first_name like 'J%'",
        params=None,
    )
    self.assertEqual(len(raw_qs), 2)
|
|
|
|
def test_escaped_percent(self):
    """A doubled ``%%`` in the SQL works when params is left at its default."""
    raw_qs = Author.objects.raw(
        "SELECT * FROM raw_query_author WHERE first_name like 'J%%'"
    )
    self.assertEqual(len(raw_qs), 2)
|
|
|
|
@skipUnlessDBFeature("supports_paramstyle_pyformat")
def test_pyformat_params(self):
    """A named ``%(first)s`` parameter narrows the raw query to one author."""
    expected = Author.objects.all()[2]
    raw_qs = Author.objects.raw(
        "SELECT * FROM raw_query_author WHERE first_name = %(first)s",
        params={"first": expected.first_name},
    )
    rows = list(raw_qs)
    self.assertProcessed(Author, rows, [expected])
    self.assertNoAnnotations(rows)
    self.assertEqual(len(rows), 1)
    self.assertIsInstance(repr(raw_qs), str)
|
|
|
|
def test_query_representation(self):
    """
    repr() of a raw queryset and of its query shows the SQL with the
    parameters substituted in, for both named and positional placeholders.
    """
    cases = (
        # Named placeholder with a mapping of parameters.
        (
            "SELECT * FROM raw_query_author WHERE last_name = %(last)s",
            {"last": "foo"},
        ),
        # Positional placeholder with a one-element set of parameters.
        ("SELECT * FROM raw_query_author WHERE last_name = %s", {"foo"}),
    )
    for sql, params in cases:
        raw_qs = Author.objects.raw(sql, params)
        self.assertEqual(
            repr(raw_qs),
            "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>",
        )
        self.assertEqual(
            repr(raw_qs.query),
            "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>",
        )
|
|
|
|
def test_many_to_many(self):
    """A simple raw query works for a model that has a m2m field."""
    self.assertSuccessfulRawQuery(
        Reviewer, "SELECT * FROM raw_query_reviewer", Reviewer.objects.all()
    )
|
|
|
|
def test_extra_conversions(self):
    """A translation entry the query does not need is ignored."""
    unused_translations = {"something": "else"}
    self.assertSuccessfulRawQuery(
        Author,
        "SELECT * FROM raw_query_author",
        Author.objects.all(),
        translations=unused_translations,
    )
|
|
|
|
def test_missing_fields(self):
    """Fields left out of the raw SELECT are still readable on the result."""
    partial_rows = Author.objects.raw(
        "SELECT id, first_name, dob FROM raw_query_author"
    )
    for row in partial_rows:
        self.assertIsNotNone(row.first_name)
        # last_name isn't given, but it will be retrieved on demand
        self.assertIsNotNone(row.last_name)
|
|
|
|
def test_missing_fields_without_PK(self):
    """Iterating a raw query that omits the primary key raises an error."""
    raw_qs = Author.objects.raw("SELECT first_name, dob FROM raw_query_author")
    with self.assertRaisesMessage(
        FieldDoesNotExist, "Raw query must include the primary key"
    ):
        list(raw_qs)
|
|
|
|
def test_annotations(self):
    """An extra aggregate column in the raw SELECT is exposed per result."""
    sql = (
        "SELECT a.*, count(b.id) as book_count "
        "FROM raw_query_author a "
        "LEFT JOIN raw_query_book b ON a.id = b.author_id "
        "GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id"
    )
    # One (name, value) pair per author, in primary key order.
    book_counts = tuple(("book_count", count) for count in (3, 0, 1, 0))
    self.assertSuccessfulRawQuery(
        Author, sql, Author.objects.order_by("pk"), book_counts
    )
|
|
|
|
def test_white_space_query(self):
    """Leading whitespace in the raw SQL does not break the query."""
    self.assertSuccessfulRawQuery(
        Author, " SELECT * FROM raw_query_author", Author.objects.all()
    )
|
|
|
|
def test_multiple_iterations(self):
    """Iterating the same raw queryset twice yields the same authors."""
    orm_authors = Author.objects.all()
    raw_authors = Author.objects.raw("SELECT * FROM raw_query_author")

    def compare_pass():
        # Walk the raw queryset once, comparing each row with the ORM
        # result at the same position, and report how many rows were seen.
        seen = 0
        for position, raw_author in enumerate(raw_authors):
            self.assertEqual(orm_authors[position], raw_author)
            seen += 1
        return seen

    first_pass = compare_pass()
    second_pass = compare_pass()
    self.assertEqual(first_pass, second_pass)
|
|
|
|
def test_get_item(self):
    """Raw querysets support integer indexing and slicing, but not str keys."""
    sql = "SELECT * FROM raw_query_author ORDER BY id ASC"

    # Integer index.
    bob = Author.objects.raw(sql)[2]
    self.assertEqual(bob.first_name, "Bob")

    # Slice.
    leading_pair = Author.objects.raw(sql)[0:2]
    self.assertEqual(len(leading_pair), 2)

    # Any other key type is rejected.
    with self.assertRaises(TypeError):
        Author.objects.raw(sql)["test"]
|
|
|
|
def test_inheritance(self):
    """A raw query on the FriendlyAuthor table returns the created row."""
    friendly = FriendlyAuthor.objects.create(
        first_name="Wesley", last_name="Chun", dob=date(1962, 10, 28)
    )
    raw_rows = FriendlyAuthor.objects.raw("SELECT * FROM raw_query_friendlyauthor")
    found_pks = [row.pk for row in raw_rows]
    self.assertEqual(found_pks, [friendly.pk])
|
|
|
|
def test_query_count(self):
    """Materializing a raw queryset runs exactly one query."""
    raw_authors = Author.objects.raw("SELECT * FROM raw_query_author")
    with self.assertNumQueries(1):
        list(raw_authors)
|
|
|
|
def test_subquery_in_raw_sql(self):
    """A raw query selecting from a subquery can be executed without error."""
    sql = (
        "SELECT id FROM "
        "(SELECT * FROM raw_query_book WHERE paperback IS NOT NULL) sq"
    )
    list(Book.objects.raw(sql))
|
|
|
|
def test_db_column_name_is_used_in_raw_query(self):
    """
    Regression test: the field's `column`, not its `attname`, must be used
    to build the list of fields included in the query. This matters when
    the primary key is a ForeignKey, because `attname` and `column` are not
    necessarily the same in that case.
    """
    created = BookFkAsPk.objects.create(book=self.b1)
    raw_rows = BookFkAsPk.objects.raw(
        "SELECT not_the_default FROM raw_query_bookfkaspk"
    )
    self.assertEqual(list(raw_rows), [created])
|
|
|
|
def test_decimal_parameter(self):
    """A Decimal value can be passed as a raw query parameter."""
    pricey = Coffee.objects.create(brand="starbucks", price=20.5)
    threshold = Decimal(20)
    matches = Coffee.objects.raw(
        "SELECT * FROM raw_query_coffee WHERE price >= %s", params=[threshold]
    )
    self.assertEqual(list(matches), [pricey])
|
|
|
|
def test_result_caching(self):
    """Materializing the same raw queryset twice runs only one query."""
    with self.assertNumQueries(1):
        raw_books = Book.objects.raw("SELECT * FROM raw_query_book")
        for _ in range(2):
            list(raw_books)
|
|
|
|
def test_iterator(self):
    """Each full pass over iterator() runs its own query."""
    with self.assertNumQueries(2):
        raw_books = Book.objects.raw("SELECT * FROM raw_query_book")
        for _ in range(2):
            list(raw_books.iterator())
|
|
|
|
def test_bool(self):
    """A raw queryset is truthy when it has rows and falsy when it has none."""
    with_rows = Book.objects.raw("SELECT * FROM raw_query_book")
    self.assertIs(bool(with_rows), True)
    without_rows = Book.objects.raw("SELECT * FROM raw_query_book WHERE id = 0")
    self.assertIs(bool(without_rows), False)
|
|
|
|
def test_len(self):
    """len() of a raw queryset is the number of rows the query returns."""
    all_books = Book.objects.raw("SELECT * FROM raw_query_book")
    self.assertEqual(len(all_books), 4)
    no_books = Book.objects.raw("SELECT * FROM raw_query_book WHERE id = 0")
    self.assertEqual(len(no_books), 0)
|