初識kafka 之 本地IDEA傳送訊息(非同步傳送,不帶回調函式)
阿新 • • 發佈:2022-04-14
基礎環境
kafka叢集
bigdata01、bigdata02、bigdata03、bigdata04、bigdata05
pom配置
<!-- kafka 客戶端依賴--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.1</version> </dependency>
建立包
com.lzh.kafka
實現程式碼
packagecom.lzh.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; // 不帶回調函式的API public classCustomProducer { public static void main(String[] args) { // 1.配置 Properties properties = new Properties(); // 連線叢集 // 給kafka物件新增配置資訊 bootstrap.servers // 生產者連線叢集所需的 broker 地址清單 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092");// 指定傳送訊息的key和value的序列化型別。一定要寫全類名。 // key,value序列化 key.serializer,value.serializer // key序列化 // 全類名與下等價: properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 2.建立 kafka 生產者物件 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 3.傳送資料 // 呼叫 send 方法,傳送訊息 for (int i = 1; i < 5; i++) { if (i == 1) { kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","開始報數:" )); } kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","" + i )); } // 4.關閉資源 kafkaProducer.close(); } }
測試
啟動kafka服務
cd /apps/kafka_2.12-3.0.1/bin ./kafka-server-start.sh -daemon ../config/server.properties
啟動消費者(訊息接收程式)
kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092 --topic Mytopic
本地IDEA執行程式,並在虛擬機器檢視結果: