1. 程式人生 > >Spring Cloud_28_訊息驅動/Kafka的使用

Spring Cloud_28_訊息驅動/Kafka的使用

訊息驅動/Kafka的使用

  • 與RabbitMQ一樣,充當訊息代理中介軟體的角色

1、下載Zookeeper/Kafka

  • Kafka依賴於Zookeeper,Zookeeper是一個服務的管理框架,在啟動Kafka(2.11)服務之前,需要先啟動Zookeeper(3.4.8)

  • Kafka

2、啟動Zookeeper

  • 進入%Zookeeper_Home%\conf
  • 複製zoo_sample.cfg,並更名為zoo.cfg
  • 進入%Zookeeper_Home%\bin,進入命令列視窗
  • 使用zkServer命令,啟動zookeeper,預設埠2181

3、啟動Kafka

  • 進入%kafka_Home%bin\windows,進入命令列視窗
  • 預設是使用%kafka_Home%\config中的server.properties,啟動kafka
  • 使用 kafka-server-start ../../config/server.properties,啟動kafka,預設9092埠

4、建立atm_kafka_client

4.1、引入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId
>
<version>0.11.0.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.9</version> </dependency>

4.2、SendMessage

package com.atm.cloud;

import java.util.Properties;

import
org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; /** * 向Kafka伺服器傳送訊息 */ public class SendMessage { public static void main(String[] args) throws Exception { // 配置資訊 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // String的序列化類 // 設定資料key的序列化處理類 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 設定資料value的序列化處理類 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 建立生產者例項 Producer<String, String> producer = new KafkaProducer<String, String>( props); // 建立一條新的記錄,第一個引數為Topic名稱 // 會向topic傳送userName-aitemi鍵值,所有的資料都是通過鍵值儲存的 ProducerRecord record = new ProducerRecord<String, String>("my-topic", "userName", "aitemi"); // 傳送記錄 producer.send(record); producer.close(); } }
  • 執行main
  • 進入%kafka_Home%bin\windows,進入命令列視窗,鍵如 kafka-topic –list –zookeeper localhost:2181
  • 檢視新建的topic

  • kafka中的topic類似rabbitmq中的佇列,我們剛剛向topic中傳送了一個訊息

4.3、ReadMessage

package com.atm.cloud;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * 消費者,訂閱"my-topic",獲取其中的資訊
 */
public class ReadMessage {

    public static void main(String[] args) throws Exception {
        // 配置資訊
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // 必須指定消費者組
        props.put("group.id", "test");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
                props);
        // 訂閱 my-topic 的訊息,可以訂閱多個topic
        consumer.subscribe(Arrays.asList("my-topic"));
        // 到伺服器中讀取記錄,會一直拉取
        while (true) {
            // 通過consumer的一個拉取方法
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("這是消費者A,key: " + record.key() + ", value: "
                        + record.value());
            }
        }
    }
}
  • 每傳送一次訊息,均會接收到

4.4、消費者組

  • 消費者會為自己新增一個消費者組,每一條釋出到topic的記錄都會被交付到消費者組
  • 如果多個消費者例項有相同消費者組,那麼資訊會分配其中一個消費例項上
  • 如果所有的消費者都有不同的消費者組,那麼訊息會被廣播到全部的消費者進行處理
  • 通過這樣的機制來實現負載均衡的功能
// 必須指定消費者組
props.put("group.id", "test");