diff --git a/django/db/backends/base/features.py b/django/db/backends/base/features.py index 3417fdf1cd..bb4b59e7c8 100644 --- a/django/db/backends/base/features.py +++ b/django/db/backends/base/features.py @@ -181,6 +181,8 @@ class BaseDatabaseFeatures: # Does it support CHECK constraints? supports_column_check_constraints = True supports_table_check_constraints = True + # Does the backend support introspection of CHECK constraints? + can_introspect_check_constraints = True # Does the backend support 'pyformat' style ("... %(name)s ...", {'name': value}) # parameter passing? Note this can be provided by the backend even if not diff --git a/django/db/backends/mysql/base.py b/django/db/backends/mysql/base.py index 9b88c5ac25..792eae086f 100644 --- a/django/db/backends/mysql/base.py +++ b/django/db/backends/mysql/base.py @@ -61,6 +61,7 @@ class CursorWrapper: codes_for_integrityerror = ( 1048, # Column cannot be null 1690, # BIGINT UNSIGNED value is out of range + 4025, # CHECK constraint failed ) def __init__(self, cursor): @@ -328,6 +329,15 @@ class DatabaseWrapper(BaseDatabaseWrapper): else: return True + @cached_property + def data_type_check_constraints(self): + if self.features.supports_column_check_constraints: + return { + 'PositiveIntegerField': '`%(column)s` >= 0', + 'PositiveSmallIntegerField': '`%(column)s` >= 0', + } + return {} + @cached_property def mysql_server_info(self): with self.temporary_connection() as cursor: diff --git a/django/db/backends/mysql/features.py b/django/db/backends/mysql/features.py index 21e2e04181..229e26a750 100644 --- a/django/db/backends/mysql/features.py +++ b/django/db/backends/mysql/features.py @@ -27,8 +27,6 @@ class DatabaseFeatures(BaseDatabaseFeatures): allows_auto_pk_0 = False can_release_savepoints = True atomic_transactions = False - supports_column_check_constraints = False - supports_table_check_constraints = False can_clone_databases = True supports_temporal_subtraction = True supports_select_intersection = False @@ -89,6 +87,20 @@ class DatabaseFeatures(BaseDatabaseFeatures): return self.connection.mysql_version >= (10, 2) return self.connection.mysql_version >= (8, 0, 2) + @cached_property + def supports_column_check_constraints(self): + return self.connection.mysql_is_mariadb and self.connection.mysql_version >= (10, 2, 1) + + supports_table_check_constraints = property(operator.attrgetter('supports_column_check_constraints')) + + @cached_property + def can_introspect_check_constraints(self): + if self.connection.mysql_is_mariadb: + version = self.connection.mysql_version + if (version >= (10, 2, 22) and version < (10, 3)) or version >= (10, 3, 10): + return True + return False + @cached_property def has_select_for_update_skip_locked(self): return not self.connection.mysql_is_mariadb and self.connection.mysql_version >= (8, 0, 1) diff --git a/django/db/backends/mysql/introspection.py b/django/db/backends/mysql/introspection.py index 65eae56493..a64e82e004 100644 --- a/django/db/backends/mysql/introspection.py +++ b/django/db/backends/mysql/introspection.py @@ -1,5 +1,6 @@ from collections import namedtuple +import sqlparse from MySQLdb.constants import FIELD_TYPE from django.db.backends.base.introspection import ( @@ -189,6 +190,31 @@ class DatabaseIntrospection(BaseDatabaseIntrospection): constraints[constraint]['unique'] = True elif kind.lower() == "unique": constraints[constraint]['unique'] = True + # Add check constraints. + if self.connection.features.can_introspect_check_constraints: + type_query = """ + SELECT c.constraint_name, c.check_clause + FROM information_schema.check_constraints AS c + WHERE + c.constraint_schema = DATABASE() AND + c.table_name = %s + """ + cursor.execute(type_query, [table_name]) + for constraint, check_clause in cursor.fetchall(): + # Parse columns. + columns = OrderedSet() + for statement in sqlparse.parse(check_clause): + for token in statement.flatten(): + if token.ttype in [sqlparse.tokens.Name, sqlparse.tokens.Literal.String.Single]: + columns.add(token.value[1:-1]) + constraints[constraint] = { + 'columns': columns, + 'primary_key': False, + 'unique': False, + 'index': False, + 'check': True, + 'foreign_key': None, + } # Now add in the indexes cursor.execute("SHOW INDEX FROM %s" % self.connection.ops.quote_name(table_name)) for table, non_unique, index, colseq, column, type_ in [x[:5] + (x[10],) for x in cursor.fetchall()]: diff --git a/django/db/backends/mysql/schema.py b/django/db/backends/mysql/schema.py index 666aa292e8..9b80b9bede 100644 --- a/django/db/backends/mysql/schema.py +++ b/django/db/backends/mysql/schema.py @@ -28,9 +28,15 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY" sql_create_index = 'CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s' + # The name of the column check constraint is the same as the field name on + # MariaDB. Adding IF EXISTS clause prevents migrations crash. Constraint is + # removed during a "MODIFY" column statement. + sql_delete_check = 'ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s' def quote_value(self, value): self.connection.ensure_connection() + if isinstance(value, str): + value = value.replace('%', '%%') # MySQLdb escapes to string, PyMySQL to bytes. quoted = self.connection.connection.escape(value, self.connection.connection.encoders) if isinstance(value, str) and isinstance(quoted, bytes): diff --git a/tests/constraints/tests.py b/tests/constraints/tests.py index c2f99c565d..7ac478a89f 100644 --- a/tests/constraints/tests.py +++ b/tests/constraints/tests.py @@ -73,7 +73,7 @@ class CheckConstraintTests(TestCase): with self.assertRaises(IntegrityError): Product.objects.create(name='Invalid', price=10, discounted_price=20) - @skipUnlessDBFeature('supports_table_check_constraints') + @skipUnlessDBFeature('supports_table_check_constraints', 'can_introspect_check_constraints') def test_name(self): constraints = get_constraints(Product._meta.db_table) for expected_name in ( @@ -83,7 +83,7 @@ class CheckConstraintTests(TestCase): with self.subTest(expected_name): self.assertIn(expected_name, constraints) - @skipUnlessDBFeature('supports_table_check_constraints') + @skipUnlessDBFeature('supports_table_check_constraints', 'can_introspect_check_constraints') def test_abstract_name(self): constraints = get_constraints(ChildModel._meta.db_table) self.assertIn('constraints_childmodel_adult', constraints) diff --git a/tests/introspection/tests.py b/tests/introspection/tests.py index 10524cdacb..7edb01a4b7 100644 --- a/tests/introspection/tests.py +++ b/tests/introspection/tests.py @@ -237,7 +237,10 @@ class IntrospectionTests(TransactionTestCase): 'article_email_pub_date_uniq', 'email_pub_date_idx', } - if connection.features.supports_column_check_constraints: + if ( + connection.features.supports_column_check_constraints and + connection.features.can_introspect_check_constraints + ): custom_constraints.add('up_votes_gte_0_check') assertDetails(constraints['up_votes_gte_0_check'], ['up_votes'], check=True) assertDetails(constraints['article_email_pub_date_uniq'], ['article_id', 'email', 'pub_date'], unique=True) diff --git a/tests/schema/tests.py b/tests/schema/tests.py index 91760987d7..2a22e9dec3 100644 --- a/tests/schema/tests.py +++ b/tests/schema/tests.py @@ -1556,7 +1556,7 @@ class SchemaTests(TransactionTestCase): # Ensure the m2m table is still there. self.assertEqual(len(self.column_classes(LocalM2M)), 1) - @skipUnlessDBFeature('supports_column_check_constraints') + @skipUnlessDBFeature('supports_column_check_constraints', 'can_introspect_check_constraints') def test_check_constraints(self): """ Tests creating/deleting CHECK constraints @@ -1586,7 +1586,7 @@ class SchemaTests(TransactionTestCase): if not any(details['columns'] == ['height'] and details['check'] for details in constraints.values()): self.fail("No check constraint for height found") - @skipUnlessDBFeature('supports_column_check_constraints') + @skipUnlessDBFeature('supports_column_check_constraints', 'can_introspect_check_constraints') def test_remove_field_check_does_not_remove_meta_constraints(self): with connection.schema_editor() as editor: editor.create_model(Author)