Aiohttp Usage

Liang Bo Wang (亮亮), 2015-07-14

Python Web Meetup, 2015-07-14


By Liang2 under CC BY 4.0 license


About Me

These past few days I've been analyzing ElasticSearch results

ES is fast, about 15 ms per query, but 100,000 queries still take nearly half an hour. Sit idle for half an hour?
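A quick check of that estimate as a small Python sketch. The 20-way concurrency is a hypothetical figure, and the concurrent number is an ideal lower bound that ignores any slowdown on the ES side:

```python
# Serial cost of 100,000 ElasticSearch queries at roughly 15 ms each.
QUERY_MS = 15
NUM_QUERIES = 100_000

serial_s = QUERY_MS * NUM_QUERIES / 1000   # total seconds if run one by one
print(serial_s, serial_s / 60)             # seconds, minutes

# Idealized lower bound if 20 queries (hypothetical) are in flight at once.
CONCURRENCY = 20
print(serial_s / CONCURRENCY)
```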

So I figured I'd query ES asynchronously. Multithreading or async?

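from concurrent.futures import ThreadPoolExecutor

import requests

max_workers = 8  # hypothetical pool size; tune to what ES can handle

def search(query, url='/search'):
    # url is a placeholder: requests needs an absolute URL (http://host:port/...)
    r = requests.request('GET', url, params={'q': query})
    quote_json = r.json()
    return quote_json

# all_queries: any iterable of query strings
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    results = list(executor.map(search, all_queries))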

For Python 2.7, futures is the backport package for concurrent.futures
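To see why threads help, here is a runnable sketch that swaps the real ES call for a hypothetical fake_search that just sleeps 15 ms (no network, no requests). With 10 workers, 100 queries should finish in roughly a tenth of the serial time, and executor.map still returns results in input order:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_search(query):
    """Hypothetical stand-in for search(): sleeps ~15 ms like an ES query."""
    time.sleep(0.015)
    return {'q': query, 'hits': len(query)}


all_queries = ['war', 'peace', 'freedom', 'slavery'] * 25  # 100 queries

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as executor:
    # executor.map yields results in the same order as all_queries
    results = list(executor.map(fake_search, all_queries))
elapsed = time.perf_counter() - start

print(len(results), results[0])
print(f'{elapsed:.2f}s vs ~{0.015 * len(all_queries):.2f}s serially')
```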

Multithreading isn't trendy enough

Event loop / asyncio: what is it? Can you eat it?

Test scenario: Random Quoter

python 0_parse_1984.py
python random_quote_app.py [--help|--num_process=1]

Using it looks roughly like this

$ curl -s -XGET "http://localhost:5566/quote" | \
  python -m json.tool  # prettify
{
    "quote": "   WAR IS PEACE   FREEDOM IS SLAVERY
                 IGNORANCE IS STRENGTH",
    "source": {
        "chapter": 1,
        "part": 1
    }
}

$ curl -s -XGET "localhost:5566/quote/uniform?slow=true&part=3"
# pause for 500ms then return
{
    "quote": "'In memory. Very well, then. We, the Party,
              control all records, and we control all memories.
              Then we control the past, do we not?'",
    "source": {
        "chapter": 2,
        "part": 3
    }
}

Asyncio (HTTP)
Basics

The most basic asyncio structure: a single coroutine

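import asyncio

import aiohttp

@asyncio.coroutine
def quote_simple(url='/quote/uniform'):
    # url is shorthand; aiohttp needs the full address,
    # e.g. http://localhost:5566/quote/uniform
    r = yield from aiohttp.request('GET', url)
    quote_json = yield from r.json()
    return quote_json['quote']

loop = asyncio.get_event_loop()
quote = loop.run_until_complete(quote_simple())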
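The same one-coroutine structure in current async/await syntax (asyncio.run needs Python 3.7+, and the @asyncio.coroutine decorator was removed in Python 3.11). To keep it runnable without the quote server, a hypothetical fake_request coroutine stands in for the aiohttp call:

```python
import asyncio


async def fake_request(url):
    """Hypothetical stand-in for an HTTP GET: waits, then returns JSON-like data."""
    await asyncio.sleep(0.05)          # simulated network latency
    return {'quote': 'WAR IS PEACE', 'source': {'chapter': 1, 'part': 1}}


async def quote_simple(url='/quote/uniform'):
    quote_json = await fake_request(url)   # suspends here; the loop is free
    return quote_json['quote']


# asyncio.run creates an event loop, runs the coroutine to completion, then closes it.
quote = asyncio.run(quote_simple())
print(quote)
```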

With requests, the original looks like this

import requests

def quote_block(url='/quote/uniform'):
    r = requests.request('GET', url)  # blocks until the response arrives
    quote_json = r.json()
    return quote_json['quote']

quote = quote_block()

Note

  • In requests, requests.get(...) is the more common form
  • aiohttp 0.17+ will provide a matching aiohttp.get(...)

Coroutine / Task

The case of many requests

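asyncio.gather(*coroutines) puts several coroutines on the loop at once,
waits for all of them to finish, and collects their return values in the order they were passed.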

@asyncio.coroutine
def quote_many(num_quotes=1):
    coroutines = [
        quote_simple() for i in range(num_quotes)
    ]
    quotes = yield from asyncio.gather(*coroutines)
    return quotes
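A stdlib-only sketch of that ordering guarantee: delayed_echo is a hypothetical coroutine, and the delays are chosen so the first one finishes last, yet gather still returns the values in argument order:

```python
import asyncio


async def delayed_echo(value, delay):
    """Hypothetical coroutine: returns value after sleeping delay seconds."""
    await asyncio.sleep(delay)
    return value


async def main():
    # The first coroutine finishes last, the last finishes first...
    coroutines = [delayed_echo(i, 0.05 * (3 - i)) for i in range(4)]
    # ...but gather still returns results in argument order.
    return await asyncio.gather(*coroutines)


results = asyncio.run(main())
print(results)   # [0, 1, 2, 3]
```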

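Things get out of hand: every request fires at once, and connections pile up in ESTABLISHED and TIME_WAIT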

tcp 0 0   127.0.0.1:5566    127.0.0.1:45236  ESTABLISHED 5991/python
tcp 0 0   127.0.0.1:45213   127.0.0.1:5566   ESTABLISHED 6680/python
tcp 0 0   127.0.0.1:45168   127.0.0.1:5566   TIME_WAIT   -
tcp 0 0   127.0.0.1:44763   127.0.0.1:5566   TIME_WAIT   -
tcp 0 0   127.0.0.1:45772   127.0.0.1:5566   TIME_WAIT   -
tcp 0 0   127.0.0.1:44741   127.0.0.1:5566   TIME_WAIT   -

Cont'd

The original quote_simple

@asyncio.coroutine
def quote_simple(url='/quote/uniform'):
    r = yield from aiohttp.request('GET', url)
    quote_json = yield from r.json()
    return quote_json['quote']

Coroutine with Semaphore

@asyncio.coroutine
def quote_with_lock(semaphore, url='/quote/uniform'):
    with (yield from semaphore):
        r = yield from aiohttp.request('GET', url)
        quote_json = yield from r.json()
    return quote_json['quote']

The with block acquires the semaphore on entry and releases it on exit, capping the total number of active connections

@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
    semaphore = asyncio.Semaphore(conn_limit)
    coroutines = [
        quote_with_lock(semaphore)
        for i in range(num_quotes)
    ]
    quotes = yield from asyncio.gather(*coroutines)
    return quotes

quotes = loop.run_until_complete(
    quote_many(2000, conn_limit=100))
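A way to convince yourself the semaphore really caps concurrency: a stdlib-only sketch in async/await syntax, where a hypothetical limited_worker replaces the HTTP call and records the peak number of workers holding the semaphore at once (async with is the modern spelling of with (yield from semaphore)):

```python
import asyncio


async def limited_worker(semaphore, counters):
    """Hypothetical request stand-in that records how many workers run at once."""
    async with semaphore:                 # acquire on entry, release on exit
        counters['active'] += 1
        counters['peak'] = max(counters['peak'], counters['active'])
        await asyncio.sleep(0.01)         # simulated I/O while holding a slot
        counters['active'] -= 1
    return 'done'


async def run_many(num_tasks=200, conn_limit=20):
    semaphore = asyncio.Semaphore(conn_limit)
    counters = {'active': 0, 'peak': 0}
    results = await asyncio.gather(
        *(limited_worker(semaphore, counters) for _ in range(num_tasks)))
    return results, counters['peak']


results, peak = asyncio.run(run_many())
print(len(results), peak)   # 200 tasks, never more than 20 at once
```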

And that's it.

So, about that progress...

That thing at the bottom that keeps moving is what makes it cool

Progress bars really do have plenty of benefits

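If the original order doesn't matter

you can use asyncio.as_completed(coroutines).
It hands back a Future for each coroutine in the order they finish, so each result arrives as soon as it is ready.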

# runs inside a coroutine; progress is the progress-bar object
quotes, step = [], 10
for i, fut in enumerate(asyncio.as_completed(coroutines), 1):
    if i % step == 0:
        progress.value += 1  # or progress.next()
    q = yield from fut  # get the result of the future
    quotes.append(q)
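A runnable sketch of the completion-order behavior, using a hypothetical delayed_echo coroutine and a plain integer in place of the progress-bar object:

```python
import asyncio


async def delayed_echo(value, delay):
    """Hypothetical coroutine: returns value after sleeping delay seconds."""
    await asyncio.sleep(delay)
    return value


async def collect_as_completed(step=2):
    # value i sleeps 0.04 * (4 - i) s, so the largest value finishes first
    coroutines = [delayed_echo(i, 0.04 * (4 - i)) for i in range(5)]
    quotes, progress = [], 0
    for i, fut in enumerate(asyncio.as_completed(coroutines), 1):
        if i % step == 0:
            progress += 1          # stand-in for progress.value += 1
        quotes.append(await fut)   # results arrive in completion order
    return quotes, progress


quotes, progress = asyncio.run(collect_as_completed())
print(quotes, progress)   # [4, 3, 2, 1, 0] 2
```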

If order matters, Method 1:
wrap them in Tasks and call .result()

futures = [
    asyncio.ensure_future(quote_with_lock(semaphore))
    for i in range(num_quotes)
]
for i, fut in enumerate(asyncio.as_completed(futures), 1):
    if i % step == 0:
        progress.value += 1
    yield from fut  # don't store
quotes = [fut.result() for fut in futures]
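The same Task-plus-.result() idea as a runnable async/await sketch (delayed_echo and the integer progress counter are hypothetical stand-ins). Progress is counted in completion order, while the final list is read back in creation order:

```python
import asyncio


async def delayed_echo(value, delay):
    """Hypothetical coroutine: returns value after sleeping delay seconds."""
    await asyncio.sleep(delay)
    return value


async def ordered_with_progress(num=6, step=2):
    # Wrap each coroutine in a Task so we keep a handle to read .result() later.
    futures = [asyncio.ensure_future(delayed_echo(i, 0.02 * (num - i)))
               for i in range(num)]
    progress = 0
    for i, fut in enumerate(asyncio.as_completed(futures), 1):
        if i % step == 0:
            progress += 1
        await fut                      # only wait; don't store the result
    # Every Task is done now, so .result() returns at once, in creation order.
    return [fut.result() for fut in futures], progress


quotes, progress = asyncio.run(ordered_with_progress())
print(quotes, progress)   # [0, 1, 2, 3, 4, 5] 3
```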

Method 2: a callback

done_tasks = 0
def progress_adder(fut):
    # nonlocal only works when this is nested inside the enclosing coroutine
    nonlocal done_tasks
    done_tasks += 1
    if done_tasks % step == 0:
        progress.value += 1

Write your own callback function, progress_adder, that counts finished Tasks and advances the progress bar every step completions

(Cont'd) Wrap them in Tasks and attach the callback

futures = []  # wrap coroutines as Tasks
for i in range(num_quotes):
    task = asyncio.Task(quote_with_lock(semaphore))
    task.add_done_callback(progress_adder)
    futures.append(task)

quotes = yield from asyncio.gather(*futures)

Putting it all together:

@asyncio.coroutine
def quote_many(num_quotes, conn_limit, progress, step):
    semaphore = asyncio.Semaphore(conn_limit)
    done_tasks = 0
    def progress_adder(fut):
        nonlocal done_tasks
        done_tasks += 1
        if done_tasks % step == 0:
            progress.value += 1

    futures = []
    for i in range(num_quotes):
        task = asyncio.Task(quote_with_lock(semaphore))
        task.add_done_callback(progress_adder)
        futures.append(task)

    quotes = yield from (asyncio.gather(*futures))
    return quotes
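A runnable sketch of the callback approach in async/await syntax, with hypothetical stand-ins (delayed_echo, and a list that records the done count at each tick instead of a real progress bar). It uses asyncio.ensure_future, since the asyncio docs discourage instantiating asyncio.Task directly:

```python
import asyncio


async def delayed_echo(value, delay):
    """Hypothetical coroutine: returns value after sleeping delay seconds."""
    await asyncio.sleep(delay)
    return value


async def quote_many_with_callback(num=10, step=5):
    done_tasks = 0
    progress = []          # hypothetical progress log: done count at each tick

    def progress_adder(fut):
        # Called by the event loop once for every Task that finishes.
        nonlocal done_tasks
        done_tasks += 1
        if done_tasks % step == 0:
            progress.append(done_tasks)

    futures = []
    for i in range(num):
        task = asyncio.ensure_future(delayed_echo(i, 0.01 * (num - i)))
        task.add_done_callback(progress_adder)
        futures.append(task)

    quotes = await asyncio.gather(*futures)   # results keep the original order
    return quotes, progress


quotes, progress = asyncio.run(quote_many_with_callback())
print(quotes, progress)   # quotes == list(range(10)), progress == [5, 10]
```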

Demo: Console Progressbar

pip 6.0+ bundles all kinds of progress bars (e.g. spinner, counter, and bar)

Demo: IPython Notebook Progressbar

If step is too small, the frequent redraws mean heavy I/O and slow everything down (update at most once every 0.1 s)
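As an alternative to a fixed count-based step, the refresh can be throttled by time. A minimal sketch, assuming a hypothetical ThrottledProgress class with an injectable clock so it can be exercised without actually waiting:

```python
import time


class ThrottledProgress:
    """Hypothetical helper: redraws at most once per min_interval seconds."""

    def __init__(self, min_interval=0.1, clock=time.monotonic):
        self.min_interval = min_interval
        self.clock = clock
        self.value = 0
        self.redraws = 0
        self._last_redraw = None

    def advance(self, n=1):
        self.value += n
        now = self.clock()
        if self._last_redraw is None or now - self._last_redraw >= self.min_interval:
            self._last_redraw = now
            self.redraws += 1          # a real bar would repaint the terminal here


# Simulate 1000 completions spread evenly over one second with a fake clock.
fake_now = [0.0]
bar = ThrottledProgress(clock=lambda: fake_now[0])
for i in range(1000):
    fake_now[0] = i / 1000
    bar.advance()
print(bar.value, bar.redraws)
```

The count always reflects every completion; only the repaint is rate-limited, so fast bursts no longer turn into terminal I/O.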

Take home message

Pinkoi is hiring

Back-End | QA | Data | Search
Front-End | iOS | Android

Appendix:
I know full well there's no time to cover these

Who the X uses Python 3?

The REST server, built with Tornado

import tornado.ioloop
import tornado.web

class RootHandler(tornado.web.RequestHandler):
    def get(self):
        response = {'message': 'Big Brother is watching you'}
        self.write(response)  # a dict is serialized as JSON
        self.finish()

if __name__ == '__main__':
    app = tornado.web.Application([(r'^/$', RootHandler)])
    app.listen(5566)  # try curl -s -XGET localhost:5566/
    tornado.ioloop.IOLoop.current().start()  # start serving requests

Add extra async operations

By default, a Tornado app is already async.
To pause for an extra 0.5 s before responding:

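import time

from tornado import gen
from tornado.ioloop import IOLoop

class SlowHandler(RootHandler):
    @tornado.web.asynchronous
    @gen.engine
    def get(self):
        # wait 0.5 s without blocking the IOLoop (Tornado 4.x-era decorators)
        yield gen.Task(IOLoop.current().add_timeout, time.time() + 0.5)
        self.finish()  # close the connection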

Questions?

Fork me on Github