1. 程式人生 > 其它 >core net 消費kafka_docker環境,搭建kafka叢集

core net 消費kafka_docker環境,搭建kafka叢集

技術標籤:core net 消費kafka

5731672b993fcd569520d0336e3cb683.png

​這篇文章分享如何在docker環境下搭建kafka叢集。

bin/jdk:8u221映象的構建,請參考 -- docker基礎環境搭建
bin/zookeeper:3.5.7映象的構建,請參考 -- docker環境,搭建zookeeper叢集

kafka的叢集資訊由zookeeper管理,先建立kafka-net docker網路並啟動zookeeper

sudo docker network create kafka-net
sudo docker run -d --name zk  --network kafka-net --network-alias zk bin/zookeeper:3.5.7

可以啟動zookeeper叢集,這裡為了簡單,只使用了單機的zookeeper。

下載kafka_2.12-2.3.1.tgz,並構建bin/kafka:2.3.1,Dockerfile如下

FROM bin/jdk:8u221

WORKDIR /usr/lib

COPY kafka_2.12-2.3.1.tgz .
COPY docker-entrypoint.sh /usr/local/bin
RUN tar -xzf kafka_2.12-2.3.1.tgz && rm kafka_2.12-2.3.1.tgz 
&& groupadd -r kafka && useradd -r -g kafka kafka 
&& mkdir -p /usr/local/kafka/data/ 
&& chown -R kafka:kafka /usr/local/kafka/data/ 
&& chmod 777  /usr/local/bin/docker-entrypoint.sh

VOLUME /usr/local/kafka/data/

WORKDIR /usr/lib/kafka_2.12-2.3.1
ENTRYPOINT ["docker-entrypoint.sh"]

docker-entrypoint.sh負責修改配置檔案,啟動kafka程序

#!/bin/bash

sed -i "/zookeeper.connect=/c zookeeper.connect=${ZOOKEEPER_CONNECT}" 
config/server.properties
sed -i "/log.dirs=/c log.dirs=/usr/local/kafka/data/" 
config/server.properties
sed -i "/broker.id=/c broker.id=${BROKER_ID}" 
config/server.properties
echo >> config/server.properties
if [ -n  "$LISTENERS" ]; then 
    echo "listeners=${LISTENERS}" >> 
    config/server.properties
fi
if [ -n "$ADVERTISED_LISTENERS" ]; then
    echo "advertised.listeners=${ADVERTISED_LISTENERS}" >> 
    config/server.properties
fi

exec gosu kafka bin/kafka-server-start.sh config/server.properties

使用sed的c命令對配置檔案進行整行替換,共修改了zookeeper.connect/log.dirs/broker.id三個配置。

listeners:啟動kafka服務監聽的ip/hostname和埠,不配置則使用java.net.InetAddress.getCanonicalHostName()獲取的值和9092埠。
advertised.listeners:生產者和消費者連線的地址,kafka會把該地址註冊到zookeeper中,不配置則使用listeners配置。

啟動kafka容器

for i in `seq 1 3`; do
    sudo  docker run  -d --name kafka-$i  
--network kafka-net --network-alias kafka-$i 
-e KAFKA_HEAP_OPTS="-Xmx512M -Xms512M" 
-e LISTENERS="PLAINTEXT://kafka-${i}:9092" 
-e ZOOKEEPER_CONNECT=zk:2181  
-e BROKER_ID=$i bin/kafka:2.3.1
done

由於宿主機記憶體較少,這裡使用KAFKA_HEAP_OPTS配置將kafka程序使用記憶體修改為-Xmx512M -Xms512M

建立topic

sudo docker exec kafka-1 bin/kafka-topics.sh --create 
 --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 
 --replication-factor 1 --partitions 1 --topic helloKafka

傳送訊息

$ sudo docker exec -it kafka-1 bin/kafka-console-producer.sh 
--broker-list kafka-1:9092,kafka-2:9092,kafka-3:9092   --topic helloKafka
>This is a message
>This is another message

消費訊息

$ sudo docker exec -it kafka-1 bin/kafka-console-consumer.sh 
--bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 
--topic helloKafka --from-beginning
This is a message
This is another message

通過zookeeper檢視broker

$ sudo docker exec -it zk bin/zkCli.sh
[zk] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092"],"jmx_port":-1,"host":"kafka-1","timestamp":"1583589384712","port":9092,"version":4}

如果您覺得本文不錯,歡迎關注我的微信公眾號,您的關注是我堅持的動力!

d084dc4fa58f61acbd3ca1ad84deb25c.png