Tornado 協(xié)程

2022-03-08 10:40 更新

協(xié)程

協(xié)程是在 Tornado 中編寫異步代碼的推薦方式。協(xié)程使用 Python的?await?或?yield?關(guān)鍵字來暫停和恢復(fù)執(zhí)行,而不是一連串的回調(diào)(在gevent等框架中看到的協(xié)作輕量級線程有時也稱為協(xié)程,但在 Tornado 中,所有協(xié)程都使用顯式上下文切換并被稱為異步函數(shù))
協(xié)程幾乎和同步代碼一樣簡單,但沒有線程的開銷。它們還通過減少可能發(fā)生上下文切換的位置數(shù)量,使并發(fā)更容易推理

例子:

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

原生協(xié)程與裝飾協(xié)程

Python 3.5 引入了?async?和?await?關(guān)鍵字(使用這些關(guān)鍵字的函數(shù)也稱為“本機協(xié)程”)。為了與舊版本的 Python 兼容,您可以使用裝飾器使用“裝飾”或“基于產(chǎn)量”的協(xié)程tornado.gen.coroutine
盡可能推薦使用原生協(xié)程。僅在需要與舊版本的 Python 兼容時才使用修飾的協(xié)程。Tornado 文檔中的示例通常會使用原生形式。
兩種形式之間的轉(zhuǎn)換通常很簡單:

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

下面概述了兩種形式的協(xié)程之間的其他差異:

原生協(xié)程:

  • 通常更快。
  • 可以使用?async for?和?async with?語句使某些模式更簡單。 
  • 除非您知道 ?yield?和?await?他們,否則根本不要運行。裝飾協(xié)程一旦被調(diào)用就可以開始“在后臺”運行。請注意,對于這兩種協(xié)程,使用?await?或?yield?是很重要的,這樣任何異常都有可能發(fā)生。

裝飾協(xié)程:

  • 包有額外的集成 ?concurrent.futures?,允許 ?executor.submit?直接產(chǎn)生結(jié)果。對于本機協(xié)程,請?IOLoop.run_in_executor?改用
  • 通過產(chǎn)生一個列表或字典來支持一些等待多個對象的簡寫。用于?tornado.gen.multi?在本機協(xié)程中執(zhí)行此操作
  • 可以通過轉(zhuǎn)換函數(shù)注冊表支持與其他包的集成,包括 Twisted。要在本機協(xié)程中訪問此功能,請使用 tornado.gen.convert_yielded
  • 總是返回一個Future對象。本機協(xié)程返回一個不是Future. 在 Tornado 中,兩者大多可以互換。

如何運作

本節(jié)解釋裝飾協(xié)程的操作。原生協(xié)程在概念上相似,但由于與 Python 運行時的額外集成而稍微復(fù)雜一些

包含的函數(shù)?yield?是生成器。所有生成器都是異步的;當被調(diào)用時,它們返回一個生成器對象,而不是運行到完成。裝飾器?@gen.coroutine?通過?yield?表達式與生成器通信,并通過返回一個Future

這是協(xié)程裝飾器內(nèi)部循環(huán)的簡化版本:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

裝飾器Future從生成器接收 a,等待(不阻塞)Future完成,然后“解包”并將結(jié)果作為表達式Future 的結(jié)果發(fā)送回生成器 。?yield?大多數(shù)異步代碼從不直接接觸類,除非立即將Future異步函數(shù)返回的值傳遞給?yield?表達式。

如何調(diào)用協(xié)程

協(xié)程不會以正常方式引發(fā)異常:它們引發(fā)的任何異常都將被困在等待對象中,直到它被產(chǎn)生。這意味著以正確的方式調(diào)用協(xié)程很重要,否則您可能會遇到未被注意到的錯誤:

async def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

在幾乎所有情況下,任何調(diào)用協(xié)程的函數(shù)都必須是協(xié)程本身,并且在調(diào)用中使用?await?或者?yield?關(guān)鍵字。當您覆蓋類中定義的方法時,請查閱文檔以查看是否允許使用協(xié)程(文檔應(yīng)說明該方法“可能是協(xié)程”或“可能返回一個Future”):

async def good_call():
    # await will unwrap the object returned by divide() and raise
    # the exception.
    await divide(1, 0)

有時你可能想“觸發(fā)并忘記”一個協(xié)程而不等待它的結(jié)果。在這種情況下,建議使用IOLoop.spawn_callback,這使得IOLoop負責呼叫。如果失敗,IOLoop將記錄堆棧跟蹤:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

對于使用IOLoop.spawn_callback的函數(shù),建議以這種方式使用?@gen.coroutine?,但對于使用?async def?的函數(shù),則需要以這種方式使用(否則,協(xié)程運行程序?qū)o法啟動)。

最后,在程序的頂層,如果 IOLoop 尚未運行,您可以啟動IOLoop,運行協(xié)程,然后IOLoop使用IOLoop.run_sync方法停止。這通常用于啟動?main?面向批處理的程序的功能:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

協(xié)程模式

調(diào)用阻塞函數(shù)

從協(xié)程調(diào)用阻塞函數(shù)的最簡單方法是使用IOLoop.run_in_executor,它的返回值 ?Futures?與協(xié)程兼容:

async def call_blocking():
    await IOLoop.current().run_in_executor(None, blocking_func, args)

并行性

multi函數(shù)接受值為列表和字典,并等待所有這些?Futures?:

from tornado.gen import multi

async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])

async def parallel_fetch_many(urls):
    responses = await multi ([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order

async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

在裝飾協(xié)程中,可以?yield?直接生成使用list 或dict:

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

交錯

有時,保存一個Future而不是立即放棄它是有用的,這樣你就可以在等待之前開始另一個操作。

from tornado.gen import convert_yielded

async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        yield self.flush()

這對裝飾協(xié)程來說更容易一些,因為它們在調(diào)用時立即啟動:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

循環(huán)

在本地協(xié)同程序中,可以使用?async for?。在較舊版本的Python中,使用協(xié)同路由進行循環(huán)是很棘手的,因為在?for?或?while?循環(huán)的每次迭代中都無法找到?yield?并捕獲結(jié)果。相反,您需要將循環(huán)條件與訪問結(jié)果分開,如本例中的Motor:

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在后臺運行

PeriodicCallback通常不與coroutines一起使用。相反,協(xié)同程序可以包含?while True?:循環(huán)并使用tornado.gen.sleep:

async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有時可能需要一個更復(fù)雜的循環(huán)。例如,前一個循環(huán)每?60+N?秒運行一次,其中?N?是?do_something()?的運行時間。要準確地每60秒運行一次,請使用上面的交錯模式:

async def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt             # Wait for the timer to run out.


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號