Netty 編碼器和解碼器

2018-02-02 20:03 更新

Netty是一個(gè)復(fù)雜和先進(jìn)的框架。如果我們請求了某些有設(shè)置 key 的給定值,那么 Request 類的一個(gè)實(shí)例就被建立來代表這個(gè)請求。但是 Request 對象轉(zhuǎn)成 Memcached 所期望的過程,Netty并不知道。Memcached 所期望的是字節(jié)序列,看,不管使用的協(xié)議是什么,數(shù)據(jù)在網(wǎng)絡(luò)上永遠(yuǎn)是按字節(jié)序列傳輸?shù)摹?/p>

要想將 Request 對象轉(zhuǎn)為 Memcached 所需的字節(jié)序列,Netty 需要使用 MemcachedRequest 來編碼成另外一種格式。可以從對象轉(zhuǎn)為字節(jié),也可以從對象轉(zhuǎn)為對象,又或者從對象轉(zhuǎn)為字符串等。有關(guān)編碼器的內(nèi)容詳見第七章。

MessageToByteEncoder 是 Netty 提供的一個(gè)抽象類。這個(gè)抽象類提供了一個(gè)抽象方法,可以將一條消息(在本例中我們的 MemcachedRequest 對象)轉(zhuǎn)為字節(jié)。你顯示什么信息實(shí)現(xiàn)可以通過使用 Java 泛型處理;例如,MessageToByteEncoder 說這個(gè)編碼器要編碼的對象類型是 MemcachedRequest。

MessageToByteEncoder 和 Java 泛型

使用 MessageToByteEncoder 可以綁定特定的參數(shù)類型。如果你有多個(gè)不同的消息類型,在相同的編碼器里,也可以使用MessageToByteEncoder,注意檢查消息的類型即可

這同樣也適用于解碼器,除了解碼器將一系列字節(jié)轉(zhuǎn)換回一個(gè)對象。 這個(gè) Netty 的提供了 ByteToMessageDecoder 類,而不是提供一個(gè)編碼方法用來實(shí)現(xiàn)解碼。在接下來的兩個(gè)部分你看看如何實(shí)現(xiàn)一個(gè) Memcached 解碼器和編碼器。在你做之前,應(yīng)該意識到在使用 Netty 時(shí),你不總是需要自己提供編碼器和解碼器。自所以現(xiàn)在這么做是因?yàn)?Netty 沒有對 Memcached 內(nèi)置支持。而 HTTP 以及其他標(biāo)準(zhǔn)的協(xié)議,Netty 已經(jīng)是提供的了。

編碼器和解碼器

記住,編碼器處理出站,而解碼器處理入站。這基本上意味著編碼器將編碼數(shù)據(jù),寫入遠(yuǎn)端。解碼器將從遠(yuǎn)端讀取處理數(shù)據(jù)。重要的是要記住,出站和入站是兩個(gè)不同的方向。

請注意,為了程序簡單,我們的編碼器和解碼器不檢查任何值的最大大小。在實(shí)際實(shí)現(xiàn)中你需要一些驗(yàn)證檢查,如果檢測到違反協(xié)議,則使用 EncoderException 或 DecoderException(或一個(gè)子類)。

實(shí)現(xiàn) Memcached 編碼器

本節(jié)我們將簡要介紹編碼器的實(shí)現(xiàn)。正如我們提到的,編碼器負(fù)責(zé)編碼消息為字節(jié)序列。這些字節(jié)可以通過網(wǎng)絡(luò)發(fā)送到遠(yuǎn)端。為了發(fā)送請求,我們首先創(chuàng)建 MemcachedRequest 類,稍后編碼器實(shí)現(xiàn)會編碼為一系列字節(jié)。下面的清單顯示了我們的 MemcachedRequest 類

Listing 14.1 Implementation of a Memcached request

public class MemcachedRequest { //1
    private static final Random rand = new Random();
    private final int magic = 0x80;//fixed so hard coded
    private final byte opCode; //the operation e.g. set or get
    private final String key; //the key to delete, get or set
    private final int flags = 0xdeadbeef; //random
    private final int expires; //0 = item never expires
    private final String body; //if opCode is set, the value
    private final int id = rand.nextInt(); //Opaque
    private final long cas = 0; //data version check...not used
    private final boolean hasExtras; //not all ops have extras

    public MemcachedRequest(byte opcode, String key, String value) {
        this.opCode = opcode;
        this.key = key;
        this.body = value == null ? "" : value;
        this.expires = 0;
        //only set command has extras in our example
        hasExtras = opcode == Opcode.SET;
    }

    public MemcachedRequest(byte opCode, String key) {
        this(opCode, key, null);
    }

    public int magic() { //2
        return magic;
    }

    public int opCode() {  //3
        return opCode;
    }

    public String key() {  //4
        return key;
    }

    public int flags() {  //5
        return flags;
    }

    public int expires() {  //6
        return expires;
    }

    public String body() {  //7
        return body;
    }

    public int id() {  //8
        return id;
    }

    public long cas() {  //9
        return cas;
    }

    public boolean hasExtras() {  //10
        return hasExtras;
    }
}
  1. 這個(gè)類將會發(fā)送請求到 Memcached server
  2. 幻數(shù),它可以用來標(biāo)記文件或者協(xié)議的格式
  3. opCode,反應(yīng)了響應(yīng)的操作已經(jīng)創(chuàng)建了
  4. 執(zhí)行操作的 key
  5. 使用的額外的 flag
  6. 表明到期時(shí)間
  7. body
  8. 請求的 id。這個(gè)id將在響應(yīng)中回顯。
  9. compare-and-check 的值
  10. 如果有額外的使用,將返回 true

你如果想實(shí)現(xiàn) Memcached 的其余部分協(xié)議,你只需要將 client.op(op 任何新的操作添加)轉(zhuǎn)換為其中一個(gè)方法請求。我們需要兩個(gè)更多的支持類,在下一個(gè)清單所示

Listing 14.2 Possible Memcached operation codes and response statuses

public class Status {
    public static final short NO_ERROR = 0x0000;
    public static final short KEY_NOT_FOUND = 0x0001;
    public static final short KEY_EXISTS = 0x0002;
    public static final short VALUE_TOO_LARGE = 0x0003;
    public static final short INVALID_ARGUMENTS = 0x0004;
    public static final short ITEM_NOT_STORED = 0x0005;
    public static final short INC_DEC_NON_NUM_VAL = 0x0006;
}
public class Opcode {
    public static final byte GET = 0x00;
    public static final byte SET = 0x01;
    public static final byte DELETE = 0x04;
}

一個(gè) Opcode 告訴 Memcached 要執(zhí)行哪些操作。每個(gè)操作都由一個(gè)字節(jié)表示。同樣的,當(dāng) Memcached 響應(yīng)一個(gè)請求,響應(yīng)頭中包含兩個(gè)字節(jié)代表響應(yīng)狀態(tài)。狀態(tài)和 Opcode 類表示這些 Memcached 的構(gòu)造。這些操作碼可以使用當(dāng)你構(gòu)建一個(gè)新的 MemcachedRequest 指定哪個(gè)行動(dòng)應(yīng)該由它引發(fā)的。

但現(xiàn)在可以集中精力在編碼器上:

Listing 14.3 MemcachedRequestEncoder implementation

public class MemcachedRequestEncoder extends
        MessageToByteEncoder<MemcachedRequest> { //1
    @Override
    protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg,
                          ByteBuf out) throws Exception {  //2
        byte[] key = msg.key().getBytes(CharsetUtil.UTF_8);
        byte[] body = msg.body().getBytes(CharsetUtil.UTF_8);
        //total size of the body = key size + content size + extras size   //3
        int bodySize = key.length + body.length + (msg.hasExtras() ? 8 : 0);

        //write magic byte  //4
        out.writeByte(msg.magic());
        //write opcode byte  //5
        out.writeByte(msg.opCode());
        //write key length (2 byte) //6
        out.writeShort(key.length); //key length is max 2 bytes i.e. a Java short  //7
        //write extras length (1 byte)
        int extraSize =  msg.hasExtras() ? 0x08 : 0x0;
        out.writeByte(extraSize);
        //byte is the data type, not currently implemented in Memcached but required //8
        out.writeByte(0);
        //next two bytes are reserved, not currently implemented but are required  //9
        out.writeShort(0);

        //write total body length ( 4 bytes - 32 bit int)  //10
        out.writeInt(bodySize);
        //write opaque ( 4 bytes)  -  a 32 bit int that is returned in the response //11
        out.writeInt(msg.id());

        //write CAS ( 8 bytes)
        out.writeLong(msg.cas());   //24 byte header finishes with the CAS  //12

        if (msg.hasExtras()) {
            //write extras (flags and expiry, 4 bytes each) - 8 bytes total  //13
            out.writeInt(msg.flags());
            out.writeInt(msg.expires());
        }
        //write key   //14
        out.writeBytes(key);
        //write value  //15
        out.writeBytes(body);
    }
}
  1. 該類是負(fù)責(zé)編碼 MemachedRequest 為一系列字節(jié)
  2. 轉(zhuǎn)換的 key 和實(shí)際請求的 body 到字節(jié)數(shù)組
  3. 計(jì)算 body 大小
  4. 寫幻數(shù)到 ByteBuf 字節(jié)
  5. 寫 opCode 作為字節(jié)
  6. 寫 key 長度z作為 short
  7. 編寫額外的長度作為字節(jié)
  8. 寫數(shù)據(jù)類型,這總是0,因?yàn)槟壳安皇窃?Memcached,但可用于使用 后來的版本
  9. 為保留字節(jié)寫為 short ,后面的 Memcached 版本可能使用
  10. 寫 body 的大小作為 long
  11. 寫 opaque 作為 int
  12. 寫 cas 作為 long。這個(gè)是頭文件的最后部分,在 body 的開始
  13. 編寫額外的 flag 和到期時(shí)間為 int
  14. 寫 key
  15. 這個(gè)請求完成后 寫 body。

總結(jié),編碼器 使用 Netty 的 ByteBuf 處理請求,編碼 MemcachedRequest 成一套正確排序的字節(jié)。詳細(xì)步驟為:

  • 寫幻數(shù)字節(jié)。
  • 寫 opcode 字節(jié)。
  • 寫 key 長度(2字節(jié))。
  • 寫額外的長度(1字節(jié))。
  • 寫數(shù)據(jù)類型(1字節(jié))。
  • 為保留字節(jié)寫 null 字節(jié)(2字節(jié))。
  • 寫 body 長度(4字節(jié)- 32位整數(shù))。
  • 寫 opaque(4個(gè)字節(jié),一個(gè)32位整數(shù)在響應(yīng)中返回)。
  • 寫 CAS(8個(gè)字節(jié))。
  • 寫 額外的(flag 和 到期,4字節(jié))= 8個(gè)字節(jié)
  • 寫 key
  • 寫 值

無論你放入什么到輸出緩沖區(qū)( 調(diào)用 ByteBuf) Netty 的將向服務(wù)器發(fā)送被寫入請求。下一節(jié)將展示如何進(jìn)行反向通過解碼器工作。

實(shí)現(xiàn) Memcached 解碼器

將 MemcachedRequest 對象轉(zhuǎn)為字節(jié)序列,Memcached 僅需將字節(jié)轉(zhuǎn)到響應(yīng)對象返回即可。

先見一個(gè) POJO:

Listing 14.7 Implementation of a MemcachedResponse

public class MemcachedResponse {  //1
    private final byte magic;
    private final byte opCode;
    private byte dataType;
    private final short status;
    private final int id;
    private final long cas;
    private final int flags;
    private final int expires;
    private final String key;
    private final String data;

    public MemcachedResponse(byte magic, byte opCode,
            byte dataType,                             short status, 
            int id, long cas,
            int flags, int expires, String key, String data) {
        this.magic = magic;
        this.opCode = opCode;
        this.dataType = dataType;
        this.status = status;
        this.id = id;
        this.cas = cas;
        this.flags = flags;
        this.expires = expires;
        this.key = key;
        this.data = data;
    }

    public byte magic() { //2
        return magic;
    }

    public byte opCode() { //3
        return opCode;
    }

    public byte dataType() { //4
        return dataType;
    }

    public short status() {  //5
        return status;
    }

    public int id() {  //6
        return id;
    }

    public long cas() {  //7
        return cas;
    }

    public int flags() {  //8
        return flags;
    }

    public int expires() { //9
        return expires;
    }

    public String key() {  //10
        return key;
    }

    public String data() {  //11
        return data; 
    }
}
  1. 該類,代表從 Memcached 服務(wù)器返回的響應(yīng)
  2. 幻數(shù)
  3. opCode,這反映了創(chuàng)建操作的響應(yīng)
  4. 數(shù)據(jù)類型,這表明這個(gè)是基于二進(jìn)制還是文本
  5. 響應(yīng)的狀態(tài),這表明如果請求是成功的
  6. 惟一的 id
  7. compare-and-set 值
  8. 使用額外的 flag
  9. 表示該值存儲的一個(gè)有效期
  10. 響應(yīng)創(chuàng)建的 key
  11. 實(shí)際數(shù)據(jù)

下面為 MemcachedResponseDecoder, 使用了 ByteToMessageDecoder 基類,用于將 字節(jié)序列轉(zhuǎn)為 MemcachedResponse

Listing 14.4 MemcachedResponseDecoder class

public class MemcachedResponseDecoder extends ByteToMessageDecoder {  //1
    private enum State {  //2
        Header,
        Body
    }

    private State state = State.Header;
    private int totalBodySize;
    private byte magic;
    private byte opCode;
    private short keyLength;
    private byte extraLength;
    private short status;
    private int id;
    private long cas;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) { 
        switch (state) { //3
            case Header:
                if (in.readableBytes() < 24) {
                    return;//response header is 24  bytes  //4
                }
                magic = in.readByte();  //5
                opCode = in.readByte();
                keyLength = in.readShort();
                extraLength = in.readByte();
                in.skipBytes(1);
                status = in.readShort();
                totalBodySize = in.readInt();
                id = in.readInt(); //referred to in the protocol spec as opaque
                cas = in.readLong();

                state = State.Body;
            case Body:
                if (in.readableBytes() < totalBodySize) {
                    return; //until we have the entire payload return  //6
                }
                int flags = 0, expires = 0;
                int actualBodySize = totalBodySize;
                if (extraLength > 0) {  //7
                    flags = in.readInt();
                    actualBodySize -= 4;
                }
                if (extraLength > 4) {  //8
                    expires = in.readInt();
                    actualBodySize -= 4;
                }
                String key = "";
                if (keyLength > 0) {  //9
                    ByteBuf keyBytes = in.readBytes(keyLength);
                    key = keyBytes.toString(CharsetUtil.UTF_8);
                    actualBodySize -= keyLength;
                }
                ByteBuf body = in.readBytes(actualBodySize);  //10
                String data = body.toString(CharsetUtil.UTF_8);
                out.add(new MemcachedResponse(  //1
                        magic,
                        opCode,
                        status,
                        id,
                        cas,
                        flags,
                        expires,
                        key,
                        data
                ));

                state = State.Header;
        }

    }
}
  1. 類負(fù)責(zé)創(chuàng)建的 MemcachedResponse 讀取字節(jié)
  2. 代表當(dāng)前解析狀態(tài),這意味著我們需要解析的頭或 body
  3. 根據(jù)解析狀態(tài)切換
  4. 如果不是至少24個(gè)字節(jié)是可讀的,它不可能讀整個(gè)頭部,所以返回這里,等待再通知一次數(shù)據(jù)準(zhǔn)備閱讀
  5. 閱讀所有頭的字段
  6. 檢查是否足夠的數(shù)據(jù)是可讀用來讀取完整的響應(yīng)的 body。長度是從頭讀取
  7. 檢查如果有任何額外的 flag 用于讀,如果是這樣做
  8. 檢查如果響應(yīng)包含一個(gè) expire 字段,有就讀它
  9. 檢查響應(yīng)是否包含一個(gè) key ,有就讀它
  10. 讀實(shí)際的 body 的 payload
  11. 從前面讀取字段和數(shù)據(jù)構(gòu)造一個(gè)新的 MemachedResponse

所以實(shí)現(xiàn)過程中發(fā)生了什么事?我們知道一個(gè) Memcached 響應(yīng)有24位頭;我們不知道是否所有數(shù)據(jù),響應(yīng)將被包含在輸入 ByteBuf ,當(dāng)解碼方法調(diào)用時(shí)。這是因?yàn)榈讓泳W(wǎng)絡(luò)堆??赡軐?shù)據(jù)分解成塊。所以確保我們只解碼當(dāng)我們有足夠的數(shù)據(jù),這段代碼檢查是否可用可讀的字節(jié)的數(shù)量至少是24。一旦我們有24個(gè)字節(jié),我們可以確定整個(gè)消息有多大,因?yàn)檫@個(gè)信息包含在24位頭。

當(dāng)我們解碼整個(gè)消息,我們創(chuàng)建一個(gè) MemcachedResponse 并將其添加到輸出列表。任何對象添加到該列表將被轉(zhuǎn)發(fā)到下一個(gè)ChannelInboundHandler 在 ChannelPipeline,因此允許處理。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號