Liang Bo Wang (亮亮), 2016-02-25
By Liang2 under CC 4.0 BY license
Esc to overview
← → to navigate
Actions that take > 1 second. Common ones:
You don't want to keep user awaited, hence async.
All tasks can happen simultaneously. You need a job queue.
Django-Q guarantees at least once successful task delivery
In Django and Django-Q
pip install django django-q
# start the project
django-admin startproject djangoq_demo
django-admin startapp fruit_shop
Add 'django_q'
in INSTALLED_APPS
and run the migration.
Use Django ORM as result storage and broker.
# In django_demo/settings.py
Q_CLUSTER = {
'name': 'DjangoORM',
'timeout': 1200, # Timeout in secs for a task
'save_limit': 10, # Store latest 10 results only
'catch_up': False, # Ignore un-run scheduled tasks
'orm': 'default' # Django database connection
}
Maybe caching is is not required, but recommended.
# In django_demo/settings.py
CACHES = {
'default': {
'BACKEND': \
'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'djangoq-localmem',
}
All settings are done now :)
Django-Q task is normal Python function.
# fruit_shop/tasks.py
import time
def order_fruit(fruit, num_fruit):
time.sleep(num_fruit) # e.g. 2 apples take 2 secs
return '%s_%s' % (fruit, num_fruit)
from django_q.tasks import async, result
task_id = async(
'fruit_shop.tasks.order_fruit', # func full name
fruit=fruit, num_fruit=num_fruit
)
ret = result(task_id) # get result, None if yet run
ret = result(task_id, 200) # block and wait for 200 ms
Add the form and view to be used by our customers.
<form action="{% url 'order' %}" method="post">
<select class="form-control" name="fruit_type">
<option>Apple</option> <option>Banana</option>
</select>
<input type="number" name="num_fruit" min="1" max="5">
<button type="submit">Send invitation</button>
{% csrf_token %}
</form>
from django_q.humanhash import humanize
def order(request):
fruit = request.POST.get('fruit_type', '')
num_fruit = int(request.POST.get('num_fruit', '1'))
task_id = async(...) # Create async task
messages.info( # Django message as notification
request,
'You ordered {fruit:s} x {num_fruit:d} (task: {task})'
.format(..., task=humanize(task_id))
)
$ python manage.py qcluster
09:19:59 [Q] INFO Q Cluster-87972 starting.
09:19:59 [Q] INFO Process-1:1 ready for work at 87974
# ...
09:19:59 [Q] INFO Process-1:5 monitoring at 87978
09:19:59 [Q] INFO Process-1 guarding cluster at 87973
09:19:59 [Q] INFO Process-1:6 pushing tasks at 87979
09:19:59 [Q] INFO Q Cluster-87972 running.
Through the all-mighty Django admin. (demo)
schedule(...)
async
manage.py qcluster
sign
.subprocess.run()
and CompletedProcess
from django_q.models import OrmQ
from django_q.tasks import Task
# Select orders in queue
queue_orders = OrmQ.objects.all().order_by('lock')
# Select finished orders
complete_orders = Task.objects.all().filter(
func__exact='fruit_shop.tasks.order_fruit',
)
{% for task in complete_orders %}
<tr>
<td>{{ task.name }}</td>
<td>{{ task.kwargs.fruit }}</td>
<td>{{ task.kwargs.num_fruit }}</td>
<td>{{ task.result }}</td>
<td>{{ task.time_taken }}</td>
<td>{{ task.stopped|naturaltime }}</td>
</tr>
{% endfor %}
python manage.py qinfo
python manage.py qmonitor
Many examples can be found in the Examples, Documentation.
What if I want to run A --> B --> C --> D, but if C fails, run E then D, and when B fails, run A again with different parameter A' and skip C, .... Should I write it down in a flow chart?
Or even more complex framework like Luigi to resovle task dependency, show visualization, and handle complex failures.
Django-Q vs Celery:
From Abhishek Tiwari,
AMQP, RabbitMQ and Celery - A Visual Guide For Dummies
Basically it's about calling a lot of shell commands.
@contextmanager
def cd(newdir):
"""Context manager for changing working directory.
Ref: http://stackoverflow.com/a/24176022
"""
prevdir = os.getcwd()
os.chdir(newdir)
try:
yield
finally:
os.chdir(prevdir)
import subprocess as sp
def run_command_under_doc_root(cmd, catched=True):
with cd(newdir=settings.PYDOC_ROOT):
if catched:
process = sp.run(
cmd,
stdout=sp.PIPE, stderr=sp.PIPE
)
else:
process = sp.run(cmd)
return process
def git_add_commit_push():
git_processes = OrderedDict()
commit_msg = 'Update translation (auto daily)'
commands = OrderedDict([
('git_add', [GIT, 'add', 'locale/*']),
('git_commit', [GIT, 'commit', '-m', commit_msg]),
('git_push', [GIT, 'push']),
])
for cmd_name, cmd in commands.items():
git_processes[cmd_name] = \
run_command_under_doc_root(cmd)
return git_processes