Netty 4.x 用POJO代替ByteBuf

2018-10-26 09:51 更新

用POJO代替ByteBuf

我們回顧了迄今為止的所有例子使用 ByteBuf 作為協(xié)議消息的主要數(shù)據(jù)結(jié)構(gòu)。在本節(jié)中,我們將改善的 TIME 協(xié)議客戶端和服務(wù)器例子,使用 POJO 代替 ByteBuf。

ChannelHandler 使用 POIO 的好處很明顯:通過從ChannelHandler 中提取出 ByteBuf 的代碼,將會使 ChannelHandler的實現(xiàn)變得更加可維護和可重用。在 TIME 客戶端和服務(wù)器的例子中,我們讀取的僅僅是一個32位的整形數(shù)據(jù),直接使用 ByteBuf 不會是一個主要的問題。然而,你會發(fā)現(xiàn)當(dāng)你需要實現(xiàn)一個真實的協(xié)議,分離代碼變得非常的必要。

首先,讓我們定義一個新的類型叫做 UnixTime。

    public class UnixTime {

        private final long value;

        public UnixTime() {
            this(System.currentTimeMillis() / 1000L + 2208988800L);
        }

        public UnixTime(long value) {
            this.value = value;
        }

        public long value() {
            return value;
        }

        @Override
        public String toString() {
            return new Date((value() - 2208988800L) * 1000L).toString();
        }
    }

現(xiàn)在我們可以修改下 TimeDecoder 類,返回一個 UnixTime,以替代ByteBuf

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }

        out.add(new UnixTime(in.readUnsignedInt()));
    }

下面是修改后的解碼器,TimeClientHandler 不再任何的 ByteBuf 代碼了。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

是不是變得更加簡單和優(yōu)雅了?相同的技術(shù)可以被運用到服務(wù)端。讓我們修改一下 TimeServerHandler 的代碼。

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

現(xiàn)在,唯一缺少的功能是一個編碼器,是ChannelOutboundHandler的實現(xiàn),用來將 UnixTime 對象重新轉(zhuǎn)化為一個 ByteBuf。這是比編寫一個解碼器簡單得多,因為沒有需要處理的數(shù)據(jù)包編碼消息時拆分和組裝。

    public class TimeEncoder extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            UnixTime m = (UnixTime) msg;
            ByteBuf encoded = ctx.alloc().buffer(4);
            encoded.writeInt((int)m.value());
            ctx.write(encoded, promise); // (1)
        }
    }

1.在這幾行代碼里還有幾個重要的事情。第一,通過 ChannelPromise,當(dāng)編碼后的數(shù)據(jù)被寫到了通道上 Netty 可以通過這個對象標記是成功還是失敗。第二, 我們不需要調(diào)用 cxt.flush()。因為處理器已經(jīng)單獨分離出了一個方法 void flush(ChannelHandlerContext cxt),如果像自己實現(xiàn) flush() 方法內(nèi)容可以自行覆蓋這個方法。

進一步簡化操作,你可以使用 MessageToByteEncode:

    public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
        @Override
        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
            out.writeInt((int)msg.value());
        }
    }

最后的任務(wù)就是在 TimeServerHandler 之前把 TimeEncoder 插入到ChannelPipeline。 但這是不那么重要的工作。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號