1. 程式人生 > >kafka生產者和消費者

kafka生產者和消費者

在使用kafka時,有時候為驗證應用程式,需要手動讀取訊息或者手動生成訊息。這個時候可以藉助kafka-console-consumer.sh和kafka-console-producer.sh 這兩個工具,它們包裝了java客戶端,讓使用者不需要編寫整個應用程式就可以與kafka主題發生互動。

生產者

kafka-console-consumer.sh工具可以用於向kafka主題寫入訊息。預設情況下,該工具將命令列的每一行視為一個訊息,訊息的鍵和值以tab字元分隔,如果沒有出現table字元,那麼鍵就是null。

控制檯生產者有兩個引數是必須指定的: --broker-list引數指定了一個或多個broker,它們以逗號峰,格式為hostname:port;另一個引數是--topic指定了生成訊息的目標主題。在生成完訊息之後,需要傳送一個EOF字元來關閉客戶端。

[[email protected] bin]# ./kafka-console-producer.sh --broker-list 10.0.102.204:9092 --topic science
sample messsage 1

在控制檯使用生產者也是可以指定配置檔案的,有兩種形式!

  • 通過--producer.config  configfile指定消費者的配置檔案,其中configfile是配置檔案的全路徑。
  • 另一種方式是直接在命令列以--producer-property  key=value的格式傳遞一個或多個引數,key是引數名字,value是引數的值。

生產者有都許多命令列引數,可以調整其行為:

--key-serializer  classname:指定訊息鍵的編碼器的類名,預設是Kafka.serializer.DefaultEncoder.
--value-serializer  classname: 指定訊息值的編碼器的類名,預設是Kafka.serializer.DefaultEncoder.
--compress-codec  string:指定生成訊息所使用的壓縮型別,可以是none, gzip, snappy或lz4,預設是gzip。
--sync       :指定以同步的方式生成訊息,也就是說,在傳送下一個訊息之前會等待當前訊息得到確認。

消費者控制檯

kafka-console-consumer.sh工具提供了一種從一個或多個主題以上讀取訊息的方式。訊息被列印在標準輸出上,訊息之間以空行分割。預設情況下,它會列印沒有經過格式化的原始訊息位元組。它有很多可選引數,其中有一些引數是必選的。

第一:要指定是否使用新版本的消費者,並指定kafka叢集的地址。如果使用的是舊版本的消費者,只需要提供--zookeeper引數。如果使用了新版本的消費者,必須使用--new-consumer和--borker-list, --borker-list後面需要跟上以逗號分割的broker地址列表。

第二:指定要讀取的主題。有3個引數可以選擇,分別是--topic, --whitelist和 --blacklist。此處執行只指定一個引數。--topic用於指定單個待讀取的主題,--whitelist和 --blacklist後面跟著一個正則表示式(在命令列可能需要轉義)。與白名單匹配的主題將會被讀取,與黑名單匹配的主題不會被讀取。

[[email protected] bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.204:2181  --topic science
test 1

除了基本的命令列引數外,也可以把消費者的其他的配置引數傳遞給控制檯消費者。可以通過兩種方式來達到這個目的。第一種方式將配置引數寫在一個檔案裡,然後通過--consumer.config 指定配置檔案。另一種是在命令列以--consumer-property  key=value的格式傳遞一個或多個引數,其中key指定引數的名字,value指定引數的值。這種方式在設定消費者屬性時會很有用,比如設定群組的ID。

控制檯消費者和控制檯生產者有一個共同的引數--property,這個引數用於向訊息格式化器傳遞配置資訊,而不是給客戶端本身傳遞配置資訊。

控制檯消費者常用的配置如下:

--formatter classname :指定訊息格式化器的類名,用於解碼訊息,它的預設值是kafka.tools.DefaultFormatter.
--from-beginning: 指定從最舊的偏移量開始讀取資料,否則就從最新的偏移量開始讀取。
--max-messages num: 指定在退出之前最多讀取NUM個訊息
--partition num: 指定只讀ID為num的分割槽(需要新版本的消費者)


訊息格式器選項:
除了預設的訊息格式器之外,還有其他3中可用的格式化器。
kafka.tools.loggingMessageFormatter: 將訊息輸出到日誌,而不是輸出到標準的輸出裝置。日誌級別為INFO,並且包含了時間戳,鍵和值。
kafka.tools.ChecksumMessageFormatter: 只打印訊息的校驗和
kafka.tools.NoOpMessageFormatter: 讀取訊息但不列印訊息


kafka.tools.DefaultFormatter有一些非常有用的配置選項,這些選項可以通過--property命令列引數指定。
print.timestamp:  設定為true, 就會列印每個訊息的時間戳。
print.key :設定為true,除了列印訊息的值外,還會列印訊息的鍵。
key.separator: 指定列印訊息的鍵和訊息的值所用的分割符。
line.separator: 指定訊息之間的分割符。
key.deserializer: 指定列印訊息的鍵所用的發序列化器的類名。
value.deserializer: 指定列印訊息的值所用的發序列化器的類名。

讀取偏移量主題

【有點問題】

有時候,我們需要知道提交的消費者群組的偏移量是多少,比如某個特定的群組是否在提交偏移量,或者偏移量提交的頻度。這個可以通過讓控制檯消費者讀取一個特殊的內部主題__consumer_offsets來實現。所有消費者的偏移量都以訊息的形式寫到這個主題上。為了解碼這個主題的訊息,需要使用Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter這個格式化器。

[[email protected] bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.204:2181  --topic __consumer_offsets --formatter "Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" --max-messages 1