mirror of
https://github.com/django/django.git
synced 2025-10-24 06:06:09 +00:00
Fixed #7732 -- Added support for connection pools on Oracle.
This commit is contained in:
@@ -1,12 +1,28 @@
|
||||
import copy
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from django.db import DatabaseError, NotSupportedError, connection
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.db import DatabaseError, NotSupportedError, ProgrammingError, connection
|
||||
from django.db.models import BooleanField
|
||||
from django.test import TestCase, TransactionTestCase
|
||||
|
||||
from ..models import Square, VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
|
||||
|
||||
try:
|
||||
from django.db.backends.oracle.oracledb_any import is_oracledb
|
||||
except ImportError:
|
||||
is_oracledb = False
|
||||
|
||||
|
||||
def no_pool_connection(alias=None):
|
||||
new_connection = connection.copy(alias)
|
||||
new_connection.settings_dict = copy.deepcopy(connection.settings_dict)
|
||||
# Ensure that the second connection circumvents the pool, this is kind
|
||||
# of a hack, but we cannot easily change the pool connections.
|
||||
new_connection.settings_dict["OPTIONS"]["pool"] = False
|
||||
return new_connection
|
||||
|
||||
|
||||
@unittest.skipUnless(connection.vendor == "oracle", "Oracle tests")
|
||||
class Tests(TestCase):
|
||||
@@ -69,6 +85,76 @@ class Tests(TestCase):
|
||||
connection.check_database_version_supported()
|
||||
self.assertTrue(mocked_get_database_version.called)
|
||||
|
||||
@unittest.skipUnless(is_oracledb, "Pool specific tests")
|
||||
def test_pool_set_to_true(self):
|
||||
new_connection = no_pool_connection(alias="default_pool")
|
||||
new_connection.settings_dict["OPTIONS"]["pool"] = True
|
||||
try:
|
||||
self.assertIsNotNone(new_connection.pool)
|
||||
finally:
|
||||
new_connection.close_pool()
|
||||
|
||||
@unittest.skipUnless(is_oracledb, "Pool specific tests")
|
||||
def test_pool_reuse(self):
|
||||
new_connection = no_pool_connection(alias="default_pool")
|
||||
new_connection.settings_dict["OPTIONS"]["pool"] = {
|
||||
"min": 0,
|
||||
"max": 2,
|
||||
}
|
||||
self.assertIsNotNone(new_connection.pool)
|
||||
|
||||
connections = []
|
||||
|
||||
def get_connection():
|
||||
# copy() reuses the existing alias and as such the same pool.
|
||||
conn = new_connection.copy()
|
||||
conn.connect()
|
||||
connections.append(conn)
|
||||
return conn
|
||||
|
||||
try:
|
||||
connection_1 = get_connection() # First connection.
|
||||
get_connection() # Get the second connection.
|
||||
sql = "select sys_context('userenv', 'sid') from dual"
|
||||
sids = [conn.cursor().execute(sql).fetchone()[0] for conn in connections]
|
||||
connection_1.close() # Release back to the pool.
|
||||
connection_3 = get_connection()
|
||||
sid = connection_3.cursor().execute(sql).fetchone()[0]
|
||||
# Reuses the first connection as it is available.
|
||||
self.assertEqual(sid, sids[0])
|
||||
finally:
|
||||
# Release all connections back to the pool.
|
||||
for conn in connections:
|
||||
conn.close()
|
||||
new_connection.close_pool()
|
||||
|
||||
@unittest.skipUnless(is_oracledb, "Pool specific tests")
|
||||
def test_cannot_open_new_connection_in_atomic_block(self):
|
||||
new_connection = no_pool_connection(alias="default_pool")
|
||||
new_connection.settings_dict["OPTIONS"]["pool"] = True
|
||||
msg = "Cannot open a new connection in an atomic block."
|
||||
new_connection.in_atomic_block = True
|
||||
new_connection.closed_in_transaction = True
|
||||
with self.assertRaisesMessage(ProgrammingError, msg):
|
||||
new_connection.ensure_connection()
|
||||
|
||||
@unittest.skipUnless(is_oracledb, "Pool specific tests")
|
||||
def test_pooling_not_support_persistent_connections(self):
|
||||
new_connection = no_pool_connection(alias="default_pool")
|
||||
new_connection.settings_dict["OPTIONS"]["pool"] = True
|
||||
new_connection.settings_dict["CONN_MAX_AGE"] = 10
|
||||
msg = "Pooling doesn't support persistent connections."
|
||||
with self.assertRaisesMessage(ImproperlyConfigured, msg):
|
||||
new_connection.pool
|
||||
|
||||
@unittest.skipIf(is_oracledb, "cx_oracle specific tests")
|
||||
def test_cx_Oracle_not_support_pooling(self):
|
||||
new_connection = no_pool_connection()
|
||||
new_connection.settings_dict["OPTIONS"]["pool"] = True
|
||||
msg = "Pooling isn't supported by cx_Oracle. Use python-oracledb instead."
|
||||
with self.assertRaisesMessage(ImproperlyConfigured, msg):
|
||||
new_connection.connect()
|
||||
|
||||
|
||||
@unittest.skipUnless(connection.vendor == "oracle", "Oracle tests")
|
||||
class TransactionalTests(TransactionTestCase):
|
||||
|
||||
Reference in New Issue
Block a user