Kafka SSL 和 ACL 配置
很久沒寫文章了,之所以寫這篇文章是想其他同學少走彎路,因為我在進行配置的時候發現google及百度沒有一篇像樣的文章。官方doc說的又不是很清楚,所以比較蛋疼,最終還是折騰出來了。
Kafka SSL 配置 大家先可以去看官方doc:我感覺比較誤導人。。
首先來看看整個叢集的架構
Kafka1 | Kafka2 | Kafka3 |
192.168.56.100 | 192.168.56.101 | 192.168.56.102 |
Zookeeper | Zookeeper | Zookeeper |
Kafka broker 100 | Kafka broker 101 |
Kafkabroker 102 |
叢集共三個節點如上述所示
2、在kafka1節點上面生成certificate和ca檔案[[email protected] kafka-0.9.0.1]# cat config/server.properties broker.id=100 #port=9020 port=9093 host.name=192.168.56.100 advertised.host.name=192.168.56.100 zookeeper.connect=192.168.56.100:2181,192.168.56.101:2181,192.168.56.102:2181/kafka91 allow.everyone.if.no.acl.found=true #allow.everyone.if.no.acl.found=false #super.users=User:Bob;User:Alice super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test #listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093 #advertised.listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093 listeners=SSL://192.168.56.100:9093 advertised.listeners=SSL://192.168.56.100:9093 ssl.keystore.location=/root/kafka1/kafka.server.keystore.jks ssl.keystore.password=zbdba94 ssl.key.password=zbdba94 ssl.truststore.location=/root/kafka1/kafka.server.truststore.jks ssl.truststore.password=zbdba94 ssl.client.auth=required ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.keystore.type=JKS ssl.truststore.type=JKS security.inter.broker.protocol=SSL #zookeeper.set.acl=true authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder # Replication configurations num.replica.fetchers=4 replica.fetch.max.bytes=1048576 replica.fetch.wait.max.ms=500 replica.high.watermark.checkpoint.interval.ms=5000 replica.socket.timeout.ms=30000 replica.socket.receive.buffer.bytes=65536 replica.lag.time.max.ms=10000 controller.socket.timeout.ms=30000 controller.message.queue.size=10 default.replication.factor=3 # Log configuration log.dir=/data1/kafka-0.9.0.1/data kafka.logs.dir=logs num.partitions=20 message.max.bytes=1000000 auto.create.topics.enable=true log.index.interval.bytes=4096 log.index.size.max.bytes=10485760 log.retention.hours=720 log.flush.interval.ms=10000 log.flush.interval.messages=20000 log.flush.scheduler.interval.ms=2000 log.roll.hours=168 log.retention.check.interval.ms=300000 log.segment.bytes=1073741824 delete.topic.enable=true # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 # Socket server configuration num.io.threads=8 num.network.threads=8 socket.request.max.bytes=104857600 socket.receive.buffer.bytes=1048576 socket.send.buffer.bytes=1048576 queued.max.requests=16 fetch.purgatory.purge.interval.requests=100 producer.purgatory.purge.interval.requests=100
#!/bin/bash
name=$HOSTNAME
folder=securityDemo
cd /root
rm -rf $folder
mkdir $folder
cd $folder
printf "zbdba94\nzbdba94\kafka1\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.server.keystore.jks -alias $name -validity 365 -genkey
printf "te\ntest\ntest\ntest\ntest\kafka1\ [email protected]\n" | openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:zbdba94
echo "done"
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
printf "zbdba94\n" | keytool -keystore kafka.server.keystore.jks -alias $name -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:zbdba94
printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias $name -import -file cert-signed
#producer.propeties
rm -rf producer.properties
printf $PWD
echo "bootstrap.servers=$name:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=$PWD/kafka.client.truststore.jks">> producer.properties
echo "ssl.truststore.password=zbdba94">> producer.properties
echo "ssl.keystore.location=$PWD/kafka.server.keystore.jks">> producer.properties
echo "ssl.keystore.password=zbdba94">> producer.properties
echo "ssl.key.password=zbdba94">> producer.properties
注意將kafka1換成你機器的host
3、在kafka2機器上生成客戶端certificate並採用kafka1生成的ca檔案進行標識
執行以下指令碼
client1.sh
#!/bin/bash
name=$HOSTNAME
cd /root
dirname=securityDemo
rm -rf $dirname
mkdir $dirname
cd $dirname
printf "zbdba94\nzbdba94\n$name\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.client.keystore.jks -alias $name -validity 36 -genkey
printf "zbdba94\nzbdba94\nyes\n" |keytool -keystore kafka.client.keystore.jks -alias $name -certreq -file cert-file
cp cert-file cert-file-$name
在kafka2節點上將kafka1生成的檔案拷貝過來
cd /root/ && scp -r kafka1:/root/securityDemo /root/kafka1
然後執行以下指令碼
client2.sh
#!/bin/bash
name=$HOSTNAME
cd /root
openssl x509 -req -CA /root/kafka1/ca-cert -CAkey /root/kafka1/ca-key -in /root/securityDemo/cert-file-$name -out /root/securityDemo/cert-signed-$name -days 36 -CAcreateserial -passin pass:zbdba94
然後執行以下指令碼
client3.sh
#!/bin/sh
name=$HOSTNAME
cd /root/securityDemo
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file /root/kafka1/ca-cert
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -import -file /root/securityDemo/cert-signed-$name
#producer.propeties
rm -rf producer.properties
printf $PWD
echo "bootstrap.servers=localhost:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=$PWD/kafka.client.keystore.jks">> producer.properties
echo "ssl.truststore.password=zbdba94">> producer.properties
echo "ssl.keystore.location=$PWD/kafka.client.keystore.jks">> producer.properties
echo "ssl.keystore.password=zbdba94">> producer.properties
echo "ssl.key.password=zbdba94">> producer.properties
同理kafka3節點安裝kafka2節點進行配置
4、啟動叢集
啟動叢集logfile中列印如下日誌:
三個節點分別列印
INFO Registered broker 100 at path /brokers/ids/100 with addresses: SSL -> EndPoint(192.168.56.100,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 101 at path /brokers/ids/101 with addresses: SSL -> EndPoint(192.168.56.101,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 102 at path /brokers/ids/102 with addresses: SSL -> EndPoint(192.168.56.102,9093,SSL) (kafka.utils.ZkUtils)
然後進行驗證
可以按照官方給的demo驗證
To check quickly if the server keystore and truststore are setup properly you can run the following command
openssl s_client -debug -connect localhost:9093 -tls1
(Note: TLSv1 should be listed under ssl.enabled.protocols)
In the output of this command you should see server's certificate:
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/[email protected]
開始寫入和消費驗證
在kafka1寫入訊息:
/data1/kafka-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic jingbotest5 --producer.config /root/securityDemo/producer.properties
在kafka2消費訊息:/data1/kafka-0.9.0.1/bin/kafka-console-consumer.sh --bootstrap-server kafka2:9093 --topic jingbotest5 --new-consumer --consumer.config /root/securityDemo/producer.properties
如果可以正常消費則沒有問題
下面看看Java client如何連線
這裡只給出簡單的demo,主要展示如何連線
package com.jingbo.test;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
public class ProducerZbdba {
public static void main(String[] args) {
Properties props = new Properties();
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.102:9093");
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "zbdba94");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "zbdba94");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(producerProps);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("jingbotest5", Integer.toString(i), Integer.toString(i)));
System.out.println("test");
producer.close();
}
}
kafka.client.keystore.jks可以從任意一個節點拷貝下來。
之前在102 的消費者可以一直開著,這是寫入看那邊能否消費到。如果可以正常消費,那麼表示SSL已經配置成功了。
Kafka ACL 配置
本來想單獨開一篇文章的,但是感覺比較簡單就沒有必要,那為什麼要說這個呢,是因為還是 有點坑的。 大家可以先參考官方的doc:
我按照配置,最後出現瞭如下錯誤:
[2016-09-05 06:32:35,144] ERROR [KafkaApi-100] error when handling request Name:UpdateMetadataRequest;Version:1;Controller:100;ControllerEpoch:39;CorrelationId:116;ClientId:100;AliveBrokers:102 : (EndPoint(192.168.56.102,9093,SSL)),101 : (EndPoint(192.168.56.101,9093,SSL)),100 : (EndPoint(192.168.56.100,9093,SSL));PartitionState:[jingbotest5,2] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100),[jingbotest5,5] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,100,101),[jingbotest5,8] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:40,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100) (kafka.server.KafkaApis)
kafka.common.ClusterAuthorizationException: Request Request(1,192.168.56.100:9093-192.168.56.100:43909,Session(User:CN=zbdba2,OU=test,O=test,L=test,ST=test,C=test,zbdba2/192.168.56.100),null,1473071555140,SSL) is not authorized.
at kafka.server.KafkaApis.authorizeClusterAction(KafkaApis.scala:910)
at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:158)
at kafka.server.KafkaApis.handle(KafkaApis.scala:74)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:744)
[2016-09-05 06:32:35,310] ERROR [ReplicaFetcherThread-2-101], Error for partition [jingbotest5,4] to broker 101:org.apache.kafka.common.errors.AuthorizationException: Topic authorization failed. (kafka.server.ReplicaFetcherThread)
官方的doc說配置了:
principal.builder.class=CustomizedPrincipalBuilderClass
就可以為SSL使用者進行ACL驗證,但是CustomizedPrincipalBuilderClass已經過時,搜尋doc的時候發現已經變更為:class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
於是開開心心拿去配置上,然而啟動錯誤。根據日誌發現其實不能用class的,也就是org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
所以對熱愛看官方doc的人遇到kafka還是比較蛋疼的。
最終起來了,但是還是依然報以上的錯誤。看到這篇文章的人就踩不到坑了,因為上面我已經幫你配置好了。
super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test
直接將SSL使用者設定為superuser
這時候ACL就可以正常的跑起來了。
Kafka 的 SSL 和ACL 感覺整合起來可以實現一套完整的許可權控制,但是不知道真正執行起來是否有坑,對於效能影響方面大家可以去參考slideshare上面的一個ppt
當然你可以自行壓力測試,根據ppt上所示,效能會有30%左右的損耗。
作者也諮詢了各大廠商,用的人比較少。還有的準備要上。我們也在考慮是否要上,業務需求比較大。
以上的指令碼作者整理一下並且放入到了github中:
https://github.com/zbdba/Kafka-SSL-config
參考連結:
https://github.com/confluentinc/securing-kafka-blog 這裡面通過Vagrant整合全自動配置
相關推薦
Kafka SSL 和 ACL 配置
很久沒寫文章了,之所以寫這篇文章是想其他同學少走彎路,因為我在進行配置的時候發現google及百度沒有一篇像樣的文章。官方doc說的又不是很清楚,所以比較蛋疼,最終還是折騰出來了。 Kafka SSL 配置 大家先可以去看官方doc: 我感覺比較誤導人。。 首先來看
kafka消費者和生產者配置
http://blog.csdn.net/wackycrazy/article/details/47810741 http://www.cnblogs.com/yinchengzhe/p/5111635.html
Kafka SSL安裝與配置
1.概述 最近有同學諮詢說,Kafka的SSL安全認證如何安裝與使用?今天筆者將通過以下幾個方面來介紹Kafka的SSL: Kafka 許可權介紹 Kafka SSL的安裝與使用 Kafka Eagle中如何配置SSL? 2.內容 2.1 什麼是Kafka許可權認證? 在Kafka 0.9.0.0之後,K
初次使用git就遭遇不測,提示沒有這個服務連接和需要配置git的一個http參數 NO network connection,SSl host could not be verified ...
連接 ... 客戶 用戶 eclispe img net ren src 第一次使用git 拉取服務上的項目到本地,結果,在拿到訪問的url地址後,輸入用戶名密碼,失敗了。 --eclispe 4.5.3 繼承了git客戶端插件的版本 -----
log4j實時將數據寫入到kafka,Demo和相關的配置詳解
producer ceshi class ogg slf4 lte std att mage 一:在項目中引入對應的JAR包,如下,註意對應的包與之前包的沖突 <dependencies> <dependency> <group
spark-project專案的kafka和scala配置
安裝scala 2.11.4 1、將scala-2.11.4.tgz使用WinSCP拷貝到sparkproject1的/usr/local目錄下。 2、對scala-2.11.4.tg
rabbit mq 配置mqtt ssl 和測試
mosquitto_pub -h han1 -p 8883 -t "/mqtt/1" --cafile /home/pi/ca/ca-chain.cert.pem --cert /home/pi/ca/client.crt --key /home/pi/ca/c
springboot配置kafka生產者和消費者詳解
在原有pom.xml依賴下新新增一下kafka依賴ar包 <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka
記錄DCOS中SSL證書的配置和除錯過程
mesosphere已經基本搭建完成,安裝了marathon-lb做請求分發,最後需要將所有的請求轉為https處理。由於不準備做全域性的證書,所以只能針對每個應用單獨進行證書配置。 起初沒
Kafka概念和關於springboot配置Kafka引數詳解
1.基本概念 *Producer: 訊息生產者,往Topic釋出訊息 *Consumer: 訊息消費者,往Topic取訊
Kafka訊息處理系統配置和簡單使用
Kafka:Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一
【筆記】kafka權威指南-常用配置和要點記錄
Kafka 的應用場景 訊息佇列 Kafka有更好的吞吐量,內建的分割槽,冗餘及容錯性,這讓Kafka成為了一個很好的大規模訊息處理應用的解決方案。 行為跟蹤和日誌收集。 敏感操作和日誌,都可以寫到 kafka 裡進行統一,分情況的監控、
kafka ssl & acl
一 KafkaSSL配置 1.1 建立金鑰和證書 以下在每臺kafka伺服器上執行 keytool -keystoreserver.keystore.jks -alias localhost -validity 365-keyalg RSA -genkey 以下在連線
IBM http Server 和 WebSphere 配置 SSL 一些需要注意的地方
前段時間,在WebSphere的環境中, 配置SSL,有一些細節需要注意: 1. 最好是先安裝 ibm http server7(32bit),websphere7,再安裝外掛 2. http server 需要安裝外掛,外掛的下載地址是: https://www14.so
Kafka SASL ACL配置踩坑總結
源起:工程現階段中介軟體採用的是kafka。滿足了大資料的高吞吐,專案間的解耦合,也增強了工程的容錯率與擴充套件性。但是在安全這一塊還有漏洞,kafka叢集中,只要網站內的任何人知道kafka叢集的ip與topic,都可以肆無忌憚的往叢集中的topic中傳送資料與消費資料。 經過調研:kafka的sasl a
kafka安裝和使用
grep keys operator comment 隨機 寫入 實時流處理 生產者 keyword kafka安裝和啟動 kafka的背景知識已經講了很多了,讓我們現在開始實踐吧,假設你現在沒有Kafka和ZooKeeper環境。 Step 1: 下
web.xml中配置spring監聽器和spring配置文件位置
nco erl spring XML param onf ati spa extc <!-- spring配置文件位置 --> <context-param> <param-name>contextConfigLocation</
Java 異常處理和 Log4j 配置文件
images orm 信息 not bound img source jar 下標 一、 程序錯誤 警告:黃線:Warning 錯誤:資源類:系統級(線程)/ 環境級(繪圖) 異常:編譯級:Exception 運行級:Exception -> RuntimeExc
springBoot(2):Properties和YAML配置文件
springboot springboot的properties和yaml配置文件 一、配置文件的生效順序,會對值進行覆蓋1. @TestPropertySource 註解2. 命令行參數3. Java系統屬性(System.getProperties())4. 操作系統環境變量5. 只有在rando
jdk安裝和環境配置
stat png string 改變 ima 繼續 out lib jar public class test{ public static void main(String[] args){ System.out.println("hello w