kafka 消費者offset記錄位置和方式
阿新 • • 發佈:2017-08-22
inter size als 設置 zookeeper least partition tor topic
我們大家都知道,kafka消費者在會保存其消費的進度,也就是offset,存儲的位置根據選用的kafka api不同而不同。
首先來說說消費者如果是根據javaapi來消費,也就是【kafka.javaapi.consumer.ConsumerConnector】,我們會配置參數【zookeeper.connect】來消費。這種情況下,消費者的offset會更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目錄下,例如:
[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0 5662 cZxid = 0x20006d28a ctime = Wed Apr 12 18:20:51 CST 2017 mZxid = 0x30132b0ed mtime = Tue Aug 22 18:53:22 CST 2017 pZxid = 0x20006d28a cversion = 0 dataVersion = 5758 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 0
如果是根據kafka默認的api來消費,即【org.apache.kafka.clients.consumer.KafkaConsumer】,我們會配置參數【bootstrap.servers】來消費。而其消費者的offset會更新到一個kafka自帶的topic【__consumer_offsets】下面,查看當前group的消費進度,則要依靠kafka自帶的工具【kafka-consumer-offset-checker】,例如:
[[email protected] data]# kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group test-consumer-group --topic stable-test [2017-08-22 19:24:24,222] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Group Topic Pid Offset logSize Lag Owner test-consumer-group stable-test 0 601808 601808 0 none test-consumer-group stable-test 1 602826 602828 2 none test-consumer-group stable-test 2 602136 602136 0 none
offset更新的方式,不區分是用的哪種api,大致分為兩類:
- 自動提交,設置enable.auto.commit=true,更新的頻率根據參數【auto.commit.interval.ms】來定。這種方式也被稱為【at most once】,fetch到消息後就可以更新offset,無論是否消費成功。
- 手動提交,設置enable.auto.commit=false,這種方式稱為【at least once】。fetch到消息後,等消費完成再調用方法【consumer.commitSync()】,手動更新offset;如果消費失敗,則offset也不會更新,此條消息會被重復消費一次。
kafka 消費者offset記錄位置和方式