Netty4 開(kāi)始

2020-10-19 15:44 更新

開(kāi)始

本章圍繞 Netty 的核心架構(gòu),通過(guò)簡(jiǎn)單的示例帶你快速入門(mén)。當(dāng)你讀完本章節(jié),你馬上就可以用 Netty 寫(xiě)出一個(gè)客戶端和服務(wù)器。

如果你在學(xué)習(xí)的時(shí)候喜歡“top-down(自頂向下)”,那你可能需要要從第二章《Architectural Overview (架構(gòu)總覽)》開(kāi)始,然后再回到這里。

開(kāi)始之前

在運(yùn)行本章示例之前,需要準(zhǔn)備:最新版的 Netty 以及 JDK 1.6 或以上版本。最新版的 Netty 在這下載。自行下載 JDK。

閱讀本章節(jié)過(guò)程中,你可能會(huì)對(duì)相關(guān)類有疑惑,關(guān)于這些類的詳細(xì)的信息請(qǐng)請(qǐng)參考 API 說(shuō)明文檔。為了方便,所有文檔中涉及到的類名字都會(huì)被關(guān)聯(lián)到一個(gè)在線的 API 說(shuō)明。當(dāng)然,如果有任何錯(cuò)誤信息、語(yǔ)法錯(cuò)誤或者你有任何好的建議來(lái)改進(jìn)文檔說(shuō)明,那么請(qǐng)聯(lián)系Netty社區(qū)

譯者注:對(duì)本翻譯有任何疑問(wèn),在https://github.com/waylau/netty-4-user-guide/issues提問(wèn)

寫(xiě)個(gè)丟棄服務(wù)器

世上最簡(jiǎn)單的協(xié)議不是'Hello, World!' 而是 DISCARD(丟棄)。這個(gè)協(xié)議將會(huì)丟掉任何收到的數(shù)據(jù),而不響應(yīng)。

為了實(shí)現(xiàn) DISCARD 協(xié)議,你只需忽略所有收到的數(shù)據(jù)。讓我們從 handler (處理器)的實(shí)現(xiàn)開(kāi)始,handler 是由 Netty 生成用來(lái)處理 I/O 事件的。

     import io.netty.buffer.ByteBuf;
 
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
 
    /**
     * 處理服務(wù)端 channel.
     */
    public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
 
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
            // 默默地丟棄收到的數(shù)據(jù)
            ((ByteBuf) msg).release(); // (3)
        }
 
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
            // 當(dāng)出現(xiàn)異常就關(guān)閉連接
            cause.printStackTrace();
            ctx.close();
        }
    }

1.DiscardServerHandler 繼承自 ChannelInboundHandlerAdapter,這個(gè)類實(shí)現(xiàn)了 ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承 ChannelInboundHandlerAdapter 類而不是你自己去實(shí)現(xiàn)接口方法。

2.這里我們覆蓋了 chanelRead() 事件處理方法。每當(dāng)從客戶端收到新的數(shù)據(jù)時(shí),這個(gè)方法會(huì)在收到消息時(shí)被調(diào)用,這個(gè)例子中,收到的消息的類型是 ByteBuf

3.為了實(shí)現(xiàn) DISCARD 協(xié)議,處理器不得不忽略所有接受到的消息。ByteBuf 是一個(gè)引用計(jì)數(shù)對(duì)象,這個(gè)對(duì)象必須顯示地調(diào)用 release() 方法來(lái)釋放。請(qǐng)記住處理器的職責(zé)是釋放所有傳遞到處理器的引用計(jì)數(shù)對(duì)象。通常,channelRead() 方法的實(shí)現(xiàn)就像下面的這段代碼:

 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

4.exceptionCaught() 事件處理方法是當(dāng)出現(xiàn) Throwable 對(duì)象才會(huì)被調(diào)用,即當(dāng) Netty 由于 IO 錯(cuò)誤或者處理器在處理事件時(shí)拋出的異常時(shí)。在大部分情況下,捕獲的異常應(yīng)該被記錄下來(lái)并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個(gè)方法的處理方式會(huì)在遇到不同異常的情況下有不同的實(shí)現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個(gè)錯(cuò)誤碼的響應(yīng)消息。

目前為止一切都還不錯(cuò),我們已經(jīng)實(shí)現(xiàn)了 DISCARD 服務(wù)器的一半功能,剩下的需要編寫(xiě)一個(gè) main() 方法來(lái)啟動(dòng)服務(wù)端的 DiscardServerHandler

    import io.netty.bootstrap.ServerBootstrap;
 
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
 
    /**
     * 丟棄任何進(jìn)入的數(shù)據(jù)
     */
    public class DiscardServer {
 
        private int port;
 
        public DiscardServer(int port) {
            this.port = port;
        }
 
        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // (2)
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class) // (3)
                 .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ch.pipeline().addLast(new DiscardServerHandler());
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
 
                // 綁定端口,開(kāi)始接收進(jìn)來(lái)的連接
                ChannelFuture f = b.bind(port).sync(); // (7)
 
                // 等待服務(wù)器  socket 關(guān)閉 。
                // 在這個(gè)例子中,這不會(huì)發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
 
        public static void main(String[] args) throws Exception {
            int port;
            if (args.length > 0) {
                port = Integer.parseInt(args[0]);
            } else {
                port = 8080;
            }
            new DiscardServer(port).run();
        }
    }

1.NioEventLoopGroup 是用來(lái)處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的 EventLoopGroup 的實(shí)現(xiàn)用來(lái)處理不同的傳輸。在這個(gè)例子中我們實(shí)現(xiàn)了一個(gè)服務(wù)端的應(yīng)用,因此會(huì)有2個(gè) NioEventLoopGroup 會(huì)被使用。第一個(gè)經(jīng)常被叫做‘boss’,用來(lái)接收進(jìn)來(lái)的連接。第二個(gè)經(jīng)常被叫做‘worker’,用來(lái)處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會(huì)把連接信息注冊(cè)到‘worker’上。如何知道多少個(gè)線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的 Channel上都需要依賴于 EventLoopGroup 的實(shí)現(xiàn),并且可以通過(guò)構(gòu)造函數(shù)來(lái)配置他們的關(guān)系。

2.ServerBootstrap 是一個(gè)啟動(dòng) NIO 服務(wù)的輔助啟動(dòng)類。你可以在這個(gè)服務(wù)中直接使用 Channel,但是這會(huì)是一個(gè)復(fù)雜的處理過(guò)程,在很多情況下你并不需要這樣做。

3.這里我們指定使用 NioServerSocketChannel 類來(lái)舉例說(shuō)明一個(gè)新的 Channel 如何接收進(jìn)來(lái)的連接。

4.這里的事件處理類經(jīng)常會(huì)被用來(lái)處理一個(gè)最近的已經(jīng)接收的 Channel。ChannelInitializer 是一個(gè)特殊的處理類,他的目的是幫助使用者配置一個(gè)新的 Channel。也許你想通過(guò)增加一些處理類比如DiscardServerHandler 來(lái)配置一個(gè)新的 Channel 或者其對(duì)應(yīng)的ChannelPipeline 來(lái)實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。

5.你可以設(shè)置這里指定的 Channel 實(shí)現(xiàn)的配置參數(shù)。我們正在寫(xiě)一個(gè)TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket 的參數(shù)選項(xiàng)比如tcpNoDelay keepAlive。請(qǐng)參考 ChannelOption 和詳細(xì)的 ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對(duì)ChannelOption 的有一個(gè)大概的認(rèn)識(shí)。

6.你關(guān)注過(guò) option()childOption() 嗎?option() 是提供給NioServerSocketChannel 用來(lái)接收進(jìn)來(lái)的連接。childOption() 是提供給由父管道 ServerChannel 接收到的連接,在這個(gè)例子中也是 NioServerSocketChannel

7.我們繼續(xù),剩下的就是綁定端口然后啟動(dòng)服務(wù)。這里我們?cè)跈C(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind() 方法(基于不同綁定地址)。

恭喜!你已經(jīng)熟練地完成了第一個(gè)基于 Netty 的服務(wù)端程序。

查看收到的數(shù)據(jù)

現(xiàn)在我們已經(jīng)編寫(xiě)出我們第一個(gè)服務(wù)端,我們需要測(cè)試一下他是否真的可以運(yùn)行。最簡(jiǎn)單的測(cè)試方法是用 telnet 命令。例如,你可以在命令行上輸入telnet localhost 8080或者其他類型參數(shù)。

然而我們能說(shuō)這個(gè)服務(wù)端是正常運(yùn)行了嗎?事實(shí)上我們也不知道,因?yàn)樗且粋€(gè) discard 服務(wù),你根本不可能得到任何的響應(yīng)。為了證明他仍然是在正常工作的,讓我們修改服務(wù)端的程序來(lái)打印出他到底接收到了什么。

我們已經(jīng)知道 channelRead() 方法是在數(shù)據(jù)被接收的時(shí)候調(diào)用。讓我們放一些代碼到 DiscardServerHandler 類的 channelRead() 方法。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) { // (1)
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg); // (2)
        }
    }

1.這個(gè)低效的循環(huán)事實(shí)上可以簡(jiǎn)化為:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))

2.或者,你可以在這里調(diào)用 in.release()。

如果你再次運(yùn)行 telnet 命令,你將會(huì)看到服務(wù)端打印出了他所接收到的消息。

完整的discard server代碼放在了io.netty.example.discard包下面。

譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.discard 包下

寫(xiě)個(gè)應(yīng)答服務(wù)器

到目前為止,我們雖然接收到了數(shù)據(jù),但沒(méi)有做任何的響應(yīng)。然而一個(gè)服務(wù)端通常會(huì)對(duì)一個(gè)請(qǐng)求作出響應(yīng)。讓我們學(xué)習(xí)怎樣在 ECHO 協(xié)議的實(shí)現(xiàn)下編寫(xiě)一個(gè)響應(yīng)消息給客戶端,這個(gè)協(xié)議針對(duì)任何接收的數(shù)據(jù)都會(huì)返回一個(gè)響應(yīng)。

discard server 唯一不同的是把在此之前我們實(shí)現(xiàn)的 channelRead() 方法,返回所有的數(shù)據(jù)替代打印接收數(shù)據(jù)到控制臺(tái)上的邏輯。因此,需要把 channelRead() 方法修改如下:

 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. ChannelHandlerContext 對(duì)象提供了許多操作,使你能夠觸發(fā)各種各樣的 I/O 事件和操作。這里我們調(diào)用了 write(Object) 方法來(lái)逐字地把接受到的消息寫(xiě)入。請(qǐng)注意不同于 DISCARD 的例子我們并沒(méi)有釋放接受到的消息,這是因?yàn)楫?dāng)寫(xiě)入的時(shí)候 Netty 已經(jīng)幫我們釋放了。
  2. ctx.write(Object) 方法不會(huì)使消息寫(xiě)入到通道上,他被緩沖在了內(nèi)部,你需要調(diào)用 ctx.flush() 方法來(lái)把緩沖區(qū)中數(shù)據(jù)強(qiáng)行輸出?;蛘吣憧梢杂酶?jiǎn)潔的 cxt.writeAndFlush(msg) 以達(dá)到同樣的目的。

如果你再一次運(yùn)行 telnet 命令,你會(huì)看到服務(wù)端會(huì)發(fā)回一個(gè)你已經(jīng)發(fā)送的消息。

完整的echo服務(wù)的代碼放在了 io.netty.example.echo包下面。

譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.echo 包下

寫(xiě)個(gè)時(shí)間服務(wù)器

在這個(gè)部分被實(shí)現(xiàn)的協(xié)議是 TIME 協(xié)議。和之前的例子不同的是在不接受任何請(qǐng)求時(shí)他會(huì)發(fā)送一個(gè)含32位的整數(shù)的消息,并且一旦消息發(fā)送就會(huì)立即關(guān)閉連接。在這個(gè)例子中,你會(huì)學(xué)習(xí)到如何構(gòu)建和發(fā)送一個(gè)消息,然后在完成時(shí)關(guān)閉連接。

因?yàn)槲覀儗?huì)忽略任何接收到的數(shù)據(jù),而只是在連接被創(chuàng)建發(fā)送一個(gè)消息,所以這次我們不能使用 channelRead() 方法了,代替他的是,我們需要覆蓋 channelActive() 方法,下面的就是實(shí)現(xiàn)的內(nèi)容:

    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
 
        @Override
        public void channelActive(final ChannelHandlerContext ctx) { // (1)
            final ByteBuf time = ctx.alloc().buffer(4); // (2)
            time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
 
            final ChannelFuture f = ctx.writeAndFlush(time); // (3)
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    assert f == future;
                    ctx.close();
                }
            }); // (4)
        }
 
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

1.channelActive() 方法將會(huì)在連接被建立并且準(zhǔn)備進(jìn)行通信時(shí)被調(diào)用。因此讓我們?cè)谶@個(gè)方法里完成一個(gè)代表當(dāng)前時(shí)間的32位整數(shù)消息的構(gòu)建工作。

2.為了發(fā)送一個(gè)新的消息,我們需要分配一個(gè)包含這個(gè)消息的新的緩沖。因?yàn)槲覀冃枰獙?xiě)入一個(gè)32位的整數(shù),因此我們需要一個(gè)至少有4個(gè)字節(jié)的 ByteBuf。通過(guò) ChannelHandlerContext.alloc() 得到一個(gè)當(dāng)前的ByteBufAllocator,然后分配一個(gè)新的緩沖。

3.和往常一樣我們需要編寫(xiě)一個(gè)構(gòu)建好的消息。但是等一等,flip 在哪?難道我們使用 NIO 發(fā)送消息時(shí)不是調(diào)用 java.nio.ByteBuffer.flip() 嗎?ByteBuf 之所以沒(méi)有這個(gè)方法因?yàn)橛袃蓚€(gè)指針,一個(gè)對(duì)應(yīng)讀操作一個(gè)對(duì)應(yīng)寫(xiě)操作。當(dāng)你向 ByteBuf 里寫(xiě)入數(shù)據(jù)的時(shí)候?qū)懼羔樀乃饕蜁?huì)增加,同時(shí)讀指針的索引沒(méi)有變化。讀指針?biāo)饕蛯?xiě)指針?biāo)饕謩e代表了消息的開(kāi)始和結(jié)束。

比較起來(lái),NIO 緩沖并沒(méi)有提供一種簡(jiǎn)潔的方式來(lái)計(jì)算出消息內(nèi)容的開(kāi)始和結(jié)尾,除非你調(diào)用 flip方法。當(dāng)你忘記調(diào)用 flip 方法而引起沒(méi)有數(shù)據(jù)或者錯(cuò)誤數(shù)據(jù)被發(fā)送時(shí),你會(huì)陷入困境。這樣的一個(gè)錯(cuò)誤不會(huì)發(fā)生在 Netty 上,因?yàn)槲覀儗?duì)于不同的操作類型有不同的指針。你會(huì)發(fā)現(xiàn)這樣的使用方法會(huì)讓你過(guò)程變得更加的容易,因?yàn)槟阋呀?jīng)習(xí)慣一種沒(méi)有使用 flip 的方式。

另外一個(gè)點(diǎn)需要注意的是 ChannelHandlerContext.write() (和 writeAndFlush() )方法會(huì)返回一個(gè) ChannelFuture 對(duì)象,一個(gè) ChannelFuture 代表了一個(gè)還沒(méi)有發(fā)生的 I/O 操作。這意味著任何一個(gè)請(qǐng)求操作都不會(huì)馬上被執(zhí)行,因?yàn)樵? Netty 里所有的操作都是異步的。舉個(gè)例子下面的代碼中在消息被發(fā)送之前可能會(huì)先關(guān)閉連接。

 
    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

因此你需要在 write() 方法返回的 ChannelFuture 完成后調(diào)用 close() 方法,然后當(dāng)他的寫(xiě)操作已經(jīng)完成他會(huì)通知他的監(jiān)聽(tīng)者。請(qǐng)注意,close() 方法也可能不會(huì)立馬關(guān)閉,他也會(huì)返回一個(gè)ChannelFuture

4.當(dāng)一個(gè)寫(xiě)請(qǐng)求已經(jīng)完成是如何通知到我們?這個(gè)只需要簡(jiǎn)單地在返回的 ChannelFuture 上增加一個(gè)ChannelFutureListener。這里我們構(gòu)建了一個(gè)匿名的 ChannelFutureListener 類用來(lái)在操作完成時(shí)關(guān)閉 Channel。

或者,你可以使用簡(jiǎn)單的預(yù)定義監(jiān)聽(tīng)器代碼:

    f.addListener(ChannelFutureListener.CLOSE);

為了測(cè)試我們的time服務(wù)如我們期望的一樣工作,你可以使用 UNIX 的 rdate 命令

    $ rdate -o <port> -p <host>

Port 是你在main()函數(shù)中指定的端口,host 使用 locahost 就可以了。

寫(xiě)個(gè)時(shí)間客戶端

不像 DISCARD 和 ECHO 的服務(wù)端,對(duì)于 TIME 協(xié)議我們需要一個(gè)客戶端,因?yàn)槿藗儾荒馨岩粋€(gè)32位的二進(jìn)制數(shù)據(jù)翻譯成一個(gè)日期或者日歷。在這一部分,我們將會(huì)討論如何確保服務(wù)端是正常工作的,并且學(xué)習(xí)怎樣用Netty 編寫(xiě)一個(gè)客戶端。

在 Netty 中,編寫(xiě)服務(wù)端和客戶端最大的并且唯一不同的使用了不同的BootStrapChannel的實(shí)現(xiàn)。請(qǐng)看一下下面的代碼:

    public class TimeClient {
 
        public static void main(String[] args) throws Exception {
 
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
 
            try {
                Bootstrap b = new Bootstrap(); // (1)
                b.group(workerGroup); // (2)
                b.channel(NioSocketChannel.class); // (3)
                b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
 
                // 啟動(dòng)客戶端
                ChannelFuture f = b.connect(host, port).sync(); // (5)
 
                // 等待連接關(guān)閉
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }

1.BootStrap 和 ServerBootstrap 類似,不過(guò)他是對(duì)非服務(wù)端的 channel 而言,比如客戶端或者無(wú)連接傳輸模式的 channel。

2.如果你只指定了一個(gè) EventLoopGroup,那他就會(huì)即作為一個(gè) boss group ,也會(huì)作為一個(gè) workder group,盡管客戶端不需要使用到 boss worker

3.代替NioServerSocketChannel的是NioSocketChannel,這個(gè)類在客戶端channel 被創(chuàng)建時(shí)使用。

4.不像在使用 ServerBootstrap 時(shí)需要用 childOption() 方法,因?yàn)榭蛻舳说?SocketChannel 沒(méi)有父親。

5.我們用 connect() 方法代替了 bind() 方法。

正如你看到的,他和服務(wù)端的代碼是不一樣的。ChannelHandler 是如何實(shí)現(xiàn)的?他應(yīng)該從服務(wù)端接受一個(gè)32位的整數(shù)消息,把他翻譯成人們能讀懂的格式,并打印翻譯好的時(shí)間,最后關(guān)閉連接:

    import java.util.Date;
 
    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg; // (1)
            try {
                long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            } finally {
                m.release();
            }
        }
 
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

1.在TCP/IP中,Netty 會(huì)把讀到的數(shù)據(jù)放到 ByteBuf 的數(shù)據(jù)結(jié)構(gòu)中。

這樣看起來(lái)非常簡(jiǎn)單,并且和服務(wù)端的那個(gè)例子的代碼也相差不多。然而,處理器有時(shí)候會(huì)因?yàn)閽伋?IndexOutOfBoundsException 而拒絕工作。在下個(gè)部分我們會(huì)討論為什么會(huì)發(fā)生這種情況。

處理一個(gè)基于流的傳輸

關(guān)于 Socket Buffer的一個(gè)小警告

基于流的傳輸比如 TCP/IP, 接收到數(shù)據(jù)是存在 socket 接收的 buffer 中。不幸的是,基于流的傳輸并不是一個(gè)數(shù)據(jù)包隊(duì)列,而是一個(gè)字節(jié)隊(duì)列。意味著,即使你發(fā)送了2個(gè)獨(dú)立的數(shù)據(jù)包,操作系統(tǒng)也不會(huì)作為2個(gè)消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠(yuǎn)程寫(xiě)入的數(shù)據(jù)就會(huì)準(zhǔn)確地讀取。舉個(gè)例子,讓我們假設(shè)操作系統(tǒng)的 TCP/TP 協(xié)議棧已經(jīng)接收了3個(gè)數(shù)據(jù)包:

由于基于流傳輸?shù)膮f(xié)議的這種普通的性質(zhì),在你的應(yīng)用程序里讀取數(shù)據(jù)的時(shí)候會(huì)有很高的可能性被分成下面的片段

因此,一個(gè)接收方不管他是客戶端還是服務(wù)端,都應(yīng)該把接收到的數(shù)據(jù)整理成一個(gè)或者多個(gè)更有意思并且能夠讓程序的業(yè)務(wù)邏輯更好理解的數(shù)據(jù)。在上面的例子中,接收到的數(shù)據(jù)應(yīng)該被構(gòu)造成下面的格式:

The First Solution 辦法一

回到 TIME 客戶端例子。同樣也有類似的問(wèn)題。一個(gè)32位整型是非常小的數(shù)據(jù),他并不見(jiàn)得會(huì)被經(jīng)常拆分到到不同的數(shù)據(jù)段內(nèi)。然而,問(wèn)題是他確實(shí)可能會(huì)被拆分到不同的數(shù)據(jù)段內(nèi),并且拆分的可能性會(huì)隨著通信量的增加而增加。

最簡(jiǎn)單的方案是構(gòu)造一個(gè)內(nèi)部的可積累的緩沖,直到4個(gè)字節(jié)全部接收到了內(nèi)部緩沖。下面的代碼修改了 TimeClientHandler 的實(shí)現(xiàn)類修復(fù)了這個(gè)問(wèn)題

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        private ByteBuf buf;
 
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            buf = ctx.alloc().buffer(4); // (1)
        }
 
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            buf.release(); // (1)
            buf = null;
        }
 
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg;
            buf.writeBytes(m); // (2)
            m.release();
 
            if (buf.readableBytes() >= 4) { // (3)
                long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            }
        }
 
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

1.ChannelHandler 有2個(gè)生命周期的監(jiān)聽(tīng)方法:handlerAdded()handlerRemoved()。你可以完成任意初始化任務(wù)只要他不會(huì)被阻塞很長(zhǎng)的時(shí)間。

2.首先,所有接收的數(shù)據(jù)都應(yīng)該被累積在 buf 變量里。

3.然后,處理器必須檢查 buf 變量是否有足夠的數(shù)據(jù),在這個(gè)例子中是4個(gè)字節(jié),然后處理實(shí)際的業(yè)務(wù)邏輯。否則,Netty 會(huì)重復(fù)調(diào)用channelRead() 當(dāng)有更多數(shù)據(jù)到達(dá)直到4個(gè)字節(jié)的數(shù)據(jù)被積累。

The Second Solution 方法二

盡管第一個(gè)解決方案已經(jīng)解決了 TIME 客戶端的問(wèn)題了,但是修改后的處理器看起來(lái)不那么的簡(jiǎn)潔,想象一下如果由多個(gè)字段比如可變長(zhǎng)度的字段組成的更為復(fù)雜的協(xié)議時(shí),你的 ChannelInboundHandler 的實(shí)現(xiàn)將很快地變得難以維護(hù)。

正如你所知的,你可以增加多個(gè) ChannelHandlerChannelPipeline ,因此你可以把一整個(gè)ChannelHandler/ 拆分成多個(gè)模塊以減少應(yīng)用的復(fù)雜程度,比如你可以把TimeClientHandler 拆分成2個(gè)處理器:

  • TimeDecoder 處理數(shù)據(jù)拆分的問(wèn)題
  • TimeClientHandler 原始版本的實(shí)現(xiàn)

幸運(yùn)地是,Netty 提供了一個(gè)可擴(kuò)展的類,幫你完成 TimeDecoder 的開(kāi)發(fā)。

    public class TimeDecoder extends ByteToMessageDecoder { // (1)
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
            if (in.readableBytes() < 4) {
                return; // (3)
            }
 
            out.add(in.readBytes(4)); // (4)
        }
    }
    

1.ByteToMessageDecoderChannelInboundHandler 的一個(gè)實(shí)現(xiàn)類,他可以在處理數(shù)據(jù)拆分的問(wèn)題上變得很簡(jiǎn)單。

2.每當(dāng)有新數(shù)據(jù)接收的時(shí)候,ByteToMessageDecoder 都會(huì)調(diào)用 decode() 方法來(lái)處理內(nèi)部的那個(gè)累積緩沖。

3.Decode() 方法可以決定當(dāng)累積緩沖里沒(méi)有足夠數(shù)據(jù)時(shí)可以往 out 對(duì)象里放任意數(shù)據(jù)。當(dāng)有更多的數(shù)據(jù)被接收了 ByteToMessageDecoder 會(huì)再一次調(diào)用 decode() 方法。

4.如果在 decode() 方法里增加了一個(gè)對(duì)象到 out 對(duì)象里,這意味著解碼器解碼消息成功。ByteToMessageDecoder 將會(huì)丟棄在累積緩沖里已經(jīng)被讀過(guò)的數(shù)據(jù)。請(qǐng)記得你不需要對(duì)多條消息調(diào)用 decode(),ByteToMessageDecoder 會(huì)持續(xù)調(diào)用 decode() 直到不放任何數(shù)據(jù)到 out 里。

現(xiàn)在我們有另外一個(gè)處理器插入到 ChannelPipeline 里,我們應(yīng)該在 TimeClient 里修改 ChannelInitializer 的實(shí)現(xiàn):

    b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
        }
    });

如果你是一個(gè)大膽的人,你可能會(huì)嘗試使用更簡(jiǎn)單的解碼類ReplayingDecoder。不過(guò)你還是需要參考一下 API 文檔來(lái)獲取更多的信息。

    public class TimeDecoder extends ReplayingDecoder<Void> {
        @Override
        protected void decode(
                ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            out.add(in.readBytes(4));
        }
    }

此外,Netty還提供了更多開(kāi)箱即用的解碼器使你可以更簡(jiǎn)單地實(shí)現(xiàn)更多的協(xié)議,幫助你避免開(kāi)發(fā)一個(gè)難以維護(hù)的處理器實(shí)現(xiàn)。請(qǐng)參考下面的包以獲取更多更詳細(xì)的例子:

譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.factorialcom.waylau.netty.demo.telnet 包下

用POJO代替ByteBuf

我們回顧了迄今為止的所有例子使用 ByteBuf 作為協(xié)議消息的主要數(shù)據(jù)結(jié)構(gòu)。在本節(jié)中,我們將改善的 TIME 協(xié)議客戶端和服務(wù)器例子,使用 POJO 代替 ByteBuf。

ChannelHandler 使用 POIO 的好處很明顯:通過(guò)從ChannelHandler 中提取出 ByteBuf 的代碼,將會(huì)使 ChannelHandler的實(shí)現(xiàn)變得更加可維護(hù)和可重用。在 TIME 客戶端和服務(wù)器的例子中,我們讀取的僅僅是一個(gè)32位的整形數(shù)據(jù),直接使用 ByteBuf 不會(huì)是一個(gè)主要的問(wèn)題。然而,你會(huì)發(fā)現(xiàn)當(dāng)你需要實(shí)現(xiàn)一個(gè)真實(shí)的協(xié)議,分離代碼變得非常的必要。

首先,讓我們定義一個(gè)新的類型叫做 UnixTime。

    public class UnixTime {
 
        private final long value;
 
        public UnixTime() {
            this(System.currentTimeMillis() / 1000L + 2208988800L);
        }
 
        public UnixTime(long value) {
            this.value = value;
        }
 
        public long value() {
            return value;
        }
 
        @Override
        public String toString() {
            return new Date((value() - 2208988800L) * 1000L).toString();
        }
    }

現(xiàn)在我們可以修改下 TimeDecoder 類,返回一個(gè) UnixTime,以替代ByteBuf

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }
 
        out.add(new UnixTime(in.readUnsignedInt()));
    }

下面是修改后的解碼器,TimeClientHandler 不再任何的 ByteBuf 代碼了。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

是不是變得更加簡(jiǎn)單和優(yōu)雅了?相同的技術(shù)可以被運(yùn)用到服務(wù)端。讓我們修改一下 TimeServerHandler 的代碼。

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

現(xiàn)在,唯一缺少的功能是一個(gè)編碼器,是ChannelOutboundHandler的實(shí)現(xiàn),用來(lái)將 UnixTime 對(duì)象重新轉(zhuǎn)化為一個(gè) ByteBuf。這是比編寫(xiě)一個(gè)解碼器簡(jiǎn)單得多,因?yàn)闆](méi)有需要處理的數(shù)據(jù)包編碼消息時(shí)拆分和組裝。

    public class TimeEncoder extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            UnixTime m = (UnixTime) msg;
            ByteBuf encoded = ctx.alloc().buffer(4);
            encoded.writeInt((int)m.value());
            ctx.write(encoded, promise); // (1)
        }
    }

1.在這幾行代碼里還有幾個(gè)重要的事情。第一,通過(guò) ChannelPromise,當(dāng)編碼后的數(shù)據(jù)被寫(xiě)到了通道上 Netty 可以通過(guò)這個(gè)對(duì)象標(biāo)記是成功還是失敗。第二, 我們不需要調(diào)用 cxt.flush()。因?yàn)樘幚砥饕呀?jīng)單獨(dú)分離出了一個(gè)方法 void flush(ChannelHandlerContext cxt),如果像自己實(shí)現(xiàn) flush() 方法內(nèi)容可以自行覆蓋這個(gè)方法。

進(jìn)一步簡(jiǎn)化操作,你可以使用 MessageToByteEncode:

    public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
        @Override
        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
            out.writeInt((int)msg.value());
        }
    }

最后的任務(wù)就是在 TimeServerHandler 之前把 TimeEncoder 插入到ChannelPipeline。 但這是不那么重要的工作。

關(guān)閉你的應(yīng)用

關(guān)閉一個(gè) Netty 應(yīng)用往往只需要簡(jiǎn)單地通過(guò) shutdownGracefully() 方法來(lái)關(guān)閉你構(gòu)建的所有的 EventLoopGroup。當(dāng)EventLoopGroup 被完全地終止,并且對(duì)應(yīng)的所有 channel 都已經(jīng)被關(guān)閉時(shí),Netty 會(huì)返回一個(gè)Future對(duì)象來(lái)通知你。

總結(jié)

在這一章節(jié)中,我們快速地回顧下如果在熟練掌握 Netty 的情況下編寫(xiě)出一個(gè)健壯能運(yùn)行的網(wǎng)絡(luò)應(yīng)用程序。在 Netty 接下去的章節(jié)中還會(huì)有更多更相信的信息。我們也鼓勵(lì)你去重新復(fù)習(xí)下在 io.netty.example 包下的例子。請(qǐng)注意社區(qū)一直在等待你的問(wèn)題和想法以幫助 Netty 的持續(xù)改進(jìn),Netty 的文檔也是基于你們的快速反饋上。

譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos。如對(duì)本翻譯有任何建議,可以在https://github.com/waylau/netty-4-user-guide/issues留言

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)