mirror of
				https://github.com/django/django.git
				synced 2025-10-31 09:41:08 +00:00 
			
		
		
		
	This removes the ability to configure Task enqueueing via a setting,
since the proposed `ENQUEUE_ON_COMMIT` did not support multi-database
setups.
Thanks to Simon Charette for the report.
Follow-up to 4289966d1b.
		
	
		
			
				
	
	
		
			79 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			79 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import time
 | |
| 
 | |
| from django.tasks import TaskContext, task
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def noop_task(*args, **kwargs):
 | |
|     return None
 | |
| 
 | |
| 
 | |
| @task
 | |
| def noop_task_from_bare_decorator(*args, **kwargs):
 | |
|     return None
 | |
| 
 | |
| 
 | |
| @task()
 | |
| async def noop_task_async(*args, **kwargs):
 | |
|     return None
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def calculate_meaning_of_life():
 | |
|     return 42
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def failing_task_value_error():
 | |
|     raise ValueError("This Task failed due to ValueError")
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def failing_task_system_exit():
 | |
|     raise SystemExit("This Task failed due to SystemExit")
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def failing_task_keyboard_interrupt():
 | |
|     raise KeyboardInterrupt("This Task failed due to KeyboardInterrupt")
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def complex_exception():
 | |
|     raise ValueError(ValueError("This task failed"))
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def complex_return_value():
 | |
|     # Return something which isn't JSON serializable nor picklable.
 | |
|     return lambda: True
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def exit_task():
 | |
|     exit(1)
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def hang():
 | |
|     """
 | |
|     Do nothing for 5 minutes
 | |
|     """
 | |
|     time.sleep(300)
 | |
| 
 | |
| 
 | |
| @task()
 | |
| def sleep_for(seconds):
 | |
|     time.sleep(seconds)
 | |
| 
 | |
| 
 | |
| @task(takes_context=True)
 | |
| def get_task_id(context):
 | |
|     return context.task_result.id
 | |
| 
 | |
| 
 | |
| @task(takes_context=True)
 | |
| def test_context(context, attempt):
 | |
|     assert isinstance(context, TaskContext)
 | |
|     assert context.attempt == attempt
 |