1. 程式人生 > 其它 >使用Docker(k8s)安裝Kafka並使用宿主機連線

使用Docker(k8s)安裝Kafka並使用宿主機連線

技術標籤:KafkaDockerKuberneteskafkadockerkubernetes

使用Docker(k8s)安裝Kafka並使用宿主機連線

  1. 安裝Docker及docker-compose
    具體安裝方法可以去官網看教程
    檢查docker-compose是否安裝成功
    在這裡插入圖片描述
  2. 建立 docker-compose.yml 檔案
version: '2'
services:
  zookeeper:
    image: "zookeeper"
    hostname: "zookeeper.local"
    container_name
: "zookeeper" #設定網路別名 可隨意取 networks: local: aliases: - "zookeeper.local" kafka: image: "wurstmeister/kafka" hostname: "kafka.local" container_name: "kafka" ports: - "9092:9092" networks
: local: aliases: - "kafka.local" environment: KAFKA_ADVERTISED_HOST_NAME: kafka.local KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 #設定網路,名為local模式 networks: local: driver: bridge
  1. 進入Kafka容器
docker exec -it kafka /bin/bash
  1. 建立Topic
/opt/kafka_2.13-2.7.0/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 -replication-factor 1 --partitions 1 --topic test_topic
  1. 測試命令列生產消費
  • 生產者
/opt/kafka_2.13-2.7.0/bin/kafka-console-producer.sh --broker-list kafka.local:9092 --topic test_topic

在這裡插入圖片描述

  • 消費者
/opt/kafka_2.13-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server kafka.local:9092 --topic test_top
ic --from-beginning

在這裡插入圖片描述

  1. 從宿主機使用程式碼連線Kafka
    6.1 進入Zookeeper容器檢視brokers註冊資訊
    # 進入容器
    docker exec -it zookeeper /bin/bas
    
    # 進入zookeeper命令列
    bin/zkCli.sh
    
    6.2 檢視brokers註冊資訊
    get /brokers/ids/1001
    
    在這裡插入圖片描述
    6.3 配置宿主機hosts
    # 新增
    127.0.0.1	kafka.local
    
    6.4 使用Java程式碼連線Kafka
    public class KafkaConsumerDemo {
    public static void main(String[] args) {
    	//1.配置消費者連線屬性
    	Properties props = new Properties();
    	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.local:9092");
    	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    	props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_demo");
    
    
    	//2.建立Kafka消費者
    	KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    
    	//3.訂閱topics
    	consumer.subscribe(Arrays.asList("test_topic"));
    //		consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0)));
    //		consumer.seekToBeginning(Arrays.asList(new TopicPartition("test_topic", 0)));//不改變當前offset
    
    	//4.死迴圈讀取訊息
    	for (; ; ) {
    		ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    
    		if (records != null && !records.isEmpty()) {
    			records.forEach(r -> {
    				System.out.println("key:" + r.key() + "----value:" + r.value());
    			});
    		}
    	}
      }
    }
    

測試成功
在這裡插入圖片描述