diff --git a/django/db/backends/base/features.py b/django/db/backends/base/features.py index 484b272294..9bb0ed6848 100644 --- a/django/db/backends/base/features.py +++ b/django/db/backends/base/features.py @@ -36,6 +36,7 @@ class BaseDatabaseFeatures(object): allow_sliced_subqueries = True has_select_for_update = False has_select_for_update_nowait = False + has_select_for_update_skip_locked = False supports_select_related = True diff --git a/django/db/backends/base/operations.py b/django/db/backends/base/operations.py index 4e5814e58f..1ed11b178e 100644 --- a/django/db/backends/base/operations.py +++ b/django/db/backends/base/operations.py @@ -177,12 +177,14 @@ class BaseDatabaseOperations(object): """ return [] - def for_update_sql(self, nowait=False): + def for_update_sql(self, nowait=False, skip_locked=False): """ Returns the FOR UPDATE SQL clause to lock rows for an update operation. """ if nowait: return 'FOR UPDATE NOWAIT' + elif skip_locked: + return 'FOR UPDATE SKIP LOCKED' else: return 'FOR UPDATE' diff --git a/django/db/backends/oracle/features.py b/django/db/backends/oracle/features.py index 84e18cf9ba..956f7a051b 100644 --- a/django/db/backends/oracle/features.py +++ b/django/db/backends/oracle/features.py @@ -13,6 +13,7 @@ class DatabaseFeatures(BaseDatabaseFeatures): uses_savepoints = True has_select_for_update = True has_select_for_update_nowait = True + has_select_for_update_skip_locked = True can_return_id_from_insert = True allow_sliced_subqueries = False supports_subqueries_in_group_by = False diff --git a/django/db/backends/postgresql/features.py b/django/db/backends/postgresql/features.py index 218a7645de..a9e1c77480 100644 --- a/django/db/backends/postgresql/features.py +++ b/django/db/backends/postgresql/features.py @@ -1,5 +1,6 @@ from django.db.backends.base.features import BaseDatabaseFeatures from django.db.utils import InterfaceError +from django.utils.functional import cached_property class DatabaseFeatures(BaseDatabaseFeatures): @@ -31,3 +32,7 @@ class DatabaseFeatures(BaseDatabaseFeatures): greatest_least_ignores_nulls = True can_clone_databases = True supports_temporal_subtraction = True + + @cached_property + def has_select_for_update_skip_locked(self): + return self.connection.pg_version >= 90500 diff --git a/django/db/models/query.py b/django/db/models/query.py index 07f1fc414f..0a0965fe47 100644 --- a/django/db/models/query.py +++ b/django/db/models/query.py @@ -835,15 +835,18 @@ class QuerySet(object): else: return self._filter_or_exclude(None, **filter_obj) - def select_for_update(self, nowait=False): + def select_for_update(self, nowait=False, skip_locked=False): """ Returns a new QuerySet instance that will select objects with a FOR UPDATE lock. """ + if nowait and skip_locked: + raise ValueError('The nowait option cannot be used with skip_locked.') obj = self._clone() obj._for_write = True obj.query.select_for_update = True obj.query.select_for_update_nowait = nowait + obj.query.select_for_update_skip_locked = skip_locked return obj def select_related(self, *fields): diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py index cedb140143..5eaac1fc74 100644 --- a/django/db/models/sql/compiler.py +++ b/django/db/models/sql/compiler.py @@ -445,13 +445,16 @@ class SQLCompiler(object): "select_for_update cannot be used outside of a transaction." ) - # If we've been asked for a NOWAIT query but the backend does - # not support it, raise a DatabaseError otherwise we could get - # an unexpected deadlock. nowait = self.query.select_for_update_nowait + skip_locked = self.query.select_for_update_skip_locked + # If we've been asked for a NOWAIT/SKIP LOCKED query but the + # backend does not support it, raise a DatabaseError otherwise + # we could get an unexpected deadlock. if nowait and not self.connection.features.has_select_for_update_nowait: raise DatabaseError('NOWAIT is not supported on this database backend.') - result.append(self.connection.ops.for_update_sql(nowait=nowait)) + elif skip_locked and not self.connection.features.has_select_for_update_skip_locked: + raise DatabaseError('SKIP LOCKED is not supported on this database backend.') + result.append(self.connection.ops.for_update_sql(nowait=nowait, skip_locked=skip_locked)) return ' '.join(result), tuple(params) finally: diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py index d182242ce4..426087f8af 100644 --- a/django/db/models/sql/query.py +++ b/django/db/models/sql/query.py @@ -167,6 +167,7 @@ class Query(object): self.distinct_fields = [] self.select_for_update = False self.select_for_update_nowait = False + self.select_for_update_skip_locked = False self.select_related = False # Arbitrary limit for select_related to prevents infinite recursion. @@ -286,6 +287,7 @@ class Query(object): obj.distinct_fields = self.distinct_fields[:] obj.select_for_update = self.select_for_update obj.select_for_update_nowait = self.select_for_update_nowait + obj.select_for_update_skip_locked = self.select_for_update_skip_locked obj.select_related = self.select_related obj.values_select = self.values_select[:] obj._annotations = self._annotations.copy() if self._annotations is not None else None diff --git a/docs/ref/databases.txt b/docs/ref/databases.txt index 35a3dbcbe8..983b7525c3 100644 --- a/docs/ref/databases.txt +++ b/docs/ref/databases.txt @@ -569,9 +569,9 @@ both MySQL and Django will attempt to convert the values from UTC to local time. Row locking with ``QuerySet.select_for_update()`` ------------------------------------------------- -MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE`` -statement. If ``select_for_update()`` is used with ``nowait=True`` then a -``DatabaseError`` will be raised. +MySQL does not support the ``NOWAIT`` and ``SKIP LOCKED`` options to the +``SELECT ... FOR UPDATE`` statement. If ``select_for_update()`` is used with +``nowait=True`` or ``skip_locked=True`` then a ``DatabaseError`` will be raised. Automatic typecasting can cause unexpected results -------------------------------------------------- diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt index 3f4fce1a26..8d2e3ebb3b 100644 --- a/docs/ref/models/querysets.txt +++ b/docs/ref/models/querysets.txt @@ -1528,7 +1528,7 @@ For example:: ``select_for_update()`` ~~~~~~~~~~~~~~~~~~~~~~~ -.. method:: select_for_update(nowait=False) +.. method:: select_for_update(nowait=False, skip_locked=False) Returns a queryset that will lock rows until the end of the transaction, generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases. @@ -1546,16 +1546,19 @@ selected rows, the query will block until the lock is released. If this is not the behavior you want, call ``select_for_update(nowait=True)``. This will make the call non-blocking. If a conflicting lock is already acquired by another transaction, :exc:`~django.db.DatabaseError` will be raised when the -queryset is evaluated. +queryset is evaluated. You can also ignore locked rows by using +``select_for_update(skip_locked=True)`` instead. The ``nowait`` and +``skip_locked`` are mutually exclusive and attempts to call +``select_for_update()`` with both options enabled will result in a +:exc:`ValueError`. Currently, the ``postgresql``, ``oracle``, and ``mysql`` database -backends support ``select_for_update()``. However, MySQL has no support for the -``nowait`` argument. Obviously, users of external third-party backends should -check with their backend's documentation for specifics in those cases. +backends support ``select_for_update()``. However, MySQL doesn't support the +``nowait`` and ``skip_locked`` arguments. -Passing ``nowait=True`` to ``select_for_update()`` using database backends that -do not support ``nowait``, such as MySQL, will cause a -:exc:`~django.db.DatabaseError` to be raised. This is in order to prevent code +Passing ``nowait=True`` or ``skip_locked=True`` to ``select_for_update()`` +using database backends that do not support these options, such as MySQL, will +cause a :exc:`~django.db.DatabaseError` to be raised. This prevents code from unexpectedly blocking. Evaluating a queryset with ``select_for_update()`` in autocommit mode on @@ -1580,6 +1583,10 @@ raised if ``select_for_update()`` is used in autocommit mode. ``select_for_update()`` you should use :class:`~django.test.TransactionTestCase`. +.. versionchanged:: 1.11 + + The ``skip_locked`` argument was added. + ``raw()`` ~~~~~~~~~ diff --git a/docs/releases/1.11.txt b/docs/releases/1.11.txt index 1c7509b062..128ef2c203 100644 --- a/docs/releases/1.11.txt +++ b/docs/releases/1.11.txt @@ -150,7 +150,9 @@ CSRF Database backends ~~~~~~~~~~~~~~~~~ -* ... +* Added the ``skip_locked`` argument to :meth:`.QuerySet.select_for_update()` + on PostgreSQL 9.5+ and Oracle to execute queries with + ``FOR UPDATE SKIP LOCKED``. Email ~~~~~ @@ -297,6 +299,9 @@ Database backend API support the :lookup:`time` lookup. It accepts a ``field_name`` and ``tzname`` arguments and returns the SQL necessary to cast a datetime value to time value. +* To enable ``FOR UPDATE SKIP LOCKED`` support, set + ``DatabaseFeatures.has_select_for_update_skip_locked = True``. + Dropped support for PostgreSQL 9.2 and PostGIS 2.0 -------------------------------------------------- diff --git a/tests/select_for_update/tests.py b/tests/select_for_update/tests.py index 594dc0eb83..55ead7ef8f 100644 --- a/tests/select_for_update/tests.py +++ b/tests/select_for_update/tests.py @@ -52,10 +52,10 @@ class SelectForUpdateTests(TransactionTestCase): self.new_connection.rollback() self.new_connection.set_autocommit(True) - def has_for_update_sql(self, queries, nowait=False): + def has_for_update_sql(self, queries, **kwargs): # Examine the SQL that was executed to determine whether it # contains the 'SELECT..FOR UPDATE' stanza. - for_update_sql = connection.ops.for_update_sql(nowait) + for_update_sql = connection.ops.for_update_sql(**kwargs) return any(for_update_sql in query['sql'] for query in queries) @skipUnlessDBFeature('has_select_for_update') @@ -78,6 +78,16 @@ class SelectForUpdateTests(TransactionTestCase): list(Person.objects.all().select_for_update(nowait=True)) self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True)) + @skipUnlessDBFeature('has_select_for_update_skip_locked') + def test_for_update_sql_generated_skip_locked(self): + """ + Test that the backend's FOR UPDATE SKIP LOCKED variant appears in + generated SQL when select_for_update is invoked. + """ + with transaction.atomic(), CaptureQueriesContext(connection) as ctx: + list(Person.objects.all().select_for_update(skip_locked=True)) + self.assertTrue(self.has_for_update_sql(ctx.captured_queries, skip_locked=True)) + @skipUnlessDBFeature('has_select_for_update_nowait') def test_nowait_raises_error_on_block(self): """ @@ -99,6 +109,25 @@ class SelectForUpdateTests(TransactionTestCase): self.end_blocking_transaction() self.assertIsInstance(status[-1], DatabaseError) + @skipUnlessDBFeature('has_select_for_update_skip_locked') + def test_skip_locked_skips_locked_rows(self): + """ + If skip_locked is specified, the locked row is skipped resulting in + Person.DoesNotExist. + """ + self.start_blocking_transaction() + status = [] + thread = threading.Thread( + target=self.run_select_for_update, + args=(status,), + kwargs={'skip_locked': True}, + ) + thread.start() + time.sleep(1) + thread.join() + self.end_blocking_transaction() + self.assertIsInstance(status[-1], Person.DoesNotExist) + @skipIfDBFeature('has_select_for_update_nowait') @skipUnlessDBFeature('has_select_for_update') def test_unsupported_nowait_raises_error(self): @@ -110,6 +139,17 @@ class SelectForUpdateTests(TransactionTestCase): with self.assertRaises(DatabaseError): list(Person.objects.all().select_for_update(nowait=True)) + @skipIfDBFeature('has_select_for_update_skip_locked') + @skipUnlessDBFeature('has_select_for_update') + def test_unsupported_skip_locked_raises_error(self): + """ + DatabaseError is raised if a SELECT...FOR UPDATE SKIP LOCKED is run on + a database backend that supports FOR UPDATE but not SKIP LOCKED. + """ + with self.assertRaisesMessage(DatabaseError, 'SKIP LOCKED is not supported on this database backend.'): + with transaction.atomic(): + Person.objects.select_for_update(skip_locked=True).get() + @skipUnlessDBFeature('has_select_for_update') def test_for_update_requires_transaction(self): """ @@ -130,7 +170,7 @@ class SelectForUpdateTests(TransactionTestCase): with self.assertRaises(transaction.TransactionManagementError): list(people) - def run_select_for_update(self, status, nowait=False): + def run_select_for_update(self, status, **kwargs): """ Utility method that runs a SELECT FOR UPDATE against all Person instances. After the select_for_update, it attempts @@ -143,12 +183,10 @@ class SelectForUpdateTests(TransactionTestCase): # We need to enter transaction management again, as this is done on # per-thread basis with transaction.atomic(): - people = list( - Person.objects.all().select_for_update(nowait=nowait) - ) - people[0].name = 'Fred' - people[0].save() - except DatabaseError as e: + person = Person.objects.select_for_update(**kwargs).get() + person.name = 'Fred' + person.save() + except (DatabaseError, Person.DoesNotExist) as e: status.append(e) finally: # This method is run in a separate thread. It uses its own @@ -248,3 +286,7 @@ class SelectForUpdateTests(TransactionTestCase): with transaction.atomic(): person = Person.objects.select_for_update().get(name='Reinhardt') self.assertEqual(person.name, 'Reinhardt') + + def test_nowait_and_skip_locked(self): + with self.assertRaisesMessage(ValueError, 'The nowait option cannot be used with skip_locked.'): + Person.objects.select_for_update(nowait=True, skip_locked=True)