Kafka message format: design and implementation
阿新 · Published: 2018-11-12
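Kafka's message format currently has three versions (call them v0, v1 and v2). v0 was used before 0.10.0; v1 arrived with 0.10.0 and v2 with 0.11.0. The later versions are mainly space optimizations: the same message takes less storage in a newer format, chiefly because the per-message header overhead is reduced. They also add features, such as a timestamp (v1) and relative offsets (v2). Those changes are not the focus here. This article introduces the design of the v0 message format, so let's first look at what a Kafka message consists of.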
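As the figure above shows, a Kafka message consists mainly of the following fields:
CRC: checksum
Magic: version number; 0, 1 or 2 for the three current versions
Attribute: attributes field; currently its lowest 3 bits store the compression type
Key len: length of the key
Key: the key itself
Value len: length of the value
Value: the actual message content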
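Everything other than the key and the value we call the message header. The header length is:
8 (CRC) + 1 (Magic) + 1 (Attribute) + 4 (Key len) + 4 (Value len) = 18 bytes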
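Some articles point out that the CRC checksum is 4 bytes: it is an unsigned 32-bit value, which Java holds in a long. Here we simply write the long returned by CRC32.getValue() with putLong, so the CRC occupies 8 bytes, 4 more than necessary.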
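If we used the 4-byte CRC those articles describe, the header would shrink to 4 + 1 + 1 + 4 + 4 = 14 bytes. Below is a minimal sketch of writing the CRC as an unsigned 32-bit value with ByteBuffer and reading it back without sign extension. The class Crc4ByteDemo and its helper methods are hypothetical names for illustration, not part of this article's code or of Kafka's API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class Crc4ByteDemo {
    // CRC32.getValue() returns the unsigned 32-bit checksum in the low bits of a long.
    static long crcOf(byte[] data) {
        CRC32 crc32 = new CRC32();
        crc32.update(data);
        return crc32.getValue();
    }

    // Hypothetical helper: write only the low 32 bits (4 bytes instead of putLong's 8).
    static void putUnsignedInt(ByteBuffer buf, long value) {
        buf.putInt((int) value);
    }

    // Hypothetical helper: read 4 bytes and widen to long, masking off the sign extension.
    static long getUnsignedInt(ByteBuffer buf) {
        return buf.getInt() & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long crc = crcOf("this is the first message".getBytes(StandardCharsets.UTF_8));
        ByteBuffer buf = ByteBuffer.allocate(4);
        putUnsignedInt(buf, crc);
        buf.flip(); // switch from writing to reading
        long readBack = getUnsignedInt(buf);
        System.out.println(crc == readBack); // prints "true"
    }
}
```

The mask matters for checksums of 2^31 or more (such as the 4252115574 in the output further down): the (int) cast stores them as negative ints, and `& 0xFFFFFFFFL` restores the original unsigned value.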
First, define a Message class:
Message.java
package com.fan.reactor.protocol;
import java.io.Serializable;
public class Message implements Serializable {
    private long crc;       // 8 bytes, value of CRC32.getValue()
    private byte magic;     // 1 byte, version number, default 0
    private byte attribute; // 1 byte, lowest 3 bits store the compression type
    private int keylen;     // 4 bytes
    private String key;
    private int vallen;     // 4 bytes
    private String value;
    public Message(long crc, byte magic, byte attribute, int keylen, String key, int vallen, String value) {
        this.crc = crc;
        this.magic = magic;
        this.attribute = attribute;
        this.keylen = keylen;
        this.key = key;
        this.vallen = vallen;
        this.value = value;
    }
    @Override
    public String toString() {
        return "Message{" +
                "crc=" + crc +
                ", magic=" + magic +
                ", attribute=" + attribute +
                ", keylen=" + keylen +
                ", key='" + key + '\'' +
                ", vallen=" + vallen +
                ", value='" + value + '\'' +
                '}';
    }
}
Constants class
Contants.java
package com.fan.reactor.protocol;
public class Contants {
    // compression types (stored in the lowest 3 bits of the attribute byte)
    public static final class Codec {
        public final static byte NONE = (byte) 0;
        public final static byte GZiP = (byte) 1;
        public final static byte Snappy = (byte) 2;
        public final static byte LZ4 = (byte) 3;
    }
    // version number (magic)
    public static final byte MAGIC = (byte) 0;
    // default message header length: 8 + 1 + 1 + 4 + 4
    public static final int MESSAGE_HEAD_LENGTH = 18;
}
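Since only the lowest 3 bits of the attribute byte hold the codec, a reader should mask them out rather than compare the whole byte. A minimal sketch, assuming the codec numbers match the Contants.Codec values above (0 none, 1 gzip, 2 snappy, 3 lz4); AttributeDemo and its methods are hypothetical names for illustration.

```java
public class AttributeDemo {
    // Mask selecting the lowest 3 bits of the attribute byte (0b0000_0111).
    static final int COMPRESSION_CODEC_MASK = 0x07;

    // Extract the codec number, ignoring whatever the upper 5 bits contain.
    static int codecOf(byte attribute) {
        return attribute & COMPRESSION_CODEC_MASK;
    }

    // Set the codec bits while leaving the upper 5 bits unchanged.
    static byte withCodec(byte attribute, int codec) {
        return (byte) ((attribute & ~COMPRESSION_CODEC_MASK) | (codec & COMPRESSION_CODEC_MASK));
    }

    // Human-readable name for the codec numbers used in this article.
    static String codecName(int codec) {
        switch (codec) {
            case 0: return "none";
            case 1: return "gzip";
            case 2: return "snappy";
            case 3: return "lz4";
            default: return "unknown(" + codec + ")";
        }
    }

    public static void main(String[] args) {
        byte attr = withCodec((byte) 0, 1);
        System.out.println(codecName(codecOf(attr))); // prints "gzip"
        byte other = withCodec((byte) 0xF8, 2);       // upper bits already set
        System.out.println(codecOf(other) + " " + (other & 0xF8)); // prints "2 248"
    }
}
```

Masking leaves the remaining 5 bits free for other flags without breaking codec detection.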
A test of the serialization and deserialization methods:
package com.fan.reactor.protocol;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
public class Main {
    public static void main(String[] args) {
        String key = "first";
        String message = "this is the first message";
        // serialize the message into a ByteBuffer
        ByteBuffer byteBuffer = message2byte(key, message);
        // deserialize the ByteBuffer back into an object
        Message messageObj = byte2message(byteBuffer);
        System.out.println(messageObj);
    }
    public static ByteBuffer message2byte(String key, String message) {
        // Encode first: the buffer must be sized by byte length, not String.length() (chars),
        // otherwise non-ASCII keys or values would overflow it
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] data = message.getBytes(StandardCharsets.UTF_8);
        // allocate exactly header + key + value bytes, so space is neither wasted nor short
        ByteBuffer messageBuff = ByteBuffer.allocate(Contants.MESSAGE_HEAD_LENGTH + keyBytes.length + data.length);
        // compute a CRC32 checksum (returned as a long) over the value
        CRC32 crc32 = new CRC32();
        crc32.update(data);
        // fill in the fields in order
        messageBuff.putLong(crc32.getValue());
        messageBuff.put(Contants.MAGIC);
        // marks the codec as GZIP for demonstration; the value is not actually compressed here
        messageBuff.put(Contants.Codec.GZiP);
        messageBuff.putInt(keyBytes.length);
        messageBuff.put(keyBytes);
        messageBuff.putInt(data.length);
        messageBuff.put(data);
        // return the buffer for transmission; its position now points past the last byte,
        // so call flip() before writing it to a channel
        return messageBuff;
    }
    // deserialize a received message into a Message object
    public static Message byte2message(ByteBuffer messageBuff) {
        System.out.println("messageBuff current position is :" + messageBuff.position());
        // Pay special attention to the position: after writing it is at the end of the data,
        // so reset it and read from offset 0
        int position = 0;
        System.out.println("reset messageBuff's position");
        messageBuff.position(position);
        long crc = messageBuff.getLong(position);    // offset 0
        System.out.println("crc32:" + crc);
        position += 8;
        byte magic = messageBuff.get(position);      // offset 8
        System.out.println("MAGIC:" + magic);
        position++;
        byte codec = messageBuff.get(position);      // offset 9
        System.out.println("Codec:" + codec);
        position++;
        int keylen = messageBuff.getInt(position);   // offset 10
        System.out.println("Key length:" + keylen);
        byte[] keycontent = new byte[keylen];
        position += 4;
        // Important: get(byte[]) is a relative read from the current position, so move it first
        messageBuff.position(position);
        messageBuff.get(keycontent);
        System.out.println("keycontent:" + new String(keycontent, StandardCharsets.UTF_8));
        position += keylen;
        int messagelen = messageBuff.getInt(position);
        System.out.println("message length is :" + messagelen);
        position += 4;
        // Important: same as above, move the position before the relative read
        messageBuff.position(position);
        byte[] messagescontent = new byte[messagelen];
        messageBuff.get(messagescontent);
        System.out.println("message is :" + new String(messagescontent, StandardCharsets.UTF_8));
        position += messagelen;
        System.out.println("next position is :" + position);
        return new Message(crc, magic, codec, keylen, new String(keycontent, StandardCharsets.UTF_8),
                messagelen, new String(messagescontent, StandardCharsets.UTF_8));
    }
}
The output is as follows:
messageBuff current position is :48
reset messageBuff's position
crc32:4252115574
MAGIC:0
Codec:1
Key length:5
keycontent:first
message length is :25
message is :this is the first message
next position is :48
Message{crc=4252115574, magic=0, attribute=1, keylen=5, key='first', vallen=25, value='this is the first message'}
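The manual position bookkeeping in byte2message can be avoided by calling flip() after writing and then using relative reads, which advance the position automatically. The sketch below keeps the same 18-byte-header layout and the article's choice of checksumming only the value (Kafka's own v0 CRC covers all bytes after the CRC field), and adds a CRC check on read. RelativeCodecDemo, Decoded, encode and decode are hypothetical names; this is a variant for illustration, not the article's code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class RelativeCodecDemo {
    static final int HEAD_LENGTH = 18; // 8 (CRC) + 1 (magic) + 1 (attribute) + 4 + 4
    static final byte MAGIC = 0;
    static final byte CODEC_NONE = 0;

    // Hypothetical holder for the decoded fields.
    static final class Decoded {
        final long crc; final byte magic; final byte attribute; final String key; final String value;
        Decoded(long crc, byte magic, byte attribute, String key, String value) {
            this.crc = crc; this.magic = magic; this.attribute = attribute;
            this.key = key; this.value = value;
        }
    }

    static long crcOf(byte[] data) {
        CRC32 crc32 = new CRC32();
        crc32.update(data);
        return crc32.getValue();
    }

    static ByteBuffer encode(String key, String value) {
        // Size the buffer from encoded byte lengths, not String.length().
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEAD_LENGTH + k.length + v.length);
        buf.putLong(crcOf(v)).put(MAGIC).put(CODEC_NONE)
           .putInt(k.length).put(k)
           .putInt(v.length).put(v);
        buf.flip(); // limit = bytes written, position = 0: ready to read or send
        return buf;
    }

    static Decoded decode(ByteBuffer buf) {
        // Each relative get advances the position, so no offsets are tracked by hand.
        long crc = buf.getLong();
        byte magic = buf.get();
        byte attribute = buf.get();
        byte[] k = new byte[buf.getInt()];
        buf.get(k);
        byte[] v = new byte[buf.getInt()];
        buf.get(v);
        if (crcOf(v) != crc) {
            throw new IllegalStateException("CRC mismatch: message is corrupted");
        }
        return new Decoded(crc, magic, attribute,
                new String(k, StandardCharsets.UTF_8), new String(v, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode("first", "this is the first message");
        System.out.println(buf.remaining()); // prints "48"
        Decoded d = decode(buf);
        System.out.println(d.key + " / " + d.value);
    }
}
```

With flip() the reader never has to know where the writer stopped, and the CRC check turns silent corruption into an explicit error.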
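We can see that this message with a key takes 48 bytes in total. Java's default serialization would be considerably larger: besides the field values, its stream records a class descriptor containing the fully qualified class name, the serialVersionUID, and the name and type of every field. Even counting only the object's in-memory layout on the JVM, an empty message would roughly need 16 (object header) + 18 (message header fields) + 4 (key reference) + 4 (value reference) + k (alignment padding), i.e. about 42 bytes or more. This shows how much a well-designed message format matters. One more point: Java's serialization format is hard or impossible for other languages to parse.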
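To make the comparison concrete, one can serialize an object with the same fields through ObjectOutputStream and compare the byte count with the 48-byte custom encoding. A minimal sketch; SampleMessage is a hypothetical stand-in for the Message class (nested here so the example is self-contained), and the exact Java size depends on the class and field names, so none is claimed here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class JavaSerializationSizeDemo {
    // Hypothetical stand-in with the same fields as Message.
    static class SampleMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        long crc;
        byte magic;
        byte attribute;
        int keylen;
        String key;
        int vallen;
        String value;

        SampleMessage(long crc, byte magic, byte attribute, String key, String value) {
            this.crc = crc;
            this.magic = magic;
            this.attribute = attribute;
            this.keylen = key.getBytes(StandardCharsets.UTF_8).length;
            this.key = key;
            this.vallen = value.getBytes(StandardCharsets.UTF_8).length;
            this.value = value;
        }
    }

    // Bytes produced by ObjectOutputStream for one object, stream header included.
    static int javaSerializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        String key = "first";
        String value = "this is the first message";
        // This article's layout: 18-byte header + key bytes + value bytes.
        int customSize = 18 + key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;
        int javaSize = javaSerializedSize(new SampleMessage(0L, (byte) 0, (byte) 1, key, value));
        System.out.println("custom format: " + customSize + " bytes");
        System.out.println("Java serialization: " + javaSize + " bytes");
    }
}
```

Most of the difference comes from the class descriptor, which is written once per stream; it is overhead the custom format avoids entirely, because both sides already agree on the layout.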