【kafka】二、kafka框架介紹(消費者篇):
本人菜雞,正在努力學習,記錄知識以備後患!
該文章承接上文(kafka系列),暫有兩篇:
【kafka】一、kafka框架介紹(生產者篇):https://blog.csdn.net/lsr40/article/details/84029034
【kafka】二、kafka框架介紹(消費者篇):https://blog.csdn.net/lsr40/article/details/84034394
另一篇解決報錯的文章:
【kafka】報錯:advertised.listeners引數的重要性(外部訪問區域網kafka)https://blog.csdn.net/lsr40/article/details/84135959
消費者有兩個配置,一個是New Consumer Configs(讓kafka自己管理偏移量),一個是Old Consumer Configs(使用預設zk管理偏移量),具體使用的時候,請認準自己的kafka的版本
官網:http://kafka.apache.org/documentation/#consumerconfigs
(消費者偏移量記錄)當然,在說引數前,我要說說消費者的一些事情:
消費者消費資料的時候,必須記錄每個分割槽消費到哪了,也就是說,如果消費者A讀取的那個topic有10個分割槽的話,必須記錄10個值,這10個值分別表示每個分割槽被消費到哪裡(通過offset偏移量來記錄),因此就有以下的概念,這些消費者的消費記錄儲存在哪裡?
- 手動管理:通過api,獲取拿到的該條資料的偏移量和分割槽值,儲存在儲存系統中(可以是資料庫,可以是檔案等方式),當然自己管理是最麻煩的一種方式,但也是可控的東西最多的。關於這個我看到了有一個系列的文章(感謝作者:葬月魔帝),大家有興趣可以關注下!
如何管理Spark Streaming消費Kafka的偏移量(一) https://blog.csdn.net/u010454030/article/details/78535003 如何管理Spark Streaming消費Kafka的偏移量(二)
https://blog.csdn.net/u010454030/article/details/78554643 如何管理Spark Streaming消費Kafka的偏移量(三) https://blog.csdn.net/u010454030/article/details/78660643 - zookeeper管理:配置,offsets.storage=zookeeper,auto.commit.enable=true(開啟自動提交偏移量)接著auto.commit.interval.ms=60*1000(預設值:1分鐘提交一次),缺點:提交之後程式又運行了30s之後掛掉了,下次消費的時候就會重複消費那30s的資料,反覆的跟zk互動,影響效能。
- kafka管理:offsets.storage=kafka,enable.auto.commit=true,auto.commit.interval.ms=5000(預設是5s提交一次),實現原理是kafka自己建立一個名為__consumer_offsets的topic,將offset儲存在該topic下,推薦採用該方式,關於如何讀取這個topic的資訊:(感謝作者:huxihx)https://www.cnblogs.com/huxi2b/p/6061110.html,當然也可以使用其他的監控工具,例如kafka-manager之類的別人寫好的框架,通過前端頁面檢視topic的資料情況,和消費者偏移量情況。
- 偏移量的管理涉及到三種語義
至少一次 生產者同步傳送訊息(將acks設定為1或者-1/all) 至多一次 生產者非同步傳送訊息,無論接受到與否 僅此一次 手動管理偏移量,做到處理一條儲存一次偏移量,或者批量處理一批(處理未結束失敗了要記得回滾),儲存一次
-1.group.id:消費者的組id(非常重要),他涉及到了釋出訂閱和佇列模式
- 釋出訂閱模式:不同groupid的消費者之間消費資料互不影響,也就是說可以同時消費同一份資料。舉個例子:groupid就好像家庭id,家庭A定了一份人民日報(topic1),家庭B也定了一份人民日報(也是topic1),那麼當家庭A在看他們家買的人民日報的時候,絲毫不影響家庭B,家庭B可以同時看相同內容,也可以看不相同
- 佇列模式:相同group_id的消費者之間不可能消費同一份資料,舉個例子:家庭A定了一份人民日報(topic1),人民日報裡面有3個板塊(3個分割槽),當爸爸(消費者1)在看第一個板塊的時候(分割槽1),其他家庭成員就不能再看相同的板塊,只能去看板塊2(分割槽2)或者板塊3(分割槽3)
-2.Rebalance:通過zookeeper實現的消費者負載均衡機制,在佇列模式下,如果多個消費者的group_id相同,那麼他們就會觸發負載均衡機制。舉個例子:topicA有3個分割槽,我啟動了一個消費者x(預設這裡的消費者都只有一個執行緒),那麼消費者x就會獨自消費3個分割槽的資料。這時,我又啟動了消費者y(group_id和x相同),那麼就會馬上觸發負載均衡機制,一個消費者消費2個分割槽資料,另一個消費1個分割槽,這時候我又啟動了一個消費者z(group_id和x和y相同),這樣每個消費者都會消費1個分割槽的資料。也就是說當消費者的總個數(匯流排程數)和topicA的分割槽數一樣的時候,效能最好。如果這時候再增加消費者,那麼就會出現有一個消費者空轉(消耗資源,但接受不到資料),導致資源浪費。
-3.fetch.min.bytes:設定最少一次拉取多少資料,預設是一有資料就拉,可以調大,但是資料就會有延遲,但是請求的次數就降低,提升吞吐量(一般預設就可以了)
其他的基本上不需要太大的調整(當然如果有特殊需求的話,不同叢集情況下,引數最優肯定都是不同的,還需要具體測試),為了提升效能,有時候還需要調整broker和topic的配置,這兩部分本人暫時沒有做太深入的研究,就不在這裡瞎***吹了!
最後kafka自帶消費者:
bin/kafka-console-consumer.sh --topic topic名稱 --zookeeper zk1:埠1,zk2:埠2,zk3:埠3/zk上kafka的路徑
總結:考慮偏移量如何管理,考慮釋出訂閱還是佇列模式,考慮消費模式是否達到最優,是否設定拉資料的延遲,是否可以優化消費端的程式碼來減少資料延遲
kafka系列文章暫時到這,主要就介紹了對於叢集使用者的相關引數(生產者和消費者),對於搭建叢集(topic和broker)的一些引數,並沒有相關的介紹,原諒本人才疏學淺。如果有什麼問題或者筆誤的地方,歡迎可以留言評論~