mirror of
https://github.com/django/django.git
synced 2025-02-10 17:35:38 +00:00
7b21bfc074
prevent test order dependant failures. This involves introducing usage of `TestCase.urls` and implementing proper admin.py modules for some of the test apps. Thanks Florian Apolloner for finding the issue and contributing the patch. Refs #15294 (it solves these problems so the fix for that ticket we are going to commit doesn't introduce obscure and hard to reproduce test failures when running the Django test suite.) git-svn-id: http://code.djangoproject.com/svn/django/trunk@16856 bcc190cf-cafb-0310-a4f2-bffc1f526a37
1610 lines
70 KiB
Python
1610 lines
70 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Unit tests for cache framework
|
|
# Uses whatever cache backend is set in the test settings file.
|
|
from __future__ import with_statement
|
|
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import time
|
|
import warnings
|
|
|
|
from django.conf import settings
|
|
from django.core import management
|
|
from django.core.cache import get_cache, DEFAULT_CACHE_ALIAS
|
|
from django.core.cache.backends.base import (CacheKeyWarning,
|
|
InvalidCacheBackendError)
|
|
from django.http import HttpResponse, HttpRequest, QueryDict
|
|
from django.middleware.cache import (FetchFromCacheMiddleware,
|
|
UpdateCacheMiddleware, CacheMiddleware)
|
|
from django.template import Template
|
|
from django.template.response import TemplateResponse
|
|
from django.test import TestCase, RequestFactory
|
|
from django.test.utils import (get_warnings_state, restore_warnings_state,
|
|
override_settings)
|
|
from django.utils import translation, unittest
|
|
from django.utils.cache import (patch_vary_headers, get_cache_key,
|
|
learn_cache_key, patch_cache_control, patch_response_headers)
|
|
from django.views.decorators.cache import cache_page
|
|
|
|
from regressiontests.cache.models import Poll, expensive_calculation
|
|
|
|
# functions/classes for complex data type tests
|
|
def f():
|
|
return 42
|
|
class C:
|
|
def m(n):
|
|
return 24
|
|
|
|
class DummyCacheTests(unittest.TestCase):
|
|
# The Dummy cache backend doesn't really behave like a test backend,
|
|
# so it has different test requirements.
|
|
backend_name = 'django.core.cache.backends.dummy.DummyCache'
|
|
|
|
def setUp(self):
|
|
self.cache = get_cache(self.backend_name)
|
|
|
|
def test_simple(self):
|
|
"Dummy cache backend ignores cache set calls"
|
|
self.cache.set("key", "value")
|
|
self.assertEqual(self.cache.get("key"), None)
|
|
|
|
def test_add(self):
|
|
"Add doesn't do anything in dummy cache backend"
|
|
self.cache.add("addkey1", "value")
|
|
result = self.cache.add("addkey1", "newvalue")
|
|
self.assertEqual(result, True)
|
|
self.assertEqual(self.cache.get("addkey1"), None)
|
|
|
|
def test_non_existent(self):
|
|
"Non-existent keys aren't found in the dummy cache backend"
|
|
self.assertEqual(self.cache.get("does_not_exist"), None)
|
|
self.assertEqual(self.cache.get("does_not_exist", "bang!"), "bang!")
|
|
|
|
def test_get_many(self):
|
|
"get_many returns nothing for the dummy cache backend"
|
|
self.cache.set('a', 'a')
|
|
self.cache.set('b', 'b')
|
|
self.cache.set('c', 'c')
|
|
self.cache.set('d', 'd')
|
|
self.assertEqual(self.cache.get_many(['a', 'c', 'd']), {})
|
|
self.assertEqual(self.cache.get_many(['a', 'b', 'e']), {})
|
|
|
|
def test_delete(self):
|
|
"Cache deletion is transparently ignored on the dummy cache backend"
|
|
self.cache.set("key1", "spam")
|
|
self.cache.set("key2", "eggs")
|
|
self.assertEqual(self.cache.get("key1"), None)
|
|
self.cache.delete("key1")
|
|
self.assertEqual(self.cache.get("key1"), None)
|
|
self.assertEqual(self.cache.get("key2"), None)
|
|
|
|
def test_has_key(self):
|
|
"The has_key method doesn't ever return True for the dummy cache backend"
|
|
self.cache.set("hello1", "goodbye1")
|
|
self.assertEqual(self.cache.has_key("hello1"), False)
|
|
self.assertEqual(self.cache.has_key("goodbye1"), False)
|
|
|
|
def test_in(self):
|
|
"The in operator doesn't ever return True for the dummy cache backend"
|
|
self.cache.set("hello2", "goodbye2")
|
|
self.assertEqual("hello2" in self.cache, False)
|
|
self.assertEqual("goodbye2" in self.cache, False)
|
|
|
|
def test_incr(self):
|
|
"Dummy cache values can't be incremented"
|
|
self.cache.set('answer', 42)
|
|
self.assertRaises(ValueError, self.cache.incr, 'answer')
|
|
self.assertRaises(ValueError, self.cache.incr, 'does_not_exist')
|
|
|
|
def test_decr(self):
|
|
"Dummy cache values can't be decremented"
|
|
self.cache.set('answer', 42)
|
|
self.assertRaises(ValueError, self.cache.decr, 'answer')
|
|
self.assertRaises(ValueError, self.cache.decr, 'does_not_exist')
|
|
|
|
def test_data_types(self):
|
|
"All data types are ignored equally by the dummy cache"
|
|
stuff = {
|
|
'string' : 'this is a string',
|
|
'int' : 42,
|
|
'list' : [1, 2, 3, 4],
|
|
'tuple' : (1, 2, 3, 4),
|
|
'dict' : {'A': 1, 'B' : 2},
|
|
'function' : f,
|
|
'class' : C,
|
|
}
|
|
self.cache.set("stuff", stuff)
|
|
self.assertEqual(self.cache.get("stuff"), None)
|
|
|
|
def test_expiration(self):
|
|
"Expiration has no effect on the dummy cache"
|
|
self.cache.set('expire1', 'very quickly', 1)
|
|
self.cache.set('expire2', 'very quickly', 1)
|
|
self.cache.set('expire3', 'very quickly', 1)
|
|
|
|
time.sleep(2)
|
|
self.assertEqual(self.cache.get("expire1"), None)
|
|
|
|
self.cache.add("expire2", "newvalue")
|
|
self.assertEqual(self.cache.get("expire2"), None)
|
|
self.assertEqual(self.cache.has_key("expire3"), False)
|
|
|
|
def test_unicode(self):
|
|
"Unicode values are ignored by the dummy cache"
|
|
stuff = {
|
|
u'ascii': u'ascii_value',
|
|
u'unicode_ascii': u'Iñtërnâtiônàlizætiøn1',
|
|
u'Iñtërnâtiônàlizætiøn': u'Iñtërnâtiônàlizætiøn2',
|
|
u'ascii2': {u'x' : 1 }
|
|
}
|
|
for (key, value) in stuff.items():
|
|
self.cache.set(key, value)
|
|
self.assertEqual(self.cache.get(key), None)
|
|
|
|
def test_set_many(self):
|
|
"set_many does nothing for the dummy cache backend"
|
|
self.cache.set_many({'a': 1, 'b': 2})
|
|
self.cache.set_many({'a': 1, 'b': 2}, timeout=2, version='1')
|
|
|
|
def test_delete_many(self):
|
|
"delete_many does nothing for the dummy cache backend"
|
|
self.cache.delete_many(['a', 'b'])
|
|
|
|
def test_clear(self):
|
|
"clear does nothing for the dummy cache backend"
|
|
self.cache.clear()
|
|
|
|
def test_incr_version(self):
|
|
"Dummy cache versions can't be incremented"
|
|
self.cache.set('answer', 42)
|
|
self.assertRaises(ValueError, self.cache.incr_version, 'answer')
|
|
self.assertRaises(ValueError, self.cache.incr_version, 'does_not_exist')
|
|
|
|
def test_decr_version(self):
|
|
"Dummy cache versions can't be decremented"
|
|
self.cache.set('answer', 42)
|
|
self.assertRaises(ValueError, self.cache.decr_version, 'answer')
|
|
self.assertRaises(ValueError, self.cache.decr_version, 'does_not_exist')
|
|
|
|
|
|
class BaseCacheTests(object):
|
|
# A common set of tests to apply to all cache backends
|
|
|
|
def test_simple(self):
|
|
# Simple cache set/get works
|
|
self.cache.set("key", "value")
|
|
self.assertEqual(self.cache.get("key"), "value")
|
|
|
|
def test_add(self):
|
|
# A key can be added to a cache
|
|
self.cache.add("addkey1", "value")
|
|
result = self.cache.add("addkey1", "newvalue")
|
|
self.assertEqual(result, False)
|
|
self.assertEqual(self.cache.get("addkey1"), "value")
|
|
|
|
def test_prefix(self):
|
|
# Test for same cache key conflicts between shared backend
|
|
self.cache.set('somekey', 'value')
|
|
|
|
# should not be set in the prefixed cache
|
|
self.assertFalse(self.prefix_cache.has_key('somekey'))
|
|
|
|
self.prefix_cache.set('somekey', 'value2')
|
|
|
|
self.assertEqual(self.cache.get('somekey'), 'value')
|
|
self.assertEqual(self.prefix_cache.get('somekey'), 'value2')
|
|
|
|
def test_non_existent(self):
|
|
# Non-existent cache keys return as None/default
|
|
# get with non-existent keys
|
|
self.assertEqual(self.cache.get("does_not_exist"), None)
|
|
self.assertEqual(self.cache.get("does_not_exist", "bang!"), "bang!")
|
|
|
|
def test_get_many(self):
|
|
# Multiple cache keys can be returned using get_many
|
|
self.cache.set('a', 'a')
|
|
self.cache.set('b', 'b')
|
|
self.cache.set('c', 'c')
|
|
self.cache.set('d', 'd')
|
|
self.assertEqual(self.cache.get_many(['a', 'c', 'd']), {'a' : 'a', 'c' : 'c', 'd' : 'd'})
|
|
self.assertEqual(self.cache.get_many(['a', 'b', 'e']), {'a' : 'a', 'b' : 'b'})
|
|
|
|
def test_delete(self):
|
|
# Cache keys can be deleted
|
|
self.cache.set("key1", "spam")
|
|
self.cache.set("key2", "eggs")
|
|
self.assertEqual(self.cache.get("key1"), "spam")
|
|
self.cache.delete("key1")
|
|
self.assertEqual(self.cache.get("key1"), None)
|
|
self.assertEqual(self.cache.get("key2"), "eggs")
|
|
|
|
def test_has_key(self):
|
|
# The cache can be inspected for cache keys
|
|
self.cache.set("hello1", "goodbye1")
|
|
self.assertEqual(self.cache.has_key("hello1"), True)
|
|
self.assertEqual(self.cache.has_key("goodbye1"), False)
|
|
|
|
def test_in(self):
|
|
# The in operator can be used to inspect cache contents
|
|
self.cache.set("hello2", "goodbye2")
|
|
self.assertEqual("hello2" in self.cache, True)
|
|
self.assertEqual("goodbye2" in self.cache, False)
|
|
|
|
def test_incr(self):
|
|
# Cache values can be incremented
|
|
self.cache.set('answer', 41)
|
|
self.assertEqual(self.cache.incr('answer'), 42)
|
|
self.assertEqual(self.cache.get('answer'), 42)
|
|
self.assertEqual(self.cache.incr('answer', 10), 52)
|
|
self.assertEqual(self.cache.get('answer'), 52)
|
|
self.assertRaises(ValueError, self.cache.incr, 'does_not_exist')
|
|
|
|
def test_decr(self):
|
|
# Cache values can be decremented
|
|
self.cache.set('answer', 43)
|
|
self.assertEqual(self.cache.decr('answer'), 42)
|
|
self.assertEqual(self.cache.get('answer'), 42)
|
|
self.assertEqual(self.cache.decr('answer', 10), 32)
|
|
self.assertEqual(self.cache.get('answer'), 32)
|
|
self.assertRaises(ValueError, self.cache.decr, 'does_not_exist')
|
|
|
|
def test_data_types(self):
|
|
# Many different data types can be cached
|
|
stuff = {
|
|
'string' : 'this is a string',
|
|
'int' : 42,
|
|
'list' : [1, 2, 3, 4],
|
|
'tuple' : (1, 2, 3, 4),
|
|
'dict' : {'A': 1, 'B' : 2},
|
|
'function' : f,
|
|
'class' : C,
|
|
}
|
|
self.cache.set("stuff", stuff)
|
|
self.assertEqual(self.cache.get("stuff"), stuff)
|
|
|
|
def test_cache_read_for_model_instance(self):
|
|
# Don't want fields with callable as default to be called on cache read
|
|
expensive_calculation.num_runs = 0
|
|
Poll.objects.all().delete()
|
|
my_poll = Poll.objects.create(question="Well?")
|
|
self.assertEqual(Poll.objects.count(), 1)
|
|
pub_date = my_poll.pub_date
|
|
self.cache.set('question', my_poll)
|
|
cached_poll = self.cache.get('question')
|
|
self.assertEqual(cached_poll.pub_date, pub_date)
|
|
# We only want the default expensive calculation run once
|
|
self.assertEqual(expensive_calculation.num_runs, 1)
|
|
|
|
def test_cache_write_for_model_instance_with_deferred(self):
|
|
# Don't want fields with callable as default to be called on cache write
|
|
expensive_calculation.num_runs = 0
|
|
Poll.objects.all().delete()
|
|
my_poll = Poll.objects.create(question="What?")
|
|
self.assertEqual(expensive_calculation.num_runs, 1)
|
|
defer_qs = Poll.objects.all().defer('question')
|
|
self.assertEqual(defer_qs.count(), 1)
|
|
self.assertEqual(expensive_calculation.num_runs, 1)
|
|
self.cache.set('deferred_queryset', defer_qs)
|
|
# cache set should not re-evaluate default functions
|
|
self.assertEqual(expensive_calculation.num_runs, 1)
|
|
|
|
def test_cache_read_for_model_instance_with_deferred(self):
|
|
# Don't want fields with callable as default to be called on cache read
|
|
expensive_calculation.num_runs = 0
|
|
Poll.objects.all().delete()
|
|
my_poll = Poll.objects.create(question="What?")
|
|
self.assertEqual(expensive_calculation.num_runs, 1)
|
|
defer_qs = Poll.objects.all().defer('question')
|
|
self.assertEqual(defer_qs.count(), 1)
|
|
self.cache.set('deferred_queryset', defer_qs)
|
|
self.assertEqual(expensive_calculation.num_runs, 1)
|
|
runs_before_cache_read = expensive_calculation.num_runs
|
|
cached_polls = self.cache.get('deferred_queryset')
|
|
# We only want the default expensive calculation run on creation and set
|
|
self.assertEqual(expensive_calculation.num_runs, runs_before_cache_read)
|
|
|
|
def test_expiration(self):
|
|
# Cache values can be set to expire
|
|
self.cache.set('expire1', 'very quickly', 1)
|
|
self.cache.set('expire2', 'very quickly', 1)
|
|
self.cache.set('expire3', 'very quickly', 1)
|
|
|
|
time.sleep(2)
|
|
self.assertEqual(self.cache.get("expire1"), None)
|
|
|
|
self.cache.add("expire2", "newvalue")
|
|
self.assertEqual(self.cache.get("expire2"), "newvalue")
|
|
self.assertEqual(self.cache.has_key("expire3"), False)
|
|
|
|
def test_unicode(self):
|
|
# Unicode values can be cached
|
|
stuff = {
|
|
u'ascii': u'ascii_value',
|
|
u'unicode_ascii': u'Iñtërnâtiônàlizætiøn1',
|
|
u'Iñtërnâtiônàlizætiøn': u'Iñtërnâtiônàlizætiøn2',
|
|
u'ascii2': {u'x' : 1 }
|
|
}
|
|
# Test `set`
|
|
for (key, value) in stuff.items():
|
|
self.cache.set(key, value)
|
|
self.assertEqual(self.cache.get(key), value)
|
|
|
|
# Test `add`
|
|
for (key, value) in stuff.items():
|
|
self.cache.delete(key)
|
|
self.cache.add(key, value)
|
|
self.assertEqual(self.cache.get(key), value)
|
|
|
|
# Test `set_many`
|
|
for (key, value) in stuff.items():
|
|
self.cache.delete(key)
|
|
self.cache.set_many(stuff)
|
|
for (key, value) in stuff.items():
|
|
self.assertEqual(self.cache.get(key), value)
|
|
|
|
def test_binary_string(self):
|
|
# Binary strings should be cacheable
|
|
from zlib import compress, decompress
|
|
value = 'value_to_be_compressed'
|
|
compressed_value = compress(value)
|
|
|
|
# Test set
|
|
self.cache.set('binary1', compressed_value)
|
|
compressed_result = self.cache.get('binary1')
|
|
self.assertEqual(compressed_value, compressed_result)
|
|
self.assertEqual(value, decompress(compressed_result))
|
|
|
|
# Test add
|
|
self.cache.add('binary1-add', compressed_value)
|
|
compressed_result = self.cache.get('binary1-add')
|
|
self.assertEqual(compressed_value, compressed_result)
|
|
self.assertEqual(value, decompress(compressed_result))
|
|
|
|
# Test set_many
|
|
self.cache.set_many({'binary1-set_many': compressed_value})
|
|
compressed_result = self.cache.get('binary1-set_many')
|
|
self.assertEqual(compressed_value, compressed_result)
|
|
self.assertEqual(value, decompress(compressed_result))
|
|
|
|
def test_set_many(self):
|
|
# Multiple keys can be set using set_many
|
|
self.cache.set_many({"key1": "spam", "key2": "eggs"})
|
|
self.assertEqual(self.cache.get("key1"), "spam")
|
|
self.assertEqual(self.cache.get("key2"), "eggs")
|
|
|
|
def test_set_many_expiration(self):
|
|
# set_many takes a second ``timeout`` parameter
|
|
self.cache.set_many({"key1": "spam", "key2": "eggs"}, 1)
|
|
time.sleep(2)
|
|
self.assertEqual(self.cache.get("key1"), None)
|
|
self.assertEqual(self.cache.get("key2"), None)
|
|
|
|
def test_delete_many(self):
|
|
# Multiple keys can be deleted using delete_many
|
|
self.cache.set("key1", "spam")
|
|
self.cache.set("key2", "eggs")
|
|
self.cache.set("key3", "ham")
|
|
self.cache.delete_many(["key1", "key2"])
|
|
self.assertEqual(self.cache.get("key1"), None)
|
|
self.assertEqual(self.cache.get("key2"), None)
|
|
self.assertEqual(self.cache.get("key3"), "ham")
|
|
|
|
def test_clear(self):
|
|
# The cache can be emptied using clear
|
|
self.cache.set("key1", "spam")
|
|
self.cache.set("key2", "eggs")
|
|
self.cache.clear()
|
|
self.assertEqual(self.cache.get("key1"), None)
|
|
self.assertEqual(self.cache.get("key2"), None)
|
|
|
|
def test_long_timeout(self):
|
|
'''
|
|
Using a timeout greater than 30 days makes memcached think
|
|
it is an absolute expiration timestamp instead of a relative
|
|
offset. Test that we honour this convention. Refs #12399.
|
|
'''
|
|
self.cache.set('key1', 'eggs', 60*60*24*30 + 1) #30 days + 1 second
|
|
self.assertEqual(self.cache.get('key1'), 'eggs')
|
|
|
|
self.cache.add('key2', 'ham', 60*60*24*30 + 1)
|
|
self.assertEqual(self.cache.get('key2'), 'ham')
|
|
|
|
self.cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, 60*60*24*30 + 1)
|
|
self.assertEqual(self.cache.get('key3'), 'sausage')
|
|
self.assertEqual(self.cache.get('key4'), 'lobster bisque')
|
|
|
|
def test_float_timeout(self):
|
|
# Make sure a timeout given as a float doesn't crash anything.
|
|
self.cache.set("key1", "spam", 100.2)
|
|
self.assertEqual(self.cache.get("key1"), "spam")
|
|
|
|
def perform_cull_test(self, initial_count, final_count):
|
|
"""This is implemented as a utility method, because only some of the backends
|
|
implement culling. The culling algorithm also varies slightly, so the final
|
|
number of entries will vary between backends"""
|
|
# Create initial cache key entries. This will overflow the cache, causing a cull
|
|
for i in range(1, initial_count):
|
|
self.cache.set('cull%d' % i, 'value', 1000)
|
|
count = 0
|
|
# Count how many keys are left in the cache.
|
|
for i in range(1, initial_count):
|
|
if self.cache.has_key('cull%d' % i):
|
|
count = count + 1
|
|
self.assertEqual(count, final_count)
|
|
|
|
def test_invalid_keys(self):
|
|
"""
|
|
All the builtin backends (except memcached, see below) should warn on
|
|
keys that would be refused by memcached. This encourages portable
|
|
caching code without making it too difficult to use production backends
|
|
with more liberal key rules. Refs #6447.
|
|
|
|
"""
|
|
# mimic custom ``make_key`` method being defined since the default will
|
|
# never show the below warnings
|
|
def func(key, *args):
|
|
return key
|
|
|
|
old_func = self.cache.key_func
|
|
self.cache.key_func = func
|
|
# On Python 2.6+ we could use the catch_warnings context
|
|
# manager to test this warning nicely. Since we can't do that
|
|
# yet, the cleanest option is to temporarily ask for
|
|
# CacheKeyWarning to be raised as an exception.
|
|
_warnings_state = get_warnings_state()
|
|
warnings.simplefilter("error", CacheKeyWarning)
|
|
|
|
try:
|
|
# memcached does not allow whitespace or control characters in keys
|
|
self.assertRaises(CacheKeyWarning, self.cache.set, 'key with spaces', 'value')
|
|
# memcached limits key length to 250
|
|
self.assertRaises(CacheKeyWarning, self.cache.set, 'a' * 251, 'value')
|
|
finally:
|
|
restore_warnings_state(_warnings_state)
|
|
self.cache.key_func = old_func
|
|
|
|
def test_cache_versioning_get_set(self):
|
|
# set, using default version = 1
|
|
self.cache.set('answer1', 42)
|
|
self.assertEqual(self.cache.get('answer1'), 42)
|
|
self.assertEqual(self.cache.get('answer1', version=1), 42)
|
|
self.assertEqual(self.cache.get('answer1', version=2), None)
|
|
|
|
self.assertEqual(self.v2_cache.get('answer1'), None)
|
|
self.assertEqual(self.v2_cache.get('answer1', version=1), 42)
|
|
self.assertEqual(self.v2_cache.get('answer1', version=2), None)
|
|
|
|
# set, default version = 1, but manually override version = 2
|
|
self.cache.set('answer2', 42, version=2)
|
|
self.assertEqual(self.cache.get('answer2'), None)
|
|
self.assertEqual(self.cache.get('answer2', version=1), None)
|
|
self.assertEqual(self.cache.get('answer2', version=2), 42)
|
|
|
|
self.assertEqual(self.v2_cache.get('answer2'), 42)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=1), None)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=2), 42)
|
|
|
|
# v2 set, using default version = 2
|
|
self.v2_cache.set('answer3', 42)
|
|
self.assertEqual(self.cache.get('answer3'), None)
|
|
self.assertEqual(self.cache.get('answer3', version=1), None)
|
|
self.assertEqual(self.cache.get('answer3', version=2), 42)
|
|
|
|
self.assertEqual(self.v2_cache.get('answer3'), 42)
|
|
self.assertEqual(self.v2_cache.get('answer3', version=1), None)
|
|
self.assertEqual(self.v2_cache.get('answer3', version=2), 42)
|
|
|
|
# v2 set, default version = 2, but manually override version = 1
|
|
self.v2_cache.set('answer4', 42, version=1)
|
|
self.assertEqual(self.cache.get('answer4'), 42)
|
|
self.assertEqual(self.cache.get('answer4', version=1), 42)
|
|
self.assertEqual(self.cache.get('answer4', version=2), None)
|
|
|
|
self.assertEqual(self.v2_cache.get('answer4'), None)
|
|
self.assertEqual(self.v2_cache.get('answer4', version=1), 42)
|
|
self.assertEqual(self.v2_cache.get('answer4', version=2), None)
|
|
|
|
def test_cache_versioning_add(self):
|
|
|
|
# add, default version = 1, but manually override version = 2
|
|
self.cache.add('answer1', 42, version=2)
|
|
self.assertEqual(self.cache.get('answer1', version=1), None)
|
|
self.assertEqual(self.cache.get('answer1', version=2), 42)
|
|
|
|
self.cache.add('answer1', 37, version=2)
|
|
self.assertEqual(self.cache.get('answer1', version=1), None)
|
|
self.assertEqual(self.cache.get('answer1', version=2), 42)
|
|
|
|
self.cache.add('answer1', 37, version=1)
|
|
self.assertEqual(self.cache.get('answer1', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer1', version=2), 42)
|
|
|
|
# v2 add, using default version = 2
|
|
self.v2_cache.add('answer2', 42)
|
|
self.assertEqual(self.cache.get('answer2', version=1), None)
|
|
self.assertEqual(self.cache.get('answer2', version=2), 42)
|
|
|
|
self.v2_cache.add('answer2', 37)
|
|
self.assertEqual(self.cache.get('answer2', version=1), None)
|
|
self.assertEqual(self.cache.get('answer2', version=2), 42)
|
|
|
|
self.v2_cache.add('answer2', 37, version=1)
|
|
self.assertEqual(self.cache.get('answer2', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer2', version=2), 42)
|
|
|
|
# v2 add, default version = 2, but manually override version = 1
|
|
self.v2_cache.add('answer3', 42, version=1)
|
|
self.assertEqual(self.cache.get('answer3', version=1), 42)
|
|
self.assertEqual(self.cache.get('answer3', version=2), None)
|
|
|
|
self.v2_cache.add('answer3', 37, version=1)
|
|
self.assertEqual(self.cache.get('answer3', version=1), 42)
|
|
self.assertEqual(self.cache.get('answer3', version=2), None)
|
|
|
|
self.v2_cache.add('answer3', 37)
|
|
self.assertEqual(self.cache.get('answer3', version=1), 42)
|
|
self.assertEqual(self.cache.get('answer3', version=2), 37)
|
|
|
|
def test_cache_versioning_has_key(self):
|
|
self.cache.set('answer1', 42)
|
|
|
|
# has_key
|
|
self.assertTrue(self.cache.has_key('answer1'))
|
|
self.assertTrue(self.cache.has_key('answer1', version=1))
|
|
self.assertFalse(self.cache.has_key('answer1', version=2))
|
|
|
|
self.assertFalse(self.v2_cache.has_key('answer1'))
|
|
self.assertTrue(self.v2_cache.has_key('answer1', version=1))
|
|
self.assertFalse(self.v2_cache.has_key('answer1', version=2))
|
|
|
|
def test_cache_versioning_delete(self):
|
|
self.cache.set('answer1', 37, version=1)
|
|
self.cache.set('answer1', 42, version=2)
|
|
self.cache.delete('answer1')
|
|
self.assertEqual(self.cache.get('answer1', version=1), None)
|
|
self.assertEqual(self.cache.get('answer1', version=2), 42)
|
|
|
|
self.cache.set('answer2', 37, version=1)
|
|
self.cache.set('answer2', 42, version=2)
|
|
self.cache.delete('answer2', version=2)
|
|
self.assertEqual(self.cache.get('answer2', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer2', version=2), None)
|
|
|
|
self.cache.set('answer3', 37, version=1)
|
|
self.cache.set('answer3', 42, version=2)
|
|
self.v2_cache.delete('answer3')
|
|
self.assertEqual(self.cache.get('answer3', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer3', version=2), None)
|
|
|
|
self.cache.set('answer4', 37, version=1)
|
|
self.cache.set('answer4', 42, version=2)
|
|
self.v2_cache.delete('answer4', version=1)
|
|
self.assertEqual(self.cache.get('answer4', version=1), None)
|
|
self.assertEqual(self.cache.get('answer4', version=2), 42)
|
|
|
|
def test_cache_versioning_incr_decr(self):
|
|
self.cache.set('answer1', 37, version=1)
|
|
self.cache.set('answer1', 42, version=2)
|
|
self.cache.incr('answer1')
|
|
self.assertEqual(self.cache.get('answer1', version=1), 38)
|
|
self.assertEqual(self.cache.get('answer1', version=2), 42)
|
|
self.cache.decr('answer1')
|
|
self.assertEqual(self.cache.get('answer1', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer1', version=2), 42)
|
|
|
|
self.cache.set('answer2', 37, version=1)
|
|
self.cache.set('answer2', 42, version=2)
|
|
self.cache.incr('answer2', version=2)
|
|
self.assertEqual(self.cache.get('answer2', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer2', version=2), 43)
|
|
self.cache.decr('answer2', version=2)
|
|
self.assertEqual(self.cache.get('answer2', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer2', version=2), 42)
|
|
|
|
self.cache.set('answer3', 37, version=1)
|
|
self.cache.set('answer3', 42, version=2)
|
|
self.v2_cache.incr('answer3')
|
|
self.assertEqual(self.cache.get('answer3', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer3', version=2), 43)
|
|
self.v2_cache.decr('answer3')
|
|
self.assertEqual(self.cache.get('answer3', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer3', version=2), 42)
|
|
|
|
self.cache.set('answer4', 37, version=1)
|
|
self.cache.set('answer4', 42, version=2)
|
|
self.v2_cache.incr('answer4', version=1)
|
|
self.assertEqual(self.cache.get('answer4', version=1), 38)
|
|
self.assertEqual(self.cache.get('answer4', version=2), 42)
|
|
self.v2_cache.decr('answer4', version=1)
|
|
self.assertEqual(self.cache.get('answer4', version=1), 37)
|
|
self.assertEqual(self.cache.get('answer4', version=2), 42)
|
|
|
|
def test_cache_versioning_get_set_many(self):
|
|
# set, using default version = 1
|
|
self.cache.set_many({'ford1': 37, 'arthur1': 42})
|
|
self.assertEqual(self.cache.get_many(['ford1','arthur1']),
|
|
{'ford1': 37, 'arthur1': 42})
|
|
self.assertEqual(self.cache.get_many(['ford1','arthur1'], version=1),
|
|
{'ford1': 37, 'arthur1': 42})
|
|
self.assertEqual(self.cache.get_many(['ford1','arthur1'], version=2), {})
|
|
|
|
self.assertEqual(self.v2_cache.get_many(['ford1','arthur1']), {})
|
|
self.assertEqual(self.v2_cache.get_many(['ford1','arthur1'], version=1),
|
|
{'ford1': 37, 'arthur1': 42})
|
|
self.assertEqual(self.v2_cache.get_many(['ford1','arthur1'], version=2), {})
|
|
|
|
# set, default version = 1, but manually override version = 2
|
|
self.cache.set_many({'ford2': 37, 'arthur2': 42}, version=2)
|
|
self.assertEqual(self.cache.get_many(['ford2','arthur2']), {})
|
|
self.assertEqual(self.cache.get_many(['ford2','arthur2'], version=1), {})
|
|
self.assertEqual(self.cache.get_many(['ford2','arthur2'], version=2),
|
|
{'ford2': 37, 'arthur2': 42})
|
|
|
|
self.assertEqual(self.v2_cache.get_many(['ford2','arthur2']),
|
|
{'ford2': 37, 'arthur2': 42})
|
|
self.assertEqual(self.v2_cache.get_many(['ford2','arthur2'], version=1), {})
|
|
self.assertEqual(self.v2_cache.get_many(['ford2','arthur2'], version=2),
|
|
{'ford2': 37, 'arthur2': 42})
|
|
|
|
# v2 set, using default version = 2
|
|
self.v2_cache.set_many({'ford3': 37, 'arthur3': 42})
|
|
self.assertEqual(self.cache.get_many(['ford3','arthur3']), {})
|
|
self.assertEqual(self.cache.get_many(['ford3','arthur3'], version=1), {})
|
|
self.assertEqual(self.cache.get_many(['ford3','arthur3'], version=2),
|
|
{'ford3': 37, 'arthur3': 42})
|
|
|
|
self.assertEqual(self.v2_cache.get_many(['ford3','arthur3']),
|
|
{'ford3': 37, 'arthur3': 42})
|
|
self.assertEqual(self.v2_cache.get_many(['ford3','arthur3'], version=1), {})
|
|
self.assertEqual(self.v2_cache.get_many(['ford3','arthur3'], version=2),
|
|
{'ford3': 37, 'arthur3': 42})
|
|
|
|
# v2 set, default version = 2, but manually override version = 1
|
|
self.v2_cache.set_many({'ford4': 37, 'arthur4': 42}, version=1)
|
|
self.assertEqual(self.cache.get_many(['ford4','arthur4']),
|
|
{'ford4': 37, 'arthur4': 42})
|
|
self.assertEqual(self.cache.get_many(['ford4','arthur4'], version=1),
|
|
{'ford4': 37, 'arthur4': 42})
|
|
self.assertEqual(self.cache.get_many(['ford4','arthur4'], version=2), {})
|
|
|
|
self.assertEqual(self.v2_cache.get_many(['ford4','arthur4']), {})
|
|
self.assertEqual(self.v2_cache.get_many(['ford4','arthur4'], version=1),
|
|
{'ford4': 37, 'arthur4': 42})
|
|
self.assertEqual(self.v2_cache.get_many(['ford4','arthur4'], version=2), {})
|
|
|
|
def test_incr_version(self):
|
|
self.cache.set('answer', 42, version=2)
|
|
self.assertEqual(self.cache.get('answer'), None)
|
|
self.assertEqual(self.cache.get('answer', version=1), None)
|
|
self.assertEqual(self.cache.get('answer', version=2), 42)
|
|
self.assertEqual(self.cache.get('answer', version=3), None)
|
|
|
|
self.assertEqual(self.cache.incr_version('answer', version=2), 3)
|
|
self.assertEqual(self.cache.get('answer'), None)
|
|
self.assertEqual(self.cache.get('answer', version=1), None)
|
|
self.assertEqual(self.cache.get('answer', version=2), None)
|
|
self.assertEqual(self.cache.get('answer', version=3), 42)
|
|
|
|
self.v2_cache.set('answer2', 42)
|
|
self.assertEqual(self.v2_cache.get('answer2'), 42)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=1), None)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=2), 42)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=3), None)
|
|
|
|
self.assertEqual(self.v2_cache.incr_version('answer2'), 3)
|
|
self.assertEqual(self.v2_cache.get('answer2'), None)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=1), None)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=2), None)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=3), 42)
|
|
|
|
self.assertRaises(ValueError, self.cache.incr_version, 'does_not_exist')
|
|
|
|
def test_decr_version(self):
|
|
self.cache.set('answer', 42, version=2)
|
|
self.assertEqual(self.cache.get('answer'), None)
|
|
self.assertEqual(self.cache.get('answer', version=1), None)
|
|
self.assertEqual(self.cache.get('answer', version=2), 42)
|
|
|
|
self.assertEqual(self.cache.decr_version('answer', version=2), 1)
|
|
self.assertEqual(self.cache.get('answer'), 42)
|
|
self.assertEqual(self.cache.get('answer', version=1), 42)
|
|
self.assertEqual(self.cache.get('answer', version=2), None)
|
|
|
|
self.v2_cache.set('answer2', 42)
|
|
self.assertEqual(self.v2_cache.get('answer2'), 42)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=1), None)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=2), 42)
|
|
|
|
self.assertEqual(self.v2_cache.decr_version('answer2'), 1)
|
|
self.assertEqual(self.v2_cache.get('answer2'), None)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=1), 42)
|
|
self.assertEqual(self.v2_cache.get('answer2', version=2), None)
|
|
|
|
self.assertRaises(ValueError, self.cache.decr_version, 'does_not_exist', version=2)
|
|
|
|
def test_custom_key_func(self):
|
|
# Two caches with different key functions aren't visible to each other
|
|
self.cache.set('answer1', 42)
|
|
self.assertEqual(self.cache.get('answer1'), 42)
|
|
self.assertEqual(self.custom_key_cache.get('answer1'), None)
|
|
self.assertEqual(self.custom_key_cache2.get('answer1'), None)
|
|
|
|
self.custom_key_cache.set('answer2', 42)
|
|
self.assertEqual(self.cache.get('answer2'), None)
|
|
self.assertEqual(self.custom_key_cache.get('answer2'), 42)
|
|
self.assertEqual(self.custom_key_cache2.get('answer2'), 42)
|
|
|
|
def custom_key_func(key, key_prefix, version):
|
|
"A customized cache key function"
|
|
return 'CUSTOM-' + '-'.join([key_prefix, str(version), key])
|
|
|
|
class DBCacheTests(unittest.TestCase, BaseCacheTests):
|
|
backend_name = 'django.core.cache.backends.db.DatabaseCache'
|
|
|
|
def setUp(self):
|
|
# Spaces are used in the table name to ensure quoting/escaping is working
|
|
self._table_name = 'test cache table'
|
|
management.call_command('createcachetable', self._table_name, verbosity=0, interactive=False)
|
|
self.cache = get_cache(self.backend_name, LOCATION=self._table_name, OPTIONS={'MAX_ENTRIES': 30})
|
|
self.prefix_cache = get_cache(self.backend_name, LOCATION=self._table_name, KEY_PREFIX='cacheprefix')
|
|
self.v2_cache = get_cache(self.backend_name, LOCATION=self._table_name, VERSION=2)
|
|
self.custom_key_cache = get_cache(self.backend_name, LOCATION=self._table_name, KEY_FUNCTION=custom_key_func)
|
|
self.custom_key_cache2 = get_cache(self.backend_name, LOCATION=self._table_name, KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')
|
|
|
|
def tearDown(self):
|
|
from django.db import connection
|
|
cursor = connection.cursor()
|
|
cursor.execute('DROP TABLE %s' % connection.ops.quote_name(self._table_name))
|
|
|
|
def test_cull(self):
|
|
self.perform_cull_test(50, 29)
|
|
|
|
def test_zero_cull(self):
|
|
self.cache = get_cache(self.backend_name, LOCATION=self._table_name, OPTIONS={'MAX_ENTRIES': 30, 'CULL_FREQUENCY': 0})
|
|
self.perform_cull_test(50, 18)
|
|
|
|
def test_old_initialization(self):
|
|
self.cache = get_cache('db://%s?max_entries=30&cull_frequency=0' % self._table_name)
|
|
self.perform_cull_test(50, 18)
|
|
|
|
|
|
class LocMemCacheTests(unittest.TestCase, BaseCacheTests):
|
|
backend_name = 'django.core.cache.backends.locmem.LocMemCache'
|
|
|
|
def setUp(self):
|
|
self.cache = get_cache(self.backend_name, OPTIONS={'MAX_ENTRIES': 30})
|
|
self.prefix_cache = get_cache(self.backend_name, KEY_PREFIX='cacheprefix')
|
|
self.v2_cache = get_cache(self.backend_name, VERSION=2)
|
|
self.custom_key_cache = get_cache(self.backend_name, OPTIONS={'MAX_ENTRIES': 30}, KEY_FUNCTION=custom_key_func)
|
|
self.custom_key_cache2 = get_cache(self.backend_name, OPTIONS={'MAX_ENTRIES': 30}, KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')
|
|
|
|
# LocMem requires a hack to make the other caches
|
|
# share a data store with the 'normal' cache.
|
|
self.prefix_cache._cache = self.cache._cache
|
|
self.prefix_cache._expire_info = self.cache._expire_info
|
|
|
|
self.v2_cache._cache = self.cache._cache
|
|
self.v2_cache._expire_info = self.cache._expire_info
|
|
|
|
self.custom_key_cache._cache = self.cache._cache
|
|
self.custom_key_cache._expire_info = self.cache._expire_info
|
|
|
|
self.custom_key_cache2._cache = self.cache._cache
|
|
self.custom_key_cache2._expire_info = self.cache._expire_info
|
|
|
|
def tearDown(self):
|
|
self.cache.clear()
|
|
|
|
def test_cull(self):
|
|
self.perform_cull_test(50, 29)
|
|
|
|
def test_zero_cull(self):
|
|
self.cache = get_cache(self.backend_name, OPTIONS={'MAX_ENTRIES': 30, 'CULL_FREQUENCY': 0})
|
|
self.perform_cull_test(50, 19)
|
|
|
|
def test_old_initialization(self):
|
|
self.cache = get_cache('locmem://?max_entries=30&cull_frequency=0')
|
|
self.perform_cull_test(50, 19)
|
|
|
|
def test_multiple_caches(self):
|
|
"Check that multiple locmem caches are isolated"
|
|
mirror_cache = get_cache(self.backend_name)
|
|
other_cache = get_cache(self.backend_name, LOCATION='other')
|
|
|
|
self.cache.set('value1', 42)
|
|
self.assertEqual(mirror_cache.get('value1'), 42)
|
|
self.assertEqual(other_cache.get('value1'), None)
|
|
|
|
# memcached backend isn't guaranteed to be available.
|
|
# To check the memcached backend, the test settings file will
|
|
# need to contain a cache backend setting that points at
|
|
# your memcache server.
|
|
class MemcachedCacheTests(unittest.TestCase, BaseCacheTests):
|
|
backend_name = 'django.core.cache.backends.memcached.MemcachedCache'
|
|
|
|
def setUp(self):
|
|
name = settings.CACHES[DEFAULT_CACHE_ALIAS]['LOCATION']
|
|
self.cache = get_cache(self.backend_name, LOCATION=name)
|
|
self.prefix_cache = get_cache(self.backend_name, LOCATION=name, KEY_PREFIX='cacheprefix')
|
|
self.v2_cache = get_cache(self.backend_name, LOCATION=name, VERSION=2)
|
|
self.custom_key_cache = get_cache(self.backend_name, LOCATION=name, KEY_FUNCTION=custom_key_func)
|
|
self.custom_key_cache2 = get_cache(self.backend_name, LOCATION=name, KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')
|
|
|
|
def tearDown(self):
|
|
self.cache.clear()
|
|
|
|
def test_invalid_keys(self):
|
|
"""
|
|
On memcached, we don't introduce a duplicate key validation
|
|
step (for speed reasons), we just let the memcached API
|
|
library raise its own exception on bad keys. Refs #6447.
|
|
|
|
In order to be memcached-API-library agnostic, we only assert
|
|
that a generic exception of some kind is raised.
|
|
|
|
"""
|
|
# memcached does not allow whitespace or control characters in keys
|
|
self.assertRaises(Exception, self.cache.set, 'key with spaces', 'value')
|
|
# memcached limits key length to 250
|
|
self.assertRaises(Exception, self.cache.set, 'a' * 251, 'value')
|
|
|
|
MemcachedCacheTests = unittest.skipUnless(settings.CACHES[DEFAULT_CACHE_ALIAS]['BACKEND'].startswith('django.core.cache.backends.memcached.'), "memcached not available")(MemcachedCacheTests)
|
|
|
|
class FileBasedCacheTests(unittest.TestCase, BaseCacheTests):
|
|
"""
|
|
Specific test cases for the file-based cache.
|
|
"""
|
|
backend_name = 'django.core.cache.backends.filebased.FileBasedCache'
|
|
|
|
def setUp(self):
|
|
self.dirname = tempfile.mkdtemp()
|
|
self.cache = get_cache(self.backend_name, LOCATION=self.dirname, OPTIONS={'MAX_ENTRIES': 30})
|
|
self.prefix_cache = get_cache(self.backend_name, LOCATION=self.dirname, KEY_PREFIX='cacheprefix')
|
|
self.v2_cache = get_cache(self.backend_name, LOCATION=self.dirname, VERSION=2)
|
|
self.custom_key_cache = get_cache(self.backend_name, LOCATION=self.dirname, KEY_FUNCTION=custom_key_func)
|
|
self.custom_key_cache2 = get_cache(self.backend_name, LOCATION=self.dirname, KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')
|
|
|
|
def tearDown(self):
|
|
self.cache.clear()
|
|
|
|
def test_hashing(self):
|
|
"""Test that keys are hashed into subdirectories correctly"""
|
|
self.cache.set("foo", "bar")
|
|
key = self.cache.make_key("foo")
|
|
keyhash = hashlib.md5(key).hexdigest()
|
|
keypath = os.path.join(self.dirname, keyhash[:2], keyhash[2:4], keyhash[4:])
|
|
self.assertTrue(os.path.exists(keypath))
|
|
|
|
def test_subdirectory_removal(self):
|
|
"""
|
|
Make sure that the created subdirectories are correctly removed when empty.
|
|
"""
|
|
self.cache.set("foo", "bar")
|
|
key = self.cache.make_key("foo")
|
|
keyhash = hashlib.md5(key).hexdigest()
|
|
keypath = os.path.join(self.dirname, keyhash[:2], keyhash[2:4], keyhash[4:])
|
|
self.assertTrue(os.path.exists(keypath))
|
|
|
|
self.cache.delete("foo")
|
|
self.assertTrue(not os.path.exists(keypath))
|
|
self.assertTrue(not os.path.exists(os.path.dirname(keypath)))
|
|
self.assertTrue(not os.path.exists(os.path.dirname(os.path.dirname(keypath))))
|
|
|
|
def test_cull(self):
|
|
self.perform_cull_test(50, 29)
|
|
|
|
def test_old_initialization(self):
|
|
self.cache = get_cache('file://%s?max_entries=30' % self.dirname)
|
|
self.perform_cull_test(50, 29)
|
|
|
|
class CustomCacheKeyValidationTests(unittest.TestCase):
|
|
"""
|
|
Tests for the ability to mixin a custom ``validate_key`` method to
|
|
a custom cache backend that otherwise inherits from a builtin
|
|
backend, and override the default key validation. Refs #6447.
|
|
|
|
"""
|
|
def test_custom_key_validation(self):
|
|
cache = get_cache('regressiontests.cache.liberal_backend://')
|
|
|
|
# this key is both longer than 250 characters, and has spaces
|
|
key = 'some key with spaces' * 15
|
|
val = 'a value'
|
|
cache.set(key, val)
|
|
self.assertEqual(cache.get(key), val)
|
|
|
|
|
|
class GetCacheTests(unittest.TestCase):
|
|
|
|
def test_simple(self):
|
|
cache = get_cache('locmem://')
|
|
from django.core.cache.backends.locmem import LocMemCache
|
|
self.assertTrue(isinstance(cache, LocMemCache))
|
|
|
|
from django.core.cache import cache
|
|
self.assertTrue(isinstance(cache, get_cache('default').__class__))
|
|
|
|
cache = get_cache(
|
|
'django.core.cache.backends.dummy.DummyCache', **{'TIMEOUT': 120})
|
|
self.assertEqual(cache.default_timeout, 120)
|
|
|
|
self.assertRaises(InvalidCacheBackendError, get_cache, 'does_not_exist')
|
|
|
|
class CacheUtils(unittest.TestCase):
|
|
"""TestCase for django.utils.cache functions."""
|
|
|
|
def setUp(self):
|
|
self.path = '/cache/test/'
|
|
self.old_cache_middleware_key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
|
|
self.old_cache_middleware_seconds = settings.CACHE_MIDDLEWARE_SECONDS
|
|
self.orig_use_i18n = settings.USE_I18N
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = 'settingsprefix'
|
|
settings.CACHE_MIDDLEWARE_SECONDS = 1
|
|
settings.USE_I18N = False
|
|
|
|
def tearDown(self):
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = self.old_cache_middleware_key_prefix
|
|
settings.CACHE_MIDDLEWARE_SECONDS = self.old_cache_middleware_seconds
|
|
settings.USE_I18N = self.orig_use_i18n
|
|
|
|
def _get_request(self, path, method='GET'):
|
|
request = HttpRequest()
|
|
request.META = {
|
|
'SERVER_NAME': 'testserver',
|
|
'SERVER_PORT': 80,
|
|
}
|
|
request.method = method
|
|
request.path = request.path_info = "/cache/%s" % path
|
|
return request
|
|
|
|
def test_patch_vary_headers(self):
|
|
headers = (
|
|
# Initial vary, new headers, resulting vary.
|
|
(None, ('Accept-Encoding',), 'Accept-Encoding'),
|
|
('Accept-Encoding', ('accept-encoding',), 'Accept-Encoding'),
|
|
('Accept-Encoding', ('ACCEPT-ENCODING',), 'Accept-Encoding'),
|
|
('Cookie', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
|
|
('Cookie, Accept-Encoding', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
|
|
('Cookie, Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
|
|
(None, ('Accept-Encoding', 'COOKIE'), 'Accept-Encoding, COOKIE'),
|
|
('Cookie, Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
|
|
('Cookie , Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
|
|
)
|
|
for initial_vary, newheaders, resulting_vary in headers:
|
|
response = HttpResponse()
|
|
if initial_vary is not None:
|
|
response['Vary'] = initial_vary
|
|
patch_vary_headers(response, newheaders)
|
|
self.assertEqual(response['Vary'], resulting_vary)
|
|
|
|
def test_get_cache_key(self):
|
|
request = self._get_request(self.path)
|
|
response = HttpResponse()
|
|
key_prefix = 'localprefix'
|
|
# Expect None if no headers have been set yet.
|
|
self.assertEqual(get_cache_key(request), None)
|
|
# Set headers to an empty list.
|
|
learn_cache_key(request, response)
|
|
self.assertEqual(get_cache_key(request), 'views.decorators.cache.cache_page.settingsprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')
|
|
# Verify that a specified key_prefix is taken into account.
|
|
learn_cache_key(request, response, key_prefix=key_prefix)
|
|
self.assertEqual(get_cache_key(request, key_prefix=key_prefix), 'views.decorators.cache.cache_page.localprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')
|
|
|
|
def test_get_cache_key_with_query(self):
|
|
request = self._get_request(self.path + '?test=1')
|
|
response = HttpResponse()
|
|
# Expect None if no headers have been set yet.
|
|
self.assertEqual(get_cache_key(request), None)
|
|
# Set headers to an empty list.
|
|
learn_cache_key(request, response)
|
|
# Verify that the querystring is taken into account.
|
|
self.assertEqual(get_cache_key(request), 'views.decorators.cache.cache_page.settingsprefix.GET.bd889c5a59603af44333ed21504db3cd.d41d8cd98f00b204e9800998ecf8427e')
|
|
|
|
def test_learn_cache_key(self):
|
|
request = self._get_request(self.path, 'HEAD')
|
|
response = HttpResponse()
|
|
response['Vary'] = 'Pony'
|
|
# Make sure that the Vary header is added to the key hash
|
|
learn_cache_key(request, response)
|
|
self.assertEqual(get_cache_key(request), 'views.decorators.cache.cache_page.settingsprefix.HEAD.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')
|
|
|
|
def test_patch_cache_control(self):
|
|
tests = (
|
|
# Initial Cache-Control, kwargs to patch_cache_control, expected Cache-Control parts
|
|
(None, {'private' : True}, set(['private'])),
|
|
|
|
# Test whether private/public attributes are mutually exclusive
|
|
('private', {'private' : True}, set(['private'])),
|
|
('private', {'public' : True}, set(['public'])),
|
|
('public', {'public' : True}, set(['public'])),
|
|
('public', {'private' : True}, set(['private'])),
|
|
('must-revalidate,max-age=60,private', {'public' : True}, set(['must-revalidate', 'max-age=60', 'public'])),
|
|
('must-revalidate,max-age=60,public', {'private' : True}, set(['must-revalidate', 'max-age=60', 'private'])),
|
|
('must-revalidate,max-age=60', {'public' : True}, set(['must-revalidate', 'max-age=60', 'public'])),
|
|
)
|
|
|
|
cc_delim_re = re.compile(r'\s*,\s*')
|
|
|
|
for initial_cc, newheaders, expected_cc in tests:
|
|
response = HttpResponse()
|
|
if initial_cc is not None:
|
|
response['Cache-Control'] = initial_cc
|
|
patch_cache_control(response, **newheaders)
|
|
parts = set(cc_delim_re.split(response['Cache-Control']))
|
|
self.assertEqual(parts, expected_cc)
|
|
|
|
class PrefixedCacheUtils(CacheUtils):
|
|
def setUp(self):
|
|
super(PrefixedCacheUtils, self).setUp()
|
|
self.old_cache_key_prefix = settings.CACHES['default'].get('KEY_PREFIX', None)
|
|
settings.CACHES['default']['KEY_PREFIX'] = 'cacheprefix'
|
|
|
|
def tearDown(self):
|
|
super(PrefixedCacheUtils, self).tearDown()
|
|
if self.old_cache_key_prefix is None:
|
|
del settings.CACHES['default']['KEY_PREFIX']
|
|
else:
|
|
settings.CACHES['default']['KEY_PREFIX'] = self.old_cache_key_prefix
|
|
|
|
class CacheHEADTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.orig_cache_middleware_seconds = settings.CACHE_MIDDLEWARE_SECONDS
|
|
self.orig_cache_middleware_key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
|
|
self.orig_caches = settings.CACHES
|
|
settings.CACHE_MIDDLEWARE_SECONDS = 60
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = 'test'
|
|
settings.CACHES = {
|
|
'default': {
|
|
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'
|
|
}
|
|
}
|
|
self.path = '/cache/test/'
|
|
|
|
def tearDown(self):
|
|
settings.CACHE_MIDDLEWARE_SECONDS = self.orig_cache_middleware_seconds
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = self.orig_cache_middleware_key_prefix
|
|
settings.CACHES = self.orig_caches
|
|
|
|
def _get_request(self, method):
|
|
request = HttpRequest()
|
|
request.META = {
|
|
'SERVER_NAME': 'testserver',
|
|
'SERVER_PORT': 80,
|
|
}
|
|
request.method = method
|
|
request.path = request.path_info = self.path
|
|
return request
|
|
|
|
def _get_request_cache(self, method):
|
|
request = self._get_request(method)
|
|
request._cache_update_cache = True
|
|
return request
|
|
|
|
def _set_cache(self, request, msg):
|
|
response = HttpResponse()
|
|
response.content = msg
|
|
return UpdateCacheMiddleware().process_response(request, response)
|
|
|
|
def test_head_caches_correctly(self):
|
|
test_content = 'test content'
|
|
|
|
request = self._get_request_cache('HEAD')
|
|
self._set_cache(request, test_content)
|
|
|
|
request = self._get_request('HEAD')
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
self.assertNotEqual(get_cache_data, None)
|
|
self.assertEqual(test_content, get_cache_data.content)
|
|
|
|
def test_head_with_cached_get(self):
|
|
test_content = 'test content'
|
|
|
|
request = self._get_request_cache('GET')
|
|
self._set_cache(request, test_content)
|
|
|
|
request = self._get_request('HEAD')
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
self.assertNotEqual(get_cache_data, None)
|
|
self.assertEqual(test_content, get_cache_data.content)
|
|
|
|
class CacheI18nTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.orig_cache_middleware_seconds = settings.CACHE_MIDDLEWARE_SECONDS
|
|
self.orig_cache_middleware_key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
|
|
self.orig_caches = settings.CACHES
|
|
self.orig_use_i18n = settings.USE_I18N
|
|
self.orig_languages = settings.LANGUAGES
|
|
settings.LANGUAGES = (
|
|
('en', 'English'),
|
|
('es', 'Spanish'),
|
|
)
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = 'settingsprefix'
|
|
self.path = '/cache/test/'
|
|
|
|
def tearDown(self):
|
|
settings.CACHE_MIDDLEWARE_SECONDS = self.orig_cache_middleware_seconds
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = self.orig_cache_middleware_key_prefix
|
|
settings.CACHES = self.orig_caches
|
|
settings.USE_I18N = self.orig_use_i18n
|
|
settings.LANGUAGES = self.orig_languages
|
|
translation.deactivate()
|
|
|
|
def _get_request(self):
|
|
request = HttpRequest()
|
|
request.META = {
|
|
'SERVER_NAME': 'testserver',
|
|
'SERVER_PORT': 80,
|
|
}
|
|
request.path = request.path_info = self.path
|
|
return request
|
|
|
|
def _get_request_cache(self, query_string=None):
|
|
request = HttpRequest()
|
|
request.META = {
|
|
'SERVER_NAME': 'testserver',
|
|
'SERVER_PORT': 80,
|
|
}
|
|
if query_string:
|
|
request.META['QUERY_STRING'] = query_string
|
|
request.GET = QueryDict(query_string)
|
|
request.path = request.path_info = self.path
|
|
request._cache_update_cache = True
|
|
request.method = 'GET'
|
|
request.session = {}
|
|
return request
|
|
|
|
def test_cache_key_i18n(self):
|
|
settings.USE_I18N = True
|
|
request = self._get_request()
|
|
lang = translation.get_language()
|
|
response = HttpResponse()
|
|
key = learn_cache_key(request, response)
|
|
self.assertTrue(key.endswith(lang), "Cache keys should include the language name when i18n is active")
|
|
key2 = get_cache_key(request)
|
|
self.assertEqual(key, key2)
|
|
|
|
def test_cache_key_no_i18n (self):
|
|
settings.USE_I18N = False
|
|
request = self._get_request()
|
|
lang = translation.get_language()
|
|
response = HttpResponse()
|
|
key = learn_cache_key(request, response)
|
|
self.assertFalse(key.endswith(lang), "Cache keys shouldn't include the language name when i18n is inactive")
|
|
|
|
def test_middleware(self):
|
|
def set_cache(request, lang, msg):
|
|
translation.activate(lang)
|
|
response = HttpResponse()
|
|
response.content= msg
|
|
return UpdateCacheMiddleware().process_response(request, response)
|
|
|
|
settings.CACHE_MIDDLEWARE_SECONDS = 60
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = "test"
|
|
settings.CACHES = {
|
|
'default': {
|
|
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'
|
|
}
|
|
}
|
|
settings.USE_ETAGS = True
|
|
settings.USE_I18N = True
|
|
|
|
# cache with non empty request.GET
|
|
request = self._get_request_cache(query_string='foo=bar&other=true')
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
# first access, cache must return None
|
|
self.assertEqual(get_cache_data, None)
|
|
response = HttpResponse()
|
|
content = 'Check for cache with QUERY_STRING'
|
|
response.content = content
|
|
UpdateCacheMiddleware().process_response(request, response)
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
# cache must return content
|
|
self.assertNotEqual(get_cache_data, None)
|
|
self.assertEqual(get_cache_data.content, content)
|
|
# different QUERY_STRING, cache must be empty
|
|
request = self._get_request_cache(query_string='foo=bar&somethingelse=true')
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
self.assertEqual(get_cache_data, None)
|
|
|
|
# i18n tests
|
|
en_message ="Hello world!"
|
|
es_message ="Hola mundo!"
|
|
|
|
request = self._get_request_cache()
|
|
set_cache(request, 'en', en_message)
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
# Check that we can recover the cache
|
|
self.assertNotEqual(get_cache_data.content, None)
|
|
self.assertEqual(en_message, get_cache_data.content)
|
|
# Check that we use etags
|
|
self.assertTrue(get_cache_data.has_header('ETag'))
|
|
# Check that we can disable etags
|
|
settings.USE_ETAGS = False
|
|
request._cache_update_cache = True
|
|
set_cache(request, 'en', en_message)
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
self.assertFalse(get_cache_data.has_header('ETag'))
|
|
# change the session language and set content
|
|
request = self._get_request_cache()
|
|
set_cache(request, 'es', es_message)
|
|
# change again the language
|
|
translation.activate('en')
|
|
# retrieve the content from cache
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
self.assertEqual(get_cache_data.content, en_message)
|
|
# change again the language
|
|
translation.activate('es')
|
|
get_cache_data = FetchFromCacheMiddleware().process_request(request)
|
|
self.assertEqual(get_cache_data.content, es_message)
|
|
|
|
class PrefixedCacheI18nTest(CacheI18nTest):
|
|
def setUp(self):
|
|
super(PrefixedCacheI18nTest, self).setUp()
|
|
self.old_cache_key_prefix = settings.CACHES['default'].get('KEY_PREFIX', None)
|
|
settings.CACHES['default']['KEY_PREFIX'] = 'cacheprefix'
|
|
|
|
def tearDown(self):
|
|
super(PrefixedCacheI18nTest, self).tearDown()
|
|
if self.old_cache_key_prefix is not None:
|
|
del settings.CACHES['default']['KEY_PREFIX']
|
|
else:
|
|
settings.CACHES['default']['KEY_PREFIX'] = self.old_cache_key_prefix
|
|
|
|
|
|
def hello_world_view(request, value):
|
|
return HttpResponse('Hello World %s' % value)
|
|
|
|
class CacheMiddlewareTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.factory = RequestFactory()
|
|
|
|
self.orig_cache_middleware_alias = settings.CACHE_MIDDLEWARE_ALIAS
|
|
self.orig_cache_middleware_key_prefix = settings.CACHE_MIDDLEWARE_KEY_PREFIX
|
|
self.orig_cache_middleware_seconds = settings.CACHE_MIDDLEWARE_SECONDS
|
|
self.orig_cache_middleware_anonymous_only = getattr(settings, 'CACHE_MIDDLEWARE_ANONYMOUS_ONLY', False)
|
|
self.orig_caches = settings.CACHES
|
|
|
|
settings.CACHE_MIDDLEWARE_ALIAS = 'other'
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = 'middlewareprefix'
|
|
settings.CACHE_MIDDLEWARE_SECONDS = 30
|
|
settings.CACHE_MIDDLEWARE_ANONYMOUS_ONLY = False
|
|
settings.CACHES = {
|
|
'default': {
|
|
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'
|
|
},
|
|
'other': {
|
|
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
|
|
'LOCATION': 'other',
|
|
'TIMEOUT': '1'
|
|
}
|
|
}
|
|
|
|
def tearDown(self):
|
|
settings.CACHE_MIDDLEWARE_ALIAS = self.orig_cache_middleware_alias
|
|
settings.CACHE_MIDDLEWARE_KEY_PREFIX = self.orig_cache_middleware_key_prefix
|
|
settings.CACHE_MIDDLEWARE_SECONDS = self.orig_cache_middleware_seconds
|
|
settings.CACHE_MIDDLEWARE_ANONYMOUS_ONLY = self.orig_cache_middleware_anonymous_only
|
|
settings.CACHES = self.orig_caches
|
|
|
|
def test_constructor(self):
|
|
"""
|
|
Ensure the constructor is correctly distinguishing between usage of CacheMiddleware as
|
|
Middleware vs. usage of CacheMiddleware as view decorator and setting attributes
|
|
appropriately.
|
|
"""
|
|
# If no arguments are passed in construction, it's being used as middleware.
|
|
middleware = CacheMiddleware()
|
|
|
|
# Now test object attributes against values defined in setUp above
|
|
self.assertEqual(middleware.cache_timeout, 30)
|
|
self.assertEqual(middleware.key_prefix, 'middlewareprefix')
|
|
self.assertEqual(middleware.cache_alias, 'other')
|
|
self.assertEqual(middleware.cache_anonymous_only, False)
|
|
|
|
# If arguments are being passed in construction, it's being used as a decorator.
|
|
# First, test with "defaults":
|
|
as_view_decorator = CacheMiddleware(cache_alias=None, key_prefix=None)
|
|
|
|
self.assertEqual(as_view_decorator.cache_timeout, 300) # Timeout value for 'default' cache, i.e. 300
|
|
self.assertEqual(as_view_decorator.key_prefix, '')
|
|
self.assertEqual(as_view_decorator.cache_alias, 'default') # Value of DEFAULT_CACHE_ALIAS from django.core.cache
|
|
self.assertEqual(as_view_decorator.cache_anonymous_only, False)
|
|
|
|
# Next, test with custom values:
|
|
as_view_decorator_with_custom = CacheMiddleware(cache_anonymous_only=True, cache_timeout=60, cache_alias='other', key_prefix='foo')
|
|
|
|
self.assertEqual(as_view_decorator_with_custom.cache_timeout, 60)
|
|
self.assertEqual(as_view_decorator_with_custom.key_prefix, 'foo')
|
|
self.assertEqual(as_view_decorator_with_custom.cache_alias, 'other')
|
|
self.assertEqual(as_view_decorator_with_custom.cache_anonymous_only, True)
|
|
|
|
def test_middleware(self):
|
|
middleware = CacheMiddleware()
|
|
prefix_middleware = CacheMiddleware(key_prefix='prefix1')
|
|
timeout_middleware = CacheMiddleware(cache_timeout=1)
|
|
|
|
request = self.factory.get('/view/')
|
|
|
|
# Put the request through the request middleware
|
|
result = middleware.process_request(request)
|
|
self.assertEqual(result, None)
|
|
|
|
response = hello_world_view(request, '1')
|
|
|
|
# Now put the response through the response middleware
|
|
response = middleware.process_response(request, response)
|
|
|
|
# Repeating the request should result in a cache hit
|
|
result = middleware.process_request(request)
|
|
self.assertNotEquals(result, None)
|
|
self.assertEqual(result.content, 'Hello World 1')
|
|
|
|
# The same request through a different middleware won't hit
|
|
result = prefix_middleware.process_request(request)
|
|
self.assertEqual(result, None)
|
|
|
|
# The same request with a timeout _will_ hit
|
|
result = timeout_middleware.process_request(request)
|
|
self.assertNotEquals(result, None)
|
|
self.assertEqual(result.content, 'Hello World 1')
|
|
|
|
def test_cache_middleware_anonymous_only_wont_cause_session_access(self):
|
|
""" The cache middleware shouldn't cause a session access due to
|
|
CACHE_MIDDLEWARE_ANONYMOUS_ONLY if nothing else has accessed the
|
|
session. Refs 13283 """
|
|
settings.CACHE_MIDDLEWARE_ANONYMOUS_ONLY = True
|
|
|
|
from django.contrib.sessions.middleware import SessionMiddleware
|
|
from django.contrib.auth.middleware import AuthenticationMiddleware
|
|
|
|
middleware = CacheMiddleware()
|
|
session_middleware = SessionMiddleware()
|
|
auth_middleware = AuthenticationMiddleware()
|
|
|
|
request = self.factory.get('/view_anon/')
|
|
|
|
# Put the request through the request middleware
|
|
session_middleware.process_request(request)
|
|
auth_middleware.process_request(request)
|
|
result = middleware.process_request(request)
|
|
self.assertEqual(result, None)
|
|
|
|
response = hello_world_view(request, '1')
|
|
|
|
# Now put the response through the response middleware
|
|
session_middleware.process_response(request, response)
|
|
response = middleware.process_response(request, response)
|
|
|
|
self.assertEqual(request.session.accessed, False)
|
|
|
|
def test_cache_middleware_anonymous_only_with_cache_page(self):
|
|
"""CACHE_MIDDLEWARE_ANONYMOUS_ONLY should still be effective when used
|
|
with the cache_page decorator: the response to a request from an
|
|
authenticated user should not be cached."""
|
|
settings.CACHE_MIDDLEWARE_ANONYMOUS_ONLY = True
|
|
|
|
request = self.factory.get('/view_anon/')
|
|
|
|
class MockAuthenticatedUser(object):
|
|
def is_authenticated(self):
|
|
return True
|
|
|
|
class MockAccessedSession(object):
|
|
accessed = True
|
|
|
|
request.user = MockAuthenticatedUser()
|
|
request.session = MockAccessedSession()
|
|
|
|
response = cache_page(hello_world_view)(request, '1')
|
|
|
|
self.assertFalse("Cache-Control" in response)
|
|
|
|
def test_view_decorator(self):
|
|
# decorate the same view with different cache decorators
|
|
default_view = cache_page(hello_world_view)
|
|
default_with_prefix_view = cache_page(key_prefix='prefix1')(hello_world_view)
|
|
|
|
explicit_default_view = cache_page(cache='default')(hello_world_view)
|
|
explicit_default_with_prefix_view = cache_page(cache='default', key_prefix='prefix1')(hello_world_view)
|
|
|
|
other_view = cache_page(cache='other')(hello_world_view)
|
|
other_with_prefix_view = cache_page(cache='other', key_prefix='prefix2')(hello_world_view)
|
|
other_with_timeout_view = cache_page(3, cache='other', key_prefix='prefix3')(hello_world_view)
|
|
|
|
request = self.factory.get('/view/')
|
|
|
|
# Request the view once
|
|
response = default_view(request, '1')
|
|
self.assertEqual(response.content, 'Hello World 1')
|
|
|
|
# Request again -- hit the cache
|
|
response = default_view(request, '2')
|
|
self.assertEqual(response.content, 'Hello World 1')
|
|
|
|
# Requesting the same view with the explicit cache should yield the same result
|
|
response = explicit_default_view(request, '3')
|
|
self.assertEqual(response.content, 'Hello World 1')
|
|
|
|
# Requesting with a prefix will hit a different cache key
|
|
response = explicit_default_with_prefix_view(request, '4')
|
|
self.assertEqual(response.content, 'Hello World 4')
|
|
|
|
# Hitting the same view again gives a cache hit
|
|
response = explicit_default_with_prefix_view(request, '5')
|
|
self.assertEqual(response.content, 'Hello World 4')
|
|
|
|
# And going back to the implicit cache will hit the same cache
|
|
response = default_with_prefix_view(request, '6')
|
|
self.assertEqual(response.content, 'Hello World 4')
|
|
|
|
# Requesting from an alternate cache won't hit cache
|
|
response = other_view(request, '7')
|
|
self.assertEqual(response.content, 'Hello World 7')
|
|
|
|
# But a repeated hit will hit cache
|
|
response = other_view(request, '8')
|
|
self.assertEqual(response.content, 'Hello World 7')
|
|
|
|
# And prefixing the alternate cache yields yet another cache entry
|
|
response = other_with_prefix_view(request, '9')
|
|
self.assertEqual(response.content, 'Hello World 9')
|
|
|
|
# Request from the alternate cache with a new prefix and a custom timeout
|
|
response = other_with_timeout_view(request, '10')
|
|
self.assertEqual(response.content, 'Hello World 10')
|
|
|
|
# But if we wait a couple of seconds...
|
|
time.sleep(2)
|
|
|
|
# ... the default cache will still hit
|
|
cache = get_cache('default')
|
|
response = default_view(request, '11')
|
|
self.assertEqual(response.content, 'Hello World 1')
|
|
|
|
# ... the default cache with a prefix will still hit
|
|
response = default_with_prefix_view(request, '12')
|
|
self.assertEqual(response.content, 'Hello World 4')
|
|
|
|
# ... the explicit default cache will still hit
|
|
response = explicit_default_view(request, '13')
|
|
self.assertEqual(response.content, 'Hello World 1')
|
|
|
|
# ... the explicit default cache with a prefix will still hit
|
|
response = explicit_default_with_prefix_view(request, '14')
|
|
self.assertEqual(response.content, 'Hello World 4')
|
|
|
|
# .. but a rapidly expiring cache won't hit
|
|
response = other_view(request, '15')
|
|
self.assertEqual(response.content, 'Hello World 15')
|
|
|
|
# .. even if it has a prefix
|
|
response = other_with_prefix_view(request, '16')
|
|
self.assertEqual(response.content, 'Hello World 16')
|
|
|
|
# ... but a view with a custom timeout will still hit
|
|
response = other_with_timeout_view(request, '17')
|
|
self.assertEqual(response.content, 'Hello World 10')
|
|
|
|
# And if we wait a few more seconds
|
|
time.sleep(2)
|
|
|
|
# the custom timeouot cache will miss
|
|
response = other_with_timeout_view(request, '18')
|
|
self.assertEqual(response.content, 'Hello World 18')
|
|
|
|
|
|
class TestWithTemplateResponse(TestCase):
|
|
"""
|
|
Tests various headers w/ TemplateResponse.
|
|
|
|
Most are probably redundant since they manipulate the same object
|
|
anyway but the Etag header is 'special' because it relies on the
|
|
content being complete (which is not necessarily always the case
|
|
with a TemplateResponse)
|
|
"""
|
|
def setUp(self):
|
|
self.path = '/cache/test/'
|
|
|
|
def _get_request(self, path, method='GET'):
|
|
request = HttpRequest()
|
|
request.META = {
|
|
'SERVER_NAME': 'testserver',
|
|
'SERVER_PORT': 80,
|
|
}
|
|
request.method = method
|
|
request.path = request.path_info = "/cache/%s" % path
|
|
return request
|
|
|
|
def test_patch_vary_headers(self):
|
|
headers = (
|
|
# Initial vary, new headers, resulting vary.
|
|
(None, ('Accept-Encoding',), 'Accept-Encoding'),
|
|
('Accept-Encoding', ('accept-encoding',), 'Accept-Encoding'),
|
|
('Accept-Encoding', ('ACCEPT-ENCODING',), 'Accept-Encoding'),
|
|
('Cookie', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
|
|
('Cookie, Accept-Encoding', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
|
|
('Cookie, Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
|
|
(None, ('Accept-Encoding', 'COOKIE'), 'Accept-Encoding, COOKIE'),
|
|
('Cookie, Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
|
|
('Cookie , Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
|
|
)
|
|
for initial_vary, newheaders, resulting_vary in headers:
|
|
response = TemplateResponse(HttpResponse(), Template("This is a test"))
|
|
if initial_vary is not None:
|
|
response['Vary'] = initial_vary
|
|
patch_vary_headers(response, newheaders)
|
|
self.assertEqual(response['Vary'], resulting_vary)
|
|
|
|
def test_get_cache_key(self):
|
|
request = self._get_request(self.path)
|
|
response = TemplateResponse(HttpResponse(), Template("This is a test"))
|
|
key_prefix = 'localprefix'
|
|
# Expect None if no headers have been set yet.
|
|
self.assertEqual(get_cache_key(request), None)
|
|
# Set headers to an empty list.
|
|
learn_cache_key(request, response)
|
|
self.assertEqual(get_cache_key(request), 'views.decorators.cache.cache_page.settingsprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')
|
|
# Verify that a specified key_prefix is taken into account.
|
|
learn_cache_key(request, response, key_prefix=key_prefix)
|
|
self.assertEqual(get_cache_key(request, key_prefix=key_prefix), 'views.decorators.cache.cache_page.localprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')
|
|
|
|
def test_get_cache_key_with_query(self):
|
|
request = self._get_request(self.path + '?test=1')
|
|
response = TemplateResponse(HttpResponse(), Template("This is a test"))
|
|
# Expect None if no headers have been set yet.
|
|
self.assertEqual(get_cache_key(request), None)
|
|
# Set headers to an empty list.
|
|
learn_cache_key(request, response)
|
|
# Verify that the querystring is taken into account.
|
|
self.assertEqual(get_cache_key(request), 'views.decorators.cache.cache_page.settingsprefix.GET.bd889c5a59603af44333ed21504db3cd.d41d8cd98f00b204e9800998ecf8427e')
|
|
|
|
@override_settings(USE_ETAGS=False)
|
|
def test_without_etag(self):
|
|
response = TemplateResponse(HttpResponse(), Template("This is a test"))
|
|
self.assertFalse(response.has_header('ETag'))
|
|
patch_response_headers(response)
|
|
self.assertFalse(response.has_header('ETag'))
|
|
response = response.render()
|
|
self.assertFalse(response.has_header('ETag'))
|
|
|
|
@override_settings(USE_ETAGS=True)
|
|
def test_with_etag(self):
|
|
response = TemplateResponse(HttpResponse(), Template("This is a test"))
|
|
self.assertFalse(response.has_header('ETag'))
|
|
patch_response_headers(response)
|
|
self.assertFalse(response.has_header('ETag'))
|
|
response = response.render()
|
|
self.assertTrue(response.has_header('ETag'))
|
|
|
|
TestWithTemplateResponse = override_settings(
|
|
CACHE_MIDDLEWARE_KEY_PREFIX='settingsprefix',
|
|
CACHE_MIDDLEWARE_SECONDS=1,
|
|
USE_I18N=False,
|
|
)(TestWithTemplateResponse)
|
|
|
|
|
|
class TestEtagWithAdmin(TestCase):
|
|
# See https://code.djangoproject.com/ticket/16003
|
|
urls = "regressiontests.admin_views.urls"
|
|
|
|
def test_admin(self):
|
|
with self.settings(USE_ETAGS=False):
|
|
response = self.client.get('/test_admin/admin/')
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertFalse(response.has_header('ETag'))
|
|
|
|
with self.settings(USE_ETAGS=True):
|
|
response = self.client.get('/test_admin/admin/')
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue(response.has_header('ETag'))
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|