原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-04-grpc.html
gRPC 是 Google 公司基于 Protobuf 開發(fā)的跨語言的開源 RPC 框架。gRPC 基于 HTTP/2 協(xié)議設(shè)計,可以基于一個 HTTP/2 連接提供多個服務(wù),對于移動設(shè)備更加友好。本節(jié)將講述 gRPC 的簡單用法。
Go 語言的 gRPC 技術(shù)棧如圖 4-1 所示:
圖 4-1 gRPC 技術(shù)棧
最底層為 TCP 或 Unix Socket 協(xié)議,在此之上是 HTTP/2 協(xié)議的實現(xiàn),然后在 HTTP/2 協(xié)議之上又構(gòu)建了針對 Go 語言的 gRPC 核心庫。應(yīng)用程序通過 gRPC 插件生產(chǎn)的 Stub 代碼和 gRPC 核心庫通信,也可以直接和 gRPC 核心庫通信。
如果從 Protobuf 的角度看,gRPC 只不過是一個針對 service 接口生成代碼的生成器。我們在本章的第二節(jié)中手工實現(xiàn)了一個簡單的 Protobuf 代碼生成器插件,只不過當(dāng)時生成的代碼是適配標(biāo)準(zhǔn)庫的 RPC 框架的?,F(xiàn)在我們將學(xué)習(xí) gRPC 的用法。
創(chuàng)建 hello.proto 文件,定義 HelloService 接口:
syntax = "proto3";
package main;
message String {
string value = 1;
}
service HelloService {
rpc Hello (String) returns (String);
}
使用 protoc-gen-go 內(nèi)置的 gRPC 插件生成 gRPC 代碼:
$ protoc --go_out=plugins=grpc:. hello.proto
gRPC 插件會為服務(wù)端和客戶端生成不同的接口:
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
}
type HelloServiceClient interface {
Hello(context.Context, *String, ...grpc.CallOption) (*String, error)
}
gRPC 通過 context.Context 參數(shù),為每個方法調(diào)用提供了上下文支持。客戶端在調(diào)用方法的時候,可以通過可選的 grpc.CallOption 類型的參數(shù)提供額外的上下文信息。
基于服務(wù)端的 HelloServiceServer 接口可以重新實現(xiàn) HelloService 服務(wù):
type HelloServiceImpl struct{}
func (p *HelloServiceImpl) Hello(
ctx context.Context, args *String,
) (*String, error) {
reply := &String{Value: "hello:" + args.GetValue()}
return reply, nil
}
gRPC 服務(wù)的啟動流程和標(biāo)準(zhǔn)庫的 RPC 服務(wù)啟動流程類似:
func main() {
grpcServer := grpc.NewServer()
RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
grpcServer.Serve(lis)
}
首先是通過 grpc.NewServer()
構(gòu)造一個 gRPC 服務(wù)對象,然后通過 gRPC 插件生成的 RegisterHelloServiceServer 函數(shù)注冊我們實現(xiàn)的 HelloServiceImpl 服務(wù)。然后通過 grpcServer.Serve(lis)
在一個監(jiān)聽端口上提供 gRPC 服務(wù)。
然后就可以通過客戶端連接 gRPC 服務(wù)了:
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &String{Value: "hello"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
其中 grpc.Dial 負(fù)責(zé)和 gRPC 服務(wù)建立連接,然后 NewHelloServiceClient 函數(shù)基于已經(jīng)建立的連接構(gòu)造 HelloServiceClient 對象。返回的 client 其實是一個 HelloServiceClient 接口對象,通過接口定義的方法就可以調(diào)用服務(wù)端對應(yīng)的 gRPC 服務(wù)提供的方法。
gRPC 和標(biāo)準(zhǔn)庫的 RPC 框架有一個區(qū)別,gRPC 生成的接口并不支持異步調(diào)用。不過我們可以在多個 Goroutine 之間安全地共享 gRPC 底層的 HTTP/2 連接,因此可以通過在另一個 Goroutine 阻塞調(diào)用的方式模擬異步調(diào)用。
RPC 是遠(yuǎn)程函數(shù)調(diào)用,因此每次調(diào)用的函數(shù)參數(shù)和返回值不能太大,否則將嚴(yán)重影響每次調(diào)用的響應(yīng)時間。因此傳統(tǒng)的 RPC 方法調(diào)用對于上傳和下載較大數(shù)據(jù)量場景并不適合。同時傳統(tǒng) RPC 模式也不適用于對時間不確定的訂閱和發(fā)布模式。為此,gRPC 框架針對服務(wù)器端和客戶端分別提供了流特性。
服務(wù)端或客戶端的單向流是雙向流的特例,我們在 HelloService 增加一個支持雙向流的 Channel 方法:
service HelloService {
rpc Hello (String) returns (String);
rpc Channel (stream String) returns (stream String);
}
關(guān)鍵字 stream 指定啟用流特性,參數(shù)部分是接收客戶端參數(shù)的流,返回值是返回給客戶端的流。
重新生成代碼可以看到接口中新增加的 Channel 方法的定義:
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
Channel(HelloService_ChannelServer) error
}
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (
*String, error,
)
Channel(ctx context.Context, opts ...grpc.CallOption) (
HelloService_ChannelClient, error,
)
}
在服務(wù)端的 Channel 方法參數(shù)是一個新的 HelloService_ChannelServer 類型的參數(shù),可以用于和客戶端雙向通信。客戶端的 Channel 方法返回一個 HelloService_ChannelClient 類型的返回值,可以用于和服務(wù)端進(jìn)行雙向通信。
HelloService_ChannelServer 和 HelloService_ChannelClient 均為接口類型:
type HelloService_ChannelServer interface {
Send(*String) error
Recv() (*String, error)
grpc.ServerStream
}
type HelloService_ChannelClient interface {
Send(*String) error
Recv() (*String, error)
grpc.ClientStream
}
可以發(fā)現(xiàn)服務(wù)端和客戶端的流輔助接口均定義了 Send 和 Recv 方法用于流數(shù)據(jù)的雙向通信。
現(xiàn)在我們可以實現(xiàn)流服務(wù):
func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
for {
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
reply := &String{Value: "hello:" + args.GetValue()}
err = stream.Send(reply)
if err != nil {
return err
}
}
}
服務(wù)端在循環(huán)中接收客戶端發(fā)來的數(shù)據(jù),如果遇到 io.EOF 表示客戶端流被關(guān)閉,如果函數(shù)退出表示服務(wù)端流關(guān)閉。生成返回的數(shù)據(jù)通過流發(fā)送給客戶端,雙向流數(shù)據(jù)的發(fā)送和接收都是完全獨立的行為。需要注意的是,發(fā)送和接收的操作并不需要一一對應(yīng),用戶可以根據(jù)真實場景進(jìn)行組織代碼。
客戶端需要先調(diào)用 Channel 方法獲取返回的流對象:
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
在客戶端我們將發(fā)送和接收操作放到兩個獨立的 Goroutine。首先是向服務(wù)端發(fā)送數(shù)據(jù):
go func() {
for {
if err := stream.Send(&String{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()
然后在循環(huán)中接收服務(wù)端返回的數(shù)據(jù):
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
這樣就完成了完整的流接收和發(fā)送支持。
在前一節(jié)中,我們基于 Go 內(nèi)置的 RPC 庫實現(xiàn)了一個簡化版的 Watch 方法?;?Watch 的思路雖然也可以構(gòu)造發(fā)布和訂閱系統(tǒng),但是因為 RPC 缺乏流機制導(dǎo)致每次只能返回一個結(jié)果。在發(fā)布和訂閱模式中,由調(diào)用者主動發(fā)起的發(fā)布行為類似一個普通函數(shù)調(diào)用,而被動的訂閱者則類似 gRPC 客戶端單向流中的接收者?,F(xiàn)在我們可以嘗試基于 gRPC 的流特性構(gòu)造一個發(fā)布和訂閱系統(tǒng)。
發(fā)布訂閱是一個常見的設(shè)計模式,開源社區(qū)中已經(jīng)存在很多該模式的實現(xiàn)。其中 docker 項目中提供了一個 pubsub 的極簡實現(xiàn),下面是基于 pubsub 包實現(xiàn)的本地發(fā)布訂閱代碼:
import (
"github.com/moby/moby/pkg/pubsub"
)
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "golang:") {
return true
}
}
return false
})
docker := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "docker:") {
return true
}
}
return false
})
go p.Publish("hi")
go p.Publish("golang: https://golang.org")
go p.Publish("docker: https://www.docker.com/")
time.Sleep(1)
go func() {
fmt.Println("golang topic:", <-golang)
}()
go func() {
fmt.Println("docker topic:", <-docker)
}()
<-make(chan bool)
}
其中 pubsub.NewPublisher
構(gòu)造一個發(fā)布對象,p.SubscribeTopic()
可以通過函數(shù)篩選感興趣的主題進(jìn)行訂閱。
現(xiàn)在嘗試基于 gRPC 和 pubsub 包,提供一個跨網(wǎng)絡(luò)的發(fā)布和訂閱系統(tǒng)。首先通過 Protobuf 定義一個發(fā)布訂閱服務(wù)接口:
service PubsubService {
rpc Publish (String) returns (String);
rpc Subscribe (String) returns (stream String);
}
其中 Publish 是普通的 RPC 方法,Subscribe 則是一個單向的流服務(wù)。然后 gRPC 插件會為服務(wù)端和客戶端生成對應(yīng)的接口:
type PubsubServiceServer interface {
Publish(context.Context, *String) (*String, error)
Subscribe(*String, PubsubService_SubscribeServer) error
}
type PubsubServiceClient interface {
Publish(context.Context, *String, ...grpc.CallOption) (*String, error)
Subscribe(context.Context, *String, ...grpc.CallOption) (
PubsubService_SubscribeClient, error,
)
}
type PubsubService_SubscribeServer interface {
Send(*String) error
grpc.ServerStream
}
因為 Subscribe 是服務(wù)端的單向流,因此生成的 PubsubService_SubscribeServer 接口中只有 Send 方法。
然后就可以實現(xiàn)發(fā)布和訂閱服務(wù)了:
type PubsubService struct {
pub *pubsub.Publisher
}
func NewPubsubService() *PubsubService {
return &PubsubService{
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}
然后是實現(xiàn)發(fā)布方法和訂閱方法:
func (p *PubsubService) Publish(
ctx context.Context, arg *String,
) (*String, error) {
p.pub.Publish(arg.GetValue())
return &String{}, nil
}
func (p *PubsubService) Subscribe(
arg *String, stream PubsubService_SubscribeServer,
) error {
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key,arg.GetValue()) {
return true
}
}
return false
})
for v := range ch {
if err := stream.Send(&String{Value: v.(string)}); err != nil {
return err
}
}
return nil
}
這樣就可以從客戶端向服務(wù)器發(fā)布信息了:
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
_, err = client.Publish(
context.Background(), &String{Value: "golang: hello Go"},
)
if err != nil {
log.Fatal(err)
}
_, err = client.Publish(
context.Background(), &String{Value: "docker: hello Docker"},
)
if err != nil {
log.Fatal(err)
}
}
然后就可以在另一個客戶端進(jìn)行訂閱信息了:
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
stream, err := client.Subscribe(
context.Background(), &String{Value: "golang:"},
)
if err != nil {
log.Fatal(err)
}
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
}
到此我們就基于 gRPC 簡單實現(xiàn)了一個跨網(wǎng)絡(luò)的發(fā)布和訂閱服務(wù)。
![]() | ![]() |
更多建議: