Netty如何寫(xiě)一個(gè) echo 服務(wù)器

2018-08-02 14:09 更新

Netty 實(shí)現(xiàn)的 echo 服務(wù)器都需要下面這些:

  • 一個(gè)服務(wù)器 handler:這個(gè)組件實(shí)現(xiàn)了服務(wù)器的業(yè)務(wù)邏輯,決定了連接創(chuàng)建后和接收到信息后該如何處理
  • Bootstrapping: 這個(gè)是配置服務(wù)器的啟動(dòng)代碼。最少需要設(shè)置服務(wù)器綁定的端口,用來(lái)監(jiān)聽(tīng)連接請(qǐng)求。

通過(guò) ChannelHandler 來(lái)實(shí)現(xiàn)服務(wù)器的邏輯

Echo Server 將會(huì)將接受到的數(shù)據(jù)的拷貝發(fā)送給客戶端。因此,我們需要實(shí)現(xiàn) ChannelInboundHandler 接口,用來(lái)定義處理入站事件的方法。由于我們的應(yīng)用很簡(jiǎn)單,只需要繼承 ChannelInboundHandlerAdapter 就行了。這個(gè)類 提供了默認(rèn) ChannelInboundHandler 的實(shí)現(xiàn),所以只需要覆蓋下面的方法:

  • channelRead() - 每個(gè)信息入站都會(huì)調(diào)用
  • channelReadComplete() - 通知處理器最后的 channelread() 是當(dāng)前批處理中的最后一條消息時(shí)調(diào)用
  • exceptionCaught()- 讀操作時(shí)捕獲到異常時(shí)調(diào)用

EchoServerHandler 代碼如下:

Listing 2.2 EchoServerHandler

@Sharable                                        //1
public class EchoServerHandler extends
        ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx,
        Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));        //2
        ctx.write(in);                            //3
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)//4
        .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) {
        cause.printStackTrace();                //5
        ctx.close();                            //6
    }
}

1.@Sharable 標(biāo)識(shí)這類的實(shí)例之間可以在 channel 里面共享

2.日志消息輸出到控制臺(tái)

3.將所接收的消息返回給發(fā)送者。注意,這還沒(méi)有沖刷數(shù)據(jù)

4.沖刷所有待審消息到遠(yuǎn)程節(jié)點(diǎn)。關(guān)閉通道后,操作完成

5.打印異常堆棧跟蹤

6.關(guān)閉通道

這種使用 ChannelHandler 的方式體現(xiàn)了關(guān)注點(diǎn)分離的設(shè)計(jì)原則,并簡(jiǎn)化業(yè)務(wù)邏輯的迭代開(kāi)發(fā)的要求。處理程序很簡(jiǎn)單;它的每一個(gè)方法可以覆蓋到“hook(鉤子)”在活動(dòng)周期適當(dāng)?shù)狞c(diǎn)。很顯然,我們覆蓋 channelRead因?yàn)槲覀冃枰幚硭薪邮盏降臄?shù)據(jù)。

覆蓋 exceptionCaught 使我們能夠應(yīng)對(duì)任何 Throwable 的子類型。在這種情況下我們記錄,并關(guān)閉所有可能處于未知狀態(tài)的連接。它通常是難以 從連接錯(cuò)誤中恢復(fù),所以干脆關(guān)閉遠(yuǎn)程連接。當(dāng)然,也有可能的情況是可以從錯(cuò)誤中恢復(fù)的,所以可以用一個(gè)更復(fù)雜的措施來(lái)嘗試識(shí)別和處理 這樣的情況。

如果異常沒(méi)有被捕獲,會(huì)發(fā)生什么?

每個(gè) Channel 都有一個(gè)關(guān)聯(lián)的 ChannelPipeline,它代表了 ChannelHandler 實(shí)例的鏈。適配器處理的實(shí)現(xiàn)只是將一個(gè)處理方法調(diào)用轉(zhuǎn)發(fā)到鏈中的下一個(gè)處理器。因此,如果一個(gè) Netty 應(yīng)用程序不覆蓋exceptionCaught ,那么這些錯(cuò)誤將最終到達(dá) ChannelPipeline,并且結(jié)束警告將被記錄。出于這個(gè)原因,你應(yīng)該提供至少一個(gè) 實(shí)現(xiàn) exceptionCaught 的 ChannelHandler。

關(guān)鍵點(diǎn)要牢記:

  • ChannelHandler 是給不同類型的事件調(diào)用
  • 應(yīng)用程序?qū)崿F(xiàn)或擴(kuò)展 ChannelHandler 掛接到事件生命周期和 提供自定義應(yīng)用邏輯。

引導(dǎo)服務(wù)器

了解到業(yè)務(wù)核心處理邏輯 EchoServerHandler 后,下面要引導(dǎo)服務(wù)器自身了。

  • 監(jiān)聽(tīng)和接收進(jìn)來(lái)的連接請(qǐng)求
  • 配置 Channel 來(lái)通知一個(gè)關(guān)于入站消息的 EchoServerHandler 實(shí)例

Transport(傳輸)

在本節(jié)中,你會(huì)遇到“transport(傳輸)”一詞。在網(wǎng)絡(luò)的多層視圖協(xié)議里面,傳輸層提供了用于端至端或主機(jī)到主機(jī)的通信服務(wù)。互聯(lián)網(wǎng)通信的基礎(chǔ)是 TCP 傳輸。當(dāng)我們使用術(shù)語(yǔ)“NIO transport”我們指的是一個(gè)傳輸?shù)膶?shí)現(xiàn),它是大多等同于 TCP ,除了一些由 Java NIO 的實(shí)現(xiàn)提供了服務(wù)器端的性能增強(qiáng)。Transport 詳細(xì)在第4章中討論。

Listing 2.3 EchoServer

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }
        public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println(
                    "Usage: " + EchoServer.class.getSimpleName() +
                    " <port>");
            return;
        }
        int port = Integer.parseInt(args[0]);        //1
        new EchoServer(port).start();                //2
    }

    public void start() throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup(); //3
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)                                //4
             .channel(NioServerSocketChannel.class)        //5
             .localAddress(new InetSocketAddress(port))    //6
             .childHandler(new ChannelInitializer<SocketChannel>() { //7
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(
                             new EchoServerHandler());
                 }
             });

            ChannelFuture f = b.bind().sync();            //8
            System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
            f.channel().closeFuture().sync();            //9
        } finally {
            group.shutdownGracefully().sync();            //10
        }
    }

}

1.設(shè)置端口值(拋出一個(gè) NumberFormatException 如果該端口參數(shù)的格式不正確)

2.呼叫服務(wù)器的 start() 方法

3.創(chuàng)建 EventLoopGroup

4.創(chuàng)建 ServerBootstrap

5.指定使用 NIO 的傳輸 Channel

6.設(shè)置 socket 地址使用所選的端口

7.添加 EchoServerHandler 到 Channel 的 ChannelPipeline

8.綁定的服務(wù)器;sync 等待服務(wù)器關(guān)閉

9.關(guān)閉 channel 和 塊,直到它被關(guān)閉

10.關(guān)機(jī)的 EventLoopGroup,釋放所有資源。

在這個(gè)例子中,代碼創(chuàng)建 ServerBootstrap 實(shí)例(步驟4)。由于我們使用在 NIO 傳輸,我們已指定 NioEventLoopGroup(3)接受和處理新連接,指定 NioServerSocketChannel(5)為信道類型。在此之后,我們?cè)O(shè)置本地地址是 InetSocketAddress 與所選擇的端口(6)如。服務(wù)器將綁定到此地址來(lái)監(jiān)聽(tīng)新的連接請(qǐng)求。

第7步是關(guān)鍵:在這里我們使用一個(gè)特殊的類,ChannelInitializer 。當(dāng)一個(gè)新的連接被接受,一個(gè)新的子 Channel 將被創(chuàng)建, ChannelInitializer 會(huì)添加我們EchoServerHandler 的實(shí)例到 Channel 的 ChannelPipeline。正如我們?nèi)缜八?,如果有入站信息,這個(gè)處理器將被通知。

雖然 NIO 是可擴(kuò)展性,但它的正確配置是不簡(jiǎn)單的。特別是多線程,要正確處理也非易事。幸運(yùn)的是,Netty 的設(shè)計(jì)封裝了大部分復(fù)雜性,尤其是通過(guò)抽象,例如 EventLoopGroup,SocketChannel 和 ChannelInitializer,其中每一個(gè)將在更詳細(xì)地在第3章中討論。

在步驟8,我們綁定的服務(wù)器,等待綁定完成。 (調(diào)用 sync() 的原因是當(dāng)前線程阻塞)在第9步的應(yīng)用程序?qū)⒌却?wù)器 Channel 關(guān)閉(因?yàn)槲覀?在 Channel 的 CloseFuture 上調(diào)用 sync())。現(xiàn)在,我們可以關(guān)閉下 EventLoopGroup 并釋放所有資源,包括所有創(chuàng)建的線程(10)。

NIO 用于在本實(shí)施例,因?yàn)樗悄壳白顝V泛使用的傳輸,歸功于它的可擴(kuò)展性和徹底的不同步。但不同的傳輸?shù)膶?shí)現(xiàn)是也是可能的。例如,如果本實(shí)施例中使用的 OIO 傳輸,我們將指定 OioServerSocketChannel 和 OioEventLoopGroup。 Netty 的架構(gòu),包括更關(guān)于傳輸信息,將包含在第4章。在此期間,讓我們回顧下在服務(wù)器上執(zhí)行,我們只研究重要步驟。

服務(wù)器的主代碼組件是

  • EchoServerHandler 實(shí)現(xiàn)了的業(yè)務(wù)邏輯
  • 在 main() 方法,引導(dǎo)了服務(wù)器

執(zhí)行后者所需的步驟是:

  • 創(chuàng)建 ServerBootstrap 實(shí)例來(lái)引導(dǎo)服務(wù)器并隨后綁定
  • 創(chuàng)建并分配一個(gè) NioEventLoopGroup 實(shí)例來(lái)處理事件的處理,如接受新的連接和讀/寫(xiě)數(shù)據(jù)。
  • 指定本地 InetSocketAddress 給服務(wù)器綁定
  • 通過(guò) EchoServerHandler 實(shí)例給每一個(gè)新的 Channel 初始化
  • 最后調(diào)用 ServerBootstrap.bind() 綁定服務(wù)器

這樣服務(wù)器的初始化就完成了,并可以被使用。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)