Django Task Queue: Django-Q

Liang Bo Wang (亮亮), 2016-02-25

Django-Q
Django Task Queue

By Liang2 under CC 4.0 BY license

Esc to overview
to navigate

Async task on server side

Actions that take > 1 second. Common ones:

You don't want to keep user awaited, hence async.
All tasks can happen simultaneously. You need a job queue.

Django-Q: a task queue for Django

Django Q is a native Django task queue, scheduler and worker application using Python multiprocessing.

Django Q documentation

Django-Q features

Django-Q guarantees at least once successful task delivery

Django-Q Tutorial

Let's open a fruits shop!

In Django and Django-Q

Initiate our Django project

			pip install django django-q

# start the project
django-admin startproject djangoq_demo
django-admin startapp fruit_shop
		

Add 'django_q' in INSTALLED_APPS and run the migration.

Django-Q setting

Use Django ORM as result storage and broker.

# In django_demo/settings.py
Q_CLUSTER = {
    'name': 'DjangoORM',
    'timeout': 1200,	# Timeout in secs for a task
    'save_limit': 10,	# Store latest 10 results only
    'catch_up': False,	# Ignore un-run scheduled tasks
    'orm': 'default'	# Django database connection
}
		

Django cache framework

Maybe caching is is not required, but recommended.

# In django_demo/settings.py
CACHES = {
    'default': {
		'BACKEND': \
			'django.core.cache.backends.locmem.LocMemCache',
        'LOCATION': 'djangoq-localmem',
    }
		

All settings are done now :)

Writing our tasks

Django-Q task is normal Python function.

# fruit_shop/tasks.py
import time
def order_fruit(fruit, num_fruit):
    time.sleep(num_fruit)   # e.g. 2 apples take 2 secs
    return '%s_%s' % (fruit, num_fruit)
		

To call your async task

from django_q.tasks import async, result
task_id = async(
    'fruit_shop.tasks.order_fruit',  # func full name
    fruit=fruit, num_fruit=num_fruit
)

ret = result(task_id)  # get result, None if yet run
ret = result(task_id, 200)  # block and wait for 200 ms

		

We have a functional task queue!

Add the form and view to be used by our customers.

<form action="{% url 'order' %}" method="post">
	<select class="form-control" name="fruit_type">
		<option>Apple</option> <option>Banana</option>
	</select>
	<input type="number" name="num_fruit" min="1" max="5">
	<button type="submit">Send invitation</button>
	{% csrf_token %}
</form>
	
from django_q.humanhash import humanize
def order(request):
    fruit = request.POST.get('fruit_type', '')
    num_fruit = int(request.POST.get('num_fruit', '1'))
    task_id = async(...)  # Create async task
    messages.info(  # Django message as notification
        request,
        'You ordered {fruit:s} x {num_fruit:d} (task: {task})'
        .format(..., task=humanize(task_id))
    )

Start the task cluster

$ python manage.py qcluster
09:19:59 [Q] INFO Q Cluster-87972 starting.
09:19:59 [Q] INFO Process-1:1 ready for work at 87974
# ...
09:19:59 [Q] INFO Process-1:5 monitoring at 87978
09:19:59 [Q] INFO Process-1 guarding cluster at 87973
09:19:59 [Q] INFO Process-1:6 pushing tasks at 87979
09:19:59 [Q] INFO Q Cluster-87972 running.

How to see my result / queue / ...?

Through the all-mighty Django admin. (demo)

Scheduling

Tutorial Summary

More Django-Q

How the task is store in the database?

More informative demo by ORM

from django_q.models import OrmQ
from django_q.tasks import Task
# Select orders in queue
queue_orders = OrmQ.objects.all().order_by('lock')

# Select finished orders
complete_orders = Task.objects.all().filter(
    func__exact='fruit_shop.tasks.order_fruit',
)
		
{% for task in complete_orders %}
	<tr>
		<td>{{ task.name }}</td>
		<td>{{ task.kwargs.fruit }}</td>
		<td>{{ task.kwargs.num_fruit }}</td>
		<td>{{ task.result }}</td>
		<td>{{ task.time_taken }}</td>
		<td>{{ task.stopped|naturaltime }}</td>
	</tr>
{% endfor %}

Monitoring

python manage.py qinfo

python manage.py qmonitor

Tasks can be more complicated

Many examples can be found in the Examples, Documentation.

What if I want to run A --> B --> C --> D, but if C fails, run E then D, and when B fails, run A again with different parameter A' and skip C, .... Should I write it down in a flow chart?

Complex task, use Celery instead

Or even more complex framework like Luigi to resovle task dependency, show visualization, and handle complex failures.

Django-Q vs Celery:

Celery Intro

Celery is an asynchronous distributed task queue. RabbitMQ is a message broker which implements the Advanced Message Queuing Protocol (AMQP)

Abhishek Tiwari, AMQP, RabbitMQ and Celery - A Visual Guide For Dummies

Celery Overview

From Abhishek Tiwari,
AMQP, RabbitMQ and Celery - A Visual Guide For Dummies

Django-Q
Overview

Python
官方文件
中文化

Live running Django-Q example

What does the site do?

Basically it's about calling a lot of shell commands.

@contextmanager
def cd(newdir):
    """Context manager for changing working directory.
	Ref: http://stackoverflow.com/a/24176022
    """
    prevdir = os.getcwd()
    os.chdir(newdir)
    try:
        yield
    finally:
        os.chdir(prevdir)
import subprocess as sp
def run_command_under_doc_root(cmd, catched=True):
    with cd(newdir=settings.PYDOC_ROOT):
        if catched:
			process = sp.run(
				cmd,
				stdout=sp.PIPE, stderr=sp.PIPE
			)
        else:
            process = sp.run(cmd)
		return process
	
def git_add_commit_push():
    git_processes = OrderedDict()
    commit_msg = 'Update translation (auto daily)'
    commands = OrderedDict([
        ('git_add', [GIT, 'add', 'locale/*']),
        ('git_commit', [GIT, 'commit', '-m', commit_msg]),
        ('git_push', [GIT, 'push']),
    ])
    for cmd_name, cmd in commands.items():
		git_processes[cmd_name] = \
			run_command_under_doc_root(cmd)
    return git_processes

	

使用心得

?

Sources on GitHub