4.13 創(chuàng)建數(shù)據(jù)處理管道

2018-02-24 15:26 更新

問題

你想以數(shù)據(jù)管道(類似Unix管道)的方式迭代處理數(shù)據(jù)。比如,你有個大量的數(shù)據(jù)需要處理,但是不能將它們一次性放入內(nèi)存中。

解決方案

生成器函數(shù)是一個實現(xiàn)管道機制的好辦法。為了演示,假定你要處理一個非常大的日志文件目錄:

foo/
    access-log-012007.gz
    access-log-022007.gz
    access-log-032007.gz
    ...
    access-log-012008
bar/
    access-log-092007.bz2
    ...
    access-log-022008

假設(shè)每個日志文件包含這樣的數(shù)據(jù):

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

為了處理這些文件,你可以定義一個由多個執(zhí)行特定任務(wù)獨立任務(wù)的簡單生成器函數(shù)組成的容器。就像這樣:

import os
import fnmatch
import gzip
import bz2
import re

def gen_find(filepat, top):
    '''
    Find all filenames in a directory tree that match a shell wildcard pattern
    '''
    for path, dirlist, filelist in os.walk(top):
        for name in fnmatch.filter(filelist, filepat):
            yield os.path.join(path,name)

def gen_opener(filenames):
    '''
    Open a sequence of filenames one at a time producing a file object.
    The file is closed immediately when proceeding to the next iteration.
    '''
    for filename in filenames:
        if filename.endswith('.gz'):
            f = gzip.open(filename, 'rt')
        elif filename.endswith('.bz2'):
            f = bz2.open(filename, 'rt')
        else:
            f = open(filename, 'rt')
        yield f
        f.close()

def gen_concatenate(iterators):
    '''
    Chain a sequence of iterators together into a single sequence.
    '''
    for it in iterators:
        yield from it

def gen_grep(pattern, lines):
    '''
    Look for a regex pattern in a sequence of lines
    '''
    pat = re.compile(pattern)
    for line in lines:
        if pat.search(line):
            yield line

現(xiàn)在你可以很容易的將這些函數(shù)連起來創(chuàng)建一個處理管道。比如,為了查找包含單詞python的所有日志行,你可以這樣做:

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
for line in pylines:
    print(line)

如果將來的時候你想擴展管道,你甚至可以在生成器表達式中包裝數(shù)據(jù)。比如,下面這個版本計算出傳輸?shù)淖止?jié)數(shù)并計算其總和。

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
bytes = (int(x) for x in bytecolumn if x != '-')
print('Total', sum(bytes))

討論

以管道方式處理數(shù)據(jù)可以用來解決各類其他問題,包括解析,讀取實時數(shù)據(jù),定時輪詢等。

為了理解上述代碼,重點是要明白yield語句作為數(shù)據(jù)的生產(chǎn)者而for循環(huán)語句作為數(shù)據(jù)的消費者。當(dāng)這些生成器被連在一起后,每個yield會將一個單獨的數(shù)據(jù)元素傳遞給迭代處理管道的下一階段。在例子最后部分,sum() 函數(shù)是最終的程序驅(qū)動者,每次從生成器管道中提取出一個元素。

這種方式一個非常好的特點是每個生成器函數(shù)很小并且都是獨立的。這樣的話就很容易編寫和維護它們了。很多時候,這些函數(shù)如果比較通用的話可以在其他場景重復(fù)使用。并且最終將這些組件組合起來的代碼看上去非常簡單,也很容易理解。

使用這種方式的內(nèi)存效率也不得不提。上述代碼即便是在一個超大型文件目錄中也能工作的很好。事實上,由于使用了迭代方式處理,代碼運行過程中只需要很小很小的內(nèi)存。

在調(diào)用 gen_concatenate() 函數(shù)的時候你可能會有些不太明白。這個函數(shù)的目的是將輸入序列拼接成一個很長的行序列。itertools.chain() 函數(shù)同樣有類似的功能,但是它需要將所有可迭代對象最為參數(shù)傳入。在上面這個例子中,你可能會寫類似這樣的語句 lines = itertools.chain(*files) ,使得 gen_opener() 生成器能被全部消費掉。但由于 gen_opener() 生成器每次生成一個打開過的文件,等到下一個迭代步驟時文件就關(guān)閉了,因此 china() 在這里不能這樣使用。上面的方案可以避免這種情況。

gen_concatenate() 函數(shù)中出現(xiàn)過 yield from 語句,它將yield操作代理到父生成器上去。語句 yield from it 簡單的返回生成器 it 所產(chǎn)生的所有值。關(guān)于這個我們在4.14小節(jié)會有更進一步的描述。

最后還有一點需要注意的是,管道方式并不是萬能的。有時候你想立即處理所有數(shù)據(jù)。然而,即便是這種情況,使用生成器管道也可以將這類問題從邏輯上變?yōu)楣ぷ髁鞯奶幚矸绞健?/p>

David Beazley在他的Generator Tricks for Systems Programmers 教程中對于這種技術(shù)有非常深入的講解??梢詤⒖歼@個教程獲取更多的信息。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號