分散式釋出訂閱訊息系統—Apache Kafka
1.什麼是Kafka
Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的訊息處理,也是為了通過叢集來提供實時的消費。
2.特性
Kafka[1]是一種高吞吐量[2]的分散式釋出訂閱訊息系統,有如下特性:
- 通過O(1)的磁碟資料結構提供訊息的持久化,這種結構對於即使數以TB的訊息儲存也能夠保持長時間的穩定效能。
- 高吞吐量[2]:即使是非常普通的硬體Kafka也可以支援每秒數百萬[2]的訊息。
- 支援通過Kafka伺服器和消費機叢集來分割槽訊息。
- 支援Hadoop並行資料載入。[3]
3.Kafka相關術語介紹
- Broker Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker[5]
- Topic 每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
- Partition Partition是物理上的概念,每個Topic包含一個或多個Partition.
- Producer 負責釋出訊息到Kafka broker
- Consumer 訊息消費者,向Kafka broker讀取訊息的客戶端。
- Consumer Group 每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。
4.瞭解更多專業知識
5.kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建
題導讀:
1.zookeeper在kafka的作用是什麼?一、入門 1、簡介 Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。kafka對訊息儲存時根據Topic進行歸類,傳送訊息者成為Producer,訊息接受者成為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)成為broker。無論是kafka叢集,還是producer和consumer都依賴於zookeeper來保證系統可用性叢集儲存一些meta資訊。
2、Topics/logs 一個Topic可以認為是一類訊息,每個topic將被分成多個partition(區),每個partition在儲存層面是append log檔案。任何釋出到此partition的訊息都會被直接追加到log檔案的尾部,每條訊息在檔案中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條訊息。它唯一的標記一條訊息。kafka並沒有提供其他額外的索引機制來儲存offset,因為在kafka中幾乎不允許對訊息進行“隨機讀寫”。
kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使訊息被消費,訊息仍然不會被立即刪除.日誌檔案將會根據broker中的配置要求,保留一定的時間之後刪除;比如log檔案保留2天,那麼兩天後,檔案會被清除,無論其中的訊息是否被消費.kafka通過這種簡單的手段,來釋放磁碟空間,以及減少訊息消費之後對檔案內容改動的磁碟IO開支. 對於consumer而言,它需要儲存消費訊息的offset,對於offset的儲存和使用,有consumer來控制;當consumer正常消費訊息時,offset將會"線性"的向前驅動,即訊息將依次順序被消費.事實上consumer可以使用任意順序消費訊息,它只需要將offset重置為任意值..(offset將會儲存在zookeeper中,參見下文) kafka叢集幾乎不需要維護任何consumer和producer狀態資訊,這些資訊有zookeeper儲存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對叢集造成額外的影響. partitions的設計目的有多個.最根本原因是kafka基於檔案儲存.通過分割槽,可以將日誌內容分散到多個server上,來避免檔案尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka例項)儲存;可以將一個topic切分多任意多個partitions,來訊息儲存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升併發消費的能力.(具體原理參見下文). 3、Distribution 一個Topic的多個partitions,被分佈在kafka叢集中的多個server上;每個server(kafka例項)負責partitions中訊息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多臺機器上,以提高可用性. 基於replicated方案,那麼就意味著需要對多個備份進行排程;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步訊息即可..由此可見作為leader的server承載了全部的請求壓力,因此從叢集的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個例項上,來確保整體的效能穩定. Producers Producer將訊息釋出到指定的Topic中,同時Producer也能決定將此訊息歸屬於哪個partition;比如基於"round-robin"方式或者通過其他的一些演算法等. Consumers 本質上kafka只支援Topic.每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer.傳送到Topic的訊息,只會被訂閱此Topic的每個group中的一個consumer消費. 如果所有的consumer都具有相同的group,這種情況和queue模式很像;訊息將會在consumers之間負載均衡. 如果所有的consumer都具有不同的group,那這就是"釋出-訂閱";訊息將會廣播給所有的消費者. 在kafka中,一個partition中的訊息只會被group中的一個consumer消費;每個group中consumer訊息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的訊息.kafka只能保證一個partition中的訊息被某個consumer消費時,訊息是順序的.事實上,從Topic角度來說,訊息仍不是有序的. kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到訊息. Guarantees 1) 傳送到partitions中的訊息將會按照它接收的順序追加到日誌中 2) 對於消費者而言,它們消費訊息的順序和日誌中訊息順序一致. 3) 如果Topic的"replicationfactor"為N,那麼允許N-1個kafka例項失效. 二、使用場景 1、Messaging 對於一些常規的訊息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充套件性和效能優勢.不過到目前為止,我們應該很清楚認識到,kafka並沒有提供JMS中的"事務性""訊息傳輸擔保(訊息確認機制)""訊息分組"等企業級特性;kafka只能使用作為"常規"的訊息系統,在一定程度上,尚未確保訊息的傳送與接收絕對可靠(比如,訊息重發,訊息傳送丟失等) 2、Websit activity tracking kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/使用者操作等資訊傳送到kafka中.並實時監控,或者離線統計分析等
3、Log Aggregation kafka的特性決定它非常適合作為"日誌收集中心";application可以將操作日誌"批量""非同步"的傳送到kafka叢集中,而不是儲存在本地或者DB中;kafka可以批量提交訊息/壓縮訊息等,這對producer端而言,幾乎感覺不到效能的開支.此時consumer端可以使hadoop等其他系統化的儲存和分析系統. 三、設計原理 kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力. 1、永續性 kafka使用檔案儲存訊息,這就直接決定kafka在效能上嚴重依賴檔案系統的本身特性.且無論任何OS下,對檔案系統本身的優化幾乎沒有可能.檔案快取/直接記憶體對映等是常用的手段.因為kafka是對日誌檔案進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將訊息暫時buffer起來,當訊息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO呼叫的次數.
2、效能 需要考慮的影響效能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關係到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將訊息buffer起來,當訊息的條數達到一定閥值時,批量傳送給broker;對於consumer端也是一樣,批量fetch多條訊息.不過訊息量的大小可以通過配置檔案來指定.對於kafka broker端,似乎有個sendfile系統呼叫可以潛在的提升網路IO的效能:將檔案的資料對映到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換. 其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用訊息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮.可以將任何在網路上傳輸的訊息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式. 3、生產者 負載均衡: producer將會和Topic下所有partition leader保持socket連線;訊息由producer直接通過socket傳送到broker,中間不會經過任何"路由層".事實上,訊息被路由到哪個partition上,有producer客戶端決定.比如可以採用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那麼在producer端實現"訊息均衡分發"是必要的. 其中partition leader的位置(host:port)註冊在zookeeper中,producer作為zookeeper client,已經註冊了watch用來監聽partition leader的變更事件. 非同步傳送:將多條訊息暫且在客戶端buffer起來,並將他們批量的傳送到broker,小資料IO太多,會拖慢整體的網路延遲,批量延遲傳送事實上提升了網路效率。不過這也有一定的隱患,比如說當producer失效時,那些尚未傳送的訊息將會丟失。
4、消費者 consumer端向broker傳送"fetch"請求,並告知其獲取訊息的offset;此後consumer將會獲得一定條數的訊息;consumer端也可以重置offset來重新消費訊息. 在JMS實現中,Topic模型基於push方式,即broker將訊息推送給consumer端.不過在kafka中,採用了pull方式,即consumer在和broker建立連線之後,主動去pull(或者說fetch)訊息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch訊息並處理,且可以控制訊息消費的進度(offset);此外,消費者可以良好的控制訊息消費的數量,batch fetch. 其他JMS實現,訊息消費的位置是有prodiver保留,以便避免重複傳送訊息或者將沒有消費成功的訊息重發等,同時還要控制訊息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的訊息只有一個consumer在消費,且不存在訊息狀態的控制,也沒有複雜的訊息確認機制,可見kafka broker端是相當輕量級的.當訊息被consumer接收之後,consumer可以在本地儲存最後訊息的offset,並間歇性的向zookeeper註冊offset.由此可見,consumer客戶端也很輕量級.
5、訊息傳送機制 對於JMS實現,訊息傳輸擔保非常直接:有且只有一次(exactly once).在kafka中稍有不同: 1) at most once: 最多一次,這個和JMS中"非持久化"訊息類似.傳送一次,無論成敗,將不會重發. 2) at least once: 訊息至少傳送一次,如果訊息未能接受成功,可能會重發,直到接收成功. 3) exactly once: 訊息只會傳送一次. at most once: 消費者fetch訊息,然後儲存offset,然後處理訊息;當client儲存offset之後,但是在訊息處理過程中出現了異常,導致部分訊息未能繼續處理.那麼此後"未處理"的訊息將不能被fetch到,這就是"at most once". at least once: 消費者fetch訊息,然後處理訊息,然後儲存offset.如果訊息處理成功之後,但是在儲存offset階段zookeeper異常導致儲存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的訊息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態. exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的. 通常情況下"at-least-once"是我們搜選.(相比at most once而言,重複接收資料總比丟失資料要好). 6、複製備份 kafka將每個partition資料複製到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置檔案來設定.leader處理所有的read-write請求,follower需要和leader保持同步.Follower和consumer一樣,消費訊息並儲存在本地日誌中;leader負責跟蹤所有的follower狀態,如果follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條訊息儲存成功,此訊息才被認為是"committed",那麼此時consumer才能消費它.即使只有一個replicas例項存活,仍然可以保證訊息的正常傳送和接收,只要zookeeper叢集存活即可.(不同於其他分散式儲存,比如hbase需要"多數派"存活才行) 當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,因此需要選擇一個"up-to-date"的follower.選擇follower時需要兼顧一個問題,就是新leaderserver上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡". 7.日誌 如果一個topic的名稱為"my_topic",它有2個partitions,那麼日誌將會儲存在my_topic_0和my_topic_1兩個目錄中;日誌檔案中儲存了一序列"log entries"(日誌條目),每個log entry格式為"4個位元組的數字N表示訊息的長度" + "N個位元組的訊息內容";每個日誌都有一個offset來唯一的標記一條訊息,offset的值為8個位元組的數字,表示此訊息在此partition中所處的起始位置..每個partition在物理儲存層面,有多個log file組成(稱為segment).segmentfile的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始訊息的offset.
其中每個partiton中所持有的segments列表資訊會儲存在zookeeper中. 當segment檔案尺寸達到一定閥值時(可以通過配置檔案設定,預設1G),將會建立一個新的檔案;當buffer中訊息的條數達到閥值時將會觸發日誌資訊flush到日誌檔案中,同時如果"距離最近一次flush的時間差"達到閥值時,也會觸發flush到日誌檔案.如果broker失效,極有可能會丟失那些尚未flush到檔案的訊息.因為server意外實現,仍然會導致log檔案格式的破壞(檔案尾部),那麼就要求當server啟東是需要檢測最後一個segment的檔案結構是否合法並進行必要的修復. 獲取訊息時,需要指定offset和最大chunk尺寸,offset用來表示訊息的起始位置,chunk size用來表示最大獲取訊息的總長度(間接的表示訊息的條數).根據offset,可以找到此訊息所在segment檔案,然後根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可. 日誌檔案的刪除策略非常簡單:啟動一個後臺執行緒定期掃描log file列表,把儲存時間超過閥值的檔案直接刪除(根據檔案的建立時間).為了避免刪除檔案時仍然有read操作(consumer消費),採取copy-on-write方式. 8、分配 kafka使用zookeeper來儲存一些meta資訊,並使用了zookeeper watch機制來發現meta資訊的變更並作出相應的動作(比如consumer失效,觸發負載均衡等) 1) Broker node registry: 當一個kafkabroker啟動後,首先會向zookeeper註冊自己的節點資訊(臨時znode),同時當broker和zookeeper斷開連線時,此znode也會被刪除. 格式: /broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每個broker的配置檔案中都需要指定一個數字型別的id(全域性不可重複),znode的值為此broker的host:port資訊. 2) Broker Topic Registry: 當一個broker啟動時,會向zookeeper註冊自己持有的topic和partitions資訊,仍然是一個臨時znode. 格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引號. 3) Consumer and Consumer group: 每個consumer客戶端被建立時,會向zookeeper註冊自己的資訊;此作用主要是為了"負載均衡". 一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了效能考慮,讓partition相對均衡的分散到每個consumer上. 4) Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置檔案指定,也可以由系統生成),此id用來標記消費者資訊. 格式:/consumers/[group_id]/ids/[consumer_id] 仍然是一個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表. 5) Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset. 格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value 此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費. 6) Partition Owner registry: 用來標記partition被哪個consumer消費.臨時znode 格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啟動時,所觸發的操作: A) 首先進行"Consumer id Registry"; B) 然後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions). C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連線併發送訊息. 2) Broker端使用zookeeper用來註冊broker資訊,已經監測partitionleader存活性. 3) Consumer端使用zookeeper用來註冊consumer資訊,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連線,並獲取訊息. 四、主要配置 1、Broker配置
2.Consumer主要配置
3.Producer主要配置
以上是關於kafka一些基礎說明,在其中我們知道如果要kafka正常執行,必須配置zookeeper,否則無論是kafka叢集還是客戶端的生存者和消費者都無法正常的工作的,以下是對zookeeper進行一些簡單的介紹:
五、zookeeper叢集 zookeeper是一個為分散式應用提供一致性服務的軟體,它是開源的Hadoop專案的一個子專案,並根據google發表的一篇論文來實現的。zookeeper為分散式系統提供了高笑且易於使用的協同服務,它可以為分散式應用提供相當多的服務,諸如統一命名服務,配置管理,狀態同步和組服務等。zookeeper介面簡單,我們不必過多地糾結在分散式系統程式設計難於處理的同步和一致性問題上,你可以使用zookeeper提供的現成(off-the-shelf)服務來實現來實現分散式系統額配置管理,組管理,Leader選舉等功能。 zookeeper叢集的安裝,準備三臺伺服器server1:192.168.0.1,server2:192.168.0.2, server3:192.168.0.3. 1)下載zookeeper 到http://zookeeper.apache.org/releases.html去下載最新版本Zookeeper-3.4.5的安裝包zookeeper-3.4.5.tar.gz.將檔案儲存server1的~目錄下 2)安裝zookeeper 先在伺服器server分別執行a-c步驟 a)解壓 tar -zxvf zookeeper-3.4.5.tar.gz 解壓完成後在目錄~下會發現多出一個目錄zookeeper-3.4.5,重新命令為zookeeper b)配置 將conf/zoo_sample.cfg拷貝一份命名為zoo.cfg,也放在conf目錄下。然後按照如下值修改其中的配置: # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/home/wwb/zookeeper /data dataLogDir=/home/wwb/zookeeper/logs # the port at which the clients will connect clientPort=2181 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 server.1=192.168.0.1:3888:4888 server.2=192.168.0.2:3888:4888 server.3=192.168.0.3:3888:4888 tickTime:這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。 dataDir:顧名思義就是 Zookeeper 儲存資料的目錄,預設情況下,Zookeeper 將寫資料的日誌檔案也儲存在這個目錄裡。 clientPort:這個埠就是客戶端連線 Zookeeper 伺服器的埠,Zookeeper 會監聽這個埠,接受客戶端的訪問請求。 initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這裡所說的客戶端不是使用者連線 Zookeeper 伺服器的客戶端,而是 Zookeeper 伺服器叢集中連線到 Leader 的 Follower 伺服器)初始化連線時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 伺服器還沒有收到客戶端的返回資訊,那麼表明這個客戶端連線失敗。總的時間長度就是 5*2000=10 秒 syncLimit:這個配置項標識 Leader 與Follower 之間傳送訊息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是2*2000=4 秒 server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號伺服器;B 是這個伺服器的 ip 地址;C 表示的是這個伺服器與叢集中的 Leader 伺服器交換資訊的埠;D 表示的是萬一叢集中的 Leader 伺服器掛了,需要一個埠來重新進行選舉,選出一個新的 Leader,而這個埠就是用來執行選舉時伺服器相互通訊的埠。如果是偽叢集的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 例項通訊埠號不能一樣,所以要給它們分配不同的埠號 注意:dataDir,dataLogDir中的wwb是當前登入使用者名稱,data,logs目錄開始是不存在,需要使用mkdir命令建立相應的目錄。並且在該目錄下建立檔案myid,serve1,server2,server3該檔案內容分別為1,2,3。 針對伺服器server2,server3可以將server1複製到相應的目錄,不過需要注意dataDir,dataLogDir目錄,並且檔案myid內容分別為2,3 3)依次啟動server1,server2,server3的zookeeper. /home/wwb/zookeeper/bin/zkServer.sh start,出現類似以下內容 JMX enabled by default Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED 4) 測試zookeeper是否正常工作,在server1上執行以下命令 /home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出現類似以下內容 JLine support is enabled 2013-11-27 19:59:40,560 - INFO [main-SendThread(localhost.localdomain:2181):[email protected]]- Session establishmentcomplete on server localhost.localdomain/127.0.0.1:2181, sessionid = 0x1429cdb49220000, negotiatedtimeout = 30000 WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: 127.0.0.1:2181(CONNECTED) 0] [[email protected]]# 即代表叢集構建成功了,如果出現錯誤那應該是第三部時沒有啟動好叢集, 執行,先利用 ps aux | grep zookeeper檢視是否有相應的程序的,沒有話,說明叢集啟動出現問題,可以在每個伺服器上使用 ./home/wwb/zookeeper/bin/zkServer.sh stop。再依次使用./home/wwb/zookeeper/binzkServer.sh start,這時在執行4一般是沒有問題,如果還是有問題,那麼先stop再到bin的上級目錄執行./bin/zkServer.shstart試試。 注意:zookeeper叢集時,zookeeper要求半數以上的機器可用,zookeeper才能提供服務。 六、kafka叢集 (利用上面server1,server2,server3,下面以server1為例項) 1)下載kafka0.8(http://kafka.apache.org/downloads.html),儲存到伺服器/home/wwb目錄下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz) 2)解壓 tar -zxvf kafka-0.8.0-beta1-src.tgz,產生資料夾kafka-0.8.0-beta1-src更改為kafka01 3)配置 修改kafka01/config/server.properties,其中broker.id,log.dirs,zookeeper.connect必須根據實際情況進行修改,其他項根據需要自行斟酌。大致如下: broker.id=1 port=9091 num.network.threads=2 num.io.threads=2 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dir=./logs num.partitions=2 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=536870912 num.replica.fetchers=2 log.cleanup.interval.mins=10 zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183 zookeeper.connection.timeout.ms=1000000 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/tmp/kafka_metrics kafka.csv.metrics.reporter.enabled=false 4)初始化因為kafka用scala語言編寫,因此執行kafka需要首先準備scala相關環境。 > cd kafka01 > ./sbt update > ./sbt package > ./sbt assembly-package-dependency 在第二個命令時可能需要一定時間,由於要下載更新一些依賴包。所以請大家 耐心點。 5) 啟動kafka01 >JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties & a)kafka02操作步驟與kafka01雷同,不同的地方如下 修改kafka02/config/server.properties broker.id=2 port=9092 ##其他配置和kafka-0保持一致 啟動kafka02 JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties & b)kafka03操作步驟與kafka01雷同,不同的地方如下 修改kafka03/config/server.properties broker.id=3 port=9093 ##其他配置和kafka-0保持一致 啟動kafka02 JMX_PORT=9999 bin/kafka-server-start.shconfig/server.properties & 6)建立Topic(包含一個分割槽,三個副本) >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic 7)檢視topic情況 >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181 topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0 8)建立傳送者 >bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic my test message1 my test message2 ^C 9)建立消費者 >bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic ... my test message1 my test message2 ^C 10)殺掉server1上的broker >pkill -9 -f config/server.properties 11)檢視topic >bin/kafka-list-top.sh --zookeeper192.168.0.1:2181 topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0 發現topic還正常的存在 11)建立消費者,看是否能查詢到訊息 >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C 說明一切都是正常的。 OK,以上就是對Kafka個人的理解,不對之處請大家及時指出。 補充說明: 1、public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap),其中該方法的引數Map的key為topic名稱,value為topic對應的分割槽數,譬如說如果在kafka中不存在相應的topic時,則會建立一個topic,分割槽數為value,如果存在的話,該處的value則不起什麼作用
2、關於生產者向指定的分割槽傳送資料,通過設定partitioner.class的屬性來指定向那個分割槽傳送資料,如果自己指定必須編寫相應的程式,預設是kafka.producer.DefaultPartitioner,分割槽程式是基於雜湊的鍵。
3、在多個消費者讀取同一個topic的資料,為了保證每個消費者讀取資料的唯一性,必須將這些消費者group_id定義為同一個值,這樣就構建了一個類似佇列的資料結構,如果定義不同,則類似一種廣播結構的。
4、在consumerapi中,引數設計到數字部分,類似Map<String,Integer>, numStream,指的都是在topic不存在的時,會建立一個topic,並且分割槽個數為Integer,numStream,注意如果數字大於broker的配置中num.partitions屬性,會以num.partitions為依據建立分割槽個數的。
5、producerapi,呼叫send時,如果不存在topic,也會建立topic,在該方法中沒有提供分割槽個數的引數,在這裡分割槽個數是由服務端broker的配置中num.partitions屬性決定的
6.分散式訊息系統
Kafka是分散式釋出-訂閱訊息系統。它最初由LinkedIn公司開發,之後成為Apache專案的一部分。Kafka是一個分散式的,可劃分的,冗餘備份的永續性的日誌服務。它主要用於處理活躍的流式資料。
在大資料系統中,常常會碰到一個問題,整個大資料是由各個子系統組成,資料需要在各個子系統中高效能,低延遲的不停流轉。傳統的企業訊息系統並不是非常適合大規模的資料處理。為了已在同時搞定線上應用(訊息)和離線應用(資料檔案,日誌)Kafka就出現了。Kafka可以起到兩個作用:
- 降低系統組網複雜度。
- 降低程式設計複雜度,各個子系統不在是相互協商介面,各個子系統類似插口插在插座上,Kafka承擔高速資料匯流排的作用。
Kafka主要特點:
- 同時為釋出和訂閱提供高吞吐量。據瞭解,Kafka每秒可以生產約25萬訊息(50 MB),每秒處理55萬訊息(110 MB)。
- 可進行持久化操作。將訊息持久化到磁碟,因此可用於批量消費,例如ETL,以及實時應用程式。通過將資料持久化到硬碟以及replication防止資料丟失。
- 分散式系統,易於向外擴充套件。所有的producer、broker和consumer都會有多個,均為分散式的。無需停機即可擴充套件機器。
- 訊息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
- 支援online和offline的場景。
Kafka的架構:
Kafka的整體架構非常簡單,是顯式分散式架構,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實現Kafka註冊的介面,資料從producer傳送到broker,broker承擔一箇中間快取和分發的作用。broker分發註冊到系統中的consumer。broker的作用類似於快取,即活躍的資料和離線處理系統之間的快取。客戶端和伺服器端的通訊,是基於簡單,高效能,且與程式語言無關的TCP協議。幾個基本概念:
- Topic:特指Kafka處理的訊息源(feeds of messages)的不同分類。
- Partition:Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)。
- Message:訊息,是通訊的基本單位,每個producer可以向一個topic(主題)釋出一些訊息。
- Producers:訊息和資料生產者,向Kafka的一個topic釋出訊息的過程叫做producers。
- Consumers:訊息和資料消費者,訂閱topics並處理其釋出的訊息的過程叫做consumers。
- Broker:快取代理,Kafa叢集中的一臺或多臺伺服器統稱為broker。
訊息傳送的流程:
- Producer根據指定的partition方法(round-robin、hash等),將訊息釋出到指定topic的partition裡面
- kafka叢集接收到Producer發過來的訊息後,將其持久化到硬碟,並保留訊息指定時長(可配置),而不關注訊息是否被消費。
- Consumer從kafka叢集pull資料,並控制獲取訊息的offset
Kafka的設計:
1、吞吐量
高吞吐是kafka需要實現的核心目標之一,為此kafka做了以下一些設計:
- 資料磁碟持久化:訊息不在記憶體中cache,直接寫入到磁碟,充分利用磁碟的順序讀寫效能
- zero-copy:減少IO操作步驟
- 資料批量傳送
- 資料壓縮
- Topic劃分為多個partition,提高parallelism
負載均衡
- producer根據使用者指定的演算法,將訊息傳送到指定的partition
- 存在多個partiiton,每個partition有自己的replica,每個replica分佈在不同的Broker節點上
- 多個partition需要選取出lead partition,lead partition負責讀寫,並由zookeeper負責fail over
- 通過zookeeper管理broker與consumer的動態加入與離開
拉取系統
由於kafka broker會持久化資料,broker沒有記憶體壓力,因此,consumer非常適合採取pull的方式消費資料,具有以下幾點好處:
- 簡化kafka設計
- consumer根據消費能力自主控制訊息拉取速度
- consumer根據自身情況自主選擇消費模式,例如批量,重複消費,從尾端開始消費等
可擴充套件性
當需要增加broker結點時,新增的broker會向zookeeper註冊,而producer及consumer會根據註冊在zookeeper上的watcher感知這些變化,並及時作出調整。
Kayka的應用場景:
1.訊息佇列
比起大多數的訊息系統來說,Kafka有更好的吞吐量,內建的分割槽,冗餘及容錯性,這讓Kafka成為了一個很好的大規模訊息處理應用的解決方案。訊息系統一般吞吐量相對較低,但是需要更小的端到端延時,並嚐嚐依賴於Kafka提供的強大的永續性保障。在這個領域,Kafka足以媲美傳統訊息系統,如ActiveMR或RabbitMQ。
2.行為跟蹤
Kafka的另一個應用場景是跟蹤使用者瀏覽頁面、搜尋及其他行為,以釋出-訂閱的模式實時記錄到對應的topic裡。那麼這些結果被訂閱者拿到後,就可以做進一步的實時處理,或實時監控,或放到hadoop/離線資料倉庫裡處理。
3.元資訊監控
作為操作記錄的監控模組來使用,即彙集記錄一些操作資訊,可以理解為運維性質的資料監控吧。
4.日誌收集
日誌收集方面,其實開源產品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日誌聚合(log aggregation)。日誌聚合一般來說是從伺服器上收集日誌檔案,然後放到一個集中的位置(檔案伺服器或HDFS)進行處理。然而Kafka忽略掉檔案的細節,將其更清晰地抽象成一個個日誌或事件的訊息流。這就讓Kafka處理過程延遲更低,更容易支援多資料來源和分散式資料處理。比起以日誌為中心的系統比如Scribe或者Flume來說,Kafka提供同樣高效的效能和因為複製導致的更高的耐用性保證,以及更低的端到端延遲。
5.流處理
這個場景可能比較多,也很好理解。儲存收集流資料,以提供之後對接的Storm或其他流式計算框架進行處理。很多使用者會將那些從原始topic來的資料進行階段性處理,彙總,擴充或者以其他的方式轉換到新的topic下再繼續後面的處理。例如一個文章推薦的處理流程,可能是先從RSS資料來源中抓取文章的內容,然後將其丟入一個叫做“文章”的topic中;後續操作可能是需要對這個內容進行清理,比如回覆正常資料或者刪除重複資料,最後再將內容匹配的結果返還給使用者。這就在一個獨立的topic之外,產生了一系列的實時資料處理的流程。Strom和Samza是非常著名的實現這種型別資料轉換的框架。
6.事件源
事件源是一種應用程式設計的方式,該方式的狀態轉移被記錄為按時間順序排序的記錄序列。Kafka可以儲存大量的日誌資料,這使得它成為一個對這種方式的應用來說絕佳的後臺。比如動態彙總(News feed)。
7.永續性日誌(commit log)
Kafka可以為一種外部的永續性日誌的分散式系統提供服務。這種日誌可以在節點間備份資料,併為故障節點資料回覆提供一種重新同步的機制。Kafka中日誌壓縮功能為這種用法提供了條件。在這種用法中,Kafka類似於Apache BookKeeper專案。
Kayka的設計要點:
1、直接使用linux 檔案系統的cache,來高效快取資料。
2、採用linux Zero-Copy提高發送效能。傳統的資料傳送需要傳送4次上下文切換,採用sendfile系統呼叫之後,資料直接在核心態交換,系統上下文切換減少為2次。根據測試結果,可以提高60%的資料傳送效能。Zero-Copy詳細的技術細節可以參考:https://www.ibm.com/developerworks/linux/library/j-zerocopy/
3、資料在磁碟上存取代價為O(1)。kafka以topic來進行訊息管理,每個topic包含多個part(ition),每個part對應一個邏輯log,有多個segment組成。每個segment中儲存多條訊息(見下圖),訊息id由其邏輯位置決定,即從訊息id可直接定位到訊息的儲存位置,避免id到位置的額外對映。每個part在記憶體中對應一個index,記錄每個segment中的第一條訊息偏移。釋出者發到某個topic的訊息會被均勻的分佈到多個part上(隨機或根據使用者指定的回撥函式進行分佈),broker收到釋出訊息往對應part的最後一個segment上新增該訊息,當某個segment上的訊息條數達到配置值或訊息釋出時間超過閾值時,segment上的訊息會被flush到磁碟,只有flush到磁碟上的訊息訂閱者才能訂閱到,segment達到一定的大小後將不會再往該segment寫資料,broker會建立新的segment。
4、顯式分散式,即所有的producer、broker和consumer都會有多個,均為分散式的。Producer和broker之間沒有負載均衡機制。broker和consumer之間利用zookeeper進行負載均衡。所有broker和consumer都會在zookeeper中進行註冊,且zookeeper會儲存他們的一些元資料資訊。如果某個broker和consumer發生了變化,所有其他的broker和consumer都會得到通知。
參考資料:
本節文章來自:http://blog.jobbole.com/75328/7.Apache Kafka:下一代分散式訊息系統
簡介
Apache Kafka是分散式釋出-訂閱訊息系統。它最初由LinkedIn公司開發,之後成為Apache專案的一部分。Kafka是一種快速、可擴充套件的、設計內在就是分散式的,分割槽的和可複製的提交日誌服務。
Apache Kafka與傳統訊息系統相比,有以下不同:
- 它被設計為一個分散式系統,易於向外擴充套件;
- 它同時為釋出和訂閱提供高吞吐量;
- 它支援多訂閱者,當失敗時能自動平衡消費者;
- 它將訊息持久化到磁碟,因此可用於批量消費,例如ETL,以及實時應用程式。
本文我將重點介紹Apache Kafka的架構、特性和特點,幫助我們理解Kafka為何比傳統訊息服務更好。
相關贊助商
QCon北京2017,4月16-18日,北京·國家會議中心,精彩內容搶先看
我將比較Kafak和傳統訊息服務RabbitMQ、Apache ActiveMQ的特點,討論一些Kafka優於傳統訊息服務的場景。在最後一節,我們將探討一個進行中的示例應用,展示Kafka作為訊息伺服器的用途。這個示例應用的完整原始碼在GitHub。關於它的詳細討論在本文的最後一節。
架構
首先,我介紹一下Kafka的基本概念。它的架構包括以下元件:
- 話題(Topic)是特定型別的訊息流。訊息是位元組的有效負載(Payload),話題是訊息的分類名或種子(Feed)名。
- 生產者(Producer)是能夠釋出訊息到話題的任何物件。
- 已釋出的訊息儲存在一組伺服器中,它們被稱為代理(Broker)或Kafka叢集。
- 消費者可以訂閱一個或多個話題,並從Broker拉資料,從而消費這些已釋出的訊息。
圖1:Kafka生產者、消費者和代理環境
生產者可以選擇自己喜歡的序列化方法對訊息內容編碼。為了提高效率,生產者可以在一個釋出請求中傳送一組訊息。下面的程式碼演示瞭如何建立生產者併發送訊息。
生產者示例程式碼:
producer = new Producer(…); message = new Message(“test message str”.getBytes()); set = new MessageSet(message); producer.send(“topic1”, set);
為了訂閱話題,消費者首先為話題建立一個或多個訊息流。釋出到該話題的訊息將被均衡地分發到這些流。每個訊息流為不斷產生的訊息提供了迭代介面。然後消費者迭代流中的每一條訊息,處理訊息的有效負載。與傳統迭代器不同,訊息流迭代器永不停止。如果當前沒有訊息,迭代器將阻塞,直到有新的訊息釋出到該話題。Kafka同時支援點到點分發模型(Point-to-point delivery model),即多個消費者共同消費佇列中某個訊息的單個副本,以及釋出-訂閱模型(Publish-subscribe model),即多個消費者接收自己的訊息副本。下面的程式碼演示了消費者如何使用訊息。
消費者示例程式碼:
streams[] = Consumer.createMessageStreams(“topic1”, 1) for (message : streams[0]) { bytes = message.payload(); // do something with the bytes }
Kafka的整體架構如圖2所示。因為Kafka內在就是分散式的,一個Kafka叢集通常包括多個代理。為了均衡負載,將話題分成多個分割槽,每個代理儲存一或多個分割槽。多個生產者和消費者能夠同時生產和獲取訊息。
圖2:Kafka架構
Kafka儲存
Kafka的儲存佈局非常簡單。話題的每個分割槽對應一個邏輯日誌。物理上,一個日誌為相同大小的一組分段檔案。每次生產者釋出訊息到一個分割槽,代理就將訊息追加到最後一個段檔案中。當釋出的訊息數量達到設定值或者經過一定的時間後,段檔案真正寫入磁碟中。寫入完成後,訊息公開給消費者。
與傳統的訊息系統不同,Kafka系統中儲存的訊息沒有明確的訊息Id。
訊息通過日誌中的邏輯偏移量來公開。這樣就避免了維護配套密集定址,用於對映訊息ID到實際訊息地址的隨機存取索引結構的開銷。訊息ID是增量的,但不連續。要計算下一訊息的ID,可以在其邏輯偏移的基礎上加上當前訊息的長度。
消費者始終從特定分割槽順序地獲取訊息,如果消費者知道特定訊息的偏移量,也就說明消費者已經消費了之前的所有訊息。消費者向代理髮出非同步拉請求,準備位元組緩衝區用於消費。每個非同步拉請求都包含要消費的訊息偏移量。Kafka利用sendfile API高效地從代理的日誌段檔案中分發位元組給消費者。
圖3:Kafka儲存架構
Kafka代理
與其它訊息系統不同,Kafka代理是無狀態的。這意味著消費者必須維護已消費的狀態資訊。這些資訊由消費者自己維護,代理完全不管。這種設計非常微妙,它本身包含了創新。
- 從代理刪除訊息變得很棘手,因為代理並不知道消費者是否已經使用了該訊息。Kafka創新性地解決了這個問題,它將一個簡單的基於時間的SLA應用於保留策略。當訊息在代理中超過一定時間後,將會被自動刪除。
- 這種創新設計有很大的好處,消費者可以故意倒回到老的偏移量再次消費資料。這違反了佇列的常見約定,但被證明是許多消費者的基本特徵。
ZooKeeper與Kafka
考慮一下有多個伺服器的分散式系統,每臺伺服器都負責儲存資料,在資料上執行操作。這樣的潛在例子包括分散式搜尋引擎、分散式構建系統或者已知的系統如Apache Hadoop。所有這些分散式系統的一個常見問題是,你如何在任一時間點確定哪些伺服器活著並且在工作中。最重要的是,當面對這些分散式計算的難題,例如網路失敗、頻寬限制、可變延遲連線、安全問題以及任何網路環境,甚至跨多個數據中心時可能發生的錯誤時,你如何可靠地做這些事。這些正是Apache ZooKeeper所關注的問題,它是一個快速、高可用、容錯、分散式的協調服務。你可以使用ZooKeeper構建可靠的、分散式的資料結構,用於群組成員、領導人選舉、協同工作流和配置服務,以及廣義的分散式資料結構如鎖、佇列、屏障(Barrier)和鎖存器(Latch)。許多知名且成功的專案依賴於ZooKeeper,其中包括HBase、Hadoop 2.0、Solr Cloud、Neo4J、Apache Blur(Incubating)和Accumulo。
ZooKeeper是一個分散式的、分層級的檔案系統,能促進客戶端間的鬆耦合,並提供最終一致的,類似於傳統檔案系統中檔案和目錄的Znode檢視。它提供了基本的操作,例如建立、刪除和檢查Znode是否存在。它提供了事件驅動模型,客戶端能觀察特定Znode的變化,例如現有Znode增加了一個新的子節點。ZooKeeper執行多個ZooKeeper伺服器,稱為Ensemble,以獲得高可用性。每個伺服器都持有分散式檔案系統的記憶體複本,為客戶端的讀取請求提供服務。
圖4:ZooKeeper Ensemble架構
上圖4展示了典型的ZooKeeper ensemble,一臺伺服器作為Leader,其它作為Follower。當Ensemble啟動時,先選出Leader,然後所有Follower複製Leader的狀態。所有寫請求都通過Leader路由,變更會廣播給所有Follower。變更廣播被稱為原子廣播。
Kafka中ZooKeeper的用途:正如ZooKeeper用於分散式系統的協調和促進,Kafka使用ZooKeeper也是基於相同的原因。ZooKeeper用於管理、協調Kafka代理。每個Kafka代理都通過ZooKeeper協調其它Kafka代理。當Kafka系統中新增了代理或者某個代理故障失效時,ZooKeeper服務將通知生產者和消費者。生產者和消費者據此開始與其它代理協調工作。Kafka整體系統架構如圖5所示。
圖5:Kafka分散式系統的總體架構
Apache Kafka對比其它訊息服務
讓我們瞭解一下使用Apache Kafka的兩個專案,以對比其它訊息服務。這兩個專案分別是LinkedIn和我的專案:
LinkedIn的研究
LinkedIn團隊做了個實驗研究,對比Kafka與Apache ActiveMQ V5.4和RabbitMQ V2.4的效能。他們使用ActiveMQ預設的訊息持久化庫Kahadb。LinkedIn在兩臺Linux機器上執行他們的實驗,每臺機器的配置為8核2GHz、16GB記憶體,6個磁碟使用RAID10。兩臺機器通過1GB網路連線。一臺機器作為代理,另一臺作為生產者或者消費者。
生產者測試
LinkedIn團隊在所有系統中配置代理,非同步將訊息刷入其持久化庫。對每個系統,執行一個生產者,總共釋出1000萬條訊息,每條訊息200位元組。Kafka生產者以1和50批量方式傳送訊息。ActiveMQ和RabbitMQ似乎沒有簡單的辦法來批量傳送訊息,LinkedIn假定它的批量值為1。結果如下面的圖6所示:
圖6:LinkedIn的生產者效能實驗結果
Kafka效能要好很多的主要原因包括:
- Kafka不等待代理的確認,以代理能處理的最快速度傳送訊息。
- Kafka有更高效的儲存格式。平均而言,Kafka每條訊息有9位元組的開銷