mirror of
https://github.com/django/django.git
synced 2025-07-05 18:29:11 +00:00
[soc2009/multidb] Updated content types to be multidb aware. Patch from Russell Keith-Magee.
git-svn-id: http://code.djangoproject.com/svn/django/branches/soc2009/multidb@11889 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
parent
f9412b4d21
commit
2a99b2ba5b
@ -5,7 +5,7 @@ Classes allowing "generic" relations through ContentType and object-id fields.
|
|||||||
from django.core.exceptions import ObjectDoesNotExist
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
from django.db import connection
|
from django.db import connection
|
||||||
from django.db.models import signals
|
from django.db.models import signals
|
||||||
from django.db import models
|
from django.db import models, DEFAULT_DB_ALIAS
|
||||||
from django.db.models.fields.related import RelatedField, Field, ManyToManyRel
|
from django.db.models.fields.related import RelatedField, Field, ManyToManyRel
|
||||||
from django.db.models.loading import get_model
|
from django.db.models.loading import get_model
|
||||||
from django.forms import ModelForm
|
from django.forms import ModelForm
|
||||||
@ -45,14 +45,14 @@ class GenericForeignKey(object):
|
|||||||
kwargs[self.ct_field] = self.get_content_type(obj=value)
|
kwargs[self.ct_field] = self.get_content_type(obj=value)
|
||||||
kwargs[self.fk_field] = value._get_pk_val()
|
kwargs[self.fk_field] = value._get_pk_val()
|
||||||
|
|
||||||
def get_content_type(self, obj=None, id=None):
|
def get_content_type(self, obj=None, id=None, using=DEFAULT_DB_ALIAS):
|
||||||
# Convenience function using get_model avoids a circular import when
|
# Convenience function using get_model avoids a circular import when
|
||||||
# using this model
|
# using this model
|
||||||
ContentType = get_model("contenttypes", "contenttype")
|
ContentType = get_model("contenttypes", "contenttype")
|
||||||
if obj:
|
if obj:
|
||||||
return ContentType.objects.get_for_model(obj)
|
return ContentType.objects.get_for_model(obj, using=obj._state.db)
|
||||||
elif id:
|
elif id:
|
||||||
return ContentType.objects.get_for_id(id)
|
return ContentType.objects.get_for_id(id, using=using)
|
||||||
else:
|
else:
|
||||||
# This should never happen. I love comments like this, don't you?
|
# This should never happen. I love comments like this, don't you?
|
||||||
raise Exception("Impossible arguments to GFK.get_content_type!")
|
raise Exception("Impossible arguments to GFK.get_content_type!")
|
||||||
@ -73,7 +73,7 @@ class GenericForeignKey(object):
|
|||||||
f = self.model._meta.get_field(self.ct_field)
|
f = self.model._meta.get_field(self.ct_field)
|
||||||
ct_id = getattr(instance, f.get_attname(), None)
|
ct_id = getattr(instance, f.get_attname(), None)
|
||||||
if ct_id:
|
if ct_id:
|
||||||
ct = self.get_content_type(id=ct_id)
|
ct = self.get_content_type(id=ct_id, using=instance._state.db)
|
||||||
try:
|
try:
|
||||||
rel_obj = ct.get_object_for_this_type(pk=getattr(instance, self.fk_field))
|
rel_obj = ct.get_object_for_this_type(pk=getattr(instance, self.fk_field))
|
||||||
except ObjectDoesNotExist:
|
except ObjectDoesNotExist:
|
||||||
@ -201,11 +201,10 @@ class ReverseGenericRelatedObjectsDescriptor(object):
|
|||||||
join_table = qn(self.field.m2m_db_table()),
|
join_table = qn(self.field.m2m_db_table()),
|
||||||
source_col_name = qn(self.field.m2m_column_name()),
|
source_col_name = qn(self.field.m2m_column_name()),
|
||||||
target_col_name = qn(self.field.m2m_reverse_name()),
|
target_col_name = qn(self.field.m2m_reverse_name()),
|
||||||
content_type = ContentType.objects.get_for_model(instance),
|
content_type = ContentType.objects.get_for_model(instance, using=instance._state.db),
|
||||||
content_type_field_name = self.field.content_type_field_name,
|
content_type_field_name = self.field.content_type_field_name,
|
||||||
object_id_field_name = self.field.object_id_field_name
|
object_id_field_name = self.field.object_id_field_name
|
||||||
)
|
)
|
||||||
|
|
||||||
return manager
|
return manager
|
||||||
|
|
||||||
def __set__(self, instance, value):
|
def __set__(self, instance, value):
|
||||||
@ -247,7 +246,7 @@ def create_generic_related_manager(superclass):
|
|||||||
'%s__pk' % self.content_type_field_name : self.content_type.id,
|
'%s__pk' % self.content_type_field_name : self.content_type.id,
|
||||||
'%s__exact' % self.object_id_field_name : self.pk_val,
|
'%s__exact' % self.object_id_field_name : self.pk_val,
|
||||||
}
|
}
|
||||||
return superclass.get_query_set(self).filter(**query)
|
return superclass.get_query_set(self).using(self.instance._state.db).filter(**query)
|
||||||
|
|
||||||
def add(self, *objs):
|
def add(self, *objs):
|
||||||
for obj in objs:
|
for obj in objs:
|
||||||
@ -255,17 +254,17 @@ def create_generic_related_manager(superclass):
|
|||||||
raise TypeError, "'%s' instance expected" % self.model._meta.object_name
|
raise TypeError, "'%s' instance expected" % self.model._meta.object_name
|
||||||
setattr(obj, self.content_type_field_name, self.content_type)
|
setattr(obj, self.content_type_field_name, self.content_type)
|
||||||
setattr(obj, self.object_id_field_name, self.pk_val)
|
setattr(obj, self.object_id_field_name, self.pk_val)
|
||||||
obj.save()
|
obj.save(using=self.instance._state.db)
|
||||||
add.alters_data = True
|
add.alters_data = True
|
||||||
|
|
||||||
def remove(self, *objs):
|
def remove(self, *objs):
|
||||||
for obj in objs:
|
for obj in objs:
|
||||||
obj.delete()
|
obj.delete(using=self.instance._state.db)
|
||||||
remove.alters_data = True
|
remove.alters_data = True
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
for obj in self.all():
|
for obj in self.all():
|
||||||
obj.delete()
|
obj.delete(using=self.instance._state.db)
|
||||||
clear.alters_data = True
|
clear.alters_data = True
|
||||||
|
|
||||||
def create(self, **kwargs):
|
def create(self, **kwargs):
|
||||||
|
@ -10,18 +10,19 @@ def update_contenttypes(app, created_models, verbosity=2, **kwargs):
|
|||||||
ContentType.objects.clear_cache()
|
ContentType.objects.clear_cache()
|
||||||
content_types = list(ContentType.objects.filter(app_label=app.__name__.split('.')[-2]))
|
content_types = list(ContentType.objects.filter(app_label=app.__name__.split('.')[-2]))
|
||||||
app_models = get_models(app)
|
app_models = get_models(app)
|
||||||
|
db = kwargs['db']
|
||||||
if not app_models:
|
if not app_models:
|
||||||
return
|
return
|
||||||
for klass in app_models:
|
for klass in app_models:
|
||||||
opts = klass._meta
|
opts = klass._meta
|
||||||
try:
|
try:
|
||||||
ct = ContentType.objects.get(app_label=opts.app_label,
|
ct = ContentType.objects.using(db).get(app_label=opts.app_label,
|
||||||
model=opts.object_name.lower())
|
model=opts.object_name.lower())
|
||||||
content_types.remove(ct)
|
content_types.remove(ct)
|
||||||
except ContentType.DoesNotExist:
|
except ContentType.DoesNotExist:
|
||||||
ct = ContentType(name=smart_unicode(opts.verbose_name_raw),
|
ct = ContentType(name=smart_unicode(opts.verbose_name_raw),
|
||||||
app_label=opts.app_label, model=opts.object_name.lower())
|
app_label=opts.app_label, model=opts.object_name.lower())
|
||||||
ct.save()
|
ct.save(using=db)
|
||||||
if verbosity >= 2:
|
if verbosity >= 2:
|
||||||
print "Adding content type '%s | %s'" % (ct.app_label, ct.model)
|
print "Adding content type '%s | %s'" % (ct.app_label, ct.model)
|
||||||
# The presence of any remaining content types means the supplied app has an
|
# The presence of any remaining content types means the supplied app has an
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from django.db import models
|
from django.db import models, DEFAULT_DB_ALIAS
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
from django.utils.encoding import smart_unicode
|
from django.utils.encoding import smart_unicode
|
||||||
|
|
||||||
@ -8,14 +8,14 @@ class ContentTypeManager(models.Manager):
|
|||||||
# This cache is shared by all the get_for_* methods.
|
# This cache is shared by all the get_for_* methods.
|
||||||
_cache = {}
|
_cache = {}
|
||||||
|
|
||||||
def get_by_natural_key(self, app_label, model):
|
def get_by_natural_key(self, app_label, model, using=DEFAULT_DB_ALIAS):
|
||||||
try:
|
try:
|
||||||
ct = self.__class__._cache[(app_label, model)]
|
ct = self.__class__._cache[using][(app_label, model)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
ct = self.get(app_label=app_label, model=model)
|
ct = self.using(using).get(app_label=app_label, model=model)
|
||||||
return ct
|
return ct
|
||||||
|
|
||||||
def get_for_model(self, model):
|
def get_for_model(self, model, using=DEFAULT_DB_ALIAS):
|
||||||
"""
|
"""
|
||||||
Returns the ContentType object for a given model, creating the
|
Returns the ContentType object for a given model, creating the
|
||||||
ContentType if necessary. Lookups are cached so that subsequent lookups
|
ContentType if necessary. Lookups are cached so that subsequent lookups
|
||||||
@ -27,32 +27,33 @@ class ContentTypeManager(models.Manager):
|
|||||||
opts = model._meta
|
opts = model._meta
|
||||||
key = (opts.app_label, opts.object_name.lower())
|
key = (opts.app_label, opts.object_name.lower())
|
||||||
try:
|
try:
|
||||||
ct = self.__class__._cache[key]
|
ct = self.__class__._cache[using][key]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# Load or create the ContentType entry. The smart_unicode() is
|
# Load or create the ContentType entry. The smart_unicode() is
|
||||||
# needed around opts.verbose_name_raw because name_raw might be a
|
# needed around opts.verbose_name_raw because name_raw might be a
|
||||||
# django.utils.functional.__proxy__ object.
|
# django.utils.functional.__proxy__ object.
|
||||||
ct, created = self.get_or_create(
|
ct, created = self.using(using).get_or_create(
|
||||||
app_label = opts.app_label,
|
app_label = opts.app_label,
|
||||||
model = opts.object_name.lower(),
|
model = opts.object_name.lower(),
|
||||||
defaults = {'name': smart_unicode(opts.verbose_name_raw)},
|
defaults = {'name': smart_unicode(opts.verbose_name_raw)},
|
||||||
)
|
)
|
||||||
self._add_to_cache(ct)
|
self._add_to_cache(using, ct)
|
||||||
|
|
||||||
return ct
|
return ct
|
||||||
|
|
||||||
def get_for_id(self, id):
|
def get_for_id(self, id, using=DEFAULT_DB_ALIAS):
|
||||||
"""
|
"""
|
||||||
Lookup a ContentType by ID. Uses the same shared cache as get_for_model
|
Lookup a ContentType by ID. Uses the same shared cache as get_for_model
|
||||||
(though ContentTypes are obviously not created on-the-fly by get_by_id).
|
(though ContentTypes are obviously not created on-the-fly by get_by_id).
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
ct = self.__class__._cache[id]
|
ct = self.__class__._cache[using][id]
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# This could raise a DoesNotExist; that's correct behavior and will
|
# This could raise a DoesNotExist; that's correct behavior and will
|
||||||
# make sure that only correct ctypes get stored in the cache dict.
|
# make sure that only correct ctypes get stored in the cache dict.
|
||||||
ct = self.get(pk=id)
|
ct = self.using(using).get(pk=id)
|
||||||
self._add_to_cache(ct)
|
self._add_to_cache(using, ct)
|
||||||
return ct
|
return ct
|
||||||
|
|
||||||
def clear_cache(self):
|
def clear_cache(self):
|
||||||
@ -64,12 +65,12 @@ class ContentTypeManager(models.Manager):
|
|||||||
"""
|
"""
|
||||||
self.__class__._cache.clear()
|
self.__class__._cache.clear()
|
||||||
|
|
||||||
def _add_to_cache(self, ct):
|
def _add_to_cache(self, using, ct):
|
||||||
"""Insert a ContentType into the cache."""
|
"""Insert a ContentType into the cache."""
|
||||||
model = ct.model_class()
|
model = ct.model_class()
|
||||||
key = (model._meta.app_label, model._meta.object_name.lower())
|
key = (model._meta.app_label, model._meta.object_name.lower())
|
||||||
self.__class__._cache[key] = ct
|
self.__class__._cache.setdefault(using, {})[key] = ct
|
||||||
self.__class__._cache[ct.id] = ct
|
self.__class__._cache.setdefault(using, {})[ct.id] = ct
|
||||||
|
|
||||||
class ContentType(models.Model):
|
class ContentType(models.Model):
|
||||||
name = models.CharField(max_length=100)
|
name = models.CharField(max_length=100)
|
||||||
@ -99,7 +100,7 @@ class ContentType(models.Model):
|
|||||||
method. The ObjectNotExist exception, if thrown, will not be caught,
|
method. The ObjectNotExist exception, if thrown, will not be caught,
|
||||||
so code that calls this method should catch it.
|
so code that calls this method should catch it.
|
||||||
"""
|
"""
|
||||||
return self.model_class()._default_manager.get(**kwargs)
|
return self.model_class()._default_manager.using(self._state.db).get(**kwargs)
|
||||||
|
|
||||||
def natural_key(self):
|
def natural_key(self):
|
||||||
return (self.app_label, self.model)
|
return (self.app_label, self.model)
|
||||||
|
@ -183,12 +183,16 @@ The ``ContentTypeManager``
|
|||||||
probably won't ever need to call this method yourself; Django will call
|
probably won't ever need to call this method yourself; Django will call
|
||||||
it automatically when it's needed.
|
it automatically when it's needed.
|
||||||
|
|
||||||
.. method:: models.ContentTypeManager.get_for_model(model)
|
.. method:: models.ContentTypeManager.get_for_model(model, using=DEFAULT_DB_ALIAS)
|
||||||
|
|
||||||
Takes either a model class or an instance of a model, and returns the
|
Takes either a model class or an instance of a model, and returns the
|
||||||
:class:`~django.contrib.contenttypes.models.ContentType` instance
|
:class:`~django.contrib.contenttypes.models.ContentType` instance
|
||||||
representing that model.
|
representing that model.
|
||||||
|
|
||||||
|
By default, this will find the content type on the default database.
|
||||||
|
You can load an instance from a different database by providing
|
||||||
|
a ``using`` argument.
|
||||||
|
|
||||||
The :meth:`~models.ContentTypeManager.get_for_model()` method is especially useful when you know you
|
The :meth:`~models.ContentTypeManager.get_for_model()` method is especially useful when you know you
|
||||||
need to work with a :class:`ContentType <django.contrib.contenttypes.models.ContentType>` but don't want to go to the
|
need to work with a :class:`ContentType <django.contrib.contenttypes.models.ContentType>` but don't want to go to the
|
||||||
trouble of obtaining the model's metadata to perform a manual lookup::
|
trouble of obtaining the model's metadata to perform a manual lookup::
|
||||||
|
@ -1,10 +1,25 @@
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.contrib.contenttypes.models import ContentType
|
||||||
|
from django.contrib.contenttypes import generic
|
||||||
from django.db import models, DEFAULT_DB_ALIAS
|
from django.db import models, DEFAULT_DB_ALIAS
|
||||||
|
|
||||||
|
class Review(models.Model):
|
||||||
|
source = models.CharField(max_length=100)
|
||||||
|
content_type = models.ForeignKey(ContentType)
|
||||||
|
object_id = models.PositiveIntegerField()
|
||||||
|
content_object = generic.GenericForeignKey()
|
||||||
|
|
||||||
|
def __unicode__(self):
|
||||||
|
return self.source
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
ordering = ('source',)
|
||||||
|
|
||||||
class Book(models.Model):
|
class Book(models.Model):
|
||||||
title = models.CharField(max_length=100)
|
title = models.CharField(max_length=100)
|
||||||
published = models.DateField()
|
published = models.DateField()
|
||||||
authors = models.ManyToManyField('Author')
|
authors = models.ManyToManyField('Author')
|
||||||
|
reviews = generic.GenericRelation(Review)
|
||||||
|
|
||||||
def __unicode__(self):
|
def __unicode__(self):
|
||||||
return self.title
|
return self.title
|
||||||
|
@ -5,7 +5,7 @@ from django.conf import settings
|
|||||||
from django.db import connections
|
from django.db import connections
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
|
|
||||||
from models import Book, Author
|
from models import Book, Author, Review
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# we only have these models if the user is using multi-db, it's safe the
|
# we only have these models if the user is using multi-db, it's safe the
|
||||||
@ -258,6 +258,53 @@ class QueryTestCase(TestCase):
|
|||||||
self.assertEquals(list(Author.objects.using('other').filter(book__title='Dive into HTML5').values_list('name', flat=True)),
|
self.assertEquals(list(Author.objects.using('other').filter(book__title='Dive into HTML5').values_list('name', flat=True)),
|
||||||
[u'Mark Pilgrim'])
|
[u'Mark Pilgrim'])
|
||||||
|
|
||||||
|
def test_m2m_cross_database_protection(self):
|
||||||
|
"Operations that involve sharing M2M objects across databases raise an error"
|
||||||
|
# Create a book and author on the default database
|
||||||
|
pro = Book.objects.create(title="Pro Django",
|
||||||
|
published=datetime.date(2008, 12, 16))
|
||||||
|
|
||||||
|
marty = Author.objects.create(name="Marty Alchin")
|
||||||
|
|
||||||
|
# Create a book and author on the other database
|
||||||
|
dive = Book.objects.using('other').create(title="Dive into Python",
|
||||||
|
published=datetime.date(2009, 5, 4))
|
||||||
|
|
||||||
|
mark = Author.objects.using('other').create(name="Mark Pilgrim")
|
||||||
|
# Set a foreign key set with an object from a different database
|
||||||
|
try:
|
||||||
|
marty.book_set = [pro, dive]
|
||||||
|
self.fail("Shouldn't be able to assign across databases")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Add to an m2m with an object from a different database
|
||||||
|
try:
|
||||||
|
marty.book_set.add(dive)
|
||||||
|
self.fail("Shouldn't be able to assign across databases")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Set a m2m with an object from a different database
|
||||||
|
try:
|
||||||
|
marty.book_set = [pro, dive]
|
||||||
|
self.fail("Shouldn't be able to assign across databases")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Add to a reverse m2m with an object from a different database
|
||||||
|
try:
|
||||||
|
dive.authors.add(marty)
|
||||||
|
self.fail("Shouldn't be able to assign across databases")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Set a reverse m2m with an object from a different database
|
||||||
|
try:
|
||||||
|
dive.authors = [mark, marty]
|
||||||
|
self.fail("Shouldn't be able to assign across databases")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
def test_foreign_key_separation(self):
|
def test_foreign_key_separation(self):
|
||||||
"FK fields are constrained to a single database"
|
"FK fields are constrained to a single database"
|
||||||
@ -351,54 +398,6 @@ class QueryTestCase(TestCase):
|
|||||||
self.assertEquals(list(Book.objects.using('other').filter(favourite_of__name='Jane Brown').values_list('title', flat=True)),
|
self.assertEquals(list(Book.objects.using('other').filter(favourite_of__name='Jane Brown').values_list('title', flat=True)),
|
||||||
[u'Dive into Python'])
|
[u'Dive into Python'])
|
||||||
|
|
||||||
def test_m2m_cross_database_protection(self):
|
|
||||||
"Operations that involve sharing M2M objects across databases raise an error"
|
|
||||||
# Create a book and author on the default database
|
|
||||||
pro = Book.objects.create(title="Pro Django",
|
|
||||||
published=datetime.date(2008, 12, 16))
|
|
||||||
|
|
||||||
marty = Author.objects.create(name="Marty Alchin")
|
|
||||||
|
|
||||||
# Create a book and author on the other database
|
|
||||||
dive = Book.objects.using('other').create(title="Dive into Python",
|
|
||||||
published=datetime.date(2009, 5, 4))
|
|
||||||
|
|
||||||
mark = Author.objects.using('other').create(name="Mark Pilgrim")
|
|
||||||
# Set a foreign key set with an object from a different database
|
|
||||||
try:
|
|
||||||
marty.book_set = [pro, dive]
|
|
||||||
self.fail("Shouldn't be able to assign across databases")
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Add to an m2m with an object from a different database
|
|
||||||
try:
|
|
||||||
marty.book_set.add(dive)
|
|
||||||
self.fail("Shouldn't be able to assign across databases")
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Set a m2m with an object from a different database
|
|
||||||
try:
|
|
||||||
marty.book_set = [pro, dive]
|
|
||||||
self.fail("Shouldn't be able to assign across databases")
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Add to a reverse m2m with an object from a different database
|
|
||||||
try:
|
|
||||||
dive.authors.add(marty)
|
|
||||||
self.fail("Shouldn't be able to assign across databases")
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Set a reverse m2m with an object from a different database
|
|
||||||
try:
|
|
||||||
dive.authors = [mark, marty]
|
|
||||||
self.fail("Shouldn't be able to assign across databases")
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def test_foreign_key_cross_database_protection(self):
|
def test_foreign_key_cross_database_protection(self):
|
||||||
"Operations that involve sharing FK objects across databases raise an error"
|
"Operations that involve sharing FK objects across databases raise an error"
|
||||||
# Create a book and author on the default database
|
# Create a book and author on the default database
|
||||||
@ -469,6 +468,128 @@ class QueryTestCase(TestCase):
|
|||||||
self.assertEquals(list(Author.objects.using('other').values_list('name',flat=True)),
|
self.assertEquals(list(Author.objects.using('other').values_list('name',flat=True)),
|
||||||
[u'Jane Brown', u'John Smith', u'Mark Pilgrim'])
|
[u'Jane Brown', u'John Smith', u'Mark Pilgrim'])
|
||||||
|
|
||||||
|
def test_generic_key_separation(self):
|
||||||
|
"Generic fields are constrained to a single database"
|
||||||
|
# Create a book and author on the default database
|
||||||
|
pro = Book.objects.create(title="Pro Django",
|
||||||
|
published=datetime.date(2008, 12, 16))
|
||||||
|
|
||||||
|
review1 = Review.objects.create(source="Python Monthly", content_object=pro)
|
||||||
|
|
||||||
|
# Create a book and author on the other database
|
||||||
|
dive = Book.objects.using('other').create(title="Dive into Python",
|
||||||
|
published=datetime.date(2009, 5, 4))
|
||||||
|
|
||||||
|
review2 = Review.objects.using('other').create(source="Python Weekly", content_object=dive)
|
||||||
|
|
||||||
|
review1 = Review.objects.using('default').get(source="Python Monthly")
|
||||||
|
self.assertEquals(review1.content_object.title, "Pro Django")
|
||||||
|
|
||||||
|
review2 = Review.objects.using('other').get(source="Python Weekly")
|
||||||
|
self.assertEquals(review2.content_object.title, "Dive into Python")
|
||||||
|
|
||||||
|
# Reget the objects to clear caches
|
||||||
|
dive = Book.objects.using('other').get(title="Dive into Python")
|
||||||
|
|
||||||
|
# Retrive related object by descriptor. Related objects should be database-bound
|
||||||
|
self.assertEquals(list(dive.reviews.all().values_list('source', flat=True)),
|
||||||
|
[u'Python Weekly'])
|
||||||
|
|
||||||
|
def test_generic_key_reverse_operations(self):
|
||||||
|
"Generic reverse manipulations are all constrained to a single DB"
|
||||||
|
dive = Book.objects.using('other').create(title="Dive into Python",
|
||||||
|
published=datetime.date(2009, 5, 4))
|
||||||
|
|
||||||
|
temp = Book.objects.using('other').create(title="Temp",
|
||||||
|
published=datetime.date(2009, 5, 4))
|
||||||
|
|
||||||
|
review1 = Review.objects.using('other').create(source="Python Weekly", content_object=dive)
|
||||||
|
review2 = Review.objects.using('other').create(source="Python Monthly", content_object=temp)
|
||||||
|
|
||||||
|
self.assertEquals(list(Review.objects.using('default').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[])
|
||||||
|
self.assertEquals(list(Review.objects.using('other').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[u'Python Weekly'])
|
||||||
|
|
||||||
|
# Add a second review
|
||||||
|
dive.reviews.add(review2)
|
||||||
|
self.assertEquals(list(Review.objects.using('default').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[])
|
||||||
|
self.assertEquals(list(Review.objects.using('other').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[u'Python Monthly', u'Python Weekly'])
|
||||||
|
|
||||||
|
# Remove the second author
|
||||||
|
dive.reviews.remove(review1)
|
||||||
|
self.assertEquals(list(Review.objects.using('default').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[])
|
||||||
|
self.assertEquals(list(Review.objects.using('other').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[u'Python Monthly'])
|
||||||
|
|
||||||
|
# Clear all reviews
|
||||||
|
dive.reviews.clear()
|
||||||
|
self.assertEquals(list(Review.objects.using('default').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[])
|
||||||
|
self.assertEquals(list(Review.objects.using('other').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[])
|
||||||
|
|
||||||
|
# Create an author through the generic interface
|
||||||
|
dive.reviews.create(source='Python Daily')
|
||||||
|
self.assertEquals(list(Review.objects.using('default').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[])
|
||||||
|
self.assertEquals(list(Review.objects.using('other').filter(object_id=dive.pk).values_list('source', flat=True)),
|
||||||
|
[u'Python Daily'])
|
||||||
|
|
||||||
|
def test_generic_key_cross_database_protection(self):
|
||||||
|
"Operations that involve sharing FK objects across databases raise an error"
|
||||||
|
# Create a book and author on the default database
|
||||||
|
pro = Book.objects.create(title="Pro Django",
|
||||||
|
published=datetime.date(2008, 12, 16))
|
||||||
|
|
||||||
|
review1 = Review.objects.create(source="Python Monthly", content_object=pro)
|
||||||
|
|
||||||
|
# Create a book and author on the other database
|
||||||
|
dive = Book.objects.using('other').create(title="Dive into Python",
|
||||||
|
published=datetime.date(2009, 5, 4))
|
||||||
|
|
||||||
|
review2 = Review.objects.using('other').create(source="Python Weekly", content_object=dive)
|
||||||
|
|
||||||
|
# Set a foreign key with an object from a different database
|
||||||
|
try:
|
||||||
|
review1.content_object = dive
|
||||||
|
self.fail("Shouldn't be able to assign across databases")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Add to a foreign key set with an object from a different database
|
||||||
|
try:
|
||||||
|
dive.reviews.add(review1)
|
||||||
|
self.fail("Shouldn't be able to assign across databases")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# BUT! if you assign a FK object when the base object hasn't
|
||||||
|
# been saved yet, you implicitly assign the database for the
|
||||||
|
# base object.
|
||||||
|
review3 = Review(source="Python Daily")
|
||||||
|
# initially, no db assigned
|
||||||
|
self.assertEquals(review3._state.db, None)
|
||||||
|
|
||||||
|
# Dive comes from 'other', so review3 is set to use 'other'...
|
||||||
|
review3.content_object = dive
|
||||||
|
self.assertEquals(review3._state.db, 'other')
|
||||||
|
# ... but it isn't saved yet
|
||||||
|
self.assertEquals(list(Review.objects.using('default').filter(object_id=pro.pk).values_list('source', flat=True)),
|
||||||
|
[u'Python Monthly'])
|
||||||
|
self.assertEquals(list(Review.objects.using('other').filter(object_id=dive.pk).values_list('source',flat=True)),
|
||||||
|
[u'Python Weekly'])
|
||||||
|
|
||||||
|
# When saved, John goes to 'other'
|
||||||
|
review3.save()
|
||||||
|
self.assertEquals(list(Review.objects.using('default').filter(object_id=pro.pk).values_list('source', flat=True)),
|
||||||
|
[u'Python Monthly'])
|
||||||
|
self.assertEquals(list(Review.objects.using('other').filter(object_id=dive.pk).values_list('source',flat=True)),
|
||||||
|
[u'Python Daily', u'Python Weekly'])
|
||||||
|
|
||||||
|
|
||||||
class FixtureTestCase(TestCase):
|
class FixtureTestCase(TestCase):
|
||||||
multi_db = True
|
multi_db = True
|
||||||
|
Loading…
x
Reference in New Issue
Block a user