Netty 4.x Netty 實(shí)現(xiàn) WebSocket 聊天功能

2018-10-26 09:54 更新

Netty 實(shí)現(xiàn) WebSocket 聊天功能

上一次我們用Netty快速實(shí)現(xiàn)了一個(gè) Java 聊天程序?,F(xiàn)在,我們要做下修改,加入 WebSocket 的支持,使它可以在瀏覽器里進(jìn)行文本聊天。

準(zhǔn)備

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

WebSocket

WebSocket 通過(guò)“Upgrade handshake(升級(jí)握手)”從標(biāo)準(zhǔn)的 HTTP 或HTTPS 協(xié)議轉(zhuǎn)為 WebSocket。因此,使用 WebSocket 的應(yīng)用程序?qū)⑹冀K以 HTTP/S 開(kāi)始,然后進(jìn)行升級(jí)。在什么時(shí)候發(fā)生這種情況取決于具體的應(yīng)用;它可以是在啟動(dòng)時(shí),或當(dāng)一個(gè)特定的 URL 被請(qǐng)求時(shí)。

在我們的應(yīng)用中,當(dāng) URL 請(qǐng)求以“/ws”結(jié)束時(shí),我們才升級(jí)協(xié)議為WebSocket。否則,服務(wù)器將使用基本的 HTTP/S。一旦升級(jí)連接將使用的WebSocket 傳輸所有數(shù)據(jù)。

整個(gè)服務(wù)器邏輯如下:

1.客戶端/用戶連接到服務(wù)器并加入聊天

2.HTTP 請(qǐng)求頁(yè)面或 WebSocket 升級(jí)握手

3.服務(wù)器處理所有客戶端/用戶

4.響應(yīng) URI “/”的請(qǐng)求,轉(zhuǎn)到默認(rèn) html 頁(yè)面

5.如果訪問(wèn)的是 URI“/ws” ,處理 WebSocket 升級(jí)握手

6.升級(jí)握手完成后 ,通過(guò) WebSocket 發(fā)送聊天消息

服務(wù)端

讓我們從處理 HTTP 請(qǐng)求的實(shí)現(xiàn)開(kāi)始。

處理 HTTP 請(qǐng)求

HttpRequestHandler.java

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "WebsocketChatClient.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            ctx.fireChannelRead(request.retain());                  //2
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);                               //3
            }

            RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4

            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

            boolean keepAlive = HttpHeaders.isKeepAlive(request);

            if (keepAlive) {                                        //5
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);                    //6

            if (ctx.pipeline().get(SslHandler.class) == null) {     //7
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);           //8
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);        //9
            }

            file.close();
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"異常");
        // 當(dāng)出現(xiàn)異常就關(guān)閉連接
        cause.printStackTrace();
        ctx.close();
    }
}

1.擴(kuò)展 SimpleChannelInboundHandler 用于處理 FullHttpRequest信息

2.如果請(qǐng)求是 WebSocket 升級(jí),遞增引用計(jì)數(shù)器(保留)并且將它傳遞給在 ChannelPipeline 中的下個(gè) ChannelInboundHandler

3.處理符合 HTTP 1.1的 "100 Continue" 請(qǐng)求

4.讀取默認(rèn)的 WebsocketChatClient.html 頁(yè)面

5.判斷 keepalive 是否在請(qǐng)求頭里面

6.寫 HttpResponse 到客戶端

7.寫 index.html 到客戶端,判斷 SslHandler 是否在 ChannelPipeline 來(lái)決定是使用 DefaultFileRegion 還是 ChunkedNioFile

8.寫并刷新 LastHttpContent 到客戶端,標(biāo)記響應(yīng)完成

9.如果 keepalive 沒(méi)有要求,當(dāng)寫完成時(shí),關(guān)閉 Channel

HttpRequestHandler 做了下面幾件事,

  • 如果該 HTTP 請(qǐng)求被發(fā)送到URI “/ws”,調(diào)用 FullHttpRequest 上的 retain(),并通過(guò)調(diào)用 fireChannelRead(msg) 轉(zhuǎn)發(fā)到下一個(gè) ChannelInboundHandler。retain() 是必要的,因?yàn)?channelRead() 完成后,它會(huì)調(diào)用 FullHttpRequest 上的 release() 來(lái)釋放其資源。 (請(qǐng)參考我們先前的 SimpleChannelInboundHandler 在第6章中討論)
  • 如果客戶端發(fā)送的 HTTP 1.1 頭是“Expect: 100-continue” ,將發(fā)送“100 Continue”的響應(yīng)。
  • 在 頭被設(shè)置后,寫一個(gè) HttpResponse 返回給客戶端。注意,這是不是 FullHttpResponse,唯一的反應(yīng)的第一部分。此外,我們不使用 writeAndFlush() 在這里 - 這個(gè)是在最后完成。
  • 如果沒(méi)有加密也不壓縮,要達(dá)到最大的效率可以是通過(guò)存儲(chǔ) index.html 的內(nèi)容在一個(gè) DefaultFileRegion 實(shí)現(xiàn)。這將利用零拷貝來(lái)執(zhí)行傳輸。出于這個(gè)原因,我們檢查,看看是否有一個(gè) SslHandler 在 ChannelPipeline 中。另外,我們使用 ChunkedNioFile。
  • 寫 LastHttpContent 來(lái)標(biāo)記響應(yīng)的結(jié)束,并終止它
  • 如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 對(duì)象的最后寫入,并關(guān)閉連接。注意,這里我們調(diào)用 writeAndFlush() 來(lái)刷新所有以前寫的信息。

處理 WebSocket frame

WebSockets 在“幀”里面來(lái)發(fā)送數(shù)據(jù),其中每一個(gè)都代表了一個(gè)消息的一部分。一個(gè)完整的消息可以利用了多個(gè)幀。 WebSocket "Request for Comments" (RFC) 定義了六中不同的 frame; Netty 給他們每個(gè)都提供了一個(gè) POJO 實(shí)現(xiàn) ,而我們的程序只需要使用下面4個(gè)幀類型:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

在這里我們只需要顯示處理 TextWebSocketFrame,其他的會(huì)由 WebSocketServerProtocolHandler 自動(dòng)處理。

下面代碼展示了 ChannelInboundHandler 處理 TextWebSocketFrame,同時(shí)也將跟蹤在 ChannelGroup 中所有活動(dòng)的 WebSocket 連接

TextWebSocketFrameHandler.java

public class TextWebSocketFrameHandler extends
        SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
            TextWebSocketFrame msg) throws Exception { // (1)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
        }
        channels.add(ctx.channel());
        System.out.println("Client:"+incoming.remoteAddress() +"加入");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 離開(kāi)"));
        }
        System.out.println("Client:"+incoming.remoteAddress() +"離開(kāi)");
        channels.remove(ctx.channel());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"在線");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"掉線");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client:"+incoming.remoteAddress()+"異常");
        // 當(dāng)出現(xiàn)異常就關(guān)閉連接
        cause.printStackTrace();
        ctx.close();
    }

}

1.TextWebSocketFrameHandler 繼承自 SimpleChannelInboundHandler,這個(gè)類實(shí)現(xiàn)了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實(shí)現(xiàn)接口方法。

2.覆蓋了 handlerAdded() 事件處理方法。每當(dāng)從服務(wù)端收到新的客戶端連接時(shí),客戶端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客戶端 Channel

3.覆蓋了 handlerRemoved() 事件處理方法。每當(dāng)從服務(wù)端收到客戶端斷開(kāi)時(shí),客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel

4.覆蓋了 channelRead0() 事件處理方法。每當(dāng)從服務(wù)端讀到客戶端寫入信息時(shí),將信息轉(zhuǎn)發(fā)給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時(shí),需要把 channelRead0() 重命名為messageReceived()

5.覆蓋了 channelActive() 事件處理方法。服務(wù)端監(jiān)聽(tīng)到客戶端活動(dòng)

6.覆蓋了 channelInactive() 事件處理方法。服務(wù)端監(jiān)聽(tīng)到客戶端不活動(dòng)

7.exceptionCaught() 事件處理方法是當(dāng)出現(xiàn) Throwable 對(duì)象才會(huì)被調(diào)用,即當(dāng) Netty 由于 IO 錯(cuò)誤或者處理器在處理事件時(shí)拋出的異常時(shí)。在大部分情況下,捕獲的異常應(yīng)該被記錄下來(lái)并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個(gè)方法的處理方式會(huì)在遇到不同異常的情況下有不同的實(shí)現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個(gè)錯(cuò)誤碼的響應(yīng)消息。

上面顯示了 TextWebSocketFrameHandler 僅作了幾件事:

  • 當(dāng)WebSocket 與新客戶端已成功握手完成,通過(guò)寫入信息到 ChannelGroup 中的 Channel 來(lái)通知所有連接的客戶端,然后添加新 Channel 到 ChannelGroup
  • 如果接收到 TextWebSocketFrame,調(diào)用 retain() ,并將其寫、刷新到 ChannelGroup,使所有連接的 WebSocket Channel 都能接收到它。和以前一樣,retain() 是必需的,因?yàn)楫?dāng) channelRead0()返回時(shí),TextWebSocketFrame 的引用計(jì)數(shù)將遞減。由于所有操作都是異步的,writeAndFlush() 可能會(huì)在以后完成,我們不希望它來(lái)訪問(wèn)無(wú)效的引用。

由于 Netty 處理了其余大部分功能,唯一剩下的我們現(xiàn)在要做的是初始化 ChannelPipeline 給每一個(gè)創(chuàng)建的新的 Channel 。做到這一點(diǎn),我們需要一個(gè)ChannelInitializer

WebsocketChatServerInitializer.java

public class WebsocketChatServerInitializer extends
        ChannelInitializer<SocketChannel> { //1

    @Override
    public void initChannel(SocketChannel ch) throws Exception {//2
         ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64*1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());

    }
}

1.擴(kuò)展 ChannelInitializer

2.添加 ChannelHandler 到 ChannelPipeline

initChannel() 方法設(shè)置 ChannelPipeline 中所有新注冊(cè)的 Channel,安裝所有需要的  ChannelHandler。

WebsocketChatServer.java

編寫一個(gè) main() 方法來(lái)啟動(dòng)服務(wù)端。

public class WebsocketChatServer {

    private int port;

    public WebsocketChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new WebsocketChatServerInitializer())  //(4)
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            System.out.println("WebsocketChatServer 啟動(dòng)了");

            // 綁定端口,開(kāi)始接收進(jìn)來(lái)的連接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服務(wù)器  socket 關(guān)閉 。
            // 在這個(gè)例子中,這不會(huì)發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("WebsocketChatServer 關(guān)閉了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new WebsocketChatServer(port).run();

    }
}

1.NioEventLoopGroup是用來(lái)處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的EventLoopGroup的實(shí)現(xiàn)用來(lái)處理不同的傳輸。在這個(gè)例子中我們實(shí)現(xiàn)了一個(gè)服務(wù)端的應(yīng)用,因此會(huì)有2個(gè) NioEventLoopGroup 會(huì)被使用。第一個(gè)經(jīng)常被叫做‘boss’,用來(lái)接收進(jìn)來(lái)的連接。第二個(gè)經(jīng)常被叫做‘worker’,用來(lái)處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會(huì)把連接信息注冊(cè)到‘worker’上。如何知道多少個(gè)線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的 Channel上都需要依賴于 EventLoopGroup 的實(shí)現(xiàn),并且可以通過(guò)構(gòu)造函數(shù)來(lái)配置他們的關(guān)系。

2.ServerBootstrap是一個(gè)啟動(dòng) NIO 服務(wù)的輔助啟動(dòng)類。你可以在這個(gè)服務(wù)中直接使用 Channel,但是這會(huì)是一個(gè)復(fù)雜的處理過(guò)程,在很多情況下你并不需要這樣做。

3.這里我們指定使用NioServerSocketChannel類來(lái)舉例說(shuō)明一個(gè)新的 Channel 如何接收進(jìn)來(lái)的連接。

4.這里的事件處理類經(jīng)常會(huì)被用來(lái)處理一個(gè)最近的已經(jīng)接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個(gè)特殊的處理類,他的目的是幫助使用者配置一個(gè)新的 Channel。也許你想通過(guò)增加一些處理類比如 SimpleChatServerHandler 來(lái)配置一個(gè)新的 Channel 或者其對(duì)應(yīng)的ChannelPipeline來(lái)實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。

5.你可以設(shè)置這里指定的 Channel 實(shí)現(xiàn)的配置參數(shù)。我們正在寫一個(gè)TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket 的參數(shù)選項(xiàng)比如tcpNoDelay 和 keepAlive。請(qǐng)參考ChannelOption和詳細(xì)的ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對(duì)ChannelOption 的有一個(gè)大概的認(rèn)識(shí)。

6.option() 是提供給NioServerSocketChannel用來(lái)接收進(jìn)來(lái)的連接。childOption() 是提供給由父管道ServerChannel接收到的連接,在這個(gè)例子中也是 NioServerSocketChannel。

7.我們繼續(xù),剩下的就是綁定端口然后啟動(dòng)服務(wù)。這里我們?cè)跈C(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind() 方法(基于不同綁定地址)。

恭喜!你已經(jīng)完成了基于 Netty 聊天服務(wù)端程序。

客戶端

在程序的 resources 目錄下,我們創(chuàng)建一個(gè) WebsocketChatClient.html 頁(yè)面來(lái)作為客戶端

WebsocketChatClient.html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8080/ws");
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + event.data
            };
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "連接開(kāi)啟!";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + "連接被關(guān)閉";
            };
        } else {
            alert("你的瀏覽器不支持 WebSocket!");
        }

        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("連接沒(méi)有開(kāi)啟.");
            }
        }
    </script>
    <form onsubmit="return false;">
        <h3>WebSocket 聊天室:</h3>
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
        <br> 
        <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">
        <input type="button" value="發(fā)送消息" onclick="send(this.form.message.value)">
        <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
    </form>
    <br> 
    <br> 
    <a href="http://www.waylau.com/" >更多例子請(qǐng)?jiān)L問(wèn) www.waylau.com</a>
</body>
</html>

邏輯比較簡(jiǎn)單,不累述。

運(yùn)行效果

先運(yùn)行 WebsocketChatServer,再打開(kāi)多個(gè)瀏覽器頁(yè)面實(shí)現(xiàn)多個(gè) 客戶端訪問(wèn) http://localhost:8080

源碼

見(jiàn)https://github.com/waylau/netty-4-user-guide-demos中 websocketchat

參考Netty 4.x 用戶指南https://github.com/waylau/netty-4-user-guide

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)