本章圍繞 Netty 的核心架構(gòu),通過(guò)簡(jiǎn)單的示例帶你快速入門(mén)。當(dāng)你讀完本章節(jié),你馬上就可以用 Netty 寫(xiě)出一個(gè)客戶端和服務(wù)器。
如果你在學(xué)習(xí)的時(shí)候喜歡“top-down(自頂向下)”,那你可能需要要從第二章《Architectural Overview (架構(gòu)總覽)》開(kāi)始,然后再回到這里。
在運(yùn)行本章示例之前,需要準(zhǔn)備:最新版的 Netty 以及 JDK 1.6 或以上版本。最新版的 Netty 在這下載。自行下載 JDK。
閱讀本章節(jié)過(guò)程中,你可能會(huì)對(duì)相關(guān)類有疑惑,關(guān)于這些類的詳細(xì)的信息請(qǐng)請(qǐng)參考 API 說(shuō)明文檔。為了方便,所有文檔中涉及到的類名字都會(huì)被關(guān)聯(lián)到一個(gè)在線的 API 說(shuō)明。當(dāng)然,如果有任何錯(cuò)誤信息、語(yǔ)法錯(cuò)誤或者你有任何好的建議來(lái)改進(jìn)文檔說(shuō)明,那么請(qǐng)聯(lián)系Netty社區(qū)。
譯者注:對(duì)本翻譯有任何疑問(wèn),在https://github.com/waylau/netty-4-user-guide/issues提問(wèn)
世上最簡(jiǎn)單的協(xié)議不是'Hello, World!' 而是 DISCARD(丟棄)。這個(gè)協(xié)議將會(huì)丟掉任何收到的數(shù)據(jù),而不響應(yīng)。
為了實(shí)現(xiàn) DISCARD 協(xié)議,你只需忽略所有收到的數(shù)據(jù)。讓我們從 handler (處理器)的實(shí)現(xiàn)開(kāi)始,handler 是由 Netty 生成用來(lái)處理 I/O 事件的。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 處理服務(wù)端 channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// 默默地丟棄收到的數(shù)據(jù)
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
ctx.close();
}
}
1.DiscardServerHandler 繼承自 ChannelInboundHandlerAdapter,這個(gè)類實(shí)現(xiàn)了 ChannelInboundHandler接口,ChannelInboundHandler
提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承 ChannelInboundHandlerAdapter
類而不是你自己去實(shí)現(xiàn)接口方法。
2.這里我們覆蓋了 chanelRead()
事件處理方法。每當(dāng)從客戶端收到新的數(shù)據(jù)時(shí),這個(gè)方法會(huì)在收到消息時(shí)被調(diào)用,這個(gè)例子中,收到的消息的類型是 ByteBuf
3.為了實(shí)現(xiàn) DISCARD 協(xié)議,處理器不得不忽略所有接受到的消息。ByteBuf
是一個(gè)引用計(jì)數(shù)對(duì)象,這個(gè)對(duì)象必須顯示地調(diào)用 release()
方法來(lái)釋放。請(qǐng)記住處理器的職責(zé)是釋放所有傳遞到處理器的引用計(jì)數(shù)對(duì)象。通常,channelRead()
方法的實(shí)現(xiàn)就像下面的這段代碼:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
4.exceptionCaught()
事件處理方法是當(dāng)出現(xiàn) Throwable
對(duì)象才會(huì)被調(diào)用,即當(dāng) Netty 由于 IO 錯(cuò)誤或者處理器在處理事件時(shí)拋出的異常時(shí)。在大部分情況下,捕獲的異常應(yīng)該被記錄下來(lái)并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個(gè)方法的處理方式會(huì)在遇到不同異常的情況下有不同的實(shí)現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個(gè)錯(cuò)誤碼的響應(yīng)消息。
目前為止一切都還不錯(cuò),我們已經(jīng)實(shí)現(xiàn)了 DISCARD 服務(wù)器的一半功能,剩下的需要編寫(xiě)一個(gè) main()
方法來(lái)啟動(dòng)服務(wù)端的 DiscardServerHandler
。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 丟棄任何進(jìn)入的數(shù)據(jù)
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// 綁定端口,開(kāi)始接收進(jìn)來(lái)的連接
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待服務(wù)器 socket 關(guān)閉 。
// 在這個(gè)例子中,這不會(huì)發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
1.NioEventLoopGroup 是用來(lái)處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的 EventLoopGroup 的實(shí)現(xiàn)用來(lái)處理不同的傳輸。在這個(gè)例子中我們實(shí)現(xiàn)了一個(gè)服務(wù)端的應(yīng)用,因此會(huì)有2個(gè) NioEventLoopGroup
會(huì)被使用。第一個(gè)經(jīng)常被叫做‘boss’,用來(lái)接收進(jìn)來(lái)的連接。第二個(gè)經(jīng)常被叫做‘worker’,用來(lái)處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會(huì)把連接信息注冊(cè)到‘worker’上。如何知道多少個(gè)線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的
Channel上都需要依賴于 EventLoopGroup
的實(shí)現(xiàn),并且可以通過(guò)構(gòu)造函數(shù)來(lái)配置他們的關(guān)系。
2.ServerBootstrap 是一個(gè)啟動(dòng) NIO 服務(wù)的輔助啟動(dòng)類。你可以在這個(gè)服務(wù)中直接使用 Channel
,但是這會(huì)是一個(gè)復(fù)雜的處理過(guò)程,在很多情況下你并不需要這樣做。
3.這里我們指定使用 NioServerSocketChannel 類來(lái)舉例說(shuō)明一個(gè)新的 Channel
如何接收進(jìn)來(lái)的連接。
4.這里的事件處理類經(jīng)常會(huì)被用來(lái)處理一個(gè)最近的已經(jīng)接收的 Channel
。ChannelInitializer 是一個(gè)特殊的處理類,他的目的是幫助使用者配置一個(gè)新的 Channel
。也許你想通過(guò)增加一些處理類比如DiscardServerHandler
來(lái)配置一個(gè)新的
Channel 或者其對(duì)應(yīng)的ChannelPipeline 來(lái)實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類到 pipline 上
,然后提取這些匿名類到最頂層的類上。
5.你可以設(shè)置這里指定的 Channel
實(shí)現(xiàn)的配置參數(shù)。我們正在寫(xiě)一個(gè)TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket
的參數(shù)選項(xiàng)比如tcpNoDelay
和 keepAlive
。請(qǐng)參考 ChannelOption 和詳細(xì)的
ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對(duì)ChannelOption
的有一個(gè)大概的認(rèn)識(shí)。
6.你關(guān)注過(guò) option()
和 childOption()
嗎?option()
是提供給NioServerSocketChannel 用來(lái)接收進(jìn)來(lái)的連接。childOption()
是提供給由父管道 ServerChannel 接收到的連接,在這個(gè)例子中也是 NioServerSocketChannel
。
7.我們繼續(xù),剩下的就是綁定端口然后啟動(dòng)服務(wù)。這里我們?cè)跈C(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind()
方法(基于不同綁定地址)。
恭喜!你已經(jīng)熟練地完成了第一個(gè)基于 Netty 的服務(wù)端程序。
現(xiàn)在我們已經(jīng)編寫(xiě)出我們第一個(gè)服務(wù)端,我們需要測(cè)試一下他是否真的可以運(yùn)行。最簡(jiǎn)單的測(cè)試方法是用 telnet
命令。例如,你可以在命令行上輸入telnet localhost 8080
或者其他類型參數(shù)。
然而我們能說(shuō)這個(gè)服務(wù)端是正常運(yùn)行了嗎?事實(shí)上我們也不知道,因?yàn)樗且粋€(gè) discard
服務(wù),你根本不可能得到任何的響應(yīng)。為了證明他仍然是在正常工作的,讓我們修改服務(wù)端的程序來(lái)打印出他到底接收到了什么。
我們已經(jīng)知道 channelRead()
方法是在數(shù)據(jù)被接收的時(shí)候調(diào)用。讓我們放一些代碼到 DiscardServerHandler
類的 channelRead()
方法。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
1.這個(gè)低效的循環(huán)事實(shí)上可以簡(jiǎn)化為:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
2.或者,你可以在這里調(diào)用 in.release()
。
如果你再次運(yùn)行 telnet 命令,你將會(huì)看到服務(wù)端打印出了他所接收到的消息。
完整的discard server
代碼放在了io.netty.example.discard包下面。
譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.discard
包下
到目前為止,我們雖然接收到了數(shù)據(jù),但沒(méi)有做任何的響應(yīng)。然而一個(gè)服務(wù)端通常會(huì)對(duì)一個(gè)請(qǐng)求作出響應(yīng)。讓我們學(xué)習(xí)怎樣在 ECHO 協(xié)議的實(shí)現(xiàn)下編寫(xiě)一個(gè)響應(yīng)消息給客戶端,這個(gè)協(xié)議針對(duì)任何接收的數(shù)據(jù)都會(huì)返回一個(gè)響應(yīng)。
和 discard server
唯一不同的是把在此之前我們實(shí)現(xiàn)的 channelRead()
方法,返回所有的數(shù)據(jù)替代打印接收數(shù)據(jù)到控制臺(tái)上的邏輯。因此,需要把 channelRead()
方法修改如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
write(Object)
方法來(lái)逐字地把接受到的消息寫(xiě)入。請(qǐng)注意不同于 DISCARD 的例子我們并沒(méi)有釋放接受到的消息,這是因?yàn)楫?dāng)寫(xiě)入的時(shí)候 Netty
已經(jīng)幫我們釋放了。ctx.write(Object)
方法不會(huì)使消息寫(xiě)入到通道上,他被緩沖在了內(nèi)部,你需要調(diào)用 ctx.flush()
方法來(lái)把緩沖區(qū)中數(shù)據(jù)強(qiáng)行輸出?;蛘吣憧梢杂酶?jiǎn)潔的 cxt.writeAndFlush(msg)
以達(dá)到同樣的目的。如果你再一次運(yùn)行 telnet
命令,你會(huì)看到服務(wù)端會(huì)發(fā)回一個(gè)你已經(jīng)發(fā)送的消息。
完整的echo
服務(wù)的代碼放在了 io.netty.example.echo包下面。
譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.echo
包下
在這個(gè)部分被實(shí)現(xiàn)的協(xié)議是 TIME 協(xié)議。和之前的例子不同的是在不接受任何請(qǐng)求時(shí)他會(huì)發(fā)送一個(gè)含32位的整數(shù)的消息,并且一旦消息發(fā)送就會(huì)立即關(guān)閉連接。在這個(gè)例子中,你會(huì)學(xué)習(xí)到如何構(gòu)建和發(fā)送一個(gè)消息,然后在完成時(shí)關(guān)閉連接。
因?yàn)槲覀儗?huì)忽略任何接收到的數(shù)據(jù),而只是在連接被創(chuàng)建發(fā)送一個(gè)消息,所以這次我們不能使用 channelRead()
方法了,代替他的是,我們需要覆蓋 channelActive()
方法,下面的就是實(shí)現(xiàn)的內(nèi)容:
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1.channelActive()
方法將會(huì)在連接被建立并且準(zhǔn)備進(jìn)行通信時(shí)被調(diào)用。因此讓我們?cè)谶@個(gè)方法里完成一個(gè)代表當(dāng)前時(shí)間的32位整數(shù)消息的構(gòu)建工作。
2.為了發(fā)送一個(gè)新的消息,我們需要分配一個(gè)包含這個(gè)消息的新的緩沖。因?yàn)槲覀冃枰獙?xiě)入一個(gè)32位的整數(shù),因此我們需要一個(gè)至少有4個(gè)字節(jié)的 ByteBuf。通過(guò) ChannelHandlerContext.alloc()
得到一個(gè)當(dāng)前的ByteBufAllocator,然后分配一個(gè)新的緩沖。
3.和往常一樣我們需要編寫(xiě)一個(gè)構(gòu)建好的消息。但是等一等,flip 在哪?難道我們使用 NIO 發(fā)送消息時(shí)不是調(diào)用 java.nio.ByteBuffer.flip()
嗎?ByteBuf
之所以沒(méi)有這個(gè)方法因?yàn)橛袃蓚€(gè)指針,一個(gè)對(duì)應(yīng)讀操作一個(gè)對(duì)應(yīng)寫(xiě)操作。當(dāng)你向 ByteBuf 里寫(xiě)入數(shù)據(jù)的時(shí)候?qū)懼羔樀乃饕蜁?huì)增加,同時(shí)讀指針的索引沒(méi)有變化。讀指針?biāo)饕蛯?xiě)指針?biāo)饕謩e代表了消息的開(kāi)始和結(jié)束。
比較起來(lái),NIO
緩沖并沒(méi)有提供一種簡(jiǎn)潔的方式來(lái)計(jì)算出消息內(nèi)容的開(kāi)始和結(jié)尾,除非你調(diào)用 flip
方法。當(dāng)你忘記調(diào)用 flip
方法而引起沒(méi)有數(shù)據(jù)或者錯(cuò)誤數(shù)據(jù)被發(fā)送時(shí),你會(huì)陷入困境。這樣的一個(gè)錯(cuò)誤不會(huì)發(fā)生在 Netty
上,因?yàn)槲覀儗?duì)于不同的操作類型有不同的指針。你會(huì)發(fā)現(xiàn)這樣的使用方法會(huì)讓你過(guò)程變得更加的容易,因?yàn)槟阋呀?jīng)習(xí)慣一種沒(méi)有使用 flip
的方式。
另外一個(gè)點(diǎn)需要注意的是 ChannelHandlerContext.write()
(和 writeAndFlush()
)方法會(huì)返回一個(gè) ChannelFuture 對(duì)象,一個(gè) ChannelFuture
代表了一個(gè)還沒(méi)有發(fā)生的 I/O 操作。這意味著任何一個(gè)請(qǐng)求操作都不會(huì)馬上被執(zhí)行,因?yàn)樵? Netty 里所有的操作都是異步的。舉個(gè)例子下面的代碼中在消息被發(fā)送之前可能會(huì)先關(guān)閉連接。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此你需要在 write()
方法返回的 ChannelFuture
完成后調(diào)用 close()
方法,然后當(dāng)他的寫(xiě)操作已經(jīng)完成他會(huì)通知他的監(jiān)聽(tīng)者。請(qǐng)注意,close()
方法也可能不會(huì)立馬關(guān)閉,他也會(huì)返回一個(gè)ChannelFuture
。
4.當(dāng)一個(gè)寫(xiě)請(qǐng)求已經(jīng)完成是如何通知到我們?這個(gè)只需要簡(jiǎn)單地在返回的 ChannelFuture
上增加一個(gè)ChannelFutureListener。這里我們構(gòu)建了一個(gè)匿名的 ChannelFutureListener
類用來(lái)在操作完成時(shí)關(guān)閉 Channel
。
或者,你可以使用簡(jiǎn)單的預(yù)定義監(jiān)聽(tīng)器代碼:
f.addListener(ChannelFutureListener.CLOSE);
為了測(cè)試我們的time
服務(wù)如我們期望的一樣工作,你可以使用 UNIX 的 rdate
命令
$ rdate -o <port> -p <host>
Port 是你在main()
函數(shù)中指定的端口,host 使用 locahost
就可以了。
不像 DISCARD 和 ECHO 的服務(wù)端,對(duì)于 TIME 協(xié)議我們需要一個(gè)客戶端,因?yàn)槿藗儾荒馨岩粋€(gè)32位的二進(jìn)制數(shù)據(jù)翻譯成一個(gè)日期或者日歷。在這一部分,我們將會(huì)討論如何確保服務(wù)端是正常工作的,并且學(xué)習(xí)怎樣用Netty 編寫(xiě)一個(gè)客戶端。
在 Netty 中,編寫(xiě)服務(wù)端和客戶端最大的并且唯一不同的使用了不同的BootStrap 和 Channel的實(shí)現(xiàn)。請(qǐng)看一下下面的代碼:
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 啟動(dòng)客戶端
ChannelFuture f = b.connect(host, port).sync(); // (5)
// 等待連接關(guān)閉
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
1.BootStrap 和 ServerBootstrap 類似,不過(guò)他是對(duì)非服務(wù)端的 channel
而言,比如客戶端或者無(wú)連接傳輸模式的 channel
。
2.如果你只指定了一個(gè) EventLoopGroup,那他就會(huì)即作為一個(gè) boss group
,也會(huì)作為一個(gè) workder group
,盡管客戶端不需要使用到 boss worker
。
3.代替NioServerSocketChannel的是NioSocketChannel,這個(gè)類在客戶端channel 被創(chuàng)建時(shí)使用。
4.不像在使用 ServerBootstrap
時(shí)需要用 childOption()
方法,因?yàn)榭蛻舳说?SocketChannel 沒(méi)有父親。
5.我們用 connect()
方法代替了 bind()
方法。
正如你看到的,他和服務(wù)端的代碼是不一樣的。ChannelHandler 是如何實(shí)現(xiàn)的?他應(yīng)該從服務(wù)端接受一個(gè)32位的整數(shù)消息,把他翻譯成人們能讀懂的格式,并打印翻譯好的時(shí)間,最后關(guān)閉連接:
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1.在TCP/IP中,Netty 會(huì)把讀到的數(shù)據(jù)放到 ByteBuf
的數(shù)據(jù)結(jié)構(gòu)中。
這樣看起來(lái)非常簡(jiǎn)單,并且和服務(wù)端的那個(gè)例子的代碼也相差不多。然而,處理器有時(shí)候會(huì)因?yàn)閽伋?IndexOutOfBoundsException
而拒絕工作。在下個(gè)部分我們會(huì)討論為什么會(huì)發(fā)生這種情況。
Socket Buffer
的一個(gè)小警告基于流的傳輸比如 TCP/IP, 接收到數(shù)據(jù)是存在 socket
接收的 buffer
中。不幸的是,基于流的傳輸并不是一個(gè)數(shù)據(jù)包隊(duì)列,而是一個(gè)字節(jié)隊(duì)列。意味著,即使你發(fā)送了2個(gè)獨(dú)立的數(shù)據(jù)包,操作系統(tǒng)也不會(huì)作為2個(gè)消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠(yuǎn)程寫(xiě)入的數(shù)據(jù)就會(huì)準(zhǔn)確地讀取。舉個(gè)例子,讓我們假設(shè)操作系統(tǒng)的 TCP/TP 協(xié)議棧已經(jīng)接收了3個(gè)數(shù)據(jù)包:
由于基于流傳輸?shù)膮f(xié)議的這種普通的性質(zhì),在你的應(yīng)用程序里讀取數(shù)據(jù)的時(shí)候會(huì)有很高的可能性被分成下面的片段
因此,一個(gè)接收方不管他是客戶端還是服務(wù)端,都應(yīng)該把接收到的數(shù)據(jù)整理成一個(gè)或者多個(gè)更有意思并且能夠讓程序的業(yè)務(wù)邏輯更好理解的數(shù)據(jù)。在上面的例子中,接收到的數(shù)據(jù)應(yīng)該被構(gòu)造成下面的格式:
回到 TIME 客戶端例子。同樣也有類似的問(wèn)題。一個(gè)32位整型是非常小的數(shù)據(jù),他并不見(jiàn)得會(huì)被經(jīng)常拆分到到不同的數(shù)據(jù)段內(nèi)。然而,問(wèn)題是他確實(shí)可能會(huì)被拆分到不同的數(shù)據(jù)段內(nèi),并且拆分的可能性會(huì)隨著通信量的增加而增加。
最簡(jiǎn)單的方案是構(gòu)造一個(gè)內(nèi)部的可積累的緩沖,直到4個(gè)字節(jié)全部接收到了內(nèi)部緩沖。下面的代碼修改了 TimeClientHandler
的實(shí)現(xiàn)類修復(fù)了這個(gè)問(wèn)題
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1.ChannelHandler 有2個(gè)生命周期的監(jiān)聽(tīng)方法:handlerAdded()
和 handlerRemoved()
。你可以完成任意初始化任務(wù)只要他不會(huì)被阻塞很長(zhǎng)的時(shí)間。
2.首先,所有接收的數(shù)據(jù)都應(yīng)該被累積在 buf
變量里。
3.然后,處理器必須檢查 buf
變量是否有足夠的數(shù)據(jù),在這個(gè)例子中是4個(gè)字節(jié),然后處理實(shí)際的業(yè)務(wù)邏輯。否則,Netty 會(huì)重復(fù)調(diào)用channelRead()
當(dāng)有更多數(shù)據(jù)到達(dá)直到4個(gè)字節(jié)的數(shù)據(jù)被積累。
盡管第一個(gè)解決方案已經(jīng)解決了 TIME 客戶端的問(wèn)題了,但是修改后的處理器看起來(lái)不那么的簡(jiǎn)潔,想象一下如果由多個(gè)字段比如可變長(zhǎng)度的字段組成的更為復(fù)雜的協(xié)議時(shí),你的 ChannelInboundHandler 的實(shí)現(xiàn)將很快地變得難以維護(hù)。
正如你所知的,你可以增加多個(gè) ChannelHandler 到ChannelPipeline ,因此你可以把一整個(gè)ChannelHandler/
拆分成多個(gè)模塊以減少應(yīng)用的復(fù)雜程度,比如你可以把TimeClientHandler 拆分成2個(gè)處理器:
- TimeDecoder 處理數(shù)據(jù)拆分的問(wèn)題
- TimeClientHandler 原始版本的實(shí)現(xiàn)
幸運(yùn)地是,Netty 提供了一個(gè)可擴(kuò)展的類,幫你完成 TimeDecoder 的開(kāi)發(fā)。
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
1.ByteToMessageDecoder 是 ChannelInboundHandler 的一個(gè)實(shí)現(xiàn)類,他可以在處理數(shù)據(jù)拆分的問(wèn)題上變得很簡(jiǎn)單。
2.每當(dāng)有新數(shù)據(jù)接收的時(shí)候,ByteToMessageDecoder 都會(huì)調(diào)用 decode()
方法來(lái)處理內(nèi)部的那個(gè)累積緩沖。
3.Decode()
方法可以決定當(dāng)累積緩沖里沒(méi)有足夠數(shù)據(jù)時(shí)可以往 out 對(duì)象里放任意數(shù)據(jù)。當(dāng)有更多的數(shù)據(jù)被接收了 ByteToMessageDecoder 會(huì)再一次調(diào)用 decode() 方法。
4.如果在 decode()
方法里增加了一個(gè)對(duì)象到 out
對(duì)象里,這意味著解碼器解碼消息成功。ByteToMessageDecoder 將會(huì)丟棄在累積緩沖里已經(jīng)被讀過(guò)的數(shù)據(jù)。請(qǐng)記得你不需要對(duì)多條消息調(diào)用 decode()
,ByteToMessageDecoder 會(huì)持續(xù)調(diào)用 decode()
直到不放任何數(shù)據(jù)到 out
里。
現(xiàn)在我們有另外一個(gè)處理器插入到 ChannelPipeline 里,我們應(yīng)該在 TimeClient 里修改 ChannelInitializer 的實(shí)現(xiàn):
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一個(gè)大膽的人,你可能會(huì)嘗試使用更簡(jiǎn)單的解碼類ReplayingDecoder。不過(guò)你還是需要參考一下 API 文檔來(lái)獲取更多的信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty還提供了更多開(kāi)箱即用的解碼器使你可以更簡(jiǎn)單地實(shí)現(xiàn)更多的協(xié)議,幫助你避免開(kāi)發(fā)一個(gè)難以維護(hù)的處理器實(shí)現(xiàn)。請(qǐng)參考下面的包以獲取更多更詳細(xì)的例子:
- 對(duì)于二進(jìn)制協(xié)議請(qǐng)看 io.netty.example.factorial
- 對(duì)于基于文本協(xié)議請(qǐng)看 io.netty.example.telnet
譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.factorial
和 com.waylau.netty.demo.telnet
包下
用POJO代替ByteBuf
我們回顧了迄今為止的所有例子使用 ByteBuf 作為協(xié)議消息的主要數(shù)據(jù)結(jié)構(gòu)。在本節(jié)中,我們將改善的 TIME 協(xié)議客戶端和服務(wù)器例子,使用 POJO 代替 ByteBuf。
在 ChannelHandler 使用 POIO 的好處很明顯:通過(guò)從ChannelHandler 中提取出 ByteBuf 的代碼,將會(huì)使 ChannelHandler的實(shí)現(xiàn)變得更加可維護(hù)和可重用。在 TIME 客戶端和服務(wù)器的例子中,我們讀取的僅僅是一個(gè)32位的整形數(shù)據(jù),直接使用 ByteBuf 不會(huì)是一個(gè)主要的問(wèn)題。然而,你會(huì)發(fā)現(xiàn)當(dāng)你需要實(shí)現(xiàn)一個(gè)真實(shí)的協(xié)議,分離代碼變得非常的必要。
首先,讓我們定義一個(gè)新的類型叫做 UnixTime。
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
現(xiàn)在我們可以修改下 TimeDecoder 類,返回一個(gè) UnixTime,以替代ByteBuf
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
下面是修改后的解碼器,TimeClientHandler 不再任何的 ByteBuf 代碼了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
是不是變得更加簡(jiǎn)單和優(yōu)雅了?相同的技術(shù)可以被運(yùn)用到服務(wù)端。讓我們修改一下 TimeServerHandler 的代碼。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
現(xiàn)在,唯一缺少的功能是一個(gè)編碼器,是ChannelOutboundHandler的實(shí)現(xiàn),用來(lái)將 UnixTime 對(duì)象重新轉(zhuǎn)化為一個(gè) ByteBuf。這是比編寫(xiě)一個(gè)解碼器簡(jiǎn)單得多,因?yàn)闆](méi)有需要處理的數(shù)據(jù)包編碼消息時(shí)拆分和組裝。
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
1.在這幾行代碼里還有幾個(gè)重要的事情。第一,通過(guò) ChannelPromise,當(dāng)編碼后的數(shù)據(jù)被寫(xiě)到了通道上 Netty 可以通過(guò)這個(gè)對(duì)象標(biāo)記是成功還是失敗。第二, 我們不需要調(diào)用 cxt.flush()
。因?yàn)樘幚砥饕呀?jīng)單獨(dú)分離出了一個(gè)方法 void flush(ChannelHandlerContext cxt),如果像自己實(shí)現(xiàn)
flush()
方法內(nèi)容可以自行覆蓋這個(gè)方法。
進(jìn)一步簡(jiǎn)化操作,你可以使用 MessageToByteEncode:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
最后的任務(wù)就是在 TimeServerHandler 之前把 TimeEncoder 插入到ChannelPipeline。 但這是不那么重要的工作。
關(guān)閉你的應(yīng)用
關(guān)閉一個(gè) Netty 應(yīng)用往往只需要簡(jiǎn)單地通過(guò) shutdownGracefully()
方法來(lái)關(guān)閉你構(gòu)建的所有的 EventLoopGroup。當(dāng)EventLoopGroup 被完全地終止,并且對(duì)應(yīng)的所有 channel 都已經(jīng)被關(guān)閉時(shí),Netty 會(huì)返回一個(gè)Future對(duì)象來(lái)通知你。
總結(jié)
在這一章節(jié)中,我們快速地回顧下如果在熟練掌握 Netty 的情況下編寫(xiě)出一個(gè)健壯能運(yùn)行的網(wǎng)絡(luò)應(yīng)用程序。在 Netty 接下去的章節(jié)中還會(huì)有更多更相信的信息。我們也鼓勵(lì)你去重新復(fù)習(xí)下在 io.netty.example 包下的例子。請(qǐng)注意社區(qū)一直在等待你的問(wèn)題和想法以幫助 Netty 的持續(xù)改進(jìn),Netty 的文檔也是基于你們的快速反饋上。
譯者注:翻譯版本的項(xiàng)目源碼見(jiàn) https://github.com/waylau/netty-4-user-guide-demos。如對(duì)本翻譯有任何建議,可以在https://github.com/waylau/netty-4-user-guide/issues留言
更多建議: