1
0
mirror of https://github.com/django/django.git synced 2025-01-04 07:26:38 +00:00
django/tests/postgres_tests/test_hstore.py
Simon Charette 6c3dfba892 Fixed #30769 -- Fixed a crash when filtering against a subquery JSON/HStoreField annotation.
This was a regression introduced by 7deeabc7c7
to address CVE-2019-14234.

Thanks Tim Kleinschmidt for the report and Mariusz for the tests.
2019-09-16 08:24:40 +02:00

399 lines
14 KiB
Python

import json
from django.core import checks, exceptions, serializers
from django.db import connection
from django.db.models.expressions import OuterRef, RawSQL, Subquery
from django.forms import Form
from django.test.utils import CaptureQueriesContext, isolate_apps
from . import PostgreSQLSimpleTestCase, PostgreSQLTestCase
from .models import HStoreModel, PostgreSQLModel
try:
from django.contrib.postgres import forms
from django.contrib.postgres.fields import HStoreField
from django.contrib.postgres.fields.hstore import KeyTransform
from django.contrib.postgres.validators import KeysValidator
except ImportError:
pass
class SimpleTests(PostgreSQLTestCase):
def test_save_load_success(self):
value = {'a': 'b'}
instance = HStoreModel(field=value)
instance.save()
reloaded = HStoreModel.objects.get()
self.assertEqual(reloaded.field, value)
def test_null(self):
instance = HStoreModel(field=None)
instance.save()
reloaded = HStoreModel.objects.get()
self.assertIsNone(reloaded.field)
def test_value_null(self):
value = {'a': None}
instance = HStoreModel(field=value)
instance.save()
reloaded = HStoreModel.objects.get()
self.assertEqual(reloaded.field, value)
def test_key_val_cast_to_string(self):
value = {'a': 1, 'b': 'B', 2: 'c', 'ï': 'ê'}
expected_value = {'a': '1', 'b': 'B', '2': 'c', 'ï': 'ê'}
instance = HStoreModel.objects.create(field=value)
instance = HStoreModel.objects.get()
self.assertEqual(instance.field, expected_value)
instance = HStoreModel.objects.get(field__a=1)
self.assertEqual(instance.field, expected_value)
instance = HStoreModel.objects.get(field__has_keys=[2, 'a', 'ï'])
self.assertEqual(instance.field, expected_value)
def test_array_field(self):
value = [
{'a': 1, 'b': 'B', 2: 'c', 'ï': 'ê'},
{'a': 1, 'b': 'B', 2: 'c', 'ï': 'ê'},
]
expected_value = [
{'a': '1', 'b': 'B', '2': 'c', 'ï': 'ê'},
{'a': '1', 'b': 'B', '2': 'c', 'ï': 'ê'},
]
instance = HStoreModel.objects.create(array_field=value)
instance.refresh_from_db()
self.assertEqual(instance.array_field, expected_value)
class TestQuerying(PostgreSQLTestCase):
@classmethod
def setUpTestData(cls):
cls.objs = HStoreModel.objects.bulk_create([
HStoreModel(field={'a': 'b'}),
HStoreModel(field={'a': 'b', 'c': 'd'}),
HStoreModel(field={'c': 'd'}),
HStoreModel(field={}),
HStoreModel(field=None),
])
def test_exact(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__exact={'a': 'b'}),
self.objs[:1]
)
def test_contained_by(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__contained_by={'a': 'b', 'c': 'd'}),
self.objs[:4]
)
def test_contains(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__contains={'a': 'b'}),
self.objs[:2]
)
def test_in_generator(self):
def search():
yield {'a': 'b'}
self.assertSequenceEqual(
HStoreModel.objects.filter(field__in=search()),
self.objs[:1]
)
def test_has_key(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__has_key='c'),
self.objs[1:3]
)
def test_has_keys(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__has_keys=['a', 'c']),
self.objs[1:2]
)
def test_has_any_keys(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__has_any_keys=['a', 'c']),
self.objs[:3]
)
def test_key_transform(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__a='b'),
self.objs[:2]
)
def test_key_transform_raw_expression(self):
expr = RawSQL('%s::hstore', ['x => b, y => c'])
self.assertSequenceEqual(
HStoreModel.objects.filter(field__a=KeyTransform('x', expr)),
self.objs[:2]
)
def test_keys(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__keys=['a']),
self.objs[:1]
)
def test_values(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__values=['b']),
self.objs[:1]
)
def test_field_chaining(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__a__contains='b'),
self.objs[:2]
)
def test_order_by_field(self):
more_objs = (
HStoreModel.objects.create(field={'g': '637'}),
HStoreModel.objects.create(field={'g': '002'}),
HStoreModel.objects.create(field={'g': '042'}),
HStoreModel.objects.create(field={'g': '981'}),
)
self.assertSequenceEqual(
HStoreModel.objects.filter(field__has_key='g').order_by('field__g'),
[more_objs[1], more_objs[2], more_objs[0], more_objs[3]]
)
def test_keys_contains(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__keys__contains=['a']),
self.objs[:2]
)
def test_values_overlap(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(field__values__overlap=['b', 'd']),
self.objs[:3]
)
def test_key_isnull(self):
obj = HStoreModel.objects.create(field={'a': None})
self.assertSequenceEqual(
HStoreModel.objects.filter(field__a__isnull=True),
self.objs[2:5] + [obj]
)
self.assertSequenceEqual(
HStoreModel.objects.filter(field__a__isnull=False),
self.objs[:2]
)
def test_usage_in_subquery(self):
self.assertSequenceEqual(
HStoreModel.objects.filter(id__in=HStoreModel.objects.filter(field__a='b')),
self.objs[:2]
)
def test_key_sql_injection(self):
with CaptureQueriesContext(connection) as queries:
self.assertFalse(
HStoreModel.objects.filter(**{
"field__test' = 'a') OR 1 = 1 OR ('d": 'x',
}).exists()
)
self.assertIn(
"""."field" -> 'test'' = ''a'') OR 1 = 1 OR (''d') = 'x' """,
queries[0]['sql'],
)
def test_obj_subquery_lookup(self):
qs = HStoreModel.objects.annotate(
value=Subquery(HStoreModel.objects.filter(pk=OuterRef('pk')).values('field')),
).filter(value__a='b')
self.assertSequenceEqual(qs, self.objs[:2])
@isolate_apps('postgres_tests')
class TestChecks(PostgreSQLSimpleTestCase):
def test_invalid_default(self):
class MyModel(PostgreSQLModel):
field = HStoreField(default={})
model = MyModel()
self.assertEqual(model.check(), [
checks.Warning(
msg=(
"HStoreField default should be a callable instead of an "
"instance so that it's not shared between all field "
"instances."
),
hint='Use a callable instead, e.g., use `dict` instead of `{}`.',
obj=MyModel._meta.get_field('field'),
id='postgres.E003',
)
])
def test_valid_default(self):
class MyModel(PostgreSQLModel):
field = HStoreField(default=dict)
self.assertEqual(MyModel().check(), [])
class TestSerialization(PostgreSQLSimpleTestCase):
test_data = json.dumps([{
'model': 'postgres_tests.hstoremodel',
'pk': None,
'fields': {
'field': json.dumps({'a': 'b'}),
'array_field': json.dumps([
json.dumps({'a': 'b'}),
json.dumps({'b': 'a'}),
]),
},
}])
def test_dumping(self):
instance = HStoreModel(field={'a': 'b'}, array_field=[{'a': 'b'}, {'b': 'a'}])
data = serializers.serialize('json', [instance])
self.assertEqual(json.loads(data), json.loads(self.test_data))
def test_loading(self):
instance = list(serializers.deserialize('json', self.test_data))[0].object
self.assertEqual(instance.field, {'a': 'b'})
self.assertEqual(instance.array_field, [{'a': 'b'}, {'b': 'a'}])
def test_roundtrip_with_null(self):
instance = HStoreModel(field={'a': 'b', 'c': None})
data = serializers.serialize('json', [instance])
new_instance = list(serializers.deserialize('json', data))[0].object
self.assertEqual(instance.field, new_instance.field)
class TestValidation(PostgreSQLSimpleTestCase):
def test_not_a_string(self):
field = HStoreField()
with self.assertRaises(exceptions.ValidationError) as cm:
field.clean({'a': 1}, None)
self.assertEqual(cm.exception.code, 'not_a_string')
self.assertEqual(cm.exception.message % cm.exception.params, 'The value of “a” is not a string or null.')
def test_none_allowed_as_value(self):
field = HStoreField()
self.assertEqual(field.clean({'a': None}, None), {'a': None})
class TestFormField(PostgreSQLSimpleTestCase):
def test_valid(self):
field = forms.HStoreField()
value = field.clean('{"a": "b"}')
self.assertEqual(value, {'a': 'b'})
def test_invalid_json(self):
field = forms.HStoreField()
with self.assertRaises(exceptions.ValidationError) as cm:
field.clean('{"a": "b"')
self.assertEqual(cm.exception.messages[0], 'Could not load JSON data.')
self.assertEqual(cm.exception.code, 'invalid_json')
def test_non_dict_json(self):
field = forms.HStoreField()
msg = 'Input must be a JSON dictionary.'
with self.assertRaisesMessage(exceptions.ValidationError, msg) as cm:
field.clean('["a", "b", 1]')
self.assertEqual(cm.exception.code, 'invalid_format')
def test_not_string_values(self):
field = forms.HStoreField()
value = field.clean('{"a": 1}')
self.assertEqual(value, {'a': '1'})
def test_none_value(self):
field = forms.HStoreField()
value = field.clean('{"a": null}')
self.assertEqual(value, {'a': None})
def test_empty(self):
field = forms.HStoreField(required=False)
value = field.clean('')
self.assertEqual(value, {})
def test_model_field_formfield(self):
model_field = HStoreField()
form_field = model_field.formfield()
self.assertIsInstance(form_field, forms.HStoreField)
def test_field_has_changed(self):
class HStoreFormTest(Form):
f1 = forms.HStoreField()
form_w_hstore = HStoreFormTest()
self.assertFalse(form_w_hstore.has_changed())
form_w_hstore = HStoreFormTest({'f1': '{"a": 1}'})
self.assertTrue(form_w_hstore.has_changed())
form_w_hstore = HStoreFormTest({'f1': '{"a": 1}'}, initial={'f1': '{"a": 1}'})
self.assertFalse(form_w_hstore.has_changed())
form_w_hstore = HStoreFormTest({'f1': '{"a": 2}'}, initial={'f1': '{"a": 1}'})
self.assertTrue(form_w_hstore.has_changed())
form_w_hstore = HStoreFormTest({'f1': '{"a": 1}'}, initial={'f1': {"a": 1}})
self.assertFalse(form_w_hstore.has_changed())
form_w_hstore = HStoreFormTest({'f1': '{"a": 2}'}, initial={'f1': {"a": 1}})
self.assertTrue(form_w_hstore.has_changed())
class TestValidator(PostgreSQLSimpleTestCase):
def test_simple_valid(self):
validator = KeysValidator(keys=['a', 'b'])
validator({'a': 'foo', 'b': 'bar', 'c': 'baz'})
def test_missing_keys(self):
validator = KeysValidator(keys=['a', 'b'])
with self.assertRaises(exceptions.ValidationError) as cm:
validator({'a': 'foo', 'c': 'baz'})
self.assertEqual(cm.exception.messages[0], 'Some keys were missing: b')
self.assertEqual(cm.exception.code, 'missing_keys')
def test_strict_valid(self):
validator = KeysValidator(keys=['a', 'b'], strict=True)
validator({'a': 'foo', 'b': 'bar'})
def test_extra_keys(self):
validator = KeysValidator(keys=['a', 'b'], strict=True)
with self.assertRaises(exceptions.ValidationError) as cm:
validator({'a': 'foo', 'b': 'bar', 'c': 'baz'})
self.assertEqual(cm.exception.messages[0], 'Some unknown keys were provided: c')
self.assertEqual(cm.exception.code, 'extra_keys')
def test_custom_messages(self):
messages = {
'missing_keys': 'Foobar',
}
validator = KeysValidator(keys=['a', 'b'], strict=True, messages=messages)
with self.assertRaises(exceptions.ValidationError) as cm:
validator({'a': 'foo', 'c': 'baz'})
self.assertEqual(cm.exception.messages[0], 'Foobar')
self.assertEqual(cm.exception.code, 'missing_keys')
with self.assertRaises(exceptions.ValidationError) as cm:
validator({'a': 'foo', 'b': 'bar', 'c': 'baz'})
self.assertEqual(cm.exception.messages[0], 'Some unknown keys were provided: c')
self.assertEqual(cm.exception.code, 'extra_keys')
def test_deconstruct(self):
messages = {
'missing_keys': 'Foobar',
}
validator = KeysValidator(keys=['a', 'b'], strict=True, messages=messages)
path, args, kwargs = validator.deconstruct()
self.assertEqual(path, 'django.contrib.postgres.validators.KeysValidator')
self.assertEqual(args, ())
self.assertEqual(kwargs, {'keys': ['a', 'b'], 'strict': True, 'messages': messages})