1. 程式人生 > >kafka架構與原理

kafka架構與原理

1 簡介

它可以讓你釋出和訂閱記錄流。在這方面,它類似於一個訊息佇列或企業訊息系統。

它可以讓你持久化收到的記錄流,從而具有容錯能力。

首先,明確幾個概念:

  • Kafka執行在一個或多個伺服器上。
  • Kafka叢集分類儲存的記錄流被稱為主題(Topics)。
  • 每個訊息記錄包含一個鍵,一個值和時間戳。

Kafka有四個核心API:

  • 生產者 API 允許應用程式釋出記錄流至一個或多個Kafka的話題(Topics)。
  • 消費者API 允許應用程式訂閱一個或多個主題,並處理這些主題接收到的記錄流。
  • Streams API 允許應用程式充當流處理器(stream processor
    ,從一個或多個主題獲取輸入流,並生產一個輸出流至一個或多個的主題,能夠有效地變換輸入流為輸出流。
  • Connector API 允許構建和執行可重用的生產者或消費者,能夠把 Kafka主題連線到現有的應用程式或資料系統。例如,一個連線到關係資料庫的聯結器(connector)可能會獲取每個表的變化。

Kafka的客戶端和伺服器之間的通訊是靠一個簡單的,高效能的,與語言無關的TCP協議完成的。這個協議有不同的版本,並保持向前相容舊版本。Kafka不光提供了一個Java客戶端,還有許多語言版本的客戶端。

2 架構

2.1 Broker

每個kafka server稱為一個Broker,多個borker組成kafka cluster。一個機器上可以部署一個或者多個Broker,這多個Broker連線到相同的ZooKeeper就組成了Kafka叢集。

這裡寫圖片描述

2.2 主題Topic

讓我們先來了解Kafka的核心抽象概念記錄流 – 主題。主題是一種分類或釋出的一系列記錄的名義上的名字。Kafka的主題始終是支援多使用者訂閱的; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入的資料。

Topic 與broker

一個Broker上可以建立一個或者多個Topic。同一個topic可以在同一叢集下的多個Broker中分佈。

這裡寫圖片描述

當然,Topic只是一個名義上的元件,真正在Broker間分散式的Partition。

2.3 分割槽與日誌

一個主題對應多個分割槽,一個分割槽對應一個日誌

Kafka會為每個topic維護了多個分割槽(partition),每個分割槽會對映到一個邏輯的日誌(log)檔案。每個分割槽是一個有序的,不可變的訊息序列,新的訊息不斷追加到這個有組織的有保證的日誌上。分割槽會給每個訊息記錄分配一個順序ID號 – 偏移量

, 能夠唯一地標識該分割槽中的每個記錄。

日誌分割槽是分散式的存在於一個kafka叢集的多個broker上。每個partition會被複制多份存在於不同的broker上。這樣做是為了容災。具體會複製幾份,會複製到哪些broker上,都是可以配置的。經過相關的複製策略後,每個topic在每個broker上會駐留一到多個partition:

這裡寫圖片描述

2.4 保留策略與Offset

Kafka叢集保留所有釋出的記錄,不管這個記錄有沒有被消費過,Kafka提供可配置的保留策略去刪除舊資料(還有一種策略根據分割槽大小刪除資料)。例如,如果將保留策略設定為兩天,在記錄公佈後兩天內,它可用於消費,之後它將被丟棄以騰出空間。Kafka的效能跟儲存的資料量的大小無關, 所以將資料儲存很長一段時間是沒有問題的。

這裡寫圖片描述

事實上,保留在每個消費者元資料中的最基礎的資料就是消費者正在處理的當前記錄的偏移量(offset)或位置(position)。這種偏移是由消費者控制:通常偏移會隨著消費者讀取記錄線性前進,但事實上,因為其位置是由消費者進行控制,消費者可以在任何它喜歡的位置讀取記錄。例如,消費者可以恢復到舊的偏移量對過去的資料再加工或者直接跳到最新的記錄,並消費從“現在”開始的新的記錄

這些功能的結合意味著,實現Kafka的消費者的代價都是很小的,他們可以增加或者減少而不會對叢集或其他消費者有太大影響。例如,你可以使用我們的命令列工具去追隨任何主題,而且不會改變任何現有的消費者消費的記錄。

2.5 Leader與Followers

一個Topic可能有很多分割槽,以便它能夠支援海量的的資料,更重要的意義是分割槽是進行並行處理的基礎單元。日誌的分割槽會跨伺服器的分佈在Kafka叢集中,每個分割槽可以配置一定數量的副本分割槽提供容錯能力。為了保證較高的處理效率,訊息的讀寫都是在固定的一個副本上完成。這個副本就是所謂的Leader,而其他副本則是Follower,而Follower則會定期地到Leader上同步資料

(1)leader處理所有的讀取和寫入分割槽的請求,而followers被動的從領導者拷貝資料。

(2)如果leader失敗了,followers之一將自動成為新的領導者。

(3)每個伺服器可能充當一些分割槽的leader和其他分割槽的follower,這樣的負載就會在叢集內很好的均衡分配。

(4)一個分割槽在同一時刻只能有一個消費者例項進行消費。

舉例:

這裡寫圖片描述

可以看見我們一共有3個分割槽分別是0,1,2, replica 有2個:

  • partition 0 的leader在broker1, follower在broker2
  • partition 1 的leader在broker2, follower在broker0
  • partition 2 的leader在broker0, follower在brokder1

一個broker中不會出現兩個一樣的Partition,replica會被均勻的分佈在各個kafka server(broker)上 。Kafka並不允許replicas 數設定大於 broker數,因為在一個broker上如果有2個replica其實是沒有意義的,因為再多的replica同時在一臺broker上,隨著該broker的crash,一起不可用。

(1)Leader選舉與ISR

如果某個分割槽所在的伺服器除了問題,不可用,kafka會從該分割槽的其他的副本中選擇一個作為新的Leader。之後所有的讀寫就會轉移到這個新的Leader上。現在的問題是應當選擇哪個作為新的Leader。顯然,只有那些跟Leader保持同步的Follower才應該被選作新的Leader。

Kafka會在Zookeeper上針對每個Topic維護一個稱為ISRin-sync replica,已同步的副本)的集合,該集合中是一些分割槽的副本。只有當這些副本都跟Leader中的副本同步了之後,kafka才會認為訊息已提交,並反饋給訊息的生產者。如果這個集合有增減,kafka會更新zookeeper上的記錄。如果某個分割槽的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader。顯然通過ISR,kafka需要的冗餘度較低,可以容忍的失敗數比較高。假設某個topic有f+1個副本,kafka可以容忍f個伺服器不可用。

(2)為什麼不用少數服從多數的方法

少數服從多數是一種比較常見的一致性演算法和Leader選舉法。它的含義是隻有超過半數的副本同步了,系統才會認為資料已同步;選擇Leader時也是從超過半數的同步的副本中選擇。這種演算法需要較高的冗餘度。譬如只允許一臺機器失敗,需要有三個副本;而如果只容忍兩臺機器失敗,則需要五個副本。而kafka的ISR集合方法,分別只需要兩個和三個副本。

(3)如果所有的ISR副本都失敗了怎麼辦

此時有兩種方法可選,一種是等待ISR集合中的副本復活,一種是選擇任何一個立即可用的副本,而這個副本不一定是在ISR集合中。這兩種方法各有利弊,實際生產中按需選擇。如果要等待ISR副本復活,雖然可以保證一致性,但可能需要很長時間。而如果選擇立即可用的副本,則很可能該副本並不一致。

2.6 生產者和消費者

(1)生產者

生產者釋出資料到他們所選擇的主題。生產者負責選擇把記錄分配到主題中的哪個分割槽。這可以使用輪詢演算法( round-robin)進行簡單地平衡負載,也可以根據一些更復雜的語義分割槽演算法(比如基於記錄一些鍵值)來完成。

(2)消費者

消費者以消費群(consumer group)的名稱來標識自己,每個釋出到主題的訊息都會發送給訂閱了這個主題的消費群裡面的一個消費者例項,即一個消費群只發送一次。消費者的例項可以在單獨的程序或單獨的機器上。

這裡寫圖片描述

上圖中兩個伺服器的Kafka叢集具有四個分割槽(P0-P3)和兩個消費群。A消費群有兩個消費者,B群有四個。更常見的是,我們會發現主題有少量的消費群,每一個都是“邏輯上的訂閱者”。每組都是由很多消費者例項組成,從而實現可擴充套件性和容錯性。這只不過是釋出 – 訂閱模式的再現,區別是這裡的訂閱者是一組消費者而不是一個單一的程序的消費者。

Kafka消費群的實現方式是通過分割分割槽給每個Consumer例項實現的,使每個例項在任何時間點的都可以“公平分享”獨佔的分割槽。維持消費群中的成員關係的這個過程是通過Kafka動態協議處理。如果新的例項加入該組,他將接管該組的其他成員的一些分割槽; 如果一個例項死亡,其分割槽將被分配到剩餘的例項。

Kafka只保證一個分割槽內的訊息有序,不能保證一個主題的不同分割槽之間的訊息有序。分割槽的訊息有序與依靠主鍵進行資料分割槽的能力相結合足以滿足大多數應用的要求。但是,如果你想要保證所有的訊息都絕對有序可以只為一個主題分配一個分割槽,雖然這將意味著每個消費群同時只能有一個消費程序在消費

3 資料可靠性與一致性

3.1 Partition Recovery機制

每個Partition會在磁碟記錄一個RecoveryPoint,記錄已經flush到磁碟的最大offset。當broker fail 重啟時,會進行loadLogs。 首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint的segment及以後的segment, 這些segment就是可能沒有完全flush到磁碟segments。然後呼叫segment的recover,重新讀取各個segment的msg,並重建索引。

優點

  • 以segment為單位管理Partition資料,方便資料生命週期的管理,刪除過期資料簡單。
  • 在程式崩潰重啟時,加快recovery速度,只需恢復未完全flush到磁碟的segment。
  • 通過index中offset與物理偏移對映,用二分查詢能快速定位msg,並且通過分多個Segment,每個index檔案很小,查詢速度更快。

3.2 Partition Replica同步機制

  • Partition的多個replica中一個為Leader,其餘為follower
  • Producer只與Leader互動,把資料寫入到Leader中
  • Followers從Leader中拉取資料進行資料同步
  • Consumer只從Leader拉取資料

ISR:in-sync replica,已同步的副本。準確的定義是“所有不落後的replica集合”。不落後有兩層含義:距離上次FetchRequest的時間不大於某一個值或落後的訊息數不大於某一個值, Leader失敗後會從ISR中選取一個Follower做Leader。

3.4 訊息的順序消費問題

在說到訊息中介軟體的時候,我們通常都會談到一個特性:訊息的順序消費問題。這個問題看起來很簡單:Producer傳送訊息1, 2, 3;Consumer按1, 2, 3順序消費。但實際情況卻是:無論RocketMQ,還是Kafka,預設都不保證訊息的嚴格有序消費!困難如下:

(1)Producer

傳送端不能非同步傳送,非同步傳送在傳送失敗的情況下,就沒辦法保證訊息順序。比如你連續發了1,2,3。 過了一會,返回結果1失敗,2, 3成功。你把1再重新發送1遍,這個時候順序就亂掉了。

(2)儲存端

  • 對於儲存端,要保證訊息順序,會有以下幾個問題:
    訊息不能分割槽。也就是1個topic,只能有1個佇列。在Kafka中,它叫做partition;在RocketMQ中,它叫做queue。 如果你有多個佇列,那同1個topic的訊息,會分散到多個分割槽裡面,自然不能保證順序。
  • 即使只有1個佇列的情況下,會有第2個問題。該機器掛了之後,能否切換到其他機器?也就是高可用問題。比如你當前的機器掛了,上面還有訊息沒有消費完。此時切換到其他機器,可用性保證了。但訊息順序就亂掉了。要想保證,一方面要同步複製,不能非同步複製;另1方面得保證,切機器之前,掛掉的機器上面,所有訊息必須消費完了,不能有殘留。很明顯,這個很難。

(3)接收端

對於接收端,不能並行消費,也即不能開多執行緒或者多個客戶端消費同1個佇列。

3.5 Producer傳送訊息的配置

3.5.1 同步模式

kafka有同步(sync)、非同步(async)以及oneway這三種傳送方式,某些概念上區分也可以分為同步和非同步兩種,同步和非同步的傳送方式通過producer.type引數指定,而onewayrequest.require.acks引數指定。

producer.type的預設值是sync,即同步的方式。這個引數指定了在後臺執行緒中訊息的傳送方式是同步的還是非同步的。如果設定成非同步的模式,可以執行生產者以batch的形式push資料,這樣會極大的提高broker的效能,但是這樣會增加丟失資料的風險。

3.5.2 非同步模式

對於非同步模式,還有4個配套的引數,如下:

Property Default Description
queue.buffering.max.ms 5000 啟用非同步模式時,producer快取訊息的時間。比如我們設定成1000時,它會快取1s的資料再一次傳送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。
queue.buffering.max.messages 10000 啟用非同步模式時,producer快取佇列裡最大快取的訊息數量,如果超過這個值,producer就會阻塞或者丟掉訊息。
queue.enqueue.timeout.ms -1 當達到上面引數時producer會阻塞等待的時間。如果設定為0,buffer佇列滿時producer不會阻塞,訊息直接被丟掉;若設定為-1,producer會被阻塞,不會丟訊息。
batch.num.messages 200 啟用非同步模式時,一個batch快取的訊息數量。達到這個數值時,producer才會傳送訊息。(每次批量傳送的數量)

3.5.3 oneway

oneway是隻顧訊息發出去而不管死活,訊息可靠性最低,但是低延遲、高吞吐,這種對於某些完全對可靠性沒有要求的場景還是適用的,即request.required.acks設定為0。

3.5.4 訊息可靠性級別

當Producer向Leader傳送資料時,可以通過request.required.acks引數設定資料可靠性的級別:

  • 0: 不論寫入是否成功,server不需要給Producer傳送Response,如果發生異常,server會終止連線,觸發Producer更新meta資料;
  • 1: Leader寫入成功後即傳送Response,此種情況如果Leader fail,會丟失資料
  • -1: 等待所有ISR接收到訊息後再給Producer傳送Response,這是最強保證
    僅設定acks=-1也不能保證資料不丟失,當Isr列表中只有Leader時,同樣有可能造成資料丟失。要保證資料不丟除了設定acks=-1, 還要保 證ISR的大小大於等於2,具體引數設定:
    • (1)request.required.acks: 設定為-1 等待所有ISR列表中的Replica接收到訊息後採算寫成功;
    • (2)min.insync.replicas: 設定為大於等於2,保證ISR中至少有兩個Replica

Producer要在吞吐率和資料可靠性之間做一個權衡。

3.5.5 一般配置

對於sync的傳送方式:

producer.type=sync 
request.required.acks=1

對於async的傳送方式:

producer.type=async 
request.required.acks=1 
queue.buffering.max.ms=5000 
queue.buffering.max.messages=10000 
queue.enqueue.timeout.ms = -1 
batch.num.messages=200

對於oneway的傳送傳送:

producer.type=async 
request.required.acks=0

4 應用場景

4.1 訊息系統

訊息處理模型歷來有兩種:

  • 佇列模型:一組消費者可以從伺服器讀取記錄,每個記錄都會被其中一個消費者處理,為保障訊息的順序,同一時刻只能有一個程序進行消費。
  • 釋出-訂閱模型:記錄被廣播到所有的消費者。

Kafka的消費群的推廣了這兩個概念。消費群可以像佇列一樣讓訊息被一組程序處理(消費群的成員),與釋出 – 訂閱模式一樣,Kafka可以讓你傳送廣播訊息到多個消費群。

Kafka兼顧了訊息的有序性和併發處理能力。傳統的訊息佇列的訊息在佇列中是有序的,多個消費者從佇列中消費訊息,伺服器按照儲存的順序派發訊息。然而,儘管伺服器是按照順序派發訊息,但是這些訊息記錄被非同步傳遞給消費者,消費者接收到的訊息也許已經是亂序的了。這實際上意味著訊息的排序在並行消費中都將丟失訊息系統通常靠 “排他性消費”( exclusive consumer)來解決這個問題,只允許一個程序從佇列中消費,當然,這意味著沒有並行處理的能力。

Kafka做的更好。通過一個概念:並行性-分割槽-主題實現主題內的並行處理,Kafka是能夠通過一組消費者的程序同時提供排序保證和並行處理以及負載均衡的能力:

(1)排序保障

每個主題的分割槽指定給每個消費群中的一個消費者,使每個分割槽只由該組中的一個消費者所消費。通過這樣做,我們確保消費者是一個分割槽唯一的讀者,從而順序的消費資料。

(2)並行處理

因為有許多的分割槽,所以負載還能夠均衡的分配到很多的消費者例項上去。但是請注意,一個消費群的消費者例項不能比分割槽數量多,因為分割槽數代表了一個主題的最大併發數,消費者的數量高於這個數量意義不大。

4.2 日誌採集

大多數時候,我們的log都會輸出到本地的磁碟上,排查問題也是使用linux命令來搞定,如果web程式組成負載叢集,那麼就有多臺機器,如果有幾十臺機器,幾十個服務,那麼想快速定位log問題和排查就比較麻煩了,所以很有必要有一個統一的平臺管理log,現在大多數公司的套路都是收集重要應用的log集中到kafka中,然後在分別匯入到es和hdfs上,一個做實時檢索分析,另一個做離線統計和資料備份。如何能快速收集應用日誌到kafka中?

方法一:使用log4j的整合包

kafka官網已經提供了非常方便的log4j的整合包 kafka-log4j-appender,我們只需要簡單配置log4j檔案,就能收集應用程式log到kafka中。

#log4j.rootLogger=WARN,console,kafka
log4j.rootLogger=INFO,console

# for package com.demo.kafka, log would be sent to kafka appender.
#log4j.logger.com.bigdata.xuele.streaming.SparkStreamingKmd*=info,kafka
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=${kafka.log.topic}
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=${kafka.log.brokers}
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
#log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.kafka.layout.ConversionPattern=[%d] %p %m (%c)%n

# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n

log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

注意,需要引入maven的依賴包:

       <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
       </dependency>

非常簡單,一個maven依賴加一個log4j配置檔案即可,如果依然想寫入log到本地 檔案依然也是可以的,這種方式最簡單快速,但是預設的的log日誌是一行一行的純文字,有些場景下我們可能需要json格式的資料。

方法二: 重寫Log4jAppender

重寫Log4jAppender,自定義輸出格式,支援json格式,如果是json格式的資料打入到kafka中,後續收集程式可能就非常方便了,直接拿到json就能入到mongodb或者es中,如果打入到kafka中的資料是純文字,那麼收集程式,可能需要做一些etl,解析其中的一些欄位然後再入到es中,所以原生的輸出格式,可能稍不靈活,這樣就需要我們自己寫一些類,然後達到靈活的程度,github連線:

感興趣的朋友可以看下。

總結:

(1)方法一簡單快速,不支援json格式的輸出,打到kafka的訊息都是原樣的log日誌資訊

(2)方法二稍微複雜,需要自己擴充套件log收集類,但支援json格式的資料輸出,對於想落地json資料直接到儲存系統中是非常適合的。

此外需要注意,在除錯的時候log傳送資料到kafka模式最好是同步模式的否則你控制檯列印的資料很有可能不會被收集kafka中,程式就停止了。生產環境最好開啟非同步傳送資料模式,因為內部是批量的處理,所以能提升吞吐,但有一定的輕微延遲。

4.3 流處理

只是讀,寫,以及儲存資料流是不夠的,目的是能夠實時處理資料流。在Kafka中,流處理器是從輸入的主題連續的獲取資料流,然後對輸入進行一系列的處理,並生產連續的資料流到輸出主題。

這些簡單處理可以直接使用生產者和消費者的API做到。然而,對於更復雜的轉換Kafka提供了一個完全整合的流API。這允許應用程式把一些重要的計算過程從流中剝離或者加入流一起。這種設施可幫助解決這類應用面臨的難題:處理雜亂的資料,改變程式碼去重新處理輸入,執行有狀態的計算等。流API建立在Kafka提供的核心基礎單元之上:它使用生產者和消費者的API進行輸入輸出,使用Kafka儲存有狀態的資料,並使用群組機制在一組流處理例項中實現容錯。

把功能組合起來

訊息的傳輸,儲存和流處理的組合看似不尋常,卻是Kafka作為流處理平臺的關鍵。像HDFS分散式檔案系統,允許儲存靜態檔案進行批量處理。像這樣的系統允許儲存和處理過去的歷史資料。傳統的企業訊息系統允許處理您訂閱後才抵達的訊息。這樣的系統只能處理將來到達的資料。

Kafka結合了這些功能,這種結合對Kafka作為流應用平臺以及資料流處理的管道至關重要。通過整合儲存和低延遲訂閱,流處理應用可以把過去和未來的資料用相同的方式處理。這樣一個單獨的應用程式,不但可以處理歷史的,儲存的資料,當它到達最後一條記錄不會停止,繼續等待處理未來到達的資料。這是泛化了的流處理的概念,包括了批處理應用以及訊息驅動的應用。同樣,流資料處理的管道結合實時事件的訂閱使人們能夠用Kafka實現低延遲的管道; 可靠的儲存資料的能力使人們有可能使用它傳輸一些重要的必須保證可達的資料。可以與一個定期載入資料的線下系統整合,或者與一個因為維護長時間下線的系統整合。流處理的元件能夠保證轉換(處理)到達的資料。

5 Kafka與ActiveMQ對比

首先,Active MQ與Kafka的相同點只有一個,就是都是訊息中介軟體。其他沒有任何相同點。

5.1 consumer的不同

(1)AMQ消費完的訊息會被清理掉

AMQ無論在standalone還是分散式的情況下,都會使用mysql作為儲存,多一個consumer執行緒去消費多個queue, 消費完的message會在mysql中被清理掉。

(2)AMQ的消費邏輯在Broker中完成

作為AMQ的consume clinet的多個consumer執行緒去消費queue,AMQ Broker會接收到這些consume執行緒,阻塞在這裡,有message來了就會進行消費,沒有訊息就會阻塞在這裡。具體消費的邏輯也就是處理這些consumer執行緒都是AMQ Broker那面處理。

kafka是message都存在partition下的segment檔案裡面,有offsite偏移量去記錄那條消費了,哪條沒消費。某個consumer group下consumer執行緒消費完就會,這個consumer group 下的這個consumer對應這個partition的offset+1,kafka並不會刪除這條已經被消費的message。其他的consumer group也可以再次消費這個message。在high level api中offset會自動或手動的提交到zookeeper上(如果是自動提交就有可能處理失敗或還沒處理完就提交offset+1了,容易出現下次再啟動consumer group的時候這條message就被漏了),也可以使用low level api,那麼就是consumer程式中自己維護offset+1的邏輯。kafka中的message會定期刪除。

(3)Kafka有consumer group的概念,AMQ沒有。

一個consumer group下有多個consumer,每個consumer都是一個執行緒,consumer group是一個執行緒組。每個執行緒組consumer group之間互相獨立。同一個partition中的一個message只能被一個consumer group下的一個consumer執行緒消費,因為消費完了這個consumer group下的這個consumer對應的這個partition的offset就+1了,這個consumer group下的其他consumer還是這個consumer都不能在消費了。 但是另外一個consumer group是完全獨立的,可以設定一個from的offset位置,重新消費這個partition。

5.2 關於儲存結構

ActiveMQ的訊息持久化機制有JDBC,AMQ,KahaDB和LevelDB

Kafka是檔案儲存,每個topic有多個partition,每個partition有多個replica副本(每個partition和replica都是均勻分配在不同的kafka broker上的)。每個partition由多個segment檔案組成。這些檔案是順序儲存的。因此讀取和寫入都是順序的,因此,速度很快,省去了磁碟定址的時間。

很多系統、元件為了提升效率一般恨不得把所有資料都扔到記憶體裡,然後定期flush到磁碟上;而Kafka決定直接使用頁面快取;但是隨機寫入的效率很慢,為了維護彼此的關係順序還需要額外的操作和儲存,而線性的順序寫入可以避免磁碟定址時間,實際上,線性寫入(linear write)的速度大約是300MB/秒,但隨即寫入卻只有50k/秒,其中的差別接近10000倍。這樣,Kafka以頁面快取為中間的設計在保證效率的同時還提供了訊息的持久化,每個consumer自己維護當前讀取資料的offset(也可委託給zookeeper),以此可同時支援線上和離線的消費。

5.3 關於使用場景與吞吐量

ActiveMQ用於企業訊息中介軟體,使得業務邏輯和前端處理邏輯解耦。AMQ的吞吐量不大,zuora的AMQ就是用作jms來使用。AMQ吞吐量不夠,並且持久化message資料通過jdbc存在mysql,寫入和讀取message效能太低。而Kafka的吞吐量非常大。

5.4 push/pull 模型

對於消費者而言有兩種方式從訊息中介軟體獲取訊息:

①Push方式:由訊息中介軟體主動地將訊息推送給消費者,採用Push方式,可以儘可能快地將訊息傳送給消費者;②Pull方式:由消費者主動向訊息中介軟體拉取訊息,會增加訊息的延遲,即訊息到達消費者的時間有點長

但是,Push方式會有一個壞處:如果消費者的處理訊息的能力很弱(一條訊息需要很長的時間處理),而訊息中介軟體不斷地向消費者Push訊息,消費者的緩衝區可能會溢位。

AMQ的Push消費

ActiveMQ使用PUSH模型, 對於PUSH,broker很難控制資料傳送給不同消費者的速度。AMQ Broker將message推送給對應的BET consumer。ActiveMQ用prefetch limit 規定了一次可以向消費者Push(推送)多少條訊息。當推送訊息的數量到達了perfetch limit規定的數值時,消費者還沒有向訊息中介軟體返回ACK,訊息中介軟體將不再繼續向消費者推送訊息。

AMQ的Pull消費

ActiveMQ prefetch limit 設定成0意味著什麼?意味著此時,消費者去輪詢訊息中介軟體獲取訊息。不再是Push方式了,而是Pull方式了。即消費者主動去訊息中介軟體拉取訊息。

那麼,ActiveMQ中如何採用Push方式或者Pull方式呢?從是否阻塞來看,消費者有兩種方式獲取訊息。同步方式和非同步方式。

同步方式使用的是ActiveMQMessageConsumer的receive()方法。而非同步方式則是採用消費者實現MessageListener介面,監聽訊息。使用同步方式receive()方法獲取訊息時,prefetch limit即可以設定為0,也可以設定為大於0。

prefetch limit為零 意味著:“receive()方法將會首先發送一個PULL指令並阻塞,直到broker端返回訊息為止,這也意味著訊息只能逐個獲取(類似於Request<->Response)”。

prefetch limit 大於零 意味著:“broker端將會批量push給client 一定數量的訊息(<= prefetch),client端會把這些訊息(unconsumedMessage)放入到本地的佇列中,只要此佇列有訊息,那麼receive方法將會立即返回,當一定量的訊息ACK之後,broker端會繼續批量push訊息給client端。”

當使用MessageListener非同步獲取訊息時,prefetch limit必須大於零了。因為,prefetch limit 等於零 意味著訊息中介軟體不會主動給消費者Push訊息,而此時消費者又用MessageListener被動獲取訊息(不會主動去輪詢訊息)。這二者是矛盾的。

Kafka只有Pull消費方式

Kafka使用PULL模型,PULL可以由消費者自己控制,但是PULL模型可能造成消費者在沒有訊息的情況下盲等,這種情況下可以通過long polling機制緩解,而對於幾乎每時每刻都有訊息傳遞的流式系統,這種影響可以忽略。Kafka 的 consumer 是以pull的形式獲取訊息資料的。 pruducer push訊息到kafka cluster ,consumer從叢集中pull訊息。