kafka生產者和消費者
在使用kafka時,有時候為驗證應用程式,需要手動讀取訊息或者手動生成訊息。這個時候可以藉助kafka-console-consumer.sh和kafka-console-producer.sh 這兩個工具,它們包裝了java客戶端,讓使用者不需要編寫整個應用程式就可以與kafka主題發生互動。
生產者
kafka-console-consumer.sh工具可以用於向kafka主題寫入訊息。預設情況下,該工具將命令列的每一行視為一個訊息,訊息的鍵和值以tab字元分隔,如果沒有出現table字元,那麼鍵就是null。
控制檯生產者有兩個引數是必須指定的: --broker-list引數指定了一個或多個broker,它們以逗號峰,格式為hostname:port;另一個引數是--topic指定了生成訊息的目標主題。在生成完訊息之後,需要傳送一個EOF字元來關閉客戶端。
[[email protected] bin]# ./kafka-console-producer.sh --broker-list 10.0.102.204:9092 --topic science sample messsage 1
在控制檯使用生產者也是可以指定配置檔案的,有兩種形式!
- 通過--producer.config configfile指定消費者的配置檔案,其中configfile是配置檔案的全路徑。
- 另一種方式是直接在命令列以--producer-property key=value的格式傳遞一個或多個引數,key是引數名字,value是引數的值。
生產者有都許多命令列引數,可以調整其行為:
--key-serializer classname:指定訊息鍵的編碼器的類名,預設是Kafka.serializer.DefaultEncoder. --value-serializer classname: 指定訊息值的編碼器的類名,預設是Kafka.serializer.DefaultEncoder. --compress-codec string:指定生成訊息所使用的壓縮型別,可以是none, gzip, snappy或lz4,預設是gzip。 --sync :指定以同步的方式生成訊息,也就是說,在傳送下一個訊息之前會等待當前訊息得到確認。
消費者控制檯
kafka-console-consumer.sh工具提供了一種從一個或多個主題以上讀取訊息的方式。訊息被列印在標準輸出上,訊息之間以空行分割。預設情況下,它會列印沒有經過格式化的原始訊息位元組。它有很多可選引數,其中有一些引數是必選的。
第一:要指定是否使用新版本的消費者,並指定kafka叢集的地址。如果使用的是舊版本的消費者,只需要提供--zookeeper引數。如果使用了新版本的消費者,必須使用--new-consumer和--borker-list, --borker-list後面需要跟上以逗號分割的broker地址列表。
第二:指定要讀取的主題。有3個引數可以選擇,分別是--topic, --whitelist和 --blacklist。此處執行只指定一個引數。--topic用於指定單個待讀取的主題,--whitelist和 --blacklist後面跟著一個正則表示式(在命令列可能需要轉義)。與白名單匹配的主題將會被讀取,與黑名單匹配的主題不會被讀取。
[[email protected] bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.204:2181 --topic science test 1
除了基本的命令列引數外,也可以把消費者的其他的配置引數傳遞給控制檯消費者。可以通過兩種方式來達到這個目的。第一種方式將配置引數寫在一個檔案裡,然後通過--consumer.config 指定配置檔案。另一種是在命令列以--consumer-property key=value的格式傳遞一個或多個引數,其中key指定引數的名字,value指定引數的值。這種方式在設定消費者屬性時會很有用,比如設定群組的ID。
控制檯消費者和控制檯生產者有一個共同的引數--property,這個引數用於向訊息格式化器傳遞配置資訊,而不是給客戶端本身傳遞配置資訊。
控制檯消費者常用的配置如下:
--formatter classname :指定訊息格式化器的類名,用於解碼訊息,它的預設值是kafka.tools.DefaultFormatter. --from-beginning: 指定從最舊的偏移量開始讀取資料,否則就從最新的偏移量開始讀取。 --max-messages num: 指定在退出之前最多讀取NUM個訊息 --partition num: 指定只讀ID為num的分割槽(需要新版本的消費者) 訊息格式器選項: 除了預設的訊息格式器之外,還有其他3中可用的格式化器。 kafka.tools.loggingMessageFormatter: 將訊息輸出到日誌,而不是輸出到標準的輸出裝置。日誌級別為INFO,並且包含了時間戳,鍵和值。 kafka.tools.ChecksumMessageFormatter: 只打印訊息的校驗和 kafka.tools.NoOpMessageFormatter: 讀取訊息但不列印訊息 kafka.tools.DefaultFormatter有一些非常有用的配置選項,這些選項可以通過--property命令列引數指定。 print.timestamp: 設定為true, 就會列印每個訊息的時間戳。 print.key :設定為true,除了列印訊息的值外,還會列印訊息的鍵。 key.separator: 指定列印訊息的鍵和訊息的值所用的分割符。 line.separator: 指定訊息之間的分割符。 key.deserializer: 指定列印訊息的鍵所用的發序列化器的類名。 value.deserializer: 指定列印訊息的值所用的發序列化器的類名。
讀取偏移量主題
【有點問題】
有時候,我們需要知道提交的消費者群組的偏移量是多少,比如某個特定的群組是否在提交偏移量,或者偏移量提交的頻度。這個可以通過讓控制檯消費者讀取一個特殊的內部主題__consumer_offsets來實現。所有消費者的偏移量都以訊息的形式寫到這個主題上。為了解碼這個主題的訊息,需要使用Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter這個格式化器。
[[email protected] bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.204:2181 --topic __consumer_offsets --formatter "Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" --max-messages 1