centos7單機安裝kafka,進行生產者消費者測試
阿新 • • 發佈:2019-08-16
【轉載請註明】:
原文出處:https://www.cnblogs.com/jstarseven/p/11364852.html 作者:jstarseven 碼字挺辛苦的.....
一、kafka介紹
Kafka是最初由Linkedin公司開發,是一個分散式、分割槽的、多副本的、多訂閱者,基於zookeeper協調的分散式日誌系統(也可以當做MQ系統),常見可以用於web/nginx日誌、訪問日誌,訊息服務等等,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源專案。
主要應用場景是:日誌收集系統和訊息系統。
Kafka主要設計目標如下:
- 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問效能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條訊息的傳輸。
- 支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個partition內的訊息順序傳輸。
- 同時支援離線資料處理和實時資料處理。
- 支援線上水平擴充套件
二、kafka架構圖
三、kafka安裝與測試
1、配置JDK環境
Kafka 使用Zookeeper 來儲存相關配置資訊,Kafka及Zookeeper 依賴Java 執行環境,從oracle網站下載JDK 安裝包,解壓安裝
1 tar zxvf jdk-8u171-linux-x64.tar.gz 2 mv jdk1.8.0_171 /usr/local/java/
設定Java 環境變數:
1 #java 2 export JAVA_HOME=/usr/local/java/jdk1.8.0_171 3 export PATH=$PATH:$JAVA_HOME/bin 4 export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
2、安裝kafka
下載地址:http://kafka.apache.org/downloads
1 cd /opt 2 wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz 3 tar zxvf kafka_2.11-2.3.0.tgz 4 mv kafka_2.11-2.3.0 /usr/local/apps/ 5 cd /usr/local/apps/ 6 ln -s kafka_2.11-2.3.0 kafka
3、啟動測試
(1)啟動Zookeeper服務
1 cd /usr/local/apps/kafka 2 #執行指令碼 3 bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 4 #檢視程序 5 jps
(2)啟動單機Kafka服務
1 #執行指令碼 2 bin/kafka-server-start.sh config/server.properties 3 #檢視程序 4 jps
(3)建立topic進行測試
1 #執行指令碼 2 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(4)檢視topic列表
1 #執行指令碼 2 bin/kafka-topics.sh --list --zookeeper localhost:2181 3 輸出:test
(5)生產者訊息測試
1 #執行指令碼(使用kafka-console-producer.sh 傳送訊息) 2 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(6)消費者訊息測試
1 #執行指令碼(使用kafka-console-consumer.sh 接收訊息並在終端列印) 2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
4、單機多broker叢集配置
單機部署多個broker,不同的broker,設定不同的id、監聽埠、日誌目錄
1 cp config/server.properties config/server-1.properties 2 vim server-1.properties 3 #修改: 4 broker.id=1 5 port=9093 6 log.dir=/tmp/kafka-logs-1 7 #啟動Kafka服務 8 bin/kafka-server-start.sh config/server-1.properties &
5、java程式碼實現生產者消費者
(1)maven專案新增kafka依賴
1 <dependency> 2 <groupId>org.apache.kafka</groupId> 3 <artifactId>kafka-clients</artifactId> 4 <version>2.3.0</version> 5 </dependency>
(2)java程式碼實現
1 package com.server.kafka; 2 3 import org.apache.kafka.clients.consumer.ConsumerConfig; 4 import org.apache.kafka.clients.consumer.ConsumerRecord; 5 import org.apache.kafka.clients.consumer.ConsumerRecords; 6 import org.apache.kafka.clients.consumer.KafkaConsumer; 7 import org.apache.kafka.clients.producer.KafkaProducer; 8 import org.apache.kafka.clients.producer.ProducerConfig; 9 import org.apache.kafka.clients.producer.ProducerRecord; 10 import org.apache.kafka.common.serialization.StringDeserializer; 11 import org.apache.kafka.common.serialization.StringSerializer; 12 13 import java.util.Collections; 14 import java.util.Properties; 15 import java.util.Random; 16 17 18 public class KafakaExecutor { 19 20 public static String topic = "test"; 21 22 public static void main(String[] args) { 23 new Thread(()-> new Producer().execute()).start(); 24 new Thread(()-> new Consumer().execute()).start(); 25 } 26 27 public static class Consumer { 28 29 private void execute() { 30 Properties p = new Properties(); 31 p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.181:9092"); 32 p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 33 p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 34 p.put(ConsumerConfig.GROUP_ID_CONFIG, topic); 35 36 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p); 37 // 訂閱訊息 38 kafkaConsumer.subscribe(Collections.singletonList(topic)); 39 40 while (true) { 41 ConsumerRecords<String, String> records = kafkaConsumer.poll(100); 42 for (ConsumerRecord<String, String> record : records) { 43 System.out.println(String.format("topic:%s,offset:%d,訊息:%s", // 44 record.topic(), record.offset(), record.value())); 45 } 46 } 47 } 48 } 49 50 51 public static class Producer { 52 53 private void execute() { 54 Properties p = new Properties(); 55 //kafka地址,多個地址用逗號分割 56 p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.181:9092"); 57 p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 58 p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 59 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p); 60 61 try { 62 while (true) { 63 String msg = "Hello," + new Random().nextInt(100); 64 ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg); 65 kafkaProducer.send(record); 66 System.out.println("訊息傳送成功:" + msg); 67 Thread.sleep(500); 68 } 69 } catch (InterruptedException e) { 70 e.printStackTrace(); 71 } finally { 72 kafkaProducer.close(); 73 } 74 } 75 76 } 77 }
(3)測試結果(上面使用指令碼命令執行消費者的終端也會同步輸出訊息資料)
參考:https://www.cnblogs.com/frankdeng/p/9310684.html
加七哥微信:kinyseven,來扯犢子啊
動動手,關注一下,扎心了
大道七哥