mirror of
https://github.com/django/django.git
synced 2025-01-10 18:36:05 +00:00
962 lines
39 KiB
Python
962 lines
39 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import datetime
|
|
import uuid
|
|
from copy import deepcopy
|
|
|
|
from django.core.exceptions import FieldError
|
|
from django.db import DatabaseError, connection, models, transaction
|
|
from django.db.models import TimeField, UUIDField
|
|
from django.db.models.aggregates import (
|
|
Avg, Count, Max, Min, StdDev, Sum, Variance,
|
|
)
|
|
from django.db.models.expressions import (
|
|
Case, Col, ExpressionWrapper, F, Func, OrderBy, Random, RawSQL, Ref, Value,
|
|
When,
|
|
)
|
|
from django.db.models.functions import (
|
|
Coalesce, Concat, Length, Lower, Substr, Upper,
|
|
)
|
|
from django.test import TestCase, skipIfDBFeature, skipUnlessDBFeature
|
|
from django.test.utils import Approximate
|
|
from django.utils import six
|
|
|
|
from .models import UUID, Company, Employee, Experiment, Number, Time
|
|
|
|
|
|
class BasicExpressionsTests(TestCase):
|
|
@classmethod
|
|
def setUpTestData(cls):
|
|
Company.objects.create(
|
|
name="Example Inc.", num_employees=2300, num_chairs=5,
|
|
ceo=Employee.objects.create(firstname="Joe", lastname="Smith", salary=10)
|
|
)
|
|
Company.objects.create(
|
|
name="Foobar Ltd.", num_employees=3, num_chairs=4,
|
|
ceo=Employee.objects.create(firstname="Frank", lastname="Meyer", salary=20)
|
|
)
|
|
Company.objects.create(
|
|
name="Test GmbH", num_employees=32, num_chairs=1,
|
|
ceo=Employee.objects.create(firstname="Max", lastname="Mustermann", salary=30)
|
|
)
|
|
|
|
def setUp(self):
|
|
self.company_query = Company.objects.values(
|
|
"name", "num_employees", "num_chairs"
|
|
).order_by(
|
|
"name", "num_employees", "num_chairs"
|
|
)
|
|
|
|
def test_annotate_values_aggregate(self):
|
|
companies = Company.objects.annotate(
|
|
salaries=F('ceo__salary'),
|
|
).values('num_employees', 'salaries').aggregate(
|
|
result=Sum(
|
|
F('salaries') + F('num_employees'),
|
|
output_field=models.IntegerField()
|
|
),
|
|
)
|
|
self.assertEqual(companies['result'], 2395)
|
|
|
|
def test_annotate_values_filter(self):
|
|
companies = Company.objects.annotate(
|
|
foo=RawSQL('%s', ['value']),
|
|
).filter(foo='value').order_by('name')
|
|
self.assertQuerysetEqual(
|
|
companies, [
|
|
'<Company: Example Inc.>',
|
|
'<Company: Foobar Ltd.>',
|
|
'<Company: Test GmbH>',
|
|
],
|
|
)
|
|
|
|
def test_filter_inter_attribute(self):
|
|
# We can filter on attribute relationships on same model obj, e.g.
|
|
# find companies where the number of employees is greater
|
|
# than the number of chairs.
|
|
self.assertQuerysetEqual(
|
|
self.company_query.filter(num_employees__gt=F("num_chairs")), [
|
|
{
|
|
"num_chairs": 5,
|
|
"name": "Example Inc.",
|
|
"num_employees": 2300,
|
|
},
|
|
{
|
|
"num_chairs": 1,
|
|
"name": "Test GmbH",
|
|
"num_employees": 32
|
|
},
|
|
],
|
|
lambda o: o
|
|
)
|
|
|
|
def test_update(self):
|
|
# We can set one field to have the value of another field
|
|
# Make sure we have enough chairs
|
|
self.company_query.update(num_chairs=F("num_employees"))
|
|
self.assertQuerysetEqual(
|
|
self.company_query, [
|
|
{
|
|
"num_chairs": 2300,
|
|
"name": "Example Inc.",
|
|
"num_employees": 2300
|
|
},
|
|
{
|
|
"num_chairs": 3,
|
|
"name": "Foobar Ltd.",
|
|
"num_employees": 3
|
|
},
|
|
{
|
|
"num_chairs": 32,
|
|
"name": "Test GmbH",
|
|
"num_employees": 32
|
|
}
|
|
],
|
|
lambda o: o
|
|
)
|
|
|
|
def test_arithmetic(self):
|
|
# We can perform arithmetic operations in expressions
|
|
# Make sure we have 2 spare chairs
|
|
self.company_query.update(num_chairs=F("num_employees") + 2)
|
|
self.assertQuerysetEqual(
|
|
self.company_query, [
|
|
{
|
|
'num_chairs': 2302,
|
|
'name': 'Example Inc.',
|
|
'num_employees': 2300
|
|
},
|
|
{
|
|
'num_chairs': 5,
|
|
'name': 'Foobar Ltd.',
|
|
'num_employees': 3
|
|
},
|
|
{
|
|
'num_chairs': 34,
|
|
'name': 'Test GmbH',
|
|
'num_employees': 32
|
|
}
|
|
],
|
|
lambda o: o,
|
|
)
|
|
|
|
def test_order_of_operations(self):
|
|
# Law of order of operations is followed
|
|
self. company_query.update(
|
|
num_chairs=F('num_employees') + 2 * F('num_employees')
|
|
)
|
|
self.assertQuerysetEqual(
|
|
self.company_query, [
|
|
{
|
|
'num_chairs': 6900,
|
|
'name': 'Example Inc.',
|
|
'num_employees': 2300
|
|
},
|
|
{
|
|
'num_chairs': 9,
|
|
'name': 'Foobar Ltd.',
|
|
'num_employees': 3
|
|
},
|
|
{
|
|
'num_chairs': 96,
|
|
'name': 'Test GmbH',
|
|
'num_employees': 32
|
|
}
|
|
],
|
|
lambda o: o,
|
|
)
|
|
|
|
def test_parenthesis_priority(self):
|
|
# Law of order of operations can be overridden by parentheses
|
|
self.company_query.update(
|
|
num_chairs=((F('num_employees') + 2) * F('num_employees'))
|
|
)
|
|
self.assertQuerysetEqual(
|
|
self.company_query, [
|
|
{
|
|
'num_chairs': 5294600,
|
|
'name': 'Example Inc.',
|
|
'num_employees': 2300
|
|
},
|
|
{
|
|
'num_chairs': 15,
|
|
'name': 'Foobar Ltd.',
|
|
'num_employees': 3
|
|
},
|
|
{
|
|
'num_chairs': 1088,
|
|
'name': 'Test GmbH',
|
|
'num_employees': 32
|
|
}
|
|
],
|
|
lambda o: o,
|
|
)
|
|
|
|
def test_update_with_fk(self):
|
|
# ForeignKey can become updated with the value of another ForeignKey.
|
|
self.assertEqual(
|
|
Company.objects.update(point_of_contact=F('ceo')),
|
|
3
|
|
)
|
|
self.assertQuerysetEqual(
|
|
Company.objects.all(), [
|
|
"Joe Smith",
|
|
"Frank Meyer",
|
|
"Max Mustermann",
|
|
],
|
|
lambda c: six.text_type(c.point_of_contact),
|
|
ordered=False
|
|
)
|
|
|
|
def test_update_with_none(self):
|
|
Number.objects.create(integer=1, float=1.0)
|
|
Number.objects.create(integer=2)
|
|
Number.objects.filter(float__isnull=False).update(float=Value(None))
|
|
self.assertQuerysetEqual(
|
|
Number.objects.all(), [
|
|
None,
|
|
None,
|
|
],
|
|
lambda n: n.float,
|
|
ordered=False
|
|
)
|
|
|
|
def test_filter_with_join(self):
|
|
# F Expressions can also span joins
|
|
Company.objects.update(point_of_contact=F('ceo'))
|
|
c = Company.objects.all()[0]
|
|
c.point_of_contact = Employee.objects.create(firstname="Guido", lastname="van Rossum")
|
|
c.save()
|
|
|
|
self.assertQuerysetEqual(
|
|
Company.objects.filter(ceo__firstname=F("point_of_contact__firstname")), [
|
|
"Foobar Ltd.",
|
|
"Test GmbH",
|
|
],
|
|
lambda c: c.name,
|
|
ordered=False
|
|
)
|
|
|
|
Company.objects.exclude(
|
|
ceo__firstname=F("point_of_contact__firstname")
|
|
).update(name="foo")
|
|
self.assertEqual(
|
|
Company.objects.exclude(
|
|
ceo__firstname=F('point_of_contact__firstname')
|
|
).get().name,
|
|
"foo",
|
|
)
|
|
|
|
with transaction.atomic():
|
|
with self.assertRaises(FieldError):
|
|
Company.objects.exclude(
|
|
ceo__firstname=F('point_of_contact__firstname')
|
|
).update(name=F('point_of_contact__lastname'))
|
|
|
|
def test_object_update(self):
|
|
# F expressions can be used to update attributes on single objects
|
|
test_gmbh = Company.objects.get(name="Test GmbH")
|
|
self.assertEqual(test_gmbh.num_employees, 32)
|
|
test_gmbh.num_employees = F("num_employees") + 4
|
|
test_gmbh.save()
|
|
test_gmbh = Company.objects.get(pk=test_gmbh.pk)
|
|
self.assertEqual(test_gmbh.num_employees, 36)
|
|
|
|
def test_new_object_save(self):
|
|
# We should be able to use Funcs when inserting new data
|
|
test_co = Company(
|
|
name=Lower(Value("UPPER")), num_employees=32, num_chairs=1,
|
|
ceo=Employee.objects.create(firstname="Just", lastname="Doit", salary=30),
|
|
)
|
|
test_co.save()
|
|
test_co.refresh_from_db()
|
|
self.assertEqual(test_co.name, "upper")
|
|
|
|
def test_new_object_create(self):
|
|
test_co = Company.objects.create(
|
|
name=Lower(Value("UPPER")), num_employees=32, num_chairs=1,
|
|
ceo=Employee.objects.create(firstname="Just", lastname="Doit", salary=30),
|
|
)
|
|
test_co.refresh_from_db()
|
|
self.assertEqual(test_co.name, "upper")
|
|
|
|
def test_object_create_with_aggregate(self):
|
|
# Aggregates are not allowed when inserting new data
|
|
with self.assertRaisesMessage(FieldError, 'Aggregate functions are not allowed in this query'):
|
|
Company.objects.create(
|
|
name='Company', num_employees=Max(Value(1)), num_chairs=1,
|
|
ceo=Employee.objects.create(firstname="Just", lastname="Doit", salary=30),
|
|
)
|
|
|
|
def test_object_update_fk(self):
|
|
# F expressions cannot be used to update attributes which are foreign
|
|
# keys, or attributes which involve joins.
|
|
test_gmbh = Company.objects.get(name="Test GmbH")
|
|
|
|
def test():
|
|
test_gmbh.point_of_contact = F("ceo")
|
|
with self.assertRaises(ValueError):
|
|
test()
|
|
|
|
test_gmbh.point_of_contact = test_gmbh.ceo
|
|
test_gmbh.save()
|
|
test_gmbh.name = F("ceo__last_name")
|
|
with self.assertRaises(FieldError):
|
|
test_gmbh.save()
|
|
|
|
def test_object_update_unsaved_objects(self):
|
|
# F expressions cannot be used to update attributes on objects which do
|
|
# not yet exist in the database
|
|
test_gmbh = Company.objects.get(name="Test GmbH")
|
|
acme = Company(
|
|
name="The Acme Widget Co.", num_employees=12, num_chairs=5,
|
|
ceo=test_gmbh.ceo
|
|
)
|
|
acme.num_employees = F("num_employees") + 16
|
|
msg = (
|
|
'Failed to insert expression "Col(expressions_company, '
|
|
'expressions.Company.num_employees) + Value(16)" on '
|
|
'expressions.Company.num_employees. F() expressions can only be '
|
|
'used to update, not to insert.'
|
|
)
|
|
with self.assertRaisesMessage(ValueError, msg):
|
|
acme.save()
|
|
|
|
acme.num_employees = 12
|
|
acme.name = Lower(F('name'))
|
|
msg = (
|
|
'Failed to insert expression "Lower(Col(expressions_company, '
|
|
'expressions.Company.name))" on expressions.Company.name. F() '
|
|
'expressions can only be used to update, not to insert.'
|
|
)
|
|
with self.assertRaisesMessage(ValueError, msg):
|
|
acme.save()
|
|
|
|
def test_ticket_11722_iexact_lookup(self):
|
|
Employee.objects.create(firstname="John", lastname="Doe")
|
|
Employee.objects.create(firstname="Test", lastname="test")
|
|
|
|
queryset = Employee.objects.filter(firstname__iexact=F('lastname'))
|
|
self.assertQuerysetEqual(queryset, ["<Employee: Test test>"])
|
|
|
|
@skipIfDBFeature('has_case_insensitive_like')
|
|
def test_ticket_16731_startswith_lookup(self):
|
|
Employee.objects.create(firstname="John", lastname="Doe")
|
|
e2 = Employee.objects.create(firstname="Jack", lastname="Jackson")
|
|
e3 = Employee.objects.create(firstname="Jack", lastname="jackson")
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(lastname__startswith=F('firstname')),
|
|
[e2], lambda x: x)
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(lastname__istartswith=F('firstname')).order_by('pk'),
|
|
[e2, e3], lambda x: x)
|
|
|
|
def test_ticket_18375_join_reuse(self):
|
|
# Test that reverse multijoin F() references and the lookup target
|
|
# the same join. Pre #18375 the F() join was generated first, and the
|
|
# lookup couldn't reuse that join.
|
|
qs = Employee.objects.filter(
|
|
company_ceo_set__num_chairs=F('company_ceo_set__num_employees'))
|
|
self.assertEqual(str(qs.query).count('JOIN'), 1)
|
|
|
|
def test_ticket_18375_kwarg_ordering(self):
|
|
# The next query was dict-randomization dependent - if the "gte=1"
|
|
# was seen first, then the F() will reuse the join generated by the
|
|
# gte lookup, if F() was seen first, then it generated a join the
|
|
# other lookups could not reuse.
|
|
qs = Employee.objects.filter(
|
|
company_ceo_set__num_chairs=F('company_ceo_set__num_employees'),
|
|
company_ceo_set__num_chairs__gte=1)
|
|
self.assertEqual(str(qs.query).count('JOIN'), 1)
|
|
|
|
def test_ticket_18375_kwarg_ordering_2(self):
|
|
# Another similar case for F() than above. Now we have the same join
|
|
# in two filter kwargs, one in the lhs lookup, one in F. Here pre
|
|
# #18375 the amount of joins generated was random if dict
|
|
# randomization was enabled, that is the generated query dependent
|
|
# on which clause was seen first.
|
|
qs = Employee.objects.filter(
|
|
company_ceo_set__num_employees=F('pk'),
|
|
pk=F('company_ceo_set__num_employees')
|
|
)
|
|
self.assertEqual(str(qs.query).count('JOIN'), 1)
|
|
|
|
def test_ticket_18375_chained_filters(self):
|
|
# Test that F() expressions do not reuse joins from previous filter.
|
|
qs = Employee.objects.filter(
|
|
company_ceo_set__num_employees=F('pk')
|
|
).filter(
|
|
company_ceo_set__num_employees=F('company_ceo_set__num_employees')
|
|
)
|
|
self.assertEqual(str(qs.query).count('JOIN'), 2)
|
|
|
|
|
|
class ExpressionsTests(TestCase):
|
|
|
|
def test_F_object_deepcopy(self):
|
|
"""
|
|
Make sure F objects can be deepcopied (#23492)
|
|
"""
|
|
f = F("foo")
|
|
g = deepcopy(f)
|
|
self.assertEqual(f.name, g.name)
|
|
|
|
def test_f_reuse(self):
|
|
f = F('id')
|
|
n = Number.objects.create(integer=-1)
|
|
c = Company.objects.create(
|
|
name="Example Inc.", num_employees=2300, num_chairs=5,
|
|
ceo=Employee.objects.create(firstname="Joe", lastname="Smith")
|
|
)
|
|
c_qs = Company.objects.filter(id=f)
|
|
self.assertEqual(c_qs.get(), c)
|
|
# Reuse the same F-object for another queryset
|
|
n_qs = Number.objects.filter(id=f)
|
|
self.assertEqual(n_qs.get(), n)
|
|
# The original query still works correctly
|
|
self.assertEqual(c_qs.get(), c)
|
|
|
|
def test_patterns_escape(self):
|
|
"""
|
|
Test that special characters (e.g. %, _ and \) stored in database are
|
|
properly escaped when using a pattern lookup with an expression
|
|
refs #16731
|
|
"""
|
|
Employee.objects.bulk_create([
|
|
Employee(firstname="%Joh\\nny", lastname="%Joh\\n"),
|
|
Employee(firstname="Johnny", lastname="%John"),
|
|
Employee(firstname="Jean-Claude", lastname="Claud_"),
|
|
Employee(firstname="Jean-Claude", lastname="Claude"),
|
|
Employee(firstname="Jean-Claude", lastname="Claude%"),
|
|
Employee(firstname="Johnny", lastname="Joh\\n"),
|
|
Employee(firstname="Johnny", lastname="John"),
|
|
Employee(firstname="Johnny", lastname="_ohn"),
|
|
])
|
|
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(firstname__contains=F('lastname')),
|
|
["<Employee: %Joh\\nny %Joh\\n>", "<Employee: Jean-Claude Claude>", "<Employee: Johnny John>"],
|
|
ordered=False)
|
|
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(firstname__startswith=F('lastname')),
|
|
["<Employee: %Joh\\nny %Joh\\n>", "<Employee: Johnny John>"],
|
|
ordered=False)
|
|
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(firstname__endswith=F('lastname')),
|
|
["<Employee: Jean-Claude Claude>"],
|
|
ordered=False)
|
|
|
|
def test_insensitive_patterns_escape(self):
|
|
"""
|
|
Test that special characters (e.g. %, _ and \) stored in database are
|
|
properly escaped when using a case insensitive pattern lookup with an
|
|
expression -- refs #16731
|
|
"""
|
|
Employee.objects.bulk_create([
|
|
Employee(firstname="%Joh\\nny", lastname="%joh\\n"),
|
|
Employee(firstname="Johnny", lastname="%john"),
|
|
Employee(firstname="Jean-Claude", lastname="claud_"),
|
|
Employee(firstname="Jean-Claude", lastname="claude"),
|
|
Employee(firstname="Jean-Claude", lastname="claude%"),
|
|
Employee(firstname="Johnny", lastname="joh\\n"),
|
|
Employee(firstname="Johnny", lastname="john"),
|
|
Employee(firstname="Johnny", lastname="_ohn"),
|
|
])
|
|
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(firstname__icontains=F('lastname')),
|
|
["<Employee: %Joh\\nny %joh\\n>", "<Employee: Jean-Claude claude>", "<Employee: Johnny john>"],
|
|
ordered=False)
|
|
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(firstname__istartswith=F('lastname')),
|
|
["<Employee: %Joh\\nny %joh\\n>", "<Employee: Johnny john>"],
|
|
ordered=False)
|
|
|
|
self.assertQuerysetEqual(
|
|
Employee.objects.filter(firstname__iendswith=F('lastname')),
|
|
["<Employee: Jean-Claude claude>"],
|
|
ordered=False)
|
|
|
|
|
|
class ExpressionsNumericTests(TestCase):
|
|
|
|
def setUp(self):
|
|
Number(integer=-1).save()
|
|
Number(integer=42).save()
|
|
Number(integer=1337).save()
|
|
self.assertEqual(Number.objects.update(float=F('integer')), 3)
|
|
|
|
def test_fill_with_value_from_same_object(self):
|
|
"""
|
|
We can fill a value in all objects with an other value of the
|
|
same object.
|
|
"""
|
|
self.assertQuerysetEqual(
|
|
Number.objects.all(),
|
|
[
|
|
'<Number: -1, -1.000>',
|
|
'<Number: 42, 42.000>',
|
|
'<Number: 1337, 1337.000>'
|
|
],
|
|
ordered=False
|
|
)
|
|
|
|
def test_increment_value(self):
|
|
"""
|
|
We can increment a value of all objects in a query set.
|
|
"""
|
|
self.assertEqual(
|
|
Number.objects.filter(integer__gt=0)
|
|
.update(integer=F('integer') + 1),
|
|
2)
|
|
|
|
self.assertQuerysetEqual(
|
|
Number.objects.all(),
|
|
[
|
|
'<Number: -1, -1.000>',
|
|
'<Number: 43, 42.000>',
|
|
'<Number: 1338, 1337.000>'
|
|
],
|
|
ordered=False
|
|
)
|
|
|
|
def test_filter_not_equals_other_field(self):
|
|
"""
|
|
We can filter for objects, where a value is not equals the value
|
|
of an other field.
|
|
"""
|
|
self.assertEqual(
|
|
Number.objects.filter(integer__gt=0)
|
|
.update(integer=F('integer') + 1),
|
|
2)
|
|
self.assertQuerysetEqual(
|
|
Number.objects.exclude(float=F('integer')),
|
|
[
|
|
'<Number: 43, 42.000>',
|
|
'<Number: 1338, 1337.000>'
|
|
],
|
|
ordered=False
|
|
)
|
|
|
|
def test_complex_expressions(self):
|
|
"""
|
|
Complex expressions of different connection types are possible.
|
|
"""
|
|
n = Number.objects.create(integer=10, float=123.45)
|
|
self.assertEqual(Number.objects.filter(pk=n.pk).update(
|
|
float=F('integer') + F('float') * 2), 1)
|
|
|
|
self.assertEqual(Number.objects.get(pk=n.pk).integer, 10)
|
|
self.assertEqual(Number.objects.get(pk=n.pk).float, Approximate(256.900, places=3))
|
|
|
|
def test_incorrect_field_expression(self):
|
|
with six.assertRaisesRegex(self, FieldError, "Cannot resolve keyword u?'nope' into field.*"):
|
|
list(Employee.objects.filter(firstname=F('nope')))
|
|
|
|
|
|
class ExpressionOperatorTests(TestCase):
|
|
def setUp(self):
|
|
self.n = Number.objects.create(integer=42, float=15.5)
|
|
|
|
def test_lefthand_addition(self):
|
|
# LH Addition of floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(
|
|
integer=F('integer') + 15,
|
|
float=F('float') + 42.7
|
|
)
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 57)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3))
|
|
|
|
def test_lefthand_subtraction(self):
|
|
# LH Subtraction of floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=F('integer') - 15, float=F('float') - 42.7)
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 27)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(-27.200, places=3))
|
|
|
|
def test_lefthand_multiplication(self):
|
|
# Multiplication of floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=F('integer') * 15, float=F('float') * 42.7)
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 630)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3))
|
|
|
|
def test_lefthand_division(self):
|
|
# LH Division of floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=F('integer') / 2, float=F('float') / 42.7)
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 21)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(0.363, places=3))
|
|
|
|
def test_lefthand_modulo(self):
|
|
# LH Modulo arithmetic on integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=F('integer') % 20)
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 2)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(15.500, places=3))
|
|
|
|
def test_lefthand_bitwise_and(self):
|
|
# LH Bitwise ands on integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=F('integer').bitand(56))
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 40)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(15.500, places=3))
|
|
|
|
@skipUnlessDBFeature('supports_bitwise_or')
|
|
def test_lefthand_bitwise_or(self):
|
|
# LH Bitwise or on integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=F('integer').bitor(48))
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 58)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(15.500, places=3))
|
|
|
|
def test_lefthand_power(self):
|
|
# LH Powert arithmetic operation on floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=F('integer') ** 2, float=F('float') ** 1.5)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 1764)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(61.02, places=2))
|
|
|
|
def test_right_hand_addition(self):
|
|
# Right hand operators
|
|
Number.objects.filter(pk=self.n.pk).update(integer=15 + F('integer'), float=42.7 + F('float'))
|
|
|
|
# RH Addition of floats and integers
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 57)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3))
|
|
|
|
def test_right_hand_subtraction(self):
|
|
Number.objects.filter(pk=self.n.pk).update(integer=15 - F('integer'), float=42.7 - F('float'))
|
|
|
|
# RH Subtraction of floats and integers
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, -27)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(27.200, places=3))
|
|
|
|
def test_right_hand_multiplication(self):
|
|
# RH Multiplication of floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=15 * F('integer'), float=42.7 * F('float'))
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 630)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3))
|
|
|
|
def test_right_hand_division(self):
|
|
# RH Division of floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=640 / F('integer'), float=42.7 / F('float'))
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 15)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(2.755, places=3))
|
|
|
|
def test_right_hand_modulo(self):
|
|
# RH Modulo arithmetic on integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=69 % F('integer'))
|
|
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 27)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(15.500, places=3))
|
|
|
|
def test_righthand_power(self):
|
|
# RH Powert arithmetic operation on floats and integers
|
|
Number.objects.filter(pk=self.n.pk).update(integer=2 ** F('integer'), float=1.5 ** F('float'))
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 4398046511104)
|
|
self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(536.308, places=3))
|
|
|
|
|
|
class FTimeDeltaTests(TestCase):
|
|
|
|
@classmethod
|
|
def setUpTestData(cls):
|
|
cls.sday = sday = datetime.date(2010, 6, 25)
|
|
cls.stime = stime = datetime.datetime(2010, 6, 25, 12, 15, 30, 747000)
|
|
midnight = datetime.time(0)
|
|
|
|
delta0 = datetime.timedelta(0)
|
|
delta1 = datetime.timedelta(microseconds=253000)
|
|
delta2 = datetime.timedelta(seconds=44)
|
|
delta3 = datetime.timedelta(hours=21, minutes=8)
|
|
delta4 = datetime.timedelta(days=10)
|
|
|
|
# Test data is set so that deltas and delays will be
|
|
# strictly increasing.
|
|
cls.deltas = []
|
|
cls.delays = []
|
|
cls.days_long = []
|
|
|
|
# e0: started same day as assigned, zero duration
|
|
end = stime + delta0
|
|
e0 = Experiment.objects.create(
|
|
name='e0', assigned=sday, start=stime, end=end,
|
|
completed=end.date(), estimated_time=delta0,
|
|
)
|
|
cls.deltas.append(delta0)
|
|
cls.delays.append(e0.start - datetime.datetime.combine(e0.assigned, midnight))
|
|
cls.days_long.append(e0.completed - e0.assigned)
|
|
|
|
# e1: started one day after assigned, tiny duration, data
|
|
# set so that end time has no fractional seconds, which
|
|
# tests an edge case on sqlite. This Experiment is only
|
|
# included in the test data when the DB supports microsecond
|
|
# precision.
|
|
if connection.features.supports_microsecond_precision:
|
|
delay = datetime.timedelta(1)
|
|
end = stime + delay + delta1
|
|
e1 = Experiment.objects.create(
|
|
name='e1', assigned=sday, start=stime + delay, end=end,
|
|
completed=end.date(), estimated_time=delta1,
|
|
)
|
|
cls.deltas.append(delta1)
|
|
cls.delays.append(e1.start - datetime.datetime.combine(e1.assigned, midnight))
|
|
cls.days_long.append(e1.completed - e1.assigned)
|
|
|
|
# e2: started three days after assigned, small duration
|
|
end = stime + delta2
|
|
e2 = Experiment.objects.create(
|
|
name='e2', assigned=sday - datetime.timedelta(3), start=stime,
|
|
end=end, completed=end.date(), estimated_time=datetime.timedelta(hours=1),
|
|
)
|
|
cls.deltas.append(delta2)
|
|
cls.delays.append(e2.start - datetime.datetime.combine(e2.assigned, midnight))
|
|
cls.days_long.append(e2.completed - e2.assigned)
|
|
|
|
# e3: started four days after assigned, medium duration
|
|
delay = datetime.timedelta(4)
|
|
end = stime + delay + delta3
|
|
e3 = Experiment.objects.create(
|
|
name='e3', assigned=sday, start=stime + delay, end=end,
|
|
completed=end.date(), estimated_time=delta3,
|
|
)
|
|
cls.deltas.append(delta3)
|
|
cls.delays.append(e3.start - datetime.datetime.combine(e3.assigned, midnight))
|
|
cls.days_long.append(e3.completed - e3.assigned)
|
|
|
|
# e4: started 10 days after assignment, long duration
|
|
end = stime + delta4
|
|
e4 = Experiment.objects.create(
|
|
name='e4', assigned=sday - datetime.timedelta(10), start=stime,
|
|
end=end, completed=end.date(), estimated_time=delta4 - datetime.timedelta(1),
|
|
)
|
|
cls.deltas.append(delta4)
|
|
cls.delays.append(e4.start - datetime.datetime.combine(e4.assigned, midnight))
|
|
cls.days_long.append(e4.completed - e4.assigned)
|
|
cls.expnames = [e.name for e in Experiment.objects.all()]
|
|
|
|
def test_multiple_query_compilation(self):
|
|
# Ticket #21643
|
|
queryset = Experiment.objects.filter(end__lt=F('start') + datetime.timedelta(hours=1))
|
|
q1 = str(queryset.query)
|
|
q2 = str(queryset.query)
|
|
self.assertEqual(q1, q2)
|
|
|
|
def test_query_clone(self):
|
|
# Ticket #21643 - Crash when compiling query more than once
|
|
qs = Experiment.objects.filter(end__lt=F('start') + datetime.timedelta(hours=1))
|
|
qs2 = qs.all()
|
|
list(qs)
|
|
list(qs2)
|
|
# Intentionally no assert
|
|
|
|
def test_delta_add(self):
|
|
for i in range(len(self.deltas)):
|
|
delta = self.deltas[i]
|
|
test_set = [e.name for e in Experiment.objects.filter(end__lt=F('start') + delta)]
|
|
self.assertEqual(test_set, self.expnames[:i])
|
|
|
|
test_set = [e.name for e in Experiment.objects.filter(end__lt=delta + F('start'))]
|
|
self.assertEqual(test_set, self.expnames[:i])
|
|
|
|
test_set = [e.name for e in Experiment.objects.filter(end__lte=F('start') + delta)]
|
|
self.assertEqual(test_set, self.expnames[:i + 1])
|
|
|
|
def test_delta_subtract(self):
|
|
for i in range(len(self.deltas)):
|
|
delta = self.deltas[i]
|
|
test_set = [e.name for e in Experiment.objects.filter(start__gt=F('end') - delta)]
|
|
self.assertEqual(test_set, self.expnames[:i])
|
|
|
|
test_set = [e.name for e in Experiment.objects.filter(start__gte=F('end') - delta)]
|
|
self.assertEqual(test_set, self.expnames[:i + 1])
|
|
|
|
def test_exclude(self):
|
|
for i in range(len(self.deltas)):
|
|
delta = self.deltas[i]
|
|
test_set = [e.name for e in Experiment.objects.exclude(end__lt=F('start') + delta)]
|
|
self.assertEqual(test_set, self.expnames[i:])
|
|
|
|
test_set = [e.name for e in Experiment.objects.exclude(end__lte=F('start') + delta)]
|
|
self.assertEqual(test_set, self.expnames[i + 1:])
|
|
|
|
def test_date_comparison(self):
|
|
for i in range(len(self.days_long)):
|
|
days = self.days_long[i]
|
|
test_set = [e.name for e in Experiment.objects.filter(completed__lt=F('assigned') + days)]
|
|
self.assertEqual(test_set, self.expnames[:i])
|
|
|
|
test_set = [e.name for e in Experiment.objects.filter(completed__lte=F('assigned') + days)]
|
|
self.assertEqual(test_set, self.expnames[:i + 1])
|
|
|
|
@skipUnlessDBFeature("supports_mixed_date_datetime_comparisons")
|
|
def test_mixed_comparisons1(self):
|
|
for i in range(len(self.delays)):
|
|
delay = self.delays[i]
|
|
if not connection.features.supports_microsecond_precision:
|
|
delay = datetime.timedelta(delay.days, delay.seconds)
|
|
test_set = [e.name for e in Experiment.objects.filter(assigned__gt=F('start') - delay)]
|
|
self.assertEqual(test_set, self.expnames[:i])
|
|
|
|
test_set = [e.name for e in Experiment.objects.filter(assigned__gte=F('start') - delay)]
|
|
self.assertEqual(test_set, self.expnames[:i + 1])
|
|
|
|
def test_mixed_comparisons2(self):
|
|
delays = [datetime.timedelta(delay.days) for delay in self.delays]
|
|
for i in range(len(delays)):
|
|
delay = delays[i]
|
|
test_set = [e.name for e in Experiment.objects.filter(start__lt=F('assigned') + delay)]
|
|
self.assertEqual(test_set, self.expnames[:i])
|
|
|
|
test_set = [
|
|
e.name for e in Experiment.objects.filter(start__lte=F('assigned') + delay + datetime.timedelta(1))
|
|
]
|
|
self.assertEqual(test_set, self.expnames[:i + 1])
|
|
|
|
def test_delta_update(self):
|
|
for i in range(len(self.deltas)):
|
|
delta = self.deltas[i]
|
|
exps = Experiment.objects.all()
|
|
expected_durations = [e.duration() for e in exps]
|
|
expected_starts = [e.start + delta for e in exps]
|
|
expected_ends = [e.end + delta for e in exps]
|
|
|
|
Experiment.objects.update(start=F('start') + delta, end=F('end') + delta)
|
|
exps = Experiment.objects.all()
|
|
new_starts = [e.start for e in exps]
|
|
new_ends = [e.end for e in exps]
|
|
new_durations = [e.duration() for e in exps]
|
|
self.assertEqual(expected_starts, new_starts)
|
|
self.assertEqual(expected_ends, new_ends)
|
|
self.assertEqual(expected_durations, new_durations)
|
|
|
|
def test_invalid_operator(self):
|
|
with self.assertRaises(DatabaseError):
|
|
list(Experiment.objects.filter(start=F('start') * datetime.timedelta(0)))
|
|
|
|
def test_durationfield_add(self):
|
|
zeros = [e.name for e in Experiment.objects.filter(start=F('start') + F('estimated_time'))]
|
|
self.assertEqual(zeros, ['e0'])
|
|
|
|
end_less = [e.name for e in Experiment.objects.filter(end__lt=F('start') + F('estimated_time'))]
|
|
self.assertEqual(end_less, ['e2'])
|
|
|
|
delta_math = [
|
|
e.name for e in
|
|
Experiment.objects.filter(end__gte=F('start') + F('estimated_time') + datetime.timedelta(hours=1))
|
|
]
|
|
self.assertEqual(delta_math, ['e4'])
|
|
|
|
@skipUnlessDBFeature('supports_temporal_subtraction')
|
|
def test_date_subtraction(self):
|
|
queryset = Experiment.objects.annotate(
|
|
completion_duration=ExpressionWrapper(
|
|
F('completed') - F('assigned'), output_field=models.DurationField()
|
|
)
|
|
)
|
|
|
|
at_least_5_days = {e.name for e in queryset.filter(completion_duration__gte=datetime.timedelta(days=5))}
|
|
self.assertEqual(at_least_5_days, {'e3', 'e4'})
|
|
|
|
less_than_5_days = {e.name for e in queryset.filter(completion_duration__lt=datetime.timedelta(days=5))}
|
|
expected = {'e0', 'e2'}
|
|
if connection.features.supports_microsecond_precision:
|
|
expected.add('e1')
|
|
self.assertEqual(less_than_5_days, expected)
|
|
|
|
@skipUnlessDBFeature('supports_temporal_subtraction')
|
|
def test_time_subtraction(self):
|
|
if connection.features.supports_microsecond_precision:
|
|
time = datetime.time(12, 30, 15, 2345)
|
|
timedelta = datetime.timedelta(hours=1, minutes=15, seconds=15, microseconds=2345)
|
|
else:
|
|
time = datetime.time(12, 30, 15)
|
|
timedelta = datetime.timedelta(hours=1, minutes=15, seconds=15)
|
|
Time.objects.create(time=time)
|
|
queryset = Time.objects.annotate(
|
|
difference=ExpressionWrapper(
|
|
F('time') - Value(datetime.time(11, 15, 0), output_field=models.TimeField()),
|
|
output_field=models.DurationField(),
|
|
)
|
|
)
|
|
self.assertEqual(queryset.get().difference, timedelta)
|
|
|
|
@skipUnlessDBFeature('supports_temporal_subtraction')
|
|
def test_datetime_subtraction(self):
|
|
under_estimate = [
|
|
e.name for e in Experiment.objects.filter(estimated_time__gt=F('end') - F('start'))
|
|
]
|
|
self.assertEqual(under_estimate, ['e2'])
|
|
|
|
over_estimate = [
|
|
e.name for e in Experiment.objects.filter(estimated_time__lt=F('end') - F('start'))
|
|
]
|
|
self.assertEqual(over_estimate, ['e4'])
|
|
|
|
def test_duration_with_datetime(self):
|
|
# Exclude e1 which has very high precision so we can test this on all
|
|
# backends regardless of whether or not it supports
|
|
# microsecond_precision.
|
|
over_estimate = Experiment.objects.exclude(name='e1').filter(
|
|
completed__gt=self.stime + F('estimated_time'),
|
|
).order_by('name')
|
|
self.assertQuerysetEqual(over_estimate, ['e3', 'e4'], lambda e: e.name)
|
|
|
|
|
|
class ValueTests(TestCase):
|
|
def test_update_TimeField_using_Value(self):
|
|
Time.objects.create()
|
|
Time.objects.update(time=Value(datetime.time(1), output_field=TimeField()))
|
|
self.assertEqual(Time.objects.get().time, datetime.time(1))
|
|
|
|
def test_update_UUIDField_using_Value(self):
|
|
UUID.objects.create()
|
|
UUID.objects.update(uuid=Value(uuid.UUID('12345678901234567890123456789012'), output_field=UUIDField()))
|
|
self.assertEqual(UUID.objects.get().uuid, uuid.UUID('12345678901234567890123456789012'))
|
|
|
|
|
|
class ReprTests(TestCase):
|
|
|
|
def test_expressions(self):
|
|
self.assertEqual(
|
|
repr(Case(When(a=1))),
|
|
"<Case: CASE WHEN <Q: (AND: ('a', 1))> THEN Value(None), ELSE Value(None)>"
|
|
)
|
|
self.assertEqual(repr(Col('alias', 'field')), "Col(alias, field)")
|
|
self.assertEqual(repr(F('published')), "F(published)")
|
|
self.assertEqual(repr(F('cost') + F('tax')), "<CombinedExpression: F(cost) + F(tax)>")
|
|
self.assertEqual(
|
|
repr(ExpressionWrapper(F('cost') + F('tax'), models.IntegerField())),
|
|
"ExpressionWrapper(F(cost) + F(tax))"
|
|
)
|
|
self.assertEqual(repr(Func('published', function='TO_CHAR')), "Func(F(published), function=TO_CHAR)")
|
|
self.assertEqual(repr(OrderBy(Value(1))), 'OrderBy(Value(1), descending=False)')
|
|
self.assertEqual(repr(Random()), "Random()")
|
|
self.assertEqual(repr(RawSQL('table.col', [])), "RawSQL(table.col, [])")
|
|
self.assertEqual(repr(Ref('sum_cost', Sum('cost'))), "Ref(sum_cost, Sum(F(cost)))")
|
|
self.assertEqual(repr(Value(1)), "Value(1)")
|
|
|
|
def test_functions(self):
|
|
self.assertEqual(repr(Coalesce('a', 'b')), "Coalesce(F(a), F(b))")
|
|
self.assertEqual(repr(Concat('a', 'b')), "Concat(ConcatPair(F(a), F(b)))")
|
|
self.assertEqual(repr(Length('a')), "Length(F(a))")
|
|
self.assertEqual(repr(Lower('a')), "Lower(F(a))")
|
|
self.assertEqual(repr(Substr('a', 1, 3)), "Substr(F(a), Value(1), Value(3))")
|
|
self.assertEqual(repr(Upper('a')), "Upper(F(a))")
|
|
|
|
def test_aggregates(self):
|
|
self.assertEqual(repr(Avg('a')), "Avg(F(a))")
|
|
self.assertEqual(repr(Count('a')), "Count(F(a), distinct=False)")
|
|
self.assertEqual(repr(Count('*')), "Count('*', distinct=False)")
|
|
self.assertEqual(repr(Max('a')), "Max(F(a))")
|
|
self.assertEqual(repr(Min('a')), "Min(F(a))")
|
|
self.assertEqual(repr(StdDev('a')), "StdDev(F(a), sample=False)")
|
|
self.assertEqual(repr(Sum('a')), "Sum(F(a))")
|
|
self.assertEqual(repr(Variance('a', sample=True)), "Variance(F(a), sample=True)")
|