Netty如何編寫大型數(shù)據(jù)

2018-08-08 10:43 更新

出于網(wǎng)絡的原因,有一個特殊的問題需要我們思考,就是如何能夠有效的在異步框架寫大數(shù)據(jù)。因為寫操作是非阻塞的,即使不能寫出數(shù)據(jù),也只是通知 ChannelFuture 完成了。每當發(fā)生這種情況,就必須停止寫操作或面臨內(nèi)存耗盡的風險。所以在進行寫操作的時候,會產(chǎn)生的大量的數(shù)據(jù),在這種情況下我們要準備好處理因為連接遠端緩慢而導致的延遲釋放內(nèi)存的問題。作為一個例子讓我們考慮寫一個文件的內(nèi)容到網(wǎng)絡。

我們討論傳輸?shù)臅r候,有提到 NIO 的“zero-copy(零拷貝)”功能,消除移動一個文件的內(nèi)容從文件系統(tǒng)到網(wǎng)絡堆棧的復制步驟。所有這一切發(fā)生在 Netty 的核心,因此所有所需的應用程序代碼是使用 interface FileRegion 的實現(xiàn),在 Netty 的API 文檔中定義如下為一個通過 Channel 支持 zero-copy 文件傳輸?shù)奈募^(qū)域。

下面演示了通過 zero-copy 將文件內(nèi)容從 FileInputStream 創(chuàng)建 DefaultFileRegion 并寫入 使用 Channel

Listing 8.11 Transferring file contents with FileRegion

FileInputStream in = new FileInputStream(file); //1
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //2

channel.writeAndFlush(region).addListener(new ChannelFutureListener() { //3
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            Throwable cause = future.cause(); //4
            // Do something
        }
    }
});
  1. 獲取 FileInputStream
  2. 創(chuàng)建一個新的 DefaultFileRegion 用于文件的完整長度
  3. 發(fā)送 DefaultFileRegion 并且注冊一個 ChannelFutureListener
  4. 處理發(fā)送失敗

只是看到的例子只適用于直接傳輸一個文件的內(nèi)容,沒有執(zhí)行的數(shù)據(jù)應用程序的處理。在相反的情況下,將數(shù)據(jù)從文件系統(tǒng)復制到用戶內(nèi)存是必需的,您可以使用 ChunkedWriteHandler。這個類提供了支持異步寫大數(shù)據(jù)流不引起高內(nèi)存消耗。

這個關(guān)鍵是 interface ChunkedInput,實現(xiàn)如下:

名稱描述
ChunkedFile當你使用平臺不支持 zero-copy 或者你需要轉(zhuǎn)換數(shù)據(jù),從文件中一塊一塊的獲取數(shù)據(jù)
ChunkedNioFile與 ChunkedFile 類似,處理使用了NIOFileChannel
ChunkedStream從 InputStream 中一塊一塊的轉(zhuǎn)移內(nèi)容
ChunkedNioStream從 ReadableByteChannel 中一塊一塊的轉(zhuǎn)移內(nèi)容

清單 8.12 演示了使用 ChunkedStream,實現(xiàn)在實踐中最常用。 所示的類被實例化一個 File 和一個 SslContext。當 initChannel() 被調(diào)用來初始化顯示的處理程序鏈的通道。

當通道激活時,WriteStreamHandler 從文件一塊一塊的寫入數(shù)據(jù)作為ChunkedStream。最后將數(shù)據(jù)通過 SslHandler 加密后傳播。

Listing 8.12 Transfer file content with FileRegion

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslCtx.createEngine()); //1
        pipeline.addLast(new ChunkedWriteHandler());//2
        pipeline.addLast(new WriteStreamHandler());//3
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {  //4

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}
  1. 添加 SslHandler 到 ChannelPipeline.
  2. 添加 ChunkedWriteHandler 用來處理作為 ChunkedInput 傳進的數(shù)據(jù)
  3. 當連接建立時,WriteStreamHandler 開始寫文件的內(nèi)容
  4. 當連接建立時,channelActive() 觸發(fā)使用 ChunkedInput 來寫文件的內(nèi)容 (插圖顯示了 FileInputStream;也可以使用任何 InputStream )

ChunkedInput 所有被要求使用自己的 ChunkedInput 實現(xiàn),是安裝ChunkedWriteHandler 在管道中

在本節(jié)中,我們討論

  • 如何采用zero-copy(零拷貝)功能高效地傳輸文件
  • 如何使用 ChunkedWriteHandler 編寫大型數(shù)據(jù)而避免 OutOfMemoryErrors 錯誤。

在下一節(jié)中我們將研究幾種不同方法來序列化 POJO。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號