Netty是一個(gè)復(fù)雜和先進(jìn)的框架。如果我們請求了某些有設(shè)置 key 的給定值,那么 Request 類的一個(gè)實(shí)例就被建立來代表這個(gè)請求。但是 Request 對象轉(zhuǎn)成 Memcached 所期望的過程,Netty并不知道。Memcached 所期望的是字節(jié)序列,看,不管使用的協(xié)議是什么,數(shù)據(jù)在網(wǎng)絡(luò)上永遠(yuǎn)是按字節(jié)序列傳輸?shù)摹?/p>
要想將 Request 對象轉(zhuǎn)為 Memcached 所需的字節(jié)序列,Netty 需要使用 MemcachedRequest 來編碼成另外一種格式。可以從對象轉(zhuǎn)為字節(jié),也可以從對象轉(zhuǎn)為對象,又或者從對象轉(zhuǎn)為字符串等。有關(guān)編碼器的內(nèi)容詳見第七章。
MessageToByteEncoder 是 Netty 提供的一個(gè)抽象類。這個(gè)抽象類提供了一個(gè)抽象方法,可以將一條消息(在本例中我們的 MemcachedRequest 對象)轉(zhuǎn)為字節(jié)。你顯示什么信息實(shí)現(xiàn)可以通過使用 Java 泛型處理;例如,MessageToByteEncoder 說這個(gè)編碼器要編碼的對象類型是 MemcachedRequest。
MessageToByteEncoder 和 Java 泛型
使用 MessageToByteEncoder 可以綁定特定的參數(shù)類型。如果你有多個(gè)不同的消息類型,在相同的編碼器里,也可以使用MessageToByteEncoder
這同樣也適用于解碼器,除了解碼器將一系列字節(jié)轉(zhuǎn)換回一個(gè)對象。 這個(gè) Netty 的提供了 ByteToMessageDecoder 類,而不是提供一個(gè)編碼方法用來實(shí)現(xiàn)解碼。在接下來的兩個(gè)部分你看看如何實(shí)現(xiàn)一個(gè) Memcached 解碼器和編碼器。在你做之前,應(yīng)該意識到在使用 Netty 時(shí),你不總是需要自己提供編碼器和解碼器。自所以現(xiàn)在這么做是因?yàn)?Netty 沒有對 Memcached 內(nèi)置支持。而 HTTP 以及其他標(biāo)準(zhǔn)的協(xié)議,Netty 已經(jīng)是提供的了。
編碼器和解碼器
記住,編碼器處理出站,而解碼器處理入站。這基本上意味著編碼器將編碼數(shù)據(jù),寫入遠(yuǎn)端。解碼器將從遠(yuǎn)端讀取處理數(shù)據(jù)。重要的是要記住,出站和入站是兩個(gè)不同的方向。
請注意,為了程序簡單,我們的編碼器和解碼器不檢查任何值的最大大小。在實(shí)際實(shí)現(xiàn)中你需要一些驗(yàn)證檢查,如果檢測到違反協(xié)議,則使用 EncoderException 或 DecoderException(或一個(gè)子類)。
本節(jié)我們將簡要介紹編碼器的實(shí)現(xiàn)。正如我們提到的,編碼器負(fù)責(zé)編碼消息為字節(jié)序列。這些字節(jié)可以通過網(wǎng)絡(luò)發(fā)送到遠(yuǎn)端。為了發(fā)送請求,我們首先創(chuàng)建 MemcachedRequest 類,稍后編碼器實(shí)現(xiàn)會編碼為一系列字節(jié)。下面的清單顯示了我們的 MemcachedRequest 類
Listing 14.1 Implementation of a Memcached request
public class MemcachedRequest { //1
private static final Random rand = new Random();
private final int magic = 0x80;//fixed so hard coded
private final byte opCode; //the operation e.g. set or get
private final String key; //the key to delete, get or set
private final int flags = 0xdeadbeef; //random
private final int expires; //0 = item never expires
private final String body; //if opCode is set, the value
private final int id = rand.nextInt(); //Opaque
private final long cas = 0; //data version check...not used
private final boolean hasExtras; //not all ops have extras
public MemcachedRequest(byte opcode, String key, String value) {
this.opCode = opcode;
this.key = key;
this.body = value == null ? "" : value;
this.expires = 0;
//only set command has extras in our example
hasExtras = opcode == Opcode.SET;
}
public MemcachedRequest(byte opCode, String key) {
this(opCode, key, null);
}
public int magic() { //2
return magic;
}
public int opCode() { //3
return opCode;
}
public String key() { //4
return key;
}
public int flags() { //5
return flags;
}
public int expires() { //6
return expires;
}
public String body() { //7
return body;
}
public int id() { //8
return id;
}
public long cas() { //9
return cas;
}
public boolean hasExtras() { //10
return hasExtras;
}
}
你如果想實(shí)現(xiàn) Memcached 的其余部分協(xié)議,你只需要將 client.op(op 任何新的操作添加)轉(zhuǎn)換為其中一個(gè)方法請求。我們需要兩個(gè)更多的支持類,在下一個(gè)清單所示
Listing 14.2 Possible Memcached operation codes and response statuses
public class Status {
public static final short NO_ERROR = 0x0000;
public static final short KEY_NOT_FOUND = 0x0001;
public static final short KEY_EXISTS = 0x0002;
public static final short VALUE_TOO_LARGE = 0x0003;
public static final short INVALID_ARGUMENTS = 0x0004;
public static final short ITEM_NOT_STORED = 0x0005;
public static final short INC_DEC_NON_NUM_VAL = 0x0006;
}
public class Opcode {
public static final byte GET = 0x00;
public static final byte SET = 0x01;
public static final byte DELETE = 0x04;
}
一個(gè) Opcode 告訴 Memcached 要執(zhí)行哪些操作。每個(gè)操作都由一個(gè)字節(jié)表示。同樣的,當(dāng) Memcached 響應(yīng)一個(gè)請求,響應(yīng)頭中包含兩個(gè)字節(jié)代表響應(yīng)狀態(tài)。狀態(tài)和 Opcode 類表示這些 Memcached 的構(gòu)造。這些操作碼可以使用當(dāng)你構(gòu)建一個(gè)新的 MemcachedRequest 指定哪個(gè)行動(dòng)應(yīng)該由它引發(fā)的。
但現(xiàn)在可以集中精力在編碼器上:
Listing 14.3 MemcachedRequestEncoder implementation
public class MemcachedRequestEncoder extends
MessageToByteEncoder<MemcachedRequest> { //1
@Override
protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg,
ByteBuf out) throws Exception { //2
byte[] key = msg.key().getBytes(CharsetUtil.UTF_8);
byte[] body = msg.body().getBytes(CharsetUtil.UTF_8);
//total size of the body = key size + content size + extras size //3
int bodySize = key.length + body.length + (msg.hasExtras() ? 8 : 0);
//write magic byte //4
out.writeByte(msg.magic());
//write opcode byte //5
out.writeByte(msg.opCode());
//write key length (2 byte) //6
out.writeShort(key.length); //key length is max 2 bytes i.e. a Java short //7
//write extras length (1 byte)
int extraSize = msg.hasExtras() ? 0x08 : 0x0;
out.writeByte(extraSize);
//byte is the data type, not currently implemented in Memcached but required //8
out.writeByte(0);
//next two bytes are reserved, not currently implemented but are required //9
out.writeShort(0);
//write total body length ( 4 bytes - 32 bit int) //10
out.writeInt(bodySize);
//write opaque ( 4 bytes) - a 32 bit int that is returned in the response //11
out.writeInt(msg.id());
//write CAS ( 8 bytes)
out.writeLong(msg.cas()); //24 byte header finishes with the CAS //12
if (msg.hasExtras()) {
//write extras (flags and expiry, 4 bytes each) - 8 bytes total //13
out.writeInt(msg.flags());
out.writeInt(msg.expires());
}
//write key //14
out.writeBytes(key);
//write value //15
out.writeBytes(body);
}
}
總結(jié),編碼器 使用 Netty 的 ByteBuf 處理請求,編碼 MemcachedRequest 成一套正確排序的字節(jié)。詳細(xì)步驟為:
無論你放入什么到輸出緩沖區(qū)( 調(diào)用 ByteBuf) Netty 的將向服務(wù)器發(fā)送被寫入請求。下一節(jié)將展示如何進(jìn)行反向通過解碼器工作。
將 MemcachedRequest 對象轉(zhuǎn)為字節(jié)序列,Memcached 僅需將字節(jié)轉(zhuǎn)到響應(yīng)對象返回即可。
先見一個(gè) POJO:
Listing 14.7 Implementation of a MemcachedResponse
public class MemcachedResponse { //1
private final byte magic;
private final byte opCode;
private byte dataType;
private final short status;
private final int id;
private final long cas;
private final int flags;
private final int expires;
private final String key;
private final String data;
public MemcachedResponse(byte magic, byte opCode,
byte dataType, short status,
int id, long cas,
int flags, int expires, String key, String data) {
this.magic = magic;
this.opCode = opCode;
this.dataType = dataType;
this.status = status;
this.id = id;
this.cas = cas;
this.flags = flags;
this.expires = expires;
this.key = key;
this.data = data;
}
public byte magic() { //2
return magic;
}
public byte opCode() { //3
return opCode;
}
public byte dataType() { //4
return dataType;
}
public short status() { //5
return status;
}
public int id() { //6
return id;
}
public long cas() { //7
return cas;
}
public int flags() { //8
return flags;
}
public int expires() { //9
return expires;
}
public String key() { //10
return key;
}
public String data() { //11
return data;
}
}
下面為 MemcachedResponseDecoder, 使用了 ByteToMessageDecoder 基類,用于將 字節(jié)序列轉(zhuǎn)為 MemcachedResponse
Listing 14.4 MemcachedResponseDecoder class
public class MemcachedResponseDecoder extends ByteToMessageDecoder { //1
private enum State { //2
Header,
Body
}
private State state = State.Header;
private int totalBodySize;
private byte magic;
private byte opCode;
private short keyLength;
private byte extraLength;
private short status;
private int id;
private long cas;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
switch (state) { //3
case Header:
if (in.readableBytes() < 24) {
return;//response header is 24 bytes //4
}
magic = in.readByte(); //5
opCode = in.readByte();
keyLength = in.readShort();
extraLength = in.readByte();
in.skipBytes(1);
status = in.readShort();
totalBodySize = in.readInt();
id = in.readInt(); //referred to in the protocol spec as opaque
cas = in.readLong();
state = State.Body;
case Body:
if (in.readableBytes() < totalBodySize) {
return; //until we have the entire payload return //6
}
int flags = 0, expires = 0;
int actualBodySize = totalBodySize;
if (extraLength > 0) { //7
flags = in.readInt();
actualBodySize -= 4;
}
if (extraLength > 4) { //8
expires = in.readInt();
actualBodySize -= 4;
}
String key = "";
if (keyLength > 0) { //9
ByteBuf keyBytes = in.readBytes(keyLength);
key = keyBytes.toString(CharsetUtil.UTF_8);
actualBodySize -= keyLength;
}
ByteBuf body = in.readBytes(actualBodySize); //10
String data = body.toString(CharsetUtil.UTF_8);
out.add(new MemcachedResponse( //1
magic,
opCode,
status,
id,
cas,
flags,
expires,
key,
data
));
state = State.Header;
}
}
}
所以實(shí)現(xiàn)過程中發(fā)生了什么事?我們知道一個(gè) Memcached 響應(yīng)有24位頭;我們不知道是否所有數(shù)據(jù),響應(yīng)將被包含在輸入 ByteBuf ,當(dāng)解碼方法調(diào)用時(shí)。這是因?yàn)榈讓泳W(wǎng)絡(luò)堆??赡軐?shù)據(jù)分解成塊。所以確保我們只解碼當(dāng)我們有足夠的數(shù)據(jù),這段代碼檢查是否可用可讀的字節(jié)的數(shù)量至少是24。一旦我們有24個(gè)字節(jié),我們可以確定整個(gè)消息有多大,因?yàn)檫@個(gè)信息包含在24位頭。
當(dāng)我們解碼整個(gè)消息,我們創(chuàng)建一個(gè) MemcachedResponse 并將其添加到輸出列表。任何對象添加到該列表將被轉(zhuǎn)發(fā)到下一個(gè)ChannelInboundHandler 在 ChannelPipeline,因此允許處理。
更多建議: