1. 程式人生 > >傳統Active MQ與大資料下的分散式Kafka

傳統Active MQ與大資料下的分散式Kafka

本人在Zuora工作的時候,幾乎所有的非同步業務邏輯都使用ActiveMQ,對AMQ也算頗為熟悉。現如今每天和Kafka打交道,對kafka也算駕馭的不錯。現在基於這兩者做個小比較。 首先,Active MQ與Kafka的相同點只有一個,就是都是訊息中介軟體。其他沒有任何相同點。
  1. 關於consume
Active mq是完全遵循JMS標準的。amq無論在standalone還是分散式的情況下,都會使用mysql作為儲存,多一個consumer執行緒去消費多個queue, 消費完的message會在mysql中被清理掉。作為AMQ的consume clinet的多個consumer執行緒去消費queue,AMQ Broker會接收到這些consume執行緒,阻塞在這裡,有message來了就會進行消費,沒有訊息就會阻塞在這裡。具體消費的邏輯也就是處理這些consumer執行緒都是AMQ Broker那面處理。其實就是queue的message存在mysql,多個執行緒監聽這個queue,
Kafka有consumer group的概念。一個consumer group下有多個consumer,每個consumer都是一個執行緒,consumer group是一個執行緒組。每個執行緒組consumer group之間互相獨立。同一個partition中的一個message只能被一個consumer group下的一個consumer執行緒消費,因為消費完了這個consumer group下的這個consumer對應的這個partition的offset就+1了,這個consumer group下的其他consumer還是這個consumer都不能在消費了。 但是另外一個consumer group是完全獨立的,可以設定一個from的offset位置,重新消費這個partition。 kafka是message都存在partition下的segment檔案裡面,有offsite偏移量去記錄那條消費了,哪條沒消費。某個consumer group下consumer執行緒消費完就會,這個consumer group 下的這個consumer對應這個partition的offset+1,kafka並不會刪除這條已經被消費的message。其他的consumer group也可以再次消費這個message。在high level api中offset會自動或手動的提交到zookeeper上(如果是自動提交就有可能處理失敗或還沒處理完就提交offset+1了,容易出現下次再啟動consumer group的時候這條message就被漏了),也可以使用low level api,那麼就是consumer程式中自己維護offset+1的邏輯。 kafka中的message會定期刪除。
  1. 關於儲存結構
ActiveMQ的訊息持久化機制有JDBC,AMQ,KahaDB和LevelDB Kafka是檔案儲存,每個topic有多個partition,每個partition有多個replica副本(每個partition和replica都是均勻分配在不同的kafka broker上的)。每個partition由多個segment檔案組成。這些檔案是順序儲存的。因此讀取和寫入都是順序的,因此,速度很快,省去了磁碟定址的時間。 很多系統、元件為了提升效率一般恨不得把所有資料都扔到記憶體裡,然後定期flush到磁碟上;而Kafka決定直接使用頁面快取;但是隨機寫入的效率很慢,為了維護彼此的關係順序還需要額外的操作和儲存,而線性的順序寫入可以避免磁碟定址時間,實際上,線性寫入(linear write)的速度大約是300MB/秒,但隨即寫入卻只有50k/秒,其中的差別接近10000倍。這樣,Kafka以頁面快取為中間的設計在保證效率的同時還提供了訊息的持久化,每個consumer自己維護當前讀取資料的offset(也可委託給zookeeper),以此可同時支援線上和離線的消費。
  1. 關於使用場景與吞吐量
ActiveMQ用於企業訊息中介軟體,使得業務邏輯和前端處理邏輯解耦。AMQ的吞吐量不大,zuora的AMQ就是用作jms來使用。AMQ吞吐量不夠,並且持久化message資料通過jdbc存在mysql,寫入和讀取message效能太低。 Kafka的吞吐量非常大 TalkingData的kafka吞吐量非常大,並且會堆積message資料, kafka更多的作為儲存來用,可以淤積資料。
  1.  關於訊息傳遞模型
傳統的訊息佇列最少提供兩種訊息模型,一種P2P,一種PUB/SUB Kafka並沒有這麼做,它提供了一個消費者組的概念,一個訊息可以被多個消費者組消費,但是隻能被一個消費者組裡的一個消費者消費,這樣當只有一個消費者組時就等同與P2P模型,當存在多個消費者組時就是PUB/SUB模型
  1. push/pull 模型
對於消費者而言有兩種方式從訊息中介軟體獲取訊息: ①Push方式:由訊息中介軟體主動地將訊息推送給消費者,採用Push方式,可以儘可能快地將訊息傳送給消費者;②Pull方式:由消費者主動向訊息中介軟體拉取訊息,會增加訊息的延遲,即訊息到達消費者的時間有點長 但是,Push方式會有一個壞處:如果消費者的處理訊息的能力很弱(一條訊息需要很長的時間處理),而訊息中介軟體不斷地向消費者Push訊息,消費者的緩衝區可能會溢位。 ActiveMQ使用PUSH模型, 對於PUSH,broker很難控制資料傳送給不同消費者的速度。AMQ Broker將message推送給對應的BET consumer。ActiveMQ用prefetch limit 規定了一次可以向消費者Push(推送)多少條訊息。當推送訊息的數量到達了perfetch limit規定的數值時,消費者還沒有向訊息中介軟體返回ACK,訊息中介軟體將不再繼續向消費者推送訊息。 ActiveMQ  prefetch limit 設定成0意味著什麼?意味著此時,消費者去輪詢訊息中介軟體獲取訊息。不再是Push方式了,而是Pull方式了。即消費者主動去訊息中介軟體拉取訊息。

那麼,ActiveMQ中如何採用Push方式或者Pull方式呢?

從是否阻塞來看,消費者有兩種方式獲取訊息。同步方式和非同步方式。

同步方式使用的是ActiveMQMessageConsumer的receive()方法。而非同步方式則是採用消費者實現MessageListener介面,監聽訊息。

使用同步方式receive()方法獲取訊息時,prefetch limit即可以設定為0,也可以設定為大於0

prefetch limit為零 意味著:“receive()方法將會首先發送一個PULL指令並阻塞,直到broker端返回訊息為止,這也意味著訊息只能逐個獲取(類似於Request<->Response)”

prefetch limit 大於零 意味著:“broker端將會批量push給client 一定數量的訊息(<= prefetch),client端會把這些訊息(unconsumedMessage)放入到本地的佇列中,只要此佇列有訊息,那麼receive方法將會立即返回,當一定量的訊息ACK之後,broker端會繼續批量push訊息給client端。” 當使用MessageListener非同步獲取訊息時,prefetch limit必須大於零了。因為,prefetch limit 等於零 意味著訊息中介軟體不會主動給消費者Push訊息,而此時消費者又用MessageListener被動獲取訊息(不會主動去輪詢訊息)。這二者是矛盾的。 Kafka使用PULL模型,PULL可以由消費者自己控制,但是PULL模型可能造成消費者在沒有訊息的情況下盲等,這種情況下可以通過long polling機制緩解,而對於幾乎每時每刻都有訊息傳遞的流式系統,這種影響可以忽略。 Kafka 的 consumer 是以pull的形式獲取訊息資料的。 pruducer push訊息到kafka cluster ,consumer從叢集中pull訊息。