淺談分散式訊息技術:Kafka
Kafka的基本介紹
Kafka是最初由Linkedin公司開發,是一個分散式、分割槽的、多副本的、多訂閱者,基於zookeeper協調的分散式日誌系統(也可以當做MQ系統),常見可以用於web/nginx日誌、訪問日誌,訊息服務等等,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源專案。
主要應用場景是:日誌收集系統和訊息系統。
Kafka主要設計目標如下:
- 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問效能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條訊息的傳輸。
- 支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個partition內的訊息順序傳輸。
- 同時支援離線資料處理和實時資料處理。
Kafka的設計原理分析
一個典型的kafka叢集中包含若干producer,若干broker,若干consumer,以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將訊息釋出到broker,consumer使用pull模式從broker訂閱並消費訊息。
Kafka專用術語:
- Broker:訊息中介軟體處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集。
- Topic:一類訊息,Kafka叢集能夠同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列。
- Segment:partition物理上由多個segment組成。
- offset:每個partition都由一系列有序的、不可變的訊息組成,這些訊息被連續的追加到partition中。partition中的每個訊息都有一個連續的序列號叫做offset,用於partition唯一標識一條訊息。
- Producer:負責釋出訊息到Kafka broker。
- Consumer:訊息消費者,向Kafka broker讀取訊息的客戶端。
- Consumer Group:每個Consumer屬於一個特定的Consumer Group。
Kafka資料傳輸的事務特點
at most once:最多一次,這個和JMS中”非持久化”訊息類似,傳送一次,無論成敗,將不會重發。消費者fetch訊息,然後儲存offset,然後處理訊息;當client儲存offset之後,但是在訊息處理過程中出現了異常,導致部分訊息未能繼續處理。那麼此後”未處理”的訊息將不能被fetch到,這就是”at most once”。
at least once:訊息至少傳送一次,如果訊息未能接受成功,可能會重發,直到接收成功。消費者fetch訊息,然後處理訊息,然後儲存offset。如果訊息處理成功之後,但是在儲存offset階段zookeeper異常導致儲存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的訊息,這就是”at least once”,原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。
exactly once:訊息只會傳送一次。kafka中並沒有嚴格的去實現(基於2階段提交),我們認為這種策略在kafka中是沒有必要的。
通常情況下”at-least-once”是我們首選。
Kafka訊息儲存格式
Topic & Partition
一個topic可以認為一個一類訊息,每個topic將被分成多個partition,每個partition在儲存層面是append log檔案。
在Kafka檔案儲存中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。
- 每個partion(目錄)相當於一個巨型檔案被平均分配到多個大小相等segment(段)資料檔案中。但每個段segment file訊息數量不一定相等,這種特性方便old segment file快速被刪除。
- 每個partiton只需要支援順序讀寫就行了,segment檔案生命週期由服務端配置引數決定。
這樣做的好處就是能快速刪除無用檔案,有效提高磁碟利用率。
- segment file組成:由2大部分組成,分別為index file和data file,此2個檔案一一對應,成對出現,字尾”.index”和“.log”分別表示為segment索引檔案、資料檔案。
- segment檔案命名規則:partion全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值。數值最大為64位long大小,19位數字字元長度,沒有數字用0填充。
segment中index與data file對應關係物理結構如下:
上圖中索引檔案儲存大量元資料,資料檔案儲存大量訊息,索引檔案中元資料指向對應資料檔案中message的物理偏移地址。
其中以索引檔案中元資料3,497為例,依次在資料檔案中表示第3個message(在全域性partiton表示第368772個message),以及該訊息的物理偏移地址為497。
瞭解到segment data file由許多message組成,下面詳細說明message物理結構如下:
引數說明:
副本(replication)策略
Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。
1) 資料同步
kafka在0.8版本前沒有提供Partition的Replication機制,一旦Broker宕機,其上的所有Partition就都無法提供服務,而Partition又沒有備份資料,資料的可用性就大大降低了。所以0.8後提供了Replication機制來保證Broker的failover。
引入Replication之後,同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader互動,其它Replica作為Follower從Leader中複製資料。
2) 副本放置策略
為了更好的做負載均衡,Kafka儘量將所有的Partition均勻分配到整個叢集上。Kafka分配Replica的演算法如下:
- 將所有存活的N個Brokers和待分配的Partition排序
- 將第i個Partition分配到第(i mod n)個Broker上,這個Partition的第一個Replica存在於這個分配的Broker上,並且會作為partition的優先副本
- 將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上
假設叢集一共有4個brokers,一個topic有4個partition,每個Partition有3個副本。下圖是每個Broker上的副本分配情況。
3) 同步策略
Producer在釋出訊息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少,Producer只將該訊息傳送到該Partition的Leader。Leader會將該訊息寫入其本地Log。每個Follower都從Leader pull資料。這種方式上,Follower儲存的資料順序與Leader保持一致。Follower在收到該訊息並寫入其Log後,向Leader傳送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該訊息就被認為已經commit了,Leader將增加HW並且向Producer傳送ACK。
為了提高效能,每個Follower在接收到資料後就立馬向Leader傳送ACK,而非等到資料寫入Log中。因此,對於已經commit的訊息,Kafka只能保證它被存於多個Replica的記憶體中,而不能保證它們被持久化到磁碟中,也就不能完全保證異常發生後該條訊息一定能被Consumer消費。
Consumer讀訊息也是從Leader讀取,只有被commit過的訊息才會暴露給Consumer。
Kafka Replication的資料流如下圖所示:
對於Kafka而言,定義一個Broker是否“活著”包含兩個條件:
- 一是它必須維護與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機制來實現)。
- 二是Follower必須能夠及時將Leader的訊息複製過來,不能“落後太多”。
Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的訊息落後於Leader後的條數超過預定值或者Follower超過一定時間未向Leader傳送fetch請求。
Kafka只解決fail/recover,一條訊息只有被ISR裡的所有Follower都從Leader複製過去才會被認為已提交。這樣就避免了部分資料被寫進了Leader,還沒來得及被任何Follower複製就宕機了,而造成資料丟失(Consumer無法消費這些資料)。而對於Producer而言,它可以選擇是否等待訊息commit。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的訊息就不會丟失。
4) leader選舉
Leader選舉本質上是一個分散式鎖,有兩種方式實現基於ZooKeeper的分散式鎖:
- 節點名稱唯一性:多個客戶端建立一個節點,只有成功建立節點的客戶端才能獲得鎖
- 臨時順序節點:所有客戶端在某個目錄下建立自己的臨時順序節點,只有序號最小的才獲得鎖
Majority Vote的選舉策略和ZooKeeper中的Zab選舉是類似的,實際上ZooKeeper內部本身就實現了少數服從多數的選舉策略。kafka中對於Partition的leader副本的選舉採用了第一種方法:為Partition分配副本,指定一個ZNode臨時節點,第一個成功建立節點的副本就是Leader節點,其他副本會在這個ZNode節點上註冊Watcher監聽器,一旦Leader宕機,對應的臨時節點就會被自動刪除,這時註冊在該節點上的所有Follower都會收到監聽器事件,它們都會嘗試建立該節點,只有建立成功的那個follower才會成為Leader(ZooKeeper保證對於一個節點只有一個客戶端能建立成功),其他follower繼續重新註冊監聽事件。
Kafka訊息分組,訊息消費原理
同一Topic的一條訊息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一訊息。
這是Kafka用來實現一個Topic訊息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group裡。用Consumer Group還可以將Consumer進行自由的分組而不需要多次傳送訊息到不同的Topic。
Push vs. Pull
作為一個訊息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push訊息並由Consumer從broker pull訊息。
push模式很難適應消費速率不同的消費者,因為訊息傳送速率是由broker決定的。push模式的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成Consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull模式則可以根據Consumer的消費能力以適當的速率消費訊息。
對於Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費訊息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
Kafak順序寫入與資料讀取
生產者(producer)是負責向Kafka提交資料的,Kafka會把收到的訊息都寫入到硬碟中,它絕對不會丟失資料。為了優化寫入速度Kafak採用了兩個技術,順序寫入和MMFile。
順序寫入
因為硬碟是機械結構,每次讀寫都會定址,寫入,其中定址是一個“機械動作”,它是最耗時的。所以硬碟最“討厭”隨機I/O,最喜歡順序I/O。為了提高讀寫硬碟的速度,Kafka就是使用順序I/O。
每條訊息都被append到該Partition中,屬於順序寫磁碟,因此效率非常高。
對於傳統的message queue而言,一般會刪除已經被消費的訊息,而Kafka是不會刪除資料的,它會把所有的資料都保留下來,每個消費者(Consumer)對每個Topic都有一個offset用來表示讀取到了第幾條資料。
即便是順序寫入硬碟,硬碟的訪問速度還是不可能追上記憶體。所以Kafka的資料並不是實時的寫入硬碟,它充分利用了現代作業系統分頁儲存來利用記憶體提高I/O效率。
在Linux Kernal 2.2之後出現了一種叫做“零拷貝(zero-copy)”系統呼叫機制,就是跳過“使用者緩衝區”的拷貝,建立一個磁碟空間和記憶體空間的直接對映,資料不再複製到“使用者態緩衝區”系統上下文切換減少2次,可以提升一倍效能。
通過mmap,程序像讀寫硬碟一樣讀寫記憶體(當然是虛擬機器記憶體)。使用這種方式可以獲取很大的I/O提升,省去了使用者空間到核心空間複製的開銷(呼叫檔案的read會把資料先放到核心空間的記憶體中,然後再複製到使用者空間的記憶體中。)
消費者(讀取資料)
試想一下,一個Web Server傳送一個靜態檔案,如何優化?答案是zero copy。傳統模式下我們從硬碟讀取一個檔案是這樣的。
先複製到核心空間(read是系統呼叫,放到了DMA,所以用核心空間),然後複製到使用者空間(1、2);從使用者空間重新複製到核心空間(你用的socket是系統呼叫,所以它也有自己的核心空間),最後傳送給網絡卡(3、4)。
Zero Copy中直接從核心空間(DMA的)到核心空間(Socket的),然後傳送網絡卡。這個技術非常普遍,Nginx也是用的這種技術。
實際上,Kafka把所有的訊息都存放在一個一個的檔案中,當消費者需要資料的時候Kafka直接把“檔案”傳送給消費者。當不需要把整個檔案發出去的時候,Kafka通過呼叫Zero Copy的sendfile這個函式,這個函式包括:
- out_fd作為輸出(一般及時socket的控制代碼)
- in_fd作為輸入檔案控制代碼
- off_t表示in_fd的偏移(從哪裡開始讀取)
- size_t表示讀取多少個
Reference
本文轉載自 linkedkeeper.com (文/張鬆然)