1. 程式人生 > >Kafka 集群配置SASL+ACL

Kafka 集群配置SASL+ACL

ima ted 認證 管理 ces 外部網絡 apache 其他 true

** Kafka 集群配置SASL+ACL

測試環境:**

                    系統: CentOS 6.5 x86_64
                    JDK : java version 1.8.0_121
                    kafka: kafka_2.11-1.0.0.tgz
                    zookeeper: 3.4.5
                    ip: 192.168.49.161 (我們這裏在一臺機上部署整套環境)

kafka 名詞解析:

                    Broker: Kafka 集群包含一個或多個服務器,這種服務器被稱為broker
                    Topic: 每條發布到Kafka 集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic 的消息
                    分開存儲,邏輯上一個Topic 的消息雖然保存於一個或多個broker 上但用戶只需指定消息的Topic 即可生產或
                    消費數據而不必關心數據存於何處)
                    Partition: Parition 是物理上的概念,每個Topic 包含一個或多個Partition
                    Producer: 負責發布消息到Kafka broker 即生產者
                    Consumer: 消息消費者,向Kafka broker 讀取消息的客戶端即消費者
                    Consumer Group:每個Consumer 屬於一個特定的Consumer Group(可為每個Consumer 指定group
                    name,若不指定group name 則屬於默認的group)

Kafka 拓撲結構(借用別人的圖)
技術分享圖片
一個典型的Kafka 集群中包含若幹Producer(可以是web 前端產生的Page View,或者是服務器日誌,系統 CPU、Memory 等),若幹broker(Kafka 支持水平擴展,一般broker 數量越多,集群吞吐率越高),若幹Consumer Group,以及一個Zookeeper 集群。Kafka 通過Zookeeper 管理集群配置,選舉leader,以及在Consumer Group 發生變化時進行rebalance。Producer 使用push 模式將消息發布到broker,Consumer 使用pull 模式從broker 訂閱並消費消息

kafka 集群部署
一 Zookeeper 集群部署(這裏部署的偽集群)

  1. zookeeper 部署比較簡單,解壓修改配置文件後即可使用, 默認的配置文件為zoo_sample.cfg 這裏我們直接新建一個zoo.cfg 的文件內容如下:

各參數意義這裏不作詳細解釋,想了解可以自行參考官網

                                        tickTime=2000
                                        initLimit=5
                                        syncLimit=2
                                        dataDir=/opt/zook/zookeeper1/data //指定數據路徑
                                        dataLogDir=/opt/zook/zookeeper1/logs //指定日誌路徑
                                        clientPort=2181 //監聽端口
                                        server.1=127.0.0.1:7771:7772
                                        server.2=127.0.0.1:7771:7772
                                        server.3=127.0.0.1:7771:7772
  2. 在zookeeper/data 下創建一個名為myid 的文件,並在文件內輸入數字1
 3. 集群部署則將上述配置的單節點再復制兩份,並將配置文件中
                            ```
                            dataDir , dataLogDir 兩個參數根據實際路徑修改
                            clientPort 分別配為2182,2183
                            myid 分別配置為2 3
                            ```
  4. 啟動集群

二 Kafka 集群部署

 由於集群最終是對外部網絡開放的,出於安全考慮本文采用的是SASL+ACL 控制和授權

1. broker 配置
1.1. 將配置server.properties 復制2 份並分別重命名成server-1.properties server-2.properties server-3.properties 修改server.properties 配置:(請根據實際環境填寫)

                                                                                                                    broker.id = 1 //另外兩個節點分別為2 3
                                                                                                                    host.name=192.168.49.161
                                                                                                                    log.dirs=/tmp/kafka-logs-1 //另外兩個節點分別為kafka-logs-2 kafka-logs-3 可以不
                                                                                                                    放在/tmp 下,如果放在其他地方必須註意目錄權限
                                                                                                                    zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183
                                                                                                                    port=9092 //另外兩個節點分別為9093 9094 也可以自定義
                                                                                                                    listeners=SASL_PLAINTEXT://192.168.49.161:9092 //另外兩個節點分別為9093 9094
                                    到這裏基礎配置已完成
                    1.2    要配置SASL 和ACL,需要在broker 端進行兩個方面的設置。
               首先是創建包含所有認證用戶信息的JAAS 文件,本例中我們有admin qjld 兩個用戶,其中admin 為管理員用於broker 集群間的認證, qjld 為遠程應用的認證用戶, 新增認證信息文件
                                                                                                                        ```
                                                                                                                        kafka_server_jaas.conf 內容為:
                                                                                                                        KafkaServer {
                                                                                                                        org.apache.kafka.common.security.plain.PlainLoginModule required
                                                                                                                        username="admin"
                                                                                                                        password="admin-mgr998778123"
                                                                                                                        user_admin="admin-mgr998778123"
                                                                                                                        user_qjld="123456";
                                                                                                                        };
                                                                                                                        ```
                                                                        本例中路徑為: /opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf , 我們需要將配置文件中的內容傳遞給JVM,因此需要修改/opt/kafka_2.11-                                                         1.0.0/bin/kafka-server-start.sh 腳本文件如下:
                                                                    ```
                                                                    在exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" 之前一行添加export KAFKA_OPTS="-                                  Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
                                                                    然後保存退出。
                                                                    ```
                            其次我們要在所有broker 配置文件(server-x.properties)中增加:
                                                                                            ```
                                                                                            #ACL 入口類
                                                                                            authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
                                                                                            #SASL_PLAINTEXT
                                                                                            sasl.enabled.mechanisms=PLAIN
                                                                                            security.inter.broker.protocol=SASL_PLAINTEXT
                                                                                            sasl.mechanism.inter.broker.protocol=PLAIN
                                                                                            allow.everyone.if.no.acl.found=true
                                                                                            #設置admin 為超級用戶
                                                                                            super.users=User:admin
                                                                                            ```
                            現在我們可以啟動broker 測試了
                                                                                        ```
                                                                                        bin/kafka-server-start.sh config/server-1.properties &
                                                                                        bin/kafka-server-start.sh config/server-2.properties &
                                                                                        bin/kafka-server-start.sh config/server-3.properties &
                                                                                        ```
           啟動成功,此時broker 可以接收已認證的客戶端連接了,下面我們進行客戶端配置
                                                                        ```
                                                                        //創建一個topic 名為test
                                            bin/kafka-topics.sh--create --zookeeper 192.168.49.161:2181,192.168.49.161:2182, 192.168.49.161:2183 --replication-factor 1 --partitions 1 --topic test    
                                                                        //查看topic 是否創建成功
                                                                        bin/kafka-topics.sh --list --zookeeper 192.168.49.161:2181 
                                                                        ```
  **2. Client 配置**
            2.1 新增一個配置文件如:/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf,內容為:
                                                                                                                        ```
                                                                                                                        KafkaClient {
                                                                                                                        org.apache.kafka.common.security.plain.PlainLoginModule required
                                                                                                                        username="qjld"
                                                                                                                        password="123456";
                                                                                                                        };
                                                                                                                        ```

同理我們也要將配置文件內容傳遞給JVM, 因此需要修改

                                                                /opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh 腳本文件如下:在exec $base_dir/kafka-run-class.sh
                                                                $EXTRA_ARGS kafka.Kafka "$@" 之前一行添加export
                                                                KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
                                                                同樣的方法修改kafka-console-consumer.sh
                            2.2 需要在config/consumer.properties 和config/producer.properties 配置文件中追加:
                                                                ```
                                                                security.protocol=SASL_PLAINTEXT
                                                                sasl.mechanism=PLAIN
                                                                ```
                                    這兩行配置, 完成後再試運行:
                                                                ```
                                                                bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test                                                                 --producer.config config/producer.properties
                                                                bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test
                                                                --from-beginning  --consumer.config config/consumer.properties
                                                                ```
                                    接下來測試消息的寫入和讀取, 開啟兩個終端分別輸入下列
                                                            ```
                                                            bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test //消息寫入
                                                            bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test  --from-beginning 消息讀取
                                                            ```
               如果報錯信息如下:
                           `WARN Bootstrap broker 192.168.49.161:9092 disconnected (org.apache.kafka.clients.NetworkClient)`
               則說明配置的security 已生效, 要想普通用戶能讀寫消息,需要配置ACL
         2.3 配置ACL
                                                            ```
                                                            bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties
                                                            zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183 --add
                                                            --allow-principal User:qjld --operation All --topic test
                                                            ```
             這是為qjld 這個用戶配置所有權限,如果有更細的要求則可參考網上的,如:Read Write 等只需要將--operation all 改為--operation Read 多種則在其後面加一個--         operation write 即可

三 測試
這裏我選用python2.7 編寫的腳本進行測試
先安裝pip27 install kafka-python
技術分享圖片

Kafka 集群配置SASL+ACL