Kafka 入門實戰(4)--開啟 Kerberos 認證
本主要介紹在 Kafka 中如何配置 Kerberos 認證,文中所使用到的軟體版本:Java 1.8.0_191、Kafka 2.13-2.4.1、Kerberos1.15.1。
1、Kerberos 安裝
要使用 Kerberos 服務,需先安裝 Kerberos,安裝方法可參考:Kerberos 入門實戰(2)--Kerberos 安裝及使用。
2、Zookeeper 開啟 Kerberos 認證
Zookeeper 開啟 Kerberos 認證的方法參見:Zookeeper 入門實戰(4)--開啟 Kerberos 認證。
3、Kafka 開啟 Kerberos 認證
假設 Kafka 叢集資訊如下:
ip | 主機名 | 描述 |
10.49.196.10 | pxc1 | zookeeper、kafka |
10.49.196.11 | pxc2 | zookeeper、kafka |
10.49.196.12 | pxc3 | zookeeper、kafka |
3.1、建立 keytab
在安裝 Kerberos 的機器上進入 kadmin(Kerberos 服務端上使用 kadmin.local,安裝了 Kerberos Client 的機器上可以使用 kadmin),然後執行如下命令分別建立服務端和客戶端的 keytab:
kadmin.local: add_principal -randkey kafka-server/[email protected] kadmin.local: add_principal-randkey kafka-server/[email protected] kadmin.local: add_principal -randkey kafka-server/[email protected] kadmin.local: add_principal -randkey kafka-[email protected] kadmin.local: xst -k /root/zk-server.keytab kafka-server/[email protected] kadmin.local: xst -k /root/zk-server.keytab kafka-server/[email protected] kadmin.local: xst-k /root/zk-server.keytab kafka-server/[email protected] kadmin.local: xst -k /root/zk-client.keytab [email protected]
3.2、配置
3.2.1、配置 hosts 檔案
10.49.196.10 pxc1 10.49.196.11 pxc2 10.49.196.12 pxc3
3.2.2、Kerberos 相關配置
拷貝 krb5.conf 及 keytab 檔案到所有安裝 Kafka 的機器,這裡把檔案都放到 Kafka 的 config/kerveros 目錄下(kerberos 目錄需新建)。
2.2.3、Kafka 服務端配置
A、修改 config/server.properties,增加如下配置:
security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI sasl.kerberos.service.name=kafka-server
B、新建kafka-server-jaas.conf 檔案,該檔案也放到Kafka 的 config/kerveros 目錄下。
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server.keytab" storeKey=true useTicketCache=false principal="kafka-server/pxc1@ABC.COM"; #不同的主機,需修改為本機的主機名 }; //Zookeeper client authentication Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server.keytab" storeKey=true useTicketCache=false principal="kafka-server/pxc1@ABC.COM"; #不同的主機,需修改為本機的主機名 };
C、修改 bin/kafka-server-start.sh指令碼,倒數第二行增加如下配置:
export KAFKA_OPTS="-Dzookeeper.sasl.client=true -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server-jaas.conf"
如果 Zookeeper 沒有開啟 Kerberos 認證,則這裡zookeeper.sasl.client 可設為 false;僅僅啟用 Kafka 的 Kerberos 認證。
3.2.4、Kafka 客戶端配置
該配置主要為了使用 bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 等命令。
A、新建 client.properties 檔案,該檔案也放到Kafka 的 config/kerveros 目錄下。
security.protocol=SASL_PLAINTEXt sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka-server
B、新建kafka-client-jaas.conf 檔案,該檔案也放到Kafka 的 config/kerveros 目錄下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-client.keytab" storeKey=true useTicketCache=false principal="[email protected]"; };
C、修改bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 指令碼,倒數第二行增加如下配置:
export KAFKA_OPTS="-Djava.security.krb5.conf=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-client-jaas.conf"
3.3、啟動並測試
配置完成後就可以啟動 Kafka 叢集了:
cd /home/hadoop/app/kafka_2.13-2.4.1/bin
./kafka-server-start.sh -daemon ../config/server.properties
啟動完成後可以使用bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 來測試:
檢視 topic 資訊:
cd /home/hadoop/app/kafka_2.13-2.4.1/bin ./kafka-topics.sh --list --bootstrap-server 10.49.196.10:9092 --command-config ../config/kerberos/client.properties
傳送訊息:
cd /home/hadoop/app/kafka_2.13-2.4.1/bin ./kafka-console-producer.sh --broker-list 10.49.196.10:9092 --topic test --producer.config ../config/kerberos/client.properties
接受訊息:
cd /home/hadoop/app/kafka_2.13-2.4.1/bin ./kafka-console-consumer.sh --bootstrap-server 10.49.196.10:9092 --topic test --from-beginning --consumer.config ../config/kerberos/client.properties
3.4、java 程式連線 Zookeeper
java 可以使用 JAAS 來進行 Kerberos 認證,需要 JAAS 配置檔案、keytab 檔案及 Kerberos 配置檔案。
A、配置檔案
JAAS 配置檔案(kafka-client-jaas.conf):
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client.keytab" storeKey=true useTicketCache=false principal="[email protected]"; };
keytab 檔案:
從 Kerberos 伺服器上拷貝到目標機器,拷貝路徑即為 JAAS 配置中間配置的路徑:D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client.keytab。
Kerberos 配置檔案(krb5.conf):
從Kerberos 伺服器上拷貝 /etc/krb5.conf 到目標機器即可。
B、配置 hosts 檔案
在 hosts 檔案中新增:
10.49.196.10 pxc1 10.49.196.11 pxc2 10.49.196.12 pxc3
C、引入依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.4.1</version> </dependency>
D、樣例程式
package com.inspur.demo.general.kafka; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.TopicListing; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Collection; import java.util.Properties; public class KafkaKerberos { private AdminClient adminClient; @Before public void before() { System.setProperty("java.security.auth.login.config", "D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client-jaas.conf"); System.setProperty("java.security.krb5.conf", "D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\krb5.conf"); Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092,10.49.196.11:9092,10.49.196.12:9092"); props.put("sasl.mechanism", "GSSAPI"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.kerberos.service.name", "kafka-server"); adminClient = AdminClient.create(props); } @After public void after() { adminClient.close(); } @Test public void listTopics() throws Exception { ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); //是否羅列內部主題 listTopicsOptions.listInternal(true); ListTopicsResult result = adminClient.listTopics(listTopicsOptions); Collection<TopicListing> list = result.listings().get(); System.out.println(list); } }