
Integrating Spring Boot with Kafka: A Simple Send-and-Receive Example

Environment

  • Windows 10 1709
  • IDEA 2017.3.2
  • SpringBoot 2.0.M7
  • Spring-Kafka 2.1.0.RELEASE
  • JDK 1.8.0_144
  • Maven 3.5.0
  • Alibaba Cloud ECS
    • CentOS 7
    • Kafka 2.12-1.0.0
    • zookeeper 3.4.10

Download and Extract Kafka

Download the tgz package:
wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

Extract it:
tar -zxvf kafka_2.12-1.0.0.tgz

Enter the extracted directory:
cd kafka_2.12-1.0.0

Start ZooKeeper

Kafka uses ZooKeeper, so if you don't already have a ZooKeeper server running you need to start one first. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance:

bin/zookeeper-server-start.sh config/zookeeper.properties

Configure Kafka

Kafka ships with a basic configuration file in the config directory. To make the broker reachable from outside the server, two settings have to be changed.

Open config/server.properties. Near the top you will find the commented-out listeners and advertised.listeners entries; uncomment both and set them to match your server's IP addresses:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
delete.topic.enable=true
host.name=172.17.7.97
advertised.host.name=47.94.106.42

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://<Alibaba Cloud internal IP>:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://<Alibaba Cloud public IP>:9092

Start Kafka

bin/kafka-server-start.sh config/server.properties

This step may fail. My Alibaba Cloud instance has only 1 GB of RAM, and Kafka's startup script requests 1 GB of heap by default, so the JVM cannot allocate enough memory:

Java Hotspot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)

In kafka-server-start.sh, change

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

to

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

Start it again and this time it comes up successfully.
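Before moving on, you can optionally verify that the advertised.listeners change actually works from outside the server. Below is a minimal sketch using Kafka's AdminClient (available in kafka-clients 1.0.0), run from your local machine; the class name is hypothetical and the hard-coded public IP is a placeholder for your own:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaConnectivityCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Replace with your own Alibaba Cloud public IP
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "47.94.106.42:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Succeeds only if the broker's advertised listener is reachable from this machine
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}

If this prints a (possibly empty) set of topic names, remote access is working.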

Create the Spring Boot Project

pom.xml

Add the following dependency to pom.xml:

<!-- Spring Boot + Kafka integration -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

application.yml

Create a new file named application.yml under src/main/resources:

# kafka
spring:
  kafka:
    # Kafka broker address(es); more than one may be listed
    bootstrap-servers: <Alibaba Cloud public IP>:9092
    consumer:
      # Default consumer group id
      group-id: kafka2
      # earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
      # latest:   if a partition has a committed offset, resume from it; otherwise consume only newly produced records
      # none:     if every partition has a committed offset, resume from them; if any partition lacks one, throw an exception
      auto-offset-reset: earliest
      # key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # producer batch size in bytes
      batch-size: 65536
      # total producer buffer memory in bytes
      buffer-memory: 524288
      # broker address
      bootstrap-servers: <Alibaba Cloud public IP>:9092
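Spring Boot's auto-configuration reads these properties and exposes a ready-made KafkaTemplate plus a listener container factory, so no Java configuration is needed. For orientation only, here is a sketch of the manual wiring the producer half of the YAML corresponds to (not required for this example):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Same values as in application.yml
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Alibaba Cloud public IP>:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 524288);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}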

KafkaController.java

Create a new KafkaController.java in the project's base package:

package xin.csqsx.kafka3;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Package xin.csqsx.kafka3
 * Class KafkaController
 * Demonstrates sending and receiving messages with Spring Boot and Kafka
 *
 * @author dell
 * @version 1.0
 * Created 2017/12/15 11:55
 */
@RestController
@EnableAutoConfiguration
public class KafkaController {

    /**
     * Inject the auto-configured KafkaTemplate
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    /**
     * Send a message to the "test" topic
     *
     * @param key  key of the record to push
     * @param data value of the record to push
     */
    private void send(String key, String data) {
        kafkaTemplate.send("test", key, data);
    }

    @RequestMapping("/kafka")
    public String testKafka() {
        int iMax = 6;
        for (int i = 1; i < iMax; i++) {
            send("key" + i, "data" + i);
        }
        return "success";
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaController.class, args);
    }

    /**
     * Logger used to print the consumed messages
     */
    private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @KafkaListener(topics = "test")
    public void receive(ConsumerRecord<?, ?> consumer) {
        logger.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
    }


}
Start the application and request http://localhost:8080/kafka. The listener consumes the five records and logs them:

2017-12-18 10:40:19.877  INFO 10084 --- [nio-8080-exec-8] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
2017-12-18 10:40:19.877  INFO 10084 --- [nio-8080-exec-8] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
2017-12-18 10:40:20.164  INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController         : test - key1:data1
2017-12-18 10:40:20.164  INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController         : test - key2:data2
2017-12-18 10:40:20.164  INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController         : test - key3:data3
2017-12-18 10:40:20.164  INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController         : test - key4:data4
2017-12-18 10:40:20.164  INFO 10084 --- [ntainer#0-0-C-1] xin.csqsx.kafka3.KafkaController         : test - key5:data5
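Note that KafkaTemplate.send() is asynchronous: it returns a ListenableFuture immediately, so the controller above never finds out whether a record was actually written. A minimal sketch of a callback-based variant of the send() method (same template, topic, and logger as above; the log messages are illustrative):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

// Drop-in replacement for send() inside KafkaController
private void sendWithCallback(String key, String data) {
    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send("test", key, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            logger.info("sent {} to partition {} at offset {}", data,
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.error("failed to send {}", data, ex);
        }
    });
}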

2017/12/18
Lucifer*