7.11 內聯(lián)回調函數(shù)

2018-02-24 15:26 更新

問題

當你編寫使用回調函數(shù)的代碼的時候,擔心很多小函數(shù)的擴張可能會弄亂程序控制流。你希望找到某個方法來讓代碼看上去更像是一個普通的執(zhí)行序列。

解決方案

通過使用生成器和協(xié)程可以使得回調函數(shù)內聯(lián)在某個函數(shù)中。為了演示說明,假設你有如下所示的一個執(zhí)行某種計算任務然后調用一個回調函數(shù)的函數(shù)(參考7.10小節(jié)):

def apply_async(func, args, *, callback):
    # Compute the result
    result = func(*args)

    # Invoke the callback with the result
    callback(result)

接下來讓我們看一下下面的代碼,它包含了一個 Async 類和一個 inlined_async 裝飾器:

from queue import Queue
from functools import wraps

class Async:
    def __init__(self, func, args):
        self.func = func
        self.args = args

def inlined_async(func):
    @wraps(func)
    def wrapper(*args):
        f = func(*args)
        result_queue = Queue()
        result_queue.put(None)
        while True:
            result = result_queue.get()
            try:
                a = f.send(result)
                apply_async(a.func, a.args, callback=result_queue.put)
            except StopIteration:
                break
    return wrapper

這兩個代碼片段允許你使用 yield 語句內聯(lián)回調步驟。比如:

def add(x, y):
    return x + y

@inlined_async
def test():
    r = yield Async(add, (2, 3))
    print(r)
    r = yield Async(add, ('hello', 'world'))
    print(r)
    for n in range(10):
        r = yield Async(add, (n, n))
        print(r)
    print('Goodbye')

如果你調用 test() ,你會得到類似如下的輸出:

5
helloworld
0
2
4
6
8
10
12
14
16
18
Goodbye

你會發(fā)現(xiàn),除了那個特別的裝飾器和 yield 語句外,其他地方并沒有出現(xiàn)任何的回調函數(shù)(其實是在后臺定義的)。

討論

本小節(jié)會實實在在的測試你關于回調函數(shù)、生成器和控制流的知識。

首先,在需要使用到回調的代碼中,關鍵點在于當前計算工作會掛起并在將來的某個時候重啟(比如異步執(zhí)行)。當計算重啟時,回調函數(shù)被調用來繼續(xù)處理結果。apply_async() 函數(shù)演示了執(zhí)行回調的實際邏輯,盡管實際情況中它可能會更加復雜(包括線程、進程、事件處理器等等)。

計算的暫停與重啟思路跟生成器函數(shù)的執(zhí)行模型不謀而合。具體來講,yield 操作會使一個生成器函數(shù)產生一個值并暫停。接下來調用生成器的 __next__()send() 方法又會讓它從暫停處繼續(xù)執(zhí)行。

根據(jù)這個思路,這一小節(jié)的核心就在 inline_async() 裝飾器函數(shù)中了。關鍵點就是,裝飾器會逐步遍歷生成器函數(shù)的所有 yield 語句,每一次一個。為了這樣做,剛開始的時候創(chuàng)建了一個 result 隊列并向里面放入一個 None 值。然后開始一個循環(huán)操作,從隊列中取出結果值并發(fā)送給生成器,它會持續(xù)到下一個 yield 語句,在這里一個 Async 的實例被接受到。然后循環(huán)開始檢查函數(shù)和參數(shù),并開始進行異步計算 apply_async() 。然而,這個計算有個最詭異部分是它并沒有使用一個普通的回調函數(shù),而是用隊列的 put() 方法來回調。

這時候,是時候詳細解釋下到底發(fā)生了什么了。主循環(huán)立即返回頂部并在隊列上執(zhí)行 get() 操作。如果數(shù)據(jù)存在,它一定是 put() 回調存放的結果。如果沒有數(shù)據(jù),那么先暫停操作并等待結果的到來。這個具體怎樣實現(xiàn)是由 apply_async() 函數(shù)來決定的。如果你不相信會有這么神奇的事情,你可以使用 multiprocessing 庫來試一下,在單獨的進程中執(zhí)行異步計算操作,如下所示:

if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()
    apply_async = pool.apply_async

    # Run the test function
    test()

實際上你會發(fā)現(xiàn)這個真的就是這樣的,但是要解釋清楚具體的控制流得需要點時間了。

將復雜的控制流隱藏到生成器函數(shù)背后的例子在標準庫和第三方包中都能看到。比如,在contextlib 中的 @contextmanager 裝飾器使用了一個令人費解的技巧,通過一個 yield 語句將進入和離開上下文管理器粘合在一起。另外非常流行的 Twisted 包中也包含了非常類似的內聯(lián)回調。

以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號