上一次我們用Netty快速實(shí)現(xiàn)了一個(gè) Java 聊天程序?,F(xiàn)在,我們要做下修改,加入 WebSocket 的支持,使它可以在瀏覽器里進(jìn)行文本聊天。
WebSocket 通過(guò)“Upgrade handshake(升級(jí)握手)”從標(biāo)準(zhǔn)的 HTTP 或HTTPS 協(xié)議轉(zhuǎn)為 WebSocket。因此,使用 WebSocket 的應(yīng)用程序?qū)⑹冀K以 HTTP/S 開(kāi)始,然后進(jìn)行升級(jí)。在什么時(shí)候發(fā)生這種情況取決于具體的應(yīng)用;它可以是在啟動(dòng)時(shí),或當(dāng)一個(gè)特定的 URL 被請(qǐng)求時(shí)。
在我們的應(yīng)用中,當(dāng) URL 請(qǐng)求以“/ws”結(jié)束時(shí),我們才升級(jí)協(xié)議為WebSocket。否則,服務(wù)器將使用基本的 HTTP/S。一旦升級(jí)連接將使用的WebSocket 傳輸所有數(shù)據(jù)。
整個(gè)服務(wù)器邏輯如下:
1.客戶端/用戶連接到服務(wù)器并加入聊天
2.HTTP 請(qǐng)求頁(yè)面或 WebSocket 升級(jí)握手
3.服務(wù)器處理所有客戶端/用戶
4.響應(yīng) URI “/”的請(qǐng)求,轉(zhuǎn)到默認(rèn) html 頁(yè)面
5.如果訪問(wèn)的是 URI“/ws” ,處理 WebSocket 升級(jí)握手
6.升級(jí)握手完成后 ,通過(guò) WebSocket 發(fā)送聊天消息
讓我們從處理 HTTP 請(qǐng)求的實(shí)現(xiàn)開(kāi)始。
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "WebsocketChatClient.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain()); //2
} else {
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx); //3
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) { //5
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response); //6
if (ctx.pipeline().get(SslHandler.class) == null) { //7
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //8
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE); //9
}
file.close();
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"異常");
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
ctx.close();
}
}
1.擴(kuò)展 SimpleChannelInboundHandler 用于處理 FullHttpRequest信息
2.如果請(qǐng)求是 WebSocket 升級(jí),遞增引用計(jì)數(shù)器(保留)并且將它傳遞給在 ChannelPipeline 中的下個(gè) ChannelInboundHandler
3.處理符合 HTTP 1.1的 "100 Continue" 請(qǐng)求
4.讀取默認(rèn)的 WebsocketChatClient.html 頁(yè)面
5.判斷 keepalive 是否在請(qǐng)求頭里面
6.寫 HttpResponse 到客戶端
7.寫 index.html 到客戶端,判斷 SslHandler 是否在 ChannelPipeline 來(lái)決定是使用 DefaultFileRegion 還是 ChunkedNioFile
8.寫并刷新 LastHttpContent 到客戶端,標(biāo)記響應(yīng)完成
9.如果 keepalive 沒(méi)有要求,當(dāng)寫完成時(shí),關(guān)閉 Channel
HttpRequestHandler 做了下面幾件事,
WebSockets 在“幀”里面來(lái)發(fā)送數(shù)據(jù),其中每一個(gè)都代表了一個(gè)消息的一部分。一個(gè)完整的消息可以利用了多個(gè)幀。 WebSocket "Request for Comments" (RFC) 定義了六中不同的 frame; Netty 給他們每個(gè)都提供了一個(gè) POJO 實(shí)現(xiàn) ,而我們的程序只需要使用下面4個(gè)幀類型:
在這里我們只需要顯示處理 TextWebSocketFrame,其他的會(huì)由 WebSocketServerProtocolHandler 自動(dòng)處理。
下面代碼展示了 ChannelInboundHandler 處理 TextWebSocketFrame,同時(shí)也將跟蹤在 ChannelGroup 中所有活動(dòng)的 WebSocket 連接
TextWebSocketFrameHandler.java
public class TextWebSocketFrameHandler extends
SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception { // (1)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
} else {
channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
}
channels.add(ctx.channel());
System.out.println("Client:"+incoming.remoteAddress() +"加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 離開(kāi)"));
}
System.out.println("Client:"+incoming.remoteAddress() +"離開(kāi)");
channels.remove(ctx.channel());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"在線");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"掉線");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"異常");
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
ctx.close();
}
}
1.TextWebSocketFrameHandler 繼承自 SimpleChannelInboundHandler,這個(gè)類實(shí)現(xiàn)了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實(shí)現(xiàn)接口方法。
2.覆蓋了 handlerAdded() 事件處理方法。每當(dāng)從服務(wù)端收到新的客戶端連接時(shí),客戶端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客戶端 Channel
3.覆蓋了 handlerRemoved() 事件處理方法。每當(dāng)從服務(wù)端收到客戶端斷開(kāi)時(shí),客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel
4.覆蓋了 channelRead0() 事件處理方法。每當(dāng)從服務(wù)端讀到客戶端寫入信息時(shí),將信息轉(zhuǎn)發(fā)給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時(shí),需要把 channelRead0() 重命名為messageReceived()
5.覆蓋了 channelActive() 事件處理方法。服務(wù)端監(jiān)聽(tīng)到客戶端活動(dòng)
6.覆蓋了 channelInactive() 事件處理方法。服務(wù)端監(jiān)聽(tīng)到客戶端不活動(dòng)
7.exceptionCaught() 事件處理方法是當(dāng)出現(xiàn) Throwable 對(duì)象才會(huì)被調(diào)用,即當(dāng) Netty 由于 IO 錯(cuò)誤或者處理器在處理事件時(shí)拋出的異常時(shí)。在大部分情況下,捕獲的異常應(yīng)該被記錄下來(lái)并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個(gè)方法的處理方式會(huì)在遇到不同異常的情況下有不同的實(shí)現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個(gè)錯(cuò)誤碼的響應(yīng)消息。
上面顯示了 TextWebSocketFrameHandler 僅作了幾件事:
由于 Netty 處理了其余大部分功能,唯一剩下的我們現(xiàn)在要做的是初始化 ChannelPipeline 給每一個(gè)創(chuàng)建的新的 Channel 。做到這一點(diǎn),我們需要一個(gè)ChannelInitializer
public class WebsocketChatServerInitializer extends
ChannelInitializer<SocketChannel> { //1
@Override
public void initChannel(SocketChannel ch) throws Exception {//2
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64*1024));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
1.擴(kuò)展 ChannelInitializer
2.添加 ChannelHandler 到 ChannelPipeline
initChannel() 方法設(shè)置 ChannelPipeline 中所有新注冊(cè)的 Channel,安裝所有需要的 ChannelHandler。
編寫一個(gè) main() 方法來(lái)啟動(dòng)服務(wù)端。
public class WebsocketChatServer {
private int port;
public WebsocketChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new WebsocketChatServerInitializer()) //(4)
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
System.out.println("WebsocketChatServer 啟動(dòng)了");
// 綁定端口,開(kāi)始接收進(jìn)來(lái)的連接
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待服務(wù)器 socket 關(guān)閉 。
// 在這個(gè)例子中,這不會(huì)發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("WebsocketChatServer 關(guān)閉了");
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new WebsocketChatServer(port).run();
}
}
1.NioEventLoopGroup是用來(lái)處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的EventLoopGroup的實(shí)現(xiàn)用來(lái)處理不同的傳輸。在這個(gè)例子中我們實(shí)現(xiàn)了一個(gè)服務(wù)端的應(yīng)用,因此會(huì)有2個(gè) NioEventLoopGroup 會(huì)被使用。第一個(gè)經(jīng)常被叫做‘boss’,用來(lái)接收進(jìn)來(lái)的連接。第二個(gè)經(jīng)常被叫做‘worker’,用來(lái)處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會(huì)把連接信息注冊(cè)到‘worker’上。如何知道多少個(gè)線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的 Channel上都需要依賴于 EventLoopGroup 的實(shí)現(xiàn),并且可以通過(guò)構(gòu)造函數(shù)來(lái)配置他們的關(guān)系。
2.ServerBootstrap是一個(gè)啟動(dòng) NIO 服務(wù)的輔助啟動(dòng)類。你可以在這個(gè)服務(wù)中直接使用 Channel,但是這會(huì)是一個(gè)復(fù)雜的處理過(guò)程,在很多情況下你并不需要這樣做。
3.這里我們指定使用NioServerSocketChannel類來(lái)舉例說(shuō)明一個(gè)新的 Channel 如何接收進(jìn)來(lái)的連接。
4.這里的事件處理類經(jīng)常會(huì)被用來(lái)處理一個(gè)最近的已經(jīng)接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個(gè)特殊的處理類,他的目的是幫助使用者配置一個(gè)新的 Channel。也許你想通過(guò)增加一些處理類比如 SimpleChatServerHandler 來(lái)配置一個(gè)新的 Channel 或者其對(duì)應(yīng)的ChannelPipeline來(lái)實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
5.你可以設(shè)置這里指定的 Channel 實(shí)現(xiàn)的配置參數(shù)。我們正在寫一個(gè)TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket 的參數(shù)選項(xiàng)比如tcpNoDelay 和 keepAlive。請(qǐng)參考ChannelOption和詳細(xì)的ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對(duì)ChannelOption 的有一個(gè)大概的認(rèn)識(shí)。
6.option() 是提供給NioServerSocketChannel用來(lái)接收進(jìn)來(lái)的連接。childOption() 是提供給由父管道ServerChannel接收到的連接,在這個(gè)例子中也是 NioServerSocketChannel。
7.我們繼續(xù),剩下的就是綁定端口然后啟動(dòng)服務(wù)。這里我們?cè)跈C(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind() 方法(基于不同綁定地址)。
恭喜!你已經(jīng)完成了基于 Netty 聊天服務(wù)端程序。
在程序的 resources 目錄下,我們創(chuàng)建一個(gè) WebsocketChatClient.html 頁(yè)面來(lái)作為客戶端
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8080/ws");
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "連接開(kāi)啟!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "連接被關(guān)閉";
};
} else {
alert("你的瀏覽器不支持 WebSocket!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("連接沒(méi)有開(kāi)啟.");
}
}
</script>
<form onsubmit="return false;">
<h3>WebSocket 聊天室:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<br>
<input type="text" name="message" style="width: 300px" value="Welcome to www.waylau.com">
<input type="button" value="發(fā)送消息" onclick="send(this.form.message.value)">
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
</form>
<br>
<br>
<a href="http://www.waylau.com/" >更多例子請(qǐng)?jiān)L問(wèn) www.waylau.com</a>
</body>
</html>
邏輯比較簡(jiǎn)單,不累述。
先運(yùn)行 WebsocketChatServer,再打開(kāi)多個(gè)瀏覽器頁(yè)面實(shí)現(xiàn)多個(gè) 客戶端訪問(wèn) http://localhost:8080
見(jiàn)https://github.com/waylau/netty-4-user-guide-demos中 websocketchat
參考Netty 4.x 用戶指南https://github.com/waylau/netty-4-user-guide
更多建議: