IPython Parallel: Usage and Experience Sharing

Liang Bo Wang (亮亮), 2015-04-27

Kaohsiung Python User Group, 2015-04-27

Liang Bo Wang (亮亮), under the CC BY 4.0 license

About Me

TOC (for first half)

The second half will be a free-form discussion about my internship at MSRA (those slides won't be shared).

Awful feelings ...

Engineers' time is more valuable than computing time, but many algorithms are hard to parallelize.

That's exactly what I want

Prime number validation

import math

PRIMES = [112272535095293, 112582705942171, ...]

def is_prime(n):
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2  # 2 is the only even prime
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

Straightforward way

Validate them one-by-one and benchmark by timeit

def orig_way():
    ans = list(map(is_prime, PRIMES))
    # ans = [is_prime(prime) for prime in PRIMES]
    return ans

from timeit import timeit
timeit('orig_way()', 'from __main__ import orig_way',
       number=3)  # return total time of all runs

Using the Python Standard Library

Threads and Processes in Python

Multithreading with the threading module does not work as expected because of the GIL (Global Interpreter Lock).

(From David Beazley's Understanding GIL)

Multiprocessing is easy and elegant in Python

>>> from validate_prime import is_prime, PRIMES
>>> from multiprocessing import Pool
>>> p = Pool(2)  # max concurrent processes
>>> p.map(is_prime, PRIMES[:4])
[True, True, True, True]
>>>  # Ctrl-C / Ctrl-Z
Traceback x N ... endless
BOOOOOOOOM!! # NEVER run this interactively
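
A minimal sketch of running the same kind of pool from a script instead of the interactive prompt. The sample numbers and the `check_all` helper are illustrative assumptions, not part of the talk's demo code. The `if __name__ == '__main__'` guard and the `with` block keep worker creation and cleanup under control.

```python
import math
from multiprocessing import Pool

# Small sample inputs chosen for illustration (not the talk's PRIMES list).
SAMPLE_NUMBERS = [2, 15, 97, 7919, 7921]


def is_prime(n):
    """Trial division by odd numbers up to sqrt(n)."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


def check_all(numbers, processes=2):
    """Hypothetical helper: validate numbers with a process pool."""
    # Leaving the with-block shuts the worker processes down.
    with Pool(processes) as pool:
        return pool.map(is_prime, numbers)


if __name__ == '__main__':
    # Only the main process runs this part; workers just import the module.
    print(check_all(SAMPLE_NUMBERS))
```

Saved as a file and started with `python3`, the script can be interrupted without leaving a prompt full of worker tracebacks.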

Use concurrent.futures (Py 3.2+)

Still not safe in interactive mode. It provides some async APIs, which immediately return a Future object.

from concurrent.futures import ProcessPoolExecutor

def use_process():
    with ProcessPoolExecutor(4) as executor:
        all_ans = executor.map(is_prime, PRIMES)
        return all_ans  # block at leaving `with` block

executor = ProcessPoolExecutor(4)
# every submit returns a future, nonblocking
futures = [executor.submit(is_prime, p)
           for p in PRIMES[:6]]

while not all(f.done() for f in futures):
    print('do sth else, waiting')

# get result from done futures
print([f.result() for f in futures])

$ python3 demo_concurrent_async.py
do sth else, waiting
do sth else, waiting
do sth else, waiting
do sth else, waiting
[True, True, True, True, True, False]

See the official documentation for more about concurrent.futures.Future.
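
Instead of polling `done()` in a loop, `concurrent.futures.as_completed` yields each future as soon as it finishes. A minimal sketch, assuming a hypothetical `run_async` helper and small sample inputs. A thread pool is the default here only so the snippet also runs safely in an interactive session; `ProcessPoolExecutor` can be passed as `executor_cls` when running CPU-bound work from a script.

```python
import math
from concurrent.futures import ThreadPoolExecutor, as_completed


def is_prime(n):
    """Trial division by odd numbers up to sqrt(n)."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


def run_async(numbers, executor_cls=ThreadPoolExecutor, workers=4):
    """Hypothetical helper: submit every number, collect results as they finish."""
    results = {}
    with executor_cls(workers) as executor:
        # Map each future back to its input so results can be labelled.
        future_to_n = {executor.submit(is_prime, n): n for n in numbers}
        for future in as_completed(future_to_n):
            results[future_to_n[future]] = future.result()
    return results


if __name__ == '__main__':
    print(run_async([2, 15, 97, 7919, 7921]))
```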

concurrent.futures for thread pooling

# replace ProcessPoolExecutor with ThreadPoolExecutor
def use_thread():
    with ThreadPoolExecutor(4) as executor:
        ans = executor.map(is_prime, PRIMES)
        return ans

You don't gain any computational speedup this way, since the GIL still applies to threads.
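
Threads still pay off when tasks spend their time waiting rather than computing, because a blocked thread releases the GIL. A minimal sketch, with `time.sleep` standing in for a blocking I/O call; `fake_io`, `timed`, `sequential` and `threaded` are hypothetical helpers written for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(delay):
    """Stand-in for a blocking I/O call; sleeping releases the GIL."""
    time.sleep(delay)
    return delay


def timed(func):
    """Return (result, elapsed seconds) of calling func()."""
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start


def sequential(delays):
    return [fake_io(d) for d in delays]


def threaded(delays):
    # One thread per task, so all the waits overlap.
    with ThreadPoolExecutor(len(delays)) as executor:
        return list(executor.map(fake_io, delays))


if __name__ == '__main__':
    delays = [0.2] * 4
    _, t_seq = timed(lambda: sequential(delays))
    _, t_thr = timed(lambda: threaded(delays))
    print('sequential: %.2fs, threaded: %.2fs' % (t_seq, t_thr))
```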

Demo code

Ways to speed up using standard library so far:

Demo code is under /demo_code/builtin-cpuheavy

Benchmark on a 12-core machine

> python3 benchmark.py
# Minimum time average of 3 in 3 repeats
Straight: 42.10
Multithread: 39.47
Multiprocess: 7.98

You don't get a 12x speedup (here about 5.3x, 42.10 / 7.98) because of the process forking overhead.
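
A "best of several repeats" timing like the one in the benchmark output can be obtained with `timeit.repeat`. A minimal sketch; the `bench` helper and the toy workload are assumptions for illustration, not the talk's `benchmark.py`.

```python
from timeit import repeat


def bench(func, number=3, repeats=3):
    """Hypothetical helper: best average time per call over several repeats."""
    # repeat() returns one total time per repeat, each covering `number` calls.
    totals = repeat(func, number=number, repeat=repeats)
    return min(totals) / number


def toy_workload():
    """Toy CPU-bound task used only for this illustration."""
    return sum(i * i for i in range(10000))


if __name__ == '__main__':
    print('Toy workload: %.6f s per call' % bench(toy_workload))
```

Taking the minimum rather than the mean filters out repeats that were slowed down by other activity on the machine.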

So you forgot the asyncio module

Summary for built-in modules

IPython Parallel

Features of IPython Parallel

Simply, awesome.


Installation is trivial for Python 3.4+, which comes with pip.

# For Debian/Ubuntu, apt-get install ipython3-notebook
pip3 install ipython[all]

Windows? Get a *unix system.

Try Anaconda (or Miniconda), a scientific Python distribution that automatically manages both Python and external dependencies.

(Scientific) Python setup on Windows

conda install ipython-notebook
# or inside a conda virtual env,
conda create -n ipy python=3.4 ipython-notebook
activate ipy


In [1]: from IPython.parallel import Client
   ...: rc = Client() # connect controller
In [2]: dview = rc[:] # return a View

Adapted from the official docs

Start IPython Cluster

$ ipcluster start
# [IPClusterStart] Using existing profile dir: '.../profile_default'
# [IPClusterStart] Starting ipcluster with [daemon=False]
# ... (automatically set up controller and engines locally)

Or use IPython Notebook to start

In [1]: from IPython.parallel import Client
   ...: rc = Client()
In [2]: rc.ids
Out[2]: [0, 1, 2, 3]
In [3]: dview = rc[:]  # DirectView use all engines
In [4]: with dview.sync_imports():
   ...:     import math  # import on all engines
In [5]: ar = dview.map_async(is_prime, PRIMES[:8])  # returns an AsyncResult
In [6]: ar.get()  # block until all results arrive
Out[6]: [True, True, True, True, True, False, True, True]

In [1]: %%px
   ...: # run on all engines by IPython magic
   ...: import numpy as np
   ...: rand_n = np.random.randint(2, 10)  # low=2, high=10 (exclusive)
In [2]: dview['talk'] = 'Kaohsiung.py'  # spread variables
In [3]: dview.scatter('a', list(range(8)))
   ...: dview['a']   # and collect them back
Out[3]: [[0, 1], [2, 3], [4, 5], [6, 7]]
In [4]: dview.gather('a')  # in merged way
Out[4]: [0, 1, 2, 3, 4, 5, 6, 7]
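
Conceptually, `scatter` cuts a sequence into one contiguous chunk per engine and `gather` concatenates the chunks again. The pure-Python sketch below only mimics that behaviour for illustration; `scatter_chunks` and `gather_chunks` are hypothetical names, not IPython's implementation.

```python
def scatter_chunks(seq, n_engines):
    """Split seq into n_engines contiguous chunks of near-equal size."""
    size, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        # The first `extra` chunks take one additional item.
        end = start + size + (1 if i < extra else 0)
        chunks.append(seq[start:end])
        start = end
    return chunks


def gather_chunks(chunks):
    """Concatenate the chunks back into one flat list."""
    return [item for chunk in chunks for item in chunk]


if __name__ == '__main__':
    parts = scatter_chunks(list(range(8)), 4)
    print(parts)
    print(gather_chunks(parts))
```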

Get our hands dirty

Try this online IPython notebook (hosted by Notebook Viewer)

Download it as an IPython notebook and execute it.

ipython notebook
# Launch IPython notebook server
# on http://localhost:8888/
# ... (terminated by Ctrl-C/Ctrl-Z)

Now more about IPy Parallel

The View.parallel decorator wraps your function as a parallel task runner, which works the same way as the previous approach.

@dview.parallel(block=True)
def my_work(arg1, arg2):
    pass  # do the actual work here

my_work.map(all_input1, all_input2)
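
The idea of a decorator that gives a function a parallel `.map` can be mimicked with the standard library. This is only an analogy built on `concurrent.futures`, not how `View.parallel` is implemented; `parallel_map` and its `workers` parameter are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_map(workers=4):
    """Hypothetical decorator factory: attach a parallel .map to a function."""
    def decorator(func):
        def pmap(*iterables):
            # Run func over the zipped inputs on a pool; input order is kept.
            with ThreadPoolExecutor(workers) as executor:
                return list(executor.map(func, *iterables))
        func.map = pmap  # the plain call func(a, b) keeps working
        return func
    return decorator


@parallel_map(workers=2)
def my_work(arg1, arg2):
    return arg1 * arg2


if __name__ == '__main__':
    print(my_work.map([1, 2, 3], [10, 20, 30]))
```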

Different types of views - load balancer

The view we used before was DirectView, a one-to-one mapping of all available engines. Input tasks were first split and then passed to the engines before execution.

Another view, LoadBalancedView, passes input tasks to idle engines on the fly to prevent tasks from piling up on busy engines.

rc = Client()
lbview = rc.load_balanced_view()
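
The difference between the two views can be illustrated with a small scheduling simulation. This sketch is not IPython code: `static_makespan` pre-splits tasks into contiguous chunks, similar to how a direct view distributes its input, while `dynamic_makespan` hands each task to whichever engine becomes idle first. The task durations are made up.

```python
import heapq


def static_makespan(durations, n_engines):
    """Pre-split tasks into contiguous chunks; finish time is the slowest chunk."""
    size, extra = divmod(len(durations), n_engines)
    loads, start = [], 0
    for i in range(n_engines):
        end = start + size + (1 if i < extra else 0)
        loads.append(sum(durations[start:end]))
        start = end
    return max(loads)


def dynamic_makespan(durations, n_engines):
    """Give each task, in order, to the engine that becomes idle first."""
    busy_until = [0.0] * n_engines  # a list of equal values is a valid heap
    for d in durations:
        idle_at = heapq.heappop(busy_until)
        heapq.heappush(busy_until, idle_at + d)
    return max(busy_until)


if __name__ == '__main__':
    # Made-up worst case: both slow tasks land in the first chunk.
    tasks = [3, 3, 1, 1, 1, 1, 1, 1]
    print(static_makespan(tasks, 4), dynamic_makespan(tasks, 4))
```

With these inputs the pre-split schedule is bound by the engine that received both slow tasks, while the dynamic schedule spreads the same 12 units of work evenly over 4 engines. The simulation ignores scheduling overhead, which a real load balancer does add.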

How load balancing helps parallelization

Visualize and demo in this notebook.

More Python packages are required: pandas, matplotlib, and seaborn. On Anaconda, installation is as trivial as a single command.

conda install pandas matplotlib seaborn

Task execution - direct view

A deliberate worst case for preallocated tasks.
Optimally, it should take only around 3 seconds.

Task execution - load balanced

Note the gaps between adjacent tasks.
If tasks on different engines complete at the same time, they get stuck at the load balancer, so a longer pause is required (see the randomized work for comparison).

Random task execution - direct view

All engines run exactly the same number of tasks (shown by color).

Random task execution - load balanced

Some engines run fewer tasks if their previous workload was heavier. The pauses are shorter since tasks return at random times.

IPy parallel across machines

More detailed guide: PyBroMo's wiki.

ipython profile create --parallel --profile=parallel
# on master
ipcontroller --profile=parallel
# on slave(s)
ipcluster engines --profile=parallel \
# or modify path of `c.IPEngineApp.url_file` in
# ~/.ipython/profile_parallel/ipengine_config.py

Then use it as usual with rc = Client(profile='parallel')


  • Scale on 16(64) cores machine(s).

Summary for IPy Parallel

What if we want to add workers dynamically during heavy load?


Celery Intro

Celery is an asynchronous distributed task queue. RabbitMQ is a message broker that implements the Advanced Message Queuing Protocol (AMQP).

Abhishek Tiwari, AMQP, RabbitMQ and Celery - A Visual Guide For Dummies
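
The task-queue pattern itself can be sketched with the standard library: producers put tasks on a queue (the broker's role) and workers pull from it, so more workers can be attached while tasks are still pending. This is only a single-process analogy with hypothetical names (`worker`, `run_queue`), not Celery or RabbitMQ code.

```python
import queue
import threading


def worker(tasks, results):
    """Pull (func, arg) tasks until a None sentinel arrives."""
    while True:
        item = tasks.get()
        if item is None:
            break
        func, arg = item
        results.put((arg, func(arg)))


def run_queue(func, args, initial_workers=1, extra_workers=2):
    """Hypothetical driver: enqueue tasks, then add workers while they run."""
    tasks, results = queue.Queue(), queue.Queue()
    threads = []

    def add_worker():
        t = threading.Thread(target=worker, args=(tasks, results))
        t.start()
        threads.append(t)

    for _ in range(initial_workers):
        add_worker()
    for arg in args:
        tasks.put((func, arg))
    # "Hot add" more workers; they simply start pulling from the same queue.
    for _ in range(extra_workers):
        add_worker()
    for _ in threads:
        tasks.put(None)  # one stop sentinel per worker, queued after all tasks
    for t in threads:
        t.join()
    return dict(results.get() for _ in range(len(args)))


if __name__ == '__main__':
    print(run_queue(lambda n: n * n, [1, 2, 3, 4]))
```

Because workers only talk to the queue, neither the producer nor the existing workers need to know when a new worker joins.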



# Application under tasks.py
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def is_prime(n):
    # ...

# run Celery workers: celery -A tasks worker

# call the task
from tasks import is_prime
result = is_prime.delay(PRIME)


                        builtin   ipy parallel   celery
high level apis
spreading data
interactive mode
scale across machines
hot add/drop workers
overhead (relatively)   s         m              l

✔ = trivial; △ = some lines of code; ✕ = plenty of code
they can work as partners : )

How is our naive parallelism?

But that comes at a price

Take-home message

Thank You!