Go 語言 并發(fā)的循環(huán)

2023-03-14 16:57 更新

原文鏈接:https://gopl-zh.github.io/ch8/ch8-05.html


8.5. 并發(fā)的循環(huán)

本節(jié)中,我們會(huì)探索一些用來在并行時(shí)循環(huán)迭代的常見并發(fā)模型。我們會(huì)探究從全尺寸圖片生成一些縮略圖的問題。gopl.io/ch8/thumbnail包提供了ImageFile函數(shù)來幫我們拉伸圖片。我們不會(huì)說明這個(gè)函數(shù)的實(shí)現(xiàn),只需要從gopl.io下載它。

gopl.io/ch8/thumbnail

package thumbnail

// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)

下面的程序會(huì)循環(huán)迭代一些圖片文件名,并為每一張圖片生成一個(gè)縮略圖:

gopl.io/ch8/thumbnail

// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
    for _, f := range filenames {
        if _, err := thumbnail.ImageFile(f); err != nil {
            log.Println(err)
        }
    }
}

顯然我們處理文件的順序無關(guān)緊要,因?yàn)槊恳粋€(gè)圖片的拉伸操作和其它圖片的處理操作都是彼此獨(dú)立的。像這種子問題都是完全彼此獨(dú)立的問題被叫做易并行問題(譯注:embarrassingly parallel,直譯的話更像是尷尬并行)。易并行問題是最容易被實(shí)現(xiàn)成并行的一類問題(廢話),并且最能夠享受到并發(fā)帶來的好處,能夠隨著并行的規(guī)模線性地?cái)U(kuò)展。

下面讓我們并行地執(zhí)行這些操作,從而將文件IO的延遲隱藏掉,并用上多核cpu的計(jì)算能力來拉伸圖像。我們的第一個(gè)并發(fā)程序只是使用了一個(gè)go關(guān)鍵字。這里我們先忽略掉錯(cuò)誤,之后再進(jìn)行處理。

// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
    for _, f := range filenames {
        go thumbnail.ImageFile(f) // NOTE: ignoring errors
    }
}

這個(gè)版本運(yùn)行的實(shí)在有點(diǎn)太快,實(shí)際上,由于它比最早的版本使用的時(shí)間要短得多,即使當(dāng)文件名的slice中只包含有一個(gè)元素。這就有點(diǎn)奇怪了,如果程序沒有并發(fā)執(zhí)行的話,那為什么一個(gè)并發(fā)的版本還是要快呢?答案其實(shí)是makeThumbnails在它還沒有完成工作之前就已經(jīng)返回了。它啟動(dòng)了所有的goroutine,每一個(gè)文件名對應(yīng)一個(gè),但沒有等待它們一直到執(zhí)行完畢。

沒有什么直接的辦法能夠等待goroutine完成,但是我們可以改變goroutine里的代碼讓其能夠?qū)⑼瓿汕闆r報(bào)告給外部的goroutine知曉,使用的方式是向一個(gè)共享的channel中發(fā)送事件。因?yàn)槲覀円呀?jīng)確切地知道有l(wèi)en(filenames)個(gè)內(nèi)部goroutine,所以外部的goroutine只需要在返回之前對這些事件計(jì)數(shù)。

// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
    ch := make(chan struct{})
    for _, f := range filenames {
        go func(f string) {
            thumbnail.ImageFile(f) // NOTE: ignoring errors
            ch <- struct{}{}
        }(f)
    }
    // Wait for goroutines to complete.
    for range filenames {
        <-ch
    }
}

注意我們將f的值作為一個(gè)顯式的變量傳給了函數(shù),而不是在循環(huán)的閉包中聲明:

for _, f := range filenames {
    go func() {
        thumbnail.ImageFile(f) // NOTE: incorrect!
        // ...
    }()
}

回憶一下之前在5.6.1節(jié)中,匿名函數(shù)中的循環(huán)變量快照問題。上面這個(gè)單獨(dú)的變量f是被所有的匿名函數(shù)值所共享,且會(huì)被連續(xù)的循環(huán)迭代所更新的。當(dāng)新的goroutine開始執(zhí)行字面函數(shù)時(shí),for循環(huán)可能已經(jīng)更新了f并且開始了另一輪的迭代或者(更有可能的)已經(jīng)結(jié)束了整個(gè)循環(huán),所以當(dāng)這些goroutine開始讀取f的值時(shí),它們所看到的值已經(jīng)是slice的最后一個(gè)元素了。顯式地添加這個(gè)參數(shù),我們能夠確保使用的f是當(dāng)go語句執(zhí)行時(shí)的“當(dāng)前”那個(gè)f。

如果我們想要從每一個(gè)worker goroutine往主goroutine中返回值時(shí)該怎么辦呢?當(dāng)我們調(diào)用thumbnail.ImageFile創(chuàng)建文件失敗的時(shí)候,它會(huì)返回一個(gè)錯(cuò)誤。下一個(gè)版本的makeThumbnails會(huì)返回其在做拉伸操作時(shí)接收到的第一個(gè)錯(cuò)誤:

// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
    errors := make(chan error)

    for _, f := range filenames {
        go func(f string) {
            _, err := thumbnail.ImageFile(f)
            errors <- err
        }(f)
    }

    for range filenames {
        if err := <-errors; err != nil {
            return err // NOTE: incorrect: goroutine leak!
        }
    }

    return nil
}

這個(gè)程序有一個(gè)微妙的bug。當(dāng)它遇到第一個(gè)非nil的error時(shí)會(huì)直接將error返回到調(diào)用方,使得沒有一個(gè)goroutine去排空errors channel。這樣剩下的worker goroutine在向這個(gè)channel中發(fā)送值時(shí),都會(huì)永遠(yuǎn)地阻塞下去,并且永遠(yuǎn)都不會(huì)退出。這種情況叫做goroutine泄露(§8.4.4),可能會(huì)導(dǎo)致整個(gè)程序卡住或者跑出out of memory的錯(cuò)誤。

最簡單的解決辦法就是用一個(gè)具有合適大小的buffered channel,這樣這些worker goroutine向channel中發(fā)送錯(cuò)誤時(shí)就不會(huì)被阻塞。(一個(gè)可選的解決辦法是創(chuàng)建一個(gè)另外的goroutine,當(dāng)main goroutine返回第一個(gè)錯(cuò)誤的同時(shí)去排空channel。)

下一個(gè)版本的makeThumbnails使用了一個(gè)buffered channel來返回生成的圖片文件的名字,附帶生成時(shí)的錯(cuò)誤。

// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
    type item struct {
        thumbfile string
        err       error
    }

    ch := make(chan item, len(filenames))
    for _, f := range filenames {
        go func(f string) {
            var it item
            it.thumbfile, it.err = thumbnail.ImageFile(f)
            ch <- it
        }(f)
    }

    for range filenames {
        it := <-ch
        if it.err != nil {
            return nil, it.err
        }
        thumbfiles = append(thumbfiles, it.thumbfile)
    }

    return thumbfiles, nil
}

我們最后一個(gè)版本的makeThumbnails返回了新文件們的大小總計(jì)數(shù)(bytes)。和前面的版本都不一樣的一點(diǎn)是我們在這個(gè)版本里沒有把文件名放在slice里,而是通過一個(gè)string的channel傳過來,所以我們無法對循環(huán)的次數(shù)進(jìn)行預(yù)測。

為了知道最后一個(gè)goroutine什么時(shí)候結(jié)束(最后一個(gè)結(jié)束并不一定是最后一個(gè)開始),我們需要一個(gè)遞增的計(jì)數(shù)器,在每一個(gè)goroutine啟動(dòng)時(shí)加一,在goroutine退出時(shí)減一。這需要一種特殊的計(jì)數(shù)器,這個(gè)計(jì)數(shù)器需要在多個(gè)goroutine操作時(shí)做到安全并且提供在其減為零之前一直等待的一種方法。這種計(jì)數(shù)類型被稱為sync.WaitGroup,下面的代碼就用到了這種方法:

// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

注意Add和Done方法的不對稱。Add是為計(jì)數(shù)器加一,必須在worker goroutine開始之前調(diào)用,而不是在goroutine中;否則的話我們沒辦法確定Add是在"closer" goroutine調(diào)用Wait之前被調(diào)用。并且Add還有一個(gè)參數(shù),但Done卻沒有任何參數(shù);其實(shí)它和Add(-1)是等價(jià)的。我們使用defer來確保計(jì)數(shù)器即使是在出錯(cuò)的情況下依然能夠正確地被減掉。上面的程序代碼結(jié)構(gòu)是當(dāng)我們使用并發(fā)循環(huán),但又不知道迭代次數(shù)時(shí)很通常而且很地道的寫法。

sizes channel攜帶了每一個(gè)文件的大小到main goroutine,在main goroutine中使用了range loop來計(jì)算總和。觀察一下我們是怎樣創(chuàng)建一個(gè)closer goroutine,并讓其在所有worker goroutine們結(jié)束之后再關(guān)閉sizes channel的。兩步操作:wait和close,必須是基于sizes的循環(huán)的并發(fā)??紤]一下另一種方案:如果等待操作被放在了main goroutine中,在循環(huán)之前,這樣的話就永遠(yuǎn)都不會(huì)結(jié)束了,如果在循環(huán)之后,那么又變成了不可達(dá)的部分,因?yàn)闆]有任何東西去關(guān)閉這個(gè)channel,這個(gè)循環(huán)就永遠(yuǎn)都不會(huì)終止。

圖8.5 表明了makethumbnails6函數(shù)中事件的序列??v列表示goroutine。窄線段代表sleep,粗線段代表活動(dòng)。斜線箭頭代表用來同步兩個(gè)goroutine的事件。時(shí)間向下流動(dòng)。注意main goroutine是如何大部分的時(shí)間被喚醒執(zhí)行其range循環(huán),等待worker發(fā)送值或者closer來關(guān)閉channel的。


練習(xí) 8.4: 修改reverb2服務(wù)器,在每一個(gè)連接中使用sync.WaitGroup來計(jì)數(shù)活躍的echo goroutine。當(dāng)計(jì)數(shù)減為零時(shí),關(guān)閉TCP連接的寫入,像練習(xí)8.3中一樣。驗(yàn)證一下你的修改版netcat3客戶端會(huì)一直等待所有的并發(fā)“喊叫”完成,即使是在標(biāo)準(zhǔn)輸入流已經(jīng)關(guān)閉的情況下。

練習(xí) 8.5: 使用一個(gè)已有的CPU綁定的順序程序,比如在3.3節(jié)中我們寫的Mandelbrot程序或者3.2節(jié)中的3-D surface計(jì)算程序,并將他們的主循環(huán)改為并發(fā)形式,使用channel來進(jìn)行通信。在多核計(jì)算機(jī)上這個(gè)程序得到了多少速度上的改進(jìn)?使用多少個(gè)goroutine是最合適的呢?



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)