九 assign和subscribe
阿新 • • 發佈:2018-12-11
函數 false rec 現象 不支持 l命令 produce 服務端 col
1 subscribe: 自動安排分區, 通過group自動重新的負載均衡;
- 關於Group的實驗: 如果auto commit = true, 重新啟動進程,如果是同樣的groupID,從上次commit的地方開始消費數據,但是如果換了group後,就可以繼續 消費了。
- auto commit = true, 多個consumer是同一個group,並且consumer是同時創建的,同時消費: 不出現重復消費的現象。
- auto commit = true, 多個consumer是同一個group, 但是多個consumer的啟動有時間間隔,一般也不會出現重復消費數據的情況。
- auto commit = false,多個consumer是同一個group, 但是啟動有間隔, 會出現重復消費的情況,即使同一個consumer都會出現重復。
因為本地存了offset,在進程沒重啟之前,應該是不會消費重復數據,但是為什麽會導致有重復數據呢?猜測是因為,每隔一段時間有
consumer加進來,導致rebalance, offset混亂導致?(僅僅猜測)
- auto commit = false, 多個consumers同時創建,是同一個group, 沒有發現重復的數據。因為是同時創建,是在消費之前就有了,不用rebalance,所以不會重復消 費數據。
- auto commit = false,一個consumer執行完3個poll,然後close consumer, 再啟動下一個(也就是多個consumer串行),可以重復消費。
總結: 通過以上test cases發現, 只要是auto commit = false, 並且在消費過程中,因為consumer個數的變化,就會導致有一些數據重復消費。這是因為本地保存了offset,但是沒有提交到server,rebalance會導致重復消費。
假設進程A正在消費分區1的信息,並提交了偏移量,之後又消費了10條數據,還沒來得及提交偏移量的時候,reblance機制讓進程B來繼續消費分區1的信息,
此時進程B會從上次進程A提交偏移量的地方開始消費,因此這10條數據就是重復消費的。
當reblance比較頻繁的時候,就會造成大量數據的重復。
因為kafka的offset下標的記錄實際會有兩份 ,服務端會自己記錄一份,本地的消費者客戶端也會記錄一份,提交的offset會告訴服務端已經消費到這了,
但是本地的還沒有提交的(應該是保存在進程中)並不會因此而改變offset進行再次消費。
2 assign:
手動指定消費的分區(用戶指定分區);不支持group的自動負載均衡(因為分區已經指定了,就不會在consumer之間負載均衡了);
assign不會疊加,後一個會覆蓋前一個(調用assign兩次,後一個覆蓋前一個);
多個同樣配置的consumer同時消費同一個分區:
- earliest,auto commit = false: 多個consumer同時創建,是同一組, 消費同一個分區,會重復消費,因為組根本沒意義。 如果此時繼續
- producer新消息, 也是重復消費。
- earliest:auto commit = false: 多個consumer間隔創建,是同一組, 消費同一個分區,會重復消費,因為組根本沒意義。如果此時繼續
- producer新消息, 也是重復消費。
- auto commit = true這個配置不起作用, 每次還能從頭消費,證明沒有commit。
auto commit 對assign不起作用,實驗一下consumer.commitSync()函數:
- poll完commit: 多個consumer同時創建,是同一組, 消費同一個分區,會重復消費,因為consumer同時創建,同時讀取, commit還沒來得及保存到server,所以重復消費。
- earliest, poll完commit: 多個consumer間隔創建,是同一組, 消費同一個分區,不會重復消費,因為consumer不是同時的,先commit的consumer是可以提交到server的,後面的就不會重復消費了。
auto.offset.reset值含義解釋:
- earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
- latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
- none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
offset commit是在Consumer端進行的操作,將下一次消費的位置(本次poll/準確的說是fetch?的最大record的後一位)commit到服務器。 有兩種commit方式:自動提交與手動提交。 設置參數 props.put("enable.auto.commit", "true");開啟自動提交,這樣在執行poll命令後會立即將下一個offset提交至服務器。 另外,也可以通過seek函數手動控制Consumer的position(即設置poll時的起始offset),這樣就可以跳過一些數據或者獲取一些歷史數據:(註意使用seek設置指定partition的offset時該Consumer必須要先assign訂閱了該partition。)
九 assign和subscribe