1. 程式人生 > >【kafka】二、kafka框架介紹(消費者篇):

【kafka】二、kafka框架介紹(消費者篇):

本人菜雞,正在努力學習,記錄知識以備後患!

該文章承接上文(kafka系列),暫有兩篇:

【kafka】一、kafka框架介紹(生產者篇):https://blog.csdn.net/lsr40/article/details/84029034

【kafka】二、kafka框架介紹(消費者篇):https://blog.csdn.net/lsr40/article/details/84034394

另一篇解決報錯的文章:

【kafka】報錯:advertised.listeners引數的重要性(外部訪問區域網kafka)https://blog.csdn.net/lsr40/article/details/84135959

消費者有兩個配置,一個是New Consumer Configs(讓kafka自己管理偏移量),一個是Old Consumer Configs(使用預設zk管理偏移量),具體使用的時候,請認準自己的kafka的版本

官網:http://kafka.apache.org/documentation/#consumerconfigs

(消費者偏移量記錄)當然,在說引數前,我要說說消費者的一些事情:

消費者消費資料的時候,必須記錄每個分割槽消費到哪了,也就是說,如果消費者A讀取的那個topic有10個分割槽的話,必須記錄10個值,這10個值分別表示每個分割槽被消費到哪裡(通過offset偏移量來記錄),因此就有以下的概念,這些消費者的消費記錄儲存在哪裡?

  1. 手動管理:通過api,獲取拿到的該條資料的偏移量和分割槽值,儲存在儲存系統中(可以是資料庫,可以是檔案等方式),當然自己管理是最麻煩的一種方式,但也是可控的東西最多的。關於這個我看到了有一個系列的文章(感謝作者:葬月魔帝),大家有興趣可以關注下!
                如何管理Spark Streaming消費Kafka的偏移量(一) https://blog.csdn.net/u010454030/article/details/78535003

    如何管理Spark Streaming消費Kafka的偏移量(二)

    https://blog.csdn.net/u010454030/article/details/78554643
                如何管理Spark Streaming消費Kafka的偏移量(三) https://blog.csdn.net/u010454030/article/details/78660643
  2. zookeeper管理:配置,offsets.storage=zookeeper,auto.commit.enable=true(開啟自動提交偏移量)接著auto.commit.interval.ms=60*1000(預設值:1分鐘提交一次),缺點:提交之後程式又運行了30s之後掛掉了,下次消費的時候就會重複消費那30s的資料,反覆的跟zk互動,影響效能。
  3. kafka管理:offsets.storage=kafka,enable.auto.commit=true,auto.commit.interval.ms=5000(預設是5s提交一次),實現原理是kafka自己建立一個名為__consumer_offsets的topic,將offset儲存在該topic下,推薦採用該方式,關於如何讀取這個topic的資訊:(感謝作者:huxihx)https://www.cnblogs.com/huxi2b/p/6061110.html,當然也可以使用其他的監控工具,例如kafka-manager之類的別人寫好的框架,通過前端頁面檢視topic的資料情況,和消費者偏移量情況。
  4. 偏移量的管理涉及到三種語義
    至少一次 生產者同步傳送訊息(將acks設定為1或者-1/all)
    至多一次 生產者非同步傳送訊息,無論接受到與否
    僅此一次 手動管理偏移量,做到處理一條儲存一次偏移量,或者批量處理一批(處理未結束失敗了要記得回滾),儲存一次
    kafka有給出java版api:http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html,在Storing Offsets Outside Kafka該模組中有介紹。

-1.group.id:消費者的組id(非常重要),他涉及到了釋出訂閱和佇列模式

  • 釋出訂閱模式:不同groupid的消費者之間消費資料互不影響,也就是說可以同時消費同一份資料。舉個例子:groupid就好像家庭id,家庭A定了一份人民日報(topic1),家庭B也定了一份人民日報(也是topic1),那麼當家庭A在看他們家買的人民日報的時候,絲毫不影響家庭B,家庭B可以同時看相同內容,也可以看不相同
  • 佇列模式:相同group_id的消費者之間不可能消費同一份資料,舉個例子:家庭A定了一份人民日報(topic1),人民日報裡面有3個板塊(3個分割槽),當爸爸(消費者1)在看第一個板塊的時候(分割槽1),其他家庭成員就不能再看相同的板塊,只能去看板塊2(分割槽2)或者板塊3(分割槽3)

-2.Rebalance:通過zookeeper實現的消費者負載均衡機制,在佇列模式下,如果多個消費者的group_id相同,那麼他們就會觸發負載均衡機制。舉個例子:topicA有3個分割槽,我啟動了一個消費者x(預設這裡的消費者都只有一個執行緒),那麼消費者x就會獨自消費3個分割槽的資料。這時,我又啟動了消費者y(group_id和x相同),那麼就會馬上觸發負載均衡機制,一個消費者消費2個分割槽資料,另一個消費1個分割槽,這時候我又啟動了一個消費者z(group_id和x和y相同),這樣每個消費者都會消費1個分割槽的資料。也就是說當消費者的總個數(匯流排程數)和topicA的分割槽數一樣的時候,效能最好如果這時候再增加消費者,那麼就會出現有一個消費者空轉(消耗資源,但接受不到資料),導致資源浪費。

-3.fetch.min.bytes:設定最少一次拉取多少資料,預設是一有資料就拉,可以調大,但是資料就會有延遲,但是請求的次數就降低,提升吞吐量(一般預設就可以了)

其他的基本上不需要太大的調整(當然如果有特殊需求的話,不同叢集情況下,引數最優肯定都是不同的,還需要具體測試),為了提升效能,有時候還需要調整broker和topic的配置,這兩部分本人暫時沒有做太深入的研究,就不在這裡瞎***吹了!

最後kafka自帶消費者:

bin/kafka-console-consumer.sh --topic topic名稱 --zookeeper zk1:埠1,zk2:埠2,zk3:埠3/zk上kafka的路徑

總結:考慮偏移量如何管理,考慮釋出訂閱還是佇列模式,考慮消費模式是否達到最優,是否設定拉資料的延遲,是否可以優化消費端的程式碼來減少資料延遲

kafka系列文章暫時到這,主要就介紹了對於叢集使用者的相關引數(生產者和消費者),對於搭建叢集(topic和broker)的一些引數,並沒有相關的介紹,原諒本人才疏學淺。如果有什麼問題或者筆誤的地方,歡迎可以留言評論~