Netty 4.x 處理一個基于流的傳輸

2020-11-13 14:16 更新

處理一個基于流的傳輸

關(guān)于 Socket Buffer的一個小警告

基于流的傳輸比如 TCP/IP, 接收到數(shù)據(jù)是存在 socket 接收的 buffer 中。不幸的是,基于流的傳輸并不是一個數(shù)據(jù)包隊列,而是一個字節(jié)隊列。意味著,即使你發(fā)送了 2 個獨立的數(shù)據(jù)包,操作系統(tǒng)也不會作為 2 個消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠(yuǎn)程寫入的數(shù)據(jù)就會準(zhǔn)確地讀取。舉個例子,讓我們假設(shè)操作系統(tǒng)的 TCP/IP 協(xié)議棧已經(jīng)接收了 3 個數(shù)據(jù)包:

由于基于流傳輸?shù)膮f(xié)議的這種普通的性質(zhì),在你的應(yīng)用程序里讀取數(shù)據(jù)的時候會有很高的可能性被分成下面的片段

因此,一個接收方不管他是客戶端還是服務(wù)端,都應(yīng)該把接收到的數(shù)據(jù)整理成一個或者多個更有意思并且能夠讓程序的業(yè)務(wù)邏輯更好理解的數(shù)據(jù)。在上面的例子中,接收到的數(shù)據(jù)應(yīng)該被構(gòu)造成下面的格式:

The First Solution 辦法一

回到 TIME 客戶端例子。同樣也有類似的問題。一個 32 位整型是非常小的數(shù)據(jù),他并不見得會被經(jīng)常拆分到到不同的數(shù)據(jù)段內(nèi)。然而,問題是他確實可能會被拆分到不同的數(shù)據(jù)段內(nèi),并且拆分的可能性會隨著通信量的增加而增加。

最簡單的方案是構(gòu)造一個內(nèi)部的可積累的緩沖,直到 4 個字節(jié)全部接收到了內(nèi)部緩沖。下面的代碼修改了 TimeClientHandler 的實現(xiàn)類修復(fù)了這個問題

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        private ByteBuf buf;

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            buf = ctx.alloc().buffer(4); // (1)
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            buf.release(); // (1)
            buf = null;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg;
            buf.writeBytes(m); // (2)
            m.release();

            if (buf.readableBytes() >= 4) { // (3)
                long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

1.ChannelHandler 有 2 個生命周期的監(jiān)聽方法:handlerAdded() 和 handlerRemoved()。你可以完成任意初始化任務(wù)只要他不會被阻塞很長的時間。

2.首先,所有接收的數(shù)據(jù)都應(yīng)該被累積在 buf 變量里。

3.然后,處理器必須檢查 buf 變量是否有足夠的數(shù)據(jù),在這個例子中是 4 個字節(jié),然后處理實際的業(yè)務(wù)邏輯。否則,Netty 會重復(fù)調(diào)用channelRead() 當(dāng)有更多數(shù)據(jù)到達(dá)直到4個字節(jié)的數(shù)據(jù)被積累。

The Second Solution 方法二

盡管第一個解決方案已經(jīng)解決了 TIME 客戶端的問題了,但是修改后的處理器看起來不那么的簡潔,想象一下如果由多個字段比如可變長度的字段組成的更為復(fù)雜的協(xié)議時,你的 ChannelInboundHandler 的實現(xiàn)將很快地變得難以維護(hù)。

正如你所知的,你可以增加多個 ChannelHandlerChannelPipeline ,因此你可以把一整個ChannelHandler 拆分成多個模塊以減少應(yīng)用的復(fù)雜程度,比如你可以把TimeClientHandler 拆分成 2 個處理器:

  • TimeDecoder 處理數(shù)據(jù)拆分的問題
  • TimeClientHandler 原始版本的實現(xiàn)

幸運地是,Netty 提供了一個可擴(kuò)展的類,幫你完成 TimeDecoder 的開發(fā)。

    public class TimeDecoder extends ByteToMessageDecoder { // (1)
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
            if (in.readableBytes() < 4) {
                return; // (3)
            }

            out.add(in.readBytes(4)); // (4)
        }
    }

1.ByteToMessageDecoderChannelInboundHandler 的一個實現(xiàn)類,他可以在處理數(shù)據(jù)拆分的問題上變得很簡單。

2.每當(dāng)有新數(shù)據(jù)接收的時候,ByteToMessageDecoder 都會調(diào)用 decode() 方法來處理內(nèi)部的那個累積緩沖。

3.Decode() 方法可以決定當(dāng)累積緩沖里沒有足夠數(shù)據(jù)時可以往 out 對象里放任意數(shù)據(jù)。當(dāng)有更多的數(shù)據(jù)被接收了 ByteToMessageDecoder 會再一次調(diào)用 decode() 方法。

4.如果在 decode() 方法里增加了一個對象到 out 對象里,這意味著解碼器解碼消息成功。ByteToMessageDecoder 將會丟棄在累積緩沖里已經(jīng)被讀過的數(shù)據(jù)。請記得你不需要對多條消息調(diào)用 decode(),ByteToMessageDecoder 會持續(xù)調(diào)用 decode() 直到不放任何數(shù)據(jù)到 out 里。

現(xiàn)在我們有另外一個處理器插入到 ChannelPipeline 里,我們應(yīng)該在 TimeClient 里修改 ChannelInitializer 的實現(xiàn):

    b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
        }
    });

如果你是一個大膽的人,你可能會嘗試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下 API 文檔來獲取更多的信息。

    public class TimeDecoder extends ReplayingDecoder<Void> {
        @Override
        protected void decode(
                ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            out.add(in.readBytes(4));
        }
    }

此外,Netty 還提供了更多開箱即用的解碼器使你可以更簡單地實現(xiàn)更多的協(xié)議,幫助你避免開發(fā)一個難以維護(hù)的處理器實現(xiàn)。請參考下面的包以獲取更多更詳細(xì)的例子:

譯者注:翻譯版本的項目源碼見 https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.factorialcom.waylau.netty.demo.telnet 包下

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號