2009-01-16 22:23:58 +00:00
# -*- coding: utf-8 -*-
2009-03-29 23:15:58 +00:00
# Unit and doctests for specific database backends.
2012-06-07 18:08:47 +02:00
from __future__ import absolute_import , unicode_literals
2011-10-13 18:51:33 +00:00
2010-02-24 15:29:25 +00:00
import datetime
2011-12-16 13:40:19 +00:00
import threading
2010-06-21 11:48:45 +00:00
2011-04-22 12:14:54 +00:00
from django . conf import settings
2010-06-21 11:48:45 +00:00
from django . core . management . color import no_style
2011-10-13 18:51:33 +00:00
from django . db import ( backend , connection , connections , DEFAULT_DB_ALIAS ,
IntegrityError , transaction )
2009-03-29 23:15:58 +00:00
from django . db . backends . signals import connection_created
2011-04-02 08:44:47 +00:00
from django . db . backends . postgresql_psycopg2 import version as pg_version
2013-02-20 11:27:32 -08:00
from django . db . models import Sum , Avg , Variance , StdDev
from django . db . utils import ConnectionHandler , DatabaseError
2012-04-24 18:03:14 +00:00
from django . test import ( TestCase , skipUnlessDBFeature , skipIfDBFeature ,
TransactionTestCase )
2013-01-26 17:51:44 +01:00
from django . test . utils import override_settings , str_prefix
2012-07-20 14:48:51 +02:00
from django . utils import six
2012-07-20 18:53:11 +02:00
from django . utils . six . moves import xrange
2010-10-11 12:55:17 +00:00
from django . utils import unittest
2009-01-16 22:23:58 +00:00
2011-10-13 18:51:33 +00:00
from . import models
2010-06-21 11:48:45 +00:00
2012-10-28 23:02:41 +01:00
class DummyBackendTest ( TestCase ) :
def test_no_databases ( self ) :
"""
Test that empty DATABASES setting default to the dummy backend .
"""
DATABASES = { }
conns = ConnectionHandler ( DATABASES )
self . assertEqual ( conns [ DEFAULT_DB_ALIAS ] . settings_dict [ ' ENGINE ' ] ,
' django.db.backends.dummy ' )
2010-10-11 12:55:17 +00:00
class OracleChecks ( unittest . TestCase ) :
2009-01-16 22:23:58 +00:00
2012-10-27 04:51:14 +03:00
@unittest.skipUnless ( connection . vendor == ' oracle ' ,
" No need to check Oracle quote_name semantics " )
def test_quote_name ( self ) :
# Check that '%' chars are escaped for query execution.
name = ' " SOME % NAME " '
quoted_name = connection . ops . quote_name ( name )
2013-01-08 19:07:12 +01:00
self . assertEqual ( quoted_name % ( ) , name )
2012-10-27 04:51:14 +03:00
2010-10-11 12:55:17 +00:00
@unittest.skipUnless ( connection . vendor == ' oracle ' ,
" No need to check Oracle cursor semantics " )
2009-01-16 22:23:58 +00:00
def test_dbms_session ( self ) :
# If the backend is Oracle, test that we can call a standard
# stored procedure through our cursor wrapper.
2010-10-11 12:55:17 +00:00
convert_unicode = backend . convert_unicode
cursor = connection . cursor ( )
cursor . callproc ( convert_unicode ( ' DBMS_SESSION.SET_IDENTIFIER ' ) ,
[ convert_unicode ( ' _django_testing! ' ) , ] )
2009-12-22 15:18:51 +00:00
2010-10-11 12:55:17 +00:00
@unittest.skipUnless ( connection . vendor == ' oracle ' ,
" No need to check Oracle cursor semantics " )
2010-04-28 17:08:06 +00:00
def test_cursor_var ( self ) :
# If the backend is Oracle, test that we can pass cursor variables
# as query parameters.
2010-10-11 12:55:17 +00:00
cursor = connection . cursor ( )
var = cursor . var ( backend . Database . STRING )
cursor . execute ( " BEGIN %s := ' X ' ; END; " , [ var ] )
self . assertEqual ( var . getvalue ( ) , ' X ' )
2009-07-21 21:20:18 +00:00
2010-10-11 12:55:17 +00:00
@unittest.skipUnless ( connection . vendor == ' oracle ' ,
" No need to check Oracle cursor semantics " )
2009-07-21 21:20:18 +00:00
def test_long_string ( self ) :
# If the backend is Oracle, test that we can save a text longer
# than 4000 chars and read it properly
2010-10-11 12:55:17 +00:00
c = connection . cursor ( )
c . execute ( ' CREATE TABLE ltext ( " TEXT " NCLOB) ' )
2012-07-20 14:48:51 +02:00
long_str = ' ' . join ( [ six . text_type ( x ) for x in xrange ( 4000 ) ] )
2010-10-11 12:55:17 +00:00
c . execute ( ' INSERT INTO ltext VALUES ( %s ) ' , [ long_str ] )
c . execute ( ' SELECT text FROM ltext ' )
row = c . fetchone ( )
self . assertEqual ( long_str , row [ 0 ] . read ( ) )
c . execute ( ' DROP TABLE ltext ' )
2009-01-16 22:23:58 +00:00
2010-12-03 18:15:54 +00:00
@unittest.skipUnless ( connection . vendor == ' oracle ' ,
" No need to check Oracle connection semantics " )
def test_client_encoding ( self ) :
# If the backend is Oracle, test that the client encoding is set
# correctly. This was broken under Cygwin prior to r14781.
2012-04-24 16:47:31 +00:00
connection . cursor ( ) # Ensure the connection is initialized.
2010-12-03 18:15:54 +00:00
self . assertEqual ( connection . connection . encoding , " UTF-8 " )
self . assertEqual ( connection . connection . nencoding , " UTF-8 " )
2012-06-12 20:30:52 +10:00
@unittest.skipUnless ( connection . vendor == ' oracle ' ,
" No need to check Oracle connection semantics " )
def test_order_of_nls_parameters ( self ) :
# an 'almost right' datetime should work with configured
# NLS parameters as per #18465.
c = connection . cursor ( )
query = " select 1 from dual where ' 1936-12-29 00:00 ' < sysdate "
# Test that the query succeeds without errors - pre #18465 this
# wasn't the case.
c . execute ( query )
self . assertEqual ( c . fetchone ( ) [ 0 ] , 1 )
2012-04-24 16:47:31 +00:00
class MySQLTests ( TestCase ) :
@unittest.skipUnless ( connection . vendor == ' mysql ' ,
" Test valid only for MySQL " )
def test_autoincrement ( self ) :
"""
Check that auto_increment fields are reset correctly by sql_flush ( ) .
Before MySQL version 5.0 .13 TRUNCATE did not do auto_increment reset .
Refs #16961.
"""
statements = connection . ops . sql_flush ( no_style ( ) ,
tables = [ ' test ' ] ,
sequences = [ {
' table ' : ' test ' ,
' col ' : ' somecol ' ,
} ] )
found_reset = False
for sql in statements :
found_reset = found_reset or ' ALTER TABLE ' in sql
if connection . mysql_version < ( 5 , 0 , 13 ) :
self . assertTrue ( found_reset )
else :
self . assertFalse ( found_reset )
2010-02-24 15:29:25 +00:00
class DateQuotingTest ( TestCase ) :
def test_django_date_trunc ( self ) :
"""
Test the custom ` ` django_date_trunc method ` ` , in particular against
fields which clash with strings passed to it ( e . g . ' year ' ) - see
#12818__.
__ : http : / / code . djangoproject . com / ticket / 12818
"""
updated = datetime . datetime ( 2010 , 2 , 20 )
models . SchoolClass . objects . create ( year = 2009 , last_updated = updated )
years = models . SchoolClass . objects . dates ( ' last_updated ' , ' year ' )
2013-02-10 16:15:49 +01:00
self . assertEqual ( list ( years ) , [ datetime . date ( 2010 , 1 , 1 ) ] )
2010-02-24 15:29:25 +00:00
2013-02-10 16:15:49 +01:00
def test_django_date_extract ( self ) :
2010-02-24 15:29:25 +00:00
"""
2013-02-10 16:15:49 +01:00
Test the custom ` ` django_date_extract method ` ` , in particular against fields
2010-02-24 15:29:25 +00:00
which clash with strings passed to it ( e . g . ' day ' ) - see #12818__.
__ : http : / / code . djangoproject . com / ticket / 12818
"""
updated = datetime . datetime ( 2010 , 2 , 20 )
models . SchoolClass . objects . create ( year = 2009 , last_updated = updated )
classes = models . SchoolClass . objects . filter ( last_updated__day = 20 )
self . assertEqual ( len ( classes ) , 1 )
2011-04-22 12:14:54 +00:00
2013-01-26 17:51:44 +01:00
@override_settings ( DEBUG = True )
2012-06-13 11:36:27 +02:00
class LastExecutedQueryTest ( TestCase ) :
2013-01-26 17:51:44 +01:00
2012-06-13 13:28:09 +02:00
def test_debug_sql ( self ) :
list ( models . Tag . objects . filter ( name = " test " ) )
sql = connection . queries [ - 1 ] [ ' sql ' ] . lower ( )
2013-01-26 17:51:44 +01:00
self . assertIn ( " select " , sql )
2012-06-13 13:28:09 +02:00
self . assertIn ( models . Tag . _meta . db_table , sql )
2010-06-21 11:48:45 +00:00
2012-06-13 11:36:27 +02:00
def test_query_encoding ( self ) :
"""
Test that last_executed_query ( ) returns an Unicode string
"""
2013-01-26 17:51:44 +01:00
tags = models . Tag . objects . extra ( select = { ' föö ' : 1 } )
2012-06-13 11:36:27 +02:00
sql , params = tags . query . sql_with_params ( )
cursor = tags . query . get_compiler ( ' default ' ) . execute_sql ( None )
last_sql = cursor . db . ops . last_executed_query ( cursor , sql , params )
2012-07-20 14:48:51 +02:00
self . assertTrue ( isinstance ( last_sql , six . text_type ) )
2012-06-13 11:36:27 +02:00
2013-01-26 17:51:44 +01:00
@unittest.skipUnless ( connection . vendor == ' sqlite ' ,
" This test is specific to SQLite. " )
def test_no_interpolation_on_sqlite ( self ) :
# Regression for #17158
# This shouldn't raise an exception
query = " SELECT strftime( ' % Y ' , ' now ' ); "
connection . cursor ( ) . execute ( query )
self . assertEqual ( connection . queries [ - 1 ] [ ' sql ' ] ,
str_prefix ( " QUERY = %(_)s \" SELECT strftime( ' %% Y ' , ' now ' ); \" - PARAMS = () " ) )
2012-06-13 11:36:27 +02:00
2010-03-23 13:51:11 +00:00
class ParameterHandlingTest ( TestCase ) :
def test_bad_parameter_count ( self ) :
" An executemany call with too many/not enough parameters will raise an exception (Refs #12612) "
cursor = connection . cursor ( )
query = ( ' INSERT INTO %s ( %s , %s ) VALUES ( %% s, %% s) ' % (
connection . introspection . table_name_converter ( ' backends_square ' ) ,
connection . ops . quote_name ( ' root ' ) ,
connection . ops . quote_name ( ' square ' )
) )
self . assertRaises ( Exception , cursor . executemany , query , [ ( 1 , 2 , 3 ) , ] )
self . assertRaises ( Exception , cursor . executemany , query , [ ( 1 , ) , ] )
2010-06-21 11:48:45 +00:00
# Unfortunately, the following tests would be a good test to run on all
# backends, but it breaks MySQL hard. Until #13711 is fixed, it can't be run
# everywhere (although it would be an effective test of #13711).
2010-10-11 12:55:17 +00:00
class LongNameTest ( TestCase ) :
""" Long primary keys and model names can result in a sequence name
that exceeds the database limits , which will result in truncation
on certain databases ( e . g . , Postgres ) . The backend needs to use
the correct sequence name in last_insert_id and other places , so
check it is . Refs #8901.
"""
@skipUnlessDBFeature ( ' supports_long_model_names ' )
def test_sequence_name_length_limits_create ( self ) :
""" Test creation of model with long name and long pk name doesn ' t error. Ref #8901 """
models . VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ . objects . create ( )
@skipUnlessDBFeature ( ' supports_long_model_names ' )
def test_sequence_name_length_limits_m2m ( self ) :
""" Test an m2m save of a model with a long name and a long m2m field name doesn ' t error as on Django >=1.2 this now uses object saves. Ref #8901 """
obj = models . VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ . objects . create ( )
rel_obj = models . Person . objects . create ( first_name = ' Django ' , last_name = ' Reinhardt ' )
obj . m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz . add ( rel_obj )
@skipUnlessDBFeature ( ' supports_long_model_names ' )
def test_sequence_name_length_limits_flush ( self ) :
""" Test that sequence resetting as part of a flush with model with long name and long pk name doesn ' t error. Ref #8901 """
# A full flush is expensive to the full test, so we dig into the
# internals to generate the likely offending SQL and run it manually
# Some convenience aliases
VLM = models . VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
VLM_m2m = VLM . m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz . through
tables = [
VLM . _meta . db_table ,
VLM_m2m . _meta . db_table ,
]
sequences = [
{
' column ' : VLM . _meta . pk . column ,
' table ' : VLM . _meta . db_table
} ,
]
cursor = connection . cursor ( )
for statement in connection . ops . sql_flush ( no_style ( ) , tables , sequences ) :
cursor . execute ( statement )
2010-06-21 11:48:45 +00:00
2010-07-30 02:42:36 +00:00
class SequenceResetTest ( TestCase ) :
def test_generic_relation ( self ) :
" Sequence names are correct when resetting generic relations (Ref #13941) "
# Create an object with a manually specified PK
models . Post . objects . create ( id = 10 , name = ' 1st post ' , text = ' hello world ' )
# Reset the sequences for the database
cursor = connection . cursor ( )
commands = connections [ DEFAULT_DB_ALIAS ] . ops . sequence_reset_sql ( no_style ( ) , [ models . Post ] )
for sql in commands :
cursor . execute ( sql )
# If we create a new object now, it should have a PK greater
# than the PK we specified manually.
obj = models . Post . objects . create ( name = ' New post ' , text = ' goodbye world ' )
self . assertTrue ( obj . pk > 10 )
2010-08-30 13:21:18 +00:00
class PostgresVersionTest ( TestCase ) :
def assert_parses ( self , version_string , version ) :
self . assertEqual ( pg_version . _parse_version ( version_string ) , version )
2010-03-23 13:51:11 +00:00
2010-08-30 13:21:18 +00:00
def test_parsing ( self ) :
2011-06-19 18:00:09 +00:00
""" Test PostgreSQL version parsing from `SELECT version()` output """
self . assert_parses ( " PostgreSQL 8.3 beta4 " , 80300 )
self . assert_parses ( " PostgreSQL 8.3 " , 80300 )
self . assert_parses ( " EnterpriseDB 8.3 " , 80300 )
self . assert_parses ( " PostgreSQL 8.3.6 " , 80306 )
self . assert_parses ( " PostgreSQL 8.4beta1 " , 80400 )
self . assert_parses ( " PostgreSQL 8.3.1 on i386-apple-darwin9.2.2, compiled by GCC i686-apple-darwin9-gcc-4.0.1 (GCC) 4.0.1 (Apple Inc. build 5478) " , 80301 )
def test_version_detection ( self ) :
""" Test PostgreSQL version detection """
# Helper mocks
class CursorMock ( object ) :
" Very simple mock of DB-API cursor "
def execute ( self , arg ) :
pass
def fetchone ( self ) :
return [ " PostgreSQL 8.3 " ]
class OlderConnectionMock ( object ) :
" Mock of psycopg2 (< 2.0.12) connection "
def cursor ( self ) :
return CursorMock ( )
# psycopg2 < 2.0.12 code path
conn = OlderConnectionMock ( )
self . assertEqual ( pg_version . get_version ( conn ) , 80300 )
2009-03-29 23:15:58 +00:00
2011-11-20 16:12:33 +00:00
class PostgresNewConnectionTest ( TestCase ) :
"""
#17062: PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
transaction is rolled back .
"""
2011-11-20 20:43:59 +00:00
@unittest.skipUnless (
connection . vendor == ' postgresql ' and connection . isolation_level > 0 ,
" This test applies only to PostgreSQL without autocommit " )
2011-11-20 16:12:33 +00:00
def test_connect_and_rollback ( self ) :
new_connections = ConnectionHandler ( settings . DATABASES )
new_connection = new_connections [ DEFAULT_DB_ALIAS ]
try :
# Ensure the database default time zone is different than
# the time zone in new_connection.settings_dict. We can
# get the default time zone by reset & show.
cursor = new_connection . cursor ( )
cursor . execute ( " RESET TIMEZONE " )
cursor . execute ( " SHOW TIMEZONE " )
db_default_tz = cursor . fetchone ( ) [ 0 ]
new_tz = ' Europe/Paris ' if db_default_tz == ' UTC ' else ' UTC '
new_connection . close ( )
# Fetch a new connection with the new_tz as default
# time zone, run a query and rollback.
new_connection . settings_dict [ ' TIME_ZONE ' ] = new_tz
new_connection . enter_transaction_management ( )
cursor = new_connection . cursor ( )
new_connection . rollback ( )
# Now let's see if the rollback rolled back the SET TIME ZONE.
cursor . execute ( " SHOW TIMEZONE " )
tz = cursor . fetchone ( ) [ 0 ]
self . assertEqual ( new_tz , tz )
finally :
try :
new_connection . close ( )
except DatabaseError :
pass
2009-03-29 23:15:58 +00:00
# Unfortunately with sqlite3 the in-memory test database cannot be
# closed, and so it cannot be re-opened during testing, and so we
# sadly disable this test for now.
2010-10-11 12:55:17 +00:00
class ConnectionCreatedSignalTest ( TestCase ) :
@skipUnlessDBFeature ( ' test_db_allows_multiple_connections ' )
def test_signal ( self ) :
data = { }
def receiver ( sender , connection , * * kwargs ) :
data [ " connection " ] = connection
connection_created . connect ( receiver )
connection . close ( )
cursor = connection . cursor ( )
2011-12-16 13:40:19 +00:00
self . assertTrue ( data [ " connection " ] . connection is connection . connection )
2010-08-30 13:21:18 +00:00
2010-10-11 12:55:17 +00:00
connection_created . disconnect ( receiver )
data . clear ( )
cursor = connection . cursor ( )
self . assertTrue ( data == { } )
2010-09-27 15:12:48 +00:00
2011-05-10 12:20:47 +00:00
class EscapingChecks ( TestCase ) :
@unittest.skipUnless ( connection . vendor == ' sqlite ' ,
" This is a sqlite-specific issue " )
def test_parameter_escaping ( self ) :
#13648: '%s' escaping support for sqlite3
cursor = connection . cursor ( )
response = cursor . execute (
" select strftime( ' %% s ' , date( ' now ' )) " ) . fetchall ( ) [ 0 ] [ 0 ]
self . assertNotEqual ( response , None )
# response should be an non-zero integer
self . assertTrue ( int ( response ) )
2013-01-11 13:57:54 +08:00
class SqlliteAggregationTests ( TestCase ) :
"""
#19360: Raise NotImplementedError when aggregating on date/time fields.
"""
@unittest.skipUnless ( connection . vendor == ' sqlite ' ,
" No need to check SQLite aggregation semantics " )
def test_aggregation ( self ) :
for aggregate in ( Sum , Avg , Variance , StdDev ) :
self . assertRaises ( NotImplementedError ,
models . Item . objects . all ( ) . aggregate , aggregate ( ' time ' ) )
self . assertRaises ( NotImplementedError ,
models . Item . objects . all ( ) . aggregate , aggregate ( ' date ' ) )
self . assertRaises ( NotImplementedError ,
models . Item . objects . all ( ) . aggregate , aggregate ( ' last_modified ' ) )
2010-09-27 15:12:48 +00:00
class BackendTestCase ( TestCase ) :
2012-01-22 16:41:20 +00:00
def create_squares_with_executemany ( self , args ) :
2010-09-27 15:12:48 +00:00
cursor = connection . cursor ( )
opts = models . Square . _meta
2012-01-22 16:41:20 +00:00
tbl = connection . introspection . table_name_converter ( opts . db_table )
f1 = connection . ops . quote_name ( opts . get_field ( ' root ' ) . column )
f2 = connection . ops . quote_name ( opts . get_field ( ' square ' ) . column )
query = ' INSERT INTO %s ( %s , %s ) VALUES ( %% s, %% s) ' % ( tbl , f1 , f2 )
cursor . executemany ( query , args )
def test_cursor_executemany ( self ) :
#4896: Test cursor.executemany
args = [ ( i , i * * 2 ) for i in range ( - 5 , 6 ) ]
self . create_squares_with_executemany ( args )
2010-09-27 15:12:48 +00:00
self . assertEqual ( models . Square . objects . count ( ) , 11 )
for i in range ( - 5 , 6 ) :
square = models . Square . objects . get ( root = i )
self . assertEqual ( square . square , i * * 2 )
2012-01-22 16:41:20 +00:00
def test_cursor_executemany_with_empty_params_list ( self ) :
2010-09-27 15:12:48 +00:00
#4765: executemany with params=[] does nothing
2012-01-22 16:41:20 +00:00
args = [ ]
self . create_squares_with_executemany ( args )
self . assertEqual ( models . Square . objects . count ( ) , 0 )
def test_cursor_executemany_with_iterator ( self ) :
#10320: executemany accepts iterators
args = iter ( ( i , i * * 2 ) for i in range ( - 3 , 2 ) )
self . create_squares_with_executemany ( args )
self . assertEqual ( models . Square . objects . count ( ) , 5 )
args = iter ( ( i , i * * 2 ) for i in range ( 3 , 7 ) )
with override_settings ( DEBUG = True ) :
# same test for DebugCursorWrapper
self . create_squares_with_executemany ( args )
self . assertEqual ( models . Square . objects . count ( ) , 9 )
2010-09-27 15:12:48 +00:00
def test_unicode_fetches ( self ) :
#6254: fetchone, fetchmany, fetchall return strings as unicode objects
qn = connection . ops . quote_name
models . Person ( first_name = " John " , last_name = " Doe " ) . save ( )
models . Person ( first_name = " Jane " , last_name = " Doe " ) . save ( )
models . Person ( first_name = " Mary " , last_name = " Agnelline " ) . save ( )
models . Person ( first_name = " Peter " , last_name = " Parker " ) . save ( )
models . Person ( first_name = " Clark " , last_name = " Kent " ) . save ( )
opts2 = models . Person . _meta
f3 , f4 = opts2 . get_field ( ' first_name ' ) , opts2 . get_field ( ' last_name ' )
query2 = ( ' SELECT %s , %s FROM %s ORDER BY %s '
% ( qn ( f3 . column ) , qn ( f4 . column ) , connection . introspection . table_name_converter ( opts2 . db_table ) ,
qn ( f3 . column ) ) )
cursor = connection . cursor ( )
cursor . execute ( query2 )
2012-06-07 18:08:47 +02:00
self . assertEqual ( cursor . fetchone ( ) , ( ' Clark ' , ' Kent ' ) )
self . assertEqual ( list ( cursor . fetchmany ( 2 ) ) , [ ( ' Jane ' , ' Doe ' ) , ( ' John ' , ' Doe ' ) ] )
self . assertEqual ( list ( cursor . fetchall ( ) ) , [ ( ' Mary ' , ' Agnelline ' ) , ( ' Peter ' , ' Parker ' ) ] )
2010-10-11 12:55:17 +00:00
2012-09-12 10:16:49 +02:00
def test_unicode_password ( self ) :
old_password = connection . settings_dict [ ' PASSWORD ' ]
connection . settings_dict [ ' PASSWORD ' ] = " françois "
try :
cursor = connection . cursor ( )
except backend . Database . DatabaseError :
# As password is probably wrong, a database exception is expected
pass
except Exception as e :
self . fail ( " Unexpected error raised with unicode password: %s " % e )
finally :
connection . settings_dict [ ' PASSWORD ' ] = old_password
2011-04-05 00:19:17 +00:00
def test_database_operations_helper_class ( self ) :
# Ticket #13630
self . assertTrue ( hasattr ( connection , ' ops ' ) )
self . assertTrue ( hasattr ( connection . ops , ' connection ' ) )
self . assertEqual ( connection , connection . ops . connection )
2012-06-09 15:59:52 +02:00
def test_cached_db_features ( self ) :
2012-05-18 12:08:36 +02:00
self . assertIn ( connection . features . supports_transactions , ( True , False ) )
self . assertIn ( connection . features . supports_stddev , ( True , False ) )
self . assertIn ( connection . features . can_introspect_foreign_keys , ( True , False ) )
2012-01-07 19:53:20 +00:00
def test_duplicate_table_error ( self ) :
""" Test that creating an existing table returns a DatabaseError """
cursor = connection . cursor ( )
query = ' CREATE TABLE %s (id INTEGER); ' % models . Article . _meta . db_table
with self . assertRaises ( DatabaseError ) :
cursor . execute ( query )
2010-10-23 00:01:22 +00:00
# We don't make these tests conditional because that means we would need to
# check and differentiate between:
# * MySQL+InnoDB, MySQL+MYISAM (something we currently can't do).
# * if sqlite3 (if/once we get #14204 fixed) has referential integrity turned
# on or not, something that would be controlled by runtime support and user
# preference.
# verify if its type is django.database.db.IntegrityError.
class FkConstraintsTests ( TransactionTestCase ) :
def setUp ( self ) :
# Create a Reporter.
self . r = models . Reporter . objects . create ( first_name = ' John ' , last_name = ' Smith ' )
def test_integrity_checks_on_creation ( self ) :
"""
Try to create a model instance that violates a FK constraint . If it
fails it should fail with IntegrityError .
"""
a = models . Article ( headline = " This is a test " , pub_date = datetime . datetime ( 2005 , 7 , 27 ) , reporter_id = 30 )
try :
a . save ( )
except IntegrityError :
2011-08-07 00:43:26 +00:00
return
self . skipTest ( " This backend does not support integrity checks. " )
2010-10-23 00:01:22 +00:00
def test_integrity_checks_on_update ( self ) :
"""
Try to update a model instance introducing a FK constraint violation .
If it fails it should fail with IntegrityError .
"""
# Create an Article.
models . Article . objects . create ( headline = " Test article " , pub_date = datetime . datetime ( 2010 , 9 , 4 ) , reporter = self . r )
# Retrive it from the DB
a = models . Article . objects . get ( headline = " Test article " )
a . reporter_id = 30
try :
a . save ( )
except IntegrityError :
2011-08-07 00:43:26 +00:00
return
self . skipTest ( " This backend does not support integrity checks. " )
def test_disable_constraint_checks_manually ( self ) :
"""
When constraint checks are disabled , should be able to write bad data without IntegrityErrors .
"""
with transaction . commit_manually ( ) :
# Create an Article.
models . Article . objects . create ( headline = " Test article " , pub_date = datetime . datetime ( 2010 , 9 , 4 ) , reporter = self . r )
# Retrive it from the DB
a = models . Article . objects . get ( headline = " Test article " )
a . reporter_id = 30
try :
connection . disable_constraint_checking ( )
a . save ( )
connection . enable_constraint_checking ( )
except IntegrityError :
self . fail ( " IntegrityError should not have occurred. " )
finally :
transaction . rollback ( )
def test_disable_constraint_checks_context_manager ( self ) :
"""
When constraint checks are disabled ( using context manager ) , should be able to write bad data without IntegrityErrors .
"""
with transaction . commit_manually ( ) :
# Create an Article.
models . Article . objects . create ( headline = " Test article " , pub_date = datetime . datetime ( 2010 , 9 , 4 ) , reporter = self . r )
# Retrive it from the DB
a = models . Article . objects . get ( headline = " Test article " )
a . reporter_id = 30
try :
with connection . constraint_checks_disabled ( ) :
a . save ( )
except IntegrityError :
self . fail ( " IntegrityError should not have occurred. " )
finally :
transaction . rollback ( )
def test_check_constraints ( self ) :
"""
Constraint checks should raise an IntegrityError when bad data is in the DB .
"""
with transaction . commit_manually ( ) :
# Create an Article.
models . Article . objects . create ( headline = " Test article " , pub_date = datetime . datetime ( 2010 , 9 , 4 ) , reporter = self . r )
# Retrive it from the DB
a = models . Article . objects . get ( headline = " Test article " )
a . reporter_id = 30
try :
with connection . constraint_checks_disabled ( ) :
a . save ( )
with self . assertRaises ( IntegrityError ) :
connection . check_constraints ( )
finally :
transaction . rollback ( )
2011-12-16 13:40:19 +00:00
class ThreadTests ( TestCase ) :
def test_default_connection_thread_local ( self ) :
"""
Ensure that the default connection ( i . e . django . db . connection ) is
different for each thread .
Refs #17258.
"""
connections_set = set ( )
connection . cursor ( )
2012-11-26 22:52:44 +02:00
connections_set . add ( connection )
2011-12-16 13:40:19 +00:00
def runner ( ) :
2012-11-26 22:52:44 +02:00
# Passing django.db.connection between threads doesn't work while
# connections[DEFAULT_DB_ALIAS] does.
from django . db import connections
connection = connections [ DEFAULT_DB_ALIAS ]
2012-12-16 23:00:11 +02:00
# Allow thread sharing so the connection can be closed by the
# main thread.
connection . allow_thread_sharing = True
2011-12-16 13:40:19 +00:00
connection . cursor ( )
2012-11-26 22:52:44 +02:00
connections_set . add ( connection )
2012-07-20 18:53:11 +02:00
for x in range ( 2 ) :
2011-12-16 13:40:19 +00:00
t = threading . Thread ( target = runner )
t . start ( )
t . join ( )
2012-11-26 22:52:44 +02:00
# Check that each created connection got different inner connection.
self . assertEqual (
len ( set ( [ conn . connection for conn in connections_set ] ) ) ,
3 )
2011-12-16 13:40:19 +00:00
# Finish by closing the connections opened by the other threads (the
# connection opened in the main thread will automatically be closed on
# teardown).
for conn in connections_set :
2012-12-16 23:00:11 +02:00
if conn is not connection :
2011-12-16 13:40:19 +00:00
conn . close ( )
def test_connections_thread_local ( self ) :
"""
Ensure that the connections are different for each thread .
Refs #17258.
"""
connections_set = set ( )
for conn in connections . all ( ) :
connections_set . add ( conn )
def runner ( ) :
from django . db import connections
for conn in connections . all ( ) :
2011-12-16 17:02:41 +00:00
# Allow thread sharing so the connection can be closed by the
# main thread.
conn . allow_thread_sharing = True
2011-12-16 13:40:19 +00:00
connections_set . add ( conn )
2012-07-20 18:53:11 +02:00
for x in range ( 2 ) :
2011-12-16 13:40:19 +00:00
t = threading . Thread ( target = runner )
t . start ( )
t . join ( )
2012-05-03 16:39:16 +02:00
self . assertEqual ( len ( connections_set ) , 6 )
2011-12-16 13:40:19 +00:00
# Finish by closing the connections opened by the other threads (the
# connection opened in the main thread will automatically be closed on
# teardown).
for conn in connections_set :
2012-12-16 23:00:11 +02:00
if conn is not connection :
2011-12-16 13:40:19 +00:00
conn . close ( )
def test_pass_connection_between_threads ( self ) :
"""
Ensure that a connection can be passed from one thread to the other .
Refs #17258.
"""
models . Person . objects . create ( first_name = " John " , last_name = " Doe " )
def do_thread ( ) :
def runner ( main_thread_connection ) :
from django . db import connections
connections [ ' default ' ] = main_thread_connection
try :
models . Person . objects . get ( first_name = " John " , last_name = " Doe " )
2012-08-18 10:59:10 +02:00
except Exception as e :
2011-12-16 13:40:19 +00:00
exceptions . append ( e )
t = threading . Thread ( target = runner , args = [ connections [ ' default ' ] ] )
t . start ( )
t . join ( )
# Without touching allow_thread_sharing, which should be False by default.
exceptions = [ ]
do_thread ( )
# Forbidden!
2012-08-18 10:59:10 +02:00
self . assertIsInstance ( exceptions [ 0 ] , DatabaseError )
2011-12-16 13:40:19 +00:00
# If explicitly setting allow_thread_sharing to False
connections [ ' default ' ] . allow_thread_sharing = False
exceptions = [ ]
do_thread ( )
# Forbidden!
2012-08-18 10:59:10 +02:00
self . assertIsInstance ( exceptions [ 0 ] , DatabaseError )
2011-12-16 13:40:19 +00:00
# If explicitly setting allow_thread_sharing to True
connections [ ' default ' ] . allow_thread_sharing = True
exceptions = [ ]
do_thread ( )
# All good
2012-08-18 10:59:10 +02:00
self . assertEqual ( exceptions , [ ] )
2011-12-16 17:02:41 +00:00
def test_closing_non_shared_connections ( self ) :
"""
Ensure that a connection that is not explicitly shareable cannot be
closed by another thread .
Refs #17258.
"""
# First, without explicitly enabling the connection for sharing.
exceptions = set ( )
def runner1 ( ) :
def runner2 ( other_thread_connection ) :
try :
other_thread_connection . close ( )
2012-04-28 18:09:37 +02:00
except DatabaseError as e :
2011-12-16 17:02:41 +00:00
exceptions . add ( e )
t2 = threading . Thread ( target = runner2 , args = [ connections [ ' default ' ] ] )
t2 . start ( )
t2 . join ( )
t1 = threading . Thread ( target = runner1 )
t1 . start ( )
t1 . join ( )
# The exception was raised
self . assertEqual ( len ( exceptions ) , 1 )
# Then, with explicitly enabling the connection for sharing.
exceptions = set ( )
def runner1 ( ) :
def runner2 ( other_thread_connection ) :
try :
other_thread_connection . close ( )
2012-04-28 18:09:37 +02:00
except DatabaseError as e :
2011-12-16 17:02:41 +00:00
exceptions . add ( e )
# Enable thread sharing
connections [ ' default ' ] . allow_thread_sharing = True
t2 = threading . Thread ( target = runner2 , args = [ connections [ ' default ' ] ] )
t2 . start ( )
t2 . join ( )
t1 = threading . Thread ( target = runner1 )
t1 . start ( )
t1 . join ( )
# No exception was raised
2011-12-25 13:24:39 +00:00
self . assertEqual ( len ( exceptions ) , 0 )
2012-04-24 18:03:14 +00:00
class MySQLPKZeroTests ( TestCase ) :
"""
Zero as id for AutoField should raise exception in MySQL , because MySQL
does not allow zero for automatic primary key .
"""
@skipIfDBFeature ( ' allows_primary_key_0 ' )
def test_zero_as_autoval ( self ) :
with self . assertRaises ( ValueError ) :
models . Square . objects . create ( id = 0 , root = 0 , square = 1 )
2013-02-20 11:27:32 -08:00
class DBConstraintTestCase ( TransactionTestCase ) :
def test_can_reference_existant ( self ) :
obj = models . Object . objects . create ( )
ref = models . ObjectReference . objects . create ( obj = obj )
self . assertEqual ( ref . obj , obj )
ref = models . ObjectReference . objects . get ( obj = obj )
self . assertEqual ( ref . obj , obj )
def test_can_reference_non_existant ( self ) :
self . assertFalse ( models . Object . objects . filter ( id = 12345 ) . exists ( ) )
ref = models . ObjectReference . objects . create ( obj_id = 12345 )
ref_new = models . ObjectReference . objects . get ( obj_id = 12345 )
self . assertEqual ( ref , ref_new )
with self . assertRaises ( models . Object . DoesNotExist ) :
ref . obj