Kafka之簡介
1.什麼是kafka?
1.1入門
1.1.1簡介
-
kafka是LinkedIn開源的一個分散式MQ系統,現在是Apache的孵化專案。
-
Kafka is a distributed,partitioned,replicated commit logservice ,在它的主頁描述kafka為一個高吞吐量的分散式(能將訊息分散到不同的節點上)MQ。
-
Kafka僅僅由7000行Scala編寫,據瞭解,Kafka每秒可以生產約25萬訊息(50 MB),每秒處理55萬訊息(110 MB)。
-
提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS的規範實現。
kafka對訊息儲存時根據Topic進行歸類,傳送訊息者成為Producer,訊息接受者成為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)成為broker
-
kafka目前支援多種客戶端語言:java,python,c++,php等等。
-
kafka叢集的簡要圖解如下,producer寫入訊息,consumer讀取訊息:
1.1.2 kafka名詞解釋和工作方式:
-
Producer :訊息生產者,就是向kafka broker發訊息的客戶端。
-
Consumer :訊息消費者,向kafka broker取訊息的客戶端
-
Topic :可以理解為一個佇列
-
Consumer Group (CG):這是kafka用來實現一個topic訊息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的訊息會複製(不是真的複製,是概念上的)到所有的CG,但每個CG只會把訊息發給該CG中的一個consumer。
如果需要實現廣播,只要每個consumer有一個獨立的CG就可以;要實現單播,只要所有的consumer在同一個CG;用CG還可以將consumer進行自由的分組而不需要多次傳送訊息到不同的topic。
-
Broker :一臺kafka伺服器就是一個broker。一個叢集由多個broker組成,一個broker可以容納多個topic
-
Partition:為了實現擴充套件性,一個非常大的topic可以分佈到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將訊息發給consumer,不保證一個topic的整體(多個partition間)的順序。
-
Offset:kafka的儲存檔案都是按照offset.kafka來命名,用offset做名字的好處是方便查詢。例如你想找位於2049的位置,只要找到2048.kafka的檔案即可。當然the first offset就是00000000000.kafka
1.2、Topics/logs
一個Topic可以認為是一類訊息,每個topic將被分成多個partition(區),每個partition在儲存層面是append log檔案。任何釋出到此partition的訊息都會被直接追加到log檔案的尾部,每條訊息在檔案中的位置稱為offset(偏移量),offset為一個long型數字,它唯一標記一條訊息。kafka並沒有提供其他額外的索引機制來儲存offset,因為在kafka中幾乎不允許對訊息進行“隨機讀寫”。
-
kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使訊息被消費,訊息仍然不會被立即刪除.日誌檔案將會根據broker中的配置要求,保留一定的時間之後刪除;比如log檔案保留2天,那麼兩天後,檔案會被清除,無論其中的訊息是否被消費。kafka通過這種簡單的手段,來釋放磁碟空間,以及減少訊息消費之後對檔案內容改動的磁碟IO開支
-
對於consumer而言,它需要儲存消費訊息的offset,對於offset的儲存和使用,有consumer來控制;當consumer正常消費訊息時,offset將會"線性"的向前驅動,即訊息將依次順序被消費.事實上consumer可以使用任意順序消費訊息,它只需要將offset重置為任意值(offset將會儲存在zookeeper中,參見下文)
-
kafka叢集幾乎不需要維護任何consumer和producer狀態資訊,這些資訊由zookeeper儲存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對叢集造成額外的影響.
-
partitions設計的目的的根本原因
partitions的設計目的有多個。最根本原因是kafka基於檔案儲存。通過分割槽,可以將日誌內容分散到多個server上,來避免檔案尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka例項)儲存;可以將一個topic切分多任意多個partitions,來訊息儲存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升併發消費的能力.(具體原理參見下文).
1.3、分散式
一個Topic的多個partitions,被分佈在kafka叢集中的多個server上;每個server(kafka例項)負責partitions中訊息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多臺機器上,以提高可用性.
基於replicated方案,那麼就意味著需要對多個備份進行排程;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步訊息即可..由此可見作為leader的server承載了全部的請求壓力,因此從叢集的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個例項上,來確保整體的效能穩定.
-
Producers
Producer將訊息釋出到指定的Topic中,同時Producer也能決定將此訊息歸屬於哪個partition;比如基於"round-robin"方式或者通過其他的一些演算法等.
-
Consumers
本質上kafka只支援Topic.每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer.傳送到Topic的訊息,只會被訂閱此Topic的每個group中的一個consumer消費.
如果所有的consumer都具有相同的group,這種情況和queue模式很像;訊息將會在consumers之間負載均衡.
如果所有的consumer都具有不同的group,那這就是"釋出-訂閱";訊息將會廣播給所有的消費者.
在kafka中,一個partition中的訊息只會被group中的一個consumer消費;每個group中consumer訊息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的訊息.kafka只能保證一個partition中的訊息被某個consumer消費時,訊息是順序的.事實上,從Topic角度來說,訊息仍不是有序的.
kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到訊息.
-
Guarantees
-
傳送到partitions中的訊息將會按照它接收的順序追加到日誌中
-
對於消費者而言,它們消費訊息的順序和日誌中訊息順序一致.
-
如果Topic的"replicationfactor"為N,那麼允許N-1個kafka例項失效.
-
2、使用場景
2.1 Messaging
對於一些常規的訊息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充套件性和效能優勢.不過到目前為止,我們應該很清楚認識到,kafka並沒有提供JMS中的"事務性""訊息傳輸擔保(訊息確認機制)""訊息分組"等企業級特性;kafka只能使用作為"常規"的訊息系統,在一定程度上,尚未確保訊息的傳送與接收絕對可靠(比如,訊息重發,訊息傳送丟失等)
2.2 Websit activity tracking
kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/使用者操作等資訊傳送到kafka中.並實時監控,或者離線統計分析等
2.3 Log Aggregation
kafka的特性決定它非常適合作為"日誌收集中心";application可以將操作日誌"批量""非同步"的傳送到kafka叢集中,而不是儲存在本地或者DB中;kafka可以批量提交訊息/壓縮訊息等,這對producer端而言,幾乎感覺不到效能的開支.此時consumer端可以使hadoop等其他系統化的儲存和分析系統.
3.設計原理
kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力.
3.1、永續性
kafka使用檔案儲存訊息,這就直接決定kafka在效能上嚴重依賴檔案系統的本身特性.且無論任何OS下,對檔案系統本身的優化幾乎沒有可能.檔案快取/直接記憶體對映等是常用的手段.因為kafka是對日誌檔案進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將訊息暫時buffer起來,當訊息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO呼叫的次數.
3.2、效能
需要考慮的影響效能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關係到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將訊息buffer起來,當訊息的條數達到一定閥值時,批量傳送給broker;對於consumer端也是一樣,批量fetch多條訊息.不過訊息量的大小可以通過配置檔案來指定.對於kafka broker端,似乎有個sendfile系統呼叫可以潛在的提升網路IO的效能:將檔案的資料對映到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換. 其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用訊息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮.可以將任何在網路上傳輸的訊息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式.
3.3、生產者
負載均衡: producer將會和Topic下所有partition leader保持socket連線;訊息由producer直接通過socket傳送到broker,中間不會經過任何"路由層".事實上,訊息被路由到哪個partition上,有producer客戶端決定.比如可以採用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那麼在producer端實現"訊息均衡分發"是必要的.
其中partition leader的位置(host:port)註冊在zookeeper中,producer作為zookeeper client,已經註冊了watch用來監聽partition leader的變更事件.
非同步傳送:將多條訊息暫且在客戶端buffer起來,並將他們批量的傳送到broker,小資料IO太多,會拖慢整體的網路延遲,批量延遲傳送事實上提升了網路效率。不過這也有一定的隱患,比如說當producer失效時,那些尚未傳送的訊息將會丟失。
3.4、消費者
consumer端向broker傳送"fetch"請求,並告知其獲取訊息的offset;此後consumer將會獲得一定條數的訊息;consumer端也可以重置offset來重新消費訊息.
在JMS實現中,Topic模型基於push方式,即broker將訊息推送給consumer端.不過在kafka中,採用了pull方式,即consumer在和broker建立連線之後,主動去pull(或者說fetch)訊息;這種模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch訊息並處理,且可以控制訊息消費的進度(offset);此外,消費者可以良好的控制訊息消費的數量,batch fetch.
其他JMS實現,訊息消費的位置是有prodiver保留,以便避免重複傳送訊息或者將沒有消費成功的訊息重發等,同時還要控制訊息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的訊息只有一個consumer在消費,且不存在訊息狀態的控制,也沒有複雜的訊息確認機制,可見kafka broker端是相當輕量級的.當訊息被consumer接收之後,consumer可以在本地儲存最後訊息的offset,並間歇性的向zookeeper註冊offset.由此可見,consumer客戶端也很輕量級.
3.5、訊息傳送機制
對於JMS實現,訊息傳輸擔保非常直接:有且只有一次(exactly once).在kafka中稍有不同:
-
at most once: 最多一次,這個和JMS中"非持久化"訊息類似.傳送一次,無論成敗,將不會重發.
-
at least once: 訊息至少傳送一次,如果訊息未能接受成功,可能會重發,直到接收成功.
-
exactly once: 訊息只會傳送一次.
at most once: 消費者fetch訊息,然後儲存offset,然後處理訊息;當client儲存offset之後,但是在訊息處理過程中出現了異常,導致部分訊息未能繼續處理.那麼此後"未處理"的訊息將不能被fetch到,這就是"at most once".
at least once: 消費者fetch訊息,然後處理訊息,然後儲存offset.如果訊息處理成功之後,但是在儲存offset階段zookeeper異常導致儲存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的訊息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態.
exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的.
通常情況下"at-least-once"是我們搜選.(相比at most once而言,重複接收資料總比丟失資料要好).
3.6、複製備份
kafka將每個partition資料複製到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置檔案來設定.leader處理所有的read-write請求,follower需要和leader保持同步.Follower和consumer一樣,消費訊息並儲存在本地日誌中;leader負責跟蹤所有的follower狀態,如果follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條訊息儲存成功,此訊息才被認為是"committed",那麼此時consumer才能消費它.即使只有一個replicas例項存活,仍然可以保證訊息的正常傳送和接收,只要zookeeper叢集存活即可.(不同於其他分散式儲存,比如hbase需要"多數派"存活才行)
當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,因此需要選擇一個"up-to-date"的follower.選擇follower時需要兼顧一個問題,就是新leaderserver上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡".
3.7.日誌
如果一個topic的名稱為"my_topic",它有2個partitions,那麼日誌將會儲存在my_topic_0和my_topic_1兩個目錄中;日誌檔案中儲存了一序列"log entries"(日誌條目),每個log entry格式為"4個位元組的數字N表示訊息的長度" + "N個位元組的訊息內容";每個日誌都有一個offset來唯一的標記一條訊息,offset的值為8個位元組的數字,表示此訊息在此partition中所處的起始位置..每個partition在物理儲存層面,有多個log file組成(稱為segment).segmentfile的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始訊息的offset.
其中每個partiton中所持有的segments列表資訊會儲存在zookeeper中.
當segment檔案尺寸達到一定閥值時(可以通過配置檔案設定,預設1G),將會建立一個新的檔案;當buffer中訊息的條數達到閥值時將會觸發日誌資訊flush到日誌檔案中,同時如果"距離最近一次flush的時間差"達到閥值時,也會觸發flush到日誌檔案.如果broker失效,極有可能會丟失那些尚未flush到檔案的訊息.因為server意外實現,仍然會導致log檔案格式的破壞(檔案尾部),那麼就要求當server啟東是需要檢測最後一個segment的檔案結構是否合法並進行必要的修復.
獲取訊息時,需要指定offset和最大chunk尺寸,offset用來表示訊息的起始位置,chunk size用來表示最大獲取訊息的總長度(間接的表示訊息的條數).根據offset,可以找到此訊息所在segment檔案,然後根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可.
日誌檔案的刪除策略非常簡單:啟動一個後臺執行緒定期掃描log file列表,把儲存時間超過閥值的檔案直接刪除(根據檔案的建立時間).為了避免刪除檔案時仍然有read操作(consumer消費),採取copy-on-write方式.
3.8、kafka與zookeeper之間的聯絡
-
kafka使用zookeeper來實現動態的叢集擴充套件,不需要更改客戶端(producer和consumer)的配置。broker會在zookeeper註冊並保持相關的元資料(topic,partition資訊等)更新。
-
而客戶端會在zookeeper上註冊相關的watcher。一旦zookeeper發生變化,客戶端能及時感知並作出相應調整。這樣就保證了新增或去除broker時,各broker間仍能自動實現負載均衡。
kafka使用zookeeper來儲存一些meta資訊,並使用了zookeeper watch機制來發現meta資訊的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)
- Broker node registry: 當一個kafkabroker啟動後,首先會向zookeeper註冊自己的節點資訊(臨時znode),同時當broker和zookeeper斷開連線時,此znode也會被刪除.
格式: /broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每個broker的配置檔案中都需要指定一個數字型別的id(全域性不可重複),znode的值為此broker的host:port資訊.
- Broker Topic Registry: 當一個broker啟動時,會向zookeeper註冊自己持有的topic和partitions資訊,仍然是一個臨時znode.
格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引號.
- Consumer and Consumer group: 每個consumer客戶端被建立時,會向zookeeper註冊自己的資訊;此作用主要是為了"負載均衡".
一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了效能考慮,讓partition相對均衡的分散到每個consumer上.
- Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置檔案指定,也可以由系統生成),此id用來標記消費者資訊.
格式:/consumers/[group_id]/ids/[consumer_id]
仍然是一個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表.
- Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.
格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value
此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.
- Partition Owner registry: 用來標記partition被哪個consumer消費.臨時znode
格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啟動時,所觸發的操作:
A) 首先進行"Consumer id Registry";
B) 然後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions).
C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
-
Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連線併發送訊息.
-
Broker端使用zookeeper用來註冊broker資訊,已經監測partitionleader存活性.
-
Consumer端使用zookeeper用來註冊consumer資訊,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連線,並獲取訊息.
4、主要配置
4.1、Broker配置
4.2.Consumer主要配置
4.3.Producer主要配置
以上是關於kafka一些基礎說明,在其中我們知道如果要kafka正常執行,必須配置zookeeper,否則無論是kafka叢集還是客戶端的生產者和消費者都無法正常的工作的