Netty中的ChannelHandler 家族

2018-08-08 10:36 更新

在我們深入研究 ChannelHandler 內(nèi)部之前,讓我們花幾分鐘了解下這個 Netty 組件模型的基礎。這里先對ChannelHandler 及其子類做個簡單的介紹。

Channel 生命周期

Channel 有個簡單但強大的狀態(tài)模型,與 ChannelInboundHandler API 密切相關(guān)。下面表格是 Channel 的四個狀態(tài)

Table 6.1 Channel lifeycle states

狀態(tài)描述
channelUnregisteredchannel已創(chuàng)建但未注冊到一個 EventLoop.
channelRegisteredchannel 注冊到一個 EventLoop.
channelActivechannel 變?yōu)榛钴S狀態(tài)(連接到了遠程主機),現(xiàn)在可以接收和發(fā)送數(shù)據(jù)了
channelInactivechannel 處于非活躍狀態(tài),沒有連接到遠程主機

Channel 的正常的生命周期如下圖,當狀態(tài)出現(xiàn)變化,就會觸發(fā)對應的事件,這樣就能與 ChannelPipeline 中的 ChannelHandler進行及時的交互。

Figure 6.1 Channel State Model

Figure%206

ChannelHandler 生命周期

ChannelHandler 定義的生命周期操作如下表,當 ChannelHandler 添加到 ChannelPipeline,或者從 ChannelPipeline 移除后,對應的方法將會被調(diào)用。每個方法都傳入了一個 ChannelHandlerContext 參數(shù)

Table 6.2 ChannelHandler lifecycle methods

類型描述
handlerAdded當 ChannelHandler 添加到 ChannelPipeline 調(diào)用
handlerRemoved當 ChannelHandler 從 ChannelPipeline 移除時調(diào)用
exceptionCaught當 ChannelPipeline 執(zhí)行拋出異常時調(diào)用

ChannelHandler 子接口

Netty 提供2個重要的 ChannelHandler 子接口:

  • ChannelInboundHandler - 處理進站數(shù)據(jù)和所有狀態(tài)更改事件
  • ChannelOutboundHandler - 處理出站數(shù)據(jù),允許攔截各種操作

ChannelHandler 適配器

Netty 提供了一個簡單的 ChannelHandler 框架實現(xiàn),給所有聲明方法簽名。這個類 ChannelHandlerAdapter 的方法,主要推送事件 到 pipeline 下個 ChannelHandler 直到 pipeline 的結(jié)束。這個類 也作為 ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter 的基礎。所有三個適配器類的目的是作為自己的實現(xiàn)的起點;您可以擴展它們,覆蓋你需要自定義的方法。

ChannelInboundHandler

ChannelInboundHandler 的生命周期方法在下表中,當接收到數(shù)據(jù)或者與之關(guān)聯(lián)的 Channel 狀態(tài)改變時調(diào)用。之前已經(jīng)注意到了,這些方法與 Channel 的生命周期接近

Table 6.3 ChannelInboundHandler methods

類型描述
channelRegisteredInvoked when a Channel is registered to its EventLoop and is able to handle I/O.
channelUnregisteredInvoked when a Channel is deregistered from its EventLoop and cannot handle any I/O.
channelActiveInvoked when a Channel is active; the Channel is connected/bound and ready.
channelInactiveInvoked when a Channel leaves active state and is no longer connected to its remote peer.
channelReadCompleteInvoked when a read operation on the Channel has completed.
channelReadInvoked if data are read from the Channel.
channelWritabilityChangedInvoked when the writability state of the Channel changes. The user can ensure writes are not done too fast (with risk of an OutOfMemoryError) or can resume writes when the Channel becomes writable again.Channel.isWritable() can be used to detect the actual writability of the channel. The threshold for writability can be set via Channel.config().setWriteHighWaterMark() and Channel.config().setWriteLowWaterMark().
userEventTriggered(...)Invoked when a user calls Channel.fireUserEventTriggered(...) to pass a pojo through the ChannelPipeline. This can be used to pass user specific events through the ChannelPipeline and so allow handling those events.

注意,ChannelInboundHandler 實現(xiàn)覆蓋了 channelRead() 方法處理進來的數(shù)據(jù)用來響應釋放資源。Netty 在 ByteBuf 上使用了資源池,所以當執(zhí)行釋放資源時可以減少內(nèi)存的消耗。

Listing 6.1 Handler to discard data

@ChannelHandler.Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {        //1

    @Override
    public void channelRead(ChannelHandlerContext ctx,
                                     Object msg) {
        ReferenceCountUtil.release(msg); //2
    }

}

1.擴展 ChannelInboundHandlerAdapter

2.ReferenceCountUtil.release() 來丟棄收到的信息

Netty 用一個 WARN-level 日志條目記錄未釋放的資源,使其能相當簡單地找到代碼中的違規(guī)實例。然而,由于手工管理資源會很繁瑣,您可以通過使用 SimpleChannelInboundHandler 簡化問題。如下:

Listing 6.2 Handler to discard data

@ChannelHandler.Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {  //1

    @Override
    public void channelRead0(ChannelHandlerContext ctx,
                                     Object msg) {
        // No need to do anything special //2
    }

}

1.擴展 SimpleChannelInboundHandler

2.不需做特別的釋放資源的動作

注意 SimpleChannelInboundHandler 會自動釋放資源,而無需存儲任何信息的引用。

更多詳見 “Error! Reference source not found..” 一節(jié)

ChannelOutboundHandler

ChannelOutboundHandler 提供了出站操作時調(diào)用的方法。這些方法會被 Channel, ChannelPipeline, 和 ChannelHandlerContext 調(diào)用。

ChannelOutboundHandler 另個一個強大的方面是它具有在請求時延遲操作或者事件的能力。比如,當你在寫數(shù)據(jù)到 remote peer 的過程中被意外暫停,你可以延遲執(zhí)行刷新操作,然后在遲些時候繼續(xù)。

下面顯示了 ChannelOutboundHandler 的方法(繼承自 ChannelHandler 未列出來)

Table 6.4 ChannelOutboundHandler methods

類型描述
bindInvoked on request to bind the Channel to a local address
connectInvoked on request to connect the Channel to the remote peer
disconnectInvoked on request to disconnect the Channel from the remote peer
closeInvoked on request to close the Channel
deregisterInvoked on request to deregister the Channel from its EventLoop
readInvoked on request to read more data from the Channel
flushInvoked on request to flush queued data to the remote peer through the Channel
writeInvoked on request to write data through the Channel to the remote peer

幾乎所有的方法都將 ChannelPromise 作為參數(shù),一旦請求結(jié)束要通過 ChannelPipeline 轉(zhuǎn)發(fā)的時候,必須通知此參數(shù)。

ChannelPromise vs. ChannelFuture

ChannelPromise 是 特殊的 ChannelFuture,允許你的 ChannelPromise 及其 操作 成功或失敗。所以任何時候調(diào)用例如 Channel.write(...) 一個新的 ChannelPromise將會創(chuàng)建并且通過 ChannelPipeline傳遞。這次寫操作本身將會返回 ChannelFuture, 這樣只允許你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作為返回的 ChannelFuture 的通知,事實上在大多數(shù)時候就是 ChannelPromise 自身(ChannelPromise 擴展了 ChannelFuture)

如前所述,ChannelOutboundHandlerAdapter 提供了一個實現(xiàn)了 ChannelOutboundHandler 所有基本方法的實現(xiàn)的框架。 這些簡單事件轉(zhuǎn)發(fā)到下一個 ChannelOutboundHandler 管道通過調(diào)用 ChannelHandlerContext 相關(guān)的等效方法。你可以根據(jù)需要自己實現(xiàn)想要的方法。

資源管理

當你通過 ChannelInboundHandler.channelRead(...) 或者 ChannelOutboundHandler.write(...) 來處理數(shù)據(jù),重要的是在處理資源時要確保資源不要泄漏。

Netty 使用引用計數(shù)器來處理池化的 ByteBuf。所以當 ByteBuf 完全處理后,要確保引用計數(shù)器被調(diào)整。

引用計數(shù)的權(quán)衡之一是用戶時必須小心使用消息。當 JVM 仍在 GC(不知道有這樣的消息引用計數(shù))這個消息,以至于可能是之前獲得的這個消息不會被放回池中。因此很可能,如果你不小心釋放這些消息,很可能會耗盡資源。

為了讓用戶更加簡單的找到遺漏的釋放,Netty 包含了一個 ResourceLeakDetector ,將會從已分配的緩沖區(qū) 1% 作為樣品來檢查是否存在在應用程序泄漏。因為 1% 的抽樣,開銷很小。

對于檢測泄漏,您將看到類似于下面的日志消息。

LEAK: ByteBuf.release() was not called before it’s garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced
leak reporting, specify the JVM option ’-Dio.netty.leakDetectionLevel=advanced’ or call ResourceLeakDetector.setLevel()

Relaunch your application with the JVM option mentioned above, then you’ll see the recent locations of your application where the leaked buffer was accessed. The following output shows a leak from our unit test (XmlFrameDecoderTest.testDecodeWithXml()):

Running io.netty.handler.codec.xml.XmlFrameDecoderTest

15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it’s garbage-collected.

Recent access records: 1

#1:

io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697)

io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157)
    io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)

泄漏檢測等級

Netty 現(xiàn)在定義了四種泄漏檢測等級,可以按需開啟,見下表

Table 6.5 Leak detection levels

Level DescriptionDISABLED
DisablesLeak detection completely. While this even eliminates the 1 % overhead you should only do this after extensive testing.
SIMPLETells if a leak was found or not. Again uses the sampling rate of 1%, the default level and a good fit for most cases.
ADVANCEDTells if a leak was found and where the message was accessed, using the sampling rate of 1%.
PARANOIDSame as level ADVANCED with the main difference that every access is sampled. This it has a massive impact on performance. Use this only in the debugging phase.

修改檢測等級,只需修改 io.netty.leakDetectionLevel 系統(tǒng)屬性,舉例

# java -Dio.netty.leakDetectionLevel=paranoid

這樣,我們就能在 ChannelInboundHandler.channelRead(...) 和 ChannelOutboundHandler.write(...) 避免泄漏。

當你處理 channelRead(...) 操作,并在消費消息(不是通過 ChannelHandlerContext.fireChannelRead(...) 來傳遞它到下個 ChannelInboundHandler) 時,要釋放它,如下:

Listing 6.3 Handler that consume inbound data

@ChannelHandler.Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {  //1

    @Override
    public void channelRead(ChannelHandlerContext ctx,
                                     Object msg) {
        ReferenceCountUtil.release(msg); //2
    }

}
  1. 繼承 ChannelInboundHandlerAdapter
  2. 使用 ReferenceCountUtil.release(...) 來釋放資源

所以記得,每次處理消息時,都要釋放它。

SimpleChannelInboundHandler -消費入站消息更容易

使用入站數(shù)據(jù)和釋放它是一項常見的任務,Netty 為你提供了一個特殊的稱為 SimpleChannelInboundHandler 的 ChannelInboundHandler 的實現(xiàn)。該實現(xiàn)將自動釋放一個消息,一旦這個消息被用戶通過channelRead0() 方法消費。

當你在處理寫操作,并丟棄消息時,你需要釋放它?,F(xiàn)在讓我們看下實際是如何操作的。

Listing 6.4 Handler to discard outbound data

@ChannelHandler.Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { //1

@Override
public void write(ChannelHandlerContext ctx,
                                 Object msg, ChannelPromise promise) {
    ReferenceCountUtil.release(msg);  //2
    promise.setSuccess();    //3

}

}

  1. 繼承 ChannelOutboundHandlerAdapter
  2. 使用 ReferenceCountUtil.release(...) 來釋放資源
  3. 通知 ChannelPromise 數(shù)據(jù)已經(jīng)被處理

重要的是,釋放資源并通知 ChannelPromise。如果,ChannelPromise 沒有被通知到,這可能會引發(fā) ChannelFutureListener 不會被處理的消息通知的狀況。

所以,總結(jié)下:如果消息是被 消耗/丟棄 并不會被傳入下個 ChannelPipeline 的 ChannelOutboundHandler ,調(diào)用 ReferenceCountUtil.release(message) 。一旦消息經(jīng)過實際的傳輸,在消息被寫或者 Channel 關(guān)閉時,它將會自動釋放。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號