mirror of
				https://github.com/django/django.git
				synced 2025-10-26 07:06:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			124 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			124 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from unittest import skipUnless
 | |
| 
 | |
| from django.db import connection, connections
 | |
| from django.test import TestCase
 | |
| 
 | |
| 
 | |
| @skipUnless(connection.vendor == "mysql", "MySQL tests")
 | |
| class ParsingTests(TestCase):
 | |
|     def test_parse_constraint_columns(self):
 | |
|         _parse_constraint_columns = connection.introspection._parse_constraint_columns
 | |
|         tests = (
 | |
|             ("`height` >= 0", ["height"], ["height"]),
 | |
|             ("`cost` BETWEEN 1 AND 10", ["cost"], ["cost"]),
 | |
|             ("`ref1` > `ref2`", ["id", "ref1", "ref2"], ["ref1", "ref2"]),
 | |
|             (
 | |
|                 "`start` IS NULL OR `end` IS NULL OR `start` < `end`",
 | |
|                 ["id", "start", "end"],
 | |
|                 ["start", "end"],
 | |
|             ),
 | |
|             ("JSON_VALID(`json_field`)", ["json_field"], ["json_field"]),
 | |
|             ("CHAR_LENGTH(`name`) > 2", ["name"], ["name"]),
 | |
|             ("lower(`ref1`) != 'test'", ["id", "owe", "ref1"], ["ref1"]),
 | |
|             ("lower(`ref1`) != 'test'", ["id", "lower", "ref1"], ["ref1"]),
 | |
|             ("`name` LIKE 'test%'", ["name"], ["name"]),
 | |
|         )
 | |
|         for check_clause, table_columns, expected_columns in tests:
 | |
|             with self.subTest(check_clause):
 | |
|                 check_columns = _parse_constraint_columns(check_clause, table_columns)
 | |
|                 self.assertEqual(list(check_columns), expected_columns)
 | |
| 
 | |
| 
 | |
| @skipUnless(connection.vendor == "mysql", "MySQL tests")
 | |
| class StorageEngineTests(TestCase):
 | |
|     databases = {"default", "other"}
 | |
| 
 | |
|     def test_get_storage_engine(self):
 | |
|         table_name = "test_storage_engine"
 | |
|         create_sql = "CREATE TABLE %s (id INTEGER) ENGINE = %%s" % table_name
 | |
|         drop_sql = "DROP TABLE %s" % table_name
 | |
|         default_connection = connections["default"]
 | |
|         other_connection = connections["other"]
 | |
|         try:
 | |
|             with default_connection.cursor() as cursor:
 | |
|                 cursor.execute(create_sql % "InnoDB")
 | |
|                 self.assertEqual(
 | |
|                     default_connection.introspection.get_storage_engine(
 | |
|                         cursor, table_name
 | |
|                     ),
 | |
|                     "InnoDB",
 | |
|                 )
 | |
|             with other_connection.cursor() as cursor:
 | |
|                 cursor.execute(create_sql % "MyISAM")
 | |
|                 self.assertEqual(
 | |
|                     other_connection.introspection.get_storage_engine(
 | |
|                         cursor, table_name
 | |
|                     ),
 | |
|                     "MyISAM",
 | |
|                 )
 | |
|         finally:
 | |
|             with default_connection.cursor() as cursor:
 | |
|                 cursor.execute(drop_sql)
 | |
|             with other_connection.cursor() as cursor:
 | |
|                 cursor.execute(drop_sql)
 | |
| 
 | |
| 
 | |
| @skipUnless(connection.vendor == "mysql", "MySQL specific SQL")
 | |
| class TestCrossDatabaseRelations(TestCase):
 | |
|     databases = {"default", "other"}
 | |
| 
 | |
|     def test_omit_cross_database_relations(self):
 | |
|         default_connection = connections["default"]
 | |
|         other_connection = connections["other"]
 | |
|         main_table = "cross_schema_get_relations_main_table"
 | |
|         main_table_quoted = default_connection.ops.quote_name(main_table)
 | |
|         other_schema_quoted = other_connection.ops.quote_name(
 | |
|             other_connection.settings_dict["NAME"]
 | |
|         )
 | |
|         rel_table = "cross_schema_get_relations_rel_table"
 | |
|         rel_table_quoted = other_connection.ops.quote_name(rel_table)
 | |
|         rel_column = "cross_schema_get_relations_rel_table_id"
 | |
|         rel_column_quoted = other_connection.ops.quote_name(rel_column)
 | |
|         try:
 | |
|             with other_connection.cursor() as other_cursor:
 | |
|                 other_cursor.execute(
 | |
|                     f"""
 | |
|                     CREATE TABLE {rel_table_quoted} (
 | |
|                         id integer AUTO_INCREMENT,
 | |
|                         PRIMARY KEY (id)
 | |
|                     )
 | |
|                     """
 | |
|                 )
 | |
|             with default_connection.cursor() as default_cursor:
 | |
|                 # Create table in the default schema with a cross-database
 | |
|                 # relation.
 | |
|                 default_cursor.execute(
 | |
|                     f"""
 | |
|                     CREATE TABLE {main_table_quoted} (
 | |
|                         id integer AUTO_INCREMENT,
 | |
|                         {rel_column_quoted} integer NOT NULL,
 | |
|                         PRIMARY KEY (id),
 | |
|                         FOREIGN KEY ({rel_column_quoted})
 | |
|                         REFERENCES {other_schema_quoted}.{rel_table_quoted}(id)
 | |
|                     )
 | |
|                     """
 | |
|                 )
 | |
|                 relations = default_connection.introspection.get_relations(
 | |
|                     default_cursor, main_table
 | |
|                 )
 | |
|                 constraints = default_connection.introspection.get_constraints(
 | |
|                     default_cursor, main_table
 | |
|                 )
 | |
|             self.assertEqual(len(relations), 0)
 | |
|             rel_column_fk_constraints = [
 | |
|                 spec
 | |
|                 for name, spec in constraints.items()
 | |
|                 if spec["columns"] == [rel_column] and spec["foreign_key"] is not None
 | |
|             ]
 | |
|             self.assertEqual(len(rel_column_fk_constraints), 0)
 | |
|         finally:
 | |
|             with default_connection.cursor() as default_cursor:
 | |
|                 default_cursor.execute(f"DROP TABLE IF EXISTS {main_table_quoted}")
 | |
|             with other_connection.cursor() as other_cursor:
 | |
|                 other_cursor.execute(f"DROP TABLE IF EXISTS {rel_table_quoted}")
 |