Go語言 一些常見并發(fā)編程錯誤

2023-02-16 17:40 更新

Go語言是一門天然支持并發(fā)的編程語言。 通過使用?go?關(guān)鍵字,我們可以很輕松地創(chuàng)建協(xié)程;通過使用通道Go中提供的其它各種同步技術(shù),并發(fā)編程變得簡單、輕松和有趣。

另一方面,Go并不阻止程序員在并發(fā)編程中因為粗心或者經(jīng)驗不足而犯錯。 本文的余下部分將展示一些常見的并發(fā)錯誤,來幫助Go程序員在實踐中避免這些錯誤。

當需要同步的時候沒有同步

我們已經(jīng)知道,源文件中的代碼行在運行時刻并非總是按照它們的出現(xiàn)次序被執(zhí)行。

下面這個示例程序犯了兩個錯誤:

  • 首先,主協(xié)程中對變量b的讀取和匿名協(xié)程中的對變量b的寫入可能會產(chǎn)生數(shù)據(jù)競爭;
  • 其次,在主協(xié)程中,條件b == true成立并不能確保條件a != nil也成立。 編譯器和CPU可能會對調(diào)整此程序中匿名協(xié)程中的某些指令的順序已獲取更快的執(zhí)行速度。 所以,站在主協(xié)程的視角看,對變量b的賦值可能會發(fā)生在對變量a的賦值之前,這將造成在修改a的元素時a依然為一個nil切片。
package main

import (
	"time"
	"runtime"
)

func main() {
	var a []int // nil
	var b bool  // false

	// 一個匿名協(xié)程。
	go func () {
		a = make([]int, 3)
		b = true // 寫入b
	}()

	for !b { // 讀取b
		time.Sleep(time.Second)
		runtime.Gosched()
	}
	a[0], a[1], a[2] = 0, 1, 2 // 可能會發(fā)生恐慌
}

上面這個程序可能在很多計算機上運行良好,但是可能會在某些計算機上因為恐慌而崩潰退出;或者使用某些編譯器編譯的時候運行良好,但使用另外的某個編譯器編譯的時候?qū)⒃斐沙绦蜻\行時崩潰退出。

我們應(yīng)該使用通道或者sync標準庫包中的同步技術(shù)來確保內(nèi)存順序。比如:

package main

func main() {
	var a []int = nil
	c := make(chan struct{})

	go func () {
		a = make([]int, 3)
		c <- struct{}{}
	}()

	<-c
	a[0], a[1], a[2] = 0, 1, 2 // 絕不會造成恐慌
}

使用time.Sleep調(diào)用來做同步

讓我們看一個簡單的例子:

package main

import (
	"fmt"
	"time"
)

func main() {
	var x = 123

	go func() {
		x = 789 // 寫入x
	}()

	time.Sleep(time.Second)
	fmt.Println(x) // 讀取x
}

我們期望著此程序打印出789。 事實上,則其運行結(jié)果常常正如我們所期待的。 但是,此程序中的同步處理實現(xiàn)的正確嗎?否!原因很簡單,Go運行時并不能保證對x的寫入一定發(fā)生在對x的讀取之前。 在某些特定的情形下,比如CPU資源被很一些其它計算密集的程序所占用,則對x的寫入有可能發(fā)生在對x的讀取之后。 因此,我們不應(yīng)該在正式的項目中使用time.Sleep調(diào)用來做同步。

讓我們看另一個簡單的例子:

package main

import (
	"fmt"
	"time"
)

var x = 0

func main() {
	var num = 123
	var p = &num

	c := make(chan int)

	go func() {
		c <- *p + x
	}()

	time.Sleep(time.Second)
	num = 789
	fmt.Println(<-c)
}

你覺得此程序會輸出什么?123還是789? 事實上,它的輸出是和具體使用的編譯器相關(guān)的。 對于標準編譯器1.19版本來說,它很可能輸出123。 但是從理論上說,它也可能輸出789。

讓我們將此例中的c <- *p + x一行換成c <- *p,然后重新運行它,你將會發(fā)現(xiàn)它的輸出變成了789(如果它使用標準編譯器1.19版本編譯的話)。 重申一次,此結(jié)果是和具體使用的編譯器和編譯器的版本相關(guān)的。

是的,此程序中存在數(shù)據(jù)競爭。表達式*p的估值可能發(fā)生在賦值num = 789之前、之后、或者同時。 time.Sleep調(diào)用并不能保證*p的估值發(fā)生在此賦值之后。

對于這個特定的例子,我們應(yīng)該將欲發(fā)送的值在開啟新協(xié)程之前存儲在一個臨時變量中來避免數(shù)據(jù)競爭。

...
	tmp := *p
	go func() {
		c <- tmp
	}()
...

使一些協(xié)程永久處于阻塞狀態(tài)

有很多原因?qū)е履硞€協(xié)程永久阻塞,比如:

  • 從一個永遠不會有其它協(xié)程向其發(fā)送數(shù)據(jù)的通道接收數(shù)據(jù);
  • 向一個永遠不會有其它協(xié)程從中讀取數(shù)據(jù)的通道發(fā)送數(shù)據(jù);
  • 被自己死鎖了;
  • 和其它協(xié)程相互死鎖了;
  • 等等。

除了有時我們故意地將主協(xié)程永久阻塞以防止程序退出外,其它大多數(shù)造成協(xié)程永久阻塞的情況都不是我們所期待的。 Go運行時很難分辨出一個處于阻塞狀態(tài)的協(xié)程是否將永久阻塞下去,所以Go運行時不會釋放永久處于阻塞狀態(tài)的協(xié)程占用的資源。

采用最快回應(yīng)通道用例中,如果被當作future/promise來用的通道的容量不足夠大,則較慢回應(yīng)的協(xié)程在準備發(fā)送回應(yīng)結(jié)果時將永久阻塞。 比如,下面的例子中,每個請求將導(dǎo)致4個協(xié)程永久阻塞。

func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			c <- i // 4個協(xié)程將永久阻塞在這里
		}()
	}
	return <-c
}

為了防止有4個協(xié)程永久阻塞,被當作future/promise使用的通道的容量必須至少為4.

第二種“采用最快回應(yīng)”實現(xiàn)方法中,如果被當作future/promise使用的通道是一個非緩沖通道(如下面的代碼所示),則有可能導(dǎo)致其通道的接收者可能會錯過所有的回應(yīng)而導(dǎo)致處于永久阻塞狀態(tài)。

func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			select {
			case c <- i:
			default:
			}
		}()
	}
	return <-c // 有可能永久阻塞在此
}

接收者協(xié)程可能會永久阻塞的原因是如果5個嘗試發(fā)送操作都發(fā)生在接收操作<-c準備好之前,亦即5個個嘗試發(fā)送操作都失敗了,則接收者協(xié)程將永遠無值可接收(從而將處于永久阻塞狀態(tài))。

將通道c改為一個緩沖通道,則至少會有一個嘗試發(fā)送將成功,從而接收者協(xié)程肯定不會永久阻塞。

復(fù)制sync標準庫包中的類型的值

在實踐中,sync標準庫包中的類型(除了Locker接口類型)的值不應(yīng)該被復(fù)制。 我們只應(yīng)該復(fù)制它們的指針值。

下面是一個有問題的并發(fā)編程的例子。 在此例子中,當Counter.Value方法被調(diào)用時,一個Counter屬主值將被復(fù)制,此屬主值的字段Mutex也將被一同復(fù)制。 此復(fù)制并沒有被同步保護,因此復(fù)制結(jié)果可能是不完整的,并非被復(fù)制的屬主值的一個快照。 即使此Mutex字段得以僥幸完整復(fù)制,它的副本所保護的是對字段n的一個副本的訪問,因此一般是沒有意義的。

import "sync"

type Counter struct {
	sync.Mutex
	n int64
}

// 此方法實現(xiàn)是沒問題的。
func (c *Counter) Increase(d int64) (r int64) {
	c.Lock()
	c.n += d
	r = c.n
	c.Unlock()
	return
}

// 此方法的實現(xiàn)是有問題的。當它被調(diào)用時,
// 一個Counter屬主值將被復(fù)制。
func (c Counter) Value() (r int64) {
	c.Lock()
	r = c.n
	c.Unlock()
	return
}

我們應(yīng)該將Value方法的屬主參數(shù)類型更改為指針類型*Counter來避免復(fù)制sync.Mutex值。

Go官方工具鏈中提供的go vet命令將提示此例中的Value方法的聲明可能是一個潛在的邏輯錯誤。

在錯誤的地方調(diào)用sync.WaitGroup.Add方法

每個sync.WaitGroup值內(nèi)部維護著一個計數(shù)。此計數(shù)的初始值為0。 如果一個sync.WaitGroup值的Wait方法在此計數(shù)為0的時候被調(diào)用,則此調(diào)用不會阻塞,否則此調(diào)用將一直阻塞到此計數(shù)變?yōu)?為止。

為了讓一個WaitGroup值的使用有意義,在此值的計數(shù)為0的情況下,對它的下一次Add方法的調(diào)用必須出現(xiàn)在對它的下一次Wait方法的調(diào)用之前。

比如,在下面的例子中,Add方法的調(diào)用位置是不合適的。 此例子程序的打印結(jié)果并不總是100,而可能是0100間的任何一個值。 原因是沒有任何一個Add方法調(diào)用可以確保發(fā)生在唯一的Wait方法調(diào)用之前,結(jié)果導(dǎo)致沒有任何一個Done方法調(diào)用可以確保發(fā)生在唯一的Wait方法調(diào)用返回之前。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var x int32 = 0
	for i := 0; i < 100; i++ {
		go func() {
			wg.Add(1)
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}

	fmt.Println("等待片刻...")
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&x))
}

我們應(yīng)該將對Add方法的調(diào)用移出匿名協(xié)程之外,像下面這樣,使得任何一個Done方法調(diào)用都確保發(fā)生在唯一的Wait方法調(diào)用返回之前。

...
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}
...

不當?shù)厥褂糜米鯢uture/Promise的通道

通道用例大全一文中,我們了解到一些函數(shù)可以返回用做future/promise的通道結(jié)果。 假設(shè)fafb是這樣的兩個函數(shù),則下面的調(diào)用方式并沒有體現(xiàn)出這兩個函數(shù)的真正價值。

doSomethingWithFutureArguments(<-fa(), <-fb())

在上面這行調(diào)用中,兩個實參值(promise回應(yīng)結(jié)果)的生成實際上是串行進行的,future/promise的價值沒有體現(xiàn)出來。

我們應(yīng)該像下面這樣調(diào)用這兩個函數(shù)來并發(fā)生成兩個回應(yīng)結(jié)果:

ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-ca, <-cb)

沒有讓最后一個活躍的發(fā)送者關(guān)閉通道

Go程序員常犯的一個錯誤是關(guān)閉一個后續(xù)可能還會有協(xié)程向其發(fā)送數(shù)據(jù)的通道。 當向一個已關(guān)閉的通道發(fā)送數(shù)據(jù)的時候,一個恐慌將產(chǎn)生。

這樣的錯誤曾經(jīng)發(fā)生在一些很有名的項目中,比如Kubernetes項目中的這個bug這個bug。

請閱讀此篇文章來了解如何安全和優(yōu)雅地關(guān)閉通道。

對地址不保證為8字節(jié)對齊的值執(zhí)行64位原子操作

64位非方法原子操作中涉及到的實參地址必須為8字節(jié)對齊的。不滿足此條件的64位原子操作將造成一個恐慌。 對于標準編譯器,這樣的情形只可能發(fā)生在32位的架構(gòu)中。 從Go 1.19版本開始,我們可以使用64位方法原子操作來避免這一缺陷。 請閱讀內(nèi)存布局一文來獲知如何確保讓64位的整數(shù)值的地址在32位的架構(gòu)中8字節(jié)對齊。

沒留意過多的time.After函數(shù)調(diào)用消耗了大量資源

time標準庫包中的After函數(shù)返回一個用做延遲通知的通道。 此函數(shù)給并發(fā)編程帶來了很多便利,但是它的每個調(diào)用都需要創(chuàng)建一個time.Timer值,此新創(chuàng)建的Timer值在傳遞給After函數(shù)調(diào)用的時長(實參)內(nèi)肯定不會被垃圾回收。 如果此函數(shù)在某個時段內(nèi)被多次頻繁調(diào)用,則可能導(dǎo)致積累很多尚未過期的Timer值從而造成大量的內(nèi)存和計算消耗。

比如在下面這個例子中,如果longRunning函數(shù)被調(diào)用并且在一分鐘內(nèi)有一百萬條消息到達, 那么在某個特定的很小時間段(大概若干秒)內(nèi)將存在一百萬個活躍的Timer值,即使其中只有一個是真正有用的。

import (
	"fmt"
	"time"
)

// 如果某兩個連續(xù)的消息的間隔大于一分鐘,此函數(shù)將返回。
func longRunning(messages <-chan string) {
	for {
		select {
		case <-time.After(time.Minute):
			return
		case msg := <-messages:
			fmt.Println(msg)
		}
	}
}

為了避免太多的Timer值被創(chuàng)建,我們應(yīng)該只使用(并復(fù)用)一個Timer值,像下面這樣:

func longRunning(messages <-chan string) {
	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	for {
		select {
		case <-timer.C: // 過期了
			return
		case msg := <-messages:
			fmt.Println(msg)

			// 此if代碼塊很重要。
			if !timer.Stop() {
				<-timer.C
			}
		}

		// 必須重置以復(fù)用。
		timer.Reset(time.Minute)
	}
}

注意,此示例中的if代碼塊用來舍棄一個可能在執(zhí)行第二個分支代碼塊的時候發(fā)送過來的超時通知。

不正確地使用time.Timer值

一個典型的time.Timer的使用已經(jīng)在上一節(jié)中展示了。一些解釋:

  • 如果一個Timer值已經(jīng)過期或者已經(jīng)被終止(stopped),則相應(yīng)的Stop方法調(diào)用返回false。 在此Timer值尚未終止的時候,Stop方法調(diào)用返回false只能意味著此Timer值已經(jīng)過期。
  • 一個Timer值被終止之后,它的通道字段C最多只能含有一個過期的通知。
  • 在一個Timer終止(stopped)之后并且在重置和重用此Timer值之前,我們應(yīng)該確保此Timer值中肯定不存在過期的通知。 這就是上一節(jié)中的例子中的if代碼塊的意義所在。

一個*Timer值的Reset方法必須在對應(yīng)Timer值過期或者終止之后才能被調(diào)用; 否則,此Reset方法調(diào)用和一個可能的向此Timer值的C通道字段的發(fā)送通知操作產(chǎn)生數(shù)據(jù)競爭。

如果上一節(jié)中的例子中的select流程控制代碼塊中的第一個分支被選中,則這表示相應(yīng)的Timer值已經(jīng)過期,所以我們不必終止它。 但是我們必須在第二個分支中通過終止此Timer以檢查此Timer中是否存在一個過期的通知。 如果確實有一個過期的通知,我們必須在重用這個Timer之前將此過期的通知取出;否則,此過期的通知將下一個循環(huán)步導(dǎo)致在第一個分支立即被選中。

比如,下面這個程序?qū)⒃谶\行后大概一秒鐘(而不是十秒鐘)后退出。 而且此程序存在著潛在的數(shù)據(jù)競爭。

package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	timer := time.NewTimer(time.Second/2)
	select {
	case <-timer.C:
	default:
		time.Sleep(time.Second) // 此分支被選中的可能性較大
	}
	timer.Reset(time.Second * 10) // 可能數(shù)據(jù)競爭
	<-timer.C
	fmt.Println(time.Since(start)) // 大約1s
}

當一個time.Timer值不再被使用后,我們不必(但是推薦)終止之。

在多個協(xié)程中使用同一個time.Timer值比較容易寫出不當?shù)牟l(fā)代碼,所以盡量不要跨協(xié)程使用一個Timer值。

我們不應(yīng)該依賴于time.TimerReset方法的返回值。此返回值只要是為了歷史兼容性而存在的。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號