import operator import uuid from unittest import mock from django import forms from django.core import serializers from django.core.exceptions import ValidationError from django.core.serializers.json import DjangoJSONEncoder from django.db import ( DataError, IntegrityError, NotSupportedError, OperationalError, connection, models, ) from django.db.models import ( Count, ExpressionWrapper, F, IntegerField, JSONField, OuterRef, Q, Subquery, Transform, Value, ) from django.db.models.expressions import RawSQL from django.db.models.fields.json import ( KT, HasKey, KeyTextTransform, KeyTransform, KeyTransformFactory, KeyTransformTextLookupMixin, ) from django.db.models.functions import Cast from django.test import SimpleTestCase, TestCase, skipIfDBFeature, skipUnlessDBFeature from django.test.utils import CaptureQueriesContext from .models import CustomJSONDecoder, JSONModel, NullableJSONModel, RelatedJSONModel @skipUnlessDBFeature("supports_json_field") class JSONFieldTests(TestCase): def test_invalid_value(self): msg = "is not JSON serializable" with self.assertRaisesMessage(TypeError, msg): NullableJSONModel.objects.create( value={ "uuid": uuid.UUID("d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475"), } ) def test_custom_encoder_decoder(self): value = {"uuid": uuid.UUID("{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}")} obj = NullableJSONModel(value_custom=value) obj.clean_fields() obj.save() obj.refresh_from_db() self.assertEqual(obj.value_custom, value) def test_db_check_constraints(self): value = "{@!invalid json value 123 $!@#" with mock.patch.object(DjangoJSONEncoder, "encode", return_value=value): with self.assertRaises((IntegrityError, DataError, OperationalError)): NullableJSONModel.objects.create(value_custom=value) class TestMethods(SimpleTestCase): def test_deconstruct(self): field = models.JSONField() name, path, args, kwargs = field.deconstruct() self.assertEqual(path, "django.db.models.JSONField") self.assertEqual(args, []) self.assertEqual(kwargs, {}) def test_deconstruct_custom_encoder_decoder(self): field = models.JSONField(encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder) name, path, args, kwargs = field.deconstruct() self.assertEqual(kwargs["encoder"], DjangoJSONEncoder) self.assertEqual(kwargs["decoder"], CustomJSONDecoder) def test_get_transforms(self): @models.JSONField.register_lookup class MyTransform(Transform): lookup_name = "my_transform" field = models.JSONField() transform = field.get_transform("my_transform") self.assertIs(transform, MyTransform) models.JSONField._unregister_lookup(MyTransform) transform = field.get_transform("my_transform") self.assertIsInstance(transform, KeyTransformFactory) def test_key_transform_text_lookup_mixin_non_key_transform(self): transform = Transform("test") msg = ( "Transform should be an instance of KeyTransform in order to use " "this lookup." ) with self.assertRaisesMessage(TypeError, msg): KeyTransformTextLookupMixin(transform) def test_get_prep_value(self): class JSONFieldGetPrepValue(models.JSONField): def get_prep_value(self, value): if value is True: return {"value": True} return value def noop_adapt_json_value(value, encoder): return value field = JSONFieldGetPrepValue() with mock.patch.object( connection.ops, "adapt_json_value", noop_adapt_json_value ): self.assertEqual( field.get_db_prep_value(True, connection, prepared=False), {"value": True}, ) self.assertIs( field.get_db_prep_value(True, connection, prepared=True), True ) self.assertEqual(field.get_db_prep_value(1, connection, prepared=False), 1) class TestValidation(SimpleTestCase): def test_invalid_encoder(self): msg = "The encoder parameter must be a callable object." with self.assertRaisesMessage(ValueError, msg): models.JSONField(encoder=DjangoJSONEncoder()) def test_invalid_decoder(self): msg = "The decoder parameter must be a callable object." with self.assertRaisesMessage(ValueError, msg): models.JSONField(decoder=CustomJSONDecoder()) def test_validation_error(self): field = models.JSONField() msg = "Value must be valid JSON." value = uuid.UUID("{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}") with self.assertRaisesMessage(ValidationError, msg): field.clean({"uuid": value}, None) def test_custom_encoder(self): field = models.JSONField(encoder=DjangoJSONEncoder) value = uuid.UUID("{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}") field.clean({"uuid": value}, None) class TestFormField(SimpleTestCase): def test_formfield(self): model_field = models.JSONField() form_field = model_field.formfield() self.assertIsInstance(form_field, forms.JSONField) def test_formfield_custom_encoder_decoder(self): model_field = models.JSONField( encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder ) form_field = model_field.formfield() self.assertIs(form_field.encoder, DjangoJSONEncoder) self.assertIs(form_field.decoder, CustomJSONDecoder) class TestSerialization(SimpleTestCase): test_data = ( '[{"fields": {"value": %s}, "model": "model_fields.jsonmodel", "pk": null}]' ) test_values = ( # (Python value, serialized value), ({"a": "b", "c": None}, '{"a": "b", "c": null}'), ("abc", '"abc"'), ('{"a": "a"}', '"{\\"a\\": \\"a\\"}"'), ) def test_dumping(self): for value, serialized in self.test_values: with self.subTest(value=value): instance = JSONModel(value=value) data = serializers.serialize("json", [instance]) self.assertJSONEqual(data, self.test_data % serialized) def test_loading(self): for value, serialized in self.test_values: with self.subTest(value=value): instance = list( serializers.deserialize("json", self.test_data % serialized) )[0].object self.assertEqual(instance.value, value) def test_xml_serialization(self): test_xml_data = ( '' '' '%s' "" ) for value, serialized in self.test_values: with self.subTest(value=value): instance = NullableJSONModel(value=value) data = serializers.serialize("xml", [instance], fields=["value"]) self.assertXMLEqual(data, test_xml_data % serialized) new_instance = list(serializers.deserialize("xml", data))[0].object self.assertEqual(new_instance.value, instance.value) @skipUnlessDBFeature("supports_json_field") class TestSaveLoad(TestCase): def test_null(self): obj = NullableJSONModel(value=None) obj.save() obj.refresh_from_db() self.assertIsNone(obj.value) @skipUnlessDBFeature("supports_primitives_in_json_field") def test_json_null_different_from_sql_null(self): json_null = NullableJSONModel.objects.create(value=Value(None, JSONField())) NullableJSONModel.objects.update(value=Value(None, JSONField())) json_null.refresh_from_db() sql_null = NullableJSONModel.objects.create(value=None) sql_null.refresh_from_db() # 'null' is not equal to NULL in the database. self.assertSequenceEqual( NullableJSONModel.objects.filter(value=Value(None, JSONField())), [json_null], ) self.assertSequenceEqual( NullableJSONModel.objects.filter(value=None), [json_null], ) self.assertSequenceEqual( NullableJSONModel.objects.filter(value__isnull=True), [sql_null], ) # 'null' is equal to NULL in Python (None). self.assertEqual(json_null.value, sql_null.value) @skipUnlessDBFeature("supports_primitives_in_json_field") def test_primitives(self): values = [ True, 1, 1.45, "String", "", ] for value in values: with self.subTest(value=value): obj = JSONModel(value=value) obj.save() obj.refresh_from_db() self.assertEqual(obj.value, value) def test_dict(self): values = [ {}, {"name": "John", "age": 20, "height": 180.3}, {"a": True, "b": {"b1": False, "b2": None}}, ] for value in values: with self.subTest(value=value): obj = JSONModel.objects.create(value=value) obj.refresh_from_db() self.assertEqual(obj.value, value) def test_list(self): values = [ [], ["John", 20, 180.3], [True, [False, None]], ] for value in values: with self.subTest(value=value): obj = JSONModel.objects.create(value=value) obj.refresh_from_db() self.assertEqual(obj.value, value) def test_realistic_object(self): value = { "name": "John", "age": 20, "pets": [ {"name": "Kit", "type": "cat", "age": 2}, {"name": "Max", "type": "dog", "age": 1}, ], "courses": [ ["A1", "A2", "A3"], ["B1", "B2"], ["C1"], ], } obj = JSONModel.objects.create(value=value) obj.refresh_from_db() self.assertEqual(obj.value, value) @skipUnlessDBFeature("supports_json_field") class TestQuerying(TestCase): @classmethod def setUpTestData(cls): cls.primitives = [True, False, "yes", 7, 9.6] values = [ None, [], {}, {"a": "b", "c": 14}, { "a": "b", "c": 14, "d": ["e", {"f": "g"}], "h": True, "i": False, "j": None, "k": {"l": "m"}, "n": [None, True, False], "o": '"quoted"', "p": 4.2, "r": {"s": True, "t": False}, }, [1, [2]], {"k": True, "l": False, "foo": "bax"}, { "foo": "bar", "baz": {"a": "b", "c": "d"}, "bar": ["foo", "bar"], "bax": {"foo": "bar"}, }, ] cls.objs = [NullableJSONModel.objects.create(value=value) for value in values] if connection.features.supports_primitives_in_json_field: cls.objs.extend( [ NullableJSONModel.objects.create(value=value) for value in cls.primitives ] ) cls.raw_sql = "%s::jsonb" if connection.vendor == "postgresql" else "%s" def test_exact(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__exact={}), [self.objs[2]], ) def test_exact_complex(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__exact={"a": "b", "c": 14}), [self.objs[3]], ) def test_icontains(self): self.assertCountEqual( NullableJSONModel.objects.filter(value__icontains="BaX"), self.objs[6:8], ) def test_isnull(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__isnull=True), [self.objs[0]], ) def test_ordering_by_transform(self): mariadb = connection.vendor == "mysql" and connection.mysql_is_mariadb values = [ {"ord": 93, "name": "bar"}, {"ord": 22.1, "name": "foo"}, {"ord": -1, "name": "baz"}, {"ord": 21.931902, "name": "spam"}, {"ord": -100291029, "name": "eggs"}, ] for field_name in ["value", "value_custom"]: with self.subTest(field=field_name): objs = [ NullableJSONModel.objects.create(**{field_name: value}) for value in values ] query = NullableJSONModel.objects.filter( **{"%s__name__isnull" % field_name: False}, ).order_by("%s__ord" % field_name) expected = [objs[4], objs[2], objs[3], objs[1], objs[0]] if mariadb or connection.vendor == "oracle": # MariaDB and Oracle return JSON values as strings. expected = [objs[2], objs[4], objs[3], objs[1], objs[0]] self.assertSequenceEqual(query, expected) def test_ordering_grouping_by_key_transform(self): base_qs = NullableJSONModel.objects.filter(value__d__0__isnull=False) for qs in ( base_qs.order_by("value__d__0"), base_qs.annotate( key=KeyTransform("0", KeyTransform("d", "value")) ).order_by("key"), ): self.assertSequenceEqual(qs, [self.objs[4]]) none_val = "" if connection.features.interprets_empty_strings_as_nulls else None qs = NullableJSONModel.objects.filter(value__isnull=False) self.assertQuerySetEqual( qs.filter(value__isnull=False) .annotate(key=KT("value__d__1__f")) .values("key") .annotate(count=Count("key")) .order_by("count"), [(none_val, 0), ("g", 1)], operator.itemgetter("key", "count"), ) def test_ordering_grouping_by_count(self): qs = ( NullableJSONModel.objects.filter( value__isnull=False, ) .values("value__d__0") .annotate(count=Count("value__d__0")) .order_by("count") ) self.assertQuerySetEqual(qs, [0, 1], operator.itemgetter("count")) def test_order_grouping_custom_decoder(self): NullableJSONModel.objects.create(value_custom={"a": "b"}) qs = NullableJSONModel.objects.filter(value_custom__isnull=False) self.assertSequenceEqual( qs.values( "value_custom__a", ) .annotate( count=Count("id"), ) .order_by("value_custom__a"), [{"value_custom__a": "b", "count": 1}], ) def test_key_transform_raw_expression(self): expr = RawSQL(self.raw_sql, ['{"x": "bar"}']) self.assertSequenceEqual( NullableJSONModel.objects.filter(value__foo=KeyTransform("x", expr)), [self.objs[7]], ) def test_nested_key_transform_raw_expression(self): expr = RawSQL(self.raw_sql, ['{"x": {"y": "bar"}}']) self.assertSequenceEqual( NullableJSONModel.objects.filter( value__foo=KeyTransform("y", KeyTransform("x", expr)) ), [self.objs[7]], ) def test_key_transform_expression(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__d__0__isnull=False) .annotate( key=KeyTransform("d", "value"), chain=KeyTransform("0", "key"), expr=KeyTransform("0", Cast("key", models.JSONField())), ) .filter(chain=F("expr")), [self.objs[4]], ) def test_key_transform_annotation_expression(self): obj = NullableJSONModel.objects.create(value={"d": ["e", "e"]}) self.assertSequenceEqual( NullableJSONModel.objects.filter(value__d__0__isnull=False) .annotate( key=F("value__d"), chain=F("key__0"), expr=Cast("key", models.JSONField()), ) .filter(chain=F("expr__1")), [obj], ) def test_nested_key_transform_expression(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__d__0__isnull=False) .annotate( key=KeyTransform("d", "value"), chain=KeyTransform("f", KeyTransform("1", "key")), expr=KeyTransform( "f", KeyTransform("1", Cast("key", models.JSONField())) ), ) .filter(chain=F("expr")), [self.objs[4]], ) def test_nested_key_transform_annotation_expression(self): obj = NullableJSONModel.objects.create( value={"d": ["e", {"f": "g"}, {"f": "g"}]}, ) self.assertSequenceEqual( NullableJSONModel.objects.filter(value__d__0__isnull=False) .annotate( key=F("value__d"), chain=F("key__1__f"), expr=Cast("key", models.JSONField()), ) .filter(chain=F("expr__2__f")), [obj], ) def test_nested_key_transform_on_subquery(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__d__0__isnull=False) .annotate( subquery_value=Subquery( NullableJSONModel.objects.filter(pk=OuterRef("pk")).values("value") ), key=KeyTransform("d", "subquery_value"), chain=KeyTransform("f", KeyTransform("1", "key")), ) .filter(chain="g"), [self.objs[4]], ) def test_key_text_transform_char_lookup(self): qs = NullableJSONModel.objects.annotate( char_value=KeyTextTransform("foo", "value"), ).filter(char_value__startswith="bar") self.assertSequenceEqual(qs, [self.objs[7]]) qs = NullableJSONModel.objects.annotate( char_value=KeyTextTransform(1, KeyTextTransform("bar", "value")), ).filter(char_value__startswith="bar") self.assertSequenceEqual(qs, [self.objs[7]]) def test_expression_wrapper_key_transform(self): self.assertCountEqual( NullableJSONModel.objects.annotate( expr=ExpressionWrapper( KeyTransform("c", "value"), output_field=IntegerField(), ), ).filter(expr__isnull=False), self.objs[3:5], ) def test_has_key(self): self.assertCountEqual( NullableJSONModel.objects.filter(value__has_key="a"), [self.objs[3], self.objs[4]], ) def test_has_key_null_value(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__has_key="j"), [self.objs[4]], ) def test_has_key_deep(self): tests = [ (Q(value__baz__has_key="a"), self.objs[7]), ( Q(value__has_key=KeyTransform("a", KeyTransform("baz", "value"))), self.objs[7], ), (Q(value__has_key=F("value__baz__a")), self.objs[7]), ( Q(value__has_key=KeyTransform("c", KeyTransform("baz", "value"))), self.objs[7], ), (Q(value__has_key=F("value__baz__c")), self.objs[7]), (Q(value__d__1__has_key="f"), self.objs[4]), ( Q( value__has_key=KeyTransform( "f", KeyTransform("1", KeyTransform("d", "value")) ) ), self.objs[4], ), (Q(value__has_key=F("value__d__1__f")), self.objs[4]), ] for condition, expected in tests: with self.subTest(condition=condition): self.assertSequenceEqual( NullableJSONModel.objects.filter(condition), [expected], ) def test_has_key_literal_lookup(self): self.assertSequenceEqual( NullableJSONModel.objects.filter( HasKey(Value({"foo": "bar"}, JSONField()), "foo") ).order_by("id"), self.objs, ) def test_has_key_list(self): obj = NullableJSONModel.objects.create(value=[{"a": 1}, {"b": "x"}]) tests = [ Q(value__1__has_key="b"), Q(value__has_key=KeyTransform("b", KeyTransform(1, "value"))), Q(value__has_key=KeyTransform("b", KeyTransform("1", "value"))), Q(value__has_key=F("value__1__b")), ] for condition in tests: with self.subTest(condition=condition): self.assertSequenceEqual( NullableJSONModel.objects.filter(condition), [obj], ) def test_has_keys(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__has_keys=["a", "c", "h"]), [self.objs[4]], ) def test_has_any_keys(self): self.assertCountEqual( NullableJSONModel.objects.filter(value__has_any_keys=["c", "l"]), [self.objs[3], self.objs[4], self.objs[6]], ) def test_has_key_number(self): obj = NullableJSONModel.objects.create( value={ "123": "value", "nested": {"456": "bar", "lorem": "abc", "999": True}, "array": [{"789": "baz", "777": "def", "ipsum": 200}], "000": "val", } ) tests = [ Q(value__has_key="123"), Q(value__nested__has_key="456"), Q(value__array__0__has_key="789"), Q(value__has_keys=["nested", "123", "array", "000"]), Q(value__nested__has_keys=["lorem", "999", "456"]), Q(value__array__0__has_keys=["789", "ipsum", "777"]), Q(value__has_any_keys=["000", "nonexistent"]), Q(value__nested__has_any_keys=["999", "nonexistent"]), Q(value__array__0__has_any_keys=["777", "nonexistent"]), ] for condition in tests: with self.subTest(condition=condition): self.assertSequenceEqual( NullableJSONModel.objects.filter(condition), [obj], ) @skipUnlessDBFeature("supports_json_field_contains") def test_contains(self): tests = [ ({}, self.objs[2:5] + self.objs[6:8]), ({"baz": {"a": "b", "c": "d"}}, [self.objs[7]]), ({"baz": {"a": "b"}}, [self.objs[7]]), ({"baz": {"c": "d"}}, [self.objs[7]]), ({"k": True, "l": False}, [self.objs[6]]), ({"d": ["e", {"f": "g"}]}, [self.objs[4]]), ({"d": ["e"]}, [self.objs[4]]), ({"d": [{"f": "g"}]}, [self.objs[4]]), ([1, [2]], [self.objs[5]]), ([1], [self.objs[5]]), ([[2]], [self.objs[5]]), ({"n": [None, True, False]}, [self.objs[4]]), ({"j": None}, [self.objs[4]]), ] for value, expected in tests: with self.subTest(value=value): qs = NullableJSONModel.objects.filter(value__contains=value) self.assertCountEqual(qs, expected) @skipIfDBFeature("supports_json_field_contains") def test_contains_unsupported(self): msg = "contains lookup is not supported on this database backend." with self.assertRaisesMessage(NotSupportedError, msg): NullableJSONModel.objects.filter( value__contains={"baz": {"a": "b", "c": "d"}}, ).get() @skipUnlessDBFeature( "supports_primitives_in_json_field", "supports_json_field_contains", ) def test_contains_primitives(self): for value in self.primitives: with self.subTest(value=value): qs = NullableJSONModel.objects.filter(value__contains=value) self.assertIs(qs.exists(), True) @skipUnlessDBFeature("supports_json_field_contains") def test_contained_by(self): qs = NullableJSONModel.objects.filter( value__contained_by={"a": "b", "c": 14, "h": True} ) self.assertCountEqual(qs, self.objs[2:4]) @skipIfDBFeature("supports_json_field_contains") def test_contained_by_unsupported(self): msg = "contained_by lookup is not supported on this database backend." with self.assertRaisesMessage(NotSupportedError, msg): NullableJSONModel.objects.filter(value__contained_by={"a": "b"}).get() def test_deep_values(self): qs = NullableJSONModel.objects.values_list("value__k__l").order_by("pk") expected_objs = [(None,)] * len(self.objs) expected_objs[4] = ("m",) self.assertSequenceEqual(qs, expected_objs) @skipUnlessDBFeature("can_distinct_on_fields") def test_deep_distinct(self): query = NullableJSONModel.objects.distinct("value__k__l").values_list( "value__k__l" ) expected = [("m",), (None,)] if not connection.features.nulls_order_largest: expected.reverse() self.assertSequenceEqual(query, expected) def test_isnull_key(self): # key__isnull=False works the same as has_key='key'. self.assertCountEqual( NullableJSONModel.objects.filter(value__a__isnull=True), self.objs[:3] + self.objs[5:], ) self.assertCountEqual( NullableJSONModel.objects.filter(value__j__isnull=True), self.objs[:4] + self.objs[5:], ) self.assertCountEqual( NullableJSONModel.objects.filter(value__a__isnull=False), [self.objs[3], self.objs[4]], ) self.assertSequenceEqual( NullableJSONModel.objects.filter(value__j__isnull=False), [self.objs[4]], ) def test_isnull_key_or_none(self): obj = NullableJSONModel.objects.create(value={"a": None}) self.assertCountEqual( NullableJSONModel.objects.filter( Q(value__a__isnull=True) | Q(value__a=None) ), self.objs[:3] + self.objs[5:] + [obj], ) def test_none_key(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__j=None), [self.objs[4]], ) def test_none_key_exclude(self): obj = NullableJSONModel.objects.create(value={"j": 1}) if connection.vendor == "oracle": # Oracle supports filtering JSON objects with NULL keys, but the # current implementation doesn't support it. self.assertSequenceEqual( NullableJSONModel.objects.exclude(value__j=None), self.objs[1:4] + self.objs[5:] + [obj], ) else: self.assertSequenceEqual( NullableJSONModel.objects.exclude(value__j=None), [obj] ) def test_shallow_list_lookup(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__0=1), [self.objs[5]], ) def test_shallow_obj_lookup(self): self.assertCountEqual( NullableJSONModel.objects.filter(value__a="b"), [self.objs[3], self.objs[4]], ) def test_obj_subquery_lookup(self): qs = NullableJSONModel.objects.annotate( field=Subquery( NullableJSONModel.objects.filter(pk=OuterRef("pk")).values("value") ), ).filter(field__a="b") self.assertCountEqual(qs, [self.objs[3], self.objs[4]]) def test_deep_lookup_objs(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__k__l="m"), [self.objs[4]], ) def test_shallow_lookup_obj_target(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__k={"l": "m"}), [self.objs[4]], ) def test_deep_lookup_array(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__1__0=2), [self.objs[5]], ) def test_deep_lookup_mixed(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__d__1__f="g"), [self.objs[4]], ) def test_deep_lookup_transform(self): self.assertCountEqual( NullableJSONModel.objects.filter(value__c__gt=2), [self.objs[3], self.objs[4]], ) self.assertCountEqual( NullableJSONModel.objects.filter(value__c__gt=2.33), [self.objs[3], self.objs[4]], ) self.assertIs(NullableJSONModel.objects.filter(value__c__lt=5).exists(), False) def test_lookups_special_chars(self): test_keys = [ "CONTROL", "single'", "dollar$", "dot.dot", "with space", "back\\slash", "question?mark", "user@name", "emo🤡'ji", "com,ma", "curly{{{brace}}}s", "escape\uffff'seq'\uffffue\uffff'nce", ] json_value = {key: "some value" for key in test_keys} obj = NullableJSONModel.objects.create(value=json_value) obj.refresh_from_db() self.assertEqual(obj.value, json_value) for key in test_keys: lookups = { "has_key": Q(value__has_key=key), "has_keys": Q(value__has_keys=[key, "CONTROL"]), "has_any_keys": Q(value__has_any_keys=[key, "does_not_exist"]), "exact": Q(**{f"value__{key}": "some value"}), } for lookup, condition in lookups.items(): results = NullableJSONModel.objects.filter(condition) with self.subTest(key=key, lookup=lookup): self.assertSequenceEqual(results, [obj]) def test_lookups_special_chars_double_quotes(self): test_keys = [ 'double"', "m\\i@x. m🤡'a,t{{{ch}}}e?d$\"'es\uffff'ca\uffff'pe", ] json_value = {key: "some value" for key in test_keys} obj = NullableJSONModel.objects.create(value=json_value) obj.refresh_from_db() self.assertEqual(obj.value, json_value) self.assertSequenceEqual( NullableJSONModel.objects.filter(value__has_keys=test_keys), [obj] ) for key in test_keys: with self.subTest(key=key): results = NullableJSONModel.objects.filter( Q(value__has_key=key), Q(value__has_any_keys=[key, "does_not_exist"]), Q(**{f"value__{key}": "some value"}), ) self.assertSequenceEqual(results, [obj]) def test_lookup_exclude(self): tests = [ (Q(value__a="b"), [self.objs[0]]), (Q(value__foo="bax"), [self.objs[0], self.objs[7]]), ] for condition, expected in tests: self.assertCountEqual( NullableJSONModel.objects.exclude(condition), expected, ) self.assertCountEqual( NullableJSONModel.objects.filter(~condition), expected, ) def test_lookup_exclude_nonexistent_key(self): # Values without the key are ignored. condition = Q(value__foo="bax") objs_with_value = [self.objs[6]] objs_with_different_value = [self.objs[0], self.objs[7]] self.assertCountEqual( NullableJSONModel.objects.exclude(condition), objs_with_different_value, ) self.assertSequenceEqual( NullableJSONModel.objects.exclude(~condition), objs_with_value, ) self.assertCountEqual( NullableJSONModel.objects.filter(condition | ~condition), objs_with_value + objs_with_different_value, ) self.assertCountEqual( NullableJSONModel.objects.exclude(condition & ~condition), objs_with_value + objs_with_different_value, ) # Add the __isnull lookup to get an exhaustive set. self.assertCountEqual( NullableJSONModel.objects.exclude(condition & Q(value__foo__isnull=False)), self.objs[0:6] + self.objs[7:], ) self.assertSequenceEqual( NullableJSONModel.objects.filter(condition & Q(value__foo__isnull=False)), objs_with_value, ) def test_usage_in_subquery(self): self.assertCountEqual( NullableJSONModel.objects.filter( id__in=NullableJSONModel.objects.filter(value__c=14), ), self.objs[3:5], ) @skipUnlessDBFeature("supports_json_field_contains") def test_array_key_contains(self): tests = [ ([], [self.objs[7]]), ("bar", [self.objs[7]]), (["bar"], [self.objs[7]]), ("ar", []), ] for value, expected in tests: with self.subTest(value=value): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__bar__contains=value), expected, ) def test_key_iexact(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__iexact="BaR").exists(), True ) self.assertIs( NullableJSONModel.objects.filter(value__foo__iexact='"BaR"').exists(), False ) def test_key_in(self): tests = [ ("value__c__in", [14], self.objs[3:5]), ("value__c__in", [14, 15], self.objs[3:5]), ("value__0__in", [1], [self.objs[5]]), ("value__0__in", [1, 3], [self.objs[5]]), ("value__foo__in", ["bar"], [self.objs[7]]), ( "value__foo__in", [KeyTransform("foo", KeyTransform("bax", "value"))], [self.objs[7]], ), ("value__foo__in", [F("value__bax__foo")], [self.objs[7]]), ( "value__foo__in", [KeyTransform("foo", KeyTransform("bax", "value")), "baz"], [self.objs[7]], ), ("value__foo__in", [F("value__bax__foo"), "baz"], [self.objs[7]]), ("value__foo__in", ["bar", "baz"], [self.objs[7]]), ("value__bar__in", [["foo", "bar"]], [self.objs[7]]), ("value__bar__in", [["foo", "bar"], ["a"]], [self.objs[7]]), ("value__bax__in", [{"foo": "bar"}, {"a": "b"}], [self.objs[7]]), ("value__h__in", [True, "foo"], [self.objs[4]]), ("value__i__in", [False, "foo"], [self.objs[4]]), ] for lookup, value, expected in tests: with self.subTest(lookup=lookup, value=value): self.assertCountEqual( NullableJSONModel.objects.filter(**{lookup: value}), expected, ) def test_key_values(self): qs = NullableJSONModel.objects.filter(value__h=True) tests = [ ("value__a", "b"), ("value__c", 14), ("value__d", ["e", {"f": "g"}]), ("value__h", True), ("value__i", False), ("value__j", None), ("value__k", {"l": "m"}), ("value__n", [None, True, False]), ("value__p", 4.2), ("value__r", {"s": True, "t": False}), ] for lookup, expected in tests: with self.subTest(lookup=lookup): self.assertEqual(qs.values_list(lookup, flat=True).get(), expected) def test_key_values_boolean(self): qs = NullableJSONModel.objects.filter(value__h=True, value__i=False) tests = [ ("value__h", True), ("value__i", False), ] for lookup, expected in tests: with self.subTest(lookup=lookup): self.assertIs(qs.values_list(lookup, flat=True).get(), expected) @skipUnlessDBFeature("supports_json_field_contains") def test_key_contains(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__contains="ar").exists(), False ) self.assertIs( NullableJSONModel.objects.filter(value__foo__contains="bar").exists(), True ) def test_key_icontains(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__icontains="Ar").exists(), True ) def test_key_startswith(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__startswith="b").exists(), True ) def test_key_istartswith(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__istartswith="B").exists(), True ) def test_key_endswith(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__endswith="r").exists(), True ) def test_key_iendswith(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__iendswith="R").exists(), True ) def test_key_regex(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__regex=r"^bar$").exists(), True ) def test_key_iregex(self): self.assertIs( NullableJSONModel.objects.filter(value__foo__iregex=r"^bAr$").exists(), True ) def test_key_quoted_string(self): self.assertEqual( NullableJSONModel.objects.filter(value__o='"quoted"').get(), self.objs[4], ) @skipUnlessDBFeature("has_json_operators") def test_key_sql_injection(self): with CaptureQueriesContext(connection) as queries: self.assertIs( NullableJSONModel.objects.filter( **{ """value__test' = '"a"') OR 1 = 1 OR ('d""": "x", } ).exists(), False, ) self.assertIn( """."value" -> 'test'' = ''"a"'') OR 1 = 1 OR (''d') = '"x"'""", queries[0]["sql"], ) @skipIfDBFeature("has_json_operators") def test_key_sql_injection_escape(self): query = str( JSONModel.objects.filter( **{ """value__test") = '"a"' OR 1 = 1 OR ("d""": "x", } ).query ) self.assertIn('"test\\"', query) self.assertIn('\\"d', query) def test_key_escape(self): obj = NullableJSONModel.objects.create(value={"%total": 10}) self.assertEqual( NullableJSONModel.objects.filter(**{"value__%total": 10}).get(), obj ) def test_none_key_and_exact_lookup(self): self.assertSequenceEqual( NullableJSONModel.objects.filter(value__a="b", value__j=None), [self.objs[4]], ) def test_lookups_with_key_transform(self): tests = ( ("value__baz__has_key", "c"), ("value__baz__has_keys", ["a", "c"]), ("value__baz__has_any_keys", ["a", "x"]), ("value__has_key", KeyTextTransform("foo", "value")), ) for lookup, value in tests: with self.subTest(lookup=lookup): self.assertIs( NullableJSONModel.objects.filter( **{lookup: value}, ).exists(), True, ) @skipUnlessDBFeature("supports_json_field_contains") def test_contains_contained_by_with_key_transform(self): tests = [ ("value__d__contains", "e"), ("value__d__contains", [{"f": "g"}]), ("value__contains", KeyTransform("bax", "value")), ("value__contains", F("value__bax")), ("value__baz__contains", {"a": "b"}), ("value__baz__contained_by", {"a": "b", "c": "d", "e": "f"}), ( "value__contained_by", KeyTransform( "x", RawSQL( self.raw_sql, ['{"x": {"a": "b", "c": 1, "d": "e"}}'], ), ), ), ] # For databases where {'f': 'g'} (without surrounding []) matches # [{'f': 'g'}]. if not connection.features.json_key_contains_list_matching_requires_list: tests.append(("value__d__contains", {"f": "g"})) for lookup, value in tests: with self.subTest(lookup=lookup, value=value): self.assertIs( NullableJSONModel.objects.filter( **{lookup: value}, ).exists(), True, ) def test_join_key_transform_annotation_expression(self): related_obj = RelatedJSONModel.objects.create( value={"d": ["f", "e"]}, json_model=self.objs[4], ) RelatedJSONModel.objects.create( value={"d": ["e", "f"]}, json_model=self.objs[4], ) self.assertSequenceEqual( RelatedJSONModel.objects.annotate( key=F("value__d"), related_key=F("json_model__value__d"), chain=F("key__1"), expr=Cast("key", models.JSONField()), ).filter(chain=F("related_key__0")), [related_obj], ) def test_key_text_transform_from_lookup(self): qs = NullableJSONModel.objects.annotate(b=KT("value__bax__foo")).filter( b__contains="ar", ) self.assertSequenceEqual(qs, [self.objs[7]]) qs = NullableJSONModel.objects.annotate(c=KT("value__o")).filter( c__contains="uot", ) self.assertSequenceEqual(qs, [self.objs[4]]) def test_key_text_transform_from_lookup_invalid(self): msg = "Lookup must contain key or index transforms." with self.assertRaisesMessage(ValueError, msg): KT("value") with self.assertRaisesMessage(ValueError, msg): KT("") def test_literal_annotation_filtering(self): all_objects = NullableJSONModel.objects.order_by("id") qs = all_objects.annotate(data=Value({"foo": "bar"}, JSONField())).filter( data__foo="bar" ) self.assertQuerySetEqual(qs, all_objects)