Netty 4.x Netty 實現(xiàn)聊天功能

2018-10-26 09:54 更新

Netty 實現(xiàn)聊天功能

Netty 是一個 Java NIO 客戶端服務器框架,使用它可以快速簡單地開發(fā)網(wǎng)絡應用程序,比如服務器和客戶端的協(xié)議。Netty 大大簡化了網(wǎng)絡程序的開發(fā)過程比如 TCP 和 UDP 的 socket 服務的開發(fā)。更多關于 Netty 的知識,可以參閱《Netty 4.x 用戶指南》https://github.com/waylau/netty-4-user-guide

下面,就基于 Netty 快速實現(xiàn)一個聊天小程序。

準備

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

服務端

讓我們從 handler (處理器)的實現(xiàn)開始,handler 是由 Netty 生成用來處理 I/O 事件的。

SimpleChatServerHandler.java

public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        }
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
        }
        channels.remove(ctx.channel());
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
            } else {
                channel.writeAndFlush("[you]" + s + "\n");
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常");
        // 當出現(xiàn)異常就關閉連接
        cause.printStackTrace();
        ctx.close();
    }
}

1.SimpleChatServerHandler 繼承自 SimpleChannelInboundHandler,這個類實現(xiàn)了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現(xiàn)在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實現(xiàn)接口方法。

2.覆蓋了 handlerAdded() 事件處理方法。每當從服務端收到新的客戶端連接時,客戶端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客戶端 Channel

3.覆蓋了 handlerRemoved() 事件處理方法。每當從服務端收到客戶端斷開時,客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel

4.覆蓋了 channelRead0() 事件處理方法。每當從服務端讀到客戶端寫入信息時,將信息轉(zhuǎn)發(fā)給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0() 重命名為messageReceived()

5.覆蓋了 channelActive() 事件處理方法。服務端監(jiān)聽到客戶端活動

6.覆蓋了 channelInactive() 事件處理方法。服務端監(jiān)聽到客戶端不活動

7.exceptionCaught() 事件處理方法是當出現(xiàn) Throwable 對象才會被調(diào)用,即當 Netty 由于 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應該被記錄下來并且把關聯(lián)的 channel 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現(xiàn),比如你可能想在關閉連接之前發(fā)送一個錯誤碼的響應消息。

SimpleChatServerInitializer.java

SimpleChatServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleChatServerHandler 等。

public class SimpleChatServerInitializer extends
        ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatServerHandler());

        System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連接上");
    }
}

SimpleChatServer.java

編寫一個 main() 方法來啟動服務端。

public class SimpleChatServer {

    private int port;

    public SimpleChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new SimpleChatServerInitializer())  //(4)
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            System.out.println("SimpleChatServer 啟動了");

            // 綁定端口,開始接收進來的連接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服務器  socket 關閉 。
            // 在這個例子中,這不會發(fā)生,但你可以優(yōu)雅地關閉你的服務器。
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("SimpleChatServer 關閉了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new SimpleChatServer(port).run();

    }
}

1.NioEventLoopGroup是用來處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的EventLoopGroup的實現(xiàn)用來處理不同的傳輸。在這個例子中我們實現(xiàn)了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。第一個經(jīng)常被叫做‘boss’,用來接收進來的連接。第二個經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的 Channel上都需要依賴于 EventLoopGroup 的實現(xiàn),并且可以通過構造函數(shù)來配置他們的關系。

2.ServerBootstrap是一個啟動 NIO 服務的輔助啟動類。你可以在這個服務中直接使用 Channel,但是這會是一個復雜的處理過程,在很多情況下你并不需要這樣做。

3.這里我們指定使用NioServerSocketChannel類來舉例說明一個新的 Channel 如何接收進來的連接。

4.這里的事件處理類經(jīng)常會被用來處理一個最近的已經(jīng)接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline來實現(xiàn)你的網(wǎng)絡程序。當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。

5.你可以設置這里指定的 Channel 實現(xiàn)的配置參數(shù)。我們正在寫一個TCP/IP 的服務端,因此我們被允許設置 socket 的參數(shù)選項比如tcpNoDelay 和 keepAlive。請參考ChannelOption和詳細的ChannelConfig實現(xiàn)的接口文檔以此可以對ChannelOption 的有一個大概的認識。

6.option() 是提供給NioServerSocketChannel用來接收進來的連接。childOption() 是提供給由父管道ServerChannel接收到的連接,在這個例子中也是 NioServerSocketChannel。

7.我們繼續(xù),剩下的就是綁定端口然后啟動服務。這里我們在機器上綁定了機器所有網(wǎng)卡上的 8080 端口。當然現(xiàn)在你可以多次調(diào)用 bind() 方法(基于不同綁定地址)。

恭喜!你已經(jīng)完成了基于 Netty 聊天服務端程序。

客戶端

SimpleChatClientHandler.java

客戶端的處理類比較簡單,只需要將讀到的信息打印出來即可

public class SimpleChatClientHandler extends  SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(s);
    }
}

SimpleChatClientInitializer.java

與服務端類似

public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatClientHandler());
    }
}

SimpleChatClient.java

編寫一個 main() 方法來啟動客戶端。

public class SimpleChatClient {
    public static void main(String[] args) throws Exception{
            new SimpleChatClient("localhost", 8080).run();
        }

        private final String host;
        private final int port;

        public SimpleChatClient(String host, int port){
            this.host = host;
            this.port = port;
        }

        public void run() throws Exception{
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap  = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new SimpleChatClientInitializer());
                Channel channel = bootstrap.connect(host, port).sync().channel();
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                while(true){
                    channel.writeAndFlush(in.readLine() + "\r\n");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }

        }
    }

}

運行效果

先運行 SimpleChatServer,再可以運行多個 SimpleChatClient,控制臺輸入文本繼續(xù)測試

源碼

https://github.com/waylau/netty-4-user-guide-demos中 simplechat

參考

Netty 4.x 用戶指南https://github.com/waylau/netty-4-user-guide

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號