docker:zookeeper與kafka實現分散式訊息佇列
阿新 • • 發佈:2019-01-11
一、安裝
下載映象
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
通過docker-compose啟動
docker-compose.yml指令碼(zk+kafka版)
version: '2' services: zoo1: image: wurstmeister/zookeeper restart: unless-stopped hostname: zoo1 ports: - "2182:2181" container_name: zookeeper kafka1: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_ZOOKEEPER_CONNECT: "zoo1:2182" KAFKA_BROKER_ID: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1" depends_on: - zoo1 container_name: kafka
啟動
docker-compose up -d
二、使用
進入kafka
docker exec -it kafka /bin/bash
cd /opt/kafka_2.11-1.1.0
建立主題
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka //zk為容器版 bin/kafka-topics.sh --create --zookeeper 192.168.247.131:2181 --replication-factor 1 --partitions 1 --topic mykafka //zk為本地版 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka //執行一個訊息生產者,指定topic為剛剛建立的主題 bin/kafka-console-consumer.sh --zookeeper 192.168.247.131:2181 --topic mykafka --from-beginning //執行一個消費者,指定同樣的主題
檢視
bin/kafka-topics.sh --list --zookeeper localhost:2181
傳送訊息
傳送訊息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message 啟動消費者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
單節點叢集
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.247.131:2182
config/server-2.properties:
broker.id=3
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.247.131:2183
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
//配置並啟動基於zk叢集的兩個kafka
bin/kafka-topics.sh --create --zookeeper 192.168.247.131:2182 --replication-factor 2 --partitions 1 --topic my-replicated-topic
複製數量小於等於相關聯的zk叢集數,且埠號必須是叢集內的zk
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//檢視叢集狀態
//“leader”是負責給定分割槽的所有讀取和寫入的節點。每個節點將成為隨機選擇的分割槽部分的領導者。
//“replicas”是複製此分割槽日誌的節點列表,無論它們是否為領導者,或者即使它們當前處於活動狀態。
//“isr”是“同步”複製品的集合。這是副本列表的子集,該列表當前處於活躍狀態並且已經被領導者捕獲。
資料處理(使用Kafka Connect匯入/匯出資料)
echo -e "foo\nbar" > test.txt
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
more test.sink.txt
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
echo Another line>> test.txt