mirror of https://github.com/django/django.git
124 lines
5.3 KiB
Python
124 lines
5.3 KiB
Python
from unittest import skipUnless
|
|
|
|
from django.db import connection, connections
|
|
from django.test import TestCase
|
|
|
|
|
|
@skipUnless(connection.vendor == "mysql", "MySQL tests")
class ParsingTests(TestCase):
    """Tests for the introspection helper that parses constraint columns."""

    def test_parse_constraint_columns(self):
        """
        _parse_constraint_columns() yields the expected column names for
        each check clause, given the columns of the table.
        """
        parse = connection.introspection._parse_constraint_columns
        # Each case is (check clause, table columns, expected columns).
        cases = (
            ("`height` >= 0", ["height"], ["height"]),
            ("`cost` BETWEEN 1 AND 10", ["cost"], ["cost"]),
            ("`ref1` > `ref2`", ["id", "ref1", "ref2"], ["ref1", "ref2"]),
            (
                "`start` IS NULL OR `end` IS NULL OR `start` < `end`",
                ["id", "start", "end"],
                ["start", "end"],
            ),
            ("JSON_VALID(`json_field`)", ["json_field"], ["json_field"]),
            ("CHAR_LENGTH(`name`) > 2", ["name"], ["name"]),
            # "owe" and "lower" are table columns whose names also occur
            # inside the unquoted function name; neither is expected back.
            ("lower(`ref1`) != 'test'", ["id", "owe", "ref1"], ["ref1"]),
            ("lower(`ref1`) != 'test'", ["id", "lower", "ref1"], ["ref1"]),
            ("`name` LIKE 'test%'", ["name"], ["name"]),
        )
        for clause, columns, expected in cases:
            with self.subTest(clause):
                parsed = list(parse(clause, columns))
                self.assertEqual(parsed, expected)
|
|
|
|
|
|
@skipUnless(connection.vendor == "mysql", "MySQL tests")
class StorageEngineTests(TestCase):
    """Tests for introspecting the storage engine of a table."""

    # The test below creates a table through each of these aliases.
    databases = {"default", "other"}

    def test_get_storage_engine(self):
        """
        get_storage_engine() reports the engine each table was created
        with: InnoDB on "default" and MyISAM on "other".
        """
        table_name = "test_storage_engine"
        create_sql = "CREATE TABLE %s (id INTEGER) ENGINE = %%s" % table_name
        # IF EXISTS keeps the cleanup from raising when a table was never
        # created, which would hide the original failure and skip the
        # second drop.
        drop_sql = "DROP TABLE IF EXISTS %s" % table_name
        default_connection = connections["default"]
        other_connection = connections["other"]
        try:
            with default_connection.cursor() as cursor:
                cursor.execute(create_sql % "InnoDB")
                self.assertEqual(
                    default_connection.introspection.get_storage_engine(
                        cursor, table_name
                    ),
                    "InnoDB",
                )
            with other_connection.cursor() as cursor:
                cursor.execute(create_sql % "MyISAM")
                self.assertEqual(
                    other_connection.introspection.get_storage_engine(
                        cursor, table_name
                    ),
                    "MyISAM",
                )
        finally:
            with default_connection.cursor() as cursor:
                cursor.execute(drop_sql)
            with other_connection.cursor() as cursor:
                cursor.execute(drop_sql)
|
|
|
|
|
|
@skipUnless(connection.vendor == "mysql", "MySQL specific SQL")
class TestCrossDatabaseRelations(TestCase):
    """Tests for introspecting a foreign key that targets another database."""

    databases = {"default", "other"}

    def test_omit_cross_database_relations(self):
        """
        A foreign key from a table in "default" to a table in "other" is
        absent from get_relations() and is not reported as a foreign key
        constraint on that column by get_constraints().
        """
        default_db = connections["default"]
        other_db = connections["other"]
        introspection = default_db.introspection
        quote_other = other_db.ops.quote_name

        main_table = "cross_schema_get_relations_main_table"
        rel_table = "cross_schema_get_relations_rel_table"
        rel_column = "cross_schema_get_relations_rel_table_id"

        main_table_sql = default_db.ops.quote_name(main_table)
        other_schema_sql = quote_other(other_db.settings_dict["NAME"])
        rel_table_sql = quote_other(rel_table)
        rel_column_sql = quote_other(rel_column)

        try:
            # The referenced table has to exist before the referencing one.
            with other_db.cursor() as other_cursor:
                other_cursor.execute(
                    f"""
                    CREATE TABLE {rel_table_sql} (
                        id integer AUTO_INCREMENT,
                        PRIMARY KEY (id)
                    )
                    """
                )
            with default_db.cursor() as default_cursor:
                # Table in the default schema whose foreign key points at
                # the table created in the other schema.
                default_cursor.execute(
                    f"""
                    CREATE TABLE {main_table_sql} (
                        id integer AUTO_INCREMENT,
                        {rel_column_sql} integer NOT NULL,
                        PRIMARY KEY (id),
                        FOREIGN KEY ({rel_column_sql})
                        REFERENCES {other_schema_sql}.{rel_table_sql}(id)
                    )
                    """
                )
                relations = introspection.get_relations(default_cursor, main_table)
                constraints = introspection.get_constraints(
                    default_cursor, main_table
                )
            self.assertEqual(len(relations), 0)
            fk_constraints_on_rel_column = [
                spec
                for spec in constraints.values()
                if spec["columns"] == [rel_column] and spec["foreign_key"] is not None
            ]
            self.assertEqual(len(fk_constraints_on_rel_column), 0)
        finally:
            # Drop the referencing table first, then the referenced one.
            with default_db.cursor() as default_cursor:
                default_cursor.execute(f"DROP TABLE IF EXISTS {main_table_sql}")
            with other_db.cursor() as other_cursor:
                other_cursor.execute(f"DROP TABLE IF EXISTS {rel_table_sql}")
|