使用JavaAPI 實現操作消費Kafak資料,偶遇一坑!
阿新 • • 發佈:2018-12-08
一、檢查環境是否正常
檢視虛擬機器中的各個節點啟動是否正常,這一步很關鍵。產品上線前不可能直接拉到伺服器上測試,肯定在自己搭建的叢集中先行測試;通過kafka控制檯消費者是否可以消費資料;通過Java API 是否可以獲取到kafka的訊息。
二、示例程式碼!
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class kafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); // 定義kakfa 服務的地址,不需要將所有broker指定上 props.put("bootstrap.servers", "192.168.22.132:9092"); // 制定consumer group props.put("group.id", "gg"); // 是否自動確認offset props.put("enable.auto.commit", "true"); // 自動確認offset的時間間隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化類 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 定義consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消費者訂閱的topic, 可同時訂閱多個 consumer.subscribe(Arrays.asList("tt")); while (true) { // 讀取資料,讀取超時時間為100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
三、在虛擬機器中,先要生產資料!
1.啟動生產資料命令!
java -cp /home/hadoop/tools/producter.jar producter.ProductLog /home/hadoop/install/a.tsv
2.啟動flume
bin/flume-ng agent -c conf --name a1 -f /home/hadoop/install/flume/conf/flume-kafka.conf
3.開啟消費者
./bin/kafka-console-consumer.sh --zookeeper 192.168.22.132:2181 --topic tt --from-beginning
4.檢視kafka的logs 是否產生資料!
5.然後再操作JavaAPI,這是會在控制檯輸出資料,說明已經成功!!!!
四、出現的問題
1.控制檯沒有資料產生,或者報錯,那麼這時需要到虛擬機器中修改一個配置檔案新增port 和 host.naee即可!
[[email protected] config]$ pwd
/home/hadoop/install/kafka/config
[[email protected] config]$ vi server.properties
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 ############################新增port 和 host.naee############## port=9092 host.name=192.168.22.132 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092
配置完成後,重啟kafka叢集!!!