1. 程式人生 > >Kafka SSL 和 ACL 配置

Kafka SSL 和 ACL 配置

很久沒寫文章了,之所以寫這篇文章是想其他同學少走彎路,因為我在進行配置的時候發現google及百度沒有一篇像樣的文章。官方doc說的又不是很清楚,所以比較蛋疼,最終還是折騰出來了。

Kafka SSL 配置 大家先可以去看官方doc:

我感覺比較誤導人。。

首先來看看整個叢集的架構

Kafka1 Kafka2 Kafka3
192.168.56.100 192.168.56.101 192.168.56.102
Zookeeper Zookeeper Zookeeper
Kafka broker 100 Kafka broker 101 Kafkabroker 102

叢集共三個節點如上述所示

配置步驟如下: zookeeper與kafka的安裝部署我這裡就不說了,主要說SSL配置 1、配置三個節點的server.properties 檔案
[[email protected] kafka-0.9.0.1]# cat config/server.properties
broker.id=100
#port=9020
port=9093
host.name=192.168.56.100
advertised.host.name=192.168.56.100
zookeeper.connect=192.168.56.100:2181,192.168.56.101:2181,192.168.56.102:2181/kafka91
allow.everyone.if.no.acl.found=true
#allow.everyone.if.no.acl.found=false
#super.users=User:Bob;User:Alice
super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test
#listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
#advertised.listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
listeners=SSL://192.168.56.100:9093
advertised.listeners=SSL://192.168.56.100:9093
ssl.keystore.location=/root/kafka1/kafka.server.keystore.jks
ssl.keystore.password=zbdba94
ssl.key.password=zbdba94
ssl.truststore.location=/root/kafka1/kafka.server.truststore.jks
ssl.truststore.password=zbdba94
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL

#zookeeper.set.acl=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder


# Replication configurations                     
num.replica.fetchers=4                           
replica.fetch.max.bytes=1048576                   
replica.fetch.wait.max.ms=500                     
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000                   
replica.socket.receive.buffer.bytes=65536         
replica.lag.time.max.ms=10000                     
controller.socket.timeout.ms=30000               
controller.message.queue.size=10                 
default.replication.factor=3     

# Log configuration       
log.dir=/data1/kafka-0.9.0.1/data
kafka.logs.dir=logs                     
num.partitions=20                           
message.max.bytes=1000000                         
auto.create.topics.enable=true                   
log.index.interval.bytes=4096                     
log.index.size.max.bytes=10485760                 
log.retention.hours=720                           
log.flush.interval.ms=10000                       
log.flush.interval.messages=20000                 
log.flush.scheduler.interval.ms=2000             
log.roll.hours=168                               
log.retention.check.interval.ms=300000           
log.segment.bytes=1073741824     
delete.topic.enable=true               

# ZK configuration                               
zookeeper.connection.timeout.ms=6000             
zookeeper.sync.time.ms=2000                       

# Socket server configuration                     
num.io.threads=8                                 
num.network.threads=8                             
socket.request.max.bytes=104857600               
socket.receive.buffer.bytes=1048576               
socket.send.buffer.bytes=1048576                 
queued.max.requests=16                           
fetch.purgatory.purge.interval.requests=100       
producer.purgatory.purge.interval.requests=100   
2、在kafka1節點上面生成certificate和ca檔案
#!/bin/bash
name=$HOSTNAME
folder=securityDemo

cd /root
rm -rf $folder
mkdir $folder
cd $folder
printf "zbdba94\nzbdba94\kafka1\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.server.keystore.jks -alias $name  -validity 365 -genkey

printf "te\ntest\ntest\ntest\ntest\kafka1\
[email protected]
\n" | openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:zbdba94 echo "done" printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert printf "zbdba94\n" | keytool -keystore kafka.server.keystore.jks -alias $name -certreq -file cert-file openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:zbdba94 printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias $name -import -file cert-signed #producer.propeties rm -rf producer.properties printf $PWD echo "bootstrap.servers=$name:9093" >> producer.properties echo "security.protocol=SSL" >> producer.properties echo "ssl.truststore.location=$PWD/kafka.client.truststore.jks">> producer.properties echo "ssl.truststore.password=zbdba94">> producer.properties echo "ssl.keystore.location=$PWD/kafka.server.keystore.jks">> producer.properties echo "ssl.keystore.password=zbdba94">> producer.properties echo "ssl.key.password=zbdba94">> producer.properties
注意將kafka1換成你機器的host 3、在kafka2機器上生成客戶端certificate並採用kafka1生成的ca檔案進行標識 執行以下指令碼

client1.sh

#!/bin/bash
name=$HOSTNAME
cd /root
dirname=securityDemo
rm -rf $dirname
mkdir $dirname
cd $dirname

printf "zbdba94\nzbdba94\n$name\ntest\ntest\ntest\ntest\ntest\nyes\n\n" |  keytool -keystore kafka.client.keystore.jks -alias $name -validity 36 -genkey
printf "zbdba94\nzbdba94\nyes\n" |keytool -keystore kafka.client.keystore.jks -alias $name -certreq -file cert-file

cp cert-file cert-file-$name
在kafka2節點上將kafka1生成的檔案拷貝過來 cd /root/ && scp -r kafka1:/root/securityDemo /root/kafka1 然後執行以下指令碼 client2.sh
#!/bin/bash
name=$HOSTNAME
cd /root
openssl x509 -req -CA /root/kafka1/ca-cert -CAkey /root/kafka1/ca-key -in /root/securityDemo/cert-file-$name  -out /root/securityDemo/cert-signed-$name  -days 36 -CAcreateserial -passin pass:zbdba94
然後執行以下指令碼 client3.sh
#!/bin/sh
name=$HOSTNAME

cd /root/securityDemo
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file /root/kafka1/ca-cert
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -import -file /root/securityDemo/cert-signed-$name



#producer.propeties
rm -rf producer.properties

printf $PWD


echo "bootstrap.servers=localhost:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=$PWD/kafka.client.keystore.jks">> producer.properties
echo "ssl.truststore.password=zbdba94">> producer.properties
echo "ssl.keystore.location=$PWD/kafka.client.keystore.jks">> producer.properties
echo "ssl.keystore.password=zbdba94">> producer.properties
echo "ssl.key.password=zbdba94">> producer.properties
同理kafka3節點安裝kafka2節點進行配置 4、啟動叢集 啟動叢集logfile中列印如下日誌: 三個節點分別列印
INFO Registered broker 100 at path /brokers/ids/100 with addresses: SSL -> EndPoint(192.168.56.100,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 101 at path /brokers/ids/101 with addresses: SSL -> EndPoint(192.168.56.101,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 102 at path /brokers/ids/102 with addresses: SSL -> EndPoint(192.168.56.102,9093,SSL) (kafka.utils.ZkUtils)
然後進行驗證 可以按照官方給的demo驗證 To check quickly if the server keystore and truststore are setup properly you can run the following command openssl s_client -debug -connect localhost:9093 -tls1 (Note: TLSv1 should be listed under ssl.enabled.protocols) In the output of this command you should see server's certificate:   -----BEGIN CERTIFICATE-----   {variable sized random bytes}   -----END CERTIFICATE-----   subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani   issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/[email protected] 開始寫入和消費驗證 在kafka1寫入訊息:
/data1/kafka-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic jingbotest5 --producer.config /root/securityDemo/producer.properties
在kafka2消費訊息:
/data1/kafka-0.9.0.1/bin/kafka-console-consumer.sh --bootstrap-server kafka2:9093 --topic jingbotest5 --new-consumer --consumer.config /root/securityDemo/producer.properties
如果可以正常消費則沒有問題 下面看看Java client如何連線 這裡只給出簡單的demo,主要展示如何連線
package com.jingbo.test;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;

public class ProducerZbdba {
    public static void main(String[] args) {
         Properties props = new Properties();
         Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.102:9093");
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
         producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
         producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "zbdba94");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "zbdba94");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
         producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

         KafkaProducer producer = new KafkaProducer(producerProps);
         for(int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<String, String>("jingbotest5", Integer.toString(i), Integer.toString(i)));
             System.out.println("test");
         producer.close();
    }
}

kafka.client.keystore.jks可以從任意一個節點拷貝下來。
之前在102 的消費者可以一直開著,這是寫入看那邊能否消費到。如果可以正常消費,那麼表示SSL已經配置成功了。
Kafka ACL 配置

本來想單獨開一篇文章的,但是感覺比較簡單就沒有必要,那為什麼要說這個呢,是因為還是 有點坑的。 大家可以先參考官方的doc:

我按照配置,最後出現瞭如下錯誤:

[2016-09-05 06:32:35,144] ERROR [KafkaApi-100] error when handling request Name:UpdateMetadataRequest;Version:1;Controller:100;ControllerEpoch:39;CorrelationId:116;ClientId:100;AliveBrokers:102 : (EndPoint(192.168.56.102,9093,SSL)),101 : (EndPoint(192.168.56.101,9093,SSL)),100 : (EndPoint(192.168.56.100,9093,SSL));PartitionState:[jingbotest5,2] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100),[jingbotest5,5] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,100,101),[jingbotest5,8] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:40,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100) (kafka.server.KafkaApis)
kafka.common.ClusterAuthorizationException: Request Request(1,192.168.56.100:9093-192.168.56.100:43909,Session(User:CN=zbdba2,OU=test,O=test,L=test,ST=test,C=test,zbdba2/192.168.56.100),null,1473071555140,SSL) is not authorized.
        at kafka.server.KafkaApis.authorizeClusterAction(KafkaApis.scala:910)
        at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:158)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:74)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:744)
[2016-09-05 06:32:35,310] ERROR [ReplicaFetcherThread-2-101], Error for partition [jingbotest5,4] to broker 101:org.apache.kafka.common.errors.AuthorizationException: Topic authorization failed. (kafka.server.ReplicaFetcherThread)
官方的doc說配置了: principal.builder.class=CustomizedPrincipalBuilderClass 就可以為SSL使用者進行ACL驗證,但是CustomizedPrincipalBuilderClass已經過時,搜尋doc的時候發現已經變更為:class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder 於是開開心心拿去配置上,然而啟動錯誤。根據日誌發現其實不能用class的,也就是org.apache.kafka.common.security.auth.DefaultPrincipalBuilder 所以對熱愛看官方doc的人遇到kafka還是比較蛋疼的。 最終起來了,但是還是依然報以上的錯誤。看到這篇文章的人就踩不到坑了,因為上面我已經幫你配置好了。 super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test 直接將SSL使用者設定為superuser 這時候ACL就可以正常的跑起來了。 Kafka 的 SSL 和ACL 感覺整合起來可以實現一套完整的許可權控制,但是不知道真正執行起來是否有坑,對於效能影響方面大家可以去參考slideshare上面的一個ppt 當然你可以自行壓力測試,根據ppt上所示,效能會有30%左右的損耗。


作者也諮詢了各大廠商,用的人比較少。還有的準備要上。我們也在考慮是否要上,業務需求比較大。

以上的指令碼作者整理一下並且放入到了github中:

https://github.com/zbdba/Kafka-SSL-config

參考連結:

https://github.com/confluentinc/securing-kafka-blog 這裡面通過Vagrant整合全自動配置










相關推薦

Kafka SSL ACL 配置

很久沒寫文章了,之所以寫這篇文章是想其他同學少走彎路,因為我在進行配置的時候發現google及百度沒有一篇像樣的文章。官方doc說的又不是很清楚,所以比較蛋疼,最終還是折騰出來了。 Kafka SSL 配置 大家先可以去看官方doc: 我感覺比較誤導人。。 首先來看

kafka消費者生產者配置

http://blog.csdn.net/wackycrazy/article/details/47810741 http://www.cnblogs.com/yinchengzhe/p/5111635.html

Kafka SSL安裝與配置

1.概述 最近有同學諮詢說,Kafka的SSL安全認證如何安裝與使用?今天筆者將通過以下幾個方面來介紹Kafka的SSL: Kafka 許可權介紹 Kafka SSL的安裝與使用 Kafka Eagle中如何配置SSL? 2.內容 2.1 什麼是Kafka許可權認證? 在Kafka 0.9.0.0之後,K

初次使用git就遭遇不測,提示沒有這個服務連接需要配置git的一個http參數 NO network connection,SSl host could not be verified ...

連接 ... 客戶 用戶 eclispe img net ren src 第一次使用git 拉取服務上的項目到本地,結果,在拿到訪問的url地址後,輸入用戶名密碼,失敗了。 --eclispe 4.5.3 繼承了git客戶端插件的版本 -----

log4j實時將數據寫入到kafka,Demo相關的配置詳解

producer ceshi class ogg slf4 lte std att mage 一:在項目中引入對應的JAR包,如下,註意對應的包與之前包的沖突 <dependencies> <dependency> <group

spark-project專案的kafkascala配置

安裝scala 2.11.4 1、將scala-2.11.4.tgz使用WinSCP拷貝到sparkproject1的/usr/local目錄下。 2、對scala-2.11.4.tg

rabbit mq 配置mqtt ssl 測試

mosquitto_pub -h han1 -p 8883 -t "/mqtt/1"   --cafile /home/pi/ca/ca-chain.cert.pem  --cert /home/pi/ca/client.crt  --key /home/pi/ca/c

springboot配置kafka生產者消費者詳解

在原有pom.xml依賴下新新增一下kafka依賴ar包 <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka

記錄DCOS中SSL證書的配置除錯過程

mesosphere已經基本搭建完成,安裝了marathon-lb做請求分發,最後需要將所有的請求轉為https處理。由於不準備做全域性的證書,所以只能針對每個應用單獨進行證書配置。 起初沒

Kafka概念關於springboot配置Kafka引數詳解

1.基本概念 *Producer:            訊息生產者,往Topic釋出訊息 *Consumer:            訊息消費者,往Topic取訊

Kafka訊息處理系統配置簡單使用

Kafka:Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一

【筆記】kafka權威指南-常用配置要點記錄

Kafka 的應用場景 訊息佇列 Kafka有更好的吞吐量,內建的分割槽,冗餘及容錯性,這讓Kafka成為了一個很好的大規模訊息處理應用的解決方案。 行為跟蹤和日誌收集。 敏感操作和日誌,都可以寫到 kafka 裡進行統一,分情況的監控、

kafka ssl & acl

一  KafkaSSL配置 1.1  建立金鑰和證書 以下在每臺kafka伺服器上執行 keytool -keystoreserver.keystore.jks -alias localhost -validity 365-keyalg RSA -genkey 以下在連線

IBM http Server WebSphere 配置 SSL 一些需要注意的地方

前段時間,在WebSphere的環境中, 配置SSL,有一些細節需要注意: 1. 最好是先安裝 ibm http server7(32bit),websphere7,再安裝外掛 2. http server 需要安裝外掛,外掛的下載地址是: https://www14.so

Kafka SASL ACL配置踩坑總結

源起:工程現階段中介軟體採用的是kafka。滿足了大資料的高吞吐,專案間的解耦合,也增強了工程的容錯率與擴充套件性。但是在安全這一塊還有漏洞,kafka叢集中,只要網站內的任何人知道kafka叢集的ip與topic,都可以肆無忌憚的往叢集中的topic中傳送資料與消費資料。 經過調研:kafka的sasl a

kafka安裝使用

grep keys operator comment 隨機 寫入 實時流處理 生產者 keyword kafka安裝和啟動 kafka的背景知識已經講了很多了,讓我們現在開始實踐吧,假設你現在沒有Kafka和ZooKeeper環境。 Step 1: 下

web.xml中配置spring監聽器spring配置文件位置

nco erl spring XML param onf ati spa extc <!-- spring配置文件位置 --> <context-param> <param-name>contextConfigLocation</

Java 異常處理 Log4j 配置文件

images orm 信息 not bound img source jar 下標 一、 程序錯誤 警告:黃線:Warning 錯誤:資源類:系統級(線程)/ 環境級(繪圖) 異常:編譯級:Exception 運行級:Exception -> RuntimeExc

springBoot(2):PropertiesYAML配置文件

springboot springboot的properties和yaml配置文件 一、配置文件的生效順序,會對值進行覆蓋1. @TestPropertySource 註解2. 命令行參數3. Java系統屬性(System.getProperties())4. 操作系統環境變量5. 只有在rando

jdk安裝環境配置

stat png string 改變 ima 繼續 out lib jar public class test{ public static void main(String[] args){ System.out.println("hello w