1
0
mirror of https://github.com/django/django.git synced 2024-12-22 09:05:43 +00:00

Refs #35718 -- Add JSONArray to django.db.models.functions.

Co-authored-by: Sarah Boyce <42296566+sarahboyce@users.noreply.github.com>
Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
This commit is contained in:
John Parton 2024-09-04 18:13:05 -05:00
parent 94436dee57
commit d3fc786dad
5 changed files with 311 additions and 1 deletions

View File

@ -25,7 +25,7 @@ from .datetime import (
TruncWeek,
TruncYear,
)
from .json import JSONObject
from .json import JSONArray, JSONObject
from .math import (
Abs,
ACos,
@ -126,6 +126,7 @@ __all__ = [
"TruncWeek",
"TruncYear",
# json
"JSONArray",
"JSONObject",
# math
"Abs",

View File

@ -5,6 +5,68 @@ from django.db.models.fields.json import JSONField
from django.db.models.functions import Cast
class JSONArray(Func):
function = "JSON_ARRAY"
output_field = JSONField()
def as_sql(self, compiler, connection, **extra_context):
if not connection.features.supports_json_field:
raise NotSupportedError(
"JSONFields are not supported on this database backend."
)
return super().as_sql(compiler, connection, **extra_context)
def as_native(self, compiler, connection, *, returning, **extra_context):
"""
PostgreSQL 16+ and Oracle remove SQL NULL values from the array by
default. Adds the NULL ON NULL clause to keep NULL values in the array,
mapping them to JSON null values, which matches the behavior of SQLite.
"""
null_on_null = "NULL ON NULL" if len(self.get_source_expressions()) > 0 else ""
return self.as_sql(
compiler,
connection,
template=(
f"%(function)s(%(expressions)s {null_on_null} RETURNING {returning})"
),
**extra_context,
)
def as_postgresql(self, compiler, connection, **extra_context):
# Explicitly casting source expressions is only required using JSONB_BUILD_ARRAY
# or when using JSON_ARRAY on PostgreSQL 16+ with server-side bindings.
# This is done in all cases for consistency.
casted_obj = self.copy()
casted_obj.set_source_expressions(
[
(
# Conditional Cast to avoid unnecessary wrapping.
expression
if isinstance(expression, Cast)
else Cast(expression, expression.output_field)
)
for expression in casted_obj.get_source_expressions()
]
)
if connection.features.is_postgresql_16:
return casted_obj.as_native(
compiler, connection, returning="JSONB", **extra_context
)
return casted_obj.as_sql(
compiler,
connection,
function="JSONB_BUILD_ARRAY",
**extra_context,
)
def as_oracle(self, compiler, connection, **extra_context):
return self.as_native(compiler, connection, returning="CLOB", **extra_context)
class JSONObject(Func):
function = "JSON_OBJECT"
output_field = JSONField()

View File

@ -841,6 +841,33 @@ that deal with time-parts can be used with ``TimeField``:
JSON Functions
==============
``JSONArray``
-------------
.. versionadded:: 5.2
.. class:: JSONArray(*expressions)
Accepts a list of field names or expressions and returns a JSON array
containing those values.
Usage example:
.. code-block:: pycon
>>> from django.db.models import F
>>> from django.db.models.functions import JSONArray, Lower
>>> Author.objects.create(name="Margaret Smith", alias="msmith", age=25)
>>> author = Author.objects.annotate(
... json_array=JSONArray(
... Lower("name"),
... "alias",
... F("age") * 2,
... )
... ).get()
>>> author.json_array
['margaret smith', 'msmith', 50]
``JSONObject``
--------------

View File

@ -312,6 +312,10 @@ Models
* :meth:`.QuerySet.explain` now supports the ``memory`` and ``serialize``
options on PostgreSQL 17+.
* The new :class:`~django.db.models.functions.JSONArray` database function
accepts a list of field names or expressions and returns a JSON array
containing those values.
Requests and Responses
~~~~~~~~~~~~~~~~~~~~~~

View File

@ -0,0 +1,216 @@
import unittest
from django.db import NotSupportedError, connection
from django.db.models import CharField, F, Value
from django.db.models.functions import Cast, JSONArray, JSONObject, Lower
from django.test import TestCase
from django.test.testcases import skipIfDBFeature, skipUnlessDBFeature
from django.utils import timezone
from ..models import Article, Author
@skipUnlessDBFeature("supports_json_field")
class JSONArrayTests(TestCase):
@classmethod
def setUpTestData(cls):
Author.objects.bulk_create(
[
Author(name="Ivan Ivanov", alias="iivanov"),
Author(name="Bertha Berthy", alias="bberthy"),
]
)
def test_empty(self):
obj = Author.objects.annotate(json_array=JSONArray()).first()
self.assertEqual(obj.json_array, [])
def test_basic(self):
obj = Author.objects.annotate(
json_array=JSONArray(Value("name"), F("name"))
).first()
self.assertEqual(obj.json_array, ["name", "Ivan Ivanov"])
def test_expressions(self):
obj = Author.objects.annotate(
json_array=JSONArray(
Lower("name"),
F("alias"),
F("goes_by"),
Value(30000.15),
F("age") * 2,
)
).first()
self.assertEqual(
obj.json_array,
[
"ivan ivanov",
"iivanov",
None,
30000.15,
60,
],
)
def test_nested_json_array(self):
obj = Author.objects.annotate(
json_array=JSONArray(
F("name"),
JSONArray(F("alias"), F("age")),
)
).first()
self.assertEqual(
obj.json_array,
[
"Ivan Ivanov",
["iivanov", 30],
],
)
def test_nested_empty_json_array(self):
obj = Author.objects.annotate(
json_array=JSONArray(
F("name"),
JSONArray(),
)
).first()
self.assertEqual(
obj.json_array,
[
"Ivan Ivanov",
[],
],
)
def test_textfield(self):
Article.objects.create(
title="The Title",
text="x" * 4000,
written=timezone.now(),
)
obj = Article.objects.annotate(json_array=JSONArray(F("text"))).first()
self.assertEqual(obj.json_array, ["x" * 4000])
@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests")
def test_explicit_cast(self):
with self.assertNumQueries(1) as ctx:
self.assertSequenceEqual(
Author.objects.annotate(
json_array=JSONArray(Cast("age", CharField()))
).values("json_array"),
[
{"json_array": ["30"]},
{"json_array": ["30"]},
],
)
sql = ctx.captured_queries[0]["sql"]
self.assertIn("::varchar", sql)
self.assertNotIn("::varchar)::varchar", sql)
def test_order_by_key(self):
qs = Author.objects.annotate(arr=JSONArray(F("alias"))).order_by("arr__0")
self.assertQuerySetEqual(qs, Author.objects.order_by("alias"))
def test_order_by_nested_key(self):
qs = Author.objects.annotate(arr=JSONArray(JSONArray(F("alias")))).order_by(
"-arr__0__0"
)
self.assertQuerySetEqual(qs, Author.objects.order_by("-alias"))
@skipIfDBFeature("supports_json_field")
class JSONArrayNotSupportedTests(TestCase):
def test_not_supported(self):
msg = "JSONArray() is not supported on this database backend."
with self.assertRaisesMessage(NotSupportedError, msg):
Author.objects.annotate(json_array=JSONArray()).get()
@skipUnlessDBFeature("has_json_object_function", "supports_json_field")
class JSONArrayObjectTests(TestCase):
@classmethod
def setUpTestData(cls):
Author.objects.bulk_create(
[
Author(name="Ivan Ivanov", alias="iivanov"),
Author(name="Bertha Berthy", alias="bberthy"),
]
)
def test_nested_json_array_object(self):
obj = Author.objects.annotate(
json_array=JSONArray(
JSONObject(
name1="name",
nested_json_object1=JSONObject(
alias1="alias",
age1="age",
),
),
JSONObject(
name2="name",
nested_json_object2=JSONObject(
alias2="alias",
age2="age",
),
),
)
).first()
self.assertEqual(
obj.json_array,
[
{
"name1": "Ivan Ivanov",
"nested_json_object1": {
"alias1": "iivanov",
"age1": 30,
},
},
{
"name2": "Ivan Ivanov",
"nested_json_object2": {
"alias2": "iivanov",
"age2": 30,
},
},
],
)
def test_nested_json_object_array(self):
obj = Author.objects.annotate(
json_object=JSONObject(
name="name",
nested_json_array=JSONArray(
JSONObject(
alias1="alias",
age1="age",
),
JSONObject(
alias2="alias",
age2="age",
),
),
)
).first()
self.assertEqual(
obj.json_object,
{
"name": "Ivan Ivanov",
"nested_json_array": [
{
"alias1": "iivanov",
"age1": 30,
},
{
"alias2": "iivanov",
"age2": 30,
},
],
},
)
def test_order_by_nested_key(self):
qs = Author.objects.annotate(
arr=JSONArray(JSONObject(alias=F("alias")))
).order_by("-arr__0__alias")
self.assertQuerySetEqual(qs, Author.objects.order_by("-alias"))