使用Docker(k8s)安裝Kafka並使用宿主機連線
阿新 • • 發佈:2021-01-13
技術標籤:KafkaDockerKuberneteskafkadockerkubernetes
使用Docker(k8s)安裝Kafka並使用宿主機連線
- 安裝Docker及docker-compose
具體安裝方法可以去官網看教程
檢查docker-compose是否安裝成功
- 建立 docker-compose.yml 檔案
version: '2'
services:
zookeeper:
image: "zookeeper"
hostname: "zookeeper.local"
container_name : "zookeeper"
#設定網路別名 可隨意取
networks:
local:
aliases:
- "zookeeper.local"
kafka:
image: "wurstmeister/kafka"
hostname: "kafka.local"
container_name: "kafka"
ports:
- "9092:9092"
networks :
local:
aliases:
- "kafka.local"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka.local
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#設定網路,名為local模式
networks:
local:
driver: bridge
- 進入Kafka容器
docker exec -it kafka /bin/bash
- 建立Topic
/opt/kafka_2.13-2.7.0/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 -replication-factor 1 --partitions 1 --topic test_topic
- 測試命令列生產消費
- 生產者
/opt/kafka_2.13-2.7.0/bin/kafka-console-producer.sh --broker-list kafka.local:9092 --topic test_topic
- 消費者
/opt/kafka_2.13-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server kafka.local:9092 --topic test_top
ic --from-beginning
- 從宿主機使用程式碼連線Kafka
6.1 進入Zookeeper容器檢視brokers註冊資訊
6.2 檢視brokers註冊資訊# 進入容器 docker exec -it zookeeper /bin/bas # 進入zookeeper命令列 bin/zkCli.sh
get /brokers/ids/1001
6.3 配置宿主機hosts
6.4 使用Java程式碼連線Kafka# 新增 127.0.0.1 kafka.local
public class KafkaConsumerDemo { public static void main(String[] args) { //1.配置消費者連線屬性 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.local:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_demo"); //2.建立Kafka消費者 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //3.訂閱topics consumer.subscribe(Arrays.asList("test_topic")); // consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0))); // consumer.seekToBeginning(Arrays.asList(new TopicPartition("test_topic", 0)));//不改變當前offset //4.死迴圈讀取訊息 for (; ; ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); if (records != null && !records.isEmpty()) { records.forEach(r -> { System.out.println("key:" + r.key() + "----value:" + r.value()); }); } } } }
測試成功