Go 語(yǔ)言 玩轉(zhuǎn) RPC

2023-03-22 15:02 更新

原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-03-netrpc-hack.html


4.3 玩轉(zhuǎn) RPC

在不同的場(chǎng)景中 RPC 有著不同的需求,因此開(kāi)源的社區(qū)就誕生了各種 RPC 框架。本節(jié)我們將嘗試 Go 內(nèi)置 RPC 框架在一些比較特殊場(chǎng)景的用法。

4.3.1 客戶端 RPC 的實(shí)現(xiàn)原理

Go 語(yǔ)言的 RPC 庫(kù)最簡(jiǎn)單的使用方式是通過(guò) Client.Call 方法進(jìn)行同步阻塞調(diào)用,該方法的實(shí)現(xiàn)如下:

func (client *Client) Call(
    serviceMethod string, args interface{},
    reply interface{},
) error {
    call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    return call.Error
}

首先通過(guò) Client.Go 方法進(jìn)行一次異步調(diào)用,返回一個(gè)表示這次調(diào)用的 Call 結(jié)構(gòu)體。然后等待 Call 結(jié)構(gòu)體的 Done 管道返回調(diào)用結(jié)果。

我們也可以通過(guò) Client.Go 方法異步調(diào)用前面的 HelloService 服務(wù):

func doClientWork(client *rpc.Client) {
    helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)

    // do some thing

    helloCall = <-helloCall.Done
    if err := helloCall.Error; err != nil {
        log.Fatal(err)
    }

    args := helloCall.Args.(string)
    reply := helloCall.Reply.(*string)
    fmt.Println(args, *reply)
}

在異步調(diào)用命令發(fā)出后,一般會(huì)執(zhí)行其他的任務(wù),因此異步調(diào)用的輸入?yún)?shù)和返回值可以通過(guò)返回的 Call 變量進(jìn)行獲取。

執(zhí)行異步調(diào)用的 Client.Go 方法實(shí)現(xiàn)如下:

func (client *Client) Go(
    serviceMethod string, args interface{},
    reply interface{},
    done chan *Call,
) *Call {
    call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    call.Done = make(chan *Call, 10) // buffered.

    client.send(call)
    return call
}

首先是構(gòu)造一個(gè)表示當(dāng)前調(diào)用的 call 變量,然后通過(guò) client.send 將 call 的完整參數(shù)發(fā)送到 RPC 框架。client.send 方法調(diào)用是線程安全的,因此可以從多個(gè) Goroutine 同時(shí)向同一個(gè) RPC 連接發(fā)送調(diào)用指令。

當(dāng)調(diào)用完成或者發(fā)生錯(cuò)誤時(shí),將調(diào)用 call.done 方法通知完成:

func (call *Call) done() {
    select {
    case call.Done <- call:
        // ok
    default:
        // We don't want to block here. It is the caller's responsibility to make
        // sure the channel has enough buffer space. See comment in Go().
    }
}

從 Call.done 方法的實(shí)現(xiàn)可以得知 call.Done 管道會(huì)將處理后的 call 返回。

4.3.2 基于 RPC 實(shí)現(xiàn) Watch 功能

在很多系統(tǒng)中都提供了 Watch 監(jiān)視功能的接口,當(dāng)系統(tǒng)滿足某種條件時(shí) Watch 方法返回監(jiān)控的結(jié)果。在這里我們可以嘗試通過(guò) RPC 框架實(shí)現(xiàn)一個(gè)基本的 Watch 功能。如前文所描述,因?yàn)?nbsp;client.send 是線程安全的,我們也可以通過(guò)在不同的 Goroutine 中同時(shí)并發(fā)阻塞調(diào)用 RPC 方法。通過(guò)在一個(gè)獨(dú)立的 Goroutine 中調(diào)用 Watch 函數(shù)進(jìn)行監(jiān)控。

為了便于演示,我們計(jì)劃通過(guò) RPC 構(gòu)造一個(gè)簡(jiǎn)單的內(nèi)存 KV 數(shù)據(jù)庫(kù)。首先定義服務(wù)如下:

type KVStoreService struct {
    m      map[string]string
    filter map[string]func(key string)
    mu     sync.Mutex
}

func NewKVStoreService() *KVStoreService {
    return &KVStoreService{
        m:      make(map[string]string),
        filter: make(map[string]func(key string)),
    }
}

其中 m 成員是一個(gè) map 類型,用于存儲(chǔ) KV 數(shù)據(jù)。filter 成員對(duì)應(yīng)每個(gè) Watch 調(diào)用時(shí)定義的過(guò)濾器函數(shù)列表。而 mu 成員為互斥鎖,用于在多個(gè) Goroutine 訪問(wèn)或修改時(shí)對(duì)其它成員提供保護(hù)。

然后就是 Get 和 Set 方法:

func (p *KVStoreService) Get(key string, value *string) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if v, ok := p.m[key]; ok {
        *value = v
        return nil
    }

    return fmt.Errorf("not found")
}

func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    key, value := kv[0], kv[1]

    if oldValue := p.m[key]; oldValue != value {
        for _, fn := range p.filter {
            fn(key)
        }
    }

    p.m[key] = value
    return nil
}

在 Set 方法中,輸入?yún)?shù)是 key 和 value 組成的數(shù)組,用一個(gè)匿名的空結(jié)構(gòu)體表示忽略了輸出參數(shù)。當(dāng)修改某個(gè) key 對(duì)應(yīng)的值時(shí)會(huì)調(diào)用每一個(gè)過(guò)濾器函數(shù)。

而過(guò)濾器列表在 Watch 方法中提供:

func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
    id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
    ch := make(chan string, 10) // buffered

    p.mu.Lock()
    p.filter[id] = func(key string) { ch <- key }
    p.mu.Unlock()

    select {
    case <-time.After(time.Duration(timeoutSecond) * time.Second):
        return fmt.Errorf("timeout")
    case key := <-ch:
        *keyChanged = key
        return nil
    }

    return nil
}

Watch 方法的輸入?yún)?shù)是超時(shí)的秒數(shù)。當(dāng)有 key 變化時(shí)將 key 作為返回值返回。如果超過(guò)時(shí)間后依然沒(méi)有 key 被修改,則返回超時(shí)的錯(cuò)誤。Watch 的實(shí)現(xiàn)中,用唯一的 id 表示每個(gè) Watch 調(diào)用,然后根據(jù) id 將自身對(duì)應(yīng)的過(guò)濾器函數(shù)注冊(cè)到 p.filter 列表。

KVStoreService 服務(wù)的注冊(cè)和啟動(dòng)過(guò)程我們不再贅述。下面我們看看如何從客戶端使用 Watch 方法:

func doClientWork(client *rpc.Client) {
    go func() {
        var keyChanged string
        err := client.Call("KVStoreService.Watch", 30, &keyChanged)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("watch:", keyChanged)
    } ()

    err := client.Call(
        "KVStoreService.Set", [2]string{"abc", "abc-value"},
        new(struct{}),
    )
    if err != nil {
        log.Fatal(err)
    }

    time.Sleep(time.Second*3)
}

首先啟動(dòng)一個(gè)獨(dú)立的 Goroutine 監(jiān)控 key 的變化。同步的 watch 調(diào)用會(huì)阻塞,直到有 key 發(fā)生變化或者超時(shí)。然后在通過(guò) Set 方法修改 KV 值時(shí),服務(wù)器會(huì)將變化的 key 通過(guò) Watch 方法返回。這樣我們就可以實(shí)現(xiàn)對(duì)某些狀態(tài)的監(jiān)控。

4.3.3 反向 RPC

通常的 RPC 是基于 C/S 結(jié)構(gòu),RPC 的服務(wù)端對(duì)應(yīng)網(wǎng)絡(luò)的服務(wù)器,RPC 的客戶端也對(duì)應(yīng)網(wǎng)絡(luò)客戶端。但是對(duì)于一些特殊場(chǎng)景,比如在公司內(nèi)網(wǎng)提供一個(gè) RPC 服務(wù),但是在外網(wǎng)無(wú)法連接到內(nèi)網(wǎng)的服務(wù)器。這種時(shí)候我們可以參考類似反向代理的技術(shù),首先從內(nèi)網(wǎng)主動(dòng)連接到外網(wǎng)的 TCP 服務(wù)器,然后基于 TCP 連接向外網(wǎng)提供 RPC 服務(wù)。

以下是啟動(dòng)反向 RPC 服務(wù)的代碼:

func main() {
    rpc.Register(new(HelloService))

    for {
        conn, _ := net.Dial("tcp", "localhost:1234")
        if conn == nil {
            time.Sleep(time.Second)
            continue
        }

        rpc.ServeConn(conn)
        conn.Close()
    }
}

反向 RPC 的內(nèi)網(wǎng)服務(wù)將不再主動(dòng)提供 TCP 監(jiān)聽(tīng)服務(wù),而是首先主動(dòng)連接到對(duì)方的 TCP 服務(wù)器。然后基于每個(gè)建立的 TCP 連接向?qū)Ψ教峁?RPC 服務(wù)。

而 RPC 客戶端則需要在一個(gè)公共的地址提供一個(gè) TCP 服務(wù),用于接受 RPC 服務(wù)器的連接請(qǐng)求:

func main() {
    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    clientChan := make(chan *rpc.Client)

    go func() {
        for {
            conn, err := listener.Accept()
            if err != nil {
                log.Fatal("Accept error:", err)
            }

            clientChan <- rpc.NewClient(conn)
        }
    }()

    doClientWork(clientChan)
}

當(dāng)每個(gè)連接建立后,基于網(wǎng)絡(luò)連接構(gòu)造 RPC 客戶端對(duì)象并發(fā)送到 clientChan 管道。

客戶端執(zhí)行 RPC 調(diào)用的操作在 doClientWork 函數(shù)完成:

func doClientWork(clientChan <-chan *rpc.Client) {
    client := <-clientChan
    defer client.Close()

    var reply string
    err := client.Call("HelloService.Hello", "hello", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

首先從管道去取一個(gè) RPC 客戶端對(duì)象,并且通過(guò) defer 語(yǔ)句指定在函數(shù)退出前關(guān)閉客戶端。然后是執(zhí)行正常的 RPC 調(diào)用。

4.3.4 上下文信息

基于上下文我們可以針對(duì)不同客戶端提供定制化的 RPC 服務(wù)。我們可以通過(guò)為每個(gè)連接提供獨(dú)立的 RPC 服務(wù)來(lái)實(shí)現(xiàn)對(duì)上下文特性的支持。

首先改造 HelloService,里面增加了對(duì)應(yīng)連接的 conn 成員:

type HelloService struct {
    conn net.Conn
}

然后為每個(gè)連接啟動(dòng)獨(dú)立的 RPC 服務(wù):

func main() {
    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }

        go func() {
            defer conn.Close()

            p := rpc.NewServer()
            p.Register(&HelloService{conn: conn})
            p.ServeConn(conn)
        } ()
    }
}

Hello 方法中就可以根據(jù) conn 成員識(shí)別不同連接的 RPC 調(diào)用:

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
    return nil
}

基于上下文信息,我們可以方便地為 RPC 服務(wù)增加簡(jiǎn)單的登陸狀態(tài)的驗(yàn)證:

type HelloService struct {
    conn    net.Conn
    isLogin bool
}

func (p *HelloService) Login(request string, reply *string) error {
    if request != "user:password" {
        return fmt.Errorf("auth failed")
    }
    log.Println("login ok")
    p.isLogin = true
    return nil
}

func (p *HelloService) Hello(request string, reply *string) error {
    if !p.isLogin {
        return fmt.Errorf("please login")
    }
    *reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
    return nil
}

這樣可以要求在客戶端連接 RPC 服務(wù)時(shí),首先要執(zhí)行登陸操作,登陸成功后才能正常執(zhí)行其他的服務(wù)。



以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)