《Kafka官方文件》實現
1. API Design
Producer APIs
Producer API封裝了底層兩個Producer:
- kafka.producer.SyncProducer
- kafka.producer.async.AsyncProducer
class Producer { /* Sends the data, partitioned by key to the topic using either the */ /* synchronous or the asynchronous producer */ public void send(kafka.javaapi.producer.ProducerData<K,V> producerData); /* Sends a list of data, partitioned by key to the topic using either */ /* the synchronous or the asynchronous producer */ public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData); /* Closes the producer and cleans up */ public void close(); }
這麼做的目的是通過一個簡答的API暴露把所有Producer的功能暴露給Client。Kafka的Producer可以:
- 排隊/快取多個傳送請求並且非同步的批量分發出去:
kafka.producer.Producer
提供批量化多個傳送請求(producer.type=async),之後進行序列化併發送的分割槽的能力。批大小可以通過一些配置引數進行設定。將事件加入到queue中,他們會被緩衝在queue中,直到滿足queue.time
或batch.size
達到配置的值。後臺執行緒(kafka.producer.async.ProducerSendThread)從queue中獲取資料並使用kafka.producer.EventHandler對資料進行序列化併發送到合適的分割槽。可以通過event.handler
kafka.producer.async.CallbackHandler
介面並設定callback.handler
引數來實現。
- 使用使用者指定的
Encoder
來序列化資料:
interface Encoder<T> {
public Message toMessage(T data);
}
預設使用kafka.serializer.DefaultEncoder
。
- 通過使用者可選的
Partitioner
Partition的路由由kafka.producer.Partitioner
決定。
interface Partitioner<T> {
int partition(T key, int numPartitions);
}
分割槽選擇API使用key和分割槽總數來選擇最終的partition(返回選擇的partition id)。id用於從排序的partition列表中選擇最終的一個分割槽去傳送資料。預設的分割槽策略是hash(key)%numPartitions
。如果key是null,會隨機選擇一個分割槽。可以通過partitioner.class
引數來配置特定的分割槽選擇策略。
Consumer APIs
我們有兩個級別的Consumer API。低級別的“簡單的”API和單個Broker之間保持連結並且和傳送到服務端的網路請求有緊密的對應關係。這個API是無狀態的,每個請求都包含offset資訊,允許使用者維護這個元資料。
高級別的API在Consumer端隱藏了Broker的細節,並且允許從叢集消費資料而不關心底層的拓撲結構。同樣維持了“哪些資料已經被消費過”的狀態。高級別的API還提供了通過表示式訂閱的Topic的功能(例如通過白名單或者黑名單的方式訂閱)。
Low-level API
class SimpleConsumer {
/* Send fetch request to a broker and get back a set of messages. */
public ByteBufferMessageSet fetch(FetchRequest request);
/* Send a list of fetch requests to a broker and get back a response set. */
public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
* Get a list of valid offsets (up to maxSize) before the given time.
* The result is a list of offsets, in descending order.
* @param time: time in millisecs,
* if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
* if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
*/
public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
低級別的API用於實現高級別的API,也被直接使用在一些在狀態上有特殊需求的“離線”Consumer。
High-level API
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
* This method is used to get a list of KafkaStreams, which are iterators over
* MessageAndMetadata objects from which you can obtain messages and their
* associated metadata (currently only topic).
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
* You can also obtain a list of KafkaStreams, that iterate over messages
* from topics that match a TopicFilter. (A TopicFilter encapsulates a
* whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);
/* Commit the offsets of all messages consumed so far. */
public commitOffsets()
/* Shut down the connector */
public shutdown()
}
這個API圍繞迭代器,通過KafkaStream類實現。一個KafkaStream表示了一個或多個分割槽(可以分佈在不同的Broker上)組成的訊息流。每個Stream被單個執行緒處理,客戶端可以在建立流時提供需要的個數。這樣,一個流背後可以是多個分割槽,但是一個分割槽只會屬於一個流。
createMessageStreams呼叫會吧Consumer註冊到Topic,促使Consumer/Broker的重新分配。API鼓勵在單次呼叫中建立多個Stream以減少充分配的次數。createMessageStreamsByFilter方法的呼叫(另外的)用於註冊watcher去發現匹配過濾規則的topic。createMessageStreamsByFilter返回的迭代器可以迭代來此多個Topic的訊息(如果多個Topic都符合過濾規則)。
2. Network Layer
網路層是一個直接的NIO Server,不會詳細的描述。sendfile是通過給MessageSet增加writeTo方法實現的。這允許使用檔案儲存的訊息用更高效的transferTo實現代替讀取資料到程序內的處理。執行緒模型是簡單的單acceptor多processor,每個執行緒處理固定數量的連線。這種設計已經被很好的測試並且是易於實現的。協議非常簡單,以便在將來實現其他語言的客戶端。
3. Messages
訊息包含一個固定大小的頭,一個變長且“不透明”的byte陣列表示的key和一個變長且“不透明”的byte陣列表示的內容。header包含以下內容:
- A CRC32 checksum to detect corruption or truncation.
- A format version.
- An attributes identifier
- A timestamp
保持key和value的不透明是正確的決定:現在序列化方面取得了巨大的進展,選擇特定的一個序列化方式都不太適合所有的場景。不用說使用Kafka的特定程式可能需要特定的序列化方式。MessageSet介面是批量讀寫訊息到NIO Channel的迭代器。
4. Message Format
/**
* 1. 4 byte CRC32 of the message
* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
* bit 0 ~ 2 : Compression codec.
* 0 : no compression
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : Timestamp type
* 0 : create time
* 1 : log append time
* bit 4 ~ 7 : reserved
* 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
* 5. 4 byte key length, containing length K
* 6. K byte key
* 7. 4 byte payload length, containing length V
* 8. V byte payload
*/
5. Log
包含兩個partition,名稱為“my_topic”的Topic的日誌包含兩個目錄(名稱為my_topic_0和my_topic_1),其中包含該Topic的訊息的資料檔案。日誌檔案的格式是log entry的序列;每個log entry都是4位元組的訊息長度N加上後面N個位元組的訊息資料。每條訊息都有一個64位的offset標識這條訊息在這個Topic的Partition中的偏移量。訊息在磁碟中的儲存格式如下所示。每個日誌檔案都以它儲存的第一條訊息的offset命名。所以第一個檔案會命名為00000000000.kafka,隨後每個檔案的檔名將是前一個檔案的檔名加上S的正數,S是配置中指定的單個檔案的大小。
訊息的確切的二進位制格式都有版本,它保持一個標準的介面,讓訊息集可以根據需要在Producer、Broker、Consumer之間傳輸而不需要重新拷貝或者轉換,其格式如下:
On-disk format of a message
offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes
使用訊息的Offset作為訊息ID是不常見的。我們初始的想法是在Producer生成一個GUID作為Message ID,並在Broker上維持ID和Offset之間的對映關係。但是因為Consumer需要為每個Server維持一個ID,那麼GUID的全域性唯一性就變得沒什麼意義了。此外,維持一個隨機的ID和Offset的對映關係將給索引的構建帶來巨大的負擔,本質上需要一個完整的持久化的隨機存取的資料結構。因此,為了簡化查詢結構,我們決定使用每個分割槽的原子計數器,它可以和分割槽ID加上ServerID來唯一標識一條訊息。一旦使用了計數器,直接使用Offset進行跳轉是順其自然的,兩者都是分割槽內單調遞增的整數。由於偏移量從消費者API中隱藏起來,因此這個決定是最終的實現細節,所以我們採用更有效的方法。
Writes
日誌允許連續追加到最後一個檔案。當檔案達到配置的大小時(如1GB)將滾動到一個新檔案。日誌採用兩個配置:M,配置達到多少條訊息後進行刷盤;S,配置多長時間之後進行刷盤。這個持久化策略保證最多隻丟失M條訊息或者S秒之內的訊息。
Reads
讀取通過提供64位的offset和S-byte的chunk大小來實現。這將返回包含在S-byte的buffer的訊息跌代替。S比任意單條訊息都大,但是如果在異常的超大訊息的情況下,讀取操作可以通過多次重試,每次都將buffer大小翻倍,直到訊息被讀取成功。最大訊息大小和buffer大小可以配置,用於拒絕超過特定大小的訊息,以限制客戶端讀取訊息時需要拓展的buffer大小。buffer可能以不完整的訊息作為結尾,這可以通過訊息大小來輕鬆的檢測到。
實際的讀取操作首先需要定位offset所在的檔案,再將offset轉化為檔案內相對的偏移量,然後從檔案的這個偏移量開始讀取資料。搜尋操作通過記憶體中對映的檔案的簡單的二分查詢完成。
日誌提供了獲取最近寫入訊息的能力以允許客戶端從“當前時間”開始訂閱。這在客戶端無法在指定天數內消費掉訊息的場景中非常有用。在這種情況下,如果客戶端嘗試消費一個不存在的offset將丟擲OutOfRangeException異常並且可以根據場景重置或者失敗。
這面是傳送到Consumer的資料的格式:
MessageSetSend (fetch result)
total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)
total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n
Deletes
資料刪除一次刪除一個日誌段。日誌管理器允許通過外掛的形式實現刪除策略來選擇那些檔案是合適刪除的。當前的刪除策略是日誌檔案的修改時間已經過去N天,保留最近N GB資料的策略也是有用的。為了避免刪除時鎖定讀取操作,我們採用copy-on-write的方式來實現,以保證一致性的檢視。
Guarantees
日誌提供了配置引數M,用於控制寫入多少條訊息之後進行一次刷盤。在日誌恢復過程中遍歷最新的日誌段的所有訊息並驗證每一條訊息是有效的。如果訊息的大小和偏移量之和小於檔案長度並且訊息的CRC32和儲存的CRC相同,那麼訊息是有效的。在異常事件被檢測到時,日誌會被擷取到最後一條有效的訊息的offset。
兩種錯誤需要處理:因為Crash導致的written塊丟失和無意義的block被新增到檔案中。這樣做的原因是,一般的作業系統不保證file inode和實際資料塊之間的寫入順序,所以除了丟失丟失written data,檔案還會獲得無意義的資料,在inode更新大小但是在block寫入資料之前。CRC檢測這個錯誤並防止損壞日誌(unwritten的訊息肯定會丟失)。
6. Distribution
Consumer Offset Tracking
high-level的Consume保持它自己消費過的每個分割槽的最大的offset並且週期性的提交,所以可以在重啟的時候恢復offset資訊。Kafka提供在offset manager中儲存所有offset的選項。任何Consumer例項都需要傳送offset到offset manager。high-level的Consumer自動化的處理offset。如果使用simple consumer,需要自己手動管理offset。在Java simple consumer中現在還不支援,Java simple consumer只能從ZooKeeper提交和獲取offset。如果使用Scala simple consumer,你會找到offset manager並且可以明確指定向offset manager提交和獲取offset。Consumer通過向Broker傳送GroupCoordinatorRequest請求並獲取包含offset manager的GroupCoordinatorResponse來獲取offset。之後Consumer可以向offset manager提交和獲取offset。如果offset manager移動,Consumer需要重新發現。如果你期望手動管理offset,可以檢視這些解釋如果提交OffsetCommitRequest和OffsetFetchRequest的程式碼。
當offset manager收到OffsetCommitRequest,將其新增到一個特殊的、壓縮的,成為_consumeroffsets的Kafka topic中。offset manager返回一個成功的offset commit的響應給Consumer,當所有的備份都收到offset之後。如果在配置的timeout時間內所有副本沒有完成備份,commit offset認為是失敗的,將在之後重試(high-consumer將自動執行)。broker週期性的壓縮offset資訊,因為它只需要儲存每個Partition最近的offset資訊即可。為了更快的響應獲取offset的請求,offset manager也會在記憶體快取offset資料。
當offset manager收到fetch offset的請求,它從cache中返回最近commit的offset。如果offset manager是剛啟動或者剛成為一些group的offset manager(通過成為一下offset topic的leader partition),它需要載入offset資訊。這種情況下,fetch offset request或返回OffsetsLoadInProgress異常,Consumer需要之後重試(high-level consumer會自動處理)。
Migrating offsets from ZooKeeper to Kafka
Kafka較早的版本將offset資訊儲存在ZooKeeper中。可以通過以下步驟將這些資料遷移到Kafka中:
- 在Consumer配置中設定offsets.storage=kafka,dual.commit.enabled=true
- 驗證Consumer正常
- 設定dual.commit.enabled=false
- 驗證Consumer正常
回滾(從Kafka到ZooKeeper)也可以通過以上的步驟執行,只需要將offsets.storage設定為zookeeper。
ZooKeeper Directories
以下說明ZooKeeper用於統籌Consumer和Broker的結構和演算法。
Notation
路徑中標誌位[xyz]表示xyz是不固定的,如/topics/[topic]表示/topics下每個topic都有一個對應的目錄。[0…5]表示0,1,2,3,4,5的序列。->符號用於指示一個節點的值,如/hello -> world表示/hello儲存的值是“world”。
Broker Node Registry
/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
這是一個所有存在的Broker的節點列表,每個都提供一個唯一標識用於Consumer識別(必須作為配置的一部分)。在啟動時,Broker通過在/brokers/ids中建立一個znode來註冊自己。使用邏輯上的Broker ID的目的是可以在不影響Consumer的前提下將Broker移動到另外的物理機上。如果嘗試註冊一個已經存在的ID的Broker會失敗。
一旦Broker通過臨時節點註冊到ZK,註冊資訊是動態的並且在Broker宕機或者關閉後會丟失(那麼通知消費者Broker不再可用)。
Broker Topic Registry
/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
每個Broker將自己註冊到它包含的Topic下面,並儲存Topic的partition數量。
Consumers and Consumer Groups
Consumer同樣將自己註冊到ZK中,為了協調其他的Consumer並且做消費資料的負載均衡。Consumer還可以通過offsets.storage=zookeeper將offset資訊也儲存在ZK中。但是這個儲存一直在未來的版本中將被廢棄。因此建議將offset資訊遷移到Kafka中。
多個Consumer可以組成一個叢集共同消費一個Topic。一個Group中的每個Consumer例項共享一個group_id。
一個Group中的Consumer儘可能公平的分配partition,每個partition只能被一個group中的一個Consumer消費。
Consumer Id Registry
除了一個group內的所有Consumer例項共享一個groupid,每個Consumer例項還擁有一個唯一的consumerid(hostname:uuid)用於區分不同的例項。Consumer的id註冊在以下的目錄中。
/consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)
每個Consumer將自己註冊到Group目錄下並建立一個包含id的znode。節點的值包含<topic, #stream=””>的Map。id用於標識group中哪些Consumer是活躍的。節點是臨時節點,所以在Consumer程序關閉之後節點會丟失。
Consumer Offsets
Consumer記錄每個分割槽消費過的最大的offset資訊。如果配置了offsets.storage=zookeeper,這個資料會被記錄到ZK中。
/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)
Partition Owner registry
每個Partition都被一個group內的一個Consumer消費。這個Consumer必須在消費這個Partition之前建立對這個Partition的所有權。為了建立所有權,Consumer需要將id寫入到Partition下面。
/consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
Cluster Id
Cluster id是唯一且不可變的,用於表示Cluster。Cluster id最長可以擁有22個字元,由[a-zA-Z0-9_-]+組成。從概念上講,它在Cluster第一次啟動的時候生成。
實現層面上,它在Broker(0.10.1或更新的版本)第一次成功啟動後產生。Broker在啟動時嘗試從/cluster/id節點獲取cluster id。如果不存在,Broker穿件一個新的cluster id寫入到這個節點中。
Broker node registration
Broker節點基本上是相互獨立的,所以他們只是釋出他們自己擁有的資訊。當一個Broker加入時,它將自己註冊到broker node,並寫入自己的資訊(host name和port)。Broker還將自己的Topic和Partition註冊到對應的目錄中。新Topic會被動態的註冊,當他們在Broker上建立的時候。
Consumer registration algorithm
當啟動一個Consumer時,它按照如下步驟操作:
- 在group下注冊自己的consumer id
- 註冊監聽器用於監聽新Consumer的加入和Consumer的關閉,Consumer變更都會觸發Partition的分配(負載均衡)。
- 註冊監聽器用於監聽Broker的加入和關閉,Broker變更都會觸發Partition的分配(負載均衡)。
- 如果Consumer通過filter建立了一個訊息流,同樣會註冊一個監聽器用於監聽新topic的加入。
- 強制自己在消費group內重新平衡。
Consumer rebalancing algorithm
Consumer的負載均衡演算法允許一個group內的所有Consumer對哪個Consumer消費哪些Partition達成一個共識。Consumer的負載均衡被Broker和同一個Group中的其他Consumer的新增和移除觸發。對於給定的topic和group,partitions被平均的分配個consumers。這個設計是為了簡化實現。如果我們允許一個分割槽同時被多個Consumer消費,那麼在這個分割槽上會有衝突,需要一些鎖去保證。如果consumer的數量超過了分割槽數,部分consumer會拿不到任何資料。在充分配演算法中,我們分配分割槽時儘量使consumer需要和最少的Broker通訊。
每個Consumer按照如下步驟進行重分配:
1. 對於Ci(Ci表示Consumer Instance)訂閱的Topic T
2. PT表示Topic T的所有分割槽T
3. CG表示group內所有的Consumer
4. 對PT進行排序 (那麼相同Broker上的分割槽會被集中到一起)
5. 對GC排序
6. i表示Ci在CG中的位置,N = size(PT)/size(CG)
7. 分配 i*N to (i+1)*N - 1 給Consumer Ci
8. 從分割槽所有者登錄檔中刪除Ci擁有的這些條目
9. 將分割槽新增到所有者的分割槽登錄檔中
(我們可能需要重試直到分割槽原來的擁有者釋放分割槽所有權)
當分割槽重分配在一個Consumer觸發時,重分配應該在同一時間在相同group內的其他Consumer上也觸發。