Go語言 非阻塞io

2018-07-25 16:08 更新

Go提供的網(wǎng)絡(luò)接口,在用戶層是阻塞的,這樣最符合人們的編程習(xí)慣。在runtime層面,是用epoll/kqueue實現(xiàn)的非阻塞io,為性能提供了保障。

如何實現(xiàn)

底層非阻塞io是如何實現(xiàn)的呢?簡單地說,所有文件描述符都被設(shè)置成非阻塞的,某個goroutine進行io操作,讀或者寫文件描述符,如果此刻io還沒準備好,則這個goroutine會被放到系統(tǒng)的等待隊列中,這個goroutine失去了運行權(quán),但并不是真正的整個系統(tǒng)“阻塞”于系統(tǒng)調(diào)用。

后臺還有一個poller會不停地進行poll,所有的文件描述符都被添加到了這個poller中的,當(dāng)某個時刻一個文件描述符準備好了,poller就會喚醒之前因它而阻塞的goroutine,于是goroutine重新運行起來。

這個poller是在后臺一直運行的,前面分析系統(tǒng)調(diào)度章節(jié)時為了簡化并沒有提起它。其實在proc.c文件中,runtime.main函數(shù)的第一行代碼就是

newm(sysmon, nil);

這個意思就是新建一個M并讓它運行sysmon函數(shù),前面說過M就是機器的抽象,它會直接開一個物理線程。sysmon里面是個死循環(huán),每睡眠一小會兒就會調(diào)用runtime.epoll函數(shù),這個sysmon就是所謂的poller。

poller是一個比gc更高優(yōu)先級的東西,何以見得呢?首先,垃圾回收只是用runtime.newproc建立出來的,它僅僅是個goroutine任務(wù),而poller是直接用newm建立出來的,它跟startm是平級的。也就相當(dāng)于gc只是線程池里的任務(wù),而poller自身直接就是worker。然后,gc只是被觸發(fā)性地發(fā)生的,是被動的。而poller卻是每隔很短時間就會主動運行。

封裝層次

從最原始的epoll系統(tǒng)調(diào)用,到提供給用戶的網(wǎng)絡(luò)庫函數(shù),可以分成三個封裝層次。這三個層次分別是,依賴于系統(tǒng)的api封裝,平臺獨立的runtime封裝,提供給用戶的庫的封裝。

最下面一層是依賴于系統(tǒng)部分的封裝。各個平臺下的實現(xiàn)并不一樣,比如linux下是封裝的epoll,freebsd下是封裝的kqueue。以linux為例,實現(xiàn)了一組調(diào)用epoll相關(guān)系統(tǒng)調(diào)用的封裝:

int32 runtime·epollcreate(int32 size);
int32 runtime·epollcreate1(int32 flags);
int32 runtime·epollctl(int32 epfd, int32 op, int32 fd, EpollEvent *ev);
int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
void runtime·closeonexec(int32 fd);

它們都是直接使用匯編調(diào)用系統(tǒng)調(diào)用實現(xiàn)的,比如:

TEXT runtime·epollcreate1(SB),7,$0
    MOVL    8(SP), DI
    MOVL    $291, AX            // syscall entry
    SYSCALL
    RET

這些函數(shù)還要繼續(xù)被封裝成下面一組函數(shù):

runtime·netpollinit(void);
runtime·netpollopen(int32 fd, PollDesc *pd);
runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);

runtime·netpollinit是對poller進行初始化。 runtime·netpollopen是對fd和pd進行關(guān)聯(lián),實現(xiàn)邊沿觸發(fā)通知。 runtime·netpollready,使用前必須調(diào)用這個函數(shù)來表示fd是就緒的

不管是哪個平臺,最終都會將依賴于系統(tǒng)的部分封裝好,提供上面這樣一組函數(shù)供runtime使用。

接下來是平臺獨立的poller的封裝,也就是runtime層的封裝。這一層封裝是最復(fù)雜的,它對外提供的一組接口是:

func runtime_pollServerInit()
func runtime_pollOpen(fd int) (pd *PollDesc, errno int)
func runtime_pollClose(pd *PollDesc)
func runtime_pollReset(pd *PollDesc, mode int) (err int)
func runtime_pollWait(pd *PollDesc, mode int) (err int)
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int)
func runtime_pollUnblock(pd *PollDesc)

這一組函數(shù)是由runtime封裝好,提供給net包調(diào)用的。里面定義了一個PollDesc的結(jié)構(gòu)體,將fd和對應(yīng)的goroutine封裝起來,從而實現(xiàn)當(dāng)goroutine讀寫fd阻塞時,將goroutine變?yōu)镚waiting。等一下回頭再看實現(xiàn)的細節(jié)。

最后一層封裝層次是提供給用戶的net包。在net包中網(wǎng)絡(luò)文件描述符都是用一個netFD結(jié)構(gòu)體來表示的,其中有個成員就是pollDesc。

// 網(wǎng)絡(luò)文件描述符
type netFD struct {
    sysmu  sync.Mutex
    sysref int

    // must lock both sysmu and pollDesc to write
    // can lock either to read
    closing bool

    // immutable until Close
    sysfd       int
    family      int
    sotype      int
    isConnected bool
    sysfile     *os.File
    net         string
    laddr       Addr
    raddr       Addr

    // serialize access to Read and Write methods
    rio, wio sync.Mutex

    // wait server
    pd pollDesc
}

所有用戶的net包的調(diào)用最終調(diào)用到pollDesc的上面那一組函數(shù)中,這樣就實現(xiàn)了當(dāng)goroutine讀或?qū)懽枞麜r會被放到等待隊列。最終的效果就是用戶層阻塞,底層非阻塞。

文件描述符和goroutine

當(dāng)一個goroutine進行io阻塞時,會去被放到等待隊列。這里面就關(guān)鍵的就是建立起文件描述符和goroutine之間的關(guān)聯(lián)。pollDesc結(jié)構(gòu)體就是完成這個任務(wù)的。它的結(jié)構(gòu)體定義如下:

struct PollDesc
{
    PollDesc* link;    // in pollcache, protected by pollcache.Lock
    Lock;        // protectes the following fields
    int32    fd;
    bool    closing;
    uintptr    seq;    // protects from stale timers and ready notifications
    G*    rg;    // 因讀這個fd而阻塞的G,等待READY信號
    Timer    rt;    // read deadline timer (set if rt.fv != nil)
    int64    rd;    // read deadline
    G*    wg;    // 因?qū)戇@個fd而阻塞的goroutines
    Timer    wt;
    int64    wd;
};

這個結(jié)構(gòu)體是重用的,其中l(wèi)ink就是將它鏈起來。PollDesc對象必須是類型穩(wěn)定的,因為在描述符關(guān)閉/重用之后我們會得到epoll/kqueue就緒通知。結(jié)構(gòu)體中有一個seq序號,穩(wěn)定的通知是通過使用這個序號實現(xiàn)的,當(dāng)deadline改變或者描述符重用時,序號會增加。

runtime_pollServerInit的實現(xiàn)就是調(diào)用更下層的runtime·netpollinit函數(shù)。 runtime_pollOpen從PollDesc結(jié)構(gòu)體緩存中拿一個出來,設(shè)置好它的fd。之所以叫Open而不是new,就是因為PollDesc結(jié)構(gòu)體是重用的。 runtime_pollClose函數(shù)調(diào)用runtime·netpollclose后將PollDesc結(jié)構(gòu)體放回緩存。

這些都還沒涉及到fd與goroutine交互部分,僅僅是直接對epoll的調(diào)用。從下面這個函數(shù)可以看到fd與goroutine交互部分:

func runtime_pollWait(pd *PollDesc, mode int) (err int)

它會調(diào)用到netpollblock,這個函數(shù)是這樣子的:

static void
netpollblock(PollDesc *pd, int32 mode)
{
    G **gpp;

    gpp = &pd->rg;
    if(mode == 'w')
        gpp = &pd->wg;
    if(*gpp == READY) {
        *gpp = nil;
        return;
    }
    if(*gpp != nil)
        runtime·throw("epoll: double wait");
    *gpp = g;
    runtime·park(runtime·unlock, &pd->Lock, "IO wait");
    runtime·lock(pd);
}

最后的runtime.park函數(shù),就是將當(dāng)前的goroutine(調(diào)用者)設(shè)置為waiting狀態(tài)。

上面這一部分是goroutine被放到等待隊列的部分,下面看它被喚醒的部分。在sysmon函數(shù)中,會不停地調(diào)用runtime.epoll,這個函數(shù)對就緒的網(wǎng)絡(luò)連接進行poll,返回可運行的goroutine。epoll只能知道哪個fd就緒了,那么它怎么知道哪個goroutine就緒了呢?原來epoll的data域存放的就是PollDesc結(jié)構(gòu)體指針。因此就可以得到其中的goroutine了。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號