W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
原文鏈接:https://gopl-zh.github.io/ch8/ch8-05.html
本節(jié)中,我們會(huì)探索一些用來在并行時(shí)循環(huán)迭代的常見并發(fā)模型。我們會(huì)探究從全尺寸圖片生成一些縮略圖的問題。gopl.io/ch8/thumbnail包提供了ImageFile函數(shù)來幫我們拉伸圖片。我們不會(huì)說明這個(gè)函數(shù)的實(shí)現(xiàn),只需要從gopl.io下載它。
gopl.io/ch8/thumbnail
package thumbnail
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
下面的程序會(huì)循環(huán)迭代一些圖片文件名,并為每一張圖片生成一個(gè)縮略圖:
gopl.io/ch8/thumbnail
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
for _, f := range filenames {
if _, err := thumbnail.ImageFile(f); err != nil {
log.Println(err)
}
}
}
顯然我們處理文件的順序無關(guān)緊要,因?yàn)槊恳粋€(gè)圖片的拉伸操作和其它圖片的處理操作都是彼此獨(dú)立的。像這種子問題都是完全彼此獨(dú)立的問題被叫做易并行問題(譯注:embarrassingly parallel,直譯的話更像是尷尬并行)。易并行問題是最容易被實(shí)現(xiàn)成并行的一類問題(廢話),并且最能夠享受到并發(fā)帶來的好處,能夠隨著并行的規(guī)模線性地?cái)U(kuò)展。
下面讓我們并行地執(zhí)行這些操作,從而將文件IO的延遲隱藏掉,并用上多核cpu的計(jì)算能力來拉伸圖像。我們的第一個(gè)并發(fā)程序只是使用了一個(gè)go關(guān)鍵字。這里我們先忽略掉錯(cuò)誤,之后再進(jìn)行處理。
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
for _, f := range filenames {
go thumbnail.ImageFile(f) // NOTE: ignoring errors
}
}
這個(gè)版本運(yùn)行的實(shí)在有點(diǎn)太快,實(shí)際上,由于它比最早的版本使用的時(shí)間要短得多,即使當(dāng)文件名的slice中只包含有一個(gè)元素。這就有點(diǎn)奇怪了,如果程序沒有并發(fā)執(zhí)行的話,那為什么一個(gè)并發(fā)的版本還是要快呢?答案其實(shí)是makeThumbnails在它還沒有完成工作之前就已經(jīng)返回了。它啟動(dòng)了所有的goroutine,每一個(gè)文件名對應(yīng)一個(gè),但沒有等待它們一直到執(zhí)行完畢。
沒有什么直接的辦法能夠等待goroutine完成,但是我們可以改變goroutine里的代碼讓其能夠?qū)⑼瓿汕闆r報(bào)告給外部的goroutine知曉,使用的方式是向一個(gè)共享的channel中發(fā)送事件。因?yàn)槲覀円呀?jīng)確切地知道有l(wèi)en(filenames)個(gè)內(nèi)部goroutine,所以外部的goroutine只需要在返回之前對這些事件計(jì)數(shù)。
// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
ch := make(chan struct{})
for _, f := range filenames {
go func(f string) {
thumbnail.ImageFile(f) // NOTE: ignoring errors
ch <- struct{}{}
}(f)
}
// Wait for goroutines to complete.
for range filenames {
<-ch
}
}
注意我們將f的值作為一個(gè)顯式的變量傳給了函數(shù),而不是在循環(huán)的閉包中聲明:
for _, f := range filenames {
go func() {
thumbnail.ImageFile(f) // NOTE: incorrect!
// ...
}()
}
回憶一下之前在5.6.1節(jié)中,匿名函數(shù)中的循環(huán)變量快照問題。上面這個(gè)單獨(dú)的變量f是被所有的匿名函數(shù)值所共享,且會(huì)被連續(xù)的循環(huán)迭代所更新的。當(dāng)新的goroutine開始執(zhí)行字面函數(shù)時(shí),for循環(huán)可能已經(jīng)更新了f并且開始了另一輪的迭代或者(更有可能的)已經(jīng)結(jié)束了整個(gè)循環(huán),所以當(dāng)這些goroutine開始讀取f的值時(shí),它們所看到的值已經(jīng)是slice的最后一個(gè)元素了。顯式地添加這個(gè)參數(shù),我們能夠確保使用的f是當(dāng)go語句執(zhí)行時(shí)的“當(dāng)前”那個(gè)f。
如果我們想要從每一個(gè)worker goroutine往主goroutine中返回值時(shí)該怎么辦呢?當(dāng)我們調(diào)用thumbnail.ImageFile創(chuàng)建文件失敗的時(shí)候,它會(huì)返回一個(gè)錯(cuò)誤。下一個(gè)版本的makeThumbnails會(huì)返回其在做拉伸操作時(shí)接收到的第一個(gè)錯(cuò)誤:
// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
errors := make(chan error)
for _, f := range filenames {
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames {
if err := <-errors; err != nil {
return err // NOTE: incorrect: goroutine leak!
}
}
return nil
}
這個(gè)程序有一個(gè)微妙的bug。當(dāng)它遇到第一個(gè)非nil的error時(shí)會(huì)直接將error返回到調(diào)用方,使得沒有一個(gè)goroutine去排空errors channel。這樣剩下的worker goroutine在向這個(gè)channel中發(fā)送值時(shí),都會(huì)永遠(yuǎn)地阻塞下去,并且永遠(yuǎn)都不會(huì)退出。這種情況叫做goroutine泄露(§8.4.4),可能會(huì)導(dǎo)致整個(gè)程序卡住或者跑出out of memory的錯(cuò)誤。
最簡單的解決辦法就是用一個(gè)具有合適大小的buffered channel,這樣這些worker goroutine向channel中發(fā)送錯(cuò)誤時(shí)就不會(huì)被阻塞。(一個(gè)可選的解決辦法是創(chuàng)建一個(gè)另外的goroutine,當(dāng)main goroutine返回第一個(gè)錯(cuò)誤的同時(shí)去排空channel。)
下一個(gè)版本的makeThumbnails使用了一個(gè)buffered channel來返回生成的圖片文件的名字,附帶生成時(shí)的錯(cuò)誤。
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
我們最后一個(gè)版本的makeThumbnails返回了新文件們的大小總計(jì)數(shù)(bytes)。和前面的版本都不一樣的一點(diǎn)是我們在這個(gè)版本里沒有把文件名放在slice里,而是通過一個(gè)string的channel傳過來,所以我們無法對循環(huán)的次數(shù)進(jìn)行預(yù)測。
為了知道最后一個(gè)goroutine什么時(shí)候結(jié)束(最后一個(gè)結(jié)束并不一定是最后一個(gè)開始),我們需要一個(gè)遞增的計(jì)數(shù)器,在每一個(gè)goroutine啟動(dòng)時(shí)加一,在goroutine退出時(shí)減一。這需要一種特殊的計(jì)數(shù)器,這個(gè)計(jì)數(shù)器需要在多個(gè)goroutine操作時(shí)做到安全并且提供在其減為零之前一直等待的一種方法。這種計(jì)數(shù)類型被稱為sync.WaitGroup,下面的代碼就用到了這種方法:
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
for f := range filenames {
wg.Add(1)
// worker
go func(f string) {
defer wg.Done()
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // OK to ignore error
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
注意Add和Done方法的不對稱。Add是為計(jì)數(shù)器加一,必須在worker goroutine開始之前調(diào)用,而不是在goroutine中;否則的話我們沒辦法確定Add是在"closer" goroutine調(diào)用Wait之前被調(diào)用。并且Add還有一個(gè)參數(shù),但Done卻沒有任何參數(shù);其實(shí)它和Add(-1)是等價(jià)的。我們使用defer來確保計(jì)數(shù)器即使是在出錯(cuò)的情況下依然能夠正確地被減掉。上面的程序代碼結(jié)構(gòu)是當(dāng)我們使用并發(fā)循環(huán),但又不知道迭代次數(shù)時(shí)很通常而且很地道的寫法。
sizes channel攜帶了每一個(gè)文件的大小到main goroutine,在main goroutine中使用了range loop來計(jì)算總和。觀察一下我們是怎樣創(chuàng)建一個(gè)closer goroutine,并讓其在所有worker goroutine們結(jié)束之后再關(guān)閉sizes channel的。兩步操作:wait和close,必須是基于sizes的循環(huán)的并發(fā)??紤]一下另一種方案:如果等待操作被放在了main goroutine中,在循環(huán)之前,這樣的話就永遠(yuǎn)都不會(huì)結(jié)束了,如果在循環(huán)之后,那么又變成了不可達(dá)的部分,因?yàn)闆]有任何東西去關(guān)閉這個(gè)channel,這個(gè)循環(huán)就永遠(yuǎn)都不會(huì)終止。
圖8.5 表明了makethumbnails6函數(shù)中事件的序列??v列表示goroutine。窄線段代表sleep,粗線段代表活動(dòng)。斜線箭頭代表用來同步兩個(gè)goroutine的事件。時(shí)間向下流動(dòng)。注意main goroutine是如何大部分的時(shí)間被喚醒執(zhí)行其range循環(huán),等待worker發(fā)送值或者closer來關(guān)閉channel的。
練習(xí) 8.4: 修改reverb2服務(wù)器,在每一個(gè)連接中使用sync.WaitGroup來計(jì)數(shù)活躍的echo goroutine。當(dāng)計(jì)數(shù)減為零時(shí),關(guān)閉TCP連接的寫入,像練習(xí)8.3中一樣。驗(yàn)證一下你的修改版netcat3客戶端會(huì)一直等待所有的并發(fā)“喊叫”完成,即使是在標(biāo)準(zhǔn)輸入流已經(jīng)關(guān)閉的情況下。
練習(xí) 8.5: 使用一個(gè)已有的CPU綁定的順序程序,比如在3.3節(jié)中我們寫的Mandelbrot程序或者3.2節(jié)中的3-D surface計(jì)算程序,并將他們的主循環(huán)改為并發(fā)形式,使用channel來進(jìn)行通信。在多核計(jì)算機(jī)上這個(gè)程序得到了多少速度上的改進(jìn)?使用多少個(gè)goroutine是最合適的呢?
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: