Go 語言 常見的并發(fā)模式

2023-03-22 14:57 更新

原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-06-goroutine.html


1.6 常見的并發(fā)模式

Go 語言最吸引人的地方是它內(nèi)建的并發(fā)支持。Go 語言并發(fā)體系的理論是 C.A.R Hoare 在 1978 年提出的 CSP(Communicating Sequential Process,通訊順序進程)。CSP 有著精確的數(shù)學模型,并實際應用在了 Hoare 參與設計的 T9000 通用計算機上。從 NewSqueak、Alef、Limbo 到現(xiàn)在的 Go 語言,對于對 CSP 有著 20 多年實戰(zhàn)經(jīng)驗的 Rob Pike 來說,他更關(guān)注的是將 CSP 應用在通用編程語言上產(chǎn)生的潛力。作為 Go 并發(fā)編程核心的 CSP 理論的核心概念只有一個:同步通信。關(guān)于同步通信的話題我們在前面一節(jié)已經(jīng)講過,本節(jié)我們將簡單介紹下 Go 語言中常見的并發(fā)模式。

首先要明確一個概念:并發(fā)不是并行。并發(fā)更關(guān)注的是程序的設計層面,并發(fā)的程序完全是可以順序執(zhí)行的,只有在真正的多核 CPU 上才可能真正地同時運行。并行更關(guān)注的是程序的運行層面,并行一般是簡單的大量重復,例如 GPU 中對圖像處理都會有大量的并行運算。為更好的編寫并發(fā)程序,從設計之初 Go 語言就注重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程序員專注于分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力。

在并發(fā)編程中,對共享資源的正確訪問需要精確的控制,在目前的絕大多數(shù)語言中,都是通過加鎖等線程同步方案來解決這一困難問題,而 Go 語言卻另辟蹊徑,它將共享的值通過 Channel 傳遞(實際上多個獨立執(zhí)行的線程很少主動共享資源)。在任意給定的時刻,最好只有一個 Goroutine 能夠擁有該資源。數(shù)據(jù)競爭從設計層面上就被杜絕了。為了提倡這種思考方式,Go 語言將其并發(fā)編程哲學化為一句口號:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通過共享內(nèi)存來通信,而應通過通信來共享內(nèi)存。

這是更高層次的并發(fā)編程哲學(通過管道來傳值是 Go 語言推薦的做法)。雖然像引用計數(shù)這類簡單的并發(fā)問題通過原子操作或互斥鎖就能很好地實現(xiàn),但是通過 Channel 來控制訪問能夠讓你寫出更簡潔正確的程序。

1.6.1 并發(fā)版本的 Hello world

我們先以在一個新的 Goroutine 中輸出“Hello world”,main 等待后臺線程輸出工作完成之后退出,這樣一個簡單的并發(fā)程序作為熱身。

并發(fā)編程的核心概念是同步通信,但是同步的方式卻有多種。我們先以大家熟悉的互斥量 sync.Mutex 來實現(xiàn)同步通信。根據(jù)文檔,我們不能直接對一個未加鎖狀態(tài)的 sync.Mutex 進行解鎖,這會導致運行時異常。下面這種方式并不能保證正常工作:

func main() {
    var mu sync.Mutex

    go func(){
        fmt.Println("你好, 世界")
        mu.Lock()
    }()

    mu.Unlock()
}

因為 mu.Lock() 和 mu.Unlock() 并不在同一個 Goroutine 中,所以也就不滿足順序一致性內(nèi)存模型。同時它們也沒有其它的同步事件可以參考,這兩個事件不可排序也就是可以并發(fā)的。因為可能是并發(fā)的事件,所以 main 函數(shù)中的 mu.Unlock() 很有可能先發(fā)生,而這個時刻 mu 互斥對象還處于未加鎖的狀態(tài),從而會導致運行時異常。

下面是修復后的代碼:

func main() {
    var mu sync.Mutex

    mu.Lock()
    go func(){
        fmt.Println("你好, 世界")
        mu.Unlock()
    }()

    mu.Lock()
}

修復的方式是在 main 函數(shù)所在線程中執(zhí)行兩次 mu.Lock(),當?shù)诙渭渔i時會因為鎖已經(jīng)被占用(不是遞歸鎖)而阻塞,main 函數(shù)的阻塞狀態(tài)驅(qū)動后臺線程繼續(xù)向前執(zhí)行。當后臺線程執(zhí)行到 mu.Unlock() 時解鎖,此時打印工作已經(jīng)完成了,解鎖會導致 main 函數(shù)中的第二個 mu.Lock() 阻塞狀態(tài)取消,此時后臺線程和主線程再沒有其它的同步事件參考,它們退出的事件將是并發(fā)的:在 main 函數(shù)退出導致程序退出時,后臺線程可能已經(jīng)退出了,也可能沒有退出。雖然無法確定兩個線程退出的時間,但是打印工作是可以正確完成的。

使用 sync.Mutex 互斥鎖同步是比較低級的做法。我們現(xiàn)在改用無緩存的管道來實現(xiàn)同步:

func main() {
    done := make(chan int)

    go func(){
        fmt.Println("你好, 世界")
        <-done
    }()

    done <- 1
}

根據(jù) Go 語言內(nèi)存模型規(guī)范,對于從無緩沖 Channel 進行的接收,發(fā)生在對該 Channel 進行的發(fā)送完成之前。因此,后臺線程 <-done 接收操作完成之后,main 線程的 done <- 1 發(fā)送操作才可能完成(從而退出 main、退出程序),而此時打印工作已經(jīng)完成了。

上面的代碼雖然可以正確同步,但是對管道的緩存大小太敏感:如果管道有緩存的話,就無法保證 main 退出之前后臺線程能正常打印了。更好的做法是將管道的發(fā)送和接收方向調(diào)換一下,這樣可以避免同步事件受管道緩存大小的影響:

func main() {
    done := make(chan int, 1) // 帶緩存的管道

    go func(){
        fmt.Println("你好, 世界")
        done <- 1
    }()

    <-done
}

對于帶緩沖的 Channel,對于 Channel 的第 K 個接收完成操作發(fā)生在第 K+C 個發(fā)送操作完成之前,其中 C 是 Channel 的緩存大小。雖然管道是帶緩存的,main 線程接收完成是在后臺線程發(fā)送開始但還未完成的時刻,此時打印工作也是已經(jīng)完成的。

基于帶緩存的管道,我們可以很容易將打印線程擴展到 N 個。下面的例子是開啟 10 個后臺線程分別打?。?

func main() {
    done := make(chan int, 10) // 帶 10 個緩存

    // 開 N 個后臺打印線程
    for i := 0; i < cap(done); i++ {
        go func(){
            fmt.Println("你好, 世界")
            done <- 1
        }()
    }

    // 等待 N 個后臺線程完成
    for i := 0; i < cap(done); i++ {
        <-done
    }
}

對于這種要等待 N 個線程完成后再進行下一步的同步操作有一個簡單的做法,就是使用 sync.WaitGroup 來等待一組事件:

func main() {
    var wg sync.WaitGroup

    // 開 N 個后臺打印線程
    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func() {
            fmt.Println("你好, 世界")
            wg.Done()
        }()
    }

    // 等待 N 個后臺線程完成
    wg.Wait()
}

其中 wg.Add(1) 用于增加等待事件的個數(shù),必須確保在后臺線程啟動之前執(zhí)行(如果放到后臺線程之中執(zhí)行則不能保證被正常執(zhí)行到)。當后臺線程完成打印工作之后,調(diào)用 wg.Done() 表示完成一個事件。main 函數(shù)的 wg.Wait() 是等待全部的事件完成。

1.6.2 生產(chǎn)者消費者模型

并發(fā)編程中最常見的例子就是生產(chǎn)者消費者模式,該模式主要通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。簡單地說,就是生產(chǎn)者生產(chǎn)一些數(shù)據(jù),然后放到成果隊列中,同時消費者從成果隊列中來取這些數(shù)據(jù)。這樣就讓生產(chǎn)消費變成了異步的兩個過程。當成果隊列中沒有數(shù)據(jù)時,消費者就進入饑餓的等待中;而當成果隊列中數(shù)據(jù)已滿時,生產(chǎn)者則面臨因產(chǎn)品擠壓導致 CPU 被剝奪的下崗問題。

Go 語言實現(xiàn)生產(chǎn)者消費者并發(fā)很簡單:

// 生產(chǎn)者: 生成 factor 整數(shù)倍的序列
func Producer(factor int, out chan<- int) {
    for i := 0; ; i++ {
        out <- i*factor
    }
}

// 消費者
func Consumer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}
func main() {
    ch := make(chan int, 64) // 成果隊列

    go Producer(3, ch) // 生成 3 的倍數(shù)的序列
    go Producer(5, ch) // 生成 5 的倍數(shù)的序列
    go Consumer(ch)    // 消費生成的隊列

    // 運行一定時間后退出
    time.Sleep(5 * time.Second)
}

我們開啟了 2 個 Producer 生產(chǎn)流水線,分別用于生成 3 和 5 的倍數(shù)的序列。然后開啟 1 個 Consumer 消費者線程,打印獲取的結(jié)果。我們通過在 main 函數(shù)休眠一定的時間來讓生產(chǎn)者和消費者工作一定時間。正如前面一節(jié)說的,這種靠休眠方式是無法保證穩(wěn)定的輸出結(jié)果的。

我們可以讓 main 函數(shù)保存阻塞狀態(tài)不退出,只有當用戶輸入 Ctrl-C 時才真正退出程序:

func main() {
    ch := make(chan int, 64) // 成果隊列

    go Producer(3, ch) // 生成 3 的倍數(shù)的序列
    go Producer(5, ch) // 生成 5 的倍數(shù)的序列
    go Consumer(ch)    // 消費 生成的隊列

    // Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <-sig)
}

我們這個例子中有 2 個生產(chǎn)者,并且 2 個生產(chǎn)者之間并無同步事件可參考,它們是并發(fā)的。因此,消費者輸出的結(jié)果序列的順序是不確定的,這并沒有問題,生產(chǎn)者和消費者依然可以相互配合工作。

1.6.3 發(fā)布訂閱模型

發(fā)布訂閱(publish-and-subscribe)模型通常被簡寫為 pub/sub 模型。在這個模型中,消息生產(chǎn)者成為發(fā)布者(publisher),而消息消費者則成為訂閱者(subscriber),生產(chǎn)者和消費者是 M:N 的關(guān)系。在傳統(tǒng)生產(chǎn)者和消費者模型中,是將消息發(fā)送到一個隊列中,而發(fā)布訂閱模型則是將消息發(fā)布給一個主題。

為此,我們構(gòu)建了一個名為 pubsub 的發(fā)布訂閱模型支持包:

// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub

import (
    "sync"
    "time"
)

type (
    subscriber chan interface{}         // 訂閱者為一個管道
    topicFunc  func(v interface{}) bool // 主題為一個過濾器
)

// 發(fā)布者對象
type Publisher struct {
    m           sync.RWMutex             // 讀寫鎖
    buffer      int                      // 訂閱隊列的緩存大小
    timeout     time.Duration            // 發(fā)布超時時間
    subscribers map[subscriber]topicFunc // 訂閱者信息
}

// 構(gòu)建一個發(fā)布者對象, 可以設置發(fā)布超時時間和緩存隊列的長度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
    return &Publisher{
        buffer:      buffer,
        timeout:     publishTimeout,
        subscribers: make(map[subscriber]topicFunc),
    }
}

// 添加一個新的訂閱者,訂閱全部主題
func (p *Publisher) Subscribe() chan interface{} {
    return p.SubscribeTopic(nil)
}

// 添加一個新的訂閱者,訂閱過濾器篩選后的主題
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
    ch := make(chan interface{}, p.buffer)
    p.m.Lock()
    p.subscribers[ch] = topic
    p.m.Unlock()
    return ch
}

// 退出訂閱
func (p *Publisher) Evict(sub chan interface{}) {
    p.m.Lock()
    defer p.m.Unlock()

    delete(p.subscribers, sub)
    close(sub)
}

// 發(fā)布一個主題
func (p *Publisher) Publish(v interface{}) {
    p.m.RLock()
    defer p.m.RUnlock()

    var wg sync.WaitGroup
    for sub, topic := range p.subscribers {
        wg.Add(1)
        go p.sendTopic(sub, topic, v, &wg)
    }
    wg.Wait()
}

// 關(guān)閉發(fā)布者對象,同時關(guān)閉所有的訂閱者管道。
func (p *Publisher) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    for sub := range p.subscribers {
        delete(p.subscribers, sub)
        close(sub)
    }
}

// 發(fā)送主題,可以容忍一定的超時
func (p *Publisher) sendTopic(
    sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
    defer wg.Done()
    if topic != nil && !topic(v) {
        return
    }

    select {
    case sub <- v:
    case <-time.After(p.timeout):
    }
}

下面的例子中,有兩個訂閱者分別訂閱了全部主題和含有"golang"的主題:

import "path/to/pubsub"

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 10)
    defer p.Close()

    all := p.Subscribe()
    golang := p.SubscribeTopic(func(v interface{}) bool {
        if s, ok := v.(string); ok {
            return strings.Contains(s, "golang")
        }
        return false
    })

    p.Publish("hello,  world!")
    p.Publish("hello, golang!")

    go func() {
        for  msg := range all {
            fmt.Println("all:", msg)
        }
    } ()

    go func() {
        for  msg := range golang {
            fmt.Println("golang:", msg)
        }
    } ()

    // 運行一定時間后退出
    time.Sleep(3 * time.Second)
}

在發(fā)布訂閱模型中,每條消息都會傳送給多個訂閱者。發(fā)布者通常不會知道、也不關(guān)心哪一個訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運行時動態(tài)添加,是一種松散的耦合關(guān)系,這使得系統(tǒng)的復雜性可以隨時間的推移而增長。在現(xiàn)實生活中,像天氣預報之類的應用就可以應用這個并發(fā)模式。

1.6.4 控制并發(fā)數(shù)

很多用戶在適應了 Go 語言強大的并發(fā)特性之后,都傾向于編寫最大并發(fā)的程序,因為這樣似乎可以提供最大的性能。在現(xiàn)實中我們行色匆匆,但有時卻需要我們放慢腳步享受生活,并發(fā)的程序也是一樣:有時候我們需要適當?shù)乜刂撇l(fā)的程度,因為這樣不僅僅可給其它的應用/任務讓出/預留一定的 CPU 資源,也可以適當降低功耗緩解電池的壓力。

在 Go 語言自帶的 godoc 程序?qū)崿F(xiàn)中有一個 vfs 的包對應虛擬的文件系統(tǒng),在 vfs 包下面有一個 gatefs 的子包,gatefs 子包的目的就是為了控制訪問該虛擬文件系統(tǒng)的最大并發(fā)數(shù)。gatefs 包的應用很簡單:

import (
    "golang.org/x/tools/godoc/vfs"
    "golang.org/x/tools/godoc/vfs/gatefs"
)

func main() {
    fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
    // ...
}

其中 vfs.OS("/path") 基于本地文件系統(tǒng)構(gòu)造一個虛擬的文件系統(tǒng),然后 gatefs.New 基于現(xiàn)有的虛擬文件系統(tǒng)構(gòu)造一個并發(fā)受控的虛擬文件系統(tǒng)。并發(fā)數(shù)控制的原理在前面一節(jié)已經(jīng)講過,就是通過帶緩存管道的發(fā)送和接收規(guī)則來實現(xiàn)最大并發(fā)阻塞:

var limit = make(chan int, 3)

func main() {
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    select{}
}

不過 gatefs 對此做一個抽象類型 gate,增加了 enter 和 leave 方法分別對應并發(fā)代碼的進入和離開。當超出并發(fā)數(shù)目限制的時候,enter 方法會阻塞直到并發(fā)數(shù)降下來為止。

type gate chan bool

func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }

gatefs 包裝的新的虛擬文件系統(tǒng)就是將需要控制并發(fā)的方法增加了 enter 和 leave 調(diào)用而已:

type gatefs struct {
    fs vfs.FileSystem
    gate
}

func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
    fs.enter()
    defer fs.leave()
    return fs.fs.Lstat(p)
}

我們不僅可以控制最大的并發(fā)數(shù)目,而且可以通過帶緩存 Channel 的使用量和最大容量比例來判斷程序運行的并發(fā)率。當管道為空的時候可以認為是空閑狀態(tài),當管道滿了時任務是繁忙狀態(tài),這對于后臺一些低級任務的運行是有參考價值的。

1.6.5 贏者為王

采用并發(fā)編程的動機有很多:并發(fā)編程可以簡化問題,比如一類問題對應一個處理線程會更簡單;并發(fā)編程還可以提升性能,在一個多核 CPU 上開 2 個線程一般會比開 1 個線程快一些。其實對于提升性能而言,程序并不是簡單地運行速度快就表示用戶體驗好的;很多時候程序能快速響應用戶請求才是最重要的,當沒有用戶請求需要處理的時候才合適處理一些低優(yōu)先級的后臺任務。

假設我們想快速地搜索“golang”相關(guān)的主題,我們可能會同時打開 Bing、Google 或百度等多個檢索引擎。當某個搜索最先返回結(jié)果后,就可以關(guān)閉其它搜索頁面了。因為受網(wǎng)絡環(huán)境和搜索引擎算法的影響,某些搜索引擎可能很快返回搜索結(jié)果,某些搜索引擎也可能等到他們公司倒閉也沒有完成搜索。我們可以采用類似的策略來編寫這個程序:

func main() {
    ch := make(chan string, 32)

    go func() {
        ch <- searchByBing("golang")
    }()
    go func() {
        ch <- searchByGoogle("golang")
    }()
    go func() {
        ch <- searchByBaidu("golang")
    }()

    fmt.Println(<-ch)
}

首先,我們創(chuàng)建了一個帶緩存的管道,管道的緩存數(shù)目要足夠大,保證不會因為緩存的容量引起不必要的阻塞。然后我們開啟了多個后臺線程,分別向不同的搜索引擎提交搜索請求。當任意一個搜索引擎最先有結(jié)果之后,都會馬上將結(jié)果發(fā)到管道中(因為管道帶了足夠的緩存,這個過程不會阻塞)。但是最終我們只從管道取第一個結(jié)果,也就是最先返回的結(jié)果。

通過適當開啟一些冗余的線程,嘗試用不同途徑去解決同樣的問題,最終以贏者為王的方式提升了程序的相應性能。

1.6.6 素數(shù)篩

在“Hello world 的革命”一節(jié)中,我們?yōu)榱搜菔?Newsqueak 的并發(fā)特性,文中給出了并發(fā)版本素數(shù)篩的實現(xiàn)。并發(fā)版本的素數(shù)篩是一個經(jīng)典的并發(fā)例子,通過它我們可以更深刻地理解 Go 語言的并發(fā)特性?!八財?shù)篩”的原理如圖:


圖 1-13 素數(shù)篩

我們需要先生成最初的 2, 3, 4, ... 自然數(shù)序列(不包含開頭的 0、1):

// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural() chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            ch <- i
        }
    }()
    return ch
}

GenerateNatural 函數(shù)內(nèi)部啟動一個 Goroutine 生產(chǎn)序列,返回對應的管道。

然后是為每個素數(shù)構(gòu)造一個篩子:將輸入序列中是素數(shù)倍數(shù)的數(shù)提出,并返回新的序列,是一個新的管道。

// 管道過濾器: 刪除能被素數(shù)整除的數(shù)
func PrimeFilter(in <-chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                out <- i
            }
        }
    }()
    return out
}

PrimeFilter 函數(shù)也是內(nèi)部啟動一個 Goroutine 生產(chǎn)序列,返回過濾后序列對應的管道。

現(xiàn)在我們可以在 main 函數(shù)中驅(qū)動這個并發(fā)的素數(shù)篩了:

func main() {
    ch := GenerateNatural() // 自然數(shù)序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
        prime := <-ch // 新出現(xiàn)的素數(shù)
        fmt.Printf("%v: %v\n", i+1, prime)
        ch = PrimeFilter(ch, prime) // 基于新素數(shù)構(gòu)造的過濾器
    }
}

我們先是調(diào)用 GenerateNatural() 生成最原始的從 2 開始的自然數(shù)序列。然后開始一個 100 次迭代的循環(huán),希望生成 100 個素數(shù)。在每次循環(huán)迭代開始的時候,管道中的第一個數(shù)必定是素數(shù),我們先讀取并打印這個素數(shù)。然后基于管道中剩余的數(shù)列,并以當前取出的素數(shù)為篩子過濾后面的素數(shù)。不同的素數(shù)篩子對應的管道是串聯(lián)在一起的。

素數(shù)篩展示了一種優(yōu)雅的并發(fā)程序結(jié)構(gòu)。但是因為每個并發(fā)體處理的任務粒度太細微,程序整體的性能并不理想。對于細粒度的并發(fā)程序,CSP 模型中固有的消息傳遞的代價太高了(多線程并發(fā)模型同樣要面臨線程啟動的代價)。

1.6.7 并發(fā)的安全退出

有時候我們需要通知 Goroutine 停止它正在干的事情,特別是當它工作在錯誤的方向上的時候。Go 語言并沒有提供在一個直接終止 Goroutine 的方法,由于這樣會導致 Goroutine 之間的共享變量處在未定義的狀態(tài)上。但是如果我們想要退出兩個或者任意多個 Goroutine 怎么辦呢?

Go 語言中不同 Goroutine 之間主要依靠管道進行通信和同步。要同時處理多個管道的發(fā)送或接收操作,我們需要使用 select 關(guān)鍵字(這個關(guān)鍵字和網(wǎng)絡編程中的 select 函數(shù)的行為類似)。當 select 有多個分支時,會隨機選擇一個可用的管道分支,如果沒有可用的管道分支則選擇 default 分支,否則會一直保存阻塞狀態(tài)。

基于 select 實現(xiàn)的管道的超時判斷:

select {
case v := <-in:
    fmt.Println(v)
case <-time.After(time.Second):
    return // 超時
}

通過 select 的 default 分支實現(xiàn)非阻塞的管道發(fā)送或接收操作:

select {
case v := <-in:
    fmt.Println(v)
default:
    // 沒有數(shù)據(jù)
}

通過 select 來阻止 main 函數(shù)退出:

func main() {
    // do some thins
    select{}
}

當有多個管道均可操作時,select 會隨機選擇一個管道。基于該特性我們可以用 select 實現(xiàn)一個生成隨機數(shù)序列的程序:

func main() {
    ch := make(chan int)
    go func() {
        for {
            select {
            case ch <- 0:
            case ch <- 1:
            }
        }
    }()

    for v := range ch {
        fmt.Println(v)
    }
}

我們通過 select 和 default 分支可以很容易實現(xiàn)一個 Goroutine 的退出控制:

func worker(cancel chan bool) {
    for {
        select {
        default:
            fmt.Println("hello")
            // 正常工作
        case <-cancel:
            // 退出
        }
    }
}

func main() {
    cancel := make(chan bool)
    go worker(cancel)

    time.Sleep(time.Second)
    cancel <- true
}

但是管道的發(fā)送操作和接收操作是一一對應的,如果要停止多個 Goroutine 那么可能需要創(chuàng)建同樣數(shù)量的管道,這個代價太大了。其實我們可以通過 close 關(guān)閉一個管道來實現(xiàn)廣播的效果,所有從關(guān)閉管道接收的操作均會收到一個零值和一個可選的失敗標志。

func worker(cancel chan bool) {
    for {
        select {
        default:
            fmt.Println("hello")
            // 正常工作
        case <-cancel:
            // 退出
        }
    }
}

func main() {
    cancel := make(chan bool)

    for i := 0; i < 10; i++ {
        go worker(cancel)
    }

    time.Sleep(time.Second)
    close(cancel)
}

我們通過 close 來關(guān)閉 cancel 管道向多個 Goroutine 廣播退出的指令。不過這個程序依然不夠穩(wěn)?。寒斆總€ Goroutine 收到退出指令退出時一般會進行一定的清理工作,但是退出的清理工作并不能保證被完成,因為 main 線程并沒有等待各個工作 Goroutine 退出工作完成的機制。我們可以結(jié)合 sync.WaitGroup 來改進:

func worker(wg *sync.WaitGroup, cancel chan bool) {
    defer wg.Done()

    for {
        select {
        default:
            fmt.Println("hello")
        case <-cancel:
            return
        }
    }
}

func main() {
    cancel := make(chan bool)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(&wg, cancel)
    }

    time.Sleep(time.Second)
    close(cancel)
    wg.Wait()
}

現(xiàn)在每個工作者并發(fā)體的創(chuàng)建、運行、暫停和退出都是在 main 函數(shù)的安全控制之下了。

1.6.8 context 包

在 Go1.7 發(fā)布時,標準庫增加了一個 context 包,用來簡化對于處理單個請求的多個 Goroutine 之間與請求域的數(shù)據(jù)、超時和退出等操作,官方有博文對此做了專門介紹。我們可以用 context 包來重新實現(xiàn)前面的線程安全退出或超時的控制:

func worker(ctx context.Context, wg *sync.WaitGroup) error {
    defer wg.Done()

    for {
        select {
        default:
            fmt.Println("hello")
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(ctx, &wg)
    }

    time.Sleep(time.Second)
    cancel()

    wg.Wait()
}

當并發(fā)體超時或 main 主動停止工作者 Goroutine 時,每個工作者都可以安全退出。

Go 語言是帶內(nèi)存自動回收特性的,因此內(nèi)存一般不會泄漏。在前面素數(shù)篩的例子中,GenerateNatural 和 PrimeFilter 函數(shù)內(nèi)部都啟動了新的 Goroutine,當 main 函數(shù)不再使用管道時后臺 Goroutine 有泄漏的風險。我們可以通過 context 包來避免這個問題,下面是改進的素數(shù)篩實現(xiàn):

// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            select {
            case <- ctx.Done():
                return
            case ch <- i:
            }
        }
    }()
    return ch
}

// 管道過濾器: 刪除能被素數(shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                select {
                case <- ctx.Done():
                    return
                case out <- i:
                }
            }
        }
    }()
    return out
}

func main() {
    // 通過 Context 控制后臺 Goroutine 狀態(tài)
    ctx, cancel := context.WithCancel(context.Background())

    ch := GenerateNatural(ctx) // 自然數(shù)序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
        prime := <-ch // 新出現(xiàn)的素數(shù)
        fmt.Printf("%v: %v\n", i+1, prime)
        ch = PrimeFilter(ctx, ch, prime) // 基于新素數(shù)構(gòu)造的過濾器
    }

    cancel()
}

當 main 函數(shù)完成工作前,通過調(diào)用 cancel() 來通知后臺 Goroutine 退出,這樣就避免了 Goroutine 的泄漏。

然而,上面這個例子只是展示了 cancel() 的基礎用法,實際上這個例子會導致 Goroutine 死鎖,不能正常退出。 我們可以給上面這個例子添加 sync.WaitGroup 來復現(xiàn)這個問題。

package main

import (
    "context"
    "fmt"
    "sync"
)

// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context, wg *sync.WaitGroup) chan int {
    ch := make(chan int)
    go func() {
        defer wg.Done()
        for i := 2; ; i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- i:
            }
        }
    }()
    return ch
}

// 管道過濾器: 刪除能被素數(shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGroup) chan int {
    out := make(chan int)
    go func() {
        defer wg.Done()
        for {
            if i := <-in; i%prime != 0 {
                select {
                case <-ctx.Done():
                    return
                case out <- i:
                }
            }
        }
    }()
    return out
}

func main() {
    wg := sync.WaitGroup{}
    // 通過 Context 控制后臺 Goroutine 狀態(tài)
    ctx, cancel := context.WithCancel(context.Background())
    wg.Add(1)
    ch := GenerateNatural(ctx, &wg) // 自然數(shù)序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
        prime := <-ch // 新出現(xiàn)的素數(shù)
        fmt.Printf("%v: %v\n", i+1, prime)
        wg.Add(1)
        ch = PrimeFilter(ctx, ch, prime, &wg) // 基于新素數(shù)構(gòu)造的過濾器
    }

    cancel()
    wg.Wait()
}

執(zhí)行上面這個例子很容易就復現(xiàn)了死鎖的問題,原因是素數(shù)篩中的 ctx.Done() 位于 if i := <-in; i%prime != 0 判斷之內(nèi), 而這個判斷可能會一直阻塞,導致 Goroutine 無法正常退出。讓我們來解決這個問題。

package main

import (
    "context"
    "fmt"
    "sync"
)

// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context, wg *sync.WaitGroup) chan int {
    ch := make(chan int)
    go func() {
        defer wg.Done()
        for i := 2; ; i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- i:
            }
        }
    }()
    return ch
}

// 管道過濾器: 刪除能被素數(shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGroup) chan int {
    out := make(chan int)
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case i := <-in:
                if i%prime != 0 {
                    select {
                    case <-ctx.Done():
                        return
                    case out <- i:
                    }
                }
            }

        }
    }()
    return out
}

func main() {
    wg := sync.WaitGroup{}
    // 通過 Context 控制后臺 Goroutine 狀態(tài)
    ctx, cancel := context.WithCancel(context.Background())
    wg.Add(1)
    ch := GenerateNatural(ctx, &wg) // 自然數(shù)序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
        prime := <-ch // 新出現(xiàn)的素數(shù)
        fmt.Printf("%v: %v\n", i+1, prime)
        wg.Add(1)
        ch = PrimeFilter(ctx, ch, prime, &wg) // 基于新素數(shù)構(gòu)造的過濾器
    }

    cancel()
    wg.Wait()
}

如上所示,我們可以通過將 i := <-in 放入 select,在這個 select 內(nèi)也執(zhí)行 <-ctx.Done() 來解決阻塞導致的死鎖。 不過上面這個例子并不優(yōu)美,讓我們換一種方式。

package main

import (
    "context"
    "fmt"
    "sync"
)

// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context, wg *sync.WaitGroup) chan int {
    ch := make(chan int)
    go func() {
        defer wg.Done()
        defer close(ch)
        for i := 2; ; i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- i:
            }
        }
    }()
    return ch
}

// 管道過濾器: 刪除能被素數(shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGroup) chan int {
    out := make(chan int)
    go func() {
        defer wg.Done()
        defer close(out)
        for i := range in {
            if i%prime != 0 {
                select {
                case <-ctx.Done():
                    return
                case out <- i:
                }
            }
        }
    }()
    return out
}

func main() {
    wg := sync.WaitGroup{}
    // 通過 Context 控制后臺 Goroutine 狀態(tài)
    ctx, cancel := context.WithCancel(context.Background())
    wg.Add(1)
    ch := GenerateNatural(ctx, &wg) // 自然數(shù)序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
        prime := <-ch // 新出現(xiàn)的素數(shù)
        fmt.Printf("%v: %v\n", i+1, prime)
        wg.Add(1)
        ch = PrimeFilter(ctx, ch, prime, &wg) // 基于新素數(shù)構(gòu)造的過濾器
    }

    cancel()
    wg.Wait()
}

在上面這個例子中主要有以下幾點需要關(guān)注:

  1. 通過 ?for range? 循環(huán)保證了輸入管道被關(guān)閉時,循環(huán)能退出,不會出現(xiàn)死循環(huán);
  2. 通過 ?defer close? 保證了無論是輸入管道被關(guān)閉,還是 ctx 被取消,只要素數(shù)篩退出,都會關(guān)閉輸出管道。

至此,我們終于足夠優(yōu)美地解決了這個死鎖問題。

并發(fā)是一個非常大的主題,我們這里只是展示幾個非?;A的并發(fā)編程的例子。官方文檔也有很多關(guān)于并發(fā)編程的討論,國內(nèi)也有專門討論 Go 語言并發(fā)編程的書籍。讀者可以根據(jù)自己的需求查閱相關(guān)的文獻。



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號