1. 程式人生 > >公網IP作為kafka叢集的生產者實現手記+SASL認證

公網IP作為kafka叢集的生產者實現手記+SASL認證

叢集基本情況:

192.168.1.27 (bg01)

192.168.1.28 (bg02)

192.168.7.152 (bg03)

專案需要,將192.168.1.27:9092對映到外網58.60.1xx.xxx:9092.供其他公網ip連線生產資料到叢集。

bg01機器的server.properties配置為:

############################# Socket Server Settings ############################# listeners=PLAINTEXT://192.168.1.27:9092 advertised.listeners=PLAINTEXT://58.60.1xx.xxx:9092 zookeeper.connect=localhost:2181

zookeeper.connect=192.168.1.27:2181,192.168.1.28:2181,192.168.7.152:2181

出於方便,沒有用公網ip除錯,而是用的本地電腦除錯。結果生產資料非常非常慢,kafka主機還接收不到資料。

在這個環節費了很多心力。後來用telnet 58.60.1xx.xxx 9092,結果telnet不通!

於是用fatjar打包程式放到遠端機器上除錯。一炮而成功。

(需要下載fatjar的請在我的資源裡下載,價格便宜,療效好)

總結就是,遠端連線kafka並沒有那麼難。我最開始被人誤導了要在本地機器模擬遠端機器,費了很多時間,心力交瘁。

認證:

1.配置kafka的server端(每個broker中):

vi $KAFKA_HOME/server.properties

listeners=SASL_PLAINTEXT://x-x-x-x:9092
security.inter.broker.protocol=SASL_PLAINTEXT 
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

在$KAFKA_HOME路徑下新建JAAS檔案。

vi kafka_server_jaas.conf

KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="kafka"
 password="kafka#secret"
 user_kafka="kafka#secret" 
 user_alice="alice#secret"
}

這個配置定義了2個使用者(kafka 和 alice)。在KafkaServer部分,username和password是broker用於初始化連線到其他的broker,在這個例子中,kafka使用者為broker間的通訊,useruserName定義了所有連線到broker和broker驗證的所有的客戶端連線包括其他broker的使用者密碼。(轉自連結:http://orchome.com/270) 另,useruserName必須配置kafka使用者,否則報錯。部落格http://wangzzu.github.io/2016/07/29/sasl-plain-kafka/和一些問答中都反應此問題。

JAAS檔案作為每個broker的jvm引數,在kafka-server-start.sh指令碼中增加如下配置。

vi $KAFKA_HOME/bin/kafka-server-start.sh

if [  "x$KAFKA_OPTS" ]; then
 export KAFKA_OPTS="-Djava.security.auth.login.config=/work/install/kafka_2.11-1.0.1/kafka_server_jaas.conf"
fi

2.配置kafka client端

第一種,console

在$KAFKA_HOME路徑下新建JAAS檔案。

vi kafka_client_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka#secret";
};

KafkaClient部分,username和password是客戶端用來配置客戶端連線broker的使用者,在這個例子中,客戶端使用kafka 使用者連線到broker。

修改consuer和producer的配置檔案(配置注意空格不然報錯)

在$KAFKA_HOME/config/consumer.properties和$KAFKA_HOME/config/producer.properties裡分別加上如下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

在啟動consumer和producer時,分別新增jvm引數。

vi kafka-console-consumer.sh/kafka-console-producer.sh($KAFKA_HOME/bin目錄下)

if [  "x$KAFKA_OPTS" ]; then
 export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_0.10/kafka_client_jaas.conf"
fi

produce

kafka-console-producer.sh --broker-list x.x.x.x:9092 --topic perf --producer.config /usr/local/kafka_0.10/config/producer.properties

consume 只支援新的消費方式--bootstrap-server

不報錯 --bootstrap-server

kafka-console-consumer.sh  --bootstrap-server x.x.x.x:9092  --topic perf --from-beginning --consumer.config /usr/local/kafka_0.10/config/consumer.properties --new-consumer

報錯 --zookeeper

kafka-console-consumer.sh --zookeeper x.x.x.x:21818/kafka --topic perf  --consumer.config /usr/local/kafka_0.10/config/consumer.properties

第二種,java客戶端消費

執行jar包的伺服器的指定路徑下必須有kafka_client_jaas.conf檔案

在程式中新增如下配置

System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas.conf"); //配置檔案路徑
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

此配置重啟bg01的kafka報錯:

[2018-07-25 14:54:07,608] FATAL  (kafka.Kafka$) java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are PLAINTEXT

把kafka_server_start.sh中的修改註釋掉,仍然報錯。

把server.properties中的修改註釋掉,就不報錯了。可見問題出在

#security.inter.broker.protocol=SASL_PLAINTEXT  #sasl.enabled.mechanisms=PLAIN #sasl.mechanism.inter.broker.protocol=PLAIN 結合上面的報錯,就把註釋取消,另外加上了

inter.broker.listener.name=58.60.186.153 security.inter.broker.protocol=SASL_PLAINTEXT  sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN 報錯:

org.apache.kafka.common.config.ConfigException: Only one of inter.broker.listener.name and security.inter.broker.protocol should be set.

於是把 security.inter.broker.protocol 註釋掉

kill掉kafka,重新啟動。報錯:

org.apache.kafka.common.config.ConfigException: Listener with name 58.60.186.153 defined in inter.broker.listener.name not found in listener.security.protocol.map.  

把kafka_server_start.sh和server.properties都改回原來。這個認證問題以後再說。