django/tests/backends/mysql/test_introspection.py

124 lines
5.3 KiB
Python

from unittest import skipUnless
from django.db import connection, connections
from django.test import TestCase
@skipUnless(connection.vendor == "mysql", "MySQL tests")
class ParsingTests(TestCase):
def test_parse_constraint_columns(self):
_parse_constraint_columns = connection.introspection._parse_constraint_columns
tests = (
("`height` >= 0", ["height"], ["height"]),
("`cost` BETWEEN 1 AND 10", ["cost"], ["cost"]),
("`ref1` > `ref2`", ["id", "ref1", "ref2"], ["ref1", "ref2"]),
(
"`start` IS NULL OR `end` IS NULL OR `start` < `end`",
["id", "start", "end"],
["start", "end"],
),
("JSON_VALID(`json_field`)", ["json_field"], ["json_field"]),
("CHAR_LENGTH(`name`) > 2", ["name"], ["name"]),
("lower(`ref1`) != 'test'", ["id", "owe", "ref1"], ["ref1"]),
("lower(`ref1`) != 'test'", ["id", "lower", "ref1"], ["ref1"]),
("`name` LIKE 'test%'", ["name"], ["name"]),
)
for check_clause, table_columns, expected_columns in tests:
with self.subTest(check_clause):
check_columns = _parse_constraint_columns(check_clause, table_columns)
self.assertEqual(list(check_columns), expected_columns)
@skipUnless(connection.vendor == "mysql", "MySQL tests")
class StorageEngineTests(TestCase):
databases = {"default", "other"}
def test_get_storage_engine(self):
table_name = "test_storage_engine"
create_sql = "CREATE TABLE %s (id INTEGER) ENGINE = %%s" % table_name
drop_sql = "DROP TABLE %s" % table_name
default_connection = connections["default"]
other_connection = connections["other"]
try:
with default_connection.cursor() as cursor:
cursor.execute(create_sql % "InnoDB")
self.assertEqual(
default_connection.introspection.get_storage_engine(
cursor, table_name
),
"InnoDB",
)
with other_connection.cursor() as cursor:
cursor.execute(create_sql % "MyISAM")
self.assertEqual(
other_connection.introspection.get_storage_engine(
cursor, table_name
),
"MyISAM",
)
finally:
with default_connection.cursor() as cursor:
cursor.execute(drop_sql)
with other_connection.cursor() as cursor:
cursor.execute(drop_sql)
@skipUnless(connection.vendor == "mysql", "MySQL specific SQL")
class TestCrossDatabaseRelations(TestCase):
databases = {"default", "other"}
def test_omit_cross_database_relations(self):
default_connection = connections["default"]
other_connection = connections["other"]
main_table = "cross_schema_get_relations_main_table"
main_table_quoted = default_connection.ops.quote_name(main_table)
other_schema_quoted = other_connection.ops.quote_name(
other_connection.settings_dict["NAME"]
)
rel_table = "cross_schema_get_relations_rel_table"
rel_table_quoted = other_connection.ops.quote_name(rel_table)
rel_column = "cross_schema_get_relations_rel_table_id"
rel_column_quoted = other_connection.ops.quote_name(rel_column)
try:
with other_connection.cursor() as other_cursor:
other_cursor.execute(
f"""
CREATE TABLE {rel_table_quoted} (
id integer AUTO_INCREMENT,
PRIMARY KEY (id)
)
"""
)
with default_connection.cursor() as default_cursor:
# Create table in the default schema with a cross-database
# relation.
default_cursor.execute(
f"""
CREATE TABLE {main_table_quoted} (
id integer AUTO_INCREMENT,
{rel_column_quoted} integer NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY ({rel_column_quoted})
REFERENCES {other_schema_quoted}.{rel_table_quoted}(id)
)
"""
)
relations = default_connection.introspection.get_relations(
default_cursor, main_table
)
constraints = default_connection.introspection.get_constraints(
default_cursor, main_table
)
self.assertEqual(len(relations), 0)
rel_column_fk_constraints = [
spec
for name, spec in constraints.items()
if spec["columns"] == [rel_column] and spec["foreign_key"] is not None
]
self.assertEqual(len(rel_column_fk_constraints), 0)
finally:
with default_connection.cursor() as default_cursor:
default_cursor.execute(f"DROP TABLE IF EXISTS {main_table_quoted}")
with other_connection.cursor() as other_cursor:
other_cursor.execute(f"DROP TABLE IF EXISTS {rel_table_quoted}")