sparkstreaming中kafka的offset提交
阿新 • • 發佈:2019-01-31
就kafka而言,offset提交方式有兩種,自動和手動。
將enable.auto.commit設定為true,即可自動提交
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
或者採用commitAsync來自動提交。
sparkstreaming消費kafka資料,提交方式也是分為自動和手動兩種。貌似是一樣的,但細節上有所不同。
kafka首次啟動的時候,一般會出現下面的情況。因為有資料擠壓,所以會有很多在queued狀態的batch。
如果資料量特別大,可能會出問題,因此引數spark.streaming.kafka.maxRatePerPartition
此時如果引數enable.auto.commit設定為false,並且程式碼端也不手動提交,通過日誌你會發現每個batch可以正常消費,但是伺服器上檢視kafka消費情況,卻是保持沒變,是不是很詭異。當你把任務重啟之後,會發下再次從最初的位置開始消費,也就是上次執行完全沒有任何效果。
因此可以看出sparkstreaming在消費kafka的時候,自己內部儲存了一組offset。它只在第一次消費的時候從kafka取offset,然後會一直按照自己內部儲存這個offset來消費資料,但是不會把這個資料提交給任何地方(kafka或zookeeper)。因此,當任務重啟後,還是會從最初的地方開始消費,因為上次任務的消費沒提交,kafka內部的offset沒更新。
所以,除非你的streaming程式永遠不停,否則最好手動提交offset。