1. 程式人生 > 其它 >初識kafka 之 本地IDEA傳送訊息(非同步傳送,不帶回調函式)

初識kafka 之 本地IDEA傳送訊息(非同步傳送,不帶回調函式)

基礎環境

kafka叢集

bigdata01、bigdata02、bigdata03、bigdata04、bigdata05

pom配置

 <!-- kafka 客戶端依賴-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.1</version>
</dependency>

建立包 

com.lzh.kafka

實現程式碼

package
com.lzh.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; // 不帶回調函式的API public class
CustomProducer { public static void main(String[] args) { // 1.配置 Properties properties = new Properties(); // 連線叢集 // 給kafka物件新增配置資訊 bootstrap.servers // 生產者連線叢集所需的 broker 地址清單 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092");
// 指定傳送訊息的key和value的序列化型別。一定要寫全類名。 // key,value序列化 key.serializer,value.serializer // key序列化 // 全類名與下等價: properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 2.建立 kafka 生產者物件 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 3.傳送資料 // 呼叫 send 方法,傳送訊息 for (int i = 1; i < 5; i++) { if (i == 1) { kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","開始報數:" )); } kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","" + i )); } // 4.關閉資源 kafkaProducer.close(); } }

測試

啟動kafka服務

cd /apps/kafka_2.12-3.0.1/bin
./kafka-server-start.sh -daemon ../config/server.properties

啟動消費者(訊息接收程式)

kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092 --topic Mytopic

本地IDEA執行程式,並在虛擬機器檢視結果: