Tornado 協程

2022-03-08 10:40 更新

協程

協程是在 Tornado 中編寫異步代碼的推薦方式。協程使用 Python的?await?或?yield?關鍵字來暫停和恢復執(zhí)行,而不是一連串的回調(在gevent等框架中看到的協作輕量級線程有時也稱為協程,但在 Tornado 中,所有協程都使用顯式上下文切換并被稱為異步函數)
協程幾乎和同步代碼一樣簡單,但沒有線程的開銷。它們還通過減少可能發(fā)生上下文切換的位置數量,使并發(fā)更容易推理。

例子:

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

原生協程與裝飾協程

Python 3.5 引入了?async?和?await?關鍵字(使用這些關鍵字的函數也稱為“本機協程”)。為了與舊版本的 Python 兼容,您可以使用裝飾器使用“裝飾”或“基于產量”的協程tornado.gen.coroutine
盡可能推薦使用原生協程。僅在需要與舊版本的 Python 兼容時才使用修飾的協程。Tornado 文檔中的示例通常會使用原生形式。
兩種形式之間的轉換通常很簡單:

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

下面概述了兩種形式的協程之間的其他差異:

原生協程:

  • 通常更快。
  • 可以使用?async for?和?async with?語句使某些模式更簡單。 
  • 除非您知道 ?yield?和?await?他們,否則根本不要運行。裝飾協程一旦被調用就可以開始“在后臺”運行。請注意,對于這兩種協程,使用?await?或?yield?是很重要的,這樣任何異常都有可能發(fā)生。

裝飾協程:

  • 包有額外的集成 ?concurrent.futures?,允許 ?executor.submit?直接產生結果。對于本機協程,請?IOLoop.run_in_executor?改用
  • 通過產生一個列表或字典來支持一些等待多個對象的簡寫。用于?tornado.gen.multi?在本機協程中執(zhí)行此操作
  • 可以通過轉換函數注冊表支持與其他包的集成,包括 Twisted。要在本機協程中訪問此功能,請使用 tornado.gen.convert_yielded
  • 總是返回一個Future對象。本機協程返回一個不是Future. 在 Tornado 中,兩者大多可以互換。

如何運作

本節(jié)解釋裝飾協程的操作。原生協程在概念上相似,但由于與 Python 運行時的額外集成而稍微復雜一些

包含的函數?yield?是生成器。所有生成器都是異步的;當被調用時,它們返回一個生成器對象,而不是運行到完成。裝飾器?@gen.coroutine?通過?yield?表達式與生成器通信,并通過返回一個Future

這是協程裝飾器內部循環(huán)的簡化版本:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

裝飾器Future從生成器接收 a,等待(不阻塞)Future完成,然后“解包”并將結果作為表達式Future 的結果發(fā)送回生成器 。?yield?大多數異步代碼從不直接接觸類,除非立即將Future異步函數返回的值傳遞給?yield?表達式。

如何調用協程

協程不會以正常方式引發(fā)異常:它們引發(fā)的任何異常都將被困在等待對象中,直到它被產生。這意味著以正確的方式調用協程很重要,否則您可能會遇到未被注意到的錯誤:

async def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

在幾乎所有情況下,任何調用協程的函數都必須是協程本身,并且在調用中使用?await?或者?yield?關鍵字。當您覆蓋類中定義的方法時,請查閱文檔以查看是否允許使用協程(文檔應說明該方法“可能是協程”或“可能返回一個Future”):

async def good_call():
    # await will unwrap the object returned by divide() and raise
    # the exception.
    await divide(1, 0)

有時你可能想“觸發(fā)并忘記”一個協程而不等待它的結果。在這種情況下,建議使用IOLoop.spawn_callback,這使得IOLoop負責呼叫。如果失敗,IOLoop將記錄堆棧跟蹤:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

對于使用IOLoop.spawn_callback的函數,建議以這種方式使用?@gen.coroutine?,但對于使用?async def?的函數,則需要以這種方式使用(否則,協程運行程序將無法啟動)。

最后,在程序的頂層,如果 IOLoop 尚未運行,您可以啟動IOLoop,運行協程,然后IOLoop使用IOLoop.run_sync方法停止。這通常用于啟動?main?面向批處理的程序的功能:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

協程模式

調用阻塞函數

從協程調用阻塞函數的最簡單方法是使用IOLoop.run_in_executor,它的返回值 ?Futures?與協程兼容:

async def call_blocking():
    await IOLoop.current().run_in_executor(None, blocking_func, args)

并行性

multi函數接受值為列表和字典,并等待所有這些?Futures?:

from tornado.gen import multi

async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])

async def parallel_fetch_many(urls):
    responses = await multi ([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order

async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

在裝飾協程中,可以?yield?直接生成使用list 或dict:

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

交錯

有時,保存一個Future而不是立即放棄它是有用的,這樣你就可以在等待之前開始另一個操作。

from tornado.gen import convert_yielded

async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        yield self.flush()

這對裝飾協程來說更容易一些,因為它們在調用時立即啟動:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

循環(huán)

在本地協同程序中,可以使用?async for?。在較舊版本的Python中,使用協同路由進行循環(huán)是很棘手的,因為在?for?或?while?循環(huán)的每次迭代中都無法找到?yield?并捕獲結果。相反,您需要將循環(huán)條件與訪問結果分開,如本例中的Motor:

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在后臺運行

PeriodicCallback通常不與coroutines一起使用。相反,協同程序可以包含?while True?:循環(huán)并使用tornado.gen.sleep:

async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有時可能需要一個更復雜的循環(huán)。例如,前一個循環(huán)每?60+N?秒運行一次,其中?N?是?do_something()?的運行時間。要準確地每60秒運行一次,請使用上面的交錯模式:

async def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt             # Wait for the timer to run out.


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號