1. 程式人生 > >sparkstreaming中kafka的offset提交

sparkstreaming中kafka的offset提交

就kafka而言,offset提交方式有兩種,自動和手動。

將enable.auto.commit設定為true,即可自動提交

props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

或者採用commitAsync來自動提交。

sparkstreaming消費kafka資料,提交方式也是分為自動和手動兩種。貌似是一樣的,但細節上有所不同。

kafka首次啟動的時候,一般會出現下面的情況。因為有資料擠壓,所以會有很多在queued狀態的batch。

如果資料量特別大,可能會出問題,因此引數spark.streaming.kafka.maxRatePerPartition

就尤為重要,他設定sparkstreaming沒秒每個分割槽最大的消費數量,使得擠壓的資料可以慢慢消費。


此時如果引數enable.auto.commit設定為false,並且程式碼端也不手動提交,通過日誌你會發現每個batch可以正常消費,但是伺服器上檢視kafka消費情況,卻是保持沒變,是不是很詭異。當你把任務重啟之後,會發下再次從最初的位置開始消費,也就是上次執行完全沒有任何效果。

因此可以看出sparkstreaming在消費kafka的時候,自己內部儲存了一組offset。它只在第一次消費的時候從kafka取offset,然後會一直按照自己內部儲存這個offset來消費資料,但是不會把這個資料提交給任何地方(kafka或zookeeper)。因此,當任務重啟後,還是會從最初的地方開始消費,因為上次任務的消費沒提交,kafka內部的offset沒更新。

所以,除非你的streaming程式永遠不停,否則最好手動提交offset。