Netty添加 WebSocket 支持

2018-08-08 10:56 更新

一種被稱作“Upgrade handshake(升級握手)”的機制能夠?qū)藴实腍TTP或者HTTPS協(xié)議轉(zhuǎn)成 WebSocket。所以,應(yīng)用程序如果使用了 WebSocket ,那么它都是以 HTTP/S 開始,之后再進行升級,升級會發(fā)生在什么時候是不確定的,要根據(jù)具體的應(yīng)用來決定:可能是在應(yīng)用啟動的時候,也可能是當一個特定的 URL 被請求的時候。

在我們的應(yīng)用中,要想升級協(xié)議為 WebSocket,只有當 URL 請求以“/ws”結(jié)束時才可以,如果沒有達到該要求,服務(wù)器仍將使用基本的 HTTP/S,一旦連接升級,之后的數(shù)據(jù)傳輸都將使用 WebSocket 。

下面看下服務(wù)器的邏輯圖

Figure 11.2 Server logic

Figure%2011

#1客戶端/用戶連接到服務(wù)器并加入聊天

#2 HTTP 請求頁面或 WebSocket 升級握手

#3服務(wù)器處理所有客戶端/用戶

#4響應(yīng) URI “/”的請求,轉(zhuǎn)到 index.html

#5如果訪問的是 URI“/ws” ,處理 WebSocket 升級握手

#6升級握手完成后 ,通過 WebSocket 發(fā)送聊天消息

處理 HTTP 請求

本節(jié)我們將實現(xiàn)此應(yīng)用中用于處理 HTTP 請求的組件,這個組件托管著可供客戶端訪問的聊天室頁面,并且顯示客戶端發(fā)送的消息。

下面就是這個 HttpRequestHandler 的代碼,它是一個用來處理 FullHttpRequest 消息的 ChannelInboundHandler 的實現(xiàn)類。注意看它是怎么實現(xiàn)忽略符合 "/ws" 格式的 URI 請求的。

Listing 11.1 HTTPRequestHandler

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {    //1
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            ctx.fireChannelRead(request.retain());                    //2
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);                                //3
            }

            RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4

            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

            boolean keepAlive = HttpHeaders.isKeepAlive(request);

            if (keepAlive) {                                        //5
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);                    //6

            if (ctx.pipeline().get(SslHandler.class) == null) {        //7
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);            //8
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);        //9
            }
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

1.擴展 SimpleChannelInboundHandler 用于處理 FullHttpRequest信息

2.如果請求是一次升級了的 WebSocket 請求,則遞增引用計數(shù)器(retain)并且將它傳遞給在 ChannelPipeline 中的下個 ChannelInboundHandler

3.處理符合 HTTP 1.1的 "100 Continue" 請求

4.讀取 index.html

5.判斷 keepalive 是否在請求頭里面

6.寫 HttpResponse 到客戶端

7.寫 index.html 到客戶端,根據(jù) ChannelPipeline 中是否有 SslHandler 來決定使用 DefaultFileRegion 還是 ChunkedNioFile

8.寫并刷新 LastHttpContent 到客戶端,標記響應(yīng)完成

9.如果 請求頭中不包含 keepalive,當寫完成時,關(guān)閉 Channel

HttpRequestHandler 做了下面幾件事,

  • 如果該 HTTP 請求被發(fā)送到URI “/ws”,則調(diào)用 FullHttpRequest 上的 retain(),并通過調(diào)用 fireChannelRead(msg) 轉(zhuǎn)發(fā)到下一個 ChannelInboundHandler。retain() 的調(diào)用是必要的,因為 channelRead() 完成后,它會調(diào)用 FullHttpRequest 上的 release() 來釋放其資源。 (請參考我們先前在第6章中關(guān)于 SimpleChannelInboundHandler 的討論)
  • 如果客戶端發(fā)送的 HTTP 1.1 頭是“Expect: 100-continue” ,則發(fā)送“100 Continue”的響應(yīng)。
  • 在 頭被設(shè)置后,寫一個 HttpResponse 返回給客戶端。注意,這不是 FullHttpResponse,這只是響應(yīng)的第一部分。另外,這里我們也不使用 writeAndFlush(), 這個是在留在最后完成。
  • 如果傳輸過程既沒有要求加密也沒有要求壓縮,那么把 index.html 的內(nèi)容存儲在一個 DefaultFileRegion 里就可以達到最好的效率。這將利用零拷貝來執(zhí)行傳輸。出于這個原因,我們要檢查 ChannelPipeline 中是否有一個 SslHandler。如果是的話,我們就使用 ChunkedNioFile。
  • 寫 LastHttpContent 來標記響應(yīng)的結(jié)束,并終止它
  • 如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 對象的最后寫入,并關(guān)閉連接。注意,這里我們調(diào)用 writeAndFlush() 來刷新所有以前寫的信息。

這里展示了應(yīng)用程序的第一部分,用來處理純的 HTTP 請求和響應(yīng)。接下來我們將處理 WebSocket 的 frame(幀),用來發(fā)送聊天消息。

WebSocket frame

WebSockets 在“幀”里面來發(fā)送數(shù)據(jù),其中每一個都代表了一個消息的一部分。一個完整的消息可以利用了多個幀。

處理 WebSocket frame

WebSocket "Request for Comments" (RFC) 定義了六種不同的 frame; Netty 給他們每個都提供了一個 POJO 實現(xiàn) ,見下表:

Table 11.1 WebSocketFrame types

名稱描述
BinaryWebSocketFramecontains binary data
TextWebSocketFramecontains text data
ContinuationWebSocketFramecontains text or binary data that belongs to a previous BinaryWebSocketFrame or TextWebSocketFrame
CloseWebSocketFramerepresents a CLOSE request and contains close status code and a phrase
PingWebSocketFramerequests the transmission of a PongWebSocketFrame
PongWebSocketFramesent as a response to a PingWebSocketFrame

我們的程序只需要使用下面4個幀類型:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

在這里我們只需要處理 TextWebSocketFrame,其他的會由 WebSocketServerProtocolHandler 自動處理。

下面代碼展示了 ChannelInboundHandler 處理 TextWebSocketFrame,同時也將跟蹤在 ChannelGroup 中所有活動的 WebSocket 連接

Listing 11.2 Handles Text frames

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //1
    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    //2
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {

            ctx.pipeline().remove(HttpRequestHandler.class);    //3

            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));//4

            group.add(ctx.channel());    //5
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        group.writeAndFlush(msg.retain());    //6
    }
}

1.擴展 SimpleChannelInboundHandler 用于處理 TextWebSocketFrame 信息

2.覆寫userEventTriggered() 方法來處理自定義事件

3.如果接收的事件表明握手成功,就從 ChannelPipeline 中刪除HttpRequestHandler ,因為接下來不會接受 HTTP 消息了

4.寫一條消息給所有的已連接 WebSocket 客戶端,通知它們建立了一個新的 Channel 連接

5.添加新連接的 WebSocket Channel 到 ChannelGroup 中,這樣它就能收到所有的信息

6.保留收到的消息,并通過 writeAndFlush() 傳遞給所有連接的客戶端。

上面顯示了 TextWebSocketFrameHandler 僅作了幾件事:

  • 當WebSocket 與新客戶端已成功握手完成,通過寫入信息到 ChannelGroup 中的 Channel 來通知所有連接的客戶端,然后添加新 Channel 到 ChannelGroup
  • 如果接收到 TextWebSocketFrame,調(diào)用 retain() ,并將其寫、刷新到 ChannelGroup,使所有連接的 WebSocket Channel 都能接收到它。和以前一樣,retain() 是必需的,因為當 channelRead0()返回時,TextWebSocketFrame 的引用計數(shù)將遞減。由于所有操作都是異步的,writeAndFlush() 可能會在以后完成,我們不希望它訪問無效的引用。

由于 Netty 在其內(nèi)部處理了其余大部分功能,唯一剩下的需要我們?nèi)プ龅木褪菫槊恳粋€新創(chuàng)建的 Channel 初始化 ChannelPipeline 。要完成這個,我們需要一個ChannelInitializer

初始化 ChannelPipeline

接下來,我們需要安裝我們上面實現(xiàn)的兩個 ChannelHandler 到 ChannelPipeline。為此,我們需要繼承 ChannelInitializer 并且實現(xiàn) initChannel()。看下面 ChatServerInitializer 的代碼實現(xiàn)

Listing 11.3 Init the ChannelPipeline

public class ChatServerInitializer extends ChannelInitializer<Channel> {    //1
    private final ChannelGroup group;

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {            //2
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
    }
}

1.擴展 ChannelInitializer

2.添加 ChannelHandler 到 ChannelPipeline

initChannel() 方法用于設(shè)置所有新注冊的 Channel 的ChannelPipeline,安裝所有需要的 ChannelHandler??偨Y(jié)如下:

Table 11.2 ChannelHandlers for the WebSockets Chat server

ChannelHandler  職責
HttpServerCodecDecode bytes to HttpRequest, HttpContent, LastHttpContent.Encode HttpRequest, HttpContent, LastHttpContent to bytes.
ChunkedWriteHandlerWrite the contents of a file.
HttpObjectAggregatorThis ChannelHandler aggregates an HttpMessage and its following HttpContents into a single FullHttpRequest or FullHttpResponse (depending on whether it is being used to handle requests or responses).With this installed the next ChannelHandler in the pipeline will receive only full HTTP requests.
HttpRequestHandlerHandle FullHttpRequests (those not sent to "/ws" URI).
WebSocketServerProtocolHandlerAs required by the WebSockets specification, handle the WebSocket Upgrade handshake, PingWebSocketFrames,PongWebSocketFrames and CloseWebSocketFrames.
TextWebSocketFrameHandlerHandles TextWebSocketFrames and handshake completion events

該 WebSocketServerProtocolHandler 處理所有規(guī)定的 WebSocket 幀類型和升級握手本身。如果握手成功所需的 ChannelHandler 被添加到管道,而那些不再需要的則被去除。管道升級之前的狀態(tài)如下圖。這代表了 ChannelPipeline 剛剛經(jīng)過 ChatServerInitializer 初始化。

Figure 11.3 ChannelPipeline before WebSockets Upgrade

Figure%2011

握手升級成功后 WebSocketServerProtocolHandler 替換HttpRequestDecoder 為 WebSocketFrameDecoder,HttpResponseEncoder 為WebSocketFrameEncoder。 為了最大化性能,WebSocket 連接不需要的 ChannelHandler 將會被移除。其中就包括了 HttpObjectAggregator 和 HttpRequestHandler

下圖,展示了 ChannelPipeline 經(jīng)過這個操作完成后的情況。注意 Netty 目前支持四個版本 WebSocket 協(xié)議,每個通過其自身的方式實現(xiàn)類。選擇正確的版本W(wǎng)ebSocketFrameDecoder 和 WebSocketFrameEncoder 是自動進行的,這取決于在客戶端(在這里指瀏覽器)的支持(在這個例子中,我們假設(shè)使用版本是 13 的 WebSocket 協(xié)議,從而圖中顯示的是 WebSocketFrameDecoder13 和 WebSocketFrameEncoder13)。

Figure 11.4 ChannelPipeline after WebSockets Upgrade

Figure%2011

引導

最后一步是 引導服務(wù)器,設(shè)置 ChannelInitializer

public class ChatServer {

    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);//1
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap  = new ServerBootstrap(); //2
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }

    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {        //3
       return new ChatServerInitializer(group);
    }

    public void destroy() {        //4
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);

        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

1.創(chuàng)建 DefaultChannelGroup 用來 保存所有連接的的 WebSocket channel

2.引導 服務(wù)器

3.創(chuàng)建 ChannelInitializer

4.處理服務(wù)器關(guān)閉,包括釋放所有資源


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號