Kafka安全認證SASL/PLAIN,並和springBoot整合
阿新 • • 發佈:2019-02-09
kafka_2.11-1.1.0.tgz、zookeeper-3.4.10.tar.gz版本
1. kafka配置
kafka解壓目錄下工作
# 1.新建配置檔案
vi ./config/kafka_server_jaas.conf
# 檔案內容
# username定義一個公共的使用者名稱,用於節點之間進行通訊,user_xxxx主要是客戶端用來連線kafka的,等號後面是密碼,xxxxx是使用者名稱,這裡大小寫一個字都不能差,除了使用者名稱和密碼
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};
# 2.修改kafka啟動時的配置檔案,server.properties
# 我的做法是複製一份
cp ./config/server.properties ./config/server_sasl.properties
# 修改內容如下,在文末新增如下內容:
# 注意點:192.168.186.130是我當前主機ip,9092是kafka通訊埠,其他的地方保持一致
listeners=SASL_PLAINTEXT://192.168.186.130:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
# 3.修改啟動指令碼
vi ./bin/kafka-server-start.sh
# 找到 export KAFKA_HEAP_OPTS
#新增jvm 引數,注意kafka_server_jaas.conf檔案是之前第一步建立的安全認證檔案
#-Djava.security.auth.login.config=/usr/local/software/kafka/config/kafka_server_jaas.conf
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/software/kafka/config/kafka_server_jaas.conf"
fi
我的是三臺機器叢集,其他節點,也要按照這樣進行配置即可,我這個版本的kafka,zookeeper不需要進行相關配置
啟動zookeeper,kafka
# 分別啟動各個kafka,指定自定義的配置檔案,先不要後端啟動,觀察,日誌有沒有出錯
./bin/kafka-server-start.sh ./config/server_sasl.properties
# 然後登陸zookeeper檢視kafka是否註冊到zookeeper裡面
[zk: localhost:2181(CONNECTED) 56] ls /brokers/ids
[0, 1, 2]
# 0,1,2對應著kafka的brokerId
2. springBoot整合
下面使用springBoot(2.o.3.REALEASE)整合需要安全認證的kafka
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
resources下面新建檔案:kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="adminss"
password="admin-sec";
};
application.yml檔案配置
spring:
kafka:
template:
default-topic: myTopic2
producer:
bootstrap-servers: 192.168.186.130:9092,192.168.186.131:9092,192.168.186.132:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
consumer:
bootstrap-servers: 192.168.186.130:9092,192.168.186.131:9092,192.168.186.132:9092
group-id: group-1
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
程式啟動類:
@SpringBootApplication
public class DiscoveryApplication {
//初始化系統屬性
static {
System.setProperty("java.security.auth.login.config", "D:/ITCloud/sts/no-rush-parent/no-rush-discovery/src/main/resources/kafka_client_jaas.conf");
}
public static void main(String[] args) {
SpringApplication.run(DiscoveryApplication.class, args);
}
}
訊息生產者:
package com.itcloud.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private KafkaTemplate<String, String> template;
public void send(String msg) {
this.template.sendDefault("my_msg", msg);
System.out.println("傳送訊息:" + msg);
}
}
訊息消費者
package com.itcloud.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
@KafkaListener(topics = { "myTopic2" })
public void receiveMessage(ConsumerRecord<String, String> record) {
System.out.println("接收訊息");
System.out.println("【*** 接收訊息 ***】key = " + record.key() + "、value = " + record.value());
}
}
controller
@RestController
public class KafkaController {
@Autowired
private Sender sender;
@PostMapping("/send/{msg}")
public String send(@PathVariable("msg") String msg) {
sender.send(msg);
return msg;
}
}
測試完美成功,可以嘗試,改變密碼,D:/ITCloud/sts/no-rush-parent/no-rush-discovery/src/main/resources/kafka_client_jaas.conf,傳送訊息就會失敗