Spring Cloud_28_訊息驅動/Kafka的使用
阿新 • • 發佈:2019-01-08
訊息驅動/Kafka的使用
- 與RabbitMQ一樣,充當訊息代理中介軟體的角色
1、下載Zookeeper/Kafka
-
Kafka依賴於Zookeeper,Zookeeper是一個服務的管理框架,在啟動Kafka(2.11)服務之前,需要先啟動Zookeeper(3.4.8)
2、啟動Zookeeper
- 進入%Zookeeper_Home%\conf
- 複製zoo_sample.cfg,並更名為zoo.cfg
- 進入%Zookeeper_Home%\bin,進入命令列視窗
- 使用zkServer命令,啟動zookeeper,預設埠2181
3、啟動Kafka
- 進入%kafka_Home%bin\windows,進入命令列視窗
- 預設是使用%kafka_Home%\config中的server.properties,啟動kafka
- 使用 kafka-server-start ../../config/server.properties,啟動kafka,預設9092埠
4、建立atm_kafka_client
4.1、引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId >
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.9</version>
</dependency>
4.2、SendMessage
package com.atm.cloud;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* 向Kafka伺服器傳送訊息
*/
public class SendMessage {
public static void main(String[] args) throws Exception {
// 配置資訊
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// String的序列化類
// 設定資料key的序列化處理類
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// 設定資料value的序列化處理類
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// 建立生產者例項
Producer<String, String> producer = new KafkaProducer<String, String>(
props);
// 建立一條新的記錄,第一個引數為Topic名稱
// 會向topic傳送userName-aitemi鍵值,所有的資料都是通過鍵值儲存的
ProducerRecord record = new ProducerRecord<String, String>("my-topic",
"userName", "aitemi");
// 傳送記錄
producer.send(record);
producer.close();
}
}
- 執行main
- 進入%kafka_Home%bin\windows,進入命令列視窗,鍵如 kafka-topic –list –zookeeper localhost:2181
- 檢視新建的topic
- kafka中的topic類似rabbitmq中的佇列,我們剛剛向topic中傳送了一個訊息
4.3、ReadMessage
package com.atm.cloud;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* 消費者,訂閱"my-topic",獲取其中的資訊
*/
public class ReadMessage {
public static void main(String[] args) throws Exception {
// 配置資訊
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 必須指定消費者組
props.put("group.id", "test");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
props);
// 訂閱 my-topic 的訊息,可以訂閱多個topic
consumer.subscribe(Arrays.asList("my-topic"));
// 到伺服器中讀取記錄,會一直拉取
while (true) {
// 通過consumer的一個拉取方法
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("這是消費者A,key: " + record.key() + ", value: "
+ record.value());
}
}
}
}
- 每傳送一次訊息,均會接收到
4.4、消費者組
- 消費者會為自己新增一個消費者組,每一條釋出到topic的記錄都會被交付到消費者組
- 如果多個消費者例項有相同消費者組,那麼資訊會分配其中一個消費例項上
- 如果所有的消費者都有不同的消費者組,那麼訊息會被廣播到全部的消費者進行處理
- 通過這樣的機制來實現負載均衡的功能
// 必須指定消費者組
props.put("group.id", "test");