原文鏈接:https://gopl-zh.github.io/ch9/ch9-07.html
本節(jié)中我們會(huì)做一個(gè)無阻塞的緩存,這種工具可以幫助我們來解決現(xiàn)實(shí)世界中并發(fā)程序出現(xiàn)但沒有現(xiàn)成的庫可以解決的問題。這個(gè)問題叫作緩存(memoizing)函數(shù)(譯注:Memoization的定義: memoization 一詞是Donald Michie 根據(jù)拉丁語memorandum杜撰的一個(gè)詞。相應(yīng)的動(dòng)詞、過去分詞、ing形式有memoiz、memoized、memoizing),也就是說,我們需要緩存函數(shù)的返回結(jié)果,這樣在對(duì)函數(shù)進(jìn)行調(diào)用的時(shí)候,我們就只需要一次計(jì)算,之后只要返回計(jì)算的結(jié)果就可以了。我們的解決方案會(huì)是并發(fā)安全且會(huì)避免對(duì)整個(gè)緩存加鎖而導(dǎo)致所有操作都去爭一個(gè)鎖的設(shè)計(jì)。
我們將使用下面的httpGetBody函數(shù)作為我們需要緩存的函數(shù)的一個(gè)樣例。這個(gè)函數(shù)會(huì)去進(jìn)行HTTP GET請(qǐng)求并且獲取http響應(yīng)body。對(duì)這個(gè)函數(shù)的調(diào)用本身開銷是比較大的,所以我們盡量避免在不必要的時(shí)候反復(fù)調(diào)用。
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
最后一行稍微隱藏了一些細(xì)節(jié)。ReadAll會(huì)返回兩個(gè)結(jié)果,一個(gè)[]byte數(shù)組和一個(gè)錯(cuò)誤,不過這兩個(gè)對(duì)象可以被賦值給httpGetBody的返回聲明里的interface{}和error類型,所以我們也就可以這樣返回結(jié)果并且不需要額外的工作了。我們?cè)趆ttpGetBody中選用這種返回類型是為了使其可以與緩存匹配。
下面是我們要設(shè)計(jì)的cache的第一個(gè)“草稿”:
gopl.io/ch9/memo1
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo
// A Memo caches the results of calling a Func.
type Memo struct {
f Func
cache map[string]result
}
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}
Memo實(shí)例會(huì)記錄需要緩存的函數(shù)f(類型為Func),以及緩存內(nèi)容(里面是一個(gè)string到result映射的map)。每一個(gè)result都是簡單的函數(shù)返回的值對(duì)兒——一個(gè)值和一個(gè)錯(cuò)誤值。繼續(xù)下去我們會(huì)展示一些Memo的變種,不過所有的例子都會(huì)遵循上面的這些方面。
下面是一個(gè)使用Memo的例子。對(duì)于流入的URL的每一個(gè)元素我們都會(huì)調(diào)用Get,并打印調(diào)用延時(shí)以及其返回的數(shù)據(jù)大小的log:
m := memo.New(httpGetBody)
for url := range incomingURLs() {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
}
我們可以使用測試包(第11章的主題)來系統(tǒng)地鑒定緩存的效果。從下面的測試輸出,我們可以看到URL流包含了一些重復(fù)的情況,盡管我們第一次對(duì)每一個(gè)URL的(*Memo).Get
的調(diào)用都會(huì)花上幾百毫秒,但第二次就只需要花1毫秒就可以返回完整的數(shù)據(jù)了。
$ go test -v gopl.io/ch9/memo1
=== RUN Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok gopl.io/ch9/memo1 1.257s
這個(gè)測試是順序地去做所有的調(diào)用的。
由于這種彼此獨(dú)立的HTTP請(qǐng)求可以很好地并發(fā),我們可以把這個(gè)測試改成并發(fā)形式??梢允褂胹ync.WaitGroup來等待所有的請(qǐng)求都完成之后再返回。
m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
n.Add(1)
go func(url string) {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
n.Done()
}(url)
}
n.Wait()
這次測試跑起來更快了,然而不幸的是貌似這個(gè)測試不是每次都能夠正常工作。我們注意到有一些意料之外的cache miss(緩存未命中),或者命中了緩存但卻返回了錯(cuò)誤的值,或者甚至?xí)苯颖罎ⅰ?
但更糟糕的是,有時(shí)候這個(gè)程序還是能正確的運(yùn)行(譯:也就是最讓人崩潰的偶發(fā)bug),所以我們甚至可能都不會(huì)意識(shí)到這個(gè)程序有bug。但是我們可以使用-race這個(gè)flag來運(yùn)行程序,競爭檢測器(§9.6)會(huì)打印像下面這樣的報(bào)告:
$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
runtime.mapassign1()
~/go/src/runtime/hashmap.go:411 +0x0
gopl.io/ch9/memo1.(*Memo).Get()
~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Previous write by goroutine 35:
runtime.mapassign1()
~/go/src/runtime/hashmap.go:411 +0x0
gopl.io/ch9/memo1.(*Memo).Get()
~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL gopl.io/ch9/memo1 2.393s
memo.go的32行出現(xiàn)了兩次,說明有兩個(gè)goroutine在沒有同步干預(yù)的情況下更新了cache map。這表明Get不是并發(fā)安全的,存在數(shù)據(jù)競爭。
28 func (memo *Memo) Get(key string) (interface{}, error) {
29 res, ok := memo.cache(key)
30 if !ok {
31 res.value, res.err = memo.f(key)
32 memo.cache[key] = res
33 }
34 return res.value, res.err
35 }
最簡單的使cache并發(fā)安全的方式是使用基于監(jiān)控的同步。只要給Memo加上一個(gè)mutex,在Get的一開始獲取互斥鎖,return的時(shí)候釋放鎖,就可以讓cache的操作發(fā)生在臨界區(qū)內(nèi)了:
gopl.io/ch9/memo2
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]result
}
// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
memo.mu.Unlock()
return res.value, res.err
}
測試依然并發(fā)進(jìn)行,但這回競爭檢查器“沉默”了。不幸的是對(duì)于Memo的這一點(diǎn)改變使我們完全喪失了并發(fā)的性能優(yōu)點(diǎn)。每次對(duì)f的調(diào)用期間都會(huì)持有鎖,Get將本來可以并行運(yùn)行的I/O操作串行化了。我們本章的目的是完成一個(gè)無鎖緩存,而不是現(xiàn)在這樣的將所有請(qǐng)求串行化的函數(shù)的緩存。
下一個(gè)Get的實(shí)現(xiàn),調(diào)用Get的goroutine會(huì)兩次獲取鎖:查找階段獲取一次,如果查找沒有返回任何內(nèi)容,那么進(jìn)入更新階段會(huì)再次獲取。在這兩次獲取鎖的中間階段,其它goroutine可以隨意使用cache。
gopl.io/ch9/memo3
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
memo.mu.Unlock()
if !ok {
res.value, res.err = memo.f(key)
// Between the two critical sections, several goroutines
// may race to compute f(key) and update the map.
memo.mu.Lock()
memo.cache[key] = res
memo.mu.Unlock()
}
return res.value, res.err
}
這些修改使性能再次得到了提升,但有一些URL被獲取了兩次。這種情況在兩個(gè)以上的goroutine同一時(shí)刻調(diào)用Get來請(qǐng)求同樣的URL時(shí)會(huì)發(fā)生。多個(gè)goroutine一起查詢cache,發(fā)現(xiàn)沒有值,然后一起調(diào)用f這個(gè)慢不拉嘰的函數(shù)。在得到結(jié)果后,也都會(huì)去更新map。其中一個(gè)獲得的結(jié)果會(huì)覆蓋掉另一個(gè)的結(jié)果。
理想情況下是應(yīng)該避免掉多余的工作的。而這種“避免”工作一般被稱為duplicate suppression(重復(fù)抑制/避免)。下面版本的Memo每一個(gè)map元素都是指向一個(gè)條目的指針。每一個(gè)條目包含對(duì)函數(shù)f調(diào)用結(jié)果的內(nèi)容緩存。與之前不同的是這次entry還包含了一個(gè)叫ready的channel。在條目的結(jié)果被設(shè)置之后,這個(gè)channel就會(huì)被關(guān)閉,以向其它goroutine廣播(§8.9)去讀取該條目內(nèi)的結(jié)果是安全的了。
gopl.io/ch9/memo4
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]*entry
}
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// This is the first request for this key.
// This goroutine becomes responsible for computing
// the value and broadcasting the ready condition.
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // broadcast ready condition
} else {
// This is a repeat request for this key.
memo.mu.Unlock()
<-e.ready // wait for ready condition
}
return e.res.value, e.res.err
}
現(xiàn)在Get函數(shù)包括下面這些步驟了:獲取互斥鎖來保護(hù)共享變量cache map,查詢map中是否存在指定條目,如果沒有找到那么分配空間插入一個(gè)新條目,釋放互斥鎖。如果存在條目的話且其值沒有寫入完成(也就是有其它的goroutine在調(diào)用f這個(gè)慢函數(shù))時(shí),goroutine必須等待值ready之后才能讀到條目的結(jié)果。而想知道是否ready的話,可以直接從ready channel中讀取,由于這個(gè)讀取操作在channel關(guān)閉之前一直是阻塞。
如果沒有條目的話,需要向map中插入一個(gè)沒有準(zhǔn)備好的條目,當(dāng)前正在調(diào)用的goroutine就需要負(fù)責(zé)調(diào)用慢函數(shù)、更新條目以及向其它所有g(shù)oroutine廣播條目已經(jīng)ready可讀的消息了。
條目中的e.res.value和e.res.err變量是在多個(gè)goroutine之間共享的。創(chuàng)建條目的goroutine同時(shí)也會(huì)設(shè)置條目的值,其它goroutine在收到"ready"的廣播消息之后立刻會(huì)去讀取條目的值。盡管會(huì)被多個(gè)goroutine同時(shí)訪問,但卻并不需要互斥鎖。ready channel的關(guān)閉一定會(huì)發(fā)生在其它goroutine接收到廣播事件之前,因此第一個(gè)goroutine對(duì)這些變量的寫操作是一定發(fā)生在這些讀操作之前的。不會(huì)發(fā)生數(shù)據(jù)競爭。
這樣并發(fā)、不重復(fù)、無阻塞的cache就完成了。
上面這樣Memo的實(shí)現(xiàn)使用了一個(gè)互斥量來保護(hù)多個(gè)goroutine調(diào)用Get時(shí)的共享map變量。不妨把這種設(shè)計(jì)和前面提到的把map變量限制在一個(gè)單獨(dú)的monitor goroutine的方案做一些對(duì)比,后者在調(diào)用Get時(shí)需要發(fā)消息。
Func、result和entry的聲明和之前保持一致:
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
// A result is the result of calling a Func.
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
然而Memo類型現(xiàn)在包含了一個(gè)叫做requests的channel,Get的調(diào)用方用這個(gè)channel來和monitor goroutine來通信。requests channel中的元素類型是request。Get的調(diào)用方會(huì)把這個(gè)結(jié)構(gòu)中的兩組key都填充好,實(shí)際上用這兩個(gè)變量來對(duì)函數(shù)進(jìn)行緩存的。另一個(gè)叫response的channel會(huì)被拿來發(fā)送響應(yīng)結(jié)果。這個(gè)channel只會(huì)傳回一個(gè)單獨(dú)的值。
gopl.io/ch9/memo5
// A request is a message requesting that the Func be applied to key.
type request struct {
key string
response chan<- result // the client wants a single result
}
type Memo struct{ requests chan request }
// New returns a memoization of f. Clients must subsequently call Close.
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)}
go memo.server(f)
return memo
}
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
上面的Get方法,會(huì)創(chuàng)建一個(gè)response channel,把它放進(jìn)request結(jié)構(gòu)中,然后發(fā)送給monitor goroutine,然后馬上又會(huì)接收它。
cache變量被限制在了monitor goroutine ``(*Memo).server`中,下面會(huì)看到。monitor會(huì)在循環(huán)中一直讀取請(qǐng)求,直到request channel被Close方法關(guān)閉。每一個(gè)請(qǐng)求都會(huì)去查詢cache,如果沒有找到條目的話,那么就會(huì)創(chuàng)建/插入一個(gè)新的條目。
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests {
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // call f(key)
}
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// Evaluate the function.
e.res.value, e.res.err = f(key)
// Broadcast the ready condition.
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// Wait for the ready condition.
<-e.ready
// Send the result to the client.
response <- e.res
}
和基于互斥量的版本類似,第一個(gè)對(duì)某個(gè)key的請(qǐng)求需要負(fù)責(zé)去調(diào)用函數(shù)f并傳入這個(gè)key,將結(jié)果存在條目里,并關(guān)閉ready channel來廣播條目的ready消息。使用(*entry).call
來完成上述工作。
緊接著對(duì)同一個(gè)key的請(qǐng)求會(huì)發(fā)現(xiàn)map中已經(jīng)有了存在的條目,然后會(huì)等待結(jié)果變?yōu)閞eady,并將結(jié)果從response發(fā)送給客戶端的goroutien。上述工作是用(*entry).deliver
來完成的。對(duì)call和deliver方法的調(diào)用必須讓它們?cè)谧约旱膅oroutine中進(jìn)行以確保monitor goroutines不會(huì)因此而被阻塞住而沒法處理新的請(qǐng)求。
這個(gè)例子說明我們無論用上鎖,還是通信來建立并發(fā)程序都是可行的。
上面的兩種方案并不好說特定情境下哪種更好,不過了解他們還是有價(jià)值的。有時(shí)候從一種方式切換到另一種可以使你的代碼更為簡潔。(譯注:不是說好的golang推崇通信并發(fā)么。)
練習(xí) 9.3: 擴(kuò)展Func類型和(*Memo).Get
方法,支持調(diào)用方提供一個(gè)可選的done channel,使其具備通過該channel來取消整個(gè)操作的能力(§8.9)。一個(gè)被取消了的Func的調(diào)用結(jié)果不應(yīng)該被緩存。
更多建議: