1. 程式人生 > >kafak-python函式使用詳解

kafak-python函式使用詳解

 

  • Consumer的非執行緒安全
  • Kafka只保證訊息不漏,即at lease once,而不保證訊息不重。關鍵點:假如consumer掛了重啟,那它將從committed offset位置(告訴server的消費的位置點)開始重新消費,而不是consume offset位置(真正的消費位置點)。這也就意味著有可能重複消費(自己消費到了某個位置,而後在告訴伺服器這個位置時,傳送失敗)
  • kafka可以重置commit嗎?給伺服器指定任意值為最後消費位置,下次消費從這個指定的位置開始消費。可以,使用commit函式,下文有講。但是需要注意:修改偏移量不會改變當前會話,在新連線裡生效
  • subscribe表示訂閱topic,從kafka記錄的offset開始消費。assign表示從指定的offset開始消費。  
  • kafka自動會從上次沒有消費的地方開始消費
  • 使用kafak自帶的指令碼檢視偏移量:./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test --offsets
  • 使用了subscribe,就不能使用assign
  • 提交:更新分割槽的當前位置稱為提交,當前版本(0.10.1.1)用topic ___consumer_offsets 儲存提交的偏移量

     

  • 偏移量:消費者在Kafka追蹤到訊息在分割槽裡的位置
  • 消費者在崩潰或者有新的消費者加入群組,就會觸發再均衡。這時需要讀取最後一次偏移量,然後從偏移量指定的地方繼續處理。提交的偏移量小於真實的偏移量,訊息會被重複處理。大於真實的偏移量,訊息會丟失。

 

from kafka.structs import TopicPartition,OffsetAndMetadata
configs = {
            'bootstrap_servers': '10.57.19.60',
            'enable_auto_commit'
: False, 'group_id': 'test', 'api_version': (0, 8, 2), 'ssl_check_hostname': False, 'consumer_timeout_ms': 3000, # 若不指定 consumer_timeout_ms,預設一直迴圈等待接收,若指定,則超時返回,不再等待 # 'ssl_certfile': ssl_certfile, # 'security_protocol': 'SSL', # 'ssl_cafile': ssl_cafile } topics=('test', ) # 注意指定分割槽將會失去故障轉移/負載均衡的支援,當然也沒有了自動分配分割槽的功能(因為已經人為指定了嘛) topic_partition = TopicPartition(topic='test',partition=0) # consumer = KafkaConsumer(**configs) # 引數必須是列表,表示訂閱的topic/partition列表 consumer.assign([topic_partition]) # 獲取分給當前使用者的topic/partition資訊 consumer.assignment() # 提交偏移量:可以告知伺服器當前偏移量,也可以設定偏移量 consumer.commit({TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=280, metadata='')}) # 非同步提交 consumer.commit_async() # 獲取伺服器的最後確認的偏移量,即最新資料開始讀取的地方 consumer.committed(TopicPartition(topic='test', partition=0)) # 獲取伺服器當前最新的偏移量,讀到這個偏移量後,所有資料都讀取完了 consumer.highwater(TopicPartition(topic='test', partition=0)) # 獲取消費的效能 consumer.metrics() # 獲取某個topic的partition資訊 consumer.partitions_for_topic(topic) # 獲取下一條資料開始讀取的偏移量,即從這個便宜量開始繼續讀取資料 consumer.position(TopicPartition(topic='test', partition=0)) # 從指定偏移量位置開始讀取資料 consumer.seek(TopicPartition(topic='test', partition=0), 283) # 從頭開始讀取資料 consumer.seek_to_beginning() # 從最後開始讀取資料 consumer.seek_to_end() # 訂閱topic,可以訂閱多個,可以使用正則表示式匹配多個 consumer.subscribe() # 獲取訂閱的資訊,無法獲取使用assign分配的topic/partition資訊 consumer.subscription() # 獲取當前使用者授權的topic資訊 consumer.topics() # 取消訊息的訂閱 consumer.unsubscribe()
# 一起消費多條訊息,最多等待時間timeout_ms,最多消費max_records
consumer.poll(self, timeout_ms=0, max_records=None) # 獲取指定分割槽第一個偏移量 consumer.beginning_offsets([topic_partition]) # 獲取指定分割槽最後一個偏移量,最新的偏移量 consumer.end_offsets([topic_partition]) # 關閉連線 consumer.close() # #consumer.seek(topic_partition,
284) for message in consumer: print(message)

 

重複消費是如何產生的?

消費者設定為自動提交偏移量時,需要同時設定自動提交偏移量的時間間隔。如果消費完若干訊息後,還沒有到自動提交偏移量的時間時,應用掛了,則系統記錄的偏移量還是之前的值,那麼剛才消費的若干訊息,會在應用重連之後重新消費

如何保證不會重複消費?

消費段記錄下傳送給伺服器的偏移量,獲取最新資料時再判斷這個偏移量是否正確

生產的訊息佇列長度,會堆積嗎?

消費的資訊佇列長度,會堆積嗎?

生產者速度大於消費者速度怎麼處理?

kafka 認證與授權機制

Kafka 目前支援SSL、SASL/Kerberos、SASL/PLAIN三種認證機制。目前支援以下安全措施:

  • clients 與 brokers 認證
  • brokers 與 zookeeper認證
  • 資料傳輸加密  between  brokers and clients, between brokers, or between brokers and tools using SSL
  • 授權clients read/write

 

kafka偏移量的相關配置

enable.auto.commit

true(預設):自動提交偏移量,可以通過配置 auto.commit.interval.ms屬性來控制提交偏移量的頻率。(基於時間間隔)

false:手動控制偏移量。可以在程式邏輯必要的時候提交偏移量,而不是基於時間隔。此時可以進行同步,非同步,同步非同步組合(參考相應api)。

auto.offset.reset

無法讀取偏移量時候讀取訊息的設定

latest(預設):從最新記錄讀取資料。

earliest:從起始位置讀取資料

參考:

1、https://zhuanlan.zhihu.com/p/33238750

2、https://help.aliyun.com/document_detail/68331.html

3、https://blog.csdn.net/xiaoguozi0218/article/details/80513849

4、https://zhuanlan.zhihu.com/p/38330574

5、https://blog.csdn.net/ZhongGuoZhiChuang/article/details/79550570

6、https://help.aliyun.com/document_detail/67233.html