Go語言是一門天然支持并發(fā)的編程語言。 通過使用?go
?關(guān)鍵字,我們可以很輕松地創(chuàng)建協(xié)程;通過使用通道和Go中提供的其它各種同步技術(shù),并發(fā)編程變得簡單、輕松和有趣。
另一方面,Go并不阻止程序員在并發(fā)編程中因為粗心或者經(jīng)驗不足而犯錯。 本文的余下部分將展示一些常見的并發(fā)錯誤,來幫助Go程序員在實踐中避免這些錯誤。
我們已經(jīng)知道,源文件中的代碼行在運行時刻并非總是按照它們的出現(xiàn)次序被執(zhí)行。
下面這個示例程序犯了兩個錯誤:
b
的讀取和匿名協(xié)程中的對變量b
的寫入可能會產(chǎn)生數(shù)據(jù)競爭;b == true
成立并不能確保條件a != nil
也成立。 編譯器和CPU可能會對調(diào)整此程序中匿名協(xié)程中的某些指令的順序已獲取更快的執(zhí)行速度。 所以,站在主協(xié)程的視角看,對變量b
的賦值可能會發(fā)生在對變量a
的賦值之前,這將造成在修改a
的元素時a
依然為一個nil切片。package main
import (
"time"
"runtime"
)
func main() {
var a []int // nil
var b bool // false
// 一個匿名協(xié)程。
go func () {
a = make([]int, 3)
b = true // 寫入b
}()
for !b { // 讀取b
time.Sleep(time.Second)
runtime.Gosched()
}
a[0], a[1], a[2] = 0, 1, 2 // 可能會發(fā)生恐慌
}
上面這個程序可能在很多計算機上運行良好,但是可能會在某些計算機上因為恐慌而崩潰退出;或者使用某些編譯器編譯的時候運行良好,但使用另外的某個編譯器編譯的時候?qū)⒃斐沙绦蜻\行時崩潰退出。
我們應(yīng)該使用通道或者sync
標準庫包中的同步技術(shù)來確保內(nèi)存順序。比如:
package main
func main() {
var a []int = nil
c := make(chan struct{})
go func () {
a = make([]int, 3)
c <- struct{}{}
}()
<-c
a[0], a[1], a[2] = 0, 1, 2 // 絕不會造成恐慌
}
讓我們看一個簡單的例子:
package main
import (
"fmt"
"time"
)
func main() {
var x = 123
go func() {
x = 789 // 寫入x
}()
time.Sleep(time.Second)
fmt.Println(x) // 讀取x
}
我們期望著此程序打印出789
。 事實上,則其運行結(jié)果常常正如我們所期待的。 但是,此程序中的同步處理實現(xiàn)的正確嗎?否!原因很簡單,Go運行時并不能保證對x
的寫入一定發(fā)生在對x
的讀取之前。 在某些特定的情形下,比如CPU資源被很一些其它計算密集的程序所占用,則對x
的寫入有可能發(fā)生在對x
的讀取之后。 因此,我們不應(yīng)該在正式的項目中使用time.Sleep
調(diào)用來做同步。
讓我們看另一個簡單的例子:
package main
import (
"fmt"
"time"
)
var x = 0
func main() {
var num = 123
var p = &num
c := make(chan int)
go func() {
c <- *p + x
}()
time.Sleep(time.Second)
num = 789
fmt.Println(<-c)
}
你覺得此程序會輸出什么?123
還是789
? 事實上,它的輸出是和具體使用的編譯器相關(guān)的。 對于標準編譯器1.19版本來說,它很可能輸出123
。 但是從理論上說,它也可能輸出789
。
讓我們將此例中的c <- *p + x
一行換成c <- *p
,然后重新運行它,你將會發(fā)現(xiàn)它的輸出變成了789
(如果它使用標準編譯器1.19版本編譯的話)。 重申一次,此結(jié)果是和具體使用的編譯器和編譯器的版本相關(guān)的。
是的,此程序中存在數(shù)據(jù)競爭。表達式*p
的估值可能發(fā)生在賦值num = 789
之前、之后、或者同時。 time.Sleep
調(diào)用并不能保證*p
的估值發(fā)生在此賦值之后。
對于這個特定的例子,我們應(yīng)該將欲發(fā)送的值在開啟新協(xié)程之前存儲在一個臨時變量中來避免數(shù)據(jù)競爭。
...
tmp := *p
go func() {
c <- tmp
}()
...
有很多原因?qū)е履硞€協(xié)程永久阻塞,比如:
除了有時我們故意地將主協(xié)程永久阻塞以防止程序退出外,其它大多數(shù)造成協(xié)程永久阻塞的情況都不是我們所期待的。 Go運行時很難分辨出一個處于阻塞狀態(tài)的協(xié)程是否將永久阻塞下去,所以Go運行時不會釋放永久處于阻塞狀態(tài)的協(xié)程占用的資源。
在采用最快回應(yīng)通道用例中,如果被當作future/promise來用的通道的容量不足夠大,則較慢回應(yīng)的協(xié)程在準備發(fā)送回應(yīng)結(jié)果時將永久阻塞。 比如,下面的例子中,每個請求將導(dǎo)致4個協(xié)程永久阻塞。
func request() int {
c := make(chan int)
for i := 0; i < 5; i++ {
i := i
go func() {
c <- i // 4個協(xié)程將永久阻塞在這里
}()
}
return <-c
}
為了防止有4個協(xié)程永久阻塞,被當作future/promise使用的通道的容量必須至少為4
.
在第二種“采用最快回應(yīng)”實現(xiàn)方法中,如果被當作future/promise使用的通道是一個非緩沖通道(如下面的代碼所示),則有可能導(dǎo)致其通道的接收者可能會錯過所有的回應(yīng)而導(dǎo)致處于永久阻塞狀態(tài)。
func request() int {
c := make(chan int)
for i := 0; i < 5; i++ {
i := i
go func() {
select {
case c <- i:
default:
}
}()
}
return <-c // 有可能永久阻塞在此
}
接收者協(xié)程可能會永久阻塞的原因是如果5個嘗試發(fā)送操作都發(fā)生在接收操作<-c
準備好之前,亦即5個個嘗試發(fā)送操作都失敗了,則接收者協(xié)程將永遠無值可接收(從而將處于永久阻塞狀態(tài))。
將通道c
改為一個緩沖通道,則至少會有一個嘗試發(fā)送將成功,從而接收者協(xié)程肯定不會永久阻塞。
在實踐中,sync
標準庫包中的類型(除了Locker
接口類型)的值不應(yīng)該被復(fù)制。 我們只應(yīng)該復(fù)制它們的指針值。
下面是一個有問題的并發(fā)編程的例子。 在此例子中,當Counter.Value
方法被調(diào)用時,一個Counter
屬主值將被復(fù)制,此屬主值的字段Mutex
也將被一同復(fù)制。 此復(fù)制并沒有被同步保護,因此復(fù)制結(jié)果可能是不完整的,并非被復(fù)制的屬主值的一個快照。 即使此Mutex
字段得以僥幸完整復(fù)制,它的副本所保護的是對字段n
的一個副本的訪問,因此一般是沒有意義的。
import "sync"
type Counter struct {
sync.Mutex
n int64
}
// 此方法實現(xiàn)是沒問題的。
func (c *Counter) Increase(d int64) (r int64) {
c.Lock()
c.n += d
r = c.n
c.Unlock()
return
}
// 此方法的實現(xiàn)是有問題的。當它被調(diào)用時,
// 一個Counter屬主值將被復(fù)制。
func (c Counter) Value() (r int64) {
c.Lock()
r = c.n
c.Unlock()
return
}
我們應(yīng)該將Value
方法的屬主參數(shù)類型更改為指針類型*Counter
來避免復(fù)制sync.Mutex
值。
Go官方工具鏈中提供的go vet
命令將提示此例中的Value
方法的聲明可能是一個潛在的邏輯錯誤。
每個sync.WaitGroup
值內(nèi)部維護著一個計數(shù)。此計數(shù)的初始值為0。 如果一個sync.WaitGroup
值的Wait
方法在此計數(shù)為0的時候被調(diào)用,則此調(diào)用不會阻塞,否則此調(diào)用將一直阻塞到此計數(shù)變?yōu)?為止。
為了讓一個WaitGroup
值的使用有意義,在此值的計數(shù)為0的情況下,對它的下一次Add
方法的調(diào)用必須出現(xiàn)在對它的下一次Wait
方法的調(diào)用之前。
比如,在下面的例子中,Add
方法的調(diào)用位置是不合適的。 此例子程序的打印結(jié)果并不總是100
,而可能是0
到100
間的任何一個值。 原因是沒有任何一個Add
方法調(diào)用可以確保發(fā)生在唯一的Wait
方法調(diào)用之前,結(jié)果導(dǎo)致沒有任何一個Done
方法調(diào)用可以確保發(fā)生在唯一的Wait
方法調(diào)用返回之前。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var wg sync.WaitGroup
var x int32 = 0
for i := 0; i < 100; i++ {
go func() {
wg.Add(1)
atomic.AddInt32(&x, 1)
wg.Done()
}()
}
fmt.Println("等待片刻...")
wg.Wait()
fmt.Println(atomic.LoadInt32(&x))
}
我們應(yīng)該將對Add
方法的調(diào)用移出匿名協(xié)程之外,像下面這樣,使得任何一個Done
方法調(diào)用都確保發(fā)生在唯一的Wait
方法調(diào)用返回之前。
...
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
atomic.AddInt32(&x, 1)
wg.Done()
}()
}
...
從通道用例大全一文中,我們了解到一些函數(shù)可以返回用做future/promise的通道結(jié)果。 假設(shè)fa
和fb
是這樣的兩個函數(shù),則下面的調(diào)用方式并沒有體現(xiàn)出這兩個函數(shù)的真正價值。
doSomethingWithFutureArguments(<-fa(), <-fb())
在上面這行調(diào)用中,兩個實參值(promise回應(yīng)結(jié)果)的生成實際上是串行進行的,future/promise的價值沒有體現(xiàn)出來。
我們應(yīng)該像下面這樣調(diào)用這兩個函數(shù)來并發(fā)生成兩個回應(yīng)結(jié)果:
ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-ca, <-cb)
Go程序員常犯的一個錯誤是關(guān)閉一個后續(xù)可能還會有協(xié)程向其發(fā)送數(shù)據(jù)的通道。 當向一個已關(guān)閉的通道發(fā)送數(shù)據(jù)的時候,一個恐慌將產(chǎn)生。
這樣的錯誤曾經(jīng)發(fā)生在一些很有名的項目中,比如Kubernetes項目中的這個bug和這個bug。
請閱讀此篇文章來了解如何安全和優(yōu)雅地關(guān)閉通道。
64位非方法原子操作中涉及到的實參地址必須為8字節(jié)對齊的。不滿足此條件的64位原子操作將造成一個恐慌。 對于標準編譯器,這樣的情形只可能發(fā)生在32位的架構(gòu)中。 從Go 1.19版本開始,我們可以使用64位方法原子操作來避免這一缺陷。 請閱讀內(nèi)存布局一文來獲知如何確保讓64位的整數(shù)值的地址在32位的架構(gòu)中8字節(jié)對齊。
time
標準庫包中的After
函數(shù)返回一個用做延遲通知的通道。 此函數(shù)給并發(fā)編程帶來了很多便利,但是它的每個調(diào)用都需要創(chuàng)建一個time.Timer
值,此新創(chuàng)建的Timer
值在傳遞給After
函數(shù)調(diào)用的時長(實參)內(nèi)肯定不會被垃圾回收。
如果此函數(shù)在某個時段內(nèi)被多次頻繁調(diào)用,則可能導(dǎo)致積累很多尚未過期的Timer
值從而造成大量的內(nèi)存和計算消耗。
比如在下面這個例子中,如果longRunning
函數(shù)被調(diào)用并且在一分鐘內(nèi)有一百萬條消息到達, 那么在某個特定的很小時間段(大概若干秒)內(nèi)將存在一百萬個活躍的Timer
值,即使其中只有一個是真正有用的。
import (
"fmt"
"time"
)
// 如果某兩個連續(xù)的消息的間隔大于一分鐘,此函數(shù)將返回。
func longRunning(messages <-chan string) {
for {
select {
case <-time.After(time.Minute):
return
case msg := <-messages:
fmt.Println(msg)
}
}
}
為了避免太多的Timer
值被創(chuàng)建,我們應(yīng)該只使用(并復(fù)用)一個Timer
值,像下面這樣:
func longRunning(messages <-chan string) {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
for {
select {
case <-timer.C: // 過期了
return
case msg := <-messages:
fmt.Println(msg)
// 此if代碼塊很重要。
if !timer.Stop() {
<-timer.C
}
}
// 必須重置以復(fù)用。
timer.Reset(time.Minute)
}
}
注意,此示例中的if
代碼塊用來舍棄一個可能在執(zhí)行第二個分支代碼塊的時候發(fā)送過來的超時通知。
一個典型的time.Timer
的使用已經(jīng)在上一節(jié)中展示了。一些解釋:
Timer
值已經(jīng)過期或者已經(jīng)被終止(stopped),則相應(yīng)的Stop
方法調(diào)用返回false
。
在此Timer
值尚未終止的時候,Stop
方法調(diào)用返回false
只能意味著此Timer
值已經(jīng)過期。
Timer
值被終止之后,它的通道字段C
最多只能含有一個過期的通知。
Timer
終止(stopped)之后并且在重置和重用此Timer
值之前,我們應(yīng)該確保此Timer
值中肯定不存在過期的通知。
這就是上一節(jié)中的例子中的if
代碼塊的意義所在。
一個*Timer
值的Reset
方法必須在對應(yīng)Timer
值過期或者終止之后才能被調(diào)用; 否則,此Reset
方法調(diào)用和一個可能的向此Timer
值的C
通道字段的發(fā)送通知操作產(chǎn)生數(shù)據(jù)競爭。
如果上一節(jié)中的例子中的select
流程控制代碼塊中的第一個分支被選中,則這表示相應(yīng)的Timer
值已經(jīng)過期,所以我們不必終止它。 但是我們必須在第二個分支中通過終止此Timer
以檢查此Timer
中是否存在一個過期的通知。 如果確實有一個過期的通知,我們必須在重用這個Timer
之前將此過期的通知取出;否則,此過期的通知將下一個循環(huán)步導(dǎo)致在第一個分支立即被選中。
比如,下面這個程序?qū)⒃谶\行后大概一秒鐘(而不是十秒鐘)后退出。 而且此程序存在著潛在的數(shù)據(jù)競爭。
package main
import (
"fmt"
"time"
)
func main() {
start := time.Now()
timer := time.NewTimer(time.Second/2)
select {
case <-timer.C:
default:
time.Sleep(time.Second) // 此分支被選中的可能性較大
}
timer.Reset(time.Second * 10) // 可能數(shù)據(jù)競爭
<-timer.C
fmt.Println(time.Since(start)) // 大約1s
}
當一個time.Timer
值不再被使用后,我們不必(但是推薦)終止之。
在多個協(xié)程中使用同一個time.Timer
值比較容易寫出不當?shù)牟l(fā)代碼,所以盡量不要跨協(xié)程使用一個Timer
值。
我們不應(yīng)該依賴于time.Timer
的Reset
方法的返回值。此返回值只要是為了歷史兼容性而存在的。
更多建議: