使用netty徒手擼一個簡單的kafkaClient
前兩天博文我們介紹瞭如何使用netty徒手擼一個kafka的客戶端. 所謂的kafka客戶端就是kafka的producer和consumer了.
吐槽一下kafka的api設計
大家都知道,kafka的客戶端是重構過一版的. 之前0.8的producer和consumer是使用scala開發的,後來因為各種原因,實在是改不動了. 到了0.9版本的時候,使用java重構了kafka的客戶端.
雖然現在java版的客戶端還在廣泛使用,而且沒有什麼太大的效能問題. 但是根據我這些天對kafka客戶端的api的研究,我總覺得,總有一天,kafka的客戶端還得來一次徹底的重構.因為什麼呢? 因為實在是太--亂--啦:
1. 多版本問題.
每個api都有好幾個版本,但是每個api使用的版本都不一致.
舉個例子,在kafka-client 1.0.0中,broker的版本是2.3.0時:
METADATA(拉取topic元資料)的api有1個version,當前使用版本是1.
PRODUCE(生產訊息)的api有6個version,當前使用版本是6
FETCH(拉取訊息)的api有5個version,當前使用版本是5
2. 報文的資料結構巨複雜
等下實現生產訊息的報文的時候,你們會看到,這個報文嵌套了6層,即有6個子結構體.
複製程式碼
以上我對kafka api的小小吐槽. 當然也可能是我水平不夠,未能理解到它這麼設計的用意和深意~~
kafka的訊息格式
kafka一直在不斷地優化自身,因此它的訊息格式也是一直在變.
在<<Apache kafka實戰 (胡夕著)>> 一書中(基於kafka 1.0.0),作者介紹了到目前為止,一共有3種訊息格式V0,V1,V2. 其中V0和V1由於各種弊端,早就逐漸的被淘汰了. 現在新版kafka使用的都是V2版本的訊息格式. 本文就是在kafka2.3.0上實現的,使用V2格式的訊息能測試通過.
因此這裡介紹的訊息格式都是V2版本的.
在開始介紹kafka的訊息格式之前,大家還要理解一個概念: 可變長度.
常規的長度欄位要麼就是使用4個位元組,要麼就是使用8個位元組來表示,總之這個欄位使用的位元組數一般都是固定的.
但是在kafka的v2版本的訊息裡就不一樣了.
它參考了Zig-zag的編碼方式,可以使用不同長度的欄位來表示不同的數值.
簡單來說就是這樣:
用 0 來表示 0
用 1 來表示 1
用 2 來表示 -1
用 3 來表示 2
用 4 來表示 -2
.....
這樣的好處就是可以用比較少的位元組數來表示絕對值比較小的數字,不用每個數字都佔用4個或8個位元組,從而可以節省很大的空間
複製程式碼
瞭解了"可變長度"這個概念後就可以來看kafka的v2版本的訊息格式了. 如下圖(截圖自<<Apache kafka實戰 (胡夕著)>> 一書):
我們來一個一個瞭解這些欄位 1. 訊息總長度. 顧名思義,就是這條訊息的總長度啦. 用的是Zig-zag編碼表示
2. 屬性. 一個位元組表示(8位),其中第三位用來表示壓縮方式.高5位保留,沒有用到
由於我這裡的實現沒有用到壓縮,所以這個欄位總是0
3. 時間戳增量.也是用Zig-zag編碼. 所謂增量,是指標對該訊息batch的第一條訊息的時間戳的增量.
訊息batch接下來會介紹.
4. 位移增量. 跟時間戳增量含義差不多
5. key length. 每條kafka訊息都可以有key,這個就表示key的位元組數
6. key. 這個欄位就是kafka訊息裡面的key.
7. value size. 更key length含義差不多
8 value. 就是kafka訊息的內容
9. header個數. kafka訊息也可以帶有header
10. header. kafka的header
複製程式碼
看到第3和第4個欄位是不是有點一臉懵?沒關係,繼續往下看你就明白了.
kafka傳送訊息的時候並不是有一條傳送一條的,而是把多條訊息集中在一起,然後再一併傳送的. 這就是所謂的kafka 訊息batch.
而且這訊息batch傳送到kafka的broker之後,它也同樣不會拆開,而是原封不動地把這個訊息batch發給消費者,或儲存到日誌檔案中.
因此理解這個訊息batch對我們實現傳送訊息和消費訊息都是必要的.
訊息batch的格式如下圖所示:
是不是一下子有點奔潰,一下子冒出了這麼多的欄位. 沒得辦法,我們再來一個個地看.
首先最後的"訊息"就是上面介紹的v2版本格式的訊息,可能會有x個,x就是倒數第二個欄位"訊息個數".
剩下的欄位:
1. 起始位移
最後面的"訊息"中第一條訊息的位移offset
2. 長度
表示接下來的報文的長度,即"訊息batch的總長度" - 8Byte(起始位移欄位) - 4Byte(長度欄位)
3. 分割槽leader版本號
我這裡的實現寫死為-1
4. 版本號
就是magic. 我們這裡是V2,所以是2
5. CRC
是指接下來的所有欄位的CRC碼
6. 屬性
跟上面訊息中的屬性的含義一致
7. 最大位移增量
就是最後一條訊息的"位置增量"的值
8. 起始時間戳
就是第一條訊息的時間戳
9. 最大時間戳
最後一條訊息的時間戳
10. 後面三個pid epoch,seq三個欄位都是跟事務等相關的,我們這裡沒有用到,所以都寫死為-1
複製程式碼
這裡的"訊息"和"訊息batch"我在程式碼中定義的bean分別是KafkaMsgRecordV2和KafkaMsgRecordBatch. 如果看上面的文字和圖片確實不好理解的話,可以跟著程式碼看,或者可以理解得更加深刻. 程式碼請見文末的github地址.
當然如果你理解了這一段,那很好.不過也別開心太早.因為上面說了,kafak傳送訊息的資料結構嵌套了6層,而這裡才兩層. 也就是還有4層等著我們去理解. 當然,那4層相對是比較簡單的. 最難理解的部分已經過去了
requestHeader和responseHeader
kafka每個api的請求都必須帶有一個請求的header,而每個api的響應體中也都帶有一個響應的header.requestHeader和responseHeader分別如圖所示:
響應的header比較簡單,就是一個correlationId,這個id其實是客戶端傳送給服務端,服務端原封不動的返回了. 作用跟zookeeper的xid一樣.
我們來看看requestHeader
apikey 和 apiVersion
public enum ApiKeys {
/**
* 傳送訊息
*/
PRODUCE(0,"Produce",(short) 5),/**
* fetch 訊息
*/
FETCH(1,"Fetch",(short)6),/**
* 拉取元資料
*/
METADATA(3,"Metadata",(short) 1);
public final short id;
public final String name;
public short apiVersion;
ApiKeys(int id,String name,short apiVersion) {
this.id = (short) id;
this.name = name;
this.apiVersion = apiVersion;
}
}
複製程式碼
程式碼中的id欄位就是apiKey,apiVersion對應的就是header中的apiVersion. 正如我們開頭吐槽的一樣,每個api的版本都是不一樣的. 在這次實現裡,我只實現了3個api. 但實際上kafka提供十幾個api.
correlationId
關聯性Id和zkClient中的xid作用是一樣的,主要是把請求和響應對應起來. kafka的響應報文中會包含這個欄位.
clientIdLen和clientId
不管是kafka生產者還是消費者,都需要指定一個clientId. 在官方的客戶端中,如果我們不指定的話,也會自動生成一個clientId.
最後值得一提的是,這裡的clientIdLen是用兩個位元組表示的. kafka裡面都是用2個位元組表示字串長度的. 這個跟zookeeper裡面是不一樣的.
生產者
生產者的邏輯實現在KafkaClient的send方法:
public ProduceResponse send(KafkaProduceConfig config,String topic,String key,String val)
複製程式碼
正如上面一直提到的,生產者的請求報文一共嵌套了6層,具體表現為:
1. ProduceRequest繼承KafkaRequestHeader,持有TopicProduceData物件
2. TopicProduceData 持有PartitionData物件
3. PartitionData持有Record物件
4. Record持有KafkaMsgRecordBatch物件
5. KafkaMsgRecordBatch持有KafkaMsgRecordV2物件
複製程式碼
可以看到,其實是以"broker資訊" => "topic資訊" =>"分割槽資訊" => "記錄資訊" => "訊息batch" => "訊息"等層次逐漸包裝的.
報文的的欄位和圖示這裡就不再給出了,有興趣的同學可以跟一下程式碼,直接從序列化入手,就可以理解kafka生產者的通訊協議了,大體邏輯如下所示:
- ProduceRequest.serializable()
- KafkaRequestHeader.serializable()
- TopicProduceData.serializable()
- PartitionData.serializable()
- Record.serializable()
- KafkaMsgRecordBatch.serializable()
- KafkaMsgRecordV2.serializable()
複製程式碼
經過上面的一系列serializable,最終把一個ProduceRequest物件轉換成一個ByteBuf,發往kafka的broker,一條訊息就成功的產生了.
消費者
生產者的邏輯實現在KafkaClient的poll方法:
public Map<Integer,List<ConsumerRecord>> poll(KafkaConsumerConfig consumerConfig,int partition,long fetchOffset)
複製程式碼
相對於生產者來說,消費者的請求報文相對簡單,也是一個從"broker配置"=>"topic資訊" => "分割槽資訊"的包裝過程
如下所示:
1. FetchRequest 繼承KafkaRequestHeader,持有FetchTopicRequest物件
2. FetchTopicRequest持有FetchTopicPartitionRequest物件
複製程式碼
然而,消費者的響應體就相對比生產者的響應體要複雜的多了.
因為上面說過,生產者傳送broker的"訊息batch",broker是不會把它解析成具體的訊息的. 而且原封不動地把它儲存到日誌中去,從而也是原封不動的被消費者消費到. 因此這個解析訊息的工作自然而然地就落到了消費者的肩上.
具體請參見KafkaClient#parseResp()方法
程式碼執行
和之前的ZkClient和RedisClient一樣,這裡也同樣實現了一個kafkaClientTest,方便體驗和除錯.
這次針對了幾種場景進行測試:
- 在kafkaClientTest中生產訊息,利用kafka自帶的kafka-console-consumer.sh 進行消費
生產訊息:
private final static String host = "localhost";
private final static int port = 9092;
private final static String topic = "testTopic1";
@Test
public void testProducer(){
KafkaClient kafkaClient = new KafkaClient("producer-111",host,port);
KafkaProduceConfig kafkaConfig = new KafkaProduceConfig();
// 注意這裡設定為0時,broker不會響應任何資料,但是訊息實際上是傳送到broker了的
short ack = -1;
kafkaConfig.setAck(ack);
kafkaConfig.setTimeout(30000);
ProduceResponse response = kafkaClient.send(kafkaConfig,topic,"testKey","helloWorld1113");
assert ack == 0 || response != null;
System.out.println(new Gson().toJson(response));
}
複製程式碼
可以在控制檯看到訊息被消費了:
lhhMacBook-Air:bin$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic1
helloWorld1113
複製程式碼
- 在kafkaClientTest中生產訊息(場景1的訊息),在kafkaClientTest消費訊息:
private final static String host = "localhost";
private final static int port = 9092;
private final static String topic = "testTopic1";
@Test
public void testConsumer(){
// 如果broker上不存在這個topic的話,直接消費可能會報錯,可以fetch一下metadata,或先生產訊息
// testMetaData();
// testProducer();
KafkaClient kafkaClient = new KafkaClient("consumer-111",port);
KafkaConsumerConfig consumerConfig = new KafkaConsumerConfig();
consumerConfig.setMaxBytes(Integer.MAX_VALUE);
consumerConfig.setMaxWaitTime(30000);
consumerConfig.setMinBytes(1);
Map<Integer,List<ConsumerRecord>> response = kafkaClient.poll(consumerConfig,0L);
assert response != null && response.size() > 0;
Set<Map.Entry<Integer,List<ConsumerRecord>>> entrySet =response.entrySet();
for(Map.Entry<Integer,List<ConsumerRecord>> entry : entrySet){
Integer partition = entry.getKey();
System.out.println("partition" + partition + "的資料:");
for(ConsumerRecord consumerRecord : entry.getValue()){
System.out.println(new Gson().toJson(consumerRecord));
}
}
}
複製程式碼
控制檯打印出剛剛生產的訊息(包含了之前測試的訊息),說明消費成功:
partition0的資料:
{"offset":0,"timeStamp":1573896186007,"key":"testKey","val":"helloWorld"}
{"offset":1,"timeStamp":1573896202787,"val":"helloWorld"}
{"offset":2,"timeStamp":1573896309808,"val":"helloWorld111"}
{"offset":3,"timeStamp":1573899639313,"val":"helloWorld1113"}
{"offset":4,"timeStamp":1574011584095,"val":"helloWorld1113"}
複製程式碼
- 利用kafka-console-producer.sh生產訊息,在kafkaClientTest消費訊息:
生產訊息:
lhhMacBook-Air:bin$ sh kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic222
>hi
>h
複製程式碼
消費訊息輸出,說明消費成功
partition0的資料:
{"offset":0,"timeStamp":1574012251856,"val":"hi"}
{"offset":1,"timeStamp":1574012270368,"val":"h"}
複製程式碼
原始碼
最後附上github原始碼地址:
感興趣的同學可以參考一下,共同學習進步.