基于流的傳輸比如 TCP/IP, 接收到數(shù)據(jù)是存在 socket 接收的 buffer 中。不幸的是,基于流的傳輸并不是一個數(shù)據(jù)包隊列,而是一個字節(jié)隊列。意味著,即使你發(fā)送了 2 個獨立的數(shù)據(jù)包,操作系統(tǒng)也不會作為 2 個消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠(yuǎn)程寫入的數(shù)據(jù)就會準(zhǔn)確地讀取。舉個例子,讓我們假設(shè)操作系統(tǒng)的 TCP/IP 協(xié)議棧已經(jīng)接收了 3 個數(shù)據(jù)包:
由于基于流傳輸?shù)膮f(xié)議的這種普通的性質(zhì),在你的應(yīng)用程序里讀取數(shù)據(jù)的時候會有很高的可能性被分成下面的片段
因此,一個接收方不管他是客戶端還是服務(wù)端,都應(yīng)該把接收到的數(shù)據(jù)整理成一個或者多個更有意思并且能夠讓程序的業(yè)務(wù)邏輯更好理解的數(shù)據(jù)。在上面的例子中,接收到的數(shù)據(jù)應(yīng)該被構(gòu)造成下面的格式:
回到 TIME 客戶端例子。同樣也有類似的問題。一個 32 位整型是非常小的數(shù)據(jù),他并不見得會被經(jīng)常拆分到到不同的數(shù)據(jù)段內(nèi)。然而,問題是他確實可能會被拆分到不同的數(shù)據(jù)段內(nèi),并且拆分的可能性會隨著通信量的增加而增加。
最簡單的方案是構(gòu)造一個內(nèi)部的可積累的緩沖,直到 4 個字節(jié)全部接收到了內(nèi)部緩沖。下面的代碼修改了 TimeClientHandler 的實現(xiàn)類修復(fù)了這個問題
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1.ChannelHandler 有 2 個生命周期的監(jiān)聽方法:handlerAdded() 和 handlerRemoved()。你可以完成任意初始化任務(wù)只要他不會被阻塞很長的時間。
2.首先,所有接收的數(shù)據(jù)都應(yīng)該被累積在 buf 變量里。
3.然后,處理器必須檢查 buf 變量是否有足夠的數(shù)據(jù),在這個例子中是 4 個字節(jié),然后處理實際的業(yè)務(wù)邏輯。否則,Netty 會重復(fù)調(diào)用channelRead() 當(dāng)有更多數(shù)據(jù)到達(dá)直到4個字節(jié)的數(shù)據(jù)被積累。
盡管第一個解決方案已經(jīng)解決了 TIME 客戶端的問題了,但是修改后的處理器看起來不那么的簡潔,想象一下如果由多個字段比如可變長度的字段組成的更為復(fù)雜的協(xié)議時,你的 ChannelInboundHandler 的實現(xiàn)將很快地變得難以維護(hù)。
正如你所知的,你可以增加多個 ChannelHandler 到ChannelPipeline ,因此你可以把一整個ChannelHandler 拆分成多個模塊以減少應(yīng)用的復(fù)雜程度,比如你可以把TimeClientHandler 拆分成 2 個處理器:
幸運地是,Netty 提供了一個可擴(kuò)展的類,幫你完成 TimeDecoder 的開發(fā)。
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
1.ByteToMessageDecoder 是 ChannelInboundHandler 的一個實現(xiàn)類,他可以在處理數(shù)據(jù)拆分的問題上變得很簡單。
2.每當(dāng)有新數(shù)據(jù)接收的時候,ByteToMessageDecoder 都會調(diào)用 decode() 方法來處理內(nèi)部的那個累積緩沖。
3.Decode() 方法可以決定當(dāng)累積緩沖里沒有足夠數(shù)據(jù)時可以往 out 對象里放任意數(shù)據(jù)。當(dāng)有更多的數(shù)據(jù)被接收了 ByteToMessageDecoder 會再一次調(diào)用 decode() 方法。
4.如果在 decode() 方法里增加了一個對象到 out 對象里,這意味著解碼器解碼消息成功。ByteToMessageDecoder 將會丟棄在累積緩沖里已經(jīng)被讀過的數(shù)據(jù)。請記得你不需要對多條消息調(diào)用 decode(),ByteToMessageDecoder 會持續(xù)調(diào)用 decode() 直到不放任何數(shù)據(jù)到 out 里。
現(xiàn)在我們有另外一個處理器插入到 ChannelPipeline 里,我們應(yīng)該在 TimeClient 里修改 ChannelInitializer 的實現(xiàn):
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一個大膽的人,你可能會嘗試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下 API 文檔來獲取更多的信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty 還提供了更多開箱即用的解碼器使你可以更簡單地實現(xiàn)更多的協(xié)議,幫助你避免開發(fā)一個難以維護(hù)的處理器實現(xiàn)。請參考下面的包以獲取更多更詳細(xì)的例子:
譯者注:翻譯版本的項目源碼見 https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.factorial
和 com.waylau.netty.demo.telnet
包下
更多建議: