Netty如何編寫(xiě)大型數(shù)據(jù)

2018-08-08 10:43 更新

出于網(wǎng)絡(luò)的原因,有一個(gè)特殊的問(wèn)題需要我們思考,就是如何能夠有效的在異步框架寫(xiě)大數(shù)據(jù)。因?yàn)閷?xiě)操作是非阻塞的,即使不能寫(xiě)出數(shù)據(jù),也只是通知 ChannelFuture 完成了。每當(dāng)發(fā)生這種情況,就必須停止寫(xiě)操作或面臨內(nèi)存耗盡的風(fēng)險(xiǎn)。所以在進(jìn)行寫(xiě)操作的時(shí)候,會(huì)產(chǎn)生的大量的數(shù)據(jù),在這種情況下我們要準(zhǔn)備好處理因?yàn)檫B接遠(yuǎn)端緩慢而導(dǎo)致的延遲釋放內(nèi)存的問(wèn)題。作為一個(gè)例子讓我們考慮寫(xiě)一個(gè)文件的內(nèi)容到網(wǎng)絡(luò)。

我們討論傳輸?shù)臅r(shí)候,有提到 NIO 的“zero-copy(零拷貝)”功能,消除移動(dòng)一個(gè)文件的內(nèi)容從文件系統(tǒng)到網(wǎng)絡(luò)堆棧的復(fù)制步驟。所有這一切發(fā)生在 Netty 的核心,因此所有所需的應(yīng)用程序代碼是使用 interface FileRegion 的實(shí)現(xiàn),在 Netty 的API 文檔中定義如下為一個(gè)通過(guò) Channel 支持 zero-copy 文件傳輸?shù)奈募^(qū)域。

下面演示了通過(guò) zero-copy 將文件內(nèi)容從 FileInputStream 創(chuàng)建 DefaultFileRegion 并寫(xiě)入 使用 Channel

Listing 8.11 Transferring file contents with FileRegion

FileInputStream in = new FileInputStream(file); //1
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //2

channel.writeAndFlush(region).addListener(new ChannelFutureListener() { //3
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            Throwable cause = future.cause(); //4
            // Do something
        }
    }
});
  1. 獲取 FileInputStream
  2. 創(chuàng)建一個(gè)新的 DefaultFileRegion 用于文件的完整長(zhǎng)度
  3. 發(fā)送 DefaultFileRegion 并且注冊(cè)一個(gè) ChannelFutureListener
  4. 處理發(fā)送失敗

只是看到的例子只適用于直接傳輸一個(gè)文件的內(nèi)容,沒(méi)有執(zhí)行的數(shù)據(jù)應(yīng)用程序的處理。在相反的情況下,將數(shù)據(jù)從文件系統(tǒng)復(fù)制到用戶內(nèi)存是必需的,您可以使用 ChunkedWriteHandler。這個(gè)類(lèi)提供了支持異步寫(xiě)大數(shù)據(jù)流不引起高內(nèi)存消耗。

這個(gè)關(guān)鍵是 interface ChunkedInput,實(shí)現(xiàn)如下:

名稱描述
ChunkedFile當(dāng)你使用平臺(tái)不支持 zero-copy 或者你需要轉(zhuǎn)換數(shù)據(jù),從文件中一塊一塊的獲取數(shù)據(jù)
ChunkedNioFile與 ChunkedFile 類(lèi)似,處理使用了NIOFileChannel
ChunkedStream從 InputStream 中一塊一塊的轉(zhuǎn)移內(nèi)容
ChunkedNioStream從 ReadableByteChannel 中一塊一塊的轉(zhuǎn)移內(nèi)容

清單 8.12 演示了使用 ChunkedStream,實(shí)現(xiàn)在實(shí)踐中最常用。 所示的類(lèi)被實(shí)例化一個(gè) File 和一個(gè) SslContext。當(dāng) initChannel() 被調(diào)用來(lái)初始化顯示的處理程序鏈的通道。

當(dāng)通道激活時(shí),WriteStreamHandler 從文件一塊一塊的寫(xiě)入數(shù)據(jù)作為ChunkedStream。最后將數(shù)據(jù)通過(guò) SslHandler 加密后傳播。

Listing 8.12 Transfer file content with FileRegion

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslCtx.createEngine()); //1
        pipeline.addLast(new ChunkedWriteHandler());//2
        pipeline.addLast(new WriteStreamHandler());//3
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {  //4

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}
  1. 添加 SslHandler 到 ChannelPipeline.
  2. 添加 ChunkedWriteHandler 用來(lái)處理作為 ChunkedInput 傳進(jìn)的數(shù)據(jù)
  3. 當(dāng)連接建立時(shí),WriteStreamHandler 開(kāi)始寫(xiě)文件的內(nèi)容
  4. 當(dāng)連接建立時(shí),channelActive() 觸發(fā)使用 ChunkedInput 來(lái)寫(xiě)文件的內(nèi)容 (插圖顯示了 FileInputStream;也可以使用任何 InputStream )

ChunkedInput 所有被要求使用自己的 ChunkedInput 實(shí)現(xiàn),是安裝ChunkedWriteHandler 在管道中

在本節(jié)中,我們討論

  • 如何采用zero-copy(零拷貝)功能高效地傳輸文件
  • 如何使用 ChunkedWriteHandler 編寫(xiě)大型數(shù)據(jù)而避免 OutOfMemoryErrors 錯(cuò)誤。

在下一節(jié)中我們將研究幾種不同方法來(lái)序列化 POJO。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)