1. 程式人生 > >Kafka負載均衡、Kafka自定義Partition、Kafk檔案儲存機制

Kafka負載均衡、Kafka自定義Partition、Kafk檔案儲存機制

1、Kafka整體結構圖
Kafka名詞解釋和工作方式

 Producer :訊息生產者,就是向kafka broker發訊息的客戶端。
 Consumer :訊息消費者,向kafka broker取訊息的客戶端
 Topic :咋們可以理解為一個佇列。
 Consumer Group (CG):這是kafka用來實現一個topic訊息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的訊息會複製(不是真的複製,是概念上的)到所有的CG,但每個partion只會把訊息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次傳送訊息到不同的topic。
 Broker :一臺kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。
 Partition:為了實現擴充套件性,一個非常大的topic可以分佈到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將訊息發給consumer,不保證一個topic的整體(多個partition間)的順序。
 Offset:kafka的儲存檔案都是按照offset.kafka來命名,用offset做名字的好處是方便查詢。例如你想找位於2049的位置,只要找到2048.kafka的檔案即可。當然the first offset就是00000000000.kafka
2、Consumer與topic關係
本質上kafka只支援Topic;
 每個group中可以有多個consumer,每個consumer屬於一個consumer group;
通常情況下,一個group中會包含多個consumer,這樣不僅可以提高topic中訊息的併發消費能力,而且還能提高”故障容錯”性,如果group中的某個consumer失效那麼其消費的partitions將會有其他consumer自動接管。
 對於Topic中的一條特定的訊息,只會被訂閱此Topic的每個group中的其中一個consumer消費,此訊息不會發送給一個group的多個consumer;
那麼一個group中所有的consumer將會交錯的消費整個Topic,每個group中consumer訊息消費互相獨立,我們可以認為一個group是一個”訂閱”者。
 在kafka中,一個partition中的訊息只會被group中的一個consumer消費(同一時刻);
一個Topic中的每個partions,只會被一個”訂閱者”中的一個consumer消費,不過一個consumer可以同時消費多個partitions中的訊息。
 kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到訊息。
kafka只能保證一個partition中的訊息被某個consumer消費時是順序的;事實上,從Topic角度來說,當有多個partitions時,訊息仍不是全域性有序的。
3、Kafka訊息的分發
Producer客戶端負責訊息的分發
 kafka叢集中的任何一個broker都可以向producer提供metadata資訊,這些metadata中包含”叢集中存活的servers列表”/”partitions leader列表”等資訊;
 當producer獲取到metadata資訊之後, producer將會和Topic下所有partition leader保持socket連線;
 訊息由producer直接通過socket傳送到broker,中間不會經過任何”路由層”,事實上,訊息被路由到哪個partition上由producer客戶端決定;
比如可以採用”random”“key-hash”“輪詢”等,如果一個topic中有多個partitions,那麼在producer端實現”訊息均衡分發”是必要的。
 在producer端的配置檔案中,開發者可以指定partition路由的方式。

Producer訊息傳送的應答機制
設定傳送資料是否需要服務端的反饋,有三個值0,1,-1
0: producer不會等待broker傳送ack
1: 當leader接收到訊息之後傳送ack
-1: 當所有的follower都同步訊息成功後傳送ack
request.required.acks=0
4、Consumer的負載均衡
當一個group中,有consumer加入或者離開時,會觸發partitions均衡.均衡的最終目的,是提升topic的併發消費能力,步驟如下:
1、 假如topic1,具有如下partitions: P0,P1,P2,P3
2、 加入group中,有如下consumer: C1,C2
3、 首先根據partition索引號對partitions排序: P0,P1,P2,P3
4、 根據consumer.id排序: C0,C1
5、 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6、 然後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

5、kafka檔案儲存機制
5.1、Kafka檔案儲存基本結構
 在Kafka檔案儲存中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。
 每個partion(目錄)相當於一個巨型檔案被平均分配到多個大小相等segment(段)資料檔案中。但每個段segment file訊息數量不一定相等,這種特性方便old segment file快速被刪除。預設保留7天的資料。

 每個partiton只需要支援順序讀寫就行了,segment檔案生命週期由服務端配置引數決定。(什麼時候建立,什麼時候刪除)

資料有序的討論?
一個partition的資料是否是有序的? 間隔性有序,不連續
針對一個topic裡面的資料,只能做到partition內部有序,不能做到全域性有序。
特別加入消費者的場景後,如何保證消費者消費的資料全域性有序的?偽命題。

只有一種情況下才能保證全域性有序?就是隻有一個partition。
5.2、Kafka Partition Segment
 Segment file組成:由2大部分組成,分別為index file和data file,此2個檔案一一對應,成對出現,字尾”.index”和“.log”分別表示為segment索引檔案、資料檔案。

 Segment檔案命名規則:partion全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值。數值最大為64位long大小,19位數字字元長度,沒有數字用0填充。
 索引檔案儲存大量元資料,資料檔案儲存大量訊息,索引檔案中元資料指向對應資料檔案中message的物理偏移地址。

3,497:當前log檔案中的第幾條資訊,存放在磁碟上的那個地方

上述圖中索引檔案儲存大量元資料,資料檔案儲存大量訊息,索引檔案中元資料指向對應資料檔案中message的物理偏移地址。
其中以索引檔案中元資料3,497為例,依次在資料檔案中表示第3個message(在全域性partiton表示第368772個message)、以及該訊息的物理偏移地址為497。

 segment data file由許多message組成, qq物理結構如下:
關鍵字 解釋說明
8 byte offset 在parition(分割槽)內的每條訊息都有一個有序的id號,這個id號被稱為偏移(offset),它可以唯一確定每條訊息在parition(分割槽)內的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校驗message
1 byte “magic” 表示本次釋出Kafka服務程式協議版本號
1 byte “attributes” 表示為獨立版本、或標識壓縮型別、或編碼型別。
4 byte key length 表示key的長度,當key為-1時,K byte key欄位不填
K byte key 可選
value bytes payload 表示實際訊息資料。

5.3、Kafka 查詢message
讀取offset=368776的message,需要通過下面2個步驟查詢。

5.3.1、查詢segment file
00000000000000000000.index表示最開始的檔案,起始偏移量(offset)為0
00000000000000368769.index的訊息量起始偏移量為368770 = 368769 + 1
00000000000000737337.index的起始偏移量為737338=737337 + 1
其他後續檔案依次類推。
以起始偏移量命名並排序這些檔案,只要根據offset 二分查詢檔案列表,就可以快速定位到具體檔案。當offset=368776時定位到00000000000000368769.index和對應log檔案。
5.3.2、通過segment file查詢message
當offset=368776時,依次定位到00000000000000368769.index的元資料物理位置和00000000000000368769.log的物理偏移地址
然後再通過00000000000000368769.log順序查詢直到offset=368776為止。
6、Kafka自定義Partition
見程式碼

關於訊息佇列的使用—-ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
一、訊息佇列概述
訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用解耦,非同步訊息,流量削鋒等問題,實現高效能,高可用,可伸縮和最終一致性架構。目前使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
二、訊息佇列應用場景
以下介紹訊息佇列在實際應用中常用的使用場景。非同步處理,應用解耦,流量削鋒和訊息通訊四個場景。
2.1非同步處理
場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩種 1.序列的方式;2.並行方式
a、序列方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件,再發送註冊簡訊。以上三個任務全部完成後,返回給客戶端。

b、並行方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件的同時,傳送註冊簡訊。以上三個任務完成後,返回給客戶端。與序列的差別是,並行的方式可以提高處理的時間

假設三個業務節點每個使用50毫秒鐘,不考慮網路等其他開銷,則序列方式的時間是150毫秒,並行的時間可能是100毫秒。
因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則序列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)
小結:如以上案例描述,傳統的方式系統的效能(併發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?
引入訊息佇列,將不是必須的業務邏輯,非同步處理。改造後的架構如下:

按照以上約定,使用者的響應時間相當於是註冊資訊寫入資料庫的時間,也就是50毫秒。註冊郵件,傳送簡訊寫入訊息佇列後,直接返回,因此寫入訊息佇列的速度很快,基本可以忽略,因此使用者的響應時間可能是50毫秒。因此架構改變後,系統的吞吐量提高到每秒20 QPS。比序列提高了3倍,比並行提高了兩倍。
2.2應用解耦
場景說明:使用者下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統呼叫庫存系統的介面。如下圖:

傳統模式的缺點:假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗,訂單系統與庫存系統耦合
如何解決以上問題呢?引入應用訊息佇列後的方案,如下圖:

訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功
庫存系統:訂閱下單的訊息,採用拉/推的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作
假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入訊息佇列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦
2.3流量削鋒
流量削鋒也是訊息佇列中的常用場景,一般在秒殺或團搶活動中使用廣泛。
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入訊息佇列。
a、可以控制活動的人數
b、可以緩解短時間內高流量壓垮應用

使用者的請求,伺服器接收後,首先寫入訊息佇列。假如訊息佇列長度超過最大數量,則直接拋棄使用者請求或跳轉到錯誤頁面。
秒殺業務根據訊息佇列中的請求資訊,再做後續處理
2.4日誌處理
日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。架構簡化如下

日誌採集客戶端,負責日誌資料採集,定時寫受寫入Kafka佇列
Kafka訊息佇列,負責日誌資料的接收,儲存和轉發
日誌處理應用:訂閱並消費kafka佇列中的日誌資料
2.5訊息通訊
訊息通訊是指,訊息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊。比如實現點對點訊息佇列,或者聊天室等
點對點通訊:

客戶端A和客戶端B使用同一佇列,進行訊息通訊。
聊天室通訊:

客戶端A,客戶端B,客戶端N訂閱同一主題,進行訊息釋出和接收。實現類似聊天室效果。
以上實際是訊息佇列的兩種訊息模式,點對點或釋出訂閱模式。模型為示意圖,供參考。
三、訊息中介軟體示例
3.1電商系統

訊息佇列採用高可用,可持久化的訊息中介軟體。比如Active MQ,Rabbit MQ,Rocket Mq。
(1)應用將主幹邏輯處理完成後,寫入訊息佇列。訊息傳送是否成功可以開啟訊息的確認模式。(訊息佇列返回訊息接收成功狀態後,應用再返回,這樣保障訊息的完整性)
(2)擴充套件流程(發簡訊,配送處理)訂閱佇列訊息。採用推或拉的方式獲取訊息並處理。
(3)訊息將應用解耦的同時,帶來了資料一致性問題,可以採用最終一致性方式解決。比如主資料寫入資料庫,擴充套件應用根據訊息佇列,並結合資料庫方式實現基於訊息佇列的後續處理。
3.2日誌收集系統

分為Zookeeper註冊中心,日誌收集客戶端,Kafka叢集和Storm叢集(OtherApp)四部分組成。
Zookeeper註冊中心,提出負載均衡和地址查詢服務
日誌收集客戶端,用於採集應用系統的日誌,並將資料推送到kafka佇列
Kafka叢集:接收,路由,儲存,轉發等訊息處理
Storm叢集:與OtherApp處於同一級別,採用拉的方式消費佇列中的資料
四、JMS訊息服務
講訊息佇列就不得不提JMS 。JMS(Java Message Service,Java訊息服務)API是一個訊息服務的標準/規範,允許應用程式元件基於JavaEE平臺建立、傳送、接收和讀取訊息。它使分散式通訊耦合度更低,訊息服務更加可靠以及非同步性。
在EJB架構中,有訊息bean可以無縫的與JM訊息服務整合。在J2EE架構模式中,有訊息服務者模式,用於實現訊息與應用直接的解耦。
4.1訊息模型
在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
4.1.1 P2P模式

P2P模式包含三個角色:訊息佇列(Queue),傳送者(Sender),接收者(Receiver)。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
P2P的特點
每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
接收者在成功接收訊息之後需向佇列應答成功
如果希望傳送的每個訊息都會被成功處理的話,那麼需要P2P模式。
4.1.2 Pub/Sub模式

包含三個角色主題(Topic),釋出者(Publisher),訂閱者(Subscriber) 多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。
Pub/Sub的特點
每個訊息可以有多個消費者
釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息
為了消費訊息,訂閱者必須保持執行的狀態
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。
如果希望傳送的訊息可以不被做任何處理、或者只被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。
4.2訊息消費
在JMS中,訊息的產生和消費都是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。
(1)同步
訂閱者或接收者通過receive方法來接收訊息,receive方法在接收到訊息之前(或超時之前)將一直阻塞;
(2)非同步
訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。
JNDI:Java命名和目錄介面,是一種標準的Java命名系統介面。可以在網路上查詢和訪問服務。通過指定一個資源名稱,該名稱對應於資料庫或命名服務中的一個記錄,同時返回資源連線建立所必須的資訊。
JNDI在JMS中起到查詢和訪問傳送目標或訊息來源的作用。
五、常用訊息佇列
一般商用的容器,比如WebLogic,JBoss,都支援JMS標準,開發上很方便。但免費的比如Tomcat,Jetty等則需要使用第三方的訊息中介軟體。本部分內容介紹常用的訊息中介軟體(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他們的特點。
5.1 ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
ActiveMQ特性如下:
⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)
⒊ 對spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
⒋ 通過了常見J2EE伺服器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
⒌ 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支援通過JDBC和journal提供高速的訊息持久化
⒎ 從設計上保證了高效能的叢集,客戶端-伺服器,點對點
⒏ 支援Ajax
⒐ 支援與Axis的整合
⒑ 可以很容易得呼叫內嵌JMS provider,進行測試
5.2 Kafka
Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過hadoop的並行載入機制來統一線上和離線的訊息處理,也是為了通過叢集機來提供實時的消費。
Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,有如下特性:
通過O(1)的磁碟資料結構提供訊息的持久化,這種結構對於即使數以TB的訊息儲存也能夠保持長時間的穩定效能。(檔案追加的方式寫入資料,過期的資料定期刪除)
高吞吐量:即使是非常普通的硬體Kafka也可以支援每秒數百萬的訊息
支援通過Kafka伺服器和消費機叢集來分割槽訊息
支援Hadoop並行資料載入
Kafka相關概念
Broker
Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker[5]
Topic
每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
Partition
Parition是物理上的概念,每個Topic包含一個或多個Partition.
Producer
負責釋出訊息到Kafka broker
Consumer
訊息消費者,向Kafka broker讀取訊息的客戶端。
Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。
一般應用在大資料日誌處理或對實時性(少量延遲),可靠性(少量丟資料)要求稍低的場景使用。