原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-03-netrpc-hack.html
在不同的場(chǎng)景中 RPC 有著不同的需求,因此開(kāi)源的社區(qū)就誕生了各種 RPC 框架。本節(jié)我們將嘗試 Go 內(nèi)置 RPC 框架在一些比較特殊場(chǎng)景的用法。
Go 語(yǔ)言的 RPC 庫(kù)最簡(jiǎn)單的使用方式是通過(guò) Client.Call
方法進(jìn)行同步阻塞調(diào)用,該方法的實(shí)現(xiàn)如下:
func (client *Client) Call(
serviceMethod string, args interface{},
reply interface{},
) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
首先通過(guò) Client.Go
方法進(jìn)行一次異步調(diào)用,返回一個(gè)表示這次調(diào)用的 Call
結(jié)構(gòu)體。然后等待 Call
結(jié)構(gòu)體的 Done 管道返回調(diào)用結(jié)果。
我們也可以通過(guò) Client.Go
方法異步調(diào)用前面的 HelloService 服務(wù):
func doClientWork(client *rpc.Client) {
helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)
// do some thing
helloCall = <-helloCall.Done
if err := helloCall.Error; err != nil {
log.Fatal(err)
}
args := helloCall.Args.(string)
reply := helloCall.Reply.(*string)
fmt.Println(args, *reply)
}
在異步調(diào)用命令發(fā)出后,一般會(huì)執(zhí)行其他的任務(wù),因此異步調(diào)用的輸入?yún)?shù)和返回值可以通過(guò)返回的 Call 變量進(jìn)行獲取。
執(zhí)行異步調(diào)用的 Client.Go
方法實(shí)現(xiàn)如下:
func (client *Client) Go(
serviceMethod string, args interface{},
reply interface{},
done chan *Call,
) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
call.Done = make(chan *Call, 10) // buffered.
client.send(call)
return call
}
首先是構(gòu)造一個(gè)表示當(dāng)前調(diào)用的 call 變量,然后通過(guò) client.send
將 call 的完整參數(shù)發(fā)送到 RPC 框架。client.send
方法調(diào)用是線程安全的,因此可以從多個(gè) Goroutine 同時(shí)向同一個(gè) RPC 連接發(fā)送調(diào)用指令。
當(dāng)調(diào)用完成或者發(fā)生錯(cuò)誤時(shí),將調(diào)用 call.done
方法通知完成:
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
}
}
從 Call.done
方法的實(shí)現(xiàn)可以得知 call.Done
管道會(huì)將處理后的 call 返回。
在很多系統(tǒng)中都提供了 Watch 監(jiān)視功能的接口,當(dāng)系統(tǒng)滿足某種條件時(shí) Watch 方法返回監(jiān)控的結(jié)果。在這里我們可以嘗試通過(guò) RPC 框架實(shí)現(xiàn)一個(gè)基本的 Watch 功能。如前文所描述,因?yàn)?nbsp;client.send
是線程安全的,我們也可以通過(guò)在不同的 Goroutine 中同時(shí)并發(fā)阻塞調(diào)用 RPC 方法。通過(guò)在一個(gè)獨(dú)立的 Goroutine 中調(diào)用 Watch 函數(shù)進(jìn)行監(jiān)控。
為了便于演示,我們計(jì)劃通過(guò) RPC 構(gòu)造一個(gè)簡(jiǎn)單的內(nèi)存 KV 數(shù)據(jù)庫(kù)。首先定義服務(wù)如下:
type KVStoreService struct {
m map[string]string
filter map[string]func(key string)
mu sync.Mutex
}
func NewKVStoreService() *KVStoreService {
return &KVStoreService{
m: make(map[string]string),
filter: make(map[string]func(key string)),
}
}
其中 m
成員是一個(gè) map 類型,用于存儲(chǔ) KV 數(shù)據(jù)。filter
成員對(duì)應(yīng)每個(gè) Watch 調(diào)用時(shí)定義的過(guò)濾器函數(shù)列表。而 mu
成員為互斥鎖,用于在多個(gè) Goroutine 訪問(wèn)或修改時(shí)對(duì)其它成員提供保護(hù)。
然后就是 Get 和 Set 方法:
func (p *KVStoreService) Get(key string, value *string) error {
p.mu.Lock()
defer p.mu.Unlock()
if v, ok := p.m[key]; ok {
*value = v
return nil
}
return fmt.Errorf("not found")
}
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
p.mu.Lock()
defer p.mu.Unlock()
key, value := kv[0], kv[1]
if oldValue := p.m[key]; oldValue != value {
for _, fn := range p.filter {
fn(key)
}
}
p.m[key] = value
return nil
}
在 Set 方法中,輸入?yún)?shù)是 key 和 value 組成的數(shù)組,用一個(gè)匿名的空結(jié)構(gòu)體表示忽略了輸出參數(shù)。當(dāng)修改某個(gè) key 對(duì)應(yīng)的值時(shí)會(huì)調(diào)用每一個(gè)過(guò)濾器函數(shù)。
而過(guò)濾器列表在 Watch 方法中提供:
func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
ch := make(chan string, 10) // buffered
p.mu.Lock()
p.filter[id] = func(key string) { ch <- key }
p.mu.Unlock()
select {
case <-time.After(time.Duration(timeoutSecond) * time.Second):
return fmt.Errorf("timeout")
case key := <-ch:
*keyChanged = key
return nil
}
return nil
}
Watch 方法的輸入?yún)?shù)是超時(shí)的秒數(shù)。當(dāng)有 key 變化時(shí)將 key 作為返回值返回。如果超過(guò)時(shí)間后依然沒(méi)有 key 被修改,則返回超時(shí)的錯(cuò)誤。Watch 的實(shí)現(xiàn)中,用唯一的 id 表示每個(gè) Watch 調(diào)用,然后根據(jù) id 將自身對(duì)應(yīng)的過(guò)濾器函數(shù)注冊(cè)到 p.filter
列表。
KVStoreService 服務(wù)的注冊(cè)和啟動(dòng)過(guò)程我們不再贅述。下面我們看看如何從客戶端使用 Watch 方法:
func doClientWork(client *rpc.Client) {
go func() {
var keyChanged string
err := client.Call("KVStoreService.Watch", 30, &keyChanged)
if err != nil {
log.Fatal(err)
}
fmt.Println("watch:", keyChanged)
} ()
err := client.Call(
"KVStoreService.Set", [2]string{"abc", "abc-value"},
new(struct{}),
)
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second*3)
}
首先啟動(dòng)一個(gè)獨(dú)立的 Goroutine 監(jiān)控 key 的變化。同步的 watch 調(diào)用會(huì)阻塞,直到有 key 發(fā)生變化或者超時(shí)。然后在通過(guò) Set 方法修改 KV 值時(shí),服務(wù)器會(huì)將變化的 key 通過(guò) Watch 方法返回。這樣我們就可以實(shí)現(xiàn)對(duì)某些狀態(tài)的監(jiān)控。
通常的 RPC 是基于 C/S 結(jié)構(gòu),RPC 的服務(wù)端對(duì)應(yīng)網(wǎng)絡(luò)的服務(wù)器,RPC 的客戶端也對(duì)應(yīng)網(wǎng)絡(luò)客戶端。但是對(duì)于一些特殊場(chǎng)景,比如在公司內(nèi)網(wǎng)提供一個(gè) RPC 服務(wù),但是在外網(wǎng)無(wú)法連接到內(nèi)網(wǎng)的服務(wù)器。這種時(shí)候我們可以參考類似反向代理的技術(shù),首先從內(nèi)網(wǎng)主動(dòng)連接到外網(wǎng)的 TCP 服務(wù)器,然后基于 TCP 連接向外網(wǎng)提供 RPC 服務(wù)。
以下是啟動(dòng)反向 RPC 服務(wù)的代碼:
func main() {
rpc.Register(new(HelloService))
for {
conn, _ := net.Dial("tcp", "localhost:1234")
if conn == nil {
time.Sleep(time.Second)
continue
}
rpc.ServeConn(conn)
conn.Close()
}
}
反向 RPC 的內(nèi)網(wǎng)服務(wù)將不再主動(dòng)提供 TCP 監(jiān)聽(tīng)服務(wù),而是首先主動(dòng)連接到對(duì)方的 TCP 服務(wù)器。然后基于每個(gè)建立的 TCP 連接向?qū)Ψ教峁?RPC 服務(wù)。
而 RPC 客戶端則需要在一個(gè)公共的地址提供一個(gè) TCP 服務(wù),用于接受 RPC 服務(wù)器的連接請(qǐng)求:
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
clientChan := make(chan *rpc.Client)
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
clientChan <- rpc.NewClient(conn)
}
}()
doClientWork(clientChan)
}
當(dāng)每個(gè)連接建立后,基于網(wǎng)絡(luò)連接構(gòu)造 RPC 客戶端對(duì)象并發(fā)送到 clientChan 管道。
客戶端執(zhí)行 RPC 調(diào)用的操作在 doClientWork 函數(shù)完成:
func doClientWork(clientChan <-chan *rpc.Client) {
client := <-clientChan
defer client.Close()
var reply string
err := client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
首先從管道去取一個(gè) RPC 客戶端對(duì)象,并且通過(guò) defer 語(yǔ)句指定在函數(shù)退出前關(guān)閉客戶端。然后是執(zhí)行正常的 RPC 調(diào)用。
基于上下文我們可以針對(duì)不同客戶端提供定制化的 RPC 服務(wù)。我們可以通過(guò)為每個(gè)連接提供獨(dú)立的 RPC 服務(wù)來(lái)實(shí)現(xiàn)對(duì)上下文特性的支持。
首先改造 HelloService,里面增加了對(duì)應(yīng)連接的 conn 成員:
type HelloService struct {
conn net.Conn
}
然后為每個(gè)連接啟動(dòng)獨(dú)立的 RPC 服務(wù):
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go func() {
defer conn.Close()
p := rpc.NewServer()
p.Register(&HelloService{conn: conn})
p.ServeConn(conn)
} ()
}
}
Hello 方法中就可以根據(jù) conn 成員識(shí)別不同連接的 RPC 調(diào)用:
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
基于上下文信息,我們可以方便地為 RPC 服務(wù)增加簡(jiǎn)單的登陸狀態(tài)的驗(yàn)證:
type HelloService struct {
conn net.Conn
isLogin bool
}
func (p *HelloService) Login(request string, reply *string) error {
if request != "user:password" {
return fmt.Errorf("auth failed")
}
log.Println("login ok")
p.isLogin = true
return nil
}
func (p *HelloService) Hello(request string, reply *string) error {
if !p.isLogin {
return fmt.Errorf("please login")
}
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
這樣可以要求在客戶端連接 RPC 服務(wù)時(shí),首先要執(zhí)行登陸操作,登陸成功后才能正常執(zhí)行其他的服務(wù)。
![]() | ![]() |
更多建議: