Awful feelings ...
Engineers' time is more valuable than computing time, but many algorithms are hard to parallelize.
Liang Bo Wang (亮亮), 2014-05-17
From Taiwan R User Group, more info on Meetup.
亮亮 CC BY 4.0
Engineers' time is more valuable than computing time, but many algorithms are hard to parallelize.
Not hard if your tasks are independent. Here we focus on the easiest pattern.
import math

PRIMES = [int(l) for l in open('prime_list.txt')]

def is_prime(n):
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
Validate them one by one and benchmark with timeit
def orig_way():
    ans = [is_prime(prime) for prime in PRIMES]
    return ans

from timeit import timeit
timeit('orig_way()', 'from __main__ import orig_way',
       number=3)  # returns the total time of all 3 runs
Multithreading with the threading module does not work as expected because of the GIL.
(From David Beazley's Understanding the Python GIL)
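What raw threading looks like here, as a hedged sketch (not from the slides; the module name validate_prime is borrowed from the interpreter session below): splitting PRIMES across threads barely helps this CPU-bound task, because only one thread holds the GIL at a time.

import threading
from validate_prime import is_prime, PRIMES  # assumed module, as used later

def check_chunk(chunk, results, idx):
    # each thread validates its own slice of the prime list
    results[idx] = [is_prime(n) for n in chunk]

def use_raw_threads(n_threads=4):
    chunks = [PRIMES[i::n_threads] for i in range(n_threads)]
    results = [None] * n_threads
    threads = [threading.Thread(target=check_chunk, args=(chunk, results, i))
               for i, chunk in enumerate(chunks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results  # wall time stays close to the serial version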
>>> from validate_prime import is_prime, PRIMES
>>> from multiprocessing import Pool
>>> p = Pool(2) # max concurrent processes
>>> p.map(is_prime, PRIMES[:4])
[True, True, True, True]
>>> # Ctrl-C / Ctrl-Z
Traceback x N ... endless
BOOOOOOOOM!! # NEVER run this interactively
concurrent.futures: still not safe in interactive mode, but it provides async APIs that immediately return a Future object.
from concurrent.futures import ProcessPoolExecutor

def use_process():
    with ProcessPoolExecutor(4) as executor:
        all_ans = executor.map(is_prime, PRIMES)
        return all_ans  # blocks when leaving the `with` block

use_process()
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from validate_prime import is_prime, PRIMES

executor = ProcessPoolExecutor()
# every submit() returns a Future immediately (non-blocking)
all_future_ans = [executor.submit(is_prime, p)
                  for p in PRIMES[:6]]
while not all(fu.done() for fu in all_future_ans):
    print('do sth else, waiting')
    sleep(1)
# get the results from the finished futures
print([fu.result() for fu in all_future_ans])
$ python3 demo_async.py
do sth else, waiting
do sth else, waiting
do sth else, waiting
do sth else, waiting
[True, True, True, True, True, False]
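If the polling loop is not needed, concurrent.futures also provides as_completed(), which yields each future as soon as it finishes; a minimal sketch reusing executor and all_future_ans from the snippet above:

from concurrent.futures import as_completed

for fu in as_completed(all_future_ans):
    print(fu.result())  # printed in completion order, not submission order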
More on the asyncio module in Python 3.4, which lets you escape callback hell through the coroutine (generator) pattern ... like magic.
See TP's talk on day 1 Yielding a Tulip (slide).
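Not covered here, but a hedged sketch of the Python 3.4 coroutine style (reusing is_prime and PRIMES from validate_prime; asyncio.sleep merely stands in for real asynchronous work):

import asyncio
from validate_prime import is_prime, PRIMES

@asyncio.coroutine
def check(n):
    yield from asyncio.sleep(0)  # hand control back to the event loop
    return is_prime(n)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(
    asyncio.gather(*(check(p) for p in PRIMES[:4])))
print(results)
loop.close()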
# replace ProcessPoolExecutor with ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor

def use_thread():
    with ThreadPoolExecutor(4) as executor:
        ans = executor.map(is_prime, PRIMES)
        return ans
You don't gain a computation boost from this.
Ways to speed up using the standard library so far:
Demo code is under /demo_code/builtin-cpuheavy
$ python3 benchmark.py
# Minimum over 3 repeats; each repeat averages 3 runs
Straight: 42.10
Multithread: 39.47
Multiprocess: 7.98
You don't get a 12x speedup because of the overhead of forking processes.
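A rough, hypothetical sketch of what a benchmark.py-style script could look like (the real one lives in the demo folder); it assumes orig_way, use_thread, and use_process from the earlier snippets are defined in the same file:

from timeit import repeat

SETUP = 'from __main__ import orig_way, use_thread, use_process'

def best_avg(stmt, repeats=3, number=3):
    # repeat() returns one total time per repeat; take the best, average per run
    return min(repeat(stmt, SETUP, repeat=repeats, number=number)) / number

if __name__ == '__main__':
    for name in ('orig_way', 'use_thread', 'use_process'):
        print('{}: {:.2f}'.format(name, best_avg(name + '()')))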
Simply, awesome
Trivial if you set up your Python correctly
python3 -m ensurepip
# or apt-get install python3-pip
# or python3 ez_setup.py from setuptools
pip3 install ipython[all]
Windows? (... get a *unix)
or search Unofficial Windows Binaries for Python
In [1]: from IPython.parallel import Client
rc = Client()  # connect to the controller
In [2]: dview = rc[:]  # returns a View
$ ipcluster3 start
# [IPClusterStart] Using existing profile dir: '.../profile_default'
# [IPClusterStart] Starting ipcluster with [daemon=False]
# ... (automatically set up controller and engines locally)
Or use IPython Notebook to start
In [1]: from IPython.parallel import Client
rc = Client()
In [2]: rc.ids
Out[2]: [0, 1, 2, 3]
In [3]: dview = rc[:] # DirectView use all engines
In [4]: with dview.sync_imports():
   ...:     import math  # import on all engines
In [5]: ar = dview.map_sync(is_prime, PRIMES[:8])  # blocks until done
In [6]: ar
Out[6]: [True, True, True, True, True, False, True, True]
In [1]: %%px
   ...: # run this whole cell on all engines
   ...: import numpy as np
   ...: rand_n = np.random.randint(10, size=2)
In [2]: dview['talk'] = 'MLDM Monday'  # spread a variable to all engines
In [3]: dview.scatter('a', list(range(8)))
   ...: dview['a']  # each engine holds its own slice
Out[3]: [[0, 1], [2, 3], [4, 5], [6, 7]]
In [4]: dview.gather('a')  # collect the pieces back
Out[4]: [0, 1, 2, 3, 4, 5, 6, 7]
Or this online IPython notebook (hosted by Notebook Viewer)
You can open it with IPython Notebook and actually run it.
ipython3 notebook
# Launch the IPython Notebook server
# on http://localhost:8888/
# ... (terminated by Ctrl-C/Ctrl-Z)
What if we want to add workers dynamically during heavy load?
From Abhishek Tiwari,
AMQP, RabbitMQ and Celery - A Visual Guide For Dummies
# Application in tasks.py
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def is_prime(n):
    ...  # same implementation as before

# run Celery workers: celery -A tasks worker
# call the task
from tasks import is_prime
result = is_prime.delay(PRIME)
print(result.get())
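To fan out the whole prime list instead of a single call, Celery's group primitive fits; a hedged sketch assuming the tasks.py above, a running broker and worker, a configured result backend, and the PRIMES list from earlier:

from celery import group
from tasks import is_prime
from validate_prime import PRIMES  # same prime list as before

# one task signature per prime; the group runs them in parallel on the workers
job = group(is_prime.s(p) for p in PRIMES)
result = job.apply_async()
print(result.get())  # list of booleans, requires a result backend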
| | Builtin | IPy Parallel | Celery |
|---|---|---|---|
| High level APIs | ✔ | ✔ | ✔ |
| Spreading data | △ | ✔ | △ |
| Interactive mode | ✕ | ✔ | ✔ |
| Scale across machines | ✕ | ✔ | ✔ |
| Hot add/drop workers | ✕ | △ | ✔ |
| Relative overhead | S | M | L |
✔ = trivial; △ = a few lines of code; ✕ = many lines of code
They can work as partners : )
But that comes at a price.