在閱讀本文之前,請先閱讀通道一文。 那篇文章詳細(xì)地解釋了通道類型和通道值,以及各種通道操作的規(guī)則細(xì)節(jié)。 一個(gè)Go新手程序員可能需要反復(fù)多次閱讀那篇文章和當(dāng)前這篇文章來精通Go通道編程。
本文余下的內(nèi)容將展示很多通道用例。 希望這篇文章能夠說服你接收下面的觀點(diǎn):
請注意,本文的目的是展示盡量多的通道用例。但是,我們應(yīng)該知道通道并不是Go支持的唯一同步技術(shù),并且通道并不是在任何情況下都是最佳的同步技術(shù)。 請閱讀原子操作和其它并發(fā)同步技術(shù)來了解更多的Go支持的同步技術(shù)。
很多其它流行語言支持future/promise來實(shí)現(xiàn)異步(并發(fā))編程。 Future/promise常常用在請求/回應(yīng)場合。
在下面這個(gè)例子中,sumSquares
函數(shù)調(diào)用的兩個(gè)實(shí)參請求并發(fā)進(jìn)行。 每個(gè)通道讀取操作將阻塞到請求返回結(jié)果為止。 兩個(gè)實(shí)參總共需要大約3秒鐘(而不是6秒鐘)準(zhǔn)備完畢(以較慢的一個(gè)為準(zhǔn))。
package main
import (
"time"
"math/rand"
"fmt"
)
func longTimeRequest() <-chan int32 {
r := make(chan int32)
go func() {
time.Sleep(time.Second * 3) // 模擬一個(gè)工作負(fù)載
r <- rand.Int31n(100)
}()
return r
}
func sumSquares(a, b int32) int32 {
return a*a + b*b
}
func main() {
rand.Seed(time.Now().UnixNano())
a, b := longTimeRequest(), longTimeRequest()
fmt.Println(sumSquares(<-a, <-b))
}
和上例一樣,在下面這個(gè)例子中,sumSquares
函數(shù)調(diào)用的兩個(gè)實(shí)參的請求也是并發(fā)進(jìn)行的。 和上例不同的是longTimeRequest
函數(shù)接收一個(gè)單向發(fā)送通道類型參數(shù)而不是返回一個(gè)單向接收通道結(jié)果。
package main
import (
"time"
"math/rand"
"fmt"
)
func longTimeRequest(r chan<- int32) {
time.Sleep(time.Second * 3) // 模擬一個(gè)工作負(fù)載
r <- rand.Int31n(100)
}
func sumSquares(a, b int32) int32 {
return a*a + b*b
}
func main() {
rand.Seed(time.Now().UnixNano())
ra, rb := make(chan int32), make(chan int32)
go longTimeRequest(ra)
go longTimeRequest(rb)
fmt.Println(sumSquares(<-ra, <-rb))
}
對于上面這個(gè)特定的例子,我們可以只使用一個(gè)通道來接收回應(yīng)結(jié)果,因?yàn)閮蓚€(gè)參數(shù)的作用是對等的。
...
results := make(chan int32, 2) // 緩沖與否不重要
go longTimeRequest(results)
go longTimeRequest(results)
fmt.Println(sumSquares(<-results, <-results))
}
這可以看作是后面將要提到的數(shù)據(jù)聚合的一個(gè)應(yīng)用。
本用例可以看作是上例中只使用一個(gè)通道變種的增強(qiáng)。
有時(shí)候,一份數(shù)據(jù)可能同時(shí)從多個(gè)數(shù)據(jù)源獲取。這些數(shù)據(jù)源將返回相同的數(shù)據(jù)。 因?yàn)楦鞣N因素,這些數(shù)據(jù)源的回應(yīng)速度參差不一,甚至某個(gè)特定數(shù)據(jù)源的多次回應(yīng)速度之間也可能相差很大。 同時(shí)從多個(gè)數(shù)據(jù)源獲取一份相同的數(shù)據(jù)可以有效保障低延遲。我們只需采用最快的回應(yīng)并舍棄其它較慢回應(yīng)。
注意:如果有N個(gè)數(shù)據(jù)源,為了防止被舍棄的回應(yīng)對應(yīng)的協(xié)程永久阻塞,則傳輸數(shù)據(jù)用的通道必須為一個(gè)容量至少為N-1的緩沖通道。
package main
import (
"fmt"
"time"
"math/rand"
)
func source(c chan<- int32) {
ra, rb := rand.Int31(), rand.Intn(3) + 1
// 睡眠1秒/2秒/3秒
time.Sleep(time.Duration(rb) * time.Second)
c <- ra
}
func main() {
rand.Seed(time.Now().UnixNano())
startTime := time.Now()
c := make(chan int32, 5) // 必須用一個(gè)緩沖通道
for i := 0; i < cap(c); i++ {
go source(c)
}
rnd := <- c // 只有第一個(gè)回應(yīng)被使用了
fmt.Println(time.Since(startTime))
fmt.Println(rnd)
}
“采用最快回應(yīng)”用例還有一些其它實(shí)現(xiàn)方式,本文后面將會談及。
做為函數(shù)參數(shù)和返回結(jié)果使用的通道可以是緩沖的,從而使得請求協(xié)程不需阻塞到它所發(fā)送的數(shù)據(jù)被接收為止。
有時(shí),一個(gè)請求可能并不保證返回一份有效的數(shù)據(jù)。對于這種情形,我們可以使用一個(gè)形如struct{v T; err error}
的結(jié)構(gòu)體類型或者一個(gè)空接口類型做為通道的元素類型以用來區(qū)分回應(yīng)的值是否有效。
有時(shí),一個(gè)請求可能需要比預(yù)期更長的用時(shí)才能回應(yīng),甚至永遠(yuǎn)都得不到回應(yīng)。 我們可以使用本文后面將要介紹的超時(shí)機(jī)制來應(yīng)對這樣的情況。
有時(shí),回應(yīng)方可能會不斷地返回一系列值,這也同時(shí)屬于后面將要介紹的數(shù)據(jù)流的一個(gè)用例。
通知可以被看作是特殊的請求/回應(yīng)用例。在一個(gè)通知用例中,我們并不關(guān)心回應(yīng)的值,我們只關(guān)心回應(yīng)是否已發(fā)生。 所以我們常常使用空結(jié)構(gòu)體類型struct{}
來做為通道的元素類型,因?yàn)榭战Y(jié)構(gòu)體類型的尺寸為零,能夠節(jié)省一些內(nèi)存(雖然常常很少量)。
我們已知道,如果一個(gè)通道中無值可接收,則此通道上的下一個(gè)接收操作將阻塞到另一個(gè)協(xié)程發(fā)送一個(gè)值到此通道為止。 所以一個(gè)協(xié)程可以向此通道發(fā)送一個(gè)值來通知另一個(gè)等待著從此通道接收數(shù)據(jù)的協(xié)程。
在下面這個(gè)例子中,通道done
被用來做為一個(gè)信號通道來實(shí)現(xiàn)單對單通知。
package main
import (
"crypto/rand"
"fmt"
"os"
"sort"
)
func main() {
values := make([]byte, 32 * 1024 * 1024)
if _, err := rand.Read(values); err != nil {
fmt.Println(err)
os.Exit(1)
}
done := make(chan struct{}) // 也可以是緩沖的
// 排序協(xié)程
go func() {
sort.Slice(values, func(i, j int) bool {
return values[i] < values[j]
})
done <- struct{}{} // 通知排序已完成
}()
// 并發(fā)地做一些其它事情...
<- done // 等待通知
fmt.Println(values[0], values[len(values)-1])
}
如果一個(gè)通道的數(shù)據(jù)緩沖隊(duì)列已滿(非緩沖的通道的數(shù)據(jù)緩沖隊(duì)列總是滿的)但它的發(fā)送協(xié)程隊(duì)列為空,則向此通道發(fā)送一個(gè)值將阻塞,直到另外一個(gè)協(xié)程從此通道接收一個(gè)值為止。 所以我們可以通過從一個(gè)通道接收數(shù)據(jù)來實(shí)現(xiàn)單對單通知。一般我們使用非緩沖通道來實(shí)現(xiàn)這樣的通知。
這種通知方式不如上例中介紹的方式使用得廣泛。
package main
import (
"fmt"
"time"
)
func main() {
done := make(chan struct{})
// 此信號通道也可以緩沖為1。如果這樣,則在下面
// 這個(gè)協(xié)程創(chuàng)建之前,我們必須向其中寫入一個(gè)值。
go func() {
fmt.Print("Hello")
// 模擬一個(gè)工作負(fù)載。
time.Sleep(time.Second * 2)
// 使用一個(gè)接收操作來通知主協(xié)程。
<- done
}()
done <- struct{}{} // 阻塞在此,等待通知
fmt.Println(" world!")
}
另一個(gè)事實(shí)是,上面的兩種單對單通知方式其實(shí)并沒有本質(zhì)的區(qū)別。 它們都可以被概括為較快者等待較慢者發(fā)出通知。
略微擴(kuò)展一下上面兩個(gè)用例,我們可以很輕松地實(shí)現(xiàn)多對單和單對多通知。
package main
import "log"
import "time"
type T = struct{}
func worker(id int, ready <-chan T, done chan<- T) {
<-ready // 阻塞在此,等待通知
log.Print("Worker#", id, "開始工作")
// 模擬一個(gè)工作負(fù)載。
time.Sleep(time.Second * time.Duration(id+1))
log.Print("Worker#", id, "工作完成")
done <- T{} // 通知主協(xié)程(N-to-1)
}
func main() {
log.SetFlags(0)
ready, done := make(chan T), make(chan T)
go worker(0, ready, done)
go worker(1, ready, done)
go worker(2, ready, done)
// 模擬一個(gè)初始化過程
time.Sleep(time.Second * 3 / 2)
// 單對多通知
ready <- T{}; ready <- T{}; ready <- T{}
// 等待被多對單通知
<-done; <-done; <-done
}
事實(shí)上,上例中展示的多對單和單對多通知實(shí)現(xiàn)方式在實(shí)踐中用的并不多。 在實(shí)踐中,我們多使用sync.WaitGroup
來實(shí)現(xiàn)多對單通知,使用關(guān)閉一個(gè)通道的方式來實(shí)現(xiàn)單對多通知(詳見下一個(gè)用例)。
上一個(gè)用例中的單對多通知實(shí)現(xiàn)在實(shí)踐中很少用,因?yàn)橥ㄟ^關(guān)閉一個(gè)通道的方式在來實(shí)現(xiàn)單對多通知的方式更簡單。 我們已經(jīng)知道,從一個(gè)已關(guān)閉的通道可以接收到無窮個(gè)值,我們可以利用這一特性來實(shí)現(xiàn)群發(fā)通知。
我們可以把上一個(gè)例子中的三個(gè)數(shù)據(jù)發(fā)送操作ready <- struct{}{}
替換為一個(gè)通道關(guān)閉操作close(ready)
來達(dá)到同樣的單對多通知效果。
...
close(ready) // 群發(fā)通知
...
當(dāng)然,我們也可以通過關(guān)閉一個(gè)通道來實(shí)現(xiàn)單對單通知。事實(shí)上,關(guān)閉通道是實(shí)踐中用得最多通知實(shí)現(xiàn)方式。
從一個(gè)已關(guān)閉的通道可以接收到無窮個(gè)值這一特性也將被用在很多其它在后面將要介紹的用例中。 實(shí)際上,這一特性被廣泛地使用于標(biāo)準(zhǔn)庫包中。比如,context
標(biāo)準(zhǔn)庫包使用了此特性來傳達(dá)操作取消消息。
用通道實(shí)現(xiàn)一個(gè)一次性的定時(shí)通知器是很簡單的。 下面是一個(gè)自定義實(shí)現(xiàn):
package main
import (
"fmt"
"time"
)
func AfterDuration(d time.Duration) <- chan struct{} {
c := make(chan struct{}, 1)
go func() {
time.Sleep(d)
c <- struct{}{}
}()
return c
}
func main() {
fmt.Println("Hi!")
<- AfterDuration(time.Second)
fmt.Println("Hello!")
<- AfterDuration(time.Second)
fmt.Println("Bye!")
}
事實(shí)上,time
標(biāo)準(zhǔn)庫包中的After
函數(shù)提供了和上例中AfterDuration
同樣的功能。 在實(shí)踐中,我們應(yīng)該盡量使用time.After
函數(shù)以使代碼看上去更干凈。
注意,操作<-time.After(aDuration)
將使當(dāng)前協(xié)程進(jìn)入阻塞狀態(tài),而一個(gè)time.Sleep(aDuration)
函數(shù)調(diào)用不會如此。
<-time.After(aDuration)
經(jīng)常被使用在后面將要介紹的超時(shí)機(jī)制實(shí)現(xiàn)中。
上面的某個(gè)例子提到了容量為1的緩沖通道可以用做一次性二元信號量。 事實(shí)上,容量為1的緩沖通道也可以用做多次性二元信號量(即互斥鎖)盡管這樣的互斥鎖效率不如sync
標(biāo)準(zhǔn)庫包中提供的互斥鎖高效。
有兩種方式將一個(gè)容量為1的緩沖通道用做互斥鎖:
下面是一個(gè)通過發(fā)送操作來加鎖的例子。
package main
import "fmt"
func main() {
mutex := make(chan struct{}, 1) // 容量必須為1
counter := 0
increase := func() {
mutex <- struct{}{} // 加鎖
counter++
<-mutex // 解鎖
}
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}
下面是一個(gè)通過接收操作來加鎖的例子,其中只顯示了相對于上例而修改了的部分。
...
func main() {
mutex := make(chan struct{}, 1)
mutex <- struct{}{} // 此行是必需的
counter := 0
increase := func() {
<-mutex // 加鎖
counter++
mutex <- struct{}{} // 解鎖
}
...
緩沖通道可以被用做計(jì)數(shù)信號量。 計(jì)數(shù)信號量可以被視為多主鎖。如果一個(gè)緩沖通道的容量為N
,那么它可以被看作是一個(gè)在任何時(shí)刻最多可有N
個(gè)主人的鎖。 上面提到的二元信號量是特殊的計(jì)數(shù)信號量,每個(gè)二元信號量在任一時(shí)刻最多只能有一個(gè)主人。
計(jì)數(shù)信號量經(jīng)常被使用于限制最大并發(fā)數(shù)。
和將通道用做互斥鎖一樣,也有兩種方式用來獲取一個(gè)用做計(jì)數(shù)信號量的通道的一份所有權(quán)。
下面是一個(gè)通過接收操作來獲取所有權(quán)的例子:
package main
import (
"log"
"time"
"math/rand"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeCustomer(c int) {
log.Print("顧客#", c, "進(jìn)入酒吧")
seat := <- bar // 需要一個(gè)位子來喝酒
log.Print("++ customer#", c, " drinks at seat#", seat)
log.Print("++ 顧客#", c, "在第", seat, "個(gè)座位開始飲酒")
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- 顧客#", c, "離開了第", seat, "個(gè)座位")
bar <- seat // 釋放座位,離開酒吧
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10) // 此酒吧有10個(gè)座位
// 擺放10個(gè)座位。
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId) // 均不會阻塞
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeCustomer(customerId)
}
for {time.Sleep(time.Second)} // 睡眠不屬于阻塞狀態(tài)
}
在上例中,只有獲得一個(gè)座位的顧客才能開始飲酒。 所以在任一時(shí)刻同時(shí)在喝酒的顧客數(shù)不會超過座位數(shù)10。
上例main
函數(shù)中的最后一行for
循環(huán)是為了防止程序退出。 后面將介紹一種更好的實(shí)現(xiàn)此目的的方法。
在上例中,盡管在任一時(shí)刻同時(shí)在喝酒的顧客數(shù)不會超過座位數(shù)10,但是在某一時(shí)刻可能有多于10個(gè)顧客進(jìn)入了酒吧,因?yàn)槟承╊櫩驮谂抨?duì)等位子。 在上例中,每個(gè)顧客對應(yīng)著一個(gè)協(xié)程。雖然協(xié)程的開銷比系統(tǒng)線程小得多,但是如果協(xié)程的數(shù)量很多,則它們的總體開銷還是不能忽略不計(jì)的。 所以,最好當(dāng)有空位的時(shí)候才創(chuàng)建顧客協(xié)程。
... // 省略了和上例相同的代碼
func (bar Bar) ServeCustomerAtSeat(c int, seat Seat) {
log.Print("++ 顧客#", c, "在第", seat, "個(gè)座位開始飲酒")
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- 顧客#", c, "離開了第", seat, "個(gè)座位")
bar <- seat // 釋放座位,離開酒吧
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10)
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId)
}
// 這個(gè)for循環(huán)和上例不一樣。
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
seat := <- bar24x7 // 需要一個(gè)空位招待顧客
go bar24x7.ServeCustomerAtSeat(customerId, seat)
}
for {time.Sleep(time.Second)}
}
在上面這個(gè)修改后的例子中,在任一時(shí)刻最多只有10個(gè)顧客協(xié)程在運(yùn)行(但是在程序的生命期內(nèi),仍舊會有大量的顧客協(xié)程不斷被創(chuàng)建和銷毀)。
在下面這個(gè)更加高效的實(shí)現(xiàn)中,在程序的生命期內(nèi)最多只會有10個(gè)顧客協(xié)程被創(chuàng)建出來。
... // 省略了和上例相同的代碼
func (bar Bar) ServeCustomerAtSeat(consumers chan int) {
for c := range consumers {
seatId := <- bar
log.Print("++ 顧客#", c, "在第", seatId, "個(gè)座位開始飲酒")
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- 顧客#", c, "離開了第", seatId, "個(gè)座位")
bar <- seatId // 釋放座位,離開酒吧
}
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10)
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId)
}
consumers := make(chan int)
for i := 0; i < cap(bar24x7); i++ {
go bar24x7.ServeCustomerAtSeat(consumers)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
consumers <- customerId
}
}
題外話:當(dāng)然,如果我們并不關(guān)心座位號(這種情況在編程實(shí)踐中很常見),則實(shí)際上bar24x7
計(jì)數(shù)信號量是完全不需要的:
... // 省略了和上例相同的代碼
func ServeCustomer(consumers chan int) {
for c := range consumers {
log.Print("++ 顧客#", c, "開始在酒吧飲酒")
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- 顧客#", c, "離開了酒吧")
}
}
func main() {
rand.Seed(time.Now().UnixNano())
const BarSeatCount = 10
consumers := make(chan int)
for i := 0; i < BarSeatCount; i++ {
go ServeCustomer(consumers)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
consumers <- customerId
}
}
通過發(fā)送操作來獲取所有權(quán)的實(shí)現(xiàn)相對簡單一些,省去了擺放座位的步驟。
package main
import (
"log"
"time"
"math/rand"
)
type Customer struct{id int}
type Bar chan Customer
func (bar Bar) ServeCustomer(c Customer) {
log.Print("++ 顧客#", c.id, "開始飲酒")
time.Sleep(time.Second * time.Duration(3 + rand.Intn(16)))
log.Print("-- 顧客#", c.id, "離開酒吧")
<- bar // 離開酒吧,騰出位子
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10) // 最對同時(shí)服務(wù)10位顧客
for customerId := 0; ; customerId++ {
time.Sleep(time.Second * 2)
customer := Customer{customerId}
bar24x7 <- customer // 等待進(jìn)入酒吧
go bar24x7.ServeCustomer(customer)
}
for {time.Sleep(time.Second)}
}
兩個(gè)協(xié)程可以通過一個(gè)通道進(jìn)行對話,整個(gè)過程宛如打乒乓球一樣。 下面是一個(gè)這樣的例子,它將打印出一系列斐波那契(Fibonacci)數(shù)。
package main
import "fmt"
import "time"
import "os"
type Ball uint64
func Play(playerName string, table chan Ball) {
var lastValue Ball = 1
for {
ball := <- table // 接球
fmt.Println(playerName, ball)
ball += lastValue
if ball < lastValue { // 溢出結(jié)束
os.Exit(0)
}
lastValue = ball
table <- ball // 回球
time.Sleep(time.Second)
}
}
func main() {
table := make(chan Ball)
go func() {
table <- 1 // (裁判)發(fā)球
}()
go Play("A:", table)
Play("B:", table)
}
一個(gè)通道類型的元素類型可以是另一個(gè)通道類型。 在下面這個(gè)例子中, 單向發(fā)送通道類型chan<- int
是另一個(gè)通道類型chan chan<- int
的元素類型。
package main
import "fmt"
var counter = func (n int) chan<- chan<- int {
requests := make(chan chan<- int)
go func() {
for request := range requests {
if request == nil {
n++ // 遞增計(jì)數(shù)
} else {
request <- n // 返回當(dāng)前計(jì)數(shù)
}
}
}()
return requests // 隱式轉(zhuǎn)換到類型chan<- (chan<- int)
}(0)
func main() {
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
counter <- nil
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
go increase1000(done)
<-done; <-done
request := make(chan int, 1)
counter <- request
fmt.Println(<-request) // 2000
}
盡管對于上面這個(gè)用例來說,使用通道傳送傳輸通道這種方式并非是最有效的實(shí)現(xiàn)方式,但是這種方式肯定有最適合它的用武之地。
我們可以使用內(nèi)置函數(shù)cap
和len
來查看一個(gè)通道的容量和當(dāng)前長度。 但是在實(shí)踐中我們很少這樣做。我們很少使用內(nèi)置函數(shù)cap
的原因是一個(gè)通道的容量常常是已知的或者不重要的。 我們很少使用內(nèi)置函數(shù)len
的原因是一個(gè)len
調(diào)用的結(jié)果并不能總能準(zhǔn)確地反映出的一個(gè)通道的當(dāng)前長度。
但有時(shí)確實(shí)有一些場景需要調(diào)用這兩個(gè)函數(shù)。比如,有時(shí)一個(gè)協(xié)程欲將一個(gè)未關(guān)閉的并且不會再向其中發(fā)送數(shù)據(jù)的緩沖通道中的所有數(shù)據(jù)接收出來,在確保只有此一個(gè)協(xié)程從此通道接收數(shù)據(jù)的情況下,我們可以用下面的代碼來實(shí)現(xiàn)之:
for len(c) > 0 {
value := <-c
// 使用value ...
}
我們也可以用本文后面將要介紹的嘗試接收機(jī)制來實(shí)現(xiàn)這一需求。兩者的運(yùn)行效率差不多,但嘗試接收機(jī)制的優(yōu)點(diǎn)是多個(gè)協(xié)程可以并發(fā)地進(jìn)行讀取操作。
有時(shí)一個(gè)協(xié)程欲將一個(gè)緩沖通道寫滿而又不阻塞,在確保只有此一個(gè)協(xié)程向此通道發(fā)送數(shù)據(jù)的情況下,我們可以用下面的代碼實(shí)現(xiàn)這一目的:
for len(c) < cap(c) {
c <- aValue
}
當(dāng)然,我們也可以使用后面將要介紹的嘗試發(fā)送機(jī)制來實(shí)現(xiàn)這一需求。
Go中的選擇機(jī)制(select)是一個(gè)非常獨(dú)特的特性。它給并發(fā)編程帶來了很多新的模式和技巧。
我們可以用一個(gè)無分支的select
流程控制代碼塊使當(dāng)前協(xié)程永久處于阻塞狀態(tài)。 這是select
流程控制的最簡單的應(yīng)用。 事實(shí)上,上面很多例子中的for {time.Sleep(time.Second)}
都可以換為select{}
。
一般,select{}
用在主協(xié)程中以防止程序退出。
一個(gè)例子:
package main
import "runtime"
func DoSomething() {
for {
// 做點(diǎn)什么...
runtime.Gosched() // 防止本協(xié)程霸占CPU不放
}
}
func main() {
go DoSomething()
go DoSomething()
select{}
}
順便說一句,另外還有一些使當(dāng)前協(xié)程永久阻塞的方法,但是select{}
是最簡單的方法。
含有一個(gè)default
分支和一個(gè)case
分支的select
代碼塊可以被用做一個(gè)嘗試發(fā)送或者嘗試接收操作,取決于case
關(guān)鍵字后跟隨的是一個(gè)發(fā)送操作還是一個(gè)接收操作。
case
關(guān)鍵字后跟隨的是一個(gè)發(fā)送操作,則此select
代碼塊為一個(gè)嘗試發(fā)送操作。
如果case
分支的發(fā)送操作是阻塞的,則default
分支將被執(zhí)行,發(fā)送失?。环駝t發(fā)送成功,case
分支得到執(zhí)行。
case
關(guān)鍵字后跟隨的是一個(gè)接收操作,則此select
代碼塊為一個(gè)嘗試接收操作。
如果case
分支的接收操作是阻塞的,則default
分支將被執(zhí)行,接收失??;否則接收成功,case
分支得到執(zhí)行。
嘗試發(fā)送和嘗試接收代碼塊永不阻塞。
標(biāo)準(zhǔn)編譯器對嘗試發(fā)送和嘗試接收代碼塊做了特別的優(yōu)化,使得它們的執(zhí)行效率比多case
分支的普通select
代碼塊執(zhí)行效率高得多。
下例演示了嘗試發(fā)送和嘗試接收代碼塊的工作原理。
package main
import "fmt"
func main() {
type Book struct{id int}
bookshelf := make(chan Book, 3)
for i := 0; i < cap(bookshelf) * 2; i++ {
select {
case bookshelf <- Book{id: i}:
fmt.Println("成功將書放在書架上", i)
default:
fmt.Println("書架已經(jīng)被占滿了")
}
}
for i := 0; i < cap(bookshelf) * 2; i++ {
select {
case book := <-bookshelf:
fmt.Println("成功從書架上取下一本書", book.id)
default:
fmt.Println("書架上已經(jīng)沒有書了")
}
}
}
輸出結(jié)果:
成功將書放在書架上 0
成功將書放在書架上 1
成功將書放在書架上 2
書架已經(jīng)被占滿了
書架已經(jīng)被占滿了
書架已經(jīng)被占滿了
成功從書架上取下一本書 0
成功從書架上取下一本書 1
成功從書架上取下一本書 2
書架上已經(jīng)沒有書了
書架上已經(jīng)沒有書了
書架上已經(jīng)沒有書了
后面的很多用例還要用到嘗試發(fā)送和嘗試接收代碼塊。
假設(shè)我們可以保證沒有任何協(xié)程會向一個(gè)通道發(fā)送數(shù)據(jù),則我們可以使用下面的代碼來(并發(fā)安全地)檢查此通道是否已經(jīng)關(guān)閉,此檢查不會阻塞當(dāng)前協(xié)程。
func IsClosed(c chan T) bool {
select {
case <-c:
return true
default:
}
return false
}
此方法常用來查看某個(gè)期待中的通知是否已經(jīng)來臨。此通知將由另一個(gè)協(xié)程通過關(guān)閉一個(gè)通道來發(fā)送。
將通道用做計(jì)數(shù)信號量用例和通道嘗試(發(fā)送或者接收)操作結(jié)合起來可用實(shí)現(xiàn)峰值限制。 峰值限制的目的是防止過大的并發(fā)請求數(shù)。
下面是對將通道用做計(jì)數(shù)信號量一節(jié)中的最后一個(gè)例子的簡單修改,從而使得顧客不再等待而是離去或者尋找其它酒吧。
...
bar24x7 := make(Bar, 10) // 此酒吧只能同時(shí)招待10個(gè)顧客
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
consumer := Consumer{customerId}
select {
case bar24x7 <- consumer: // 試圖進(jìn)入此酒吧
go bar24x7.ServeConsumer(consumer)
default:
log.Print("顧客#", customerId, "不愿等待而離去")
}
}
...
在上面的“采用最快回應(yīng)”用例一節(jié)已經(jīng)提到,我們也可以使用選擇機(jī)制來實(shí)現(xiàn)“采用最快回應(yīng)”用例。 每個(gè)數(shù)據(jù)源協(xié)程只需使用一個(gè)緩沖為1的通道并向其嘗試發(fā)送回應(yīng)數(shù)據(jù)即可。示例代碼如下:
package main
import (
"fmt"
"math/rand"
"time"
)
func source(c chan<- int32) {
ra, rb := rand.Int31(), rand.Intn(3)+1
// 休眠1秒/2秒/3秒
time.Sleep(time.Duration(rb) * time.Second)
select {
case c <- ra:
default:
}
}
func main() {
rand.Seed(time.Now().UnixNano())
c := make(chan int32, 1) // 此通道容量必須至少為1
for i := 0; i < 5; i++ {
go source(c)
}
rnd := <-c // 只采用第一個(gè)成功發(fā)送的回應(yīng)數(shù)據(jù)
fmt.Println(rnd)
}
注意,使用選擇機(jī)制來實(shí)現(xiàn)“采用最快回應(yīng)”的代碼中使用的通道的容量必須至少為1,以保證最快回應(yīng)總能夠發(fā)送成功。 否則,如果數(shù)據(jù)請求者因?yàn)榉N種原因未及時(shí)準(zhǔn)備好接收,則所有回應(yīng)者的嘗試發(fā)送都將失敗,從而所有回應(yīng)的數(shù)據(jù)都將被錯(cuò)過。
如果一個(gè)“采用最快回應(yīng)”用例中的數(shù)據(jù)源的數(shù)量很少,比如兩個(gè)或三個(gè),我們可以讓每個(gè)數(shù)據(jù)源使用一個(gè)單獨(dú)的緩沖通道來回應(yīng)數(shù)據(jù),然后使用一個(gè)select
代碼塊來同時(shí)接收這三個(gè)通道。 示例代碼如下:
package main
import (
"fmt"
"math/rand"
"time"
)
func source() <-chan int32 {
c := make(chan int32, 1) // 必須為一個(gè)緩沖通道
go func() {
ra, rb := rand.Int31(), rand.Intn(3)+1
time.Sleep(time.Duration(rb) * time.Second)
c <- ra
}()
return c
}
func main() {
rand.Seed(time.Now().UnixNano())
var rnd int32
// 阻塞在此直到某個(gè)數(shù)據(jù)源率先回應(yīng)。
select{
case rnd = <-source():
case rnd = <-source():
case rnd = <-source():
}
fmt.Println(rnd)
}
注意:如果上例中使用的通道是非緩沖的,未被選中的case
分支對應(yīng)的兩個(gè)source
函數(shù)調(diào)用中開辟的協(xié)程將處于永久阻塞狀態(tài),從而造成內(nèi)存泄露。
本小節(jié)和上一小節(jié)中展示的兩種方法也可以用來實(shí)現(xiàn)多對單通知。
在一些請求/回應(yīng)用例中,一個(gè)請求可能因?yàn)榉N種原因?qū)е滦枰鲱A(yù)期的時(shí)長才能得到回應(yīng),有時(shí)甚至永遠(yuǎn)得不到回應(yīng)。 對于這樣的情形,我們可以使用一個(gè)超時(shí)方案給請求者返回一個(gè)錯(cuò)誤信息。 使用選擇機(jī)制可以很輕松地實(shí)現(xiàn)這樣的一個(gè)超時(shí)方案。
下面這個(gè)例子展示了如何實(shí)現(xiàn)一個(gè)支持超時(shí)設(shè)置的請求:
func requestWithTimeout(timeout time.Duration) (int, error) {
c := make(chan int)
go doRequest(c) // 可能需要超出預(yù)期的時(shí)長回應(yīng)
select {
case data := <-c:
return data, nil
case <-time.After(timeout):
return 0, errors.New("超時(shí)了!")
}
}
我們可以使用嘗試發(fā)送操作來實(shí)現(xiàn)一個(gè)每隔一定時(shí)間發(fā)送一個(gè)信號的脈搏器。
package main
import "fmt"
import "time"
func Tick(d time.Duration) <-chan struct{} {
c := make(chan struct{}, 1) // 容量最好為1
go func() {
for {
time.Sleep(d)
select {
case c <- struct{}{}:
default:
}
}
}()
return c
}
func main() {
t := time.Now()
for range Tick(time.Second) {
fmt.Println(time.Since(t))
}
}
事實(shí)上,time
標(biāo)準(zhǔn)庫包中的Tick
函數(shù)提供了同樣的功能,但效率更高。 我們應(yīng)該盡量使用標(biāo)準(zhǔn)庫包中的實(shí)現(xiàn)。
上面已經(jīng)展示了如何使用嘗試發(fā)送實(shí)現(xiàn)峰值限制。 同樣地,我們也可以使用使用嘗試機(jī)制來實(shí)現(xiàn)速率限制,但需要前面剛提到的定時(shí)器實(shí)現(xiàn)的配合。 速率限制常用來限制吞吐和確保在一段時(shí)間內(nèi)的資源使用不會超標(biāo)。
下面的例子借鑒了官方Go維基中的例子。 在此例中,任何一分鐘時(shí)段內(nèi)處理的請求數(shù)不會超過200。
package main
import "fmt"
import "time"
type Request interface{}
func handle(r Request) {fmt.Println(r.(int))}
const RateLimitPeriod = time.Minute
const RateLimit = 200 // 任何一分鐘內(nèi)最多處理200個(gè)請求
func handleRequests(requests <-chan Request) {
quotas := make(chan time.Time, RateLimit)
go func() {
tick := time.NewTicker(RateLimitPeriod / RateLimit)
defer tick.Stop()
for t := range tick.C {
select {
case quotas <- t:
default:
}
}
}()
for r := range requests {
<-quotas
go handle(r)
}
}
func main() {
requests := make(chan Request)
go handleRequests(requests)
// time.Sleep(time.Minute)
for i := 0; ; i++ {requests <- i}
}
上例的代碼雖然可以保證任何一分鐘時(shí)段內(nèi)處理的請求數(shù)不會超過200,但是如果在開始的一分鐘內(nèi)沒有任何請求,則接下來的某個(gè)瞬時(shí)時(shí)間點(diǎn)可能會同時(shí)處理最多200個(gè)請求(試著將time.Sleep
行的注釋去掉看看)。 這可能會造成卡頓情況。我們可以將速率限制和峰值限制一并使用來避免出現(xiàn)這樣的情況。
通道一文提到了向一個(gè)nil通道發(fā)送數(shù)據(jù)或者從中接收數(shù)據(jù)都屬于阻塞操作。 利用這一事實(shí),我們可以將一個(gè)select
流程控制中的case
操作中涉及的通道設(shè)置為不同的值,以使此select
流程控制選擇執(zhí)行不同的分支。
下面是另一個(gè)乒乓模擬游戲的實(shí)現(xiàn)。此實(shí)現(xiàn)使用了選擇機(jī)制。在此例子中,兩個(gè)case
操作中的通道有且只有一個(gè)為nil,所以只能是不為nil的通道對應(yīng)的分支被選中。 每個(gè)循環(huán)步將對調(diào)這兩個(gè)case
操作中的通道,從而改變兩個(gè)分支的可被選中狀態(tài)。
package main
import "fmt"
import "time"
import "os"
type Ball uint8
func Play(playerName string, table chan Ball, serve bool) {
var receive, send chan Ball
if serve {
receive, send = nil, table
} else {
receive, send = table, nil
}
var lastValue Ball = 1
for {
select {
case send <- lastValue:
case value := <- receive:
fmt.Println(playerName, value)
value += lastValue
if value < lastValue { // 溢出了
os.Exit(0)
}
lastValue = value
}
receive, send = send, receive // 開關(guān)切換
time.Sleep(time.Second)
}
}
func main() {
table := make(chan Ball)
go Play("A:", table, false)
Play("B:", table, true)
}
下面是另一個(gè)也展示了開關(guān)效果的但簡單得多的(非并發(fā)的)小例子。 此程序?qū)⒉粩啻蛴〕?code>1212...。 它在實(shí)踐中沒有太多實(shí)用價(jià)值,這里只是為了學(xué)習(xí)的目的才展示之。
package main
import "fmt"
import "time"
func main() {
for c := make(chan struct{}, 1); true; {
select {
case c <- struct{}{}:
fmt.Print("1")
case <-c:
fmt.Print("2")
}
time.Sleep(time.Second)
}
}
我們可以通過在一個(gè)select
流程控制中使用重復(fù)的case
操作來增加對應(yīng)分支中的代碼的執(zhí)行幾率。
一個(gè)例子:
package main
import "fmt"
func main() {
foo, bar := make(chan struct{}), make(chan struct{})
close(foo); close(bar) // 僅為演示目的
x, y := 0.0, 0.0
f := func(){x++}
g := func(){y++}
for i := 0; i < 100000; i++ {
select {
case <-foo: f()
case <-foo: f()
case <-bar: g()
}
}
fmt.Println(x/y) // 大致為2
}
在上面這個(gè)例子中,函數(shù)f
的調(diào)用執(zhí)行幾率大致為函數(shù)g
的兩倍。
每個(gè)select
控制流程中的分支數(shù)量在運(yùn)行中是固定的,但是我們可以使用reflect
標(biāo)準(zhǔn)庫包中提供的功能在運(yùn)行時(shí)刻來構(gòu)建動態(tài)分支數(shù)量的select
控制流程。 但是請注意:一個(gè)select
控制流程中的分支越多,此select
控制流程的執(zhí)行效率就越低(這是我們常常只使用不多于三個(gè)分支的select
控制流程的原因)。
reflect
標(biāo)準(zhǔn)庫包中也提供了模擬嘗試發(fā)送和嘗試接收代碼塊的TrySend
和TryRecv
函數(shù)。
本節(jié)將介紹一些使用通道進(jìn)行數(shù)據(jù)流處理的用例。
一般來說,一個(gè)數(shù)據(jù)流處理程序由多個(gè)模塊組成。不同的模塊執(zhí)行分配給它們的不同的任務(wù)。 每個(gè)模塊由一個(gè)或者數(shù)個(gè)并行工作的協(xié)程組成。實(shí)踐中常見的工作任務(wù)包括:
一個(gè)模塊中的工作協(xié)程從一些其它模塊接收數(shù)據(jù)做為輸入,并向另一些模塊發(fā)送輸出數(shù)據(jù)。 換句話數(shù),一個(gè)模塊可能同時(shí)兼任數(shù)據(jù)消費(fèi)者和數(shù)據(jù)產(chǎn)生者的角色。
多個(gè)模塊一起組成了一個(gè)數(shù)據(jù)流處理系統(tǒng)。
下面將展示一些模塊工作協(xié)程的實(shí)現(xiàn)。這些實(shí)現(xiàn)僅僅是為了解釋目的,所以它們都很簡單,并且它們可能并不高效。
一個(gè)數(shù)據(jù)產(chǎn)生者可能通過以下途徑生成數(shù)據(jù):
這里,我們使用一個(gè)隨機(jī)數(shù)產(chǎn)生器做為一個(gè)數(shù)據(jù)產(chǎn)生者的例子。 此數(shù)據(jù)產(chǎn)生者函數(shù)沒有輸入,只有輸出。
import (
"crypto/rand"
"encoding/binary"
)
func RandomGenerator() <-chan uint64 {
c := make(chan uint64)
go func() {
rnds := make([]byte, 8)
for {
_, err := rand.Read(rnds)
if err != nil {
close(c)
break
}
c <- binary.BigEndian.Uint64(rnds)
}
}()
return c
}
事實(shí)上,此隨機(jī)數(shù)產(chǎn)生器是一個(gè)多返回值的future/promise。
一個(gè)數(shù)據(jù)產(chǎn)生者可以在任何時(shí)刻關(guān)閉返回的通道以結(jié)束數(shù)據(jù)生成。
一個(gè)數(shù)據(jù)聚合模塊的工作協(xié)程將多個(gè)數(shù)據(jù)流合為一個(gè)數(shù)據(jù)流。 假設(shè)數(shù)據(jù)類型為int64
,下面這個(gè)函數(shù)將任意數(shù)量的數(shù)據(jù)流合為一個(gè)。
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
out := make(chan uint64)
for _, in := range inputs {
go func(in <-chan uint64) {
for {
out <- <-in // <=> out <- (<-in)
}
}(in)
}
return out
}
一個(gè)更完美的實(shí)現(xiàn)需要考慮一個(gè)輸入數(shù)據(jù)流是否已經(jīng)關(guān)閉。(下面要介紹的其它工作協(xié)程同理。)
import "sync"
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
output := make(chan uint64)
var wg sync.WaitGroup
for _, in := range inputs {
wg.Add(1)
go func(int <-chan uint64) {
defer wg.Done()
// 如果通道in被關(guān)閉,此循環(huán)將最終結(jié)束。
for x := range in {
output <- x
}
}(in)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
如果被聚合的數(shù)據(jù)流的數(shù)量很小,我們也可以使用一個(gè)select
控制流程代碼塊來聚合這些數(shù)據(jù)流。
// 假設(shè)數(shù)據(jù)流的數(shù)量為2。
...
output := make(chan uint64)
go func() {
inA, inB := inputs[0], inputs[1]
for {
select {
case v := <- inA: output <- v
case v := <- inB: output <- v
}
}
}
...
數(shù)據(jù)分流是數(shù)據(jù)聚合的逆過程。數(shù)據(jù)分流的實(shí)現(xiàn)很簡單,但在實(shí)踐中用的并不多。
func Divisor(input <-chan uint64, outputs ...chan<- uint64) {
for _, out := range outputs {
go func(o chan<- uint64) {
for {
o <- <-input // <=> o <- (<-input)
}
}(out)
}
}
數(shù)據(jù)合成將多個(gè)數(shù)據(jù)流中讀取的數(shù)據(jù)合成一個(gè)。
下面是一個(gè)數(shù)據(jù)合成工作函數(shù)的實(shí)現(xiàn)中,從兩個(gè)不同數(shù)據(jù)流讀取的兩個(gè)uint64
值組成了一個(gè)新的uint64
值。 當(dāng)然,在實(shí)踐中,數(shù)據(jù)的組合比這復(fù)雜得多。
func Composor(inA, inB <-chan uint64) <-chan uint64 {
output := make(chan uint64)
go func() {
for {
a1, b, a2 := <-inA, <-inB, <-inA
output <- a1 ^ b & a2
}
}()
return output
}
數(shù)據(jù)分解是數(shù)據(jù)合成的逆過程。一個(gè)數(shù)據(jù)分解者從一個(gè)通道讀取一份數(shù)據(jù),并將此數(shù)據(jù)分解為多份數(shù)據(jù)。 這里就不舉例了。
數(shù)據(jù)復(fù)制(增殖)可以看作是特殊的數(shù)據(jù)分解。一份輸入數(shù)據(jù)將被復(fù)制多份并輸出給多個(gè)數(shù)據(jù)流。
一個(gè)例子:
func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
outA, outB := make(chan uint64), make(chan uint64)
go func() {
for x := range in {
outA <- x
outB <- x
}
}()
return outA, outB
}
數(shù)據(jù)計(jì)算和數(shù)據(jù)分析模塊的功能因具體程序不同而有很大的差異。 一般來說,數(shù)據(jù)分析者接收一份數(shù)據(jù)并對之加工處理后轉(zhuǎn)換為另一份數(shù)據(jù)。
下面的簡單示例中,每個(gè)輸入的uint64
值將被進(jìn)行位反轉(zhuǎn)后輸出。
func Calculator(in <-chan uint64, out chan uint64) (<-chan uint64) {
if out == nil {
out = make(chan uint64)
}
go func() {
for x := range in {
out <- ^x
}
}()
return out
}
一個(gè)數(shù)據(jù)驗(yàn)證或過濾者的任務(wù)是檢查輸入數(shù)據(jù)的合理性并拋棄不合理的數(shù)據(jù)。 比如,下面的工作者協(xié)程將拋棄所有的非素?cái)?shù)。
import "math/big"
func Filter0(input <-chan uint64, output chan uint64) <-chan uint64 {
if output == nil {
output = make(chan uint64)
}
go func() {
bigInt := big.NewInt(0)
for x := range input {
bigInt.SetUint64(x)
if bigInt.ProbablyPrime(1) {
output <- x
}
}
}()
return output
}
func Filter(input <-chan uint64) <-chan uint64 {
return Filter0(input, nil)
}
請注意這兩個(gè)函數(shù)版本分別被本文下面最后展示的兩個(gè)例子所使用。
一般,一個(gè)數(shù)據(jù)服務(wù)或者存盤模塊為一個(gè)數(shù)據(jù)流系統(tǒng)中的最后一個(gè)模塊。 這里的實(shí)現(xiàn)值是簡單地將數(shù)據(jù)輸出到終端。
import "fmt"
func Printer(input <-chan uint64) {
for x := range input {
fmt.Println(x)
}
}
現(xiàn)在,讓我們使用上面的模塊工作者函數(shù)實(shí)現(xiàn)來組裝一些數(shù)據(jù)流系統(tǒng)。 組裝數(shù)據(jù)流僅僅是創(chuàng)建一些工作者協(xié)程函數(shù)調(diào)用,并為這些調(diào)用指定輸入數(shù)據(jù)流和輸出數(shù)據(jù)流。
數(shù)據(jù)流系統(tǒng)例子1(一個(gè)流線型系統(tǒng)):
package main
... // 上面的模塊工作者函數(shù)實(shí)現(xiàn)
func main() {
Printer(
Filter(
Calculator(
RandomGenerator(), nil,
),
),
)
}
上面這個(gè)流線型系統(tǒng)描繪在下圖中:
數(shù)據(jù)流系統(tǒng)例子2(一個(gè)單向無環(huán)圖系統(tǒng)):
package main
... // 上面的模塊工作者函數(shù)實(shí)現(xiàn)
func main() {
filterA := Filter(RandomGenerator())
filterB := Filter(RandomGenerator())
filterC := Filter(RandomGenerator())
filter := Aggregator(filterA, filterB, filterC)
calculatorA := Calculator(filter, nil)
calculatorB := Calculator(filter, nil)
calculator := Aggregator(calculatorA, calculatorB)
Printer(calculator)
}
上面這個(gè)單向無環(huán)圖系統(tǒng)描繪在下圖中:
更復(fù)雜的數(shù)據(jù)流系統(tǒng)可以表示為任何拓?fù)浣Y(jié)構(gòu)的圖。比如一個(gè)復(fù)雜的數(shù)據(jù)流系統(tǒng)可能有多個(gè)輸出模塊。 但是有環(huán)拓?fù)浣Y(jié)構(gòu)的數(shù)據(jù)流系統(tǒng)在實(shí)踐中很少用。
從上面兩個(gè)例子可以看出,使用通道來構(gòu)建數(shù)據(jù)流系統(tǒng)是很簡單和直觀的。
從上例可以看出,通過使用數(shù)據(jù)聚合模塊,我們可以很輕松地實(shí)現(xiàn)各個(gè)模塊的工作協(xié)程數(shù)量的扇入(fan-in)和扇出(fan-out)。
事實(shí)上,我們也可以使用一個(gè)簡單的通道來代替數(shù)據(jù)聚合模塊的角色。比如,下面的代碼使用兩個(gè)通道代替了上例中的兩個(gè)數(shù)據(jù)聚合器。
package main
... // 上面的模塊工作者函數(shù)實(shí)現(xiàn)
func main() {
c1 := make(chan uint64, 100)
Filter0(RandomGenerator(), c1) // filterA
Filter0(RandomGenerator(), c1) // filterB
Filter0(RandomGenerator(), c1) // filterC
c2 := make(chan uint64, 100)
Calculator(c1, c2) // calculatorA
Calculator(c1, c2) // calculatorB
Printer(c2)
}
修改后的數(shù)據(jù)流的拓?fù)浣Y(jié)構(gòu)如下圖所示:
上面的代碼示例并沒有太多考慮如何關(guān)閉一個(gè)數(shù)據(jù)流。請閱讀此篇文章來了解如何優(yōu)雅地關(guān)閉通道。
更多建議: