Go 語(yǔ)言并發(fā)

2022-04-24 15:49 更新

基本概念

并發(fā)與并行

并發(fā):同一時(shí)間段內(nèi)執(zhí)行多個(gè)任務(wù)(你早上在編程獅學(xué)習(xí)Java和Python)

并行:同一時(shí)刻執(zhí)行多個(gè)任務(wù)(你和你的網(wǎng)友早上都在使用編程獅學(xué)習(xí)Go)

Go語(yǔ)言中的并發(fā)程序主要是通過(guò)基于CSP(communicating sequential processes)的goroutine和channel來(lái)實(shí)現(xiàn),當(dāng)然也支持使用傳統(tǒng)的多線程共享內(nèi)存的并發(fā)方式

goroutine

Go語(yǔ)言中使用goroutine非常簡(jiǎn)單,只需要在函數(shù)或者方法前面加上go關(guān)鍵字就可以創(chuàng)建一個(gè)goroutine,從而讓該函數(shù)或者方法在新的goroutine中執(zhí)行

匿名函數(shù)同樣也支持使用go關(guān)鍵字來(lái)創(chuàng)建goroutine去執(zhí)行

一個(gè)goroutine必定對(duì)應(yīng)一個(gè)函數(shù)或者方法,可以創(chuàng)建多個(gè)goroutine去執(zhí)行相同的函數(shù)或者方法

啟動(dòng)單個(gè)goroutine

啟動(dòng)方式非常簡(jiǎn)單,我們先來(lái)看一個(gè)案例

package main

import (
	"fmt"
)

func hello() {
	fmt.Println("hello")
}

func main() {
	go hello()
	fmt.Println("歡迎來(lái)到編程獅")
}

以上代碼輸出結(jié)果如下

歡迎來(lái)到編程獅

上述代碼執(zhí)行結(jié)果只在終端控制臺(tái)輸出了“歡迎來(lái)到編程獅”,并沒(méi)有打印“hello”,這是為什么呢 ?.

其實(shí)在Go程序中,會(huì)默認(rèn)為main函數(shù)創(chuàng)建一個(gè)goroutine,而在上述代碼中我們使用go關(guān)鍵字創(chuàng)建了一個(gè)新的goroutine去調(diào)用hello函數(shù)。而此時(shí)main的goroutine還在往下執(zhí)行中,我們的程序中存在兩個(gè)并發(fā)執(zhí)行的goroutine。當(dāng)main函數(shù)結(jié)束時(shí),整個(gè)程序也結(jié)束了,所有由main函數(shù)創(chuàng)建的子goroutine也會(huì)跟著退出,也就是說(shuō)我們的main函數(shù)執(zhí)行過(guò)快退出導(dǎo)致另一個(gè)goroutine內(nèi)容還未執(zhí)行就退出了,導(dǎo)致未能打印出hello

所以我們這邊要想辦法讓main函數(shù)等一等,讓另一個(gè)goroutine的內(nèi)容執(zhí)行完。其中最簡(jiǎn)單的方法就是在main函數(shù)中使用time.sleep睡眠一秒鐘

按如下方式修改

package main

import (
	"fmt"
	"time"
)

func hello(){
	fmt.Println("hello")
}

func main() {
	go hello()
	fmt.Println("歡迎來(lái)到編程獅")
	time.Sleep(time.Second)
}

此時(shí)的輸出結(jié)果為

歡迎來(lái)到編程獅

hello

為什么會(huì)先打印歡迎來(lái)到編程獅呢?

這是因?yàn)樵诔绦蛑袆?chuàng)建 goroutine 執(zhí)行函數(shù)需要一定的開(kāi)銷,而與此同時(shí) main 函數(shù)所在的 goroutine 是繼續(xù)執(zhí)行的。

sync.WaitGroup

在上述代碼中使用time.sleep的方法是不準(zhǔn)確的

Go語(yǔ)言中的sync包為我們提供了一些常用的并發(fā)原語(yǔ)

在這一小節(jié),我們介紹一下sync包中的WaitGroup。當(dāng)你并不關(guān)心并發(fā)操作的結(jié)果或者有其它方式收集并發(fā)操作的結(jié)果時(shí),WaitGroup是實(shí)現(xiàn)等待一組并發(fā)操作完成的好方法

我們?cè)傩薷南律鲜龃a

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func hello() {
	fmt.Println("hello")
	defer wg.Done()//把計(jì)算器-1
}

func main() {
	wg.Add(1)//把計(jì)數(shù)器+1
	go hello()
	fmt.Println("歡迎來(lái)到編程獅")
	wg.Wait()//阻塞代碼的運(yùn)行,直到計(jì)算器為0
}

以上代碼輸出結(jié)果如下

歡迎來(lái)到編程獅
hello

啟動(dòng)多個(gè)goroutine

在Go語(yǔ)言中啟動(dòng)并發(fā)就是這么簡(jiǎn)單,接下來(lái)我們看看如何啟動(dòng)多個(gè)goroutine

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func hello(i int) {
	fmt.Printf("hello,歡迎來(lái)到編程獅%v\n", i)
	defer wg.Done()//goroutine結(jié)束計(jì)數(shù)器-1
}

func main() {
	for i := 0; i < 10; i++ {
		go hello(i)
		wg.Add(1)//啟動(dòng)一個(gè)goroutine計(jì)數(shù)器+1
	}
	wg.Wait()//等待所有的goroutine執(zhí)行結(jié)束
}

以上代碼執(zhí)行結(jié)果如下

hello,歡迎來(lái)到編程獅6
hello,歡迎來(lái)到編程獅9
hello,歡迎來(lái)到編程獅4
hello,歡迎來(lái)到編程獅7
hello,歡迎來(lái)到編程獅8
hello,歡迎來(lái)到編程獅0
hello,歡迎來(lái)到編程獅3
hello,歡迎來(lái)到編程獅2
hello,歡迎來(lái)到編程獅1
hello,歡迎來(lái)到編程獅5

執(zhí)行多次上述代碼你會(huì)發(fā)現(xiàn)輸出順序并不一致,這是因?yàn)?0個(gè)goroutine都是并發(fā)執(zhí)行的,而goroutine的調(diào)度是隨機(jī)的

動(dòng)態(tài)棧

操作系統(tǒng)的線程一般都有固定的棧內(nèi)存(通常為2MB),而 Go 語(yǔ)言中的 goroutine 非常輕量級(jí),一個(gè) goroutine 的初始棧空間很小(一般為2KB),所以在 Go 語(yǔ)言中一次創(chuàng)建數(shù)萬(wàn)個(gè) goroutine 也是可能的。并且 goroutine 的棧不是固定的,可以根據(jù)需要?jiǎng)討B(tài)地增大或縮小, Go 的 runtime 會(huì)自動(dòng)為 goroutine 分配合適的棧空間。

goroutine調(diào)度

在經(jīng)過(guò)數(shù)個(gè)版本迭代之后,目前Go語(yǔ)言的調(diào)度器采用的是GPM調(diào)度模型

  • G: 表示goroutine,存儲(chǔ)了goroutine的執(zhí)行stack信息、goroutine狀態(tài)以及goroutine的任務(wù)函數(shù)等;另外G對(duì)象是可以重用的。
  • P: 表示邏輯processor,P的數(shù)量決定了系統(tǒng)內(nèi)最大可并行的G的數(shù)量(前提:系統(tǒng)的物理cpu核數(shù)>=P的數(shù)量);P的最大作用還是其擁有的各種G對(duì)象隊(duì)列、鏈表、一些cache和狀態(tài)。
  • M: M代表著真正的執(zhí)行計(jì)算資源。在綁定有效的p后,進(jìn)入schedule循環(huán);而schedule循環(huán)的機(jī)制大致是從各種隊(duì)列、p的本地隊(duì)列中獲取G,切換到G的執(zhí)行棧上并執(zhí)行G的函數(shù),調(diào)用goexit做清理工作并回到m,如此反復(fù)。M并不保留G狀態(tài),這是G可以跨M調(diào)度的基礎(chǔ)。

GOMAXPROCS

Go運(yùn)行時(shí),調(diào)度器使用GOMAXPROCS的參數(shù)來(lái)決定需要使用多少個(gè)OS線程來(lái)同時(shí)執(zhí)行Go代碼。默認(rèn)值是當(dāng)前計(jì)算機(jī)的CPU核心數(shù)。例如在一個(gè)8核處理器的電腦上,GOMAXPROCS默認(rèn)值為8。Go語(yǔ)言中可以使用runtime.GOMAXPROCS()函數(shù)設(shè)置當(dāng)前程序并發(fā)時(shí)占用的CPU核心數(shù)

channel

單純地將函數(shù)并發(fā)執(zhí)行是沒(méi)有意義的,函數(shù)與函數(shù)間需要交換數(shù)據(jù)才能體現(xiàn)并發(fā)執(zhí)行函數(shù)的意義

雖然可以使用共享內(nèi)存進(jìn)行數(shù)據(jù)交換,但是共享內(nèi)存在不同的 goroutine 中容易發(fā)生競(jìng)態(tài)問(wèn)題。為了保證數(shù)據(jù)交換的正確性,很多并發(fā)模型中必須使用互斥鎖對(duì)內(nèi)存進(jìn)行加鎖,這種做法勢(shì)必造成性能問(wèn)題

Go語(yǔ)言采用的并發(fā)模型是CSP(Communicating Sequential Processes),提倡通過(guò)通信共享內(nèi)存,而不是通過(guò)共享內(nèi)存而實(shí)現(xiàn)通信

Go 語(yǔ)言中的通道(channel)是一種特殊的類型。通道像一個(gè)傳送帶或者隊(duì)列,總是遵循先入先出的規(guī)則,保證收發(fā)數(shù)據(jù)的順序。每一個(gè)通道都是一個(gè)具體類型的導(dǎo)管,也就是聲明channel的時(shí)候需要為其指定元素類型。

channel類型

聲明通道類型變量方法如下

var 變量名 chan 元素類型

其中chan是關(guān)鍵字,元素類型指通道中傳遞的元素的類型

舉幾個(gè)例子

var a chan int //聲明一個(gè)傳遞int類型的通道
var b chan string // 聲明一個(gè)傳遞string類型的通道
var c chan bool //聲明一個(gè)傳遞bool類型的通道

channel零值

未經(jīng)初始化的通道默認(rèn)值為nil

package main

import "fmt"

func main() {
	var a chan map[int]string
	fmt.Println(a)
}

以上代碼執(zhí)行結(jié)果如下

<nil>

初始化channel

聲明的通道類型變量需要使用內(nèi)置的make函數(shù)初始化之后才能使用,具體格式如下

make(chan 元素類型,[緩沖大小])

channel的緩沖大小是可選的

a:=make(chan int)
b:=make(chan int,10)//聲明一個(gè)緩沖大小為10的通道

channel操作

通道共有發(fā)送,接收,關(guān)閉三種操作,而發(fā)送和接收操作均用?<-?符號(hào),舉幾個(gè)例子

  • 聲明通道并初始化

a := make(chan int) //聲明一個(gè)通道并初始化

  • 給一個(gè)通道發(fā)送值

a <- 10  //把10發(fā)送給a通道

  • 從一個(gè)通道中取值

x := <-a //x從a通道中取值
<-a      //從a通道中取值,忽略結(jié)果

  • 關(guān)閉通道

close(a) //關(guān)閉通道

一個(gè)通道值是可以被垃圾回收掉的。通道通常由發(fā)送方執(zhí)行關(guān)閉操作,并且只有在接收方明確等待通道關(guān)閉的信號(hào)時(shí)才需要執(zhí)行關(guān)閉操作。它和關(guān)閉文件不一樣,通常在結(jié)束操作之后關(guān)閉文件是必須要做的,但關(guān)閉通道不是必須的。

關(guān)閉后的通道有以下特點(diǎn)

  • 對(duì)一個(gè)關(guān)閉的通道再發(fā)送值就會(huì)導(dǎo)致 panic。
  • 對(duì)一個(gè)關(guān)閉的通道進(jìn)行接收會(huì)一直獲取值直到通道為空。
  • 對(duì)一個(gè)關(guān)閉的并且沒(méi)有值的通道執(zhí)行接收操作會(huì)得到對(duì)應(yīng)類型的零值。
  • 關(guān)閉一個(gè)已經(jīng)關(guān)閉的通道會(huì)導(dǎo)致 panic。

無(wú)緩沖的通道

無(wú)緩沖的通道又稱為阻塞的通道,我們來(lái)看一下如下代碼片段

package main

import "fmt"

func main() {
	a := make(chan int)
	a <- 10
	fmt.Println("發(fā)送成功")
}

上面這段代碼能夠通過(guò)編譯,但是執(zhí)行時(shí)會(huì)報(bào)錯(cuò)

fatal error: all goroutines are asleep - deadlock!     

goroutine 1 [chan send]:
main.main()
        C:/Users/W3Cschool/Desktop/test/main.go:7 +0x31
exit status 2

deadlock表示我們程序中所有的goroutine都被掛起導(dǎo)致程序死鎖了,為什么會(huì)出現(xiàn)這種情況呢?

這是因?yàn)槲覀儎?chuàng)建的是一個(gè)無(wú)緩沖區(qū)的通道,無(wú)緩沖的通道只有在有接收方能夠接收值的時(shí)候才能發(fā)送成功,否則會(huì)一直處于等待發(fā)送的階段。同理,如果對(duì)一個(gè)無(wú)緩沖通道執(zhí)行接收操作時(shí),沒(méi)有任何向通道中發(fā)送值的操作那么也會(huì)導(dǎo)致接收操作阻塞。

我們可以創(chuàng)建一個(gè)goroutine去接收值,例如

package main

import "fmt"

func receive(x chan int) {
	ret := <-x
	fmt.Println("接收成功", ret)
}

func main() {
	a := make(chan int)
	go receive(a)
	a <- 10
	fmt.Println("發(fā)送成功")
}

以上代碼執(zhí)行結(jié)果如下

接收成功 10
發(fā)送成功

有緩沖區(qū)的通道

另外還有一種方法解決上述死鎖的問(wèn)題,那就是使用有緩沖區(qū)的通道。我們可以在使用make函數(shù)初始化通道時(shí),為其指定緩沖區(qū)大小,例如

package main

import "fmt"

func main() {
	a := make(chan int,1)
	a <- 10
	fmt.Println("發(fā)送成功")
}

以上代碼執(zhí)行結(jié)果如下

發(fā)送成功

只要通道的容量大于零,那么該通道就屬于有緩沖的通道,通道的容量表示通道中最大能存放的元素?cái)?shù)量。當(dāng)通道內(nèi)已有元素?cái)?shù)達(dá)到最大容量后,再向通道執(zhí)行發(fā)送操作就會(huì)阻塞,除非有從通道執(zhí)行接收操作。

我們可以使用內(nèi)置的len函數(shù)獲取通道的長(zhǎng)度,使用cap函數(shù)獲取通道的容量

判斷通道關(guān)閉

當(dāng)向通道中發(fā)送完數(shù)據(jù)時(shí),我們可以通過(guò)close函數(shù)來(lái)關(guān)閉通道。當(dāng)一個(gè)通道被關(guān)閉后,再往該通道發(fā)送值會(huì)引發(fā)panic。從該通道取值的操作會(huì)先取完通道中的值。通道內(nèi)的值被接收完后再對(duì)通道執(zhí)行接收操作得到的值會(huì)一直都是對(duì)應(yīng)元素類型的零值。那我們?nèi)绾闻袛嘁粋€(gè)通道是否被關(guān)閉了呢?

value, ok := <-ch

value:表示從通道中所取得的值

ok:若通道已關(guān)閉,返回false,否則返回true

以下代碼會(huì)不斷從通道中取值,直到通道被關(guān)閉后退出

package main

import "fmt"

func receive(ch chan int) {
	for {
		v, ok := <-ch
		if !ok {
			fmt.Println("通道已關(guān)閉")
			break
		}
		fmt.Printf("v:%#v ok:%#v\n", v, ok)
	}
}

func main() {
	ch := make(chan int, 1)
	ch <- 1
	close(ch)
	receive(ch)
}

以上代碼執(zhí)行結(jié)果如下

v:1 ok:true
通道已關(guān)閉

for range接收值

通常我們會(huì)使用for range循環(huán)來(lái)從通道中接收值,當(dāng)通道關(guān)閉后,會(huì)在通道內(nèi)所有值被取完之后退出循環(huán),上面的例子我們使用for range會(huì)更加簡(jiǎn)潔

package main

import "fmt"

func receive(ch chan int) {
	for i:=range ch{
		fmt.Printf("v:%v",i)
	}
}

func main() {
	ch := make(chan int, 1)
	ch <- 1
	close(ch)
	receive(ch)
}

以上代碼執(zhí)行結(jié)果如下

v:1

單向通道

在某些場(chǎng)景下我們可能會(huì)將通道作為參數(shù)在多個(gè)任務(wù)函數(shù)間進(jìn)行傳遞,通常我們會(huì)選擇在不同的任務(wù)函數(shù)中對(duì)通道的使用進(jìn)行限制,比如限制通道在某個(gè)函數(shù)中只能執(zhí)行發(fā)送或只能執(zhí)行接收操作

<- chan int // 只接收通道,只能接收不能發(fā)送
chan <- int // 只發(fā)送通道,只能發(fā)送不能接收

select多路復(fù)用

在某些場(chǎng)景下我們可能需要同時(shí)從多個(gè)通道接收數(shù)據(jù)。通道在接收數(shù)據(jù)時(shí),如果沒(méi)有數(shù)據(jù)可以被接收那么當(dāng)前 goroutine 將會(huì)發(fā)生阻塞。Go語(yǔ)言內(nèi)置了select關(guān)鍵字,使用它可以同時(shí)響應(yīng)多個(gè)通道的操作,具體格式如下

select {
case <-ch1:
	//...
case data := <-ch2:
	//...
case ch3 <- 10:
	//...
default:
	//默認(rèn)操作
}

select語(yǔ)句具有以下特點(diǎn)

  • 可處理一個(gè)或多個(gè)channel的發(fā)送/接收操作
  • 如果多個(gè)case同時(shí)滿足,select會(huì)隨機(jī)選擇一個(gè)執(zhí)行
  • 對(duì)于沒(méi)有case的select會(huì)一直阻塞,可用于阻塞 main 函數(shù),防止退出

下面這段代碼在終端中打印1-10之間的奇數(shù),借助這段代碼來(lái)看下select的使用方法

package main

import "fmt"

func main() {
	ch := make(chan int, 1)//創(chuàng)建一個(gè)類型為int,緩沖區(qū)大小為1的通道
	for i := 1; i <= 10; i++ {
		select {
		case x := <-ch://第一次循環(huán)由于沒(méi)有值,所以該分支不滿足
			fmt.Println(x)
		case ch <- i://將i發(fā)送給通道(由于緩沖區(qū)大小為1,緩沖區(qū)已滿,第二次不會(huì)走該分支)
		}
	}
}

以上代碼執(zhí)行結(jié)果如下

1
3
5
7
9

并發(fā)安全和互斥鎖

有時(shí)候我們的代碼中可能會(huì)存在多個(gè) goroutine 同時(shí)操作一個(gè)資源的情況,這種情況下就會(huì)發(fā)生數(shù)據(jù)讀寫錯(cuò)亂的問(wèn)題,例如下面這段代碼

package main

import (
	"fmt"
	"sync"
)

var (
	x int64
	wg sync.WaitGroup // 等待組
)

// add 對(duì)全局變量x執(zhí)行5000次加1操作
func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}

func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

我們將上述代碼執(zhí)行多次,不出意外會(huì)輸出許多不同的結(jié)果,這是為什么呢?

因?yàn)樵谏鲜龃a中,我們開(kāi)啟了2個(gè)goroutine去執(zhí)行add函數(shù),某個(gè)goroutine對(duì)全局變量x的修改可能會(huì)覆蓋掉另外一個(gè)goroutine中的操作,所以導(dǎo)致結(jié)果與預(yù)期不符

互斥鎖

互斥鎖是一種常用的控制共享資源訪問(wèn)的方法,它能夠保證同一時(shí)間只有一個(gè) goroutine 可以訪問(wèn)共享資源。Go語(yǔ)言中使用sync包中提供的Mutex類型來(lái)實(shí)現(xiàn)互斥鎖

我們?cè)谙旅娴拇a中使用互斥鎖限制每次只有一個(gè)goroutine能修改全局變量x,從而解決上述問(wèn)題

package main

import (
	"fmt"
	"sync"
)

var (
	x  int64
	wg sync.WaitGroup
	m  sync.Mutex // 互斥鎖
)

func add() {
	for i := 0; i < 5000; i++ {
		m.Lock() // 修改x前加鎖
		x = x + 1
		m.Unlock() // 改完解鎖
	}
	wg.Done()
}

func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

將上述代碼編譯后多次執(zhí)行,最終結(jié)果都會(huì)是10000

使用互斥鎖能夠保證同一時(shí)間有且只有一個(gè) goroutine 進(jìn)入臨界區(qū),其他的 goroutine 則在等待鎖;當(dāng)互斥鎖釋放后,等待的 goroutine 才可以獲取鎖進(jìn)入臨界區(qū),多個(gè) goroutine 同時(shí)等待一個(gè)鎖時(shí),喚醒的策略是隨機(jī)的

讀寫互斥鎖

互斥鎖是完全互斥的,但是實(shí)際上有很多場(chǎng)景是讀多寫少的,當(dāng)我們并發(fā)的去讀取一個(gè)資源而不涉及資源修改的時(shí)候是沒(méi)有必要加互斥鎖的,這種場(chǎng)景下使用讀寫鎖是更好的一種選擇。在Go語(yǔ)言中使用sync包中的RWMutex類型來(lái)實(shí)現(xiàn)讀寫互斥鎖

讀寫鎖分為兩種:讀鎖和寫鎖。當(dāng)一個(gè) goroutine 獲取到讀鎖之后,其他的 goroutine 如果是獲取讀鎖會(huì)繼續(xù)獲得鎖,如果是獲取寫鎖就會(huì)等待;而當(dāng)一個(gè) goroutine 獲取寫鎖之后,其他的 goroutine 無(wú)論是獲取讀鎖還是寫鎖都會(huì)等待

以下為讀多寫少場(chǎng)景

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	x  = 0
	wg sync.WaitGroup
	// lock sync.Mutex
	rwlock sync.RWMutex
)

func read() {
	defer wg.Done()
	// lock.Lock()
	rwlock.RLock()
	fmt.Println(x)
	time.Sleep(time.Millisecond)
	rwlock.RUnlock()
	// lock.Unlock()
}

func write() {
	defer wg.Done()
	rwlock.Lock()
	// lock.Lock()
	x += 1
	time.Sleep(time.Millisecond * 5)
	rwlock.Unlock()
	// lock.Unlock()
}

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		go write()
		wg.Add(1)
	}
	time.Sleep(time.Second)
	for i := 0; i < 1000; i++ {
		go read()
		wg.Add(1)
	}
	wg.Wait()
	fmt.Println(time.Since(start))
}


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)