1. 程式人生 > >Kafka安全認證SASL/PLAIN,並和springBoot整合

Kafka安全認證SASL/PLAIN,並和springBoot整合

kafka_2.11-1.1.0.tgz、zookeeper-3.4.10.tar.gz版本

1. kafka配置

kafka解壓目錄下工作

# 1.新建配置檔案
vi ./config/kafka_server_jaas.conf
# 檔案內容
# username定義一個公共的使用者名稱,用於節點之間進行通訊,user_xxxx主要是客戶端用來連線kafka的,等號後面是密碼,xxxxx是使用者名稱,這裡大小寫一個字都不能差,除了使用者名稱和密碼
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule
required username="admin" password="admin-sec" user_admin="admin-sec" user_producer="prod-sec" user_consumer="cons-sec"; }; # 2.修改kafka啟動時的配置檔案,server.properties # 我的做法是複製一份 cp ./config/server.properties ./config/server_sasl.properties # 修改內容如下,在文末新增如下內容: # 注意點:192.168.186.130是我當前主機ip,9092是kafka通訊埠,其他的地方保持一致
listeners=SASL_PLAINTEXT://192.168.186.130:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true # 3.修改啟動指令碼 vi ./bin/kafka-server-start.sh
# 找到 export KAFKA_HEAP_OPTS #新增jvm 引數,注意kafka_server_jaas.conf檔案是之前第一步建立的安全認證檔案 #-Djava.security.auth.login.config=/usr/local/software/kafka/config/kafka_server_jaas.conf if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/software/kafka/config/kafka_server_jaas.conf" fi

我的是三臺機器叢集,其他節點,也要按照這樣進行配置即可,我這個版本的kafka,zookeeper不需要進行相關配置

啟動zookeeper,kafka

# 分別啟動各個kafka,指定自定義的配置檔案,先不要後端啟動,觀察,日誌有沒有出錯
./bin/kafka-server-start.sh ./config/server_sasl.properties
# 然後登陸zookeeper檢視kafka是否註冊到zookeeper裡面
[zk: localhost:2181(CONNECTED) 56] ls /brokers/ids
[0, 1, 2]
# 0,1,2對應著kafka的brokerId

2. springBoot整合

下面使用springBoot(2.o.3.REALEASE)整合需要安全認證的kafka

引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

resources下面新建檔案:kafka_client_jaas.conf

KafkaClient {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="adminss"
 password="admin-sec";
};

application.yml檔案配置

spring:
  kafka:
    template:
      default-topic: myTopic2
    producer:
      bootstrap-servers: 192.168.186.130:9092,192.168.186.131:9092,192.168.186.132:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: 
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT      
    consumer: 
      bootstrap-servers: 192.168.186.130:9092,192.168.186.131:9092,192.168.186.132:9092
      group-id: group-1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: 
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT

程式啟動類:

@SpringBootApplication
public class DiscoveryApplication {
    //初始化系統屬性
    static {
        System.setProperty("java.security.auth.login.config", "D:/ITCloud/sts/no-rush-parent/no-rush-discovery/src/main/resources/kafka_client_jaas.conf");
    }

    public static void main(String[] args) {
        SpringApplication.run(DiscoveryApplication.class, args);
    }
}

訊息生產者:

package com.itcloud.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {

        this.template.sendDefault("my_msg", msg);
        System.out.println("傳送訊息:" + msg);
    }
}

訊息消費者

package com.itcloud.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    @KafkaListener(topics = { "myTopic2" })
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("接收訊息");
        System.out.println("【*** 接收訊息 ***】key = " + record.key() + "、value = " + record.value());
    }
}

controller

@RestController
public class KafkaController {
    @Autowired
    private Sender sender;

    @PostMapping("/send/{msg}")
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}

測試完美成功,可以嘗試,改變密碼,D:/ITCloud/sts/no-rush-parent/no-rush-discovery/src/main/resources/kafka_client_jaas.conf,傳送訊息就會失敗