Handy Parallel (Distributed) Computing in Python

Liang Bo Wang (亮亮), 2014-05-17

From Taiwan R User Group, more info on Meetup.

PyCon APAC 2014, 2014-05-17


Liang Bo Wang (亮亮), CC BY 4.0


About Me

TOC

Awful feelings ...

Engineers' time is more valuable than computing time, but many algorithms are hard to parallelize.

That's exactly what I want

It's not hard if your tasks are independent. Here we focus on this easiest pattern.

Prime number validation

import math

PRIMES = [int(l) for l in open('prime_list.txt')]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

Straightforward way

Validate them one by one and benchmark with timeit

def orig_way():
    ans = [is_prime(prime) for prime in PRIMES]
    return ans

from timeit import timeit
timeit('orig_way()', 'from __main__ import orig_way',
       number=3)  # returns the total time of the 3 runs

Using Python
Standard Library

Threads and Processes in Python

Multithreading with threading does not work as expected for CPU-bound tasks because of the GIL (Global Interpreter Lock).

(From David Beazley's Understanding GIL)
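For the curious, a minimal sketch (not from the talk) of what a threaded version would look like, assuming is_prime and PRIMES from the earlier slide:

# A threaded version -- no faster for CPU-bound work, because the GIL
# lets only one thread run Python bytecode at any moment.
import threading
from validate_prime import is_prime, PRIMES

def use_threads(n_threads=4):
    results = [None] * n_threads

    def worker(idx):
        # each thread validates every n_threads-th prime
        results[idx] = [is_prime(p) for p in PRIMES[idx::n_threads]]

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results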

Multiprocessing is easy and elegant in Python

>>> from validate_prime import is_prime, PRIMES
>>> from multiprocessing import Pool
>>> p = Pool(2) # max concurrent processes
>>> p.map(is_prime, PRIMES[:4])
[True, True, True, True]
>>> # Ctrl-C / Ctrl-Z
Traceback x N ... endless
BOOOOOOOOM!! # NEVER run this interactively
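The safe way is to put it in a script behind the usual __main__ guard (a minimal sketch; with-statement support for Pool needs Py 3.3+):

# safe_pool.py -- run as a script, never interactively
from multiprocessing import Pool
from validate_prime import is_prime, PRIMES

if __name__ == '__main__':  # children import this module; the guard
    with Pool(2) as p:      # keeps them from re-running the pool setup
        print(p.map(is_prime, PRIMES[:4]))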

Use concurrent.futures (Py 3.2+)

Still not safe in interactive mode. It provides async APIs that immediately return a Future object.

from concurrent.futures import ProcessPoolExecutor
def use_process():
    with ProcessPoolExecutor(4) as executor:
        all_ans = executor.map(is_prime, PRIMES)
        return all_ans  # blocks when leaving the `with` block
use_process()

from time import sleep

executor = ProcessPoolExecutor()
# every submit returns a future, non-blocking
all_future_ans = [executor.submit(is_prime, p)
                  for p in PRIMES[:6]]

while not all(fu.done() for fu in all_future_ans):
    print('do sth else, waiting')
    sleep(1)

# get results from the done futures
print([fu.result() for fu in all_future_ans])
$ python3 demo_async.py
do sth else, waiting
do sth else, waiting
do sth else, waiting
do sth else, waiting
[True, True, True, True, True, False]
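Instead of polling done() in a loop, concurrent.futures also offers as_completed, which yields each future as soon as it finishes. A sketch with the same setup:

from concurrent.futures import ProcessPoolExecutor, as_completed
from validate_prime import is_prime, PRIMES

with ProcessPoolExecutor(4) as executor:
    futures = [executor.submit(is_prime, p) for p in PRIMES[:6]]
    for fu in as_completed(futures):  # in completion order, not input order
        print(fu.result())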

More in the asyncio module in Py 3.4, which lets you escape callback hell through the coroutine (generator) pattern ... like magic.
See TP's talk on day 1 Yielding a Tulip (slide).
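A minimal taste of the Py 3.4 coroutine style (a sketch, not from TP's talk):

import asyncio

@asyncio.coroutine               # Py 3.4 style; async/await came later
def compute(n):
    yield from asyncio.sleep(1)  # waits without blocking the event loop
    return n * 2

loop = asyncio.get_event_loop()
print(loop.run_until_complete(compute(21)))  # 42
loop.close()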

concurrent.futures for thread pooling

# replace ProcessPoolExecutor with ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor

def use_thread():
    with ThreadPoolExecutor(4) as executor:
        ans = executor.map(is_prime, PRIMES)
        return ans
You don't gain a computation boost from this (GIL, again).

Demo code

Ways to speed up using the standard library so far: multiprocessing and concurrent.futures.

Demo code is under /demo_code/builtin-cpuheavy

Benchmark on a 12 core machine

$ python3 benchmark.py
# minimum of 3 repeats, each averaging 3 runs
Straight: 42.10
Multithread: 39.47
Multiprocess: 7.98
        

You don't get a 12x speedup because of the process-forking overhead.
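A sketch of what such a benchmark could look like (hypothetical; the real script lives in the demo code), assuming orig_way, use_thread, and use_process are defined in the same file:

from timeit import repeat

for fn_name in ('orig_way', 'use_thread', 'use_process'):
    # average of 3 runs, best (minimum) of 3 repeats
    secs = min(repeat('{}()'.format(fn_name),
                      'from __main__ import {}'.format(fn_name),
                      repeat=3, number=3)) / 3
    print('{}: {:.2f}'.format(fn_name, secs))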

Summary for built-in modules

Introducing
IPython Parallel

Features of IPython Parallel

Simply, awesome

Setup

Trivial if you set up your Python correctly

python3 -m ensurepip
# or apt-get install python3-pip
# or python3 ez_setup.py from setuptools
pip3 install ipython[all]
        

Windows? (... get a *unix)
or search Unofficial Windows Binaries for Python

Overview

In [1]: from IPython.parallel import Client
        rc = Client()  # connect to the controller
In [2]: dview = rc[:]  # returns a DirectView

Start IPython Cluster

$ ipcluster3 start
# [IPClusterStart] Using existing profile dir: '.../profile_default'
# [IPClusterStart] Starting ipcluster with [daemon=False]
# ... (automatically set up controller and engines locally)

Or use IPython Notebook to start

In [1]: from IPython.parallel import Client
        rc = Client()
In [2]: rc.ids
Out[2]: [0, 1, 2, 3]
In [3]: dview = rc[:]  # DirectView use all engines
In [4]: with dview.sync_imports():
   ...:     import math  # import on all engines
In [5]: ans = dview.map_sync(is_prime, PRIMES[:8])  # blocks
In [6]: ans
Out[6]: [True, True, True, True, True, False, True, True]
In [1]: %%px  # run on all engines by ipython magic
   ...: import numpy as np
   ...: rand_n = np.random.randint(10, size=2)
In [2]: dview['talk'] = 'MLDM Monday'   # spread variables
In [3]: dview.scatter('a', list(range(8)))
   ...: dview['a']   # each engine holds one chunk
Out[3]: [[0, 1], [2, 3], [4, 5], [6, 7]]
In [4]: dview.gather('a')   # and collect them back
Out[4]: [0, 1, 2, 3, 4, 5, 6, 7]
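Besides the DirectView, IPython Parallel also provides a LoadBalancedView, which hands each task to whichever engine is free (a sketch on the same cluster):

In [5]: lview = rc.load_balanced_view()  # scheduler picks idle engines
In [6]: ar = lview.map_async(is_prime, PRIMES[:8])
In [7]: ar.get()
Out[7]: [True, True, True, True, True, False, True, True]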

More in demo code

Or this online IPython notebook (hosted by Notebook Viewer)

You can open it with ipython notebook and actually run it.

ipython3 notebook
# Launch the IPython Notebook server
# on http://localhost:8888/
# ... (terminate with Ctrl-C)

Summary for IPy Parallel

What if we want to add workers dynamically during heavy load?

Introducing
Celery

Celery Intro

Celery is an asynchronous distributed task queue. RabbitMQ is a message broker that implements the Advanced Message Queuing Protocol (AMQP).

Abhishek Tiwari, AMQP, RabbitMQ and Celery - A Visual Guide For Dummies

Overview

From Abhishek Tiwari,
AMQP, RabbitMQ and Celery - A Visual Guide For Dummies

# tasks.py -- the application
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def is_prime(n):
    # ... same as before

# run a Celery worker: celery -A tasks worker

# call the task
from tasks import is_prime
result = is_prime.delay(PRIME)
print(result.get())
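delay() returns a Celery AsyncResult immediately, so the caller can poll it instead of blocking on get(). A sketch (the input number is hypothetical; 2147483647 is prime):

from time import sleep
from tasks import is_prime

result = is_prime.delay(2**31 - 1)  # returns an AsyncResult at once
while not result.ready():           # poll; do something else meanwhile
    sleep(1)
print(result.get(timeout=10))       # True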

Summary

                        Builtin    IPy Parallel    Celery
High-level APIs
Spreading data
Interactive mode
Scale across machines
Hot add/drop workers
Overhead (relatively)      S            M             L

✔ = trivial; △ = some lines of code; ✕ = plenty of lines of code
They can work as partners : )

How good is our naive parallelism?

But that comes with a price

Take-home message

Thank You!

Fork me on GitHub