# IPython Parallel: Usage and Experience Sharing

Liang Bo Wang (亮亮), 2015-04-27

• 亮亮 / liang2 / Liang Bo Wang
• Master's student at Bioinfo & Biostat Core Lab, NTU CGM
• Learn to speak DNA
• R / Python
• Taiwan R co-organizer
• PyCon APAC 2014-15/TW staff
• Former intern at Microsoft Research Asia

## TOC (for first half)

• multiprocessing: batteries included standard modules
• IPython Parallel: debuggable and interactive
• Celery: distributed task queue (no experience)
• Experience sharing
• Summary

The second half will be a free-form discussion about my internship at MSRA (the slides won't be shared).

## Awful feelings ...

Engineers' time is more valuable than computing time, but many algorithms are hard to parallelize.

## Prime number validation

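    import math

    PRIMES = [112272535095293, 112582705942171, ...]

    def is_prime(n):
        if n < 2:
            return False
        if n == 2:  # the only even prime
            return True
        if n % 2 == 0:
            return False
        sqrt_n = int(math.floor(math.sqrt(n)))
        # only odd divisors up to sqrt(n) need checking
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True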


## Straightforward way

Validate them one by one and benchmark with timeit.

    def orig_way():
        ans = list(map(is_prime, PRIMES))
        # ans = [is_prime(prime) for prime in PRIMES]
        return ans

    from timeit import timeit
    timeit('orig_way()', 'from __main__ import orig_way',
           number=3)  # return total time of all runs
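The benchmark later in the talk reports the minimum over repeated runs. A minimal sketch of that measurement with `timeit.repeat` follows; the workload inside `orig_way` is a stand-in chosen for illustration, not the prime check from the slides.

```python
from timeit import repeat


def orig_way():
    # Stand-in workload (an assumption); the talk times is_prime over PRIMES.
    return [n * n for n in range(1000)]


# 3 repeats, each timing 3 calls; repeat() returns one total per repeat.
totals = repeat(orig_way, number=3, repeat=3)
best_per_call = min(totals) / 3
print(f"best of 3 repeats: {best_per_call:.6f} s per call")
```

Taking the minimum rather than the mean filters out runs that were slowed down by other activity on the machine.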


## Threads and Processes in Python

Multithreading (the threading module) does not speed up CPU-bound work as expected because of the GIL.

(From David Beazley's Understanding GIL)
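To see the effect yourself, here is a small sketch that times the same CPU-bound loop sequentially and on two threads. It assumes a standard CPython build with the GIL enabled; the function names and the workload size `N` are arbitrary choices for a quick demo.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def count_down(n):
    # Pure-Python loop: CPU-bound, so the running thread holds the GIL.
    while n > 0:
        n -= 1
    return n


def timed(label, func):
    # Run func once and print how long it took.
    start = time.perf_counter()
    result = func()
    print(f"{label}: {time.perf_counter() - start:.3f} s")
    return result


N = 2_000_000  # arbitrary workload size (an assumption)

sequential = timed("sequential", lambda: [count_down(N) for _ in range(2)])

with ThreadPoolExecutor(max_workers=2) as executor:
    threaded = timed("2 threads", lambda: list(executor.map(count_down, [N, N])))
```

On a GIL-enabled interpreter the two timings are typically close, because only one thread executes Python bytecode at a time.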

## Multiprocessing is easy and elegant in Python

    >>> from validate_prime import is_prime, PRIMES
    >>> from multiprocessing import Pool
    >>> p = Pool(2)  # max concurrent processes
    >>> p.map(is_prime, PRIMES[:4])
    [True, True, True, True]

    >>>  # Ctrl-C / Ctrl-Z
    Traceback x N ... endless
    BOOOOOOOOM!! # NEVER run this interactively
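Since the pool misbehaves in an interactive session, the safer route is a script. The sketch below shows one common layout: worker function at module level, pool inside a `with` block, entry point behind a `__main__` guard. `NUMBERS` and `check_all` are names made up for this example, and the inputs are small values rather than the `PRIMES` list from the talk.

```python
import math
from multiprocessing import Pool


def is_prime(n):
    """Trial division by odd numbers up to sqrt(n)."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


# Small illustrative inputs (an assumption), not the PRIMES list from the talk.
NUMBERS = [2, 15, 97, 7919, 7921, 104729]


def check_all(numbers, workers=2):
    # The context manager shuts the worker processes down on exit.
    with Pool(workers) as pool:
        return pool.map(is_prime, numbers)


if __name__ == "__main__":
    # The guard keeps child processes from re-running this block when
    # they import the module (needed with the "spawn" start method).
    print(check_all(NUMBERS))
```

Run it as `python3 script.py` rather than pasting it into a REPL.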


## Use concurrent.futures (Py 3.2+)

Still not safe in interactive mode. It also provides asynchronous APIs that immediately return a Future object.

    from concurrent.futures import ProcessPoolExecutor

    def use_process():
        with ProcessPoolExecutor(4) as executor:
            all_ans = executor.map(is_prime, PRIMES)
        return all_ans  # leaving the with block waits for all tasks

    use_process()

    from time import sleep

    executor = ProcessPoolExecutor(4)
    # every submit returns a future, nonblocking
    futures = [executor.submit(is_prime, p)
               for p in PRIMES[:6]]

    while not all(map(lambda f: f.done(), futures)):
        print('do sth else, waiting')
        sleep(1)

    # get result from done futures
    print([f.result() for f in futures])

    $ python3 demo_concurrent_async.py
    do sth else, waiting
    do sth else, waiting
    do sth else, waiting
    do sth else, waiting
    [True, True, True, True, True, False]

More about concurrent.futures.Future in the official documentation.

## concurrent.futures for thread pooling

    # replace ProcessPoolExecutor with ThreadPoolExecutor
    from concurrent.futures import ThreadPoolExecutor

    def use_thread():
        with ThreadPoolExecutor(4) as executor:
            ans = executor.map(is_prime, PRIMES)
        return ans

You don't gain any computation speedup this way.

## Demo code

Ways to speed up using the standard library so far:

Demo code is under /demo_code/builtin-cpuheavy

## Benchmark on a 12-core machine

    > python3 benchmark.py
    # Minimum time average of 3 in 3 repeats
    Straight: 42.10
    Multithread: 39.47
    Multiprocess: 7.98

You don't get a 12x speedup because of the overhead of forking processes.

## So you forgot the asyncio module

## Summary for built-in modules

• Easy to write
• High-level APIs available
• Fail in interactive mode
Bad, because that's the way we do data mangling
• Limited scalability
They simply don't scale, and it is hard to find a 100-core server
• Use multithreading for IO-bound tasks; multiprocessing for CPU-bound tasks

## Introducing IPython Parallel

## Features of IPython Parallel

• Get all the goodness of IPython
• Both interactive and parallel; highly useful in data analysis
• Scale across multiple machines
• Support common schemes such as MPI, PBS, and SGE

Simply awesome.

## Setup

Trivial for Python 3.4+, which comes with pip.

    # For Debian/Ubuntu, apt-get install ipython3-notebook
    pip3 install ipython[all]

Windows? Get a *nix system. Otherwise, try Anaconda (Miniconda), a scientific Python distribution that automatically manages both Python and external dependencies.

## (Scientific) Python setup on Windows

• If you can't come up with a solution, use Miniconda / Anaconda.
• I wrote a small installation note on gist.

    conda install ipython-notebook
    # or inside a conda virtual env,
    conda create -n ipy python=3.4 ipython-notebook
    activate ipy

## Overview

    In [1]: from IPython.parallel import Client
       ...: rc = Client()  # connect controller
    In [2]: dview = rc[:]  # return a View

Adapted from the official docs

## Start IPython Cluster

    $ ipcluster start
    # [IPClusterStart] Using existing profile dir: '.../profile_default'
    # [IPClusterStart] Starting ipcluster with [daemon=False]
    # ... (automatically set up controller and engines locally)


Or start it from the IPython Notebook.

    In [1]: from IPython.parallel import Client
       ...: rc = Client()
    In [2]: rc.ids
    Out[2]: [0, 1, 2, 3]
    In [3]: dview = rc[:]  # DirectView using all engines
    In [4]: with dview.sync_imports():
       ...:     import math  # import on all engines
    In [5]: ar = dview.map_async(is_prime, PRIMES[:8])  # returns an AsyncResult
    In [6]: ar.get()  # block until all results arrive
    Out[6]: [True, True, True, True, True, False, True, True]

    In [1]: %%px
       ...: # runs on all engines via the IPython cell magic
       ...: import numpy as np
       ...: rand_n = np.random.randint(10, size=2)
    In [2]: dview['talk'] = 'Kaohsiung.py'  # spread variables
    In [3]: dview.scatter('a', list(range(8)))
       ...: dview['a']   # and collect them back
    Out[3]: [[0, 1], [2, 3], [4, 5], [6, 7]]
    In [4]: dview.gather('a')  # in merged way
    Out[4]: [0, 1, 2, 3, 4, 5, 6, 7]
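The scatter/gather outputs above follow a simple partitioning idea, which can be sketched in plain Python without a cluster. The `scatter` and `gather` functions here are illustrative stand-ins, not IPython's implementation; in particular, giving earlier chunks the extra items when the split is uneven is an assumption of this sketch.

```python
def scatter(seq, n_engines):
    """Split seq into n_engines contiguous chunks; earlier chunks take any extras."""
    base, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        size = base + (1 if i < extra else 0)
        chunks.append(list(seq[start:start + size]))
        start += size
    return chunks


def gather(chunks):
    """Concatenate the per-engine chunks back into one list."""
    return [item for chunk in chunks for item in chunk]


parts = scatter(list(range(8)), 4)
print(parts)          # [[0, 1], [2, 3], [4, 5], [6, 7]]
print(gather(parts))  # [0, 1, 2, 3, 4, 5, 6, 7]
```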


## Get our hands dirty

Try this online IPython notebook (hosted by Notebook Viewer)

    ipython notebook
    # Launch IPython notebook server
    # on http://localhost:8888/
    # ... (terminate with Ctrl-C)


## More about IPython Parallel

The View.parallel decorator wraps your function as a parallel task runner; its map method works the same way as the view's map used earlier.

    @dview.parallel(block=False)
    def my_work(arg1, arg2):
        pass

    my_work.map(all_input1, all_input2)
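To make the decorator idea concrete without a running cluster, here is a rough stand-in built on a thread pool. `ParallelFunction` and `parallel` are hypothetical names written for this sketch; they only mimic the shape of "decorate a function, then call `.map` on it" and say nothing about how IPython implements it.

```python
from concurrent.futures import ThreadPoolExecutor


class ParallelFunction:
    """Wrap a function and give it a parallel .map() method."""

    def __init__(self, func, max_workers):
        self.func = func
        self.max_workers = max_workers

    def map(self, *iterables):
        # Apply func element-wise across the iterables, like builtin map.
        with ThreadPoolExecutor(self.max_workers) as executor:
            return list(executor.map(self.func, *iterables))


def parallel(max_workers=4):
    # Hypothetical stand-in for the decorator; not IPython's implementation.
    def decorate(func):
        return ParallelFunction(func, max_workers)
    return decorate


@parallel(max_workers=2)
def add(a, b):
    return a + b


print(add.map([1, 2, 3], [10, 20, 30]))  # [11, 22, 33]
```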


## Different types of views - load balancer

The view we used before was a DirectView, a one-to-one mapping onto all available engines. Input tasks are split first and then passed to the engines before execution.

Another view, LoadBalancedView, passes input tasks to idle engines on the fly to prevent tasks from piling up on busy engines.

    rc = Client()
    lbview = rc.load_balanced_view()
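The difference between the two views can be illustrated with a toy scheduling simulation. This is only a model of the two strategies described above (pre-split versus hand-out-when-idle), with made-up task durations; it ignores communication cost and does not use IPython at all.

```python
import heapq


def static_makespan(durations, n_engines):
    """Pre-split tasks into contiguous chunks, one chunk per engine."""
    chunk = -(-len(durations) // n_engines)  # ceiling division
    loads = [sum(durations[i:i + chunk]) for i in range(0, len(durations), chunk)]
    return max(loads)


def dynamic_makespan(durations, n_engines):
    """Hand each task, in order, to whichever engine becomes idle first."""
    finish_times = [0.0] * n_engines  # a list of equal values is a valid heap
    for d in durations:
        earliest = heapq.heappop(finish_times)
        heapq.heappush(finish_times, earliest + d)
    return max(finish_times)


# Hypothetical worst case: all the slow tasks land in the first chunk.
durations = [3, 3, 3, 3] + [1] * 12
print(static_makespan(durations, 4))   # 12
print(dynamic_makespan(durations, 4))  # 6.0
```

With these numbers the pre-split schedule leaves three engines idle while one works through every slow task, whereas the dynamic schedule reaches the ideal total of 24 / 4 = 6 time units.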


## How load balance helps the parallelization

Visualized and demonstrated in this notebook.

It requires a few more Python packages: pandas, matplotlib, and seaborn. On Anaconda, installation takes a single command.

    conda install pandas matplotlib seaborn

## Task execution - direct view

A deliberate worst case for preallocated tasks. Optimally it should take only around 3 seconds.

If tasks on different engines complete at the same time, they pile up at the load balancer, so a longer pause is needed (see the randomized workload for comparison).

## Random task execution - direct view

With the direct view, all engines run exactly the same number of tasks (shown by color).

With the load-balanced view, some engines run fewer tasks when their earlier workload was heavier. Pauses are shorter because tasks finish at random times.

## IPy parallel across machines

• Previously we used ipcluster to start the cluster.
• Now break it down into ipcontroller and ipcluster engines.
• The IPython controller creates a connection info file at <profile_dir>/security/ipcontroller-engine.json
• Engines read that file to learn how to connect to the controller (e.g. which IP and port).
• To sum up: (1) launch the controller; (2) launch the engines (possibly on different machines); (3) connect by calling Client().

More detailed guide: PyBroMo's wiki.

    ipython profile create --parallel --profile=parallel
    # on the controller machine
    ipcontroller --profile=parallel
    # on the engine machine(s)
    ipcluster engines --profile=parallel \
        --file=/path/to/ipcontroller-engine.json
    # or set the path in c.IPEngineApp.url_file in
    # ~/.ipython/profile_parallel/ipengine_config.py


Then use it as usual with rc = Client(profile='parallel')

## Experience

• Scaled on a 16-core machine (64 cores across machines).

## Summary for IPy Parallel

• Awesome.
• Provides more scalability (across machines) and flexibility (interactive) through network connections
• Passing objects may raise issues such as pickling, but IPython handles most of them correctly.
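For a feel of what a pickling issue looks like, the sketch below uses only the standard `pickle` module, which is stricter than what IPython does when shipping objects. A function that can be imported by name round-trips fine, while a lambda is rejected.

```python
import math
import pickle

# A function importable by name is pickled as a reference to that name.
restored = pickle.loads(pickle.dumps(math.sqrt))
print(restored(16.0))  # 4.0

# A lambda has no importable name, so the standard pickler rejects it.
try:
    pickle.dumps(lambda x: x + 1)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError) as exc:
    lambda_pickled = False
    print("cannot pickle lambda:", type(exc).__name__)
```

The same limitation is why functions handed to `multiprocessing` or `ProcessPoolExecutor` need to live at module level.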

## Celery Intro

• A producer creates a message whenever it starts a new task
• Messages are then passed to Celery workers (consumers; there can be many)
• RabbitMQ manages the message queue
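The producer, queue, and worker roles listed above can be mimicked in a single process with the standard library. This is only an analogy: `queue.Queue` stands in for the broker, a dict stands in for a result backend, and `is_even` is a placeholder task; none of it is Celery code.

```python
import queue
import threading


def is_even(n):
    # Placeholder task for the sketch.
    return n % 2 == 0


def worker(tasks, results):
    # Consumer: pull messages until the None sentinel arrives.
    while True:
        message = tasks.get()
        if message is None:
            break
        task_id, n = message
        results[task_id] = is_even(n)


tasks = queue.Queue()  # stands in for the broker's message queue
results = {}           # stands in for a result backend
workers = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(2)]
for w in workers:
    w.start()

# Producer: each new task becomes one message on the queue.
for task_id, n in enumerate([10, 11, 12]):
    tasks.put((task_id, n))
for _ in workers:
    tasks.put(None)    # one sentinel per worker
for w in workers:
    w.join()

print([results[i] for i in range(3)])  # [True, False, True]
```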

## Overview

From Abhishek Tiwari, AMQP, RabbitMQ and Celery - A Visual Guide For Dummies

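    # Application under tasks.py
    from celery import Celery

    # Broker and backend URLs are assumptions for a local RabbitMQ setup
    app = Celery('tasks', broker='amqp://guest@localhost//', backend='rpc://')

    @app.task
    def is_prime(n):
        ...  # same body as before
    # run Celery workers: celery -A tasks worker

    result = is_prime.delay(PRIME)  # PRIME: any integer to test
    print(result.get())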

## Summary

Comparison table: builtin modules, IPython Parallel, and Celery, rated on high-level APIs, interactive mode, and scaling across machines.

✔ = trivial; △ = some lines of code; ✕ = plenty of lines of code

They can work as partners : )

## How is our naive parallelism?

• Easy to use; clean APIs; boost your computation by some factor
• You can work out a parallel strategy by trial and error
• Heavy jobs (ones that run for a few days) see a more significant performance improvement

But that comes at a price

• Copying objects a lot → use a database
• Memory usage → buy memory XD
• Sometimes unacceptable overhead → avoid this pattern
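The cost of copying objects can be estimated before running anything in parallel, since each task's arguments are serialized on their way to a worker. The sketch below compares two hypothetical task designs using `pickle` sizes; the dataset and the index value are made up for illustration.

```python
import pickle

dataset = list(range(100_000))  # hypothetical shared input

# Pattern 1: every task carries the whole dataset along with its index.
per_task_full = len(pickle.dumps((dataset, 42)))

# Pattern 2: every task carries only an index; workers load the data
# themselves, for example from a database or a shared file.
per_task_index = len(pickle.dumps(42))

print(f"full copy per task: {per_task_full} bytes")
print(f"index only per task: {per_task_index} bytes")
```

Multiply the first number by the number of tasks to see why shipping the data with every call quickly dominates the run time.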

## Take-home message

• Put these tools under your pillow