Go 語言 gRPC 入門

2023-03-22 15:02 更新

原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-04-grpc.html


4.4 gRPC 入門

gRPC 是 Google 公司基于 Protobuf 開發(fā)的跨語言的開源 RPC 框架。gRPC 基于 HTTP/2 協(xié)議設(shè)計,可以基于一個 HTTP/2 連接提供多個服務(wù),對于移動設(shè)備更加友好。本節(jié)將講述 gRPC 的簡單用法。

4.4.1 gRPC 技術(shù)棧

Go 語言的 gRPC 技術(shù)棧如圖 4-1 所示:


圖 4-1 gRPC 技術(shù)棧

最底層為 TCP 或 Unix Socket 協(xié)議,在此之上是 HTTP/2 協(xié)議的實現(xiàn),然后在 HTTP/2 協(xié)議之上又構(gòu)建了針對 Go 語言的 gRPC 核心庫。應(yīng)用程序通過 gRPC 插件生產(chǎn)的 Stub 代碼和 gRPC 核心庫通信,也可以直接和 gRPC 核心庫通信。

4.4.2 gRPC 入門

如果從 Protobuf 的角度看,gRPC 只不過是一個針對 service 接口生成代碼的生成器。我們在本章的第二節(jié)中手工實現(xiàn)了一個簡單的 Protobuf 代碼生成器插件,只不過當(dāng)時生成的代碼是適配標(biāo)準(zhǔn)庫的 RPC 框架的?,F(xiàn)在我們將學(xué)習(xí) gRPC 的用法。

創(chuàng)建 hello.proto 文件,定義 HelloService 接口:

syntax = "proto3";

package main;

message String {
	string value = 1;
}

service HelloService {
	rpc Hello (String) returns (String);
}

使用 protoc-gen-go 內(nèi)置的 gRPC 插件生成 gRPC 代碼:

$ protoc --go_out=plugins=grpc:. hello.proto

gRPC 插件會為服務(wù)端和客戶端生成不同的接口:

type HelloServiceServer interface {
    Hello(context.Context, *String) (*String, error)
}

type HelloServiceClient interface {
    Hello(context.Context, *String, ...grpc.CallOption) (*String, error)
}

gRPC 通過 context.Context 參數(shù),為每個方法調(diào)用提供了上下文支持。客戶端在調(diào)用方法的時候,可以通過可選的 grpc.CallOption 類型的參數(shù)提供額外的上下文信息。

基于服務(wù)端的 HelloServiceServer 接口可以重新實現(xiàn) HelloService 服務(wù):

type HelloServiceImpl struct{}

func (p *HelloServiceImpl) Hello(
    ctx context.Context, args *String,
) (*String, error) {
    reply := &String{Value: "hello:" + args.GetValue()}
    return reply, nil
}

gRPC 服務(wù)的啟動流程和標(biāo)準(zhǔn)庫的 RPC 服務(wù)啟動流程類似:

func main() {
    grpcServer := grpc.NewServer()
    RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))

    lis, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal(err)
    }
    grpcServer.Serve(lis)
}

首先是通過 grpc.NewServer() 構(gòu)造一個 gRPC 服務(wù)對象,然后通過 gRPC 插件生成的 RegisterHelloServiceServer 函數(shù)注冊我們實現(xiàn)的 HelloServiceImpl 服務(wù)。然后通過 grpcServer.Serve(lis) 在一個監(jiān)聽端口上提供 gRPC 服務(wù)。

然后就可以通過客戶端連接 gRPC 服務(wù)了:

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewHelloServiceClient(conn)
    reply, err := client.Hello(context.Background(), &String{Value: "hello"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(reply.GetValue())
}

其中 grpc.Dial 負(fù)責(zé)和 gRPC 服務(wù)建立連接,然后 NewHelloServiceClient 函數(shù)基于已經(jīng)建立的連接構(gòu)造 HelloServiceClient 對象。返回的 client 其實是一個 HelloServiceClient 接口對象,通過接口定義的方法就可以調(diào)用服務(wù)端對應(yīng)的 gRPC 服務(wù)提供的方法。

gRPC 和標(biāo)準(zhǔn)庫的 RPC 框架有一個區(qū)別,gRPC 生成的接口并不支持異步調(diào)用。不過我們可以在多個 Goroutine 之間安全地共享 gRPC 底層的 HTTP/2 連接,因此可以通過在另一個 Goroutine 阻塞調(diào)用的方式模擬異步調(diào)用。

4.4.3 gRPC 流

RPC 是遠(yuǎn)程函數(shù)調(diào)用,因此每次調(diào)用的函數(shù)參數(shù)和返回值不能太大,否則將嚴(yán)重影響每次調(diào)用的響應(yīng)時間。因此傳統(tǒng)的 RPC 方法調(diào)用對于上傳和下載較大數(shù)據(jù)量場景并不適合。同時傳統(tǒng) RPC 模式也不適用于對時間不確定的訂閱和發(fā)布模式。為此,gRPC 框架針對服務(wù)器端和客戶端分別提供了流特性。

服務(wù)端或客戶端的單向流是雙向流的特例,我們在 HelloService 增加一個支持雙向流的 Channel 方法:

service HelloService {
	rpc Hello (String) returns (String);

	rpc Channel (stream String) returns (stream String);
}

關(guān)鍵字 stream 指定啟用流特性,參數(shù)部分是接收客戶端參數(shù)的流,返回值是返回給客戶端的流。

重新生成代碼可以看到接口中新增加的 Channel 方法的定義:

type HelloServiceServer interface {
    Hello(context.Context, *String) (*String, error)
    Channel(HelloService_ChannelServer) error
}
type HelloServiceClient interface {
    Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (
        *String, error,
    )
    Channel(ctx context.Context, opts ...grpc.CallOption) (
        HelloService_ChannelClient, error,
    )
}

在服務(wù)端的 Channel 方法參數(shù)是一個新的 HelloService_ChannelServer 類型的參數(shù),可以用于和客戶端雙向通信。客戶端的 Channel 方法返回一個 HelloService_ChannelClient 類型的返回值,可以用于和服務(wù)端進(jìn)行雙向通信。

HelloService_ChannelServer 和 HelloService_ChannelClient 均為接口類型:

type HelloService_ChannelServer interface {
    Send(*String) error
    Recv() (*String, error)
    grpc.ServerStream
}

type HelloService_ChannelClient interface {
    Send(*String) error
    Recv() (*String, error)
    grpc.ClientStream
}

可以發(fā)現(xiàn)服務(wù)端和客戶端的流輔助接口均定義了 Send 和 Recv 方法用于流數(shù)據(jù)的雙向通信。

現(xiàn)在我們可以實現(xiàn)流服務(wù):

func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
    for {
        args, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }

        reply := &String{Value: "hello:" + args.GetValue()}

        err = stream.Send(reply)
        if err != nil {
            return err
        }
    }
}

服務(wù)端在循環(huán)中接收客戶端發(fā)來的數(shù)據(jù),如果遇到 io.EOF 表示客戶端流被關(guān)閉,如果函數(shù)退出表示服務(wù)端流關(guān)閉。生成返回的數(shù)據(jù)通過流發(fā)送給客戶端,雙向流數(shù)據(jù)的發(fā)送和接收都是完全獨立的行為。需要注意的是,發(fā)送和接收的操作并不需要一一對應(yīng),用戶可以根據(jù)真實場景進(jìn)行組織代碼。

客戶端需要先調(diào)用 Channel 方法獲取返回的流對象:

stream, err := client.Channel(context.Background())
if err != nil {
    log.Fatal(err)
}

在客戶端我們將發(fā)送和接收操作放到兩個獨立的 Goroutine。首先是向服務(wù)端發(fā)送數(shù)據(jù):

go func() {
    for {
        if err := stream.Send(&String{Value: "hi"}); err != nil {
            log.Fatal(err)
        }
        time.Sleep(time.Second)
    }
}()

然后在循環(huán)中接收服務(wù)端返回的數(shù)據(jù):

for {
    reply, err := stream.Recv()
    if err != nil {
        if err == io.EOF {
            break
        }
        log.Fatal(err)
    }
    fmt.Println(reply.GetValue())
}

這樣就完成了完整的流接收和發(fā)送支持。

4.4.4 發(fā)布和訂閱模式

在前一節(jié)中,我們基于 Go 內(nèi)置的 RPC 庫實現(xiàn)了一個簡化版的 Watch 方法?;?Watch 的思路雖然也可以構(gòu)造發(fā)布和訂閱系統(tǒng),但是因為 RPC 缺乏流機制導(dǎo)致每次只能返回一個結(jié)果。在發(fā)布和訂閱模式中,由調(diào)用者主動發(fā)起的發(fā)布行為類似一個普通函數(shù)調(diào)用,而被動的訂閱者則類似 gRPC 客戶端單向流中的接收者?,F(xiàn)在我們可以嘗試基于 gRPC 的流特性構(gòu)造一個發(fā)布和訂閱系統(tǒng)。

發(fā)布訂閱是一個常見的設(shè)計模式,開源社區(qū)中已經(jīng)存在很多該模式的實現(xiàn)。其中 docker 項目中提供了一個 pubsub 的極簡實現(xiàn),下面是基于 pubsub 包實現(xiàn)的本地發(fā)布訂閱代碼:

import (
    "github.com/moby/moby/pkg/pubsub"
)

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 10)

    golang := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "golang:") {
                return true
            }
        }
        return false
    })
    docker := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "docker:") {
                return true
            }
        }
        return false
    })

    go p.Publish("hi")
    go p.Publish("golang: https://golang.org")
    go p.Publish("docker: https://www.docker.com/")
    time.Sleep(1)

    go func() {
        fmt.Println("golang topic:", <-golang)
    }()
    go func() {
        fmt.Println("docker topic:", <-docker)
    }()

    <-make(chan bool)
}

其中 pubsub.NewPublisher 構(gòu)造一個發(fā)布對象,p.SubscribeTopic() 可以通過函數(shù)篩選感興趣的主題進(jìn)行訂閱。

現(xiàn)在嘗試基于 gRPC 和 pubsub 包,提供一個跨網(wǎng)絡(luò)的發(fā)布和訂閱系統(tǒng)。首先通過 Protobuf 定義一個發(fā)布訂閱服務(wù)接口:

service PubsubService {
	rpc Publish (String) returns (String);
	rpc Subscribe (String) returns (stream String);
}

其中 Publish 是普通的 RPC 方法,Subscribe 則是一個單向的流服務(wù)。然后 gRPC 插件會為服務(wù)端和客戶端生成對應(yīng)的接口:

type PubsubServiceServer interface {
    Publish(context.Context, *String) (*String, error)
    Subscribe(*String, PubsubService_SubscribeServer) error
}
type PubsubServiceClient interface {
    Publish(context.Context, *String, ...grpc.CallOption) (*String, error)
    Subscribe(context.Context, *String, ...grpc.CallOption) (
        PubsubService_SubscribeClient, error,
    )
}

type PubsubService_SubscribeServer interface {
    Send(*String) error
    grpc.ServerStream
}

因為 Subscribe 是服務(wù)端的單向流,因此生成的 PubsubService_SubscribeServer 接口中只有 Send 方法。

然后就可以實現(xiàn)發(fā)布和訂閱服務(wù)了:

type PubsubService struct {
    pub *pubsub.Publisher
}

func NewPubsubService() *PubsubService {
    return &PubsubService{
        pub: pubsub.NewPublisher(100*time.Millisecond, 10),
    }
}

然后是實現(xiàn)發(fā)布方法和訂閱方法:

func (p *PubsubService) Publish(
    ctx context.Context, arg *String,
) (*String, error) {
    p.pub.Publish(arg.GetValue())
    return &String{}, nil
}

func (p *PubsubService) Subscribe(
    arg *String, stream PubsubService_SubscribeServer,
) error {
    ch := p.pub.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key,arg.GetValue()) {
                return true
            }
        }
        return false
    })

    for v := range ch {
        if err := stream.Send(&String{Value: v.(string)}); err != nil {
            return err
        }
    }

    return nil
}

這樣就可以從客戶端向服務(wù)器發(fā)布信息了:

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewPubsubServiceClient(conn)

    _, err = client.Publish(
        context.Background(), &String{Value: "golang: hello Go"},
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.Publish(
        context.Background(), &String{Value: "docker: hello Docker"},
    )
    if err != nil {
        log.Fatal(err)
    }
}

然后就可以在另一個客戶端進(jìn)行訂閱信息了:

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewPubsubServiceClient(conn)
    stream, err := client.Subscribe(
        context.Background(), &String{Value: "golang:"},
    )
    if err != nil {
        log.Fatal(err)
    }

    for {
        reply, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Fatal(err)
        }

        fmt.Println(reply.GetValue())
    }
}

到此我們就基于 gRPC 簡單實現(xiàn)了一個跨網(wǎng)絡(luò)的發(fā)布和訂閱服務(wù)。



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號