介紹Netty的ChannelPipeline

2018-08-08 10:37 更新

ChannelPipeline 是一系列的ChannelHandler 實(shí)例,流經(jīng)一個(gè) Channel 的入站和出站事件可以被ChannelPipeline 攔截,ChannelPipeline能夠讓用戶自己對(duì)入站/出站事件的處理邏輯,以及pipeline里的各個(gè)Handler之間的交互進(jìn)行定義。

每當(dāng)一個(gè)新的Channel被創(chuàng)建了,都會(huì)建立一個(gè)新的 ChannelPipeline,并且這個(gè)新的 ChannelPipeline 還會(huì)綁定到Channel上。這個(gè)關(guān)聯(lián)是永久性的;Channel 既不能附上另一個(gè) ChannelPipeline 也不能分離當(dāng)前這個(gè)。這些都由Netty負(fù)責(zé)完成,,而無需開發(fā)人員的特別處理。

根據(jù)它的起源,一個(gè)事件將由 ChannelInboundHandler 或 ChannelOutboundHandler 處理。隨后它將調(diào)用 ChannelHandlerContext 實(shí)現(xiàn)轉(zhuǎn)發(fā)到下一個(gè)相同的超類型的處理程序。

ChannelHandlerContext

一個(gè) ChannelHandlerContext 使 ChannelHandler 與 ChannelPipeline 和 其他處理程序交互。一個(gè)處理程序可以通知下一個(gè) ChannelPipeline 中的 ChannelHandler 甚至動(dòng)態(tài)修改 ChannelPipeline 的歸屬。

下圖展示了用于入站和出站 ChannelHandler 的 典型 ChannelPipeline 布局。

Figure%206

Figure 6.2 ChannelPipeline and ChannelHandlers

上圖說明了 ChannelPipeline 主要是一系列 ChannelHandler。通過ChannelPipeline ChannelPipeline 還提供了方法傳播事件本身。如果一個(gè)入站事件被觸發(fā),它將被傳遞的從 ChannelPipeline 開始到結(jié)束。舉個(gè)例子,在這個(gè)圖中出站 I/O 事件將從 ChannelPipeline 右端開始一直處理到左邊。

ChannelPipeline 相對(duì)論

你可能會(huì)說,從 ChannelPipeline 事件傳遞的角度來看,ChannelPipeline 的“開始” 取決于是否入站或出站事件。然而,Netty 總是指 ChannelPipeline 入站口(圖中的左邊)為“開始”,出站口(右邊)作為“結(jié)束”。當(dāng)我們完成使用 ChannelPipeline.add() 添加混合入站和出站處理程序,每個(gè) ChannelHandler 的“順序”是它的地位從“開始”到“結(jié)束”正如我們剛才定義的。因此,如果我們?cè)趫D6.1處理程序按順序從左到右第一個(gè)ChannelHandler被一個(gè)入站事件將是#1,第一個(gè)處理程序被出站事件將是#5*

隨著管道傳播事件,它決定下個(gè) ChannelHandler 是否是相匹配的方向運(yùn)動(dòng)的類型。如果沒有,ChannelPipeline 跳過 ChannelHandler 并繼續(xù)下一個(gè)合適的方向。記住,一個(gè)處理程序可能同時(shí)實(shí)現(xiàn)ChannelInboundHandler 和 ChannelOutboundHandler 接口。

修改 ChannelPipeline

ChannelHandler 可以實(shí)時(shí)修改 ChannelPipeline 的布局,通過添加、移除、替換其他 ChannelHandler(也可以從 ChannelPipeline 移除 ChannelHandler 自身)。這個(gè) 是 ChannelHandler 重要的功能之一。

Table 6.6 ChannelHandler methods for modifying a ChannelPipeline

名稱描述
addFirst addBefore addAfter addLast添加 ChannelHandler 到 ChannelPipeline.
Remove從 ChannelPipeline 移除 ChannelHandler.
Replace在 ChannelPipeline 替換另外一個(gè) ChannelHandler

下面展示了操作

Listing 6.5 Modify the ChannelPipeline

ChannelPipeline pipeline = null; // get reference to pipeline;
FirstHandler firstHandler = new FirstHandler(); //1
pipeline.addLast("handler1", firstHandler); //2
pipeline.addFirst("handler2", new SecondHandler()); //3
pipeline.addLast("handler3", new ThirdHandler()); //4

pipeline.remove("handler3"); //5
pipeline.remove(firstHandler); //6 

pipeline.replace("handler2", "handler4", new ForthHandler()); //6
  1. 創(chuàng)建一個(gè) FirstHandler 實(shí)例
  2. 添加該實(shí)例作為 "handler1" 到 ChannelPipeline
  3. 添加 SecondHandler 實(shí)例作為 "handler2" 到 ChannelPipeline 的第一個(gè)槽,這意味著它將替換之前已經(jīng)存在的 "handler1"
  4. 添加 ThirdHandler 實(shí)例作為"handler3" 到 ChannelPipeline 的最后一個(gè)槽
  5. 通過名稱移除 "handler3"
  6. 通過引用移除 FirstHandler (因?yàn)橹挥幸粋€(gè),所以可以不用關(guān)聯(lián)名字 "handler1").
  7. 將作為"handler2"的 SecondHandler 實(shí)例替換為作為 "handler4"的 FourthHandler

以后我們將看到,這種輕松添加、移除和替換 ChannelHandler 能力, 適合非常靈活的實(shí)現(xiàn)邏輯。

ChannelHandler 執(zhí)行 ChannelPipeline 和阻塞

通常每個(gè) ChannelHandler 添加到 ChannelPipeline 將處理事件 傳遞到 EventLoop( I/O 的線程)。至關(guān)重要的是不要阻塞這個(gè)線程, 它將會(huì)負(fù)面影響的整體處理I/O。 有時(shí)可能需要使用阻塞 api 接口來處理遺留代碼。對(duì)于這個(gè)情況下,ChannelPipeline 已有 add() 方法,它接受一個(gè)EventExecutorGroup。如果一個(gè)定制的 EventExecutorGroup 傳入事件將由含在這個(gè) EventExecutorGroup 中的 EventExecutor之一來處理,并且從 Channel 的 EventLoop 本身離開。一個(gè)默認(rèn)實(shí)現(xiàn),稱為來自 Netty 的 DefaultEventExecutorGroup

除了上述操作,其他訪問 ChannelHandler 的方法如下:

Table 6.7 ChannelPipeline operations for retrieving ChannelHandlers

名稱描述
get(...)Return a ChannelHandler by type or name
context(...)Return the ChannelHandlerContext bound to a ChannelHandler.
names() iterator()Return the names or of all the ChannelHander in the ChannelPipeline.

發(fā)送事件

ChannelPipeline API 有額外調(diào)用入站和出站操作的方法。下表列出了入站操作,用于通知 ChannelPipeline 中 ChannelInboundHandlers 正在發(fā)生的事件

Table 6.8 Inbound operations on ChannelPipeline

名稱描述
fireChannelRegisteredCalls channelRegistered(ChannelHandlerContext) on the next ChannelInboundHandler in the ChannelPipeline.
fireChannelUnregisteredCalls channelUnregistered(ChannelHandlerContext) on the next ChannelInboundHandler in the ChannelPipeline.
fireChannelActiveCalls channelActive(ChannelHandlerContext) on the next ChannelInboundHandler in the ChannelPipeline.
fireChannelInactiveCalls channelInactive(ChannelHandlerContext)on the next ChannelInboundHandler in the ChannelPipeline.
fireExceptionCaughtCalls exceptionCaught(ChannelHandlerContext, Throwable) on the next ChannelHandler in the ChannelPipeline.
fireUserEventTriggeredCalls userEventTriggered(ChannelHandlerContext, Object) on the next ChannelInboundHandler in the ChannelPipeline.
fireChannelReadCalls channelRead(ChannelHandlerContext, Object msg) on the next ChannelInboundHandler in the ChannelPipeline.
fireChannelReadCompleteCalls channelReadComplete(ChannelHandlerContext) on the next ChannelStateHandler in the ChannelPipeline.

在出站方面,處理一個(gè)事件將導(dǎo)致底層套接字的一些行動(dòng)。下表列出了ChannelPipeline API 出站的操作。

Table 6.9 Outbound operations on ChannelPipeline

名稱描述
bindBind the Channel to a local address. This will call bind(ChannelHandlerContext, SocketAddress, ChannelPromise) on the next ChannelOutboundHandler in the ChannelPipeline.
connectConnect the Channel to a remote address. This will call connect(ChannelHandlerContext, SocketAddress,ChannelPromise) on the next ChannelOutboundHandler in the ChannelPipeline.
disconnectDisconnect the Channel. This will call disconnect(ChannelHandlerContext, ChannelPromise) on the next ChannelOutboundHandler in the ChannelPipeline.
closeClose the Channel. This will call close(ChannelHandlerContext,ChannelPromise) on the next ChannelOutboundHandler in the ChannelPipeline.
deregisterDeregister the Channel from the previously assigned EventExecutor (the EventLoop). This will call deregister(ChannelHandlerContext,ChannelPromise) on the next ChannelOutboundHandler in the ChannelPipeline.
flushFlush all pending writes of the Channel. This will call flush(ChannelHandlerContext) on the next ChannelOutboundHandler in the ChannelPipeline.
writeWrite a message to the Channel. This will call write(ChannelHandlerContext, Object msg, ChannelPromise) on the next ChannelOutboundHandler in the ChannelPipeline. Note: this does not write the message to the underlying Socket, but only queues it. To write it to the Socket call flush() or writeAndFlush().
writeAndFlushConvenience method for calling write() then flush().
readRequests to read more data from the Channel. This will call read(ChannelHandlerContext) on the next ChannelOutboundHandler in the ChannelPipeline.

總結(jié)下:

  • 一個(gè) ChannelPipeline 是用來保存關(guān)聯(lián)到一個(gè) Channel 的ChannelHandler
  • 可以修改 ChannelPipeline 通過動(dòng)態(tài)添加和刪除 ChannelHandler
  • ChannelPipeline 有著豐富的API調(diào)用動(dòng)作來回應(yīng)入站和出站事件。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)