Go語言 如何優(yōu)雅地關(guān)閉通道

2023-02-16 17:39 更新

在本文發(fā)表數(shù)日前,我曾寫了一篇文章來解釋通道的規(guī)則。 那篇文章在redditHN上獲得了很多點(diǎn)贊,但也有很多人對(duì)Go通道的細(xì)節(jié)設(shè)計(jì)提出了一些批評(píng)意見。

這些批評(píng)主要針對(duì)于通道設(shè)計(jì)中的下列細(xì)節(jié):

  1. 沒有一個(gè)簡單和通用的方法用來在不改變一個(gè)通道的狀態(tài)的情況下檢查這個(gè)通道是否已經(jīng)關(guān)閉。
  2. 關(guān)閉一個(gè)已經(jīng)關(guān)閉的通道將產(chǎn)生一個(gè)恐慌,所以在不知道一個(gè)通道是否已經(jīng)關(guān)閉的時(shí)候關(guān)閉此通道是很危險(xiǎn)的。
  3. 向一個(gè)已關(guān)閉的通道發(fā)送數(shù)據(jù)將產(chǎn)生一個(gè)恐慌,所以在不知道一個(gè)通道是否已經(jīng)關(guān)閉的時(shí)候向此通道發(fā)送數(shù)據(jù)是很危險(xiǎn)的。

這些批評(píng)看上去有幾分道理(實(shí)際上屬于對(duì)通道的不正確使用導(dǎo)致的偏見)。 是的,Go語言中并沒有提供一個(gè)內(nèi)置函數(shù)來檢查一個(gè)通道是否已經(jīng)關(guān)閉。

在Go中,如果我們能夠保證從不會(huì)向一個(gè)通道發(fā)送數(shù)據(jù),那么有一個(gè)簡單的方法來判斷此通道是否已經(jīng)關(guān)閉。 此方法已經(jīng)在上一篇文章通道用例大全中展示過了。 這里為了本文的連貫性,在下面的例子中重新列出了此方法。

package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
	select {
	case <-ch:
		return true
	default:
	}

	return false
}

func main() {
	c := make(chan T)
	fmt.Println(IsClosed(c)) // false
	close(c)
	fmt.Println(IsClosed(c)) // true
}

如前所述,此方法并不是一個(gè)通用的檢查通道是否已經(jīng)關(guān)閉的方法。

事實(shí)上,即使有一個(gè)內(nèi)置closed函數(shù)用來檢查一個(gè)通道是否已經(jīng)關(guān)閉,它的有用性也是十分有限的。 原因是當(dāng)此函數(shù)的一個(gè)調(diào)用的結(jié)果返回時(shí),被查詢的通道的狀態(tài)可能已經(jīng)又改變了,導(dǎo)致此調(diào)用結(jié)果并不能反映出被查詢的通道的最新狀態(tài)。 雖然我們可以根據(jù)一個(gè)調(diào)用closed(ch)的返回結(jié)果為true而得出我們不應(yīng)該再向通道ch發(fā)送數(shù)據(jù)的結(jié)論, 但是我們不能根據(jù)一個(gè)調(diào)用closed(ch)的返回結(jié)果為false而得出我們可以繼續(xù)向通道ch發(fā)送數(shù)據(jù)的結(jié)論。

通道關(guān)閉原則

一個(gè)常用的使用Go通道的原則是不要在數(shù)據(jù)接收方或者在有多個(gè)發(fā)送者的情況下關(guān)閉通道。 換句話說,我們只應(yīng)該讓一個(gè)通道唯一的發(fā)送者關(guān)閉此通道。

下面我們將稱此原則為通道關(guān)閉原則。

當(dāng)然,這并不是一個(gè)通用的關(guān)閉通道的原則。通用的原則是不要關(guān)閉已關(guān)閉的通道。 如果我們能夠保證從某個(gè)時(shí)刻之后,再?zèng)]有協(xié)程將向一個(gè)未關(guān)閉的非nil通道發(fā)送數(shù)據(jù),則一個(gè)協(xié)程可以安全地關(guān)閉此通道。 然而,做出這樣的保證常常需要很大的努力,從而導(dǎo)致代碼過度復(fù)雜。 另一方面,遵循通道關(guān)閉原則是一件相對(duì)簡單的事兒。

粗魯?shù)仃P(guān)閉通道的方法

如果由于某種原因,你一定非要從數(shù)據(jù)接收方或者讓眾多發(fā)送者中的一個(gè)關(guān)閉一個(gè)通道,你可以使用恢復(fù)機(jī)制來防止可能產(chǎn)生的恐慌而導(dǎo)致程序崩潰。 下面就是這樣的一個(gè)實(shí)現(xiàn)(假設(shè)通道的元素類型為T)。

func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			// 一個(gè)函數(shù)的返回結(jié)果可以在defer調(diào)用中修改。
			justClosed = false
		}
	}()

	// 假設(shè)ch != nil。
	close(ch)   // 如果ch已關(guān)閉,則產(chǎn)生一個(gè)恐慌。
	return true // <=> justClosed = true; return
}

此方法違反了通道關(guān)閉原則。

同樣的方法可以用來粗魯?shù)叵蛞粋€(gè)關(guān)閉狀態(tài)未知的通道發(fā)送數(shù)據(jù)。

func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()

	ch <- value  // 如果ch已關(guān)閉,則產(chǎn)生一個(gè)恐慌。
	return false // <=> closed = false; return
}

這樣的粗魯方法不僅違反了通道關(guān)閉原則,而且Go白皮書和標(biāo)準(zhǔn)編譯器不保證它的實(shí)現(xiàn)中不存在數(shù)據(jù)競爭。

禮貌地關(guān)閉通道的方法

很多Go程序員喜歡使用sync.Once來關(guān)閉通道。

type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}

當(dāng)然,我們也可以使用sync.Mutex來防止多次關(guān)閉一個(gè)通道。

type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}

這些實(shí)現(xiàn)確實(shí)比上一節(jié)中的方法禮貌一些,但是它們不能完全有效地避免數(shù)據(jù)競爭。 目前的Go白皮書并不保證發(fā)生在一個(gè)通道上的并發(fā)關(guān)閉操作和發(fā)送操作不會(huì)產(chǎn)生數(shù)據(jù)競爭。 如果一個(gè)SafeClose函數(shù)和同一個(gè)通道上的發(fā)送操作同時(shí)運(yùn)行,則數(shù)據(jù)競爭可能發(fā)生(雖然這樣的數(shù)據(jù)競爭一般并不會(huì)帶來什么危害)。

優(yōu)雅地關(guān)閉通道的方法

上一節(jié)中介紹的SafeSend函數(shù)有一個(gè)弊端,它的調(diào)用不能做為case操作而被使用在select代碼塊中。 另外,很多Go程序員(包括我)認(rèn)為上面兩節(jié)展示的關(guān)閉通道的方法不是很優(yōu)雅。 本節(jié)下面將介紹一些在各種情形下使用純通道操作來關(guān)閉通道的方法。

(為了演示程序的完整性,下面這些例子中使用到了sync.WaitGroup。在實(shí)踐中,sync.WaitGroup并不是必需的。)

情形一:M個(gè)接收者和一個(gè)發(fā)送者。發(fā)送者通過關(guān)閉用來傳輸數(shù)據(jù)的通道來傳遞發(fā)送結(jié)束信號(hào)

這是最簡單的一種情形。當(dāng)發(fā)送者欲結(jié)束發(fā)送,讓它關(guān)閉用來傳輸數(shù)據(jù)的通道即可。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)

	// 發(fā)送者
	go func() {
		for {
			if value := rand.Intn(Max); value == 0 {
				// 此唯一的發(fā)送者可以安全地關(guān)閉此數(shù)據(jù)通道。
				close(dataCh)
				return
			} else {
				dataCh <- value
			}
		}
	}()

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// 接收數(shù)據(jù)直到通道dataCh已關(guān)閉
			// 并且dataCh的緩沖隊(duì)列已空。
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

情形二:一個(gè)接收者和N個(gè)發(fā)送者,此唯一接收者通過關(guān)閉一個(gè)額外的信號(hào)通道來通知發(fā)送者不要再發(fā)送數(shù)據(jù)了

此情形比上一種情形復(fù)雜一些。我們不能讓接收者關(guān)閉用來傳輸數(shù)據(jù)的通道來停止數(shù)據(jù)傳輸,因?yàn)檫@樣做違反了通道關(guān)閉原則。 但是我們可以讓接收者關(guān)閉一個(gè)額外的信號(hào)通道來通知發(fā)送者不要再發(fā)送數(shù)據(jù)了。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
		// stopCh是一個(gè)額外的信號(hào)通道。它的
		// 發(fā)送者為dataCh數(shù)據(jù)通道的接收者。
		// 它的接收者為dataCh數(shù)據(jù)通道的發(fā)送者。

	// 發(fā)送者
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// 這里的第一個(gè)嘗試接收用來讓此發(fā)送者
				// 協(xié)程盡早地退出。對(duì)于這個(gè)特定的例子,
				// 此select代碼塊并非必需。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已經(jīng)關(guān)閉,此第二個(gè)select
				// 代碼塊中的第一個(gè)分支仍很有可能在若干個(gè)
				// 循環(huán)步內(nèi)依然不會(huì)被選中。如果這是不可接受
				// 的,則上面的第一個(gè)select代碼塊是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// 接收者
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == Max-1 {
				// 此唯一的接收者同時(shí)也是stopCh通道的
				// 唯一發(fā)送者。盡管它不能安全地關(guān)閉dataCh數(shù)
				// 據(jù)通道,但它可以安全地關(guān)閉stopCh通道。
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	// ...
	wgReceivers.Wait()
}

如此例中的注釋所述,對(duì)于此額外的信號(hào)通道stopCh,它只有一個(gè)發(fā)送者,即dataCh數(shù)據(jù)通道的唯一接收者。 dataCh數(shù)據(jù)通道的接收者關(guān)閉了信號(hào)通道stopCh,這是不違反通道關(guān)閉原則的。

在此例中,數(shù)據(jù)通道dataCh并沒有被關(guān)閉。是的,我們不必關(guān)閉它。 當(dāng)一個(gè)通道不再被任何協(xié)程所使用后,它將逐漸被垃圾回收掉,無論它是否已經(jīng)被關(guān)閉。 所以這里的優(yōu)雅性體現(xiàn)在通過不關(guān)閉一個(gè)通道來停止使用此通道。

情形三:M個(gè)接收者和N個(gè)發(fā)送者。它們中的任何協(xié)程都可以讓一個(gè)中間調(diào)解協(xié)程幫忙發(fā)出停止數(shù)據(jù)傳送的信號(hào)

這是最復(fù)雜的一種情形。我們不能讓接收者和發(fā)送者中的任何一個(gè)關(guān)閉用來傳輸數(shù)據(jù)的通道,我們也不能讓多個(gè)接收者之一關(guān)閉一個(gè)額外的信號(hào)通道。 這兩種做法都違反了通道關(guān)閉原則。 然而,我們可以引入一個(gè)中間調(diào)解者角色并讓其關(guān)閉額外的信號(hào)通道來通知所有的接收者和發(fā)送者結(jié)束工作。 具體實(shí)現(xiàn)見下例。注意其中使用了一個(gè)嘗試發(fā)送操作來向中間調(diào)解者發(fā)送信號(hào)。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
		// stopCh是一個(gè)額外的信號(hào)通道。它的發(fā)送
		// 者為中間調(diào)解者。它的接收者為dataCh
		// 數(shù)據(jù)通道的所有的發(fā)送者和接收者。
	toStop := make(chan string, 1)
		// toStop是一個(gè)用來通知中間調(diào)解者讓其
		// 關(guān)閉信號(hào)通道stopCh的第二個(gè)信號(hào)通道。
		// 此第二個(gè)信號(hào)通道的發(fā)送者為dataCh數(shù)據(jù)
		// 通道的所有的發(fā)送者和接收者,它的接收者
		// 為中間調(diào)解者。它必須為一個(gè)緩沖通道。

	var stoppedBy string

	// 中間調(diào)解者
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// 發(fā)送者
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// 為了防止阻塞,這里使用了一個(gè)嘗試
					// 發(fā)送操作來向中間調(diào)解者發(fā)送信號(hào)。
					select {
					case toStop <- "發(fā)送者#" + id:
					default:
					}
					return
				}

				// 此處的嘗試接收操作是為了讓此發(fā)送協(xié)程盡早
				// 退出。標(biāo)準(zhǔn)編譯器對(duì)嘗試接收和嘗試發(fā)送做了
				// 特殊的優(yōu)化,因而它們的速度很快。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已關(guān)閉,如果這個(gè)select代碼塊
				// 中第二個(gè)分支的發(fā)送操作是非阻塞的,則第一個(gè)
				// 分支仍很有可能在若干個(gè)循環(huán)步內(nèi)依然不會(huì)被選
				// 中。如果這是不可接受的,則上面的第一個(gè)嘗試
				// 接收操作代碼塊是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// 和發(fā)送者協(xié)程一樣,此處的嘗試接收操作是為了
				// 讓此接收協(xié)程盡早退出。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已關(guān)閉,如果這個(gè)select代碼塊
				// 中第二個(gè)分支的接收操作是非阻塞的,則第一個(gè)
				// 分支仍很有可能在若干個(gè)循環(huán)步內(nèi)依然不會(huì)被選
				// 中。如果這是不可接受的,則上面嘗試接收操作
				// 代碼塊是必需的。
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// 為了防止阻塞,這里使用了一個(gè)嘗試
						// 發(fā)送操作來向中間調(diào)解者發(fā)送信號(hào)。
						select {
						case toStop <- "接收者#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("被" + stoppedBy + "終止了")
}

在此例中,通道關(guān)閉原則依舊得到了遵守。

請(qǐng)注意,信號(hào)通道toStop的容量必須至少為1。 如果它的容量為0,則在中間調(diào)解者還未準(zhǔn)備好的情況下就已經(jīng)有某個(gè)協(xié)程向toStop發(fā)送信號(hào)時(shí),此信號(hào)將被拋棄。

我們也可以不使用嘗試發(fā)送操作向中間調(diào)解者發(fā)送信號(hào),但信號(hào)通道toStop的容量必須至少為數(shù)據(jù)發(fā)送者和數(shù)據(jù)接收者的數(shù)量之和,以防止向其發(fā)送數(shù)據(jù)時(shí)(有一個(gè)極其微小的可能)導(dǎo)致某些發(fā)送者和接收者協(xié)程永久阻塞。

...
toStop := make(chan string, NumReceivers + NumSenders)
...
			value := rand.Intn(Max)
			if value == 0 {
				toStop <- "sender#" + id
				return
			}
...
				if value == Max-1 {
					toStop <- "receiver#" + id
					return
				}
...

情形四:“M個(gè)接收者和一個(gè)發(fā)送者”情形的一個(gè)變種:用來傳輸數(shù)據(jù)的通道的關(guān)閉請(qǐng)求由第三方發(fā)出

有時(shí),數(shù)據(jù)通道(dataCh)的關(guān)閉請(qǐng)求需要由某個(gè)第三方協(xié)程發(fā)出。對(duì)于這種情形,我們可以使用一個(gè)額外的信號(hào)通道來通知唯一的發(fā)送者關(guān)閉數(shù)據(jù)通道(dataCh)。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100
	const NumThirdParties = 15

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	closing := make(chan struct{}) // 信號(hào)通道
	closed := make(chan struct{})
	
	// 此stop函數(shù)可以被安全地多次調(diào)用。
	stop := func() {
		select {
		case closing<-struct{}{}:
			<-closed
		case <-closed:
		}
	}
	
	// 一些第三方協(xié)程
	for i := 0; i < NumThirdParties; i++ {
		go func() {
			r := 1 + rand.Intn(3)
			time.Sleep(time.Duration(r) * time.Second)
			stop()
		}()
	}

	// 發(fā)送者
	go func() {
		defer func() {
			close(closed)
			close(dataCh)
		}()

		for {
			select{
			case <-closing: return
			default:
			}

			select{
			case <-closing: return
			case dataCh <- rand.Intn(Max):
			}
		}
	}()

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

上述代碼中的stop函數(shù)中使用的技巧偷自Roger Peppe在此貼中的一個(gè)留言。

情形五:“N個(gè)發(fā)送者”的一個(gè)變種:用來傳輸數(shù)據(jù)的通道必須被關(guān)閉以通知各個(gè)接收者數(shù)據(jù)發(fā)送已經(jīng)結(jié)束了

在上面的提到的“N個(gè)發(fā)送者”情形中,為了遵守通道關(guān)閉原則,我們避免了關(guān)閉數(shù)據(jù)通道(dataCh)。 但是有時(shí)候,數(shù)據(jù)通道(dataCh)必須被關(guān)閉以通知各個(gè)接收者數(shù)據(jù)發(fā)送已經(jīng)結(jié)束。 對(duì)于這種“N個(gè)發(fā)送者”情形,我們可以使用一個(gè)中間通道將它們轉(zhuǎn)化為“一個(gè)發(fā)送者”情形,然后繼續(xù)使用上一節(jié)介紹的技巧來關(guān)閉此中間通道,從而避免了關(guān)閉原始的dataCh數(shù)據(jù)通道。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 1000000
	const NumReceivers = 10
	const NumSenders = 1000
	const NumThirdParties = 15

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)   // 將被關(guān)閉
	middleCh := make(chan int) // 不會(huì)被關(guān)閉
	closing := make(chan string)
	closed := make(chan struct{})

	var stoppedBy string

	stop := func(by string) {
		select {
		case closing <- by:
			<-closed
		case <-closed:
		}
	}
	
	// 中間層
	go func() {
		exit := func(v int, needSend bool) {
			close(closed)
			if needSend {
				dataCh <- v
			}
			close(dataCh)
		}

		for {
			select {
			case stoppedBy = <-closing:
				exit(0, false)
				return
			case v := <- middleCh:
				select {
				case stoppedBy = <-closing:
					exit(v, true)
					return
				case dataCh <- v:
				}
			}
		}
	}()
	
	// 一些第三方協(xié)程
	for i := 0; i < NumThirdParties; i++ {
		go func(id string) {
			r := 1 + rand.Intn(3)
			time.Sleep(time.Duration(r) * time.Second)
			stop("3rd-party#" + id)
		}(strconv.Itoa(i))
	}

	// 發(fā)送者
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					stop("sender#" + id)
					return
				}

				select {
				case <- closed:
					return
				default:
				}

				select {
				case <- closed:
					return
				case middleCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收者
	for range [NumReceivers]struct{}{} {
		go func() {
			defer wgReceivers.Done()

			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}

更多情形?

在日常編程中可能會(huì)遇到更多的變種情形,但是上面介紹的情形是最常見和最基本的。 通過聰明地使用通道(和其它并發(fā)同步技術(shù)),我相信,對(duì)于各種變種,我們總會(huì)找到相應(yīng)的遵守通道關(guān)閉原則的解決方法。

結(jié)論

并沒有什么情況非得逼得我們違反通道關(guān)閉原則。 如果你遇到了此情形,請(qǐng)考慮修改你的代碼流程和結(jié)構(gòu)設(shè)計(jì)。

使用通道編程宛如在藝術(shù)創(chuàng)作一般!


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)