基于Netty傳輸的API

2018-08-08 10:29 更新

在上節(jié)中我們通過對比了解到了使用基于Netty傳輸的好處,那么在本節(jié)中我們就來看看Transport API是如何工作的。

Transport API 的核心是 Channel 接口,用于所有的出站操作,見下圖

Figure%204

如上圖所示,每個 Channel 都會分配一個 ChannelPipeline 和ChannelConfig。ChannelConfig 負責設置并存儲 Channel 的配置,并允許在運行期間更新它們。傳輸一般有特定的配置設置,可能實現了 ChannelConfig. 的子類型。

ChannelPipeline 容納了使用的 ChannelHandler 實例,這些ChannelHandler 將處理通道傳遞的“入站”和“出站”數據以及事件。ChannelHandler 的實現允許你改變數據狀態(tài)和傳輸數據。

現在我們可以使用 ChannelHandler 做下面一些事情:

  • 傳輸數據時,將數據從一種格式轉換到另一種格式
  • 異常通知
  • Channel 變?yōu)?active(活動) 或 inactive(非活動) 時獲得通知* Channel 被注冊或注銷時從 EventLoop 中獲得通知
  • 通知用戶特定事件

Intercepting Filter(攔截過濾器)

ChannelPipeline 實現了常用的 Intercepting Filter(攔截過濾器)設計模式。UNIX管道是另一例子:命令鏈接在一起,一個命令的輸出連接到 的下一行中的輸入。

你還可以在運行時根據需要添加 ChannelHandler 實例到ChannelPipeline 或從 ChannelPipeline 中刪除,這能幫助我們構建高度靈活的 Netty 程序。例如,你可以支持 STARTTLS 協議,只需通過加入適當的 ChannelHandler(這里是 SslHandler)到的ChannelPipeline 中,當被請求這個協議時。

此外,訪問指定的 ChannelPipeline 和 ChannelConfig,你能在Channel 自身上進行操作。Channel 提供了很多方法,如下列表:

Table 4.1 Channel main methods

方法名稱描述
eventLoop()返回分配給Channel的EventLoop
pipeline()返回分配給Channel的ChannelPipeline
isActive()返回Channel是否激活,已激活說明與遠程連接對等
localAddress()返回已綁定的本地SocketAddress
remoteAddress()返回已綁定的遠程SocketAddress
write()寫數據到遠程客戶端,數據通過ChannelPipeline傳輸過去
flush()刷新先前的數據
writeAndFlush(...)一個方便的方法用戶調用write(...)而后調用y flush()

后面會越來越熟悉這些方法,現在只需要記住我們的操作都是在相同的接口上運行,Netty 的高靈活性讓你可以以不同的傳輸實現進行重構。

寫數據到遠程已連接客戶端可以調用Channel.write()方法,如下代碼:

Listing 4.5 Writing to a channel

Channel channel = ...; // 獲取channel的引用
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);            //1
ChannelFuture cf = channel.writeAndFlush(buf); //2

cf.addListener(new ChannelFutureListener() {    //3
    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {                //4
            System.out.println("Write successful");
        } else {
            System.err.println("Write error");    //5
            future.cause().printStackTrace();
        }
    }
});

1.創(chuàng)建 ByteBuf 保存寫的數據

2.寫數據,并刷新

3.添加 ChannelFutureListener 即可寫操作完成后收到通知,

4.寫操作沒有錯誤完成

5.寫操作完成時出現錯誤

Channel 是線程安全(thread-safe)的,它可以被多個不同的線程安全的操作,在多線程環(huán)境下,所有的方法都是安全的。正因為 Channel 是安全的,我們存儲對Channel的引用,并在學習的時候使用它寫入數據到遠程已連接的客戶端,使用多線程也是如此。下面的代碼是一個簡單的多線程例子:

Listing 4.6 Using the channel from many threads

final Channel channel = ...; // 獲取channel的引用
final ByteBuf buf = Unpooled.copiedBuffer("your data",
        CharsetUtil.UTF_8).retain();    //1
Runnable writer = new Runnable() {        //2
    @Override
    public void run() {
        channel.writeAndFlush(buf.duplicate());
    }
};
Executor executor = Executors.newCachedThreadPool();//3

//寫進一個線程
executor.execute(writer);        //4

//寫進另外一個線程
executor.execute(writer);        //5

1.創(chuàng)建一個 ByteBuf 保存寫的數據

2.創(chuàng)建 Runnable 用于寫數據到 channel

3.獲取 Executor 的引用使用線程來執(zhí)行任務

4.手寫一個任務,在一個線程中執(zhí)行

5.手寫另一個任務,在另一個線程中執(zhí)行


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號