mirror of
https://github.com/django/django.git
synced 2025-10-24 06:06:09 +00:00
Ensure cursors are closed when no longer needed.
This commit touchs various parts of the code base and test framework. Any found usage of opening a cursor for the sake of initializing a connection has been replaced with 'ensure_connection()'.
This commit is contained in:
@@ -37,38 +37,38 @@ class SchemaTests(TransactionTestCase):
|
||||
|
||||
def delete_tables(self):
|
||||
"Deletes all model tables for our models for a clean test environment"
|
||||
cursor = connection.cursor()
|
||||
connection.disable_constraint_checking()
|
||||
table_names = connection.introspection.table_names(cursor)
|
||||
for model in self.models:
|
||||
# Remove any M2M tables first
|
||||
for field in model._meta.local_many_to_many:
|
||||
with connection.cursor() as cursor:
|
||||
connection.disable_constraint_checking()
|
||||
table_names = connection.introspection.table_names(cursor)
|
||||
for model in self.models:
|
||||
# Remove any M2M tables first
|
||||
for field in model._meta.local_many_to_many:
|
||||
with atomic():
|
||||
tbl = field.rel.through._meta.db_table
|
||||
if tbl in table_names:
|
||||
cursor.execute(connection.schema_editor().sql_delete_table % {
|
||||
"table": connection.ops.quote_name(tbl),
|
||||
})
|
||||
table_names.remove(tbl)
|
||||
# Then remove the main tables
|
||||
with atomic():
|
||||
tbl = field.rel.through._meta.db_table
|
||||
tbl = model._meta.db_table
|
||||
if tbl in table_names:
|
||||
cursor.execute(connection.schema_editor().sql_delete_table % {
|
||||
"table": connection.ops.quote_name(tbl),
|
||||
})
|
||||
table_names.remove(tbl)
|
||||
# Then remove the main tables
|
||||
with atomic():
|
||||
tbl = model._meta.db_table
|
||||
if tbl in table_names:
|
||||
cursor.execute(connection.schema_editor().sql_delete_table % {
|
||||
"table": connection.ops.quote_name(tbl),
|
||||
})
|
||||
table_names.remove(tbl)
|
||||
connection.enable_constraint_checking()
|
||||
|
||||
def column_classes(self, model):
|
||||
cursor = connection.cursor()
|
||||
columns = dict(
|
||||
(d[0], (connection.introspection.get_field_type(d[1], d), d))
|
||||
for d in connection.introspection.get_table_description(
|
||||
cursor,
|
||||
model._meta.db_table,
|
||||
with connection.cursor() as cursor:
|
||||
columns = dict(
|
||||
(d[0], (connection.introspection.get_field_type(d[1], d), d))
|
||||
for d in connection.introspection.get_table_description(
|
||||
cursor,
|
||||
model._meta.db_table,
|
||||
)
|
||||
)
|
||||
)
|
||||
# SQLite has a different format for field_type
|
||||
for name, (type, desc) in columns.items():
|
||||
if isinstance(type, tuple):
|
||||
@@ -78,6 +78,20 @@ class SchemaTests(TransactionTestCase):
|
||||
raise DatabaseError("Table does not exist (empty pragma)")
|
||||
return columns
|
||||
|
||||
def get_indexes(self, table):
|
||||
"""
|
||||
Get the indexes on the table using a new cursor.
|
||||
"""
|
||||
with connection.cursor() as cursor:
|
||||
return connection.introspection.get_indexes(cursor, table)
|
||||
|
||||
def get_constraints(self, table):
|
||||
"""
|
||||
Get the constraints on a table using a new cursor.
|
||||
"""
|
||||
with connection.cursor() as cursor:
|
||||
return connection.introspection.get_constraints(cursor, table)
|
||||
|
||||
# Tests
|
||||
|
||||
def test_creation_deletion(self):
|
||||
@@ -127,7 +141,7 @@ class SchemaTests(TransactionTestCase):
|
||||
strict=True,
|
||||
)
|
||||
# Make sure the new FK constraint is present
|
||||
constraints = connection.introspection.get_constraints(connection.cursor(), Book._meta.db_table)
|
||||
constraints = self.get_constraints(Book._meta.db_table)
|
||||
for name, details in constraints.items():
|
||||
if details['columns'] == ["author_id"] and details['foreign_key']:
|
||||
self.assertEqual(details['foreign_key'], ('schema_tag', 'id'))
|
||||
@@ -342,7 +356,7 @@ class SchemaTests(TransactionTestCase):
|
||||
editor.create_model(TagM2MTest)
|
||||
editor.create_model(UniqueTest)
|
||||
# Ensure the M2M exists and points to TagM2MTest
|
||||
constraints = connection.introspection.get_constraints(connection.cursor(), BookWithM2M._meta.get_field_by_name("tags")[0].rel.through._meta.db_table)
|
||||
constraints = self.get_constraints(BookWithM2M._meta.get_field_by_name("tags")[0].rel.through._meta.db_table)
|
||||
if connection.features.supports_foreign_keys:
|
||||
for name, details in constraints.items():
|
||||
if details['columns'] == ["tagm2mtest_id"] and details['foreign_key']:
|
||||
@@ -363,7 +377,7 @@ class SchemaTests(TransactionTestCase):
|
||||
# Ensure old M2M is gone
|
||||
self.assertRaises(DatabaseError, self.column_classes, BookWithM2M._meta.get_field_by_name("tags")[0].rel.through)
|
||||
# Ensure the new M2M exists and points to UniqueTest
|
||||
constraints = connection.introspection.get_constraints(connection.cursor(), new_field.rel.through._meta.db_table)
|
||||
constraints = self.get_constraints(new_field.rel.through._meta.db_table)
|
||||
if connection.features.supports_foreign_keys:
|
||||
for name, details in constraints.items():
|
||||
if details['columns'] == ["uniquetest_id"] and details['foreign_key']:
|
||||
@@ -388,7 +402,7 @@ class SchemaTests(TransactionTestCase):
|
||||
with connection.schema_editor() as editor:
|
||||
editor.create_model(Author)
|
||||
# Ensure the constraint exists
|
||||
constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table)
|
||||
constraints = self.get_constraints(Author._meta.db_table)
|
||||
for name, details in constraints.items():
|
||||
if details['columns'] == ["height"] and details['check']:
|
||||
break
|
||||
@@ -404,7 +418,7 @@ class SchemaTests(TransactionTestCase):
|
||||
new_field,
|
||||
strict=True,
|
||||
)
|
||||
constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table)
|
||||
constraints = self.get_constraints(Author._meta.db_table)
|
||||
for name, details in constraints.items():
|
||||
if details['columns'] == ["height"] and details['check']:
|
||||
self.fail("Check constraint for height found")
|
||||
@@ -416,7 +430,7 @@ class SchemaTests(TransactionTestCase):
|
||||
Author._meta.get_field_by_name("height")[0],
|
||||
strict=True,
|
||||
)
|
||||
constraints = connection.introspection.get_constraints(connection.cursor(), Author._meta.db_table)
|
||||
constraints = self.get_constraints(Author._meta.db_table)
|
||||
for name, details in constraints.items():
|
||||
if details['columns'] == ["height"] and details['check']:
|
||||
break
|
||||
@@ -527,7 +541,7 @@ class SchemaTests(TransactionTestCase):
|
||||
False,
|
||||
any(
|
||||
c["index"]
|
||||
for c in connection.introspection.get_constraints(connection.cursor(), "schema_tag").values()
|
||||
for c in self.get_constraints("schema_tag").values()
|
||||
if c['columns'] == ["slug", "title"]
|
||||
),
|
||||
)
|
||||
@@ -543,7 +557,7 @@ class SchemaTests(TransactionTestCase):
|
||||
True,
|
||||
any(
|
||||
c["index"]
|
||||
for c in connection.introspection.get_constraints(connection.cursor(), "schema_tag").values()
|
||||
for c in self.get_constraints("schema_tag").values()
|
||||
if c['columns'] == ["slug", "title"]
|
||||
),
|
||||
)
|
||||
@@ -561,7 +575,7 @@ class SchemaTests(TransactionTestCase):
|
||||
False,
|
||||
any(
|
||||
c["index"]
|
||||
for c in connection.introspection.get_constraints(connection.cursor(), "schema_tag").values()
|
||||
for c in self.get_constraints("schema_tag").values()
|
||||
if c['columns'] == ["slug", "title"]
|
||||
),
|
||||
)
|
||||
@@ -578,7 +592,7 @@ class SchemaTests(TransactionTestCase):
|
||||
True,
|
||||
any(
|
||||
c["index"]
|
||||
for c in connection.introspection.get_constraints(connection.cursor(), "schema_tagindexed").values()
|
||||
for c in self.get_constraints("schema_tagindexed").values()
|
||||
if c['columns'] == ["slug", "title"]
|
||||
),
|
||||
)
|
||||
@@ -627,7 +641,7 @@ class SchemaTests(TransactionTestCase):
|
||||
# Ensure the table is there and has the right index
|
||||
self.assertIn(
|
||||
"title",
|
||||
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
|
||||
self.get_indexes(Book._meta.db_table),
|
||||
)
|
||||
# Alter to remove the index
|
||||
new_field = CharField(max_length=100, db_index=False)
|
||||
@@ -642,7 +656,7 @@ class SchemaTests(TransactionTestCase):
|
||||
# Ensure the table is there and has no index
|
||||
self.assertNotIn(
|
||||
"title",
|
||||
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
|
||||
self.get_indexes(Book._meta.db_table),
|
||||
)
|
||||
# Alter to re-add the index
|
||||
with connection.schema_editor() as editor:
|
||||
@@ -655,7 +669,7 @@ class SchemaTests(TransactionTestCase):
|
||||
# Ensure the table is there and has the index again
|
||||
self.assertIn(
|
||||
"title",
|
||||
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
|
||||
self.get_indexes(Book._meta.db_table),
|
||||
)
|
||||
# Add a unique column, verify that creates an implicit index
|
||||
with connection.schema_editor() as editor:
|
||||
@@ -665,7 +679,7 @@ class SchemaTests(TransactionTestCase):
|
||||
)
|
||||
self.assertIn(
|
||||
"slug",
|
||||
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
|
||||
self.get_indexes(Book._meta.db_table),
|
||||
)
|
||||
# Remove the unique, check the index goes with it
|
||||
new_field2 = CharField(max_length=20, unique=False)
|
||||
@@ -679,7 +693,7 @@ class SchemaTests(TransactionTestCase):
|
||||
)
|
||||
self.assertNotIn(
|
||||
"slug",
|
||||
connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table),
|
||||
self.get_indexes(Book._meta.db_table),
|
||||
)
|
||||
|
||||
def test_primary_key(self):
|
||||
@@ -691,7 +705,7 @@ class SchemaTests(TransactionTestCase):
|
||||
editor.create_model(Tag)
|
||||
# Ensure the table is there and has the right PK
|
||||
self.assertTrue(
|
||||
connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['id']['primary_key'],
|
||||
self.get_indexes(Tag._meta.db_table)['id']['primary_key'],
|
||||
)
|
||||
# Alter to change the PK
|
||||
new_field = SlugField(primary_key=True)
|
||||
@@ -707,10 +721,10 @@ class SchemaTests(TransactionTestCase):
|
||||
# Ensure the PK changed
|
||||
self.assertNotIn(
|
||||
'id',
|
||||
connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table),
|
||||
self.get_indexes(Tag._meta.db_table),
|
||||
)
|
||||
self.assertTrue(
|
||||
connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['slug']['primary_key'],
|
||||
self.get_indexes(Tag._meta.db_table)['slug']['primary_key'],
|
||||
)
|
||||
|
||||
def test_context_manager_exit(self):
|
||||
@@ -741,7 +755,7 @@ class SchemaTests(TransactionTestCase):
|
||||
# Ensure the table is there and has an index on the column
|
||||
self.assertIn(
|
||||
column_name,
|
||||
connection.introspection.get_indexes(connection.cursor(), BookWithLongName._meta.db_table),
|
||||
self.get_indexes(BookWithLongName._meta.db_table),
|
||||
)
|
||||
|
||||
def test_creation_deletion_reserved_names(self):
|
||||
|
||||
Reference in New Issue
Block a user