協(xié)程是在 Tornado 中編寫異步代碼的推薦方式。協(xié)程使用 Python的?await
?或?yield
?關(guān)鍵字來暫停和恢復(fù)執(zhí)行,而不是一連串的回調(diào)(在gevent等框架中看到的協(xié)作輕量級線程有時也稱為協(xié)程,但在 Tornado 中,所有協(xié)程都使用顯式上下文切換并被稱為異步函數(shù))
協(xié)程幾乎和同步代碼一樣簡單,但沒有線程的開銷。它們還通過減少可能發(fā)生上下文切換的位置數(shù)量,使并發(fā)更容易推理。
例子:
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
Python 3.5 引入了?async
?和?await
?關(guān)鍵字(使用這些關(guān)鍵字的函數(shù)也稱為“本機協(xié)程”)。為了與舊版本的 Python 兼容,您可以使用裝飾器使用“裝飾”或“基于產(chǎn)量”的協(xié)程tornado.gen.coroutine
盡可能推薦使用原生協(xié)程。僅在需要與舊版本的 Python 兼容時才使用修飾的協(xié)程。Tornado 文檔中的示例通常會使用原生形式。
兩種形式之間的轉(zhuǎn)換通常很簡單:
# Decorated: # Native:
# Normal function declaration
# with decorator # "async def" keywords
@gen.coroutine
def a(): async def a():
# "yield" all async funcs # "await" all async funcs
b = yield c() b = await c()
# "return" and "yield"
# cannot be mixed in
# Python 2, so raise a
# special exception. # Return normally
raise gen.Return(b) return b
下面概述了兩種形式的協(xié)程之間的其他差異:
原生協(xié)程:
async for
?和?async with
?語句使某些模式更簡單。 yield
?和?await
?他們,否則根本不要運行。裝飾協(xié)程一旦被調(diào)用就可以開始“在后臺”運行。請注意,對于這兩種協(xié)程,使用?await
?或?yield
?是很重要的,這樣任何異常都有可能發(fā)生。裝飾協(xié)程:
executor.submit
?直接產(chǎn)生結(jié)果。對于本機協(xié)程,請?IOLoop.run_in_executor?改用本節(jié)解釋裝飾協(xié)程的操作。原生協(xié)程在概念上相似,但由于與 Python 運行時的額外集成而稍微復(fù)雜一些
包含的函數(shù)?yield
?是生成器。所有生成器都是異步的;當被調(diào)用時,它們返回一個生成器對象,而不是運行到完成。裝飾器?@gen.coroutine
?通過?yield
?表達式與生成器通信,并通過返回一個Future
這是協(xié)程裝飾器內(nèi)部循環(huán)的簡化版本:
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
裝飾器Future從生成器接收 a,等待(不阻塞)Future完成,然后“解包”并將結(jié)果作為表達式Future 的結(jié)果發(fā)送回生成器 。?yield
?大多數(shù)異步代碼從不直接接觸類,除非立即將Future異步函數(shù)返回的值傳遞給?yield
?表達式。
協(xié)程不會以正常方式引發(fā)異常:它們引發(fā)的任何異常都將被困在等待對象中,直到它被產(chǎn)生。這意味著以正確的方式調(diào)用協(xié)程很重要,否則您可能會遇到未被注意到的錯誤:
async def divide(x, y):
return x / y
def bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)
在幾乎所有情況下,任何調(diào)用協(xié)程的函數(shù)都必須是協(xié)程本身,并且在調(diào)用中使用?await
?或者?yield
?關(guān)鍵字。當您覆蓋類中定義的方法時,請查閱文檔以查看是否允許使用協(xié)程(文檔應(yīng)說明該方法“可能是協(xié)程”或“可能返回一個Future”):
async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)
有時你可能想“觸發(fā)并忘記”一個協(xié)程而不等待它的結(jié)果。在這種情況下,建議使用IOLoop.spawn_callback,這使得IOLoop負責呼叫。如果失敗,IOLoop將記錄堆棧跟蹤:
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
對于使用IOLoop.spawn_callback的函數(shù),建議以這種方式使用?@gen.coroutine
?,但對于使用?async def
?的函數(shù),則需要以這種方式使用(否則,協(xié)程運行程序?qū)o法啟動)。
最后,在程序的頂層,如果 IOLoop 尚未運行,您可以啟動IOLoop,運行協(xié)程,然后IOLoop使用IOLoop.run_sync方法停止。這通常用于啟動?main
?面向批處理的程序的功能:
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
從協(xié)程調(diào)用阻塞函數(shù)的最簡單方法是使用IOLoop.run_in_executor,它的返回值 ?Futures
?與協(xié)程兼容:
async def call_blocking():
await IOLoop.current().run_in_executor(None, blocking_func, args)
該multi函數(shù)接受值為列表和字典,并等待所有這些?Futures
?:
from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}
在裝飾協(xié)程中,可以?yield
?直接生成使用list 或dict:
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
有時,保存一個Future而不是立即放棄它是有用的,這樣你就可以在等待之前開始另一個操作。
from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
yield self.flush()
這對裝飾協(xié)程來說更容易一些,因為它們在調(diào)用時立即啟動:
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
在本地協(xié)同程序中,可以使用?async for
?。在較舊版本的Python中,使用協(xié)同路由進行循環(huán)是很棘手的,因為在?for
?或?while
?循環(huán)的每次迭代中都無法找到?yield
?并捕獲結(jié)果。相反,您需要將循環(huán)條件與訪問結(jié)果分開,如本例中的Motor:
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
PeriodicCallback通常不與coroutines一起使用。相反,協(xié)同程序可以包含?while True
?:循環(huán)并使用tornado.gen.sleep:
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
有時可能需要一個更復(fù)雜的循環(huán)。例如,前一個循環(huán)每?60+N
?秒運行一次,其中?N
?是?do_something()
?的運行時間。要準確地每60秒運行一次,請使用上面的交錯模式:
async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.
更多建議: