1. 程式人生 > >centos7單機安裝kafka,進行生產者消費者測試

centos7單機安裝kafka,進行生產者消費者測試

【轉載請註明】:

原文出處:https://www.cnblogs.com/jstarseven/p/11364852.html   作者:jstarseven    碼字挺辛苦的..... 


  

一、kafka介紹

Kafka是最初由Linkedin公司開發,是一個分散式、分割槽的、多副本的、多訂閱者,基於zookeeper協調的分散式日誌系統(也可以當做MQ系統),常見可以用於web/nginx日誌、訪問日誌,訊息服務等等,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源專案。

主要應用場景是:日誌收集系統和訊息系統。

Kafka主要設計目標如下:

  • 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問效能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條訊息的傳輸。
  • 支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個partition內的訊息順序傳輸。
  • 同時支援離線資料處理和實時資料處理。
  • 支援線上水平擴充套件

二、kafka架構圖

三、kafka安裝與測試

1、配置JDK環境
Kafka 使用Zookeeper 來儲存相關配置資訊,Kafka及Zookeeper 依賴Java 執行環境,從oracle網站下載JDK 安裝包,解壓安裝

1 tar zxvf jdk-8u171-linux-x64.tar.gz
2 mv jdk1.8.0_171 /usr/local/java/

設定Java 環境變數:

1 #java 
2 export JAVA_HOME=/usr/local/java/jdk1.8.0_171
3 export PATH=$PATH:$JAVA_HOME/bin
4 export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib:$JAVA_HOME/jre/lib

2、安裝kafka

下載地址:http://kafka.apache.org/downloads

1 cd /opt
2 wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
3 tar zxvf kafka_2.11-2.3.0.tgz
4 mv kafka_2.11-2.3.0 /usr/local/apps/
5 cd /usr/local/apps/
6 ln -s kafka_2.11-2.3.0 kafka

3、啟動測試

(1)啟動Zookeeper服務

1 cd /usr/local/apps/kafka
2 #執行指令碼
3 bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
4 #檢視程序
5 jps

(2)啟動單機Kafka服務

1 #執行指令碼
2 bin/kafka-server-start.sh config/server.properties
3 #檢視程序
4 jps

(3)建立topic進行測試

1 #執行指令碼
2 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

(4)檢視topic列表

1 #執行指令碼
2 bin/kafka-topics.sh --list --zookeeper localhost:2181
3 輸出:test

(5)生產者訊息測試

1 #執行指令碼(使用kafka-console-producer.sh 傳送訊息)
2 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

(6)消費者訊息測試

1 #執行指令碼(使用kafka-console-consumer.sh 接收訊息並在終端列印)
2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

 4、單機多broker叢集配置

 單機部署多個broker,不同的broker,設定不同的id、監聽埠、日誌目錄

1     cp config/server.properties config/server-1.properties 
2     vim server-1.properties
3     #修改:
4     broker.id=1
5     port=9093
6     log.dir=/tmp/kafka-logs-1
7     #啟動Kafka服務
8     bin/kafka-server-start.sh config/server-1.properties &

 5、java程式碼實現生產者消費者

  (1)maven專案新增kafka依賴

1 <dependency>
2          <groupId>org.apache.kafka</groupId>
3          <artifactId>kafka-clients</artifactId>
4          <version>2.3.0</version>
5 </dependency>

  (2)java程式碼實現

 1 package com.server.kafka;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.kafka.clients.consumer.ConsumerRecord;
 5 import org.apache.kafka.clients.consumer.ConsumerRecords;
 6 import org.apache.kafka.clients.consumer.KafkaConsumer;
 7 import org.apache.kafka.clients.producer.KafkaProducer;
 8 import org.apache.kafka.clients.producer.ProducerConfig;
 9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.apache.kafka.common.serialization.StringDeserializer;
11 import org.apache.kafka.common.serialization.StringSerializer;
12 
13 import java.util.Collections;
14 import java.util.Properties;
15 import java.util.Random;
16 
17 
18 public class KafakaExecutor {
19 
20     public static String topic = "test";
21 
22     public static void main(String[] args) {
23         new Thread(()-> new Producer().execute()).start();
24         new Thread(()-> new Consumer().execute()).start();
25     }
26 
27     public static class Consumer {
28 
29         private void execute() {
30             Properties p = new Properties();
31             p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.181:9092");
32             p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
33             p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
34             p.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
35 
36             KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
37             // 訂閱訊息
38             kafkaConsumer.subscribe(Collections.singletonList(topic));
39 
40             while (true) {
41                 ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
42                 for (ConsumerRecord<String, String> record : records) {
43                     System.out.println(String.format("topic:%s,offset:%d,訊息:%s", //
44                             record.topic(), record.offset(), record.value()));
45                 }
46             }
47         }
48     }
49 
50
51     public static class Producer {
52 
53         private void execute() {
54             Properties p = new Properties();
55             //kafka地址,多個地址用逗號分割
56             p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.181:9092");
57             p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
58             p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
59             KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
60 
61             try {
62                 while (true) {
63                     String msg = "Hello," + new Random().nextInt(100);
64                     ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
65                     kafkaProducer.send(record);
66                     System.out.println("訊息傳送成功:" + msg);
67                     Thread.sleep(500);
68                 }
69             } catch (InterruptedException e) {
70                 e.printStackTrace();
71             } finally {
72                 kafkaProducer.close();
73             }
74         }
75 
76     }
77 }

  (3)測試結果(上面使用指令碼命令執行消費者的終端也會同步輸出訊息資料)

 參考:https://www.cnblogs.com/frankdeng/p/9310684.html

 

加七哥微信:kinyseven,來扯犢子啊

動動手,關注一下,扎心了

大道七哥