Go語言 通道用例大全

2023-02-16 17:39 更新

在閱讀本文之前,請先閱讀通道一文。 那篇文章詳細(xì)地解釋了通道類型和通道值,以及各種通道操作的規(guī)則細(xì)節(jié)。 一個(gè)Go新手程序員可能需要反復(fù)多次閱讀那篇文章和當(dāng)前這篇文章來精通Go通道編程。

本文余下的內(nèi)容將展示很多通道用例。 希望這篇文章能夠說服你接收下面的觀點(diǎn):

  • 使用通道進(jìn)行異步和并發(fā)編程是簡單和愜意的;
  • 通道同步技術(shù)比被很多其它語言采用的其它同步方案(比如角色模型async/await模式)有著更多的應(yīng)用場景和更多的使用變種。

請注意,本文的目的是展示盡量多的通道用例。但是,我們應(yīng)該知道通道并不是Go支持的唯一同步技術(shù),并且通道并不是在任何情況下都是最佳的同步技術(shù)。 請閱讀原子操作其它并發(fā)同步技術(shù)來了解更多的Go支持的同步技術(shù)。

將通道用做future/promise

很多其它流行語言支持future/promise來實(shí)現(xiàn)異步(并發(fā))編程。 Future/promise常常用在請求/回應(yīng)場合。

返回單向接收通道做為函數(shù)返回結(jié)果

在下面這個(gè)例子中,sumSquares函數(shù)調(diào)用的兩個(gè)實(shí)參請求并發(fā)進(jìn)行。 每個(gè)通道讀取操作將阻塞到請求返回結(jié)果為止。 兩個(gè)實(shí)參總共需要大約3秒鐘(而不是6秒鐘)準(zhǔn)備完畢(以較慢的一個(gè)為準(zhǔn))。

package main

import (
	"time"
	"math/rand"
	"fmt"
)

func longTimeRequest() <-chan int32 {
	r := make(chan int32)

	go func() {
		time.Sleep(time.Second * 3) // 模擬一個(gè)工作負(fù)載
		r <- rand.Int31n(100)
	}()

	return r
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())

	a, b := longTimeRequest(), longTimeRequest()
	fmt.Println(sumSquares(<-a, <-b))
}

將單向發(fā)送通道類型用做函數(shù)實(shí)參

和上例一樣,在下面這個(gè)例子中,sumSquares函數(shù)調(diào)用的兩個(gè)實(shí)參的請求也是并發(fā)進(jìn)行的。 和上例不同的是longTimeRequest函數(shù)接收一個(gè)單向發(fā)送通道類型參數(shù)而不是返回一個(gè)單向接收通道結(jié)果。

package main

import (
	"time"
	"math/rand"
	"fmt"
)

func longTimeRequest(r chan<- int32)  {
	time.Sleep(time.Second * 3) // 模擬一個(gè)工作負(fù)載
	r <- rand.Int31n(100)
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())

	ra, rb := make(chan int32), make(chan int32)
	go longTimeRequest(ra)
	go longTimeRequest(rb)

	fmt.Println(sumSquares(<-ra, <-rb))
}

對于上面這個(gè)特定的例子,我們可以只使用一個(gè)通道來接收回應(yīng)結(jié)果,因?yàn)閮蓚€(gè)參數(shù)的作用是對等的。

...

	results := make(chan int32, 2) // 緩沖與否不重要
	go longTimeRequest(results)
	go longTimeRequest(results)

	fmt.Println(sumSquares(<-results, <-results))
}

這可以看作是后面將要提到的數(shù)據(jù)聚合的一個(gè)應(yīng)用。

采用最快回應(yīng)

本用例可以看作是上例中只使用一個(gè)通道變種的增強(qiáng)。

有時(shí)候,一份數(shù)據(jù)可能同時(shí)從多個(gè)數(shù)據(jù)源獲取。這些數(shù)據(jù)源將返回相同的數(shù)據(jù)。 因?yàn)楦鞣N因素,這些數(shù)據(jù)源的回應(yīng)速度參差不一,甚至某個(gè)特定數(shù)據(jù)源的多次回應(yīng)速度之間也可能相差很大。 同時(shí)從多個(gè)數(shù)據(jù)源獲取一份相同的數(shù)據(jù)可以有效保障低延遲。我們只需采用最快的回應(yīng)并舍棄其它較慢回應(yīng)。

注意:如果有N個(gè)數(shù)據(jù)源,為了防止被舍棄的回應(yīng)對應(yīng)的協(xié)程永久阻塞,則傳輸數(shù)據(jù)用的通道必須為一個(gè)容量至少為N-1的緩沖通道。

package main

import (
	"fmt"
	"time"
	"math/rand"
)

func source(c chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(3) + 1
	// 睡眠1秒/2秒/3秒
	time.Sleep(time.Duration(rb) * time.Second)
	c <- ra
}

func main() {
	rand.Seed(time.Now().UnixNano())

	startTime := time.Now()
	c := make(chan int32, 5) // 必須用一個(gè)緩沖通道
	for i := 0; i < cap(c); i++ {
		go source(c)
	}
	rnd := <- c // 只有第一個(gè)回應(yīng)被使用了
	fmt.Println(time.Since(startTime))
	fmt.Println(rnd)
}

“采用最快回應(yīng)”用例還有一些其它實(shí)現(xiàn)方式,本文后面將會談及。

更多“請求/回應(yīng)”用例變種

做為函數(shù)參數(shù)和返回結(jié)果使用的通道可以是緩沖的,從而使得請求協(xié)程不需阻塞到它所發(fā)送的數(shù)據(jù)被接收為止。

有時(shí),一個(gè)請求可能并不保證返回一份有效的數(shù)據(jù)。對于這種情形,我們可以使用一個(gè)形如struct{v T; err error}的結(jié)構(gòu)體類型或者一個(gè)空接口類型做為通道的元素類型以用來區(qū)分回應(yīng)的值是否有效。

有時(shí),一個(gè)請求可能需要比預(yù)期更長的用時(shí)才能回應(yīng),甚至永遠(yuǎn)都得不到回應(yīng)。 我們可以使用本文后面將要介紹的超時(shí)機(jī)制來應(yīng)對這樣的情況。

有時(shí),回應(yīng)方可能會不斷地返回一系列值,這也同時(shí)屬于后面將要介紹的數(shù)據(jù)流的一個(gè)用例。

使用通道實(shí)現(xiàn)通知

通知可以被看作是特殊的請求/回應(yīng)用例。在一個(gè)通知用例中,我們并不關(guān)心回應(yīng)的值,我們只關(guān)心回應(yīng)是否已發(fā)生。 所以我們常常使用空結(jié)構(gòu)體類型struct{}來做為通道的元素類型,因?yàn)榭战Y(jié)構(gòu)體類型的尺寸為零,能夠節(jié)省一些內(nèi)存(雖然常常很少量)。

向一個(gè)通道發(fā)送一個(gè)值來實(shí)現(xiàn)單對單通知

我們已知道,如果一個(gè)通道中無值可接收,則此通道上的下一個(gè)接收操作將阻塞到另一個(gè)協(xié)程發(fā)送一個(gè)值到此通道為止。 所以一個(gè)協(xié)程可以向此通道發(fā)送一個(gè)值來通知另一個(gè)等待著從此通道接收數(shù)據(jù)的協(xié)程。

在下面這個(gè)例子中,通道done被用來做為一個(gè)信號通道來實(shí)現(xiàn)單對單通知。

package main

import (
	"crypto/rand"
	"fmt"
	"os"
	"sort"
)

func main() {
	values := make([]byte, 32 * 1024 * 1024)
	if _, err := rand.Read(values); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	done := make(chan struct{}) // 也可以是緩沖的

	// 排序協(xié)程
	go func() {
		sort.Slice(values, func(i, j int) bool {
			return values[i] < values[j]
		})
		done <- struct{}{} // 通知排序已完成
	}()

	// 并發(fā)地做一些其它事情...

	<- done // 等待通知
	fmt.Println(values[0], values[len(values)-1])
}

從一個(gè)通道接收一個(gè)值來實(shí)現(xiàn)單對單通知

如果一個(gè)通道的數(shù)據(jù)緩沖隊(duì)列已滿(非緩沖的通道的數(shù)據(jù)緩沖隊(duì)列總是滿的)但它的發(fā)送協(xié)程隊(duì)列為空,則向此通道發(fā)送一個(gè)值將阻塞,直到另外一個(gè)協(xié)程從此通道接收一個(gè)值為止。 所以我們可以通過從一個(gè)通道接收數(shù)據(jù)來實(shí)現(xiàn)單對單通知。一般我們使用非緩沖通道來實(shí)現(xiàn)這樣的通知。

這種通知方式不如上例中介紹的方式使用得廣泛。

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
		// 此信號通道也可以緩沖為1。如果這樣,則在下面
		// 這個(gè)協(xié)程創(chuàng)建之前,我們必須向其中寫入一個(gè)值。

	go func() {
		fmt.Print("Hello")
		// 模擬一個(gè)工作負(fù)載。
		time.Sleep(time.Second * 2)

		// 使用一個(gè)接收操作來通知主協(xié)程。
		<- done
	}()

	done <- struct{}{} // 阻塞在此,等待通知
	fmt.Println(" world!")
}

另一個(gè)事實(shí)是,上面的兩種單對單通知方式其實(shí)并沒有本質(zhì)的區(qū)別。 它們都可以被概括為較快者等待較慢者發(fā)出通知。

多對單和單對多通知

略微擴(kuò)展一下上面兩個(gè)用例,我們可以很輕松地實(shí)現(xiàn)多對單和單對多通知。

package main

import "log"
import "time"

type T = struct{}

func worker(id int, ready <-chan T, done chan<- T) {
	<-ready // 阻塞在此,等待通知
	log.Print("Worker#", id, "開始工作")
	// 模擬一個(gè)工作負(fù)載。
	time.Sleep(time.Second * time.Duration(id+1))
	log.Print("Worker#", id, "工作完成")
	done <- T{} // 通知主協(xié)程(N-to-1)
}

func main() {
	log.SetFlags(0)

	ready, done := make(chan T), make(chan T)
	go worker(0, ready, done)
	go worker(1, ready, done)
	go worker(2, ready, done)

	// 模擬一個(gè)初始化過程
	time.Sleep(time.Second * 3 / 2)
	// 單對多通知
	ready <- T{}; ready <- T{}; ready <- T{}
	// 等待被多對單通知
	<-done; <-done; <-done
}

事實(shí)上,上例中展示的多對單和單對多通知實(shí)現(xiàn)方式在實(shí)踐中用的并不多。 在實(shí)踐中,我們多使用sync.WaitGroup來實(shí)現(xiàn)多對單通知,使用關(guān)閉一個(gè)通道的方式來實(shí)現(xiàn)單對多通知(詳見下一個(gè)用例)。

通過關(guān)閉一個(gè)通道來實(shí)現(xiàn)群發(fā)通知

上一個(gè)用例中的單對多通知實(shí)現(xiàn)在實(shí)踐中很少用,因?yàn)橥ㄟ^關(guān)閉一個(gè)通道的方式在來實(shí)現(xiàn)單對多通知的方式更簡單。 我們已經(jīng)知道,從一個(gè)已關(guān)閉的通道可以接收到無窮個(gè)值,我們可以利用這一特性來實(shí)現(xiàn)群發(fā)通知。

我們可以把上一個(gè)例子中的三個(gè)數(shù)據(jù)發(fā)送操作ready <- struct{}{}替換為一個(gè)通道關(guān)閉操作close(ready)來達(dá)到同樣的單對多通知效果。

...
	close(ready) // 群發(fā)通知
...

當(dāng)然,我們也可以通過關(guān)閉一個(gè)通道來實(shí)現(xiàn)單對單通知。事實(shí)上,關(guān)閉通道是實(shí)踐中用得最多通知實(shí)現(xiàn)方式。

從一個(gè)已關(guān)閉的通道可以接收到無窮個(gè)值這一特性也將被用在很多其它在后面將要介紹的用例中。 實(shí)際上,這一特性被廣泛地使用于標(biāo)準(zhǔn)庫包中。比如,context標(biāo)準(zhǔn)庫包使用了此特性來傳達(dá)操作取消消息。

定時(shí)通知(timer)

用通道實(shí)現(xiàn)一個(gè)一次性的定時(shí)通知器是很簡單的。 下面是一個(gè)自定義實(shí)現(xiàn):

package main

import (
	"fmt"
	"time"
)

func AfterDuration(d time.Duration) <- chan struct{} {
	c := make(chan struct{}, 1)
	go func() {
		time.Sleep(d)
		c <- struct{}{}
	}()
	return c
}

func main() {
	fmt.Println("Hi!")
	<- AfterDuration(time.Second)
	fmt.Println("Hello!")
	<- AfterDuration(time.Second)
	fmt.Println("Bye!")
}

事實(shí)上,time標(biāo)準(zhǔn)庫包中的After函數(shù)提供了和上例中AfterDuration同樣的功能。 在實(shí)踐中,我們應(yīng)該盡量使用time.After函數(shù)以使代碼看上去更干凈。

注意,操作<-time.After(aDuration)將使當(dāng)前協(xié)程進(jìn)入阻塞狀態(tài),而一個(gè)time.Sleep(aDuration)函數(shù)調(diào)用不會如此。

<-time.After(aDuration)經(jīng)常被使用在后面將要介紹的超時(shí)機(jī)制實(shí)現(xiàn)中。

將通道用做互斥鎖(mutex)

上面的某個(gè)例子提到了容量為1的緩沖通道可以用做一次性二元信號量。 事實(shí)上,容量為1的緩沖通道也可以用做多次性二元信號量(即互斥鎖)盡管這樣的互斥鎖效率不如sync標(biāo)準(zhǔn)庫包中提供的互斥鎖高效。

有兩種方式將一個(gè)容量為1的緩沖通道用做互斥鎖:

  1. 通過發(fā)送操作來加鎖,通過接收操作來解鎖;
  2. 通過接收操作來加鎖,通過發(fā)送操作來解鎖。

下面是一個(gè)通過發(fā)送操作來加鎖的例子。

package main

import "fmt"

func main() {
	mutex := make(chan struct{}, 1) // 容量必須為1

	counter := 0
	increase := func() {
		mutex <- struct{}{} // 加鎖
		counter++
		<-mutex // 解鎖
	}

	increase1000 := func(done chan<- struct{}) {
		for i := 0; i < 1000; i++ {
			increase()
		}
		done <- struct{}{}
	}

	done := make(chan struct{})
	go increase1000(done)
	go increase1000(done)
	<-done; <-done
	fmt.Println(counter) // 2000
}

下面是一個(gè)通過接收操作來加鎖的例子,其中只顯示了相對于上例而修改了的部分。

...
func main() {
	mutex := make(chan struct{}, 1)
	mutex <- struct{}{} // 此行是必需的

	counter := 0
	increase := func() {
		<-mutex // 加鎖
		counter++
		mutex <- struct{}{} // 解鎖
	}
...

將通道用做計(jì)數(shù)信號量(counting semaphore)

緩沖通道可以被用做計(jì)數(shù)信號量。 計(jì)數(shù)信號量可以被視為多主鎖。如果一個(gè)緩沖通道的容量為N,那么它可以被看作是一個(gè)在任何時(shí)刻最多可有N個(gè)主人的鎖。 上面提到的二元信號量是特殊的計(jì)數(shù)信號量,每個(gè)二元信號量在任一時(shí)刻最多只能有一個(gè)主人。

計(jì)數(shù)信號量經(jīng)常被使用于限制最大并發(fā)數(shù)。

和將通道用做互斥鎖一樣,也有兩種方式用來獲取一個(gè)用做計(jì)數(shù)信號量的通道的一份所有權(quán)。

  1. 通過發(fā)送操作來獲取所有權(quán),通過接收操作來釋放所有權(quán);
  2. 通過接收操作來獲取所有權(quán),通過發(fā)送操作來釋放所有權(quán)。

下面是一個(gè)通過接收操作來獲取所有權(quán)的例子:

package main

import (
	"log"
	"time"
	"math/rand"
)

type Seat int
type Bar chan Seat

func (bar Bar) ServeCustomer(c int) {
	log.Print("顧客#", c, "進(jìn)入酒吧")
	seat := <- bar // 需要一個(gè)位子來喝酒
	log.Print("++ customer#", c, " drinks at seat#", seat)
	log.Print("++ 顧客#", c, "在第", seat, "個(gè)座位開始飲酒")
	time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
	log.Print("-- 顧客#", c, "離開了第", seat, "個(gè)座位")
	bar <- seat // 釋放座位,離開酒吧
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10) // 此酒吧有10個(gè)座位
	// 擺放10個(gè)座位。
	for seatId := 0; seatId < cap(bar24x7); seatId++ {
		bar24x7 <- Seat(seatId) // 均不會阻塞
	}

	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		go bar24x7.ServeCustomer(customerId)
	}
	for {time.Sleep(time.Second)} // 睡眠不屬于阻塞狀態(tài)
}

在上例中,只有獲得一個(gè)座位的顧客才能開始飲酒。 所以在任一時(shí)刻同時(shí)在喝酒的顧客數(shù)不會超過座位數(shù)10。

上例main函數(shù)中的最后一行for循環(huán)是為了防止程序退出。 后面將介紹一種更好的實(shí)現(xiàn)此目的的方法。

在上例中,盡管在任一時(shí)刻同時(shí)在喝酒的顧客數(shù)不會超過座位數(shù)10,但是在某一時(shí)刻可能有多于10個(gè)顧客進(jìn)入了酒吧,因?yàn)槟承╊櫩驮谂抨?duì)等位子。 在上例中,每個(gè)顧客對應(yīng)著一個(gè)協(xié)程。雖然協(xié)程的開銷比系統(tǒng)線程小得多,但是如果協(xié)程的數(shù)量很多,則它們的總體開銷還是不能忽略不計(jì)的。 所以,最好當(dāng)有空位的時(shí)候才創(chuàng)建顧客協(xié)程。

... // 省略了和上例相同的代碼

func (bar Bar) ServeCustomerAtSeat(c int, seat Seat) {
	log.Print("++ 顧客#", c, "在第", seat, "個(gè)座位開始飲酒")
	time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
	log.Print("-- 顧客#", c, "離開了第", seat, "個(gè)座位")
	bar <- seat // 釋放座位,離開酒吧
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10)
	for seatId := 0; seatId < cap(bar24x7); seatId++ {
		bar24x7 <- Seat(seatId)
	}

	// 這個(gè)for循環(huán)和上例不一樣。
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		seat := <- bar24x7 // 需要一個(gè)空位招待顧客
		go bar24x7.ServeCustomerAtSeat(customerId, seat)
	}
	for {time.Sleep(time.Second)}
}

在上面這個(gè)修改后的例子中,在任一時(shí)刻最多只有10個(gè)顧客協(xié)程在運(yùn)行(但是在程序的生命期內(nèi),仍舊會有大量的顧客協(xié)程不斷被創(chuàng)建和銷毀)。

在下面這個(gè)更加高效的實(shí)現(xiàn)中,在程序的生命期內(nèi)最多只會有10個(gè)顧客協(xié)程被創(chuàng)建出來。

... // 省略了和上例相同的代碼

func (bar Bar) ServeCustomerAtSeat(consumers chan int) {
	for c := range consumers {
		seatId := <- bar
		log.Print("++ 顧客#", c, "在第", seatId, "個(gè)座位開始飲酒")
		time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
		log.Print("-- 顧客#", c, "離開了第", seatId, "個(gè)座位")
		bar <- seatId // 釋放座位,離開酒吧
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10)
	for seatId := 0; seatId < cap(bar24x7); seatId++ {
		bar24x7 <- Seat(seatId)
	}

	consumers := make(chan int)
	for i := 0; i < cap(bar24x7); i++ {
		go bar24x7.ServeCustomerAtSeat(consumers)
	}
	
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		consumers <- customerId
	}
}

題外話:當(dāng)然,如果我們并不關(guān)心座位號(這種情況在編程實(shí)踐中很常見),則實(shí)際上bar24x7計(jì)數(shù)信號量是完全不需要的:

... // 省略了和上例相同的代碼

func ServeCustomer(consumers chan int) {
	for c := range consumers {
		log.Print("++ 顧客#", c, "開始在酒吧飲酒")
		time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
		log.Print("-- 顧客#", c, "離開了酒吧")
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	const BarSeatCount = 10
	consumers := make(chan int)
	for i := 0; i < BarSeatCount; i++ {
		go ServeCustomer(consumers)
	}
	
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		consumers <- customerId
	}
}

通過發(fā)送操作來獲取所有權(quán)的實(shí)現(xiàn)相對簡單一些,省去了擺放座位的步驟。

package main

import (
	"log"
	"time"
	"math/rand"
)

type Customer struct{id int}
type Bar chan Customer

func (bar Bar) ServeCustomer(c Customer) {
	log.Print("++ 顧客#", c.id, "開始飲酒")
	time.Sleep(time.Second * time.Duration(3 + rand.Intn(16)))
	log.Print("-- 顧客#", c.id, "離開酒吧")
	<- bar // 離開酒吧,騰出位子
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10) // 最對同時(shí)服務(wù)10位顧客
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second * 2)
		customer := Customer{customerId}
		bar24x7 <- customer // 等待進(jìn)入酒吧
		go bar24x7.ServeCustomer(customer)
	}
	for {time.Sleep(time.Second)}
}

對話(或稱乒乓)

兩個(gè)協(xié)程可以通過一個(gè)通道進(jìn)行對話,整個(gè)過程宛如打乒乓球一樣。 下面是一個(gè)這樣的例子,它將打印出一系列斐波那契(Fibonacci)數(shù)。

package main

import "fmt"
import "time"
import "os"

type Ball uint64

func Play(playerName string, table chan Ball) {
	var lastValue Ball = 1
	for {
		ball := <- table // 接球
		fmt.Println(playerName, ball)
		ball += lastValue
		if ball < lastValue { // 溢出結(jié)束
			os.Exit(0)
		}
		lastValue = ball
		table <- ball // 回球
		time.Sleep(time.Second)
	}
}

func main() {
	table := make(chan Ball)
	go func() {
		table <- 1 // (裁判)發(fā)球
	}()
	go Play("A:", table)
	Play("B:", table)
}

使用通道傳送傳輸通道

一個(gè)通道類型的元素類型可以是另一個(gè)通道類型。 在下面這個(gè)例子中, 單向發(fā)送通道類型chan<- int是另一個(gè)通道類型chan chan<- int的元素類型。

package main

import "fmt"

var counter = func (n int) chan<- chan<- int {
	requests := make(chan chan<- int)
	go func() {
		for request := range requests {
			if request == nil {
				n++ // 遞增計(jì)數(shù)
			} else {
				request <- n // 返回當(dāng)前計(jì)數(shù)
			}
		}
	}()
	return requests // 隱式轉(zhuǎn)換到類型chan<- (chan<- int)
}(0)

func main() {
	increase1000 := func(done chan<- struct{}) {
		for i := 0; i < 1000; i++ {
			counter <- nil
		}
		done <- struct{}{}
	}

	done := make(chan struct{})
	go increase1000(done)
	go increase1000(done)
	<-done; <-done

	request := make(chan int, 1)
	counter <- request
	fmt.Println(<-request) // 2000
}

盡管對于上面這個(gè)用例來說,使用通道傳送傳輸通道這種方式并非是最有效的實(shí)現(xiàn)方式,但是這種方式肯定有最適合它的用武之地。

檢查通道的長度和容量

我們可以使用內(nèi)置函數(shù)caplen來查看一個(gè)通道的容量和當(dāng)前長度。 但是在實(shí)踐中我們很少這樣做。我們很少使用內(nèi)置函數(shù)cap的原因是一個(gè)通道的容量常常是已知的或者不重要的。 我們很少使用內(nèi)置函數(shù)len的原因是一個(gè)len調(diào)用的結(jié)果并不能總能準(zhǔn)確地反映出的一個(gè)通道的當(dāng)前長度。

但有時(shí)確實(shí)有一些場景需要調(diào)用這兩個(gè)函數(shù)。比如,有時(shí)一個(gè)協(xié)程欲將一個(gè)未關(guān)閉的并且不會再向其中發(fā)送數(shù)據(jù)的緩沖通道中的所有數(shù)據(jù)接收出來,在確保只有此一個(gè)協(xié)程從此通道接收數(shù)據(jù)的情況下,我們可以用下面的代碼來實(shí)現(xiàn)之:

for len(c) > 0 {
	value := <-c
	// 使用value ...
}

我們也可以用本文后面將要介紹的嘗試接收機(jī)制來實(shí)現(xiàn)這一需求。兩者的運(yùn)行效率差不多,但嘗試接收機(jī)制的優(yōu)點(diǎn)是多個(gè)協(xié)程可以并發(fā)地進(jìn)行讀取操作。

有時(shí)一個(gè)協(xié)程欲將一個(gè)緩沖通道寫滿而又不阻塞,在確保只有此一個(gè)協(xié)程向此通道發(fā)送數(shù)據(jù)的情況下,我們可以用下面的代碼實(shí)現(xiàn)這一目的:

for len(c) < cap(c) {
	c <- aValue
}

當(dāng)然,我們也可以使用后面將要介紹的嘗試發(fā)送機(jī)制來實(shí)現(xiàn)這一需求。

使當(dāng)前協(xié)程永久阻塞

Go中的選擇機(jī)制(select)是一個(gè)非常獨(dú)特的特性。它給并發(fā)編程帶來了很多新的模式和技巧。

我們可以用一個(gè)無分支的select流程控制代碼塊使當(dāng)前協(xié)程永久處于阻塞狀態(tài)。 這是select流程控制的最簡單的應(yīng)用。 事實(shí)上,上面很多例子中的for {time.Sleep(time.Second)}都可以換為select{}。

一般,select{}用在主協(xié)程中以防止程序退出。

一個(gè)例子:

package main

import "runtime"

func DoSomething() {
	for {
		// 做點(diǎn)什么...

		runtime.Gosched() // 防止本協(xié)程霸占CPU不放
	}
}

func main() {
	go DoSomething()
	go DoSomething()
	select{}
}

順便說一句,另外還有一些使當(dāng)前協(xié)程永久阻塞的方法,但是select{}是最簡單的方法。

嘗試發(fā)送和嘗試接收

含有一個(gè)default分支和一個(gè)case分支的select代碼塊可以被用做一個(gè)嘗試發(fā)送或者嘗試接收操作,取決于case關(guān)鍵字后跟隨的是一個(gè)發(fā)送操作還是一個(gè)接收操作。

  • 如果case關(guān)鍵字后跟隨的是一個(gè)發(fā)送操作,則此select代碼塊為一個(gè)嘗試發(fā)送操作。 如果case分支的發(fā)送操作是阻塞的,則default分支將被執(zhí)行,發(fā)送失?。环駝t發(fā)送成功,case分支得到執(zhí)行。
  • 如果case關(guān)鍵字后跟隨的是一個(gè)接收操作,則此select代碼塊為一個(gè)嘗試接收操作。 如果case分支的接收操作是阻塞的,則default分支將被執(zhí)行,接收失??;否則接收成功,case分支得到執(zhí)行。

嘗試發(fā)送和嘗試接收代碼塊永不阻塞。

標(biāo)準(zhǔn)編譯器對嘗試發(fā)送和嘗試接收代碼塊做了特別的優(yōu)化,使得它們的執(zhí)行效率比多case分支的普通select代碼塊執(zhí)行效率高得多。

下例演示了嘗試發(fā)送和嘗試接收代碼塊的工作原理。

package main

import "fmt"

func main() {
	type Book struct{id int}
	bookshelf := make(chan Book, 3)

	for i := 0; i < cap(bookshelf) * 2; i++ {
		select {
		case bookshelf <- Book{id: i}:
			fmt.Println("成功將書放在書架上", i)
		default:
			fmt.Println("書架已經(jīng)被占滿了")
		}
	}

	for i := 0; i < cap(bookshelf) * 2; i++ {
		select {
		case book := <-bookshelf:
			fmt.Println("成功從書架上取下一本書", book.id)
		default:
			fmt.Println("書架上已經(jīng)沒有書了")
		}
	}
}

輸出結(jié)果:

成功將書放在書架上 0
成功將書放在書架上 1
成功將書放在書架上 2
書架已經(jīng)被占滿了
書架已經(jīng)被占滿了
書架已經(jīng)被占滿了
成功從書架上取下一本書 0
成功從書架上取下一本書 1
成功從書架上取下一本書 2
書架上已經(jīng)沒有書了
書架上已經(jīng)沒有書了
書架上已經(jīng)沒有書了

后面的很多用例還要用到嘗試發(fā)送和嘗試接收代碼塊。

無阻塞地檢查一個(gè)通道是否已經(jīng)關(guān)閉

假設(shè)我們可以保證沒有任何協(xié)程會向一個(gè)通道發(fā)送數(shù)據(jù),則我們可以使用下面的代碼來(并發(fā)安全地)檢查此通道是否已經(jīng)關(guān)閉,此檢查不會阻塞當(dāng)前協(xié)程。

func IsClosed(c chan T) bool {
	select {
	case <-c:
		return true
	default:
	}
	return false
}

此方法常用來查看某個(gè)期待中的通知是否已經(jīng)來臨。此通知將由另一個(gè)協(xié)程通過關(guān)閉一個(gè)通道來發(fā)送。

峰值限制(peak/burst limiting)

通道用做計(jì)數(shù)信號量用例和通道嘗試(發(fā)送或者接收)操作結(jié)合起來可用實(shí)現(xiàn)峰值限制。 峰值限制的目的是防止過大的并發(fā)請求數(shù)。

下面是對將通道用做計(jì)數(shù)信號量一節(jié)中的最后一個(gè)例子的簡單修改,從而使得顧客不再等待而是離去或者尋找其它酒吧。

...
	bar24x7 := make(Bar, 10) // 此酒吧只能同時(shí)招待10個(gè)顧客
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		consumer := Consumer{customerId}
		select {
		case bar24x7 <- consumer: // 試圖進(jìn)入此酒吧
			go bar24x7.ServeConsumer(consumer)
		default:
			log.Print("顧客#", customerId, "不愿等待而離去")
		}
	}
...

另一種“采用最快回應(yīng)”的實(shí)現(xiàn)方式

在上面的“采用最快回應(yīng)”用例一節(jié)已經(jīng)提到,我們也可以使用選擇機(jī)制來實(shí)現(xiàn)“采用最快回應(yīng)”用例。 每個(gè)數(shù)據(jù)源協(xié)程只需使用一個(gè)緩沖為1的通道并向其嘗試發(fā)送回應(yīng)數(shù)據(jù)即可。示例代碼如下:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source(c chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(3)+1
	// 休眠1秒/2秒/3秒
	time.Sleep(time.Duration(rb) * time.Second)
	select {
	case c <- ra:
	default:
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	c := make(chan int32, 1) // 此通道容量必須至少為1
	for i := 0; i < 5; i++ {
		go source(c)
	}
	rnd := <-c // 只采用第一個(gè)成功發(fā)送的回應(yīng)數(shù)據(jù)
	fmt.Println(rnd)
}

注意,使用選擇機(jī)制來實(shí)現(xiàn)“采用最快回應(yīng)”的代碼中使用的通道的容量必須至少為1,以保證最快回應(yīng)總能夠發(fā)送成功。 否則,如果數(shù)據(jù)請求者因?yàn)榉N種原因未及時(shí)準(zhǔn)備好接收,則所有回應(yīng)者的嘗試發(fā)送都將失敗,從而所有回應(yīng)的數(shù)據(jù)都將被錯(cuò)過。

第三種“采用最快回應(yīng)”的實(shí)現(xiàn)方式

如果一個(gè)“采用最快回應(yīng)”用例中的數(shù)據(jù)源的數(shù)量很少,比如兩個(gè)或三個(gè),我們可以讓每個(gè)數(shù)據(jù)源使用一個(gè)單獨(dú)的緩沖通道來回應(yīng)數(shù)據(jù),然后使用一個(gè)select代碼塊來同時(shí)接收這三個(gè)通道。 示例代碼如下:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source() <-chan int32 {
	c := make(chan int32, 1) // 必須為一個(gè)緩沖通道
	go func() {
		ra, rb := rand.Int31(), rand.Intn(3)+1
		time.Sleep(time.Duration(rb) * time.Second)
		c <- ra
	}()
	return c
}

func main() {
	rand.Seed(time.Now().UnixNano())

	var rnd int32
	// 阻塞在此直到某個(gè)數(shù)據(jù)源率先回應(yīng)。
	select{
	case rnd = <-source():
	case rnd = <-source():
	case rnd = <-source():
	}
	fmt.Println(rnd)
}

注意:如果上例中使用的通道是非緩沖的,未被選中的case分支對應(yīng)的兩個(gè)source函數(shù)調(diào)用中開辟的協(xié)程將處于永久阻塞狀態(tài),從而造成內(nèi)存泄露。

本小節(jié)和上一小節(jié)中展示的兩種方法也可以用來實(shí)現(xiàn)多對單通知。

超時(shí)機(jī)制(timeout)

在一些請求/回應(yīng)用例中,一個(gè)請求可能因?yàn)榉N種原因?qū)е滦枰鲱A(yù)期的時(shí)長才能得到回應(yīng),有時(shí)甚至永遠(yuǎn)得不到回應(yīng)。 對于這樣的情形,我們可以使用一個(gè)超時(shí)方案給請求者返回一個(gè)錯(cuò)誤信息。 使用選擇機(jī)制可以很輕松地實(shí)現(xiàn)這樣的一個(gè)超時(shí)方案。

下面這個(gè)例子展示了如何實(shí)現(xiàn)一個(gè)支持超時(shí)設(shè)置的請求:

func requestWithTimeout(timeout time.Duration) (int, error) {
	c := make(chan int)
	go doRequest(c) // 可能需要超出預(yù)期的時(shí)長回應(yīng)

	select {
	case data := <-c:
		return data, nil
	case <-time.After(timeout):
		return 0, errors.New("超時(shí)了!")
	}
}

脈搏器(ticker)

我們可以使用嘗試發(fā)送操作來實(shí)現(xiàn)一個(gè)每隔一定時(shí)間發(fā)送一個(gè)信號的脈搏器。

package main

import "fmt"
import "time"

func Tick(d time.Duration) <-chan struct{} {
	c := make(chan struct{}, 1) // 容量最好為1
	go func() {
		for {
			time.Sleep(d)
			select {
			case c <- struct{}{}:
			default:
			}
		}
	}()
	return c
}

func main() {
	t := time.Now()
	for range Tick(time.Second) {
		fmt.Println(time.Since(t))
	}
}

事實(shí)上,time標(biāo)準(zhǔn)庫包中的Tick函數(shù)提供了同樣的功能,但效率更高。 我們應(yīng)該盡量使用標(biāo)準(zhǔn)庫包中的實(shí)現(xiàn)。

速率限制(rate limiting)

上面已經(jīng)展示了如何使用嘗試發(fā)送實(shí)現(xiàn)峰值限制。 同樣地,我們也可以使用使用嘗試機(jī)制來實(shí)現(xiàn)速率限制,但需要前面剛提到的定時(shí)器實(shí)現(xiàn)的配合。 速率限制常用來限制吞吐和確保在一段時(shí)間內(nèi)的資源使用不會超標(biāo)。

下面的例子借鑒了官方Go維基中的例子。 在此例中,任何一分鐘時(shí)段內(nèi)處理的請求數(shù)不會超過200。

package main

import "fmt"
import "time"

type Request interface{}
func handle(r Request) {fmt.Println(r.(int))}

const RateLimitPeriod = time.Minute
const RateLimit = 200 // 任何一分鐘內(nèi)最多處理200個(gè)請求

func handleRequests(requests <-chan Request) {
	quotas := make(chan time.Time, RateLimit)

	go func() {
		tick := time.NewTicker(RateLimitPeriod / RateLimit)
		defer tick.Stop()
		for t := range tick.C {
			select {
			case quotas <- t:
			default:
			}
		}
	}()

	for r := range requests {
		<-quotas
		go handle(r)
	}
}

func main() {
	requests := make(chan Request)
	go handleRequests(requests)
	// time.Sleep(time.Minute)
	for i := 0; ; i++ {requests <- i}
}

上例的代碼雖然可以保證任何一分鐘時(shí)段內(nèi)處理的請求數(shù)不會超過200,但是如果在開始的一分鐘內(nèi)沒有任何請求,則接下來的某個(gè)瞬時(shí)時(shí)間點(diǎn)可能會同時(shí)處理最多200個(gè)請求(試著將time.Sleep行的注釋去掉看看)。 這可能會造成卡頓情況。我們可以將速率限制和峰值限制一并使用來避免出現(xiàn)這樣的情況。

開關(guān)

通道一文提到了向一個(gè)nil通道發(fā)送數(shù)據(jù)或者從中接收數(shù)據(jù)都屬于阻塞操作。 利用這一事實(shí),我們可以將一個(gè)select流程控制中的case操作中涉及的通道設(shè)置為不同的值,以使此select流程控制選擇執(zhí)行不同的分支。

下面是另一個(gè)乒乓模擬游戲的實(shí)現(xiàn)。此實(shí)現(xiàn)使用了選擇機(jī)制。在此例子中,兩個(gè)case操作中的通道有且只有一個(gè)為nil,所以只能是不為nil的通道對應(yīng)的分支被選中。 每個(gè)循環(huán)步將對調(diào)這兩個(gè)case操作中的通道,從而改變兩個(gè)分支的可被選中狀態(tài)。

package main

import "fmt"
import "time"
import "os"

type Ball uint8
func Play(playerName string, table chan Ball, serve bool) {
	var receive, send chan Ball
	if serve {
		receive, send = nil, table
	} else {
		receive, send = table, nil
	}
	var lastValue Ball = 1
	for {
		select {
		case send <- lastValue:
		case value := <- receive:
			fmt.Println(playerName, value)
			value += lastValue
			if value < lastValue { // 溢出了
				os.Exit(0)
			}
			lastValue = value
		}
		receive, send = send, receive // 開關(guān)切換
		time.Sleep(time.Second)
	}
}

func main() {
	table := make(chan Ball)
	go Play("A:", table, false)
	Play("B:", table, true)
}

下面是另一個(gè)也展示了開關(guān)效果的但簡單得多的(非并發(fā)的)小例子。 此程序?qū)⒉粩啻蛴〕?code>1212...。 它在實(shí)踐中沒有太多實(shí)用價(jià)值,這里只是為了學(xué)習(xí)的目的才展示之。

package main

import "fmt"
import "time"

func main() {
	for c := make(chan struct{}, 1); true; {
		select {
		case c <- struct{}{}:
			fmt.Print("1")
		case <-c:
			fmt.Print("2")
		}
		time.Sleep(time.Second)
	}
}

控制代碼被執(zhí)行的幾率

我們可以通過在一個(gè)select流程控制中使用重復(fù)的case操作來增加對應(yīng)分支中的代碼的執(zhí)行幾率。

一個(gè)例子:

package main

import "fmt"

func main() {
	foo, bar := make(chan struct{}), make(chan struct{})
	close(foo); close(bar) // 僅為演示目的
	x, y := 0.0, 0.0
	f := func(){x++}
	g := func(){y++}
	for i := 0; i < 100000; i++ {
		select {
		case <-foo: f()
		case <-foo: f()
		case <-bar: g()
		}
	}
	fmt.Println(x/y) // 大致為2
}

在上面這個(gè)例子中,函數(shù)f的調(diào)用執(zhí)行幾率大致為函數(shù)g的兩倍。

從動態(tài)數(shù)量的分支中選擇

每個(gè)select控制流程中的分支數(shù)量在運(yùn)行中是固定的,但是我們可以使用reflect標(biāo)準(zhǔn)庫包中提供的功能在運(yùn)行時(shí)刻來構(gòu)建動態(tài)分支數(shù)量的select控制流程。 但是請注意:一個(gè)select控制流程中的分支越多,此select控制流程的執(zhí)行效率就越低(這是我們常常只使用不多于三個(gè)分支的select控制流程的原因)。

reflect標(biāo)準(zhǔn)庫包中也提供了模擬嘗試發(fā)送和嘗試接收代碼塊的TrySendTryRecv函數(shù)。

數(shù)據(jù)流操縱

本節(jié)將介紹一些使用通道進(jìn)行數(shù)據(jù)流處理的用例。

一般來說,一個(gè)數(shù)據(jù)流處理程序由多個(gè)模塊組成。不同的模塊執(zhí)行分配給它們的不同的任務(wù)。 每個(gè)模塊由一個(gè)或者數(shù)個(gè)并行工作的協(xié)程組成。實(shí)踐中常見的工作任務(wù)包括:

  • 數(shù)據(jù)生成/搜集/加載;
  • 數(shù)據(jù)服務(wù)/存盤;
  • 數(shù)據(jù)計(jì)算/處理;
  • 數(shù)據(jù)驗(yàn)證/過濾;
  • 數(shù)據(jù)聚合/分流;
  • 數(shù)據(jù)組合/拆分;
  • 數(shù)據(jù)復(fù)制/增殖;
  • 等等。

一個(gè)模塊中的工作協(xié)程從一些其它模塊接收數(shù)據(jù)做為輸入,并向另一些模塊發(fā)送輸出數(shù)據(jù)。 換句話數(shù),一個(gè)模塊可能同時(shí)兼任數(shù)據(jù)消費(fèi)者和數(shù)據(jù)產(chǎn)生者的角色。

多個(gè)模塊一起組成了一個(gè)數(shù)據(jù)流處理系統(tǒng)。

下面將展示一些模塊工作協(xié)程的實(shí)現(xiàn)。這些實(shí)現(xiàn)僅僅是為了解釋目的,所以它們都很簡單,并且它們可能并不高效。

數(shù)據(jù)生成/搜集/加載

一個(gè)數(shù)據(jù)產(chǎn)生者可能通過以下途徑生成數(shù)據(jù):

  • 加載一個(gè)文件、或者讀取一個(gè)數(shù)據(jù)庫、或者用爬蟲抓取網(wǎng)頁數(shù)據(jù);
  • 從一個(gè)軟件或者硬件系統(tǒng)搜集各種數(shù)據(jù);
  • 產(chǎn)生一系列隨機(jī)數(shù);
  • 等等。

這里,我們使用一個(gè)隨機(jī)數(shù)產(chǎn)生器做為一個(gè)數(shù)據(jù)產(chǎn)生者的例子。 此數(shù)據(jù)產(chǎn)生者函數(shù)沒有輸入,只有輸出。

import (
	"crypto/rand"
	"encoding/binary"
)

func RandomGenerator() <-chan uint64 {
	c := make(chan uint64)
	go func() {
		rnds := make([]byte, 8)
		for {
			_, err := rand.Read(rnds)
			if err != nil {
				close(c)
				break
			}
			c <- binary.BigEndian.Uint64(rnds)
		}
	}()
	return c
}

事實(shí)上,此隨機(jī)數(shù)產(chǎn)生器是一個(gè)多返回值的future/promise。

一個(gè)數(shù)據(jù)產(chǎn)生者可以在任何時(shí)刻關(guān)閉返回的通道以結(jié)束數(shù)據(jù)生成。

數(shù)據(jù)聚合

一個(gè)數(shù)據(jù)聚合模塊的工作協(xié)程將多個(gè)數(shù)據(jù)流合為一個(gè)數(shù)據(jù)流。 假設(shè)數(shù)據(jù)類型為int64,下面這個(gè)函數(shù)將任意數(shù)量的數(shù)據(jù)流合為一個(gè)。

func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
	out := make(chan uint64)
	for _, in := range inputs {
		go func(in <-chan uint64) {
			for {
				out <- <-in // <=> out <- (<-in)
			}
		}(in)
	}
	return out
}

一個(gè)更完美的實(shí)現(xiàn)需要考慮一個(gè)輸入數(shù)據(jù)流是否已經(jīng)關(guān)閉。(下面要介紹的其它工作協(xié)程同理。)

import "sync"

func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
	output := make(chan uint64)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(int <-chan uint64) {
			defer wg.Done()
			// 如果通道in被關(guān)閉,此循環(huán)將最終結(jié)束。
			for x := range in {
				output <- x
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(output)
	}()
	return output
}

如果被聚合的數(shù)據(jù)流的數(shù)量很小,我們也可以使用一個(gè)select控制流程代碼塊來聚合這些數(shù)據(jù)流。

// 假設(shè)數(shù)據(jù)流的數(shù)量為2。
...
	output := make(chan uint64)
	go func() {
		inA, inB := inputs[0], inputs[1]
		for {
			select {
			case v := <- inA: output <- v
			case v := <- inB: output <- v
			}
		}
	}
...

數(shù)據(jù)分流

數(shù)據(jù)分流是數(shù)據(jù)聚合的逆過程。數(shù)據(jù)分流的實(shí)現(xiàn)很簡單,但在實(shí)踐中用的并不多。

func Divisor(input <-chan uint64, outputs ...chan<- uint64) {
	for _, out := range outputs {
		go func(o chan<- uint64) {
			for {
				o <- <-input // <=> o <- (<-input)
			}
		}(out)
	}
}

數(shù)據(jù)合成

數(shù)據(jù)合成將多個(gè)數(shù)據(jù)流中讀取的數(shù)據(jù)合成一個(gè)。

下面是一個(gè)數(shù)據(jù)合成工作函數(shù)的實(shí)現(xiàn)中,從兩個(gè)不同數(shù)據(jù)流讀取的兩個(gè)uint64值組成了一個(gè)新的uint64值。 當(dāng)然,在實(shí)踐中,數(shù)據(jù)的組合比這復(fù)雜得多。

func Composor(inA, inB <-chan uint64) <-chan uint64 {
	output := make(chan uint64)
	go func() {
		for {
			a1, b, a2 := <-inA, <-inB, <-inA
			output <- a1 ^ b & a2
		}
	}()
	return output
}

數(shù)據(jù)分解

數(shù)據(jù)分解是數(shù)據(jù)合成的逆過程。一個(gè)數(shù)據(jù)分解者從一個(gè)通道讀取一份數(shù)據(jù),并將此數(shù)據(jù)分解為多份數(shù)據(jù)。 這里就不舉例了。

數(shù)據(jù)復(fù)制/增殖

數(shù)據(jù)復(fù)制(增殖)可以看作是特殊的數(shù)據(jù)分解。一份輸入數(shù)據(jù)將被復(fù)制多份并輸出給多個(gè)數(shù)據(jù)流。

一個(gè)例子:

func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
	outA, outB := make(chan uint64), make(chan uint64)
	go func() {
		for x := range in {
			outA <- x
			outB <- x
		}
	}()
	return outA, outB
}

數(shù)據(jù)計(jì)算/分析

數(shù)據(jù)計(jì)算和數(shù)據(jù)分析模塊的功能因具體程序不同而有很大的差異。 一般來說,數(shù)據(jù)分析者接收一份數(shù)據(jù)并對之加工處理后轉(zhuǎn)換為另一份數(shù)據(jù)。

下面的簡單示例中,每個(gè)輸入的uint64值將被進(jìn)行位反轉(zhuǎn)后輸出。

func Calculator(in <-chan uint64, out chan uint64) (<-chan uint64) {
	if out == nil {
		out = make(chan uint64)
	}
	go func() {
		for x := range in {
			out <- ^x
		}
	}()
	return out
}

數(shù)據(jù)驗(yàn)證/過濾

一個(gè)數(shù)據(jù)驗(yàn)證或過濾者的任務(wù)是檢查輸入數(shù)據(jù)的合理性并拋棄不合理的數(shù)據(jù)。 比如,下面的工作者協(xié)程將拋棄所有的非素?cái)?shù)。

import "math/big"

func Filter0(input <-chan uint64, output chan uint64) <-chan uint64 {
	if output == nil {
		output = make(chan uint64)
	}
	go func() {
		bigInt := big.NewInt(0)
		for x := range input {
			bigInt.SetUint64(x)
			if bigInt.ProbablyPrime(1) {
				output <- x
			}
		}
	}()
	return output
}

func Filter(input <-chan uint64) <-chan uint64 {
	return Filter0(input, nil)
}

請注意這兩個(gè)函數(shù)版本分別被本文下面最后展示的兩個(gè)例子所使用。

數(shù)據(jù)服務(wù)/存盤

一般,一個(gè)數(shù)據(jù)服務(wù)或者存盤模塊為一個(gè)數(shù)據(jù)流系統(tǒng)中的最后一個(gè)模塊。 這里的實(shí)現(xiàn)值是簡單地將數(shù)據(jù)輸出到終端。

import "fmt"

func Printer(input <-chan uint64) {
	for x := range input {
		fmt.Println(x)
	}
}

組裝數(shù)據(jù)流系統(tǒng)

現(xiàn)在,讓我們使用上面的模塊工作者函數(shù)實(shí)現(xiàn)來組裝一些數(shù)據(jù)流系統(tǒng)。 組裝數(shù)據(jù)流僅僅是創(chuàng)建一些工作者協(xié)程函數(shù)調(diào)用,并為這些調(diào)用指定輸入數(shù)據(jù)流和輸出數(shù)據(jù)流。

數(shù)據(jù)流系統(tǒng)例子1(一個(gè)流線型系統(tǒng)):

package main

... // 上面的模塊工作者函數(shù)實(shí)現(xiàn)

func main() {
	Printer(
		Filter(
			Calculator(
				RandomGenerator(), nil,
			),
		),
	)
}

上面這個(gè)流線型系統(tǒng)描繪在下圖中:


數(shù)據(jù)流系統(tǒng)例子2(一個(gè)單向無環(huán)圖系統(tǒng)):

package main

... // 上面的模塊工作者函數(shù)實(shí)現(xiàn)

func main() {
	filterA := Filter(RandomGenerator())
	filterB := Filter(RandomGenerator())
	filterC := Filter(RandomGenerator())
	filter := Aggregator(filterA, filterB, filterC)
	calculatorA := Calculator(filter, nil)
	calculatorB := Calculator(filter, nil)
	calculator := Aggregator(calculatorA, calculatorB)
	Printer(calculator)
}

上面這個(gè)單向無環(huán)圖系統(tǒng)描繪在下圖中:


更復(fù)雜的數(shù)據(jù)流系統(tǒng)可以表示為任何拓?fù)浣Y(jié)構(gòu)的圖。比如一個(gè)復(fù)雜的數(shù)據(jù)流系統(tǒng)可能有多個(gè)輸出模塊。 但是有環(huán)拓?fù)浣Y(jié)構(gòu)的數(shù)據(jù)流系統(tǒng)在實(shí)踐中很少用。

從上面兩個(gè)例子可以看出,使用通道來構(gòu)建數(shù)據(jù)流系統(tǒng)是很簡單和直觀的。

從上例可以看出,通過使用數(shù)據(jù)聚合模塊,我們可以很輕松地實(shí)現(xiàn)各個(gè)模塊的工作協(xié)程數(shù)量的扇入(fan-in)和扇出(fan-out)。

事實(shí)上,我們也可以使用一個(gè)簡單的通道來代替數(shù)據(jù)聚合模塊的角色。比如,下面的代碼使用兩個(gè)通道代替了上例中的兩個(gè)數(shù)據(jù)聚合器。

package main

... // 上面的模塊工作者函數(shù)實(shí)現(xiàn)

func main() {
	c1 := make(chan uint64, 100)
	Filter0(RandomGenerator(), c1) // filterA
	Filter0(RandomGenerator(), c1) // filterB
	Filter0(RandomGenerator(), c1) // filterC
	c2 := make(chan uint64, 100)
	Calculator(c1, c2) // calculatorA
	Calculator(c1, c2) // calculatorB
	Printer(c2)
}

修改后的數(shù)據(jù)流的拓?fù)浣Y(jié)構(gòu)如下圖所示:


上面的代碼示例并沒有太多考慮如何關(guān)閉一個(gè)數(shù)據(jù)流。請閱讀此篇文章來了解如何優(yōu)雅地關(guān)閉通道。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號