1. 程式人生 > 實用技巧 >Kafka學習之核心原理剖析

Kafka學習之核心原理剖析

一、Producer原理分析

  1、Producer執行流程

  整個生產者客戶端由兩個執行緒協調執行,這兩個執行緒分別為主執行緒和 Sender 執行緒 (傳送執行緒)。在主執行緒中由 KafkaProducer 建立訊息,然後通過可能的攔截器、序列化器和分割槽器的作用之後快取到訊息累加器( RecordAccumulator,也稱為訊息收集器〉中。 Sender 執行緒負責從RecordAccumulator 中獲取訊息並將其傳送到 Kafka 中 。

    

  • ProducerRecord: 訊息物件。
  • Interceptor: 攔截器,Kafka一共有兩種攔截器:生產者攔截器和消費者攔截器。生產者攔截器既可以用來在訊息傳送前做一些準備工作,比如按照某個規則過濾不符合要求的訊息、修改訊息的內容等,也可以用來在傳送回撥邏輯前做一些定製化的需求,比如統計類工作。
  • Serializer: 序列化器,把物件轉換成位元組陣列才能通過網路傳送給Kafka。
  • Partitioner: 分割槽器,訊息經過序列化之後就需要確定它發往的分割槽,如果訊息ProducerRecord中指定了partition欄位,那麼就不需要分割槽器的作用,因為partition代表的就是所要發往的分割槽號。如果訊息ProducerRecord中沒有指定partition欄位,那麼就需要依賴分割槽器,根據key這個欄位來計算partition的值。分割槽器的作用就是為訊息分配分割槽。預設分割槽器:DefaultPartitioner,在partition()方法中定義了主要的分割槽分配邏輯。如果key不為null,那麼預設的分割槽器會對key進行雜湊(採用MurmurHash2演算法,具備高運算效能及低碰撞率),最終根據得到的雜湊值取模(所有分割槽)來計算分割槽號,擁有相同key的訊息會被寫入同一個分割槽。如果key為null,那麼訊息將會以輪詢的方式發往主題內的各個可用分割槽。
  • RecordAccumulator: 主要用來快取訊息以便Sender執行緒可以批量傳送,進而減少網路傳輸的資源消耗以提升效能。RecordAccumulator快取的大小可以通過生產者客戶端引數buffer.memory配置,預設值為33554432B,即32M。如果生產者傳送訊息的速度超過傳送到伺服器的速度,則會導致生產者空間不足,這個時候KafkaProducer的send()方法呼叫要麼被阻塞,要麼丟擲異常,這個取決於引數max.block.ms的配置,此引數的預設值為60000,即60秒。主執行緒中傳送過來的訊息都會被迫加到RecordAccumulator的某個雙端佇列(Deque)中,在RecordAccumulator的內部為每個分割槽都維護了一個雙端佇列,佇列中的內容就是ProducerBatch,即Deque<ProducerBatch>。訊息寫入快取時,追加到雙端佇列的尾部:Sender讀取訊息時,從雙端佇列的頭部讀取。
  • InFlightRequests: 請求在從Sender執行緒發往Kafka之前還會儲存到InFlightRequests中,InFlightRequests儲存物件的具體形式為Map<Nodeld,Deque>,它的主要作用是快取了已經發出去但還沒有收到響應的請求(Nodeld表示broker節點的id編號)。InFlightRequests還提供了許多管理類的方法,並且通過配置引數還可以限制每個連線(也就是客戶端與Node之間的連線)最多快取的請求數。這個配置引數為max.in.flight.requests.per.connection,預設值為5,即每個連線最多隻能快取5個未響應的請求,超過該數值之後就不能再向這個連線傳送更多的請求了,除非有快取的請求收到了響應。InFlightRequests還可以獲得leastLoadedNode,即所有Node中負載最小的那一個。選擇leastLoadedNode傳送請求可以使它能夠儘快發出,避免因網路擁塞等異常而影響整體的進度。比如元資料請求,當需要更新元資料時,會先挑選出leastLoadedNode,然後向這個Node傳送MetadataRequest請求來獲取具體的元資料資訊。

          

  2、ACK機制

  為保證生產者傳送的資料,能可靠的傳送到指定的topic,topic的每個partition收到生產者傳送的資料後,都需要向生產者傳送ack(確認收到),如果生產者收到ack,就會進行下一輪的傳送,否則重新發送資料。ACK引數可設定的值為01all

  • 0:代表producer往叢集傳送資料不需要等到叢集的返回,不確保訊息傳送成功。安全性最低但是效率最高。
  • 1:代表producer往叢集傳送資料只要leader應答就可以傳送下一條,只確保leader傳送成功。
  • all:代表producer往叢集傳送資料需要所有的follower都完成從leader的同步才會傳送下一條,確保leader傳送成功和所有的副本都完成備份。安全性最高,但是效率最低。

  當ACK=all時,Leader和follower(ISR)落盤才會返回ack,會有資料重複現象,如果在leader已經寫完成,且follower同步完成,但是在返回ack的出現故障,則會出現資料重複現象;極限情況下,這個也會有資料丟失的情況,比如follower和leader通訊都很慢,所以ISR中只有一個leader節點,這個時候,leader完成落盤,就會返回ack,如果此時leader故障後,就會導致丟失資料。

二、Consumer原理分析

  消費者( Consumer )負責訂閱 Kafka 中的主題( Topic ),並且從訂閱的主題上拉取訊息。 在 Kafka 的消費理念中還有一層消費組( Consumer Group)的概念,每個消費者都有一個對應的消費組。當訊息釋出到主題後,只會被投遞給訂閱它的每個消費組中的一個消費者 。 每個消費者只能消費所分配到的分割槽中的訊息。換言之,每一個分割槽只能被一個消費組中的一個消費者所消費 。

    

  1、分割槽分配策略

  Kafka提供了消費者客戶端引數partition.assignment.strategy來設定消費者與訂閱主題之間的分割槽分配策略。預設情況下,採用 Range Assignor 分配策略。 Kafka 還提供了另外兩種分配策略: RoundRobinAssignor 和 StickyAssignor 。

  • RangeAssignor: RangeAssignor 分配策略的原理是按照消費者總數和分割槽總數進行整除運算來獲得一個跨度,然後將分割槽按照跨度進行平均分配,以保證分割槽儘可能均勻地分配給所有的消費者 。
  • RoundRobinAssignor: RoundRobinAssignor 分配策略的原理是將消費組內所有消費者及消費者訂閱的所有主題的分割槽按照字典序排序,然後通過輪詢方式逐個將分割槽依次分配給每個消費者。
  • StickyAssignor: Kafka從 0.11.x 版本開始引入這種分配策略,它主要有兩個目的 : 分割槽的分配要儘可能均勻;分割槽的分配儘可能與上次分配的保持相同。

  2、訊息消費

  Kafka中的消費是基於拉模式的,拉模式是消費者主動向服務端發起請求來拉取訊息。 Kafka 中的訊息消費是一個不斷輪詢的過程,消費者所要做的就是重複地呼叫 poll()方法 ,而 poll()方法返回的是所訂閱的主題(分割槽)上的一組訊息。

/**
* @see KafkaConsumer#poll(long)
*/
public ConsumerRecords<K, V> poll(long timeout);

  timeout 的設定取決於應用程式對響應速度的要求,比如需要在多長時間內將控制權移交給執行輪詢的應用執行緒。可以直接將 timeout 設定為 0 , 這樣 poll()方法會立刻返回,而不管是否己經拉取到了訊息。如果應用執行緒唯一的工作就是從 Kafka 中拉取並消費訊息,則可以將這個引數設定為最大值 Long.MAX_VALUE 。

  3、位移提交

  對於 Kafka 中的分割槽而言,它的每條訊息都有唯一的 offset,用來表示訊息在分割槽中對應的位置 。 對於消費者而言 ,它也有一個 offset 的概念,消費者使用 offset 來表示消費到分割槽中某個訊息所在的位 置。

  在每次呼叫 poll()方法時,它返回的是還沒有被消費過的訊息集, 要做到這一點,就需要記錄上一次消費時的消費位移 。 消費位移儲存在 Kafka 內 部的主題 consumer offsets 中 。 這裡把將消費位移儲存起來(持久化)的動作稱為提交 ,消費者在消費完訊息之後需要執行消費位移的提交。

    

  在Kafka中預設的消費位移的提交方式是自動提交,這個由消費者客戶端引數enable.auto.commit配置,預設值為true。當然這個預設的自動提交不是每消費一條訊息就提交一次,而是定期提交,這個定期的週期時間由戶端引數auto.commit.interval.ms配置,預設值為5秒,此引數生效的前提是enable.auto.commit引數為true。

  每個consumer會定期將自己消費分割槽的offset提交給kafka內部topic: consumer_offsets,提交過去的時候,key是consumerGroupId+topic+分割槽號,value就是當前offset的值,kafka會定期清理topic裡的訊息,最後就保留最新的那條資料。因為consumer_offsets可能會接收高併發的請求,kafka預設給其分配50個分割槽(可以通過offsets.topic.num.partitions設定),這樣可以通過加機器的方式擴大併發。

  4、Rebalance機制

  rebalance是指分割槽的所屬權從一個消費者轉移到另一消費者的行為,它為消費組具備高可用性和伸縮性提供保障。rebalance發生期間,消費組內的消費者是無法讀取訊息的。當一個分割槽被重新分配給另一個消費者時,消費者當前的狀態也會丟失。

  如下情況可能會觸發消費者rebalance

  • consumer所在伺服器重啟或宕機了;
  • 動態給topic增加了分割槽;
  • 消費組訂閱了更多的topic。
public interface ConsumerRebalanceListener{
  //rebalance開始之前和消費者停止讀取訊息之後被呼叫
  // 可以通過這個回撥方法
處理消費位移的提交,以此來避免一些不必要的重複消費現象的發生。   // 引數 partitions表示rebalance前所分配到的分割槽。   void onPartitionsRevoked(Collection<TopicPartition> partitions);   //重新分配分割槽之後和消費者開始讀取消費之前被呼叫,引數partitions表示再均衡後所分配到的分割槽。   void onPartitionsAssigned(Collection<TopicPartition> partitions); }

  5、消費者協調器和組協調器

  每個消費組的子集在服務端對應一個GroupCoordinator對其進行管理,GroupCoordinator是Kafka服務端中用於管理消費組的元件。而消費者客戶端中的ConsumerCoordinator元件負責與GroupCoordinator進行互動。ConsumerCoordinator與GroupCoordinator之間最重要的職責就是負責執行消費者再均衡的操作,包括前面提及的分割槽分配的工作也是在rebalance期間完成的。一共有如下幾種情形會觸發再均衡的操作:

  • 有新的消費者加入消費組。
  • 有消費者宕機下線。消費者並不一定需要真正下線,例如遇到長時間的GC、網路延遲導致消費者長時間未向GroupCoordinator傳送心跳等情況時,GroupCoordinator會認為消費者己經下線。
  • 有消費者主動退出消費組(傳送LeaveGroupRequest請求)。比如客戶端呼叫了unsubscrible()方法取消對某些主題的訂閱。
  • 消費組所對應的GroupCoorinator節點發生了變更。
  • 消費組內所訂閱的任一主題或者主題的分割槽數量發生變化。

  當有消費者加入消費組時,消費者、消費組及組協調器之間會經歷一下幾個階段。整個rebalance過程如下:

  第一階段:選擇組協調器(FINDCOORDINATOR)

  組協調器GroupCoordinator: 每個consumergroup都會選擇一個broker作為自己的組協調器coordinator,負責監控這個消費組裡的所有消費者的心跳,以及判斷是否宕機,然後開啟消費者rebalance。consumergroup中的每個consumer啟動時會向kafka叢集中的某個節點(負載最小的節點:InFlightRequests有記錄)傳送FindCoordinatorRequest請求來查詢對應的組協調器GroupCoordinator,並跟其建立網路連線。

  組協調器選擇方式: 通過如下公式可以選出consumer消費的offset要提交到consumer_offsets的哪個分割槽,這個分割槽leader對應的broker就是這個consumergroup的coordinator公式: hash(consumergroupid)%consumer_offsets 主題的分割槽數

  第二階段:加入消費組(JOINGROUP)

  在成功找到消費組所對應的GroupCoordinator之後就進入加入消費組的階段,在此階段的消費者會向GroupCoordinator傳送JoinGroupRequest請求,並處理響應。然後GroupCoordinator從一個consumergroup中選擇第一個加入group的consumer作為leader(消費組協調器),把consumergroup情況傳送給這個leader,接著這個leader會負責制定分割槽方案。

  選舉分割槽分配策略,根據各個消費者呈報的分配策略選舉,過程如下:

  • 收集各個消費者支援的所有分配策略,組成候選集candidates。
  • 每個消費者從候選集candidates中找出第一個自身支援的策略,為這個策略投上一票。
  • 計算候選集中各個策略的選票數,選票數最多的策略即為當前消費組的分配策略。

  每個消費者都向GroupCoordinator傳送JoinGroupRequest請求,其中攜帶了各自提案的分配策略和訂閱資訊。JoinGroupResonse回執中包含GroupCoordinator中投票選舉出的分配策略的資訊,並且只有leader消費者的回執中包含每個消費者的訂閱資訊。

  第三階段:同步消費組(SYNCGROUP)

  consumerleader通過給GroupCoordinator傳送SyncGroupRequest,接著GroupCoordinator就把分割槽方案下發給各個consumer,他們會根據指定分割槽的leaderbroker進行網路連線以及訊息消費。

  6、多執行緒

  KafkaConsumer是非執行緒安全的,KafkaConsumer中定義了一個acquire()方法,用來檢測當前是否只有一個執行緒在操作,若有其他執行緒正在操作則會丟擲ConcurrentModifcationException異常。

三、Broker原理分析

  1、分割槽管理

  優先副本的選舉:Kafka叢集的一個broker中最多隻能有它的一個副本。分割槽使用多副本機制來提升可靠性,但只有leader副本對外提供讀寫服務,而follower副本只負責在內部進行訊息的同步。如果一個分割槽的leader副本不可用,那麼就意味著整個分割槽變得不可用,此時就需要Kafka從剩餘的follower副本中挑選一個新的leader副本來繼續對外提供服務。為了能夠有效地治理負載失衡的情況,Kafka引入了優先副本(preferredreplica)的概念。所謂的優先副本是指在AR集合列表中的第一個副本。優先副本的選舉是指通過一定的方式促使優先副本選舉為leader副本,以此來促進叢集的負載均衡,這一行為也可以稱為“分割槽平衡”。

      

bin/kafka-topics.sh --describe --zookeeper localhost:2181 m --topic my-replicated-topic2  //檢視分割槽資訊

  結果如下所示(包含了部分Broker下線以及恢復的過程):

HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic  //檢視分割槽資訊
Topic: my-replicated-topic    PartitionCount: 3    ReplicationFactor: 3    Configs: 
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-replicated-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-replicated-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic   //kill Broker1後分區資訊
Topic: my-replicated-topic    PartitionCount: 3    ReplicationFactor: 3    Configs: 
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0
    Topic: my-replicated-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,2
    Topic: my-replicated-topic    Partition: 2    Leader: 2    Replicas: 1,2,0    Isr: 2,0
HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-server-start.sh -daemon config/server2.properties   //重啟broker1
HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic   //檢視分割槽資訊
Topic: my-replicated-topic    PartitionCount: 3    ReplicationFactor: 3    Configs: 
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-replicated-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,2,1
    Topic: my-replicated-topic    Partition: 2    Leader: 2    Replicas: 1,2,0    Isr: 2,0,1

  在Kafka中可以提供分割槽自動平衡的功能,與此對應的broker端引數是auto.leader.rebalance.enable,此引數的預設值為true,即預設情況下此功能是開啟的。如果開啟分割槽自動平衡的功能,則Kafka的控制器會啟動一個定時任務,這個定時任務會輪詢所有的broker節點,計算每個broker節點的分割槽不平衡率 (broker中的不平衡率=非優先副本的leader個數/分割槽總數)是否超過leader.imbalance.per.broker.percentage引數配置的比值,預設值為10%,如果超過設定的比值則會自動執行優先副本的選舉動作以求分割槽平衡。執行週期由引數leader.imbalance.check.interval.seconds控制,預設值為300秒。Kafka中kafka-perferred-replica-election.sh指令碼提供了對分割槽leader副本進行重新平衡的功能。

  • 分割槽重分配:Kafka提供了kafka-reassign-partitions.sh指令碼來執行分割槽重分配的工作,它可以在叢集擴容、broker節點失效的場景下對分割槽進行遷移。
  • 失效副本:正常情況下,分割槽的所有副本都處於ISR集合,但是難免會有異常發生的情況,從而某些副本被剝離出ISR集合。當follower副本將leader副本LEO(Log End Offset)之前的日誌全部同步時,則認為該fillower副本已經追趕上leader副本,此時更新該副本的lastAoughtUpTimeMs標識,Kafka的副本管理器會啟動一個副本過期檢測的定時任務,而這個定時任務會定時檢查當前時間與副本的lastAoughtUpTimeMs差值是否大於引數replica.time.max.ms指定的值,預設10000。

  2、控制器Controller

  在Kafka叢集中會有一個或者多個broker,其中有一個broker會被選舉為控制器(KafkaController),它負責管理整個叢集中所有分割槽和副本的狀態。

  • 當某個分割槽的leader副本出現故障時,由控制器負責為該分割槽選舉新的leader副本。
  • 當檢測到某個分割槽的ISR集合發生變化時,由控制器負責通知所有broker更新其元資料資訊。
  • 當使用kafka-topics.sh指令碼為某個topic增加分割槽數量時,同樣還是由控制器負責分割槽的重新分配。

  Kafka中的控制器選舉的工作依賴於Zookeeper,成功競選為控制器的broker會在Zookeeper中建立/controller這個臨時(EPHEMERAL)節點,此臨時節點的內容參考如下:

# brokerid表示稱為控制器的broker的id編號,timestamp表示競選為控制器時的時間戳 
{"version":1,"brokerid":0,"timestamp":"1574831950372"}

  在任意時刻,叢集中有且僅有一個控制器。每個broker啟動的時候會去嘗試去讀取/controller節點的brokerid值,如果讀取到brokerid的值不為-1,則表示已經有其它broker節點成功競選為控制器,所以當前broker就會放棄競選:如果Zookeeper中不存在/controller這個節點,或者這個節點中的資料異常,那麼就會嘗試去建/controller這個節點,當前broker去建立節點的時候,也有可能其他broker同時去嘗試建立這個節點,只有建立成功的broker才會成為控制器,而建立失敗的broker則表示競選失敗。每個broker都會在記憶體中儲存當前控制器的brokerid值,這個值可以標識為activeControllerId。

  • Partition副本leader的選舉

  分割槽leader副本的選舉由控制器負責具體實施。當建立分割槽(建立主題或增加分割槽都有建立分割槽的動作)或分割槽上線(比如分割槽中原先的leader副本下線,此時分割槽需要選舉一個新的leader上線來對外提供服務)的時候都需要執行leader的選舉動作。這種策略的基本思路是按照AR集合中副本的順序查詢第一個存活的副本,並且這個副本在JSR集合中。一個分割槽的AR集合在分配的時候就被指定,並且只要不發生重分配的情況,集合內部副本的順序是保持不變的,而分割槽的ISR集合中副本的順序可能會改變。

##在zookeeper中檢視各broker節點資訊
[zk: localhost:2181(CONNECTED) 7] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.1.140.146:9092"]
  ,"jmx_port":-1,"host":"10.1.140.146","timestamp":"1603270074840","port":9092,"version":4} [zk: localhost:2181(CONNECTED) 8] get /brokers/ids/1  ##broker1 被kill後 org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/1 [zk: localhost:2181(CONNECTED) 9] get /brokers/ids/2 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.1.140.146:9094"]
  ,"jmx_port":-1,"host":"10.1.140.146","timestamp":"1603270074178","port":9094,"version":4}
## 在zookeeper中檢視分割槽資訊
[zk: localhost:2181(CONNECTED) 15] get /brokers/topics/my-replicated-topic/partitions/2/state
{"controller_epoch":6,"leader":2,"version":1,"leader_epoch":1,"isr":[2,0,1]}
[zk: localhost:2181(CONNECTED) 16] get /brokers/topics/my-replicated-topic/partitions/1/state
{"controller_epoch":6,"leader":0,"version":1,"leader_epoch":1,"isr":[0,2,1]}
[zk: localhost:2181(CONNECTED) 17] get /brokers/topics/my-replicated-topic/partitions/0/state
{"controller_epoch":6,"leader":2,"version":1,"leader_epoch":1,"isr":[2,0,1]}

  3、LEO與HW高水位

  整個訊息追加的過程可以概括如下:

  • 生產者客戶端傳送訊息至leader副本中。
  • 訊息被迫加到leader副本的本地日誌,並且會更新日誌的偏移量。
  • follower副本向leader副本請求同步資料。
  • leader副本所在的伺服器讀取本地日誌,並更新對應拉取的follower副本的資訊。
  • leader副本所在的伺服器將拉取結果返回給follower副本。
  • follower副本收到leader副本返回的拉取結果,將訊息追加到本地日誌中,並更新日誌的偏移量資訊。

  在Kafka中,高水位(HighWatermark)的作用主要有2個:

  • 定義訊息可見性,即用來標識分割槽下的哪些訊息是可以被消費者消費的。
  • 幫助Kafka完成副本同步。

      

  在分割槽高水位以下的訊息被認為是已提交訊息,反之就是未提交訊息。消費者只能消費已提交訊息,即圖中位移小於15的所有訊息(不考慮事務)。位移值等於高水位的訊息也屬於未提交訊息。

  日誌末端位移(LEO:Log End Offset)表示副本寫入下一條訊息的位移值。同一個副本物件,其高水位值不會大於LEO值。高水位和LEO是副本物件的兩個重要屬性。Kafka所有副本都有對應的高水位和LEO值,而不僅僅是Leader本。只不過Leader副本比較特殊,Kafka使用Leader副本的高水位來定義所在分割槽的高水位。換句話說,分割槽的高水位就是其Leader副本的高水位。

  高水位更新機制

  在Leader副本所在的Broker上,還儲存了其他Follower副本的LEO值。

      

  Broker0上儲存了某分割槽的Leader副本和所有Follower副本的LEO值,而Broker1上僅僅儲存了該分割槽的某個Follower副本。Kafka把Broker0上儲存的這些Follower副本又稱為遠端副本(RemoteReplica)。Kafka副本機制在執行過程中,會更新Broker1上Follower副本的高水位和LEO值,同時也會更新Broker0上Leader副本的高水位和LEO以及所有遠端副本的LEO,但它不會更新遠端副本的高水位值,也就是圖中標記為灰色的部分。

  Broker0上儲存這些遠端副本主要作用是: 幫助Leader副本確定其高水位,也就是分割槽高水位。

  與Leader副本保持同步的判斷的條件有兩個:

  • 該遠端Follower副本在ISR中。
  • 該遠端Follower副本LEO值落後於Leader副本LEO值的時間,不超過Broker端引數replica.lag.time.max.ms的值。如果使用預設值的話,就是不超過10秒。

  取一個partition對應的ISR中最小的LEO(log-end-offset)作為HW,consumer最多隻能消費到HW所在的位置。對於leader新寫入的訊息,consumer不能立刻消費,leader會等待該訊息被所有ISR中的replicas同步後更HW,此時訊息才能被consumer消費。這樣就保證瞭如果leader所在的broker失效,該訊息仍然可以從新選舉的leader中獲取。對於來自內部broker的讀取請求,沒有HW的限制。

  副本同步機制

  當producer生產訊息至broker後,ISR以及HW和LEO的流轉過程:

      

  Kafka的複製機制既不是完全的同步複製,也不是單純的非同步複製。事實上,同步複製要求所有能工作的follower都複製完,這條訊息才會被commit,這種複製方式極大的影響了吞吐率。而非同步複製方式下,follower非同步的從leader複製資料,資料只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有複製完,落後於leader時,突然leader宕機,則會丟失資料。而Kafka的這種使用ISR的方式則很好的均衡了確保資料不丟失以及吞吐率。

  結合HW和LEO看下acks=1的情況:

      

四、日誌儲存

  Kafka一個分割槽的訊息資料對應儲存在一個資料夾下,以topic名稱+分割槽號命名,kafka規定了一個分割槽內的.log檔案最大為1G。

      

# 部分訊息的offset索引檔案,kafka每次往分割槽發4K(可配置)訊息就會記錄一條當前訊息的offset到 index檔案,
# 如果要定位訊息的offset會先在這個檔案裡快速定位,再去log檔案裡找具體訊息 
00000000000000000000.index
# 訊息儲存檔案,主要存offset和訊息體
# LogSegment 的基準偏移量為 0,對應的日誌檔案為 00000000000000000000.log 
00000000000000000000.log # 訊息的傳送時間索引檔案,kafka每次往分割槽發4K(可配置)訊息就會記錄一條當前訊息的傳送時間戳與對 應的offset到timeindex檔案, # 如果需要按照時間來定位訊息的offset,會先在這個檔案裡查詢 00000000000000000000.timeindex 000000000000009900000.index 000000000000009900000.log 000000000000009900000.timeindex

  KafkaBroker有一個引數,log.segment.bytes,限定了每個日誌段檔案的大小,最大就是1GB。一個日誌段檔案滿了,就自動開一個新的日誌段檔案來寫入,避免單個檔案過大,影響檔案的讀寫效能,這個過程叫做logrolling,正在被寫入的那個日誌段檔案,叫做activelogsegment。

  1、日誌索引

  每個日誌分段檔案對應了兩個索引檔案,主要用來提高查詢訊息的效率。偏移量索引檔案用來建立訊息偏移量offset到實體地址之間的對映關係,方便快速定位訊息所在的物理檔案位置;時間戳索引檔案則根據指定的時間戳timestamp來查詢對應的偏移量資訊。Kafka中的索引檔案以稀疏索引(sparseindex)的方式構造訊息的索引,它並不保證每個訊息在索引檔案中都有對應的索引頁。每當寫入一定量(由broker端引數log.index.interval.bytes指定,預設值為4096,即4KB的訊息時,偏移量索引檔案和時間戳索引檔案分別增加一個偏移量索引項和時間戳索引項,增大或減小log.index.interval.bytes的值,對應地可以增加或縮小索引項的密度。

  稀疏索引通過MappedByteBuffer將索引檔案對映到記憶體中,以加快索引的查詢速度。偏移量索引檔案中的偏移量是單調遞增的,查詢指定偏移量時,使用二分查詢法來快速定位偏移量的位置,如果指定的偏移量不在索引檔案中,則會返回小於指定偏移量的最大偏移量。時間戳索引檔案中的時間戳也保持嚴格的單調遞增,查詢指定時間戳時,也根據二分查詢法來查詢不大於該時間戳的最大偏移量.

  • 偏移量索引:每個索引項佔用8個位元組,分為兩個部分。
    • relativeOffset: 相對偏移量,表示訊息相對於baseOffset的偏移量,佔用4個位元組,當前索引檔案的檔名即為baseOffset的值。
    • position: 實體地址,也就是訊息在日誌分段檔案中對應的物理位置,佔用4個位元組。
  • 時間戳索引:每個索引項佔用12個位元組,分為兩個部分。
    • timestamp:當前日誌分段最大的時間戳,佔用8個位元組。
    • relativeOffset:時間戳所對應的訊息的相對偏移量,佔用4個位元組。

  如果broker端引數log.message.timestamp.type設定為LogAppendTime,那麼訊息的時間戳必定能夠保持單調遞增;相反,如果是CreateTime型別則無法保證。

  2、零拷貝

  Kafka使用零拷貝技術來提升效能。所謂的零拷貝是指將資料直接從磁碟檔案複製到網絡卡裝置中,而不需要經由應用程式之手。零拷貝大大提高了應用程式的效能,減少了核心和使用者模式之間的上下文切換。對Linux作業系統而言,零拷貝技術依賴於底層的sendfile()方法實現。對應於Java語言,FileChannal.transferTo()方法的底層實現就是sendfile()方法。