Liang Bo Wang (亮亮), 2015-07-14
By Liang2 under CC 4.0 BY license
Esc to overview
← → to navigate
雖然 ES 很快,一個 query 約 15ms,十萬次也要半個小時。耍廢半小時?
所以想到用個非同步打 ES 好了。multithreading / async?
from concurrent.futures import ThreadPoolExecutor
def search(query, url='/search'):
    """Send one blocking GET request for ``query`` and return the decoded JSON.

    Args:
        query: Value sent as the ``q`` query-string parameter.
        url: Endpoint to request; defaults to ``'/search'``.

    Returns:
        The object produced by decoding the response body as JSON.
    """
    r = requests.request('GET', url, params={'q': query})
    quote_json = r.json()
    return quote_json


# Fan the blocking calls out over a thread pool so the network waits overlap.
with ThreadPoolExecutor(max_workers) as executor:
    # map() hands back the results in the same order as all_queries.
    results = executor.map(search, all_queries)
For Python 2.7, futures is the backport package for concurrent.futures
http://localhost:5566/quote 就會回傳 1984 隨機段落
?slow=true 強制加 0.5 秒
/quote?part=1&chapter=2 可以選段落章節抽樣
/quote/uniform 確保每個段落機率是一致的(統計強迫症)
python 0_parse_1984.py
python random_quote_app.py [--help|--num_process=1]
用起來大概像這樣
$ curl -s -XGET "http://localhost:5566/quote" | \
python -m json.tool # prettify
{
"quote": " WAR IS PEACE FREEDOM IS SLAVERY
IGNORANCE IS STRENGTH",
"source": {
"chapter": 1,
"part": 1
}
}
$ curl -s -XGET "localhost:5566/quote/uniform?slow=true&part=3"
# pause for 500ms then return
{
"quote": "'In memory. Very well, then. We, the Party,
control all records, and we control all memories.
Then we control the past, do we not?'",
"source": {
"chapter": 2,
"part": 3
}
}
@asyncio.coroutine
def quote_simple(url='/quote/uniform'):
    """Fetch one quote as a generator-based coroutine.

    Args:
        url: Endpoint to request; defaults to ``'/quote/uniform'``.

    Returns:
        The value stored under the ``'quote'`` key of the JSON reply.
    """
    # Each ``yield from`` suspends this coroutine until the awaited result
    # (first the response, then its decoded JSON body) is available.
    r = yield from aiohttp.request('GET', url)
    quote_json = yield from r.json()
    return quote_json['quote']


# Run the coroutine on the event loop until it finishes and keep its result.
loop = asyncio.get_event_loop()
quote = loop.run_until_complete(quote_simple())
Coroutines can be entered, exited, and resumed at many different points
def quote_block(url='/quote/uniform'):
    """Fetch one quote with a blocking HTTP request.

    Args:
        url: Endpoint to request; defaults to ``'/quote/uniform'``.

    Returns:
        The value stored under the ``'quote'`` key of the JSON reply.
    """
    r = requests.request('GET', url)
    quote_json = r.json()
    return quote_json['quote']


quote = quote_block()
requests.get(...)
aiohttp.get(...)
@asyncio.coroutine
def quote_simple(url='/quote/uniform'):
    """Fetch one quote as a generator-based coroutine.

    Args:
        url: Endpoint to request; defaults to ``'/quote/uniform'``.

    Returns:
        The value stored under the ``'quote'`` key of the JSON reply.
    """
    # Same two steps as a blocking request, but each one is awaited with
    # ``yield from`` instead of being called directly.
    r = yield from aiohttp.request('GET', url)
    quote_json = yield from r.json()
    return quote_json['quote']


# Run the coroutine on the event loop until it finishes and keep its result.
loop = asyncio.get_event_loop()
quote = loop.run_until_complete(quote_simple())
@asyncio.coroutine
def quote_simple(url='/quote/uniform'):
    """Fetch one quote as a generator-based coroutine.

    Args:
        url: Endpoint to request; defaults to ``'/quote/uniform'``.

    Returns:
        The value stored under the ``'quote'`` key of the JSON reply.
    """
    r = yield from aiohttp.request('GET', url)
    quote_json = yield from r.json()
    return quote_json['quote']


# Calling quote_simple() only creates the coroutine object; the event loop
# is what actually executes it and hands back its return value.
loop = asyncio.get_event_loop()
quote = loop.run_until_complete(quote_simple())
quote_simple() 會回傳一個 coroutine(可以 yield from 它)
loop.run_until_complete(coro) 會自動把 coroutine 用 Task(subclass of Future)包住,並丟到 event loop 去執行
Task 有很多好處:接 callback、看包的 coroutine 有沒有結束、記住回傳值、存 traceback
t = Task(coro); loop.run_until_complete(t)
t.result() 能拿到 coro 回傳值
用 asyncio.gather(*coros) 同時放入多個 coroutines 進 loop,等他們結束,並依序收集回傳值。
@asyncio.coroutine
def quote_many(num_quotes=1):
    """Fetch ``num_quotes`` quotes concurrently.

    Args:
        num_quotes: How many ``quote_simple()`` coroutines to run.

    Returns:
        The list produced by ``asyncio.gather``: one result per coroutine,
        in the order the coroutines were created.
    """
    coroutines = [
        quote_simple() for _ in range(num_quotes)
    ]
    # gather() schedules all of them at once and waits for every result.
    quotes = yield from asyncio.gather(*coroutines)
    return quotes


loop.run_until_complete(quote_many(2000))
netstat -apn | grep 5566
tcp 0 0 127.0.0.1:5566 127.0.0.1:45236 ESTABLISHED 5991/python
tcp 0 0 127.0.0.1:45213 127.0.0.1:5566 ESTABLISHED 6680/python
tcp 0 0 127.0.0.1:45168 127.0.0.1:5566 TIME_WAIT -
tcp 0 0 127.0.0.1:44763 127.0.0.1:5566 TIME_WAIT -
tcp 0 0 127.0.0.1:45772 127.0.0.1:5566 TIME_WAIT -
tcp 0 0 127.0.0.1:44741 127.0.0.1:5566 TIME_WAIT -
asyncio.Semaphore (維基說明)
@asyncio.coroutine
def quote_simple(url='/quote/uniform'):
    """Fetch one quote with no limit on concurrent connections.

    Args:
        url: Endpoint to request; defaults to ``'/quote/uniform'``.

    Returns:
        The value stored under the ``'quote'`` key of the JSON reply.
    """
    r = yield from aiohttp.request('GET', url)
    quote_json = yield from r.json()
    return quote_json['quote']
@asyncio.coroutine
def quote_with_lock(semaphore, url='/quote/uniform'):
    """Fetch one quote while holding a slot of ``semaphore``.

    Args:
        semaphore: Semaphore shared by all concurrent fetches; it is held
            for the duration of the request.
        url: Endpoint to request; defaults to ``'/quote/uniform'``.

    Returns:
        The value stored under the ``'quote'`` key of the JSON reply.
    """
    # The slot is acquired on entering the block and released on leaving it,
    # so no more fetches run at once than the semaphore allows.
    with (yield from semaphore):
        r = yield from aiohttp.request('GET', url)
        quote_json = yield from r.json()
    return quote_json['quote']
with ... 進入時 acquire,出來時 release,用來控制總活動連線數
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
    """Fetch ``num_quotes`` quotes with at most ``conn_limit`` in flight.

    Args:
        num_quotes: How many quotes to fetch.
        conn_limit: Initial value of the semaphore shared by all fetches.

    Returns:
        The list produced by ``asyncio.gather``: one result per coroutine,
        in the order the coroutines were created.
    """
    # One semaphore is shared by every coroutine so they throttle each other.
    semaphore = asyncio.Semaphore(conn_limit)
    coroutines = [
        quote_with_lock(semaphore)
        for _ in range(num_quotes)
    ]
    quotes = yield from asyncio.gather(*coroutines)
    return quotes


quotes = loop.run_until_complete(
    quote_many(2000, conn_limit=100))
底下那根東西一直會動就是潮
IPython.html.widget.IntProgress;可以考慮用 asyncio.as_completed(coroutines)。
它會把執行結束的 coroutine 依完成先後丟回對應的 Future
quotes, step = [], 10
for i, fut in enumerate(asyncio.as_completed(coros), 1):
if i % step == 0:
progress.value += 1 # or progress.next()
q = yield from fut # get the result of future
quotes.append(q)
futures = [
asyncio.ensure_future(quote_with_lock(semaphore))
for i in range(num_quotes)
]
for i, fut in enumerate(asyncio.as_completed(futures), 1):
if i % step == 0:
progress.value += 1
yield from fut # don't store
quotes = [fut.result() for fut in futures]
done_tasks = 0
def progress_adder(fut):
nonlocal done_tasks
done_tasks += 1
if done_tasks % step == 0:
progress.value += 1
自己做一個 callback function progress_adder 負責增加 progress
futures = [] # wrap coroutines as Tasks
for i in range(num_quotes):
task = asyncio.Task(quote_with_lock(semaphore))
task.add_done_callback(progress_adder)
futures.append(task)
quotes = yield from (asyncio.gather(*futures))
@asyncio.coroutine
def quote_many(num_quotes, conn_limit, progress, step):
    """Fetch quotes concurrently while driving a progress indicator.

    Args:
        num_quotes: How many quotes to fetch.
        conn_limit: Initial value of the semaphore shared by all fetches.
        progress: Object whose ``value`` attribute is incremented as work
            completes.
        step: Number of finished tasks per single increment of
            ``progress.value``.

    Returns:
        The list produced by ``asyncio.gather``: one result per task, in the
        order the tasks were created.
    """
    semaphore = asyncio.Semaphore(conn_limit)
    done_tasks = 0

    def progress_adder(fut):
        # Done-callback: count the finished task and tick every `step` tasks.
        nonlocal done_tasks
        done_tasks += 1
        if done_tasks % step == 0:
            progress.value += 1

    futures = []
    for _ in range(num_quotes):
        # Wrap each coroutine in a Task so a done-callback can be attached.
        task = asyncio.Task(quote_with_lock(semaphore))
        task.add_done_callback(progress_adder)
        futures.append(task)
    quotes = yield from (asyncio.gather(*futures))
    return quotes
async and await usage at 3_console_demo_py35.py
class RootHandler(tornado.web.RequestHandler):
    """Handler that answers GET with a fixed message."""

    def get(self):
        """Write the message dict as the response body and finish the request."""
        response = {'message': 'Big Brother is watching you'}
        self.write(response)
        self.finish()
if __name__ == '__main__':
    # Route only the root path to RootHandler.
    app = tornado.web.Application([(r'^/$', RootHandler)])
    app.listen(5566)  # try curl -s -XGET localhost:5566/
    # listen() only binds the port; requests are served once the IOLoop runs.
    IOLoop.current().start()
By default, a Tornado app is already async.
To pause for an extra 0.5 sec before responding:
class SlowHandler(RootHandler):
    """RootHandler variant whose GET waits 0.5 seconds before finishing."""

    @tornado.web.asynchronous
    @gen.engine
    def get(self):
        """Wait for a 0.5 s IOLoop timeout, then finish the request."""
        # Yielding the Task suspends this handler until the timeout callback
        # fires, instead of sleeping in place.
        yield gen.Task(IOLoop.current().add_timeout, time.time() + 0.5)
        self.finish()  # close the connection