
Setting Up a kafka_2.12-2.0.0 Cluster on CentOS 7

1. The Server Cluster

    A Kafka cluster stores its state in ZooKeeper, so set up a ZooKeeper cluster before building the Kafka cluster.

    Kafka cluster nodes: 192.168.0.24, 192.168.0.48, 192.168.0.60

2. Building the Kafka Cluster

    Create the Kafka working directory on all three nodes: mkdir -p /usr/local/kafka

    Extract the Kafka archive into /usr/local/kafka: tar -zxvf kafka_2.12-2.0.0.tgz -C /usr/local/kafka

    Create the Kafka log directory: mkdir -p /usr/local/kafka/kafkalogs

    Edit the Kafka configuration file: vim /usr/local/kafka/kafka_2.12-2.0.0/config/server.properties

    Configure server.properties as follows:

#broker.id=0
host.name=<host_ip>
log.dirs=/usr/local/kafka/kafkalogs
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
delete.topic.enable=true
zookeeper.connect=192.168.0.24:2181,192.168.0.36:2181,192.168.0.48:2181

    Comment out broker.id=0 (with it commented out, each broker gets an auto-generated unique ID, since broker.id.generation.enable defaults to true),

    set host.name to the node's own IP,

    and set zookeeper.connect to the ZooKeeper cluster address.
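Applying these per-node edits by hand on three machines is error-prone, so the substitution can be scripted. A minimal sketch (the HOST_IP placeholder, the stand-in template path, and the example node IP are illustrative, not from the Kafka distribution):

```shell
#!/usr/bin/env bash
# Generate a per-node server.properties from a shared template.
# The template uses HOST_IP as a placeholder for the node's own address.
set -e

workdir=$(mktemp -d)

# Shared template (same content as the configuration above)
cat > "$workdir/server.properties.tmpl" <<'EOF'
#broker.id=0
host.name=HOST_IP
log.dirs=/usr/local/kafka/kafkalogs
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
delete.topic.enable=true
zookeeper.connect=192.168.0.24:2181,192.168.0.36:2181,192.168.0.48:2181
EOF

# On each node, substitute that node's IP for the placeholder.
node_ip=192.168.0.24
sed "s/HOST_IP/$node_ip/" "$workdir/server.properties.tmpl" \
    > "$workdir/server.properties"

grep '^host.name=' "$workdir/server.properties"
```

Running the same script on each node with its own `node_ip` keeps the three files identical except for the address.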

    By default a Kafka node runs with a 1 GB heap. To change this, edit the KAFKA_HEAP_OPTS setting in kafka-server-start.sh.

    vim /usr/local/kafka/kafka_2.12-2.0.0/bin/kafka-server-start.sh

    Find the KAFKA_HEAP_OPTS entry and modify it, for example:

    export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
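The same edit can be applied with sed instead of a manual vim session. A sketch, assuming the stock default line `export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"`; for illustration it operates on a stand-in file rather than the real script:

```shell
#!/usr/bin/env bash
# Bump the Kafka heap from the stock 1G default to 2G with sed.
# On a real node you would point this at
# /usr/local/kafka/kafka_2.12-2.0.0/bin/kafka-server-start.sh instead.
set -e

script=$(mktemp)
# The relevant line as it ships in kafka-server-start.sh:
echo 'export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"' > "$script"

sed -i 's/-Xmx1G -Xms1G/-Xmx2G -Xms2G/' "$script"
cat "$script"
```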

    Repeat the configuration above on all three nodes. Once done, start Kafka on each of them.

    On each node, change into the Kafka directory: cd /usr/local/kafka/kafka_2.12-2.0.0/

    Start Kafka on each node: ./bin/kafka-server-start.sh -daemon ./config/server.properties

    After startup, run jps to check whether the Kafka process is running. If startup failed, look at kafkaServer.out under the logs directory.

    Common Kafka commands:

    Stop Kafka: ./bin/kafka-server-stop.sh

    Create a topic: ./bin/kafka-topics.sh --create --zookeeper 192.168.0.24:2181,192.168.0.36:2181,192.168.0.48:2181 --replication-factor 1 --partitions 1 --topic topic_name

    List topics: ./bin/kafka-topics.sh --list --zookeeper 192.168.0.24:2181,192.168.0.36:2181,192.168.0.48:2181

    Describe a topic: ./bin/kafka-topics.sh --describe --zookeeper 192.168.0.24:2181,192.168.0.36:2181,192.168.0.48:2181 --topic topic_name

    Produce messages from the console: ./bin/kafka-console-producer.sh --broker-list 192.168.0.24:9092 --topic topic_name

    Consume messages from the console: ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.24:9092 --topic topic_name --from-beginning (as of Kafka 2.0 the console consumer connects to the brokers directly; the old --zookeeper option has been removed)

    Delete a topic: ./bin/kafka-topics.sh --delete --topic topic_name --zookeeper 192.168.0.24:2181,192.168.0.36:2181,192.168.0.48:2181
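Since nearly every command above repeats the same ZooKeeper connect string, building it once from the node list avoids copy-paste mistakes such as stray spaces in the address list. A small bash sketch:

```shell
#!/usr/bin/env bash
# Build the comma-separated ZooKeeper connect string once and reuse it.
ZK_NODES=(192.168.0.24 192.168.0.36 192.168.0.48)

# Append ":2181" to each element, then join with "," via IFS.
ZK_CONNECT=$(IFS=,; echo "${ZK_NODES[*]/%/:2181}")
echo "$ZK_CONNECT"

# Commands can then reference it, e.g.:
# ./bin/kafka-topics.sh --list --zookeeper "$ZK_CONNECT"
```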

3. Integrating Kafka with Spring Boot

    Create a Maven project named kafka and add the Kafka dependencies to pom.xml:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.1.10.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<version>2.1.10.RELEASE</version>
	<scope>test</scope>
</dependency>

    Configure the Kafka cluster parameters in application.yml:

spring:
  profiles: home
  application:
    name: kafka
  kafka:
    # Kafka broker addresses (one or more)
    bootstrap-servers: 192.168.0.24:9092,192.168.0.48:9092,192.168.0.60:9092
    consumer:
      # Default consumer group id
      group-id: kafka2
      # earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
      # latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced records
      # none: if every partition has a committed offset, resume from them; throw an exception if any partition lacks one
      auto-offset-reset: earliest
      # Key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Producer batch size in bytes
      batch-size: 65536
      # Total producer buffer memory in bytes
      buffer-memory: 524288
      # Broker addresses
      bootstrap-servers: 192.168.0.24:9092,192.168.0.48:9092,192.168.0.60:9092
app:
  topic:
    common: common

    Create a controller that sends messages:

@RestController
@RequestMapping(value = "kafka")
public class KafkaController {

    private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.common}")
    private String topic;

    // @ResponseBody is implied by @RestController
    @RequestMapping(value = "send", method = RequestMethod.GET)
    public void send(String key, String data) {
        logger.info("sending to {} : {} = {}", topic, key, data);
        kafkaTemplate.send(topic, key, data);
    }
}

    Create a message listener:

@Component
public class KafkaListener {

    private static final Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    // The fully qualified annotation name avoids the clash with this class's own name
    @org.springframework.kafka.annotation.KafkaListener(topics = "${app.topic.common}")
    public void receive(ConsumerRecord<?, ?> record) {
        logger.info("{} - {} : {}", record.topic(), record.key(), record.value());
    }
}

    Start the application, call the send endpoint from a browser to publish a message, and watch the listener log the topic, key, and value in the console.
