1. 程式人生 > >九 assign和subscribe

九 assign和subscribe

函數 false rec 現象 不支持 l命令 produce 服務端 col

1 subscribe: 自動安排分區, 通過group自動重新的負載均衡;

  • 關於Group的實驗: 如果auto commit = true, 重新啟動進程,如果是同樣的groupID,從上次commit的地方開始消費數據,但是如果換了group後,就可以繼續 消費了。
  • auto commit = true, 多個consumer是同一個group,並且consumer是同時創建的,同時消費: 不出現重復消費的現象。
  • auto commit = true, 多個consumer是同一個group, 但是多個consumer的啟動有時間間隔,一般也不會出現重復消費數據的情況。
  • auto commit = false,多個consumer是同一個group, 但是啟動有間隔, 會出現重復消費的情況,即使同一個consumer都會出現重復。

因為本地存了offset,在進程沒重啟之前,應該是不會消費重復數據,但是為什麽會導致有重復數據呢?猜測是因為,每隔一段時間有

consumer加進來,導致rebalance, offset混亂導致?(僅僅猜測

  • auto commit = false, 多個consumers同時創建,是同一個group, 沒有發現重復的數據。因為是同時創建,是在消費之前就有了,不用rebalance,所以不會重復消 費數據。
  • auto commit = false,一個consumer執行完3個poll,然後close consumer, 再啟動下一個(也就是多個consumer串行),可以重復消費。

總結: 通過以上test cases發現, 只要是auto commit = false, 並且在消費過程中,因為consumer個數的變化,就會導致有一些數據重復消費。這是因為本地保存了offset,但是沒有提交到server,rebalance會導致重復消費。

假設進程A正在消費分區1的信息,並提交了偏移量,之後又消費了10條數據,還沒來得及提交偏移量的時候,reblance機制讓進程B來繼續消費分區1的信息,

此時進程B會從上次進程A提交偏移量的地方開始消費,因此這10條數據就是重復消費的。
當reblance比較頻繁的時候,就會造成大量數據的重復。    

因為kafka的offset下標的記錄實際會有兩份
服務端會自己記錄一份,本地的消費者客戶端也會記錄一份,提交的offset會告訴服務端已經消費到這了, 但是本地的還沒有提交的(應該是保存在進程中)並不會因此而改變offset進行再次消費。

2 assign:

手動指定消費的分區(用戶指定分區);不支持group的自動負載均衡(因為分區已經指定了,就不會在consumer之間負載均衡了);

assign不會疊加,後一個會覆蓋前一個(調用assign兩次,後一個覆蓋前一個);

多個同樣配置的consumer同時消費同一個分區:

  • earliest,auto commit = false: 多個consumer同時創建,是同一組, 消費同一個分區,會重復消費,因為組根本沒意義。 如果此時繼續
  •      producer新消息, 也是重復消費
  • earliest:auto commit = false: 多個consumer間隔創建,是同一組, 消費同一個分區,會重復消費,因為組根本沒意義。如果此時繼續
  • producer新消息, 也是重復消費
  • auto commit = true這個配置不起作用, 每次還能從頭消費,證明沒有commit。



auto commit 對assign不起作用,實驗一下consumer.commitSync()函數:

  • poll完commit: 多個consumer同時創建,是同一組, 消費同一個分區,會重復消費,因為consumer同時創建,同時讀取, commit還沒來得及保存到server,所以重復消費。
  • earliest, poll完commit: 多個consumer間隔創建,是同一組, 消費同一個分區,不會重復消費,因為consumer不是同時的,先commit的consumer是可以提交到server的,後面的就不會重復消費了。


auto.offset.reset值含義解釋:

  • earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
  • latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
  • none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

offset commit是在Consumer端進行的操作,將下一次消費的位置(本次poll/準確的說是fetch?的最大record的後一位)commit到服務器。
有兩種commit方式:自動提交與手動提交。
設置參數 props.put("enable.auto.commit", "true");開啟自動提交,這樣在執行poll命令後會立即將下一個offset提交至服務器。

另外,也可以通過seek函數手動控制Consumer的position(即設置poll時的起始offset),這樣就可以跳過一些數據或者獲取一些歷史數據:(註意使用seek設置指定partition的offset時該Consumer必須要先assign訂閱了該partition。)

九 assign和subscribe