1
0
mirror of https://github.com/django/django.git synced 2024-12-22 17:16:24 +00:00

Fixed closing connections in test_utils.tests.AllowedDatabaseQueriesTests.

This commit is contained in:
Mariusz Felisiak 2024-02-19 14:08:30 +01:00 committed by GitHub
parent f65f8ab84e
commit 9350308f37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2132,12 +2132,34 @@ class AllowedDatabaseQueriesTests(SimpleTestCase):
next(Car.objects.iterator(), None)
def test_allowed_threaded_database_queries(self):
connections_dict = {}
def thread_func():
# Passing django.db.connection between threads doesn't work while
# connections[DEFAULT_DB_ALIAS] does.
from django.db import connections
connection = connections["default"]
next(Car.objects.iterator(), None)
t = threading.Thread(target=thread_func)
t.start()
t.join()
# Allow thread sharing so the connection can be closed by the main
# thread.
connection.inc_thread_sharing()
connections_dict[id(connection)] = connection
try:
t = threading.Thread(target=thread_func)
t.start()
t.join()
finally:
# Finish by closing the connections opened by the other threads
# (the connection opened in the main thread will automatically be
# closed on teardown).
for conn in connections_dict.values():
if conn is not connection and conn.allow_thread_sharing:
conn.close()
conn.dec_thread_sharing()
class DatabaseAliasTests(SimpleTestCase):