Netty中的ChannelHandler 家族

2018-08-08 10:36 更新

在我們深入研究 ChannelHandler 內(nèi)部之前,讓我們花幾分鐘了解下這個(gè) Netty 組件模型的基礎(chǔ)。這里先對(duì)ChannelHandler 及其子類做個(gè)簡(jiǎn)單的介紹。

Channel 生命周期

Channel 有個(gè)簡(jiǎn)單但強(qiáng)大的狀態(tài)模型,與 ChannelInboundHandler API 密切相關(guān)。下面表格是 Channel 的四個(gè)狀態(tài)

Table 6.1 Channel lifeycle states

狀態(tài)描述
channelUnregisteredchannel已創(chuàng)建但未注冊(cè)到一個(gè) EventLoop.
channelRegisteredchannel 注冊(cè)到一個(gè) EventLoop.
channelActivechannel 變?yōu)榛钴S狀態(tài)(連接到了遠(yuǎn)程主機(jī)),現(xiàn)在可以接收和發(fā)送數(shù)據(jù)了
channelInactivechannel 處于非活躍狀態(tài),沒有連接到遠(yuǎn)程主機(jī)

Channel 的正常的生命周期如下圖,當(dāng)狀態(tài)出現(xiàn)變化,就會(huì)觸發(fā)對(duì)應(yīng)的事件,這樣就能與 ChannelPipeline 中的 ChannelHandler進(jìn)行及時(shí)的交互。

Figure 6.1 Channel State Model

Figure%206

ChannelHandler 生命周期

ChannelHandler 定義的生命周期操作如下表,當(dāng) ChannelHandler 添加到 ChannelPipeline,或者從 ChannelPipeline 移除后,對(duì)應(yīng)的方法將會(huì)被調(diào)用。每個(gè)方法都傳入了一個(gè) ChannelHandlerContext 參數(shù)

Table 6.2 ChannelHandler lifecycle methods

類型描述
handlerAdded當(dāng) ChannelHandler 添加到 ChannelPipeline 調(diào)用
handlerRemoved當(dāng) ChannelHandler 從 ChannelPipeline 移除時(shí)調(diào)用
exceptionCaught當(dāng) ChannelPipeline 執(zhí)行拋出異常時(shí)調(diào)用

ChannelHandler 子接口

Netty 提供2個(gè)重要的 ChannelHandler 子接口:

  • ChannelInboundHandler - 處理進(jìn)站數(shù)據(jù)和所有狀態(tài)更改事件
  • ChannelOutboundHandler - 處理出站數(shù)據(jù),允許攔截各種操作

ChannelHandler 適配器

Netty 提供了一個(gè)簡(jiǎn)單的 ChannelHandler 框架實(shí)現(xiàn),給所有聲明方法簽名。這個(gè)類 ChannelHandlerAdapter 的方法,主要推送事件 到 pipeline 下個(gè) ChannelHandler 直到 pipeline 的結(jié)束。這個(gè)類 也作為 ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter 的基礎(chǔ)。所有三個(gè)適配器類的目的是作為自己的實(shí)現(xiàn)的起點(diǎn);您可以擴(kuò)展它們,覆蓋你需要自定義的方法。

ChannelInboundHandler

ChannelInboundHandler 的生命周期方法在下表中,當(dāng)接收到數(shù)據(jù)或者與之關(guān)聯(lián)的 Channel 狀態(tài)改變時(shí)調(diào)用。之前已經(jīng)注意到了,這些方法與 Channel 的生命周期接近

Table 6.3 ChannelInboundHandler methods

類型描述
channelRegisteredInvoked when a Channel is registered to its EventLoop and is able to handle I/O.
channelUnregisteredInvoked when a Channel is deregistered from its EventLoop and cannot handle any I/O.
channelActiveInvoked when a Channel is active; the Channel is connected/bound and ready.
channelInactiveInvoked when a Channel leaves active state and is no longer connected to its remote peer.
channelReadCompleteInvoked when a read operation on the Channel has completed.
channelReadInvoked if data are read from the Channel.
channelWritabilityChangedInvoked when the writability state of the Channel changes. The user can ensure writes are not done too fast (with risk of an OutOfMemoryError) or can resume writes when the Channel becomes writable again.Channel.isWritable() can be used to detect the actual writability of the channel. The threshold for writability can be set via Channel.config().setWriteHighWaterMark() and Channel.config().setWriteLowWaterMark().
userEventTriggered(...)Invoked when a user calls Channel.fireUserEventTriggered(...) to pass a pojo through the ChannelPipeline. This can be used to pass user specific events through the ChannelPipeline and so allow handling those events.

注意,ChannelInboundHandler 實(shí)現(xiàn)覆蓋了 channelRead() 方法處理進(jìn)來的數(shù)據(jù)用來響應(yīng)釋放資源。Netty 在 ByteBuf 上使用了資源池,所以當(dāng)執(zhí)行釋放資源時(shí)可以減少內(nèi)存的消耗。

Listing 6.1 Handler to discard data

@ChannelHandler.Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {        //1

    @Override
    public void channelRead(ChannelHandlerContext ctx,
                                     Object msg) {
        ReferenceCountUtil.release(msg); //2
    }

}

1.擴(kuò)展 ChannelInboundHandlerAdapter

2.ReferenceCountUtil.release() 來丟棄收到的信息

Netty 用一個(gè) WARN-level 日志條目記錄未釋放的資源,使其能相當(dāng)簡(jiǎn)單地找到代碼中的違規(guī)實(shí)例。然而,由于手工管理資源會(huì)很繁瑣,您可以通過使用 SimpleChannelInboundHandler 簡(jiǎn)化問題。如下:

Listing 6.2 Handler to discard data

@ChannelHandler.Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {  //1

    @Override
    public void channelRead0(ChannelHandlerContext ctx,
                                     Object msg) {
        // No need to do anything special //2
    }

}

1.擴(kuò)展 SimpleChannelInboundHandler

2.不需做特別的釋放資源的動(dòng)作

注意 SimpleChannelInboundHandler 會(huì)自動(dòng)釋放資源,而無需存儲(chǔ)任何信息的引用。

更多詳見 “Error! Reference source not found..” 一節(jié)

ChannelOutboundHandler

ChannelOutboundHandler 提供了出站操作時(shí)調(diào)用的方法。這些方法會(huì)被 Channel, ChannelPipeline, 和 ChannelHandlerContext 調(diào)用。

ChannelOutboundHandler 另個(gè)一個(gè)強(qiáng)大的方面是它具有在請(qǐng)求時(shí)延遲操作或者事件的能力。比如,當(dāng)你在寫數(shù)據(jù)到 remote peer 的過程中被意外暫停,你可以延遲執(zhí)行刷新操作,然后在遲些時(shí)候繼續(xù)。

下面顯示了 ChannelOutboundHandler 的方法(繼承自 ChannelHandler 未列出來)

Table 6.4 ChannelOutboundHandler methods

類型描述
bindInvoked on request to bind the Channel to a local address
connectInvoked on request to connect the Channel to the remote peer
disconnectInvoked on request to disconnect the Channel from the remote peer
closeInvoked on request to close the Channel
deregisterInvoked on request to deregister the Channel from its EventLoop
readInvoked on request to read more data from the Channel
flushInvoked on request to flush queued data to the remote peer through the Channel
writeInvoked on request to write data through the Channel to the remote peer

幾乎所有的方法都將 ChannelPromise 作為參數(shù),一旦請(qǐng)求結(jié)束要通過 ChannelPipeline 轉(zhuǎn)發(fā)的時(shí)候,必須通知此參數(shù)。

ChannelPromise vs. ChannelFuture

ChannelPromise 是 特殊的 ChannelFuture,允許你的 ChannelPromise 及其 操作 成功或失敗。所以任何時(shí)候調(diào)用例如 Channel.write(...) 一個(gè)新的 ChannelPromise將會(huì)創(chuàng)建并且通過 ChannelPipeline傳遞。這次寫操作本身將會(huì)返回 ChannelFuture, 這樣只允許你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作為返回的 ChannelFuture 的通知,事實(shí)上在大多數(shù)時(shí)候就是 ChannelPromise 自身(ChannelPromise 擴(kuò)展了 ChannelFuture)

如前所述,ChannelOutboundHandlerAdapter 提供了一個(gè)實(shí)現(xiàn)了 ChannelOutboundHandler 所有基本方法的實(shí)現(xiàn)的框架。 這些簡(jiǎn)單事件轉(zhuǎn)發(fā)到下一個(gè) ChannelOutboundHandler 管道通過調(diào)用 ChannelHandlerContext 相關(guān)的等效方法。你可以根據(jù)需要自己實(shí)現(xiàn)想要的方法。

資源管理

當(dāng)你通過 ChannelInboundHandler.channelRead(...) 或者 ChannelOutboundHandler.write(...) 來處理數(shù)據(jù),重要的是在處理資源時(shí)要確保資源不要泄漏。

Netty 使用引用計(jì)數(shù)器來處理池化的 ByteBuf。所以當(dāng) ByteBuf 完全處理后,要確保引用計(jì)數(shù)器被調(diào)整。

引用計(jì)數(shù)的權(quán)衡之一是用戶時(shí)必須小心使用消息。當(dāng) JVM 仍在 GC(不知道有這樣的消息引用計(jì)數(shù))這個(gè)消息,以至于可能是之前獲得的這個(gè)消息不會(huì)被放回池中。因此很可能,如果你不小心釋放這些消息,很可能會(huì)耗盡資源。

為了讓用戶更加簡(jiǎn)單的找到遺漏的釋放,Netty 包含了一個(gè) ResourceLeakDetector ,將會(huì)從已分配的緩沖區(qū) 1% 作為樣品來檢查是否存在在應(yīng)用程序泄漏。因?yàn)?1% 的抽樣,開銷很小。

對(duì)于檢測(cè)泄漏,您將看到類似于下面的日志消息。

LEAK: ByteBuf.release() was not called before it’s garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced
leak reporting, specify the JVM option ’-Dio.netty.leakDetectionLevel=advanced’ or call ResourceLeakDetector.setLevel()

Relaunch your application with the JVM option mentioned above, then you’ll see the recent locations of your application where the leaked buffer was accessed. The following output shows a leak from our unit test (XmlFrameDecoderTest.testDecodeWithXml()):

Running io.netty.handler.codec.xml.XmlFrameDecoderTest

15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it’s garbage-collected.

Recent access records: 1

#1:

io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697)

io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157)
    io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)

泄漏檢測(cè)等級(jí)

Netty 現(xiàn)在定義了四種泄漏檢測(cè)等級(jí),可以按需開啟,見下表

Table 6.5 Leak detection levels

Level DescriptionDISABLED
DisablesLeak detection completely. While this even eliminates the 1 % overhead you should only do this after extensive testing.
SIMPLETells if a leak was found or not. Again uses the sampling rate of 1%, the default level and a good fit for most cases.
ADVANCEDTells if a leak was found and where the message was accessed, using the sampling rate of 1%.
PARANOIDSame as level ADVANCED with the main difference that every access is sampled. This it has a massive impact on performance. Use this only in the debugging phase.

修改檢測(cè)等級(jí),只需修改 io.netty.leakDetectionLevel 系統(tǒng)屬性,舉例

# java -Dio.netty.leakDetectionLevel=paranoid

這樣,我們就能在 ChannelInboundHandler.channelRead(...) 和 ChannelOutboundHandler.write(...) 避免泄漏。

當(dāng)你處理 channelRead(...) 操作,并在消費(fèi)消息(不是通過 ChannelHandlerContext.fireChannelRead(...) 來傳遞它到下個(gè) ChannelInboundHandler) 時(shí),要釋放它,如下:

Listing 6.3 Handler that consume inbound data

@ChannelHandler.Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {  //1

    @Override
    public void channelRead(ChannelHandlerContext ctx,
                                     Object msg) {
        ReferenceCountUtil.release(msg); //2
    }

}
  1. 繼承 ChannelInboundHandlerAdapter
  2. 使用 ReferenceCountUtil.release(...) 來釋放資源

所以記得,每次處理消息時(shí),都要釋放它。

SimpleChannelInboundHandler -消費(fèi)入站消息更容易

使用入站數(shù)據(jù)和釋放它是一項(xiàng)常見的任務(wù),Netty 為你提供了一個(gè)特殊的稱為 SimpleChannelInboundHandler 的 ChannelInboundHandler 的實(shí)現(xiàn)。該實(shí)現(xiàn)將自動(dòng)釋放一個(gè)消息,一旦這個(gè)消息被用戶通過channelRead0() 方法消費(fèi)。

當(dāng)你在處理寫操作,并丟棄消息時(shí),你需要釋放它?,F(xiàn)在讓我們看下實(shí)際是如何操作的。

Listing 6.4 Handler to discard outbound data

@ChannelHandler.Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { //1

@Override
public void write(ChannelHandlerContext ctx,
                                 Object msg, ChannelPromise promise) {
    ReferenceCountUtil.release(msg);  //2
    promise.setSuccess();    //3

}

}

  1. 繼承 ChannelOutboundHandlerAdapter
  2. 使用 ReferenceCountUtil.release(...) 來釋放資源
  3. 通知 ChannelPromise 數(shù)據(jù)已經(jīng)被處理

重要的是,釋放資源并通知 ChannelPromise。如果,ChannelPromise 沒有被通知到,這可能會(huì)引發(fā) ChannelFutureListener 不會(huì)被處理的消息通知的狀況。

所以,總結(jié)下:如果消息是被 消耗/丟棄 并不會(huì)被傳入下個(gè) ChannelPipeline 的 ChannelOutboundHandler ,調(diào)用 ReferenceCountUtil.release(message) 。一旦消息經(jīng)過實(shí)際的傳輸,在消息被寫或者 Channel 關(guān)閉時(shí),它將會(huì)自動(dòng)釋放。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)