diff --git a/django/db/models/deletion.py b/django/db/models/deletion.py
index e3c38a03cc..dffdf808d1 100644
--- a/django/db/models/deletion.py
+++ b/django/db/models/deletion.py
@@ -144,6 +144,21 @@ class Collector(object):
                 return False
         return True
 
+    def get_del_batches(self, objs, field):
+        """
+        Returns the objs in suitably sized batches for the connection in use.
+        """
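+        # bulk_batch_size() is backend specific; on SQLite it reflects the
+        # 999 query parameter limit, and the default implementation returns
+        # len(objs). Clamp to at least 1 in case it returns 0.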
+        conn_batch_size = max(
+            connections[self.using].ops.bulk_batch_size([field.name], objs), 1)
+        if len(objs) > conn_batch_size:
+            return [objs[i:i + conn_batch_size]
+                    for i in range(0, len(objs), conn_batch_size)]
+        else:
+            return [objs]
+
     def collect(self, objs, source=None, nullable=False, collect_related=True,
             source_attr=None, reverse_dependency=False):
         """
@@ -192,11 +207,15 @@ class Collector(object):
                 field = related.field
                 if field.rel.on_delete == DO_NOTHING:
                     continue
-                sub_objs = self.related_objects(related, new_objs)
-                if self.can_fast_delete(sub_objs, from_field=field):
-                    self.fast_deletes.append(sub_objs)
-                elif sub_objs:
-                    field.rel.on_delete(self, field, sub_objs, self.using)
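+                # Fetch the related objects in batches so the IN clause they
+                # end up in stays within the backend's parameter limit.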
+                batches = self.get_del_batches(new_objs, field)
+                for batch in batches:
+                    sub_objs = self.related_objects(related, batch)
+                    if self.can_fast_delete(sub_objs, from_field=field):
+                        self.fast_deletes.append(sub_objs)
+                    elif sub_objs:
+                        field.rel.on_delete(self, field, sub_objs, self.using)
             for field in model._meta.virtual_fields:
                 if hasattr(field, 'bulk_related_objects'):
                     # It's something like a generic foreign key.
diff --git a/tests/delete/tests.py b/tests/delete/tests.py
index 6ecdb01e65..370cd7ddde 100644
--- a/tests/delete/tests.py
+++ b/tests/delete/tests.py
@@ -1,6 +1,9 @@
 from __future__ import unicode_literals
 
+from math import ceil
+
 from django.db import models, IntegrityError, connection
+from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE
 from django.test import TestCase, skipUnlessDBFeature, skipIfDBFeature
 from django.utils.six.moves import xrange
 
@@ -165,7 +168,6 @@ class DeletionTests(TestCase):
         self.assertFalse(m.m2m_through_null.exists())
 
     def test_bulk(self):
-        from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE
         s = S.objects.create(r=R.objects.create())
         for i in xrange(2 * GET_ITERATOR_CHUNK_SIZE):
             T.objects.create(s=s)
@@ -311,6 +313,44 @@ class DeletionTests(TestCase):
         r.delete()
         self.assertEqual(HiddenUserProfile.objects.count(), 0)
 
+    def test_large_delete(self):
+        TEST_SIZE = 2000
+        objs = [Avatar() for i in range(0, TEST_SIZE)]
+        Avatar.objects.bulk_create(objs)
+        # Calculate the number of queries needed.
+        batch_size = connection.ops.bulk_batch_size(['pk'], objs)
+        # The related fetches are done in batches.
+        batches = int(ceil(float(len(objs)) / batch_size))
+        # One query for Avatar.objects.all() and then one related fast delete for
+        # each batch.
+        fetches_to_mem = 1 + batches
+        # The Avatar objects are going to be deleted in batches of GET_ITERATOR_CHUNK_SIZE.
+        queries = fetches_to_mem + TEST_SIZE // GET_ITERATOR_CHUNK_SIZE
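+        # On SQLite, for instance, batch_size is 999, so the total works out
+        # to 1 + ceil(2000 / 999) + 2000 // 100 = 1 + 3 + 20 = 24 queries.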
+        self.assertNumQueries(queries, Avatar.objects.all().delete)
+        self.assertFalse(Avatar.objects.exists())
+
+    def test_large_delete_related(self):
+        TEST_SIZE = 2000
+        s = S.objects.create(r=R.objects.create())
+        for i in xrange(TEST_SIZE):
+            T.objects.create(s=s)
+
+        batch_size = max(connection.ops.bulk_batch_size(['pk'], xrange(TEST_SIZE)), 1)
+
+        # 1 (select the related `T` instances)
+        # + ceil(TEST_SIZE / batch_size) (batched fast deletes of related `U` instances)
+        # + ceil(TEST_SIZE / GET_ITERATOR_CHUNK_SIZE) (delete `T` instances in batches)
+        # + 1 (delete `s`)
+        expected_num_queries = (int(ceil(TEST_SIZE / float(batch_size))) +
+                                int(ceil(TEST_SIZE / float(GET_ITERATOR_CHUNK_SIZE))) + 2)
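+        # (On SQLite, for instance, that is 3 + 20 + 2 = 25 queries.)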
+
+        self.assertNumQueries(expected_num_queries, s.delete)
+        self.assertFalse(S.objects.exists())
+        self.assertFalse(T.objects.exists())
+
 
 class FastDeleteTests(TestCase):
 
@@ -377,3 +417,16 @@
         self.assertNumQueries(2, p.delete)
         self.assertFalse(Parent.objects.exists())
         self.assertFalse(Child.objects.exists())
+
+    def test_fast_delete_large_batch(self):
+        User.objects.bulk_create(User() for i in range(0, 2000))
+        # No problems here - we aren't going to cascade, so we will fast
+        # delete the objects in a single query.
+        self.assertNumQueries(1, User.objects.all().delete)
+        a = Avatar.objects.create(desc='a')
+        User.objects.bulk_create(User(avatar=a) for i in range(0, 2000))
+        # Cascading from a single avatar doesn't hit the parameter limit, so
+        # this is one query to delete `a` plus one fast delete of the users.
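+        # (One fast DELETE for the related users, one DELETE for the avatar.)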
+        self.assertNumQueries(2, a.delete)
+        self.assertEqual(User.objects.count(), 0)