1. 程式人生 > >kafka 訊息格式設計實現

kafka 訊息格式設計實現

目前kafka訊息格式有三個版本(假定v0,v1,v2),0.10.0之前使用的是v0版本,之後慢慢演變出v1,v2,後兩個版本在設計方式上沒有什麼特別大的區別,只是做了些空間上的優化,同樣的訊息,新版本的使用儲存空間會更小,優化主要在於訊息頭部的壓縮,當然還有些功能上的優化,例如添加了時間戳,相對偏移量等;這些不是今天討論的重點,今天主要來介紹下v0版本的訊息格式設計,首先來看下kafka訊息的具體構成
在這裡插入圖片描述
由上圖可知kafka訊息格式,主要

CRC : 校驗碼
Magic :版本號:0,1,2,對應目前的三個版本
Attribute : 屬性欄位,目前使用後3位儲存壓縮型別
Key len :key長度
Key :key值
Value len:value長度
Value :具體訊息內容

除了key和value以外的,我們成為訊息頭部,訊息頭部長度:

8+1+1+4+4=18 byte

有些文章指出crc效驗碼為4位元組,用的是無符號long型別,但這裡我們直接使用有符號數,所以多了4個位元組
先來定義一個Message.java物件
Message.java

package com.fan.reactor.protocol;

import java.io.Serializable;

public class Message implements Serializable {

    private long crc;//8,CRC32.getValue()
    private byte magic;//1,預設0
    private byte attribute;//1,後3位用來儲存壓縮型別
    private int keylen ;//4
    private String key;
    private int vallen;//4
    private String value;

    public Message(long crc, byte magic, byte attribute, int keylen, String key, int vallen, String value) {
        this.crc = crc;
        this.magic = magic;
        this.attribute = attribute;
        this.keylen = keylen;
        this.key = key;
        this.vallen = vallen;
        this.value = value;
    }

    @Override
    public String toString() {
        return "Message{" +
                "crc=" + crc +
                ", magic=" + magic +
                ", attribute=" + attribute +
                ", keylen=" + keylen +
                ", key='" + key + '\'' +
                ", vallen=" + vallen +
                ", value='" + value + '\'' +
                '}';
    }
}

常量類
Contants.java

package com.fan.reactor.protocol;

public class Contants {

    //壓縮型別
    public static final class Codec{
        public final static byte NONE = (byte)0;
        public final static byte GZiP = (byte)1;
        public final static byte Snappy = (byte)2;
        public final static byte LZ4 = (byte)3;
    }

    //版本號
    public static final byte MAGIC = (byte)0;
    public static final int MESSAGE_HEAD_LENGTH = 18;//預設訊息頭長度
}

測試序列化反序列化方法

package com.fan.reactor.protocol;

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class Main {

    public static void main(String[] args) {
        String key = "first";
        String message = "this is the first message";
        //訊息轉換成bytebuffer
        ByteBuffer byteBuffer = message2byte(key,message);
        
        //bytebuffer 轉換成物件
        Message messageObj = byte2message(byteBuffer);
        System.out.println(messageObj);

    }

    public static ByteBuffer message2byte(String key,String message){
    //根據key和value的長度開闢ByteBuffer空間,避免空間浪費和不足
        ByteBuffer messageBuff = ByteBuffer.allocate(Contants.MESSAGE_HEAD_LENGTH+key.length()+message.length());
    //對value進行校驗,生成long型別的CRC校驗碼
        CRC32 crc32 = new CRC32();
        byte[] data = message.getBytes();
        crc32.update(data);
    // 按順序填充訊息
        messageBuff.putLong(crc32.getValue());
        messageBuff.put(Contants.MAGIC);
        messageBuff.put(Contants.Codec.GZiP);
        messageBuff.putInt(key.length());
        messageBuff.put(key.getBytes());
        messageBuff.putInt(message.length());
        messageBuff.put(message.getBytes());
    //返回bytebuff,用於訊息傳輸
        return messageBuff;
    }

//將接收到的訊息反序列化為具體物件
    public static Message byte2message(ByteBuffer messageBuff){
        System.out.println("messageBuff current postition is :"+messageBuff.position());
        //這裡特別要注意獲取訊息的postition
        int postition = 0;
        System.out.println("reset messageBuff's  posti+    tion");
        messageBuff.position(postition);
        long crc = messageBuff.getLong(postition);
        System.out.println("crc32:"+crc);//0
        postition +=8;
        byte magic = messageBuff.get(postition);
        System.out.println("MAGIC:"+magic);//8
        postition ++;
        byte codec = messageBuff.get(postition);
        System.out.println("Codec:"+codec);//9
        postition ++;
        int keylen = messageBuff.getInt(postition);//10
        System.out.println("Key length:"+keylen);
        byte[] keycontent = new byte[keylen];
        postition += 4;
        //重點注意
        messageBuff.position(postition);
        messageBuff.get(keycontent);
        System.out.println("keycontent:"+new String(keycontent));
        postition += keylen;
        int messagelen = messageBuff.getInt(postition);
        System.out.println("message length is :"+messagelen);
        postition += 4;
        //重點注意
        messageBuff.position(postition);
        byte[] messagescontent = new byte[messagelen];
        messageBuff.get(messagescontent);
        System.out.println("message is :"+new String(messagescontent));
        postition+=messagelen;
        System.out.println("next postition is :"+postition);
        return new Message(crc,magic,codec,keylen,new String(keycontent),messagelen,new String (messagescontent));
    }
}

輸出如下

messageBuff current postition is :48
reset messageBuff's  postition
crc32:4252115574
MAGIC:0
Codec:1
Key length:5
keycontent:first
message length is :25
message is :this is the first message
next postition is :48
Message{crc=4252115574, magic=0, attribute=1, keylen=5, key='first', vallen=25, value='this is the first message'}

我們可以看到這條帶key的訊息總長度為48個位元組,如果我們採用java預設的序列化方式,光物件頭部就需要使用16位元組,加上欄位開銷,一條空訊息可能差不多就要:16(物件頭部)+18(訊息頭部)+4(key 引用)+4 (value 引用)+k(若干物件對齊空間)>42個位元組;可想而知,一個好的訊息設計是多麼的重要,還有一點就是,java的序列化方式,其他語言是很難或者無法解析