1. 程式人生 > >kafka簡介和入門

kafka簡介和入門

一、Kafka概述

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

這是Kafka官網裡的一段介紹,從這段話,我們可以知道,kafka在構建實時資料管道和streaming應用中被常使用,它可以橫向擴充套件,容錯,快速地執行在數以千計的公司產品上。

它可以像訊息系統一樣讀寫資料流,並且可以在實時業務的場景中寫可靠的流處理應用,並且能安全地儲存資料流到分散式、多副本、容錯的叢集中。所以通俗點理解,可以說Kafka就是一個訊息中介軟體。

Kafka場景比喻

接下來我大概比喻下Kafka的使用場景

訊息中介軟體:生產者和消費者

媽媽:生產者
你:消費者
饅頭:資料流、訊息
正常情況下: 生產一個  消費一個
其他情況:  
1)一直生產,你吃到某一個饅頭時,你卡主(機器故障), 饅頭就丟失了
2)一直生產,做饅頭速度快,你吃來不及,饅頭也就丟失了
為了放著其他生產情況的出現,我們可以拿個碗/籃子,饅頭做好以後先放到籃子裡,你要吃的時候去籃子裡面取出來吃,而這籃子/框就可以為:Kafka。當籃子滿了,饅頭就裝不下了,咋辦? 多準備幾個籃子 === Kafka的擴容

二、Kafka的架構和核心概念

這是張我在Kafka官網上截的圖,我大概可以把Kafka的主要結構分為以下幾點:

producer:生產者,就是生產饅頭(老媽)
consumer:消費者,就是吃饅頭的(你)
broker:籃子
topic:主題,給饅頭帶一個標籤,topica的饅頭是給你吃的,topicb的饅頭是給你弟弟吃

另外,我們在看下官網上的這段解釋:

 

First a few concepts:

  • Kafka is run as a cluster on one or more servers.
  • The Kafka cluster stores streams of records
     in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

可以看出來:

1.Kafka可以作為叢集執行在一臺或者多個伺服器上面;

2.Kafka叢集可以分類地儲存記錄流,以打標籤的方式,就是採用topics,每個broker可以打個topic,這樣能保證消費者可以根據topic選擇性消費;

3.每個記錄由Key、Value、timestamp構成。

Kafka四個核心的API

1.ProducerAPI:允許一個應用向一個或多個topic裡釋出記錄流;

2.ConsumerAPI:允許一個應用訂閱一個或多個topics,處理topic裡的資料流,就相當於消費;

3.StreamAPI:允許應用扮演流處理的作用,從一個或多個topic裡消費資料流,然後產生輸出流資料到其他一個或多個topic裡,對輸入流資料有效傳輸到輸出口;

4.ConnectorAPI:允許執行和構建一個可重複利用的生產者和消費者,能將kafka的topic與其他存在的應用和資料庫裝置相連線,比如連結一個實時資料庫,可以捕捉到每張表的變化。

這四個API,主要應用在IDEA上對應用程式的開發中,通過程式碼的形式管理Kafka。在第四部分將會對前兩個API寫個簡單DEMO演示。

三、Kafka的快速使用

Kafka使用到了zookeeper,所以首先你得安裝zookeeper再安裝kafka。

1.單節點的broker部署

首先我們需要修改$KAFKA_HOME/config/server.properties這個配置檔案,主要以下幾處需要修改:
broker.id=0,每個broker的ID需要唯一
listeners:監聽的埠(此處筆者設定的是預設埠9092)
host.name:當前機器
log.dirs:儲存日誌的資料夾

num.partitions:分割槽的數量
zookeeper.connect:zookeeper的地址(預設為localhost:2181)

這幾處根據你自身需要進行配置,然後啟動步驟如下:

1)開啟zookeeper,此處需要注意的是zookeeper的conf目錄下的zoo.cfg配置檔案,主要修改的也是日誌儲存目錄那塊。

2)啟動Kafka,命令為:kafka-server-start.sh $KAFKA_HOME/config/server.properties

3)建立topic,需要指定zookeeper,命令為:kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic。 注意指定zookeeper,後面幾個屬性可以根據你實際情況進行定義。另外檢視所有topic的命令為:
kafka-topics.sh --list --zookeeper hadoop000:2181

4)傳送訊息,需要指定broker,命令為:kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

5)消費訊息,需要指定zookeeper,命令為:kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning。意思就是指定zookeeper上的topic進行消費,from-beginning的設定,可以檢視之前的訊息。

2.單節點,多broker

主要是增加多個server.properties檔案,一個配置檔案就相當於一個broker,我就設定三個broker:

 

 
  1. server-1.properties

  2. log.dirs=/home/hadoop/app/tmp/kafka-logs-1

  3. listeners=PLAINTEXT://:9093

  4. broker.id=1

  5.  
  6. server-2.properties

  7. log.dirs=/home/hadoop/app/tmp/kafka-logs-2

  8. listeners=PLAINTEXT://:9094

  9. broker.id=2

  10.  
  11. server-3.properties

  12. log.dirs=/home/hadoop/app/tmp/kafka-logs-3

  13. listeners=PLAINTEXT://:9095

  14. broker.id=3

然後依次開啟,命令如下:

 

 
  1. kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &

  2. kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &

  3. kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

  4.  

 接下來就跟上面的步驟一樣:

 

 
  1. kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

  2.  
  3. kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic

  4. kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic

 檢視 topic的詳細資訊:

 

kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic

要注意的是,副本中會有個leader,而多副本也實現了kafka的容錯性,掛掉一個副本後,會自動在剩下副本里選出一個leader來同步操作。

 

根據上面步驟操作,我們在producer視窗輸入,在consumer消費視窗看到相應輸出。

四、Producer和Consumer API的使用

接下來展示一個簡單的Demo,在生產端簡單建立個執行緒進行迴圈輸出,然後用消費者端對輸出的內容進行展示,也就是消費。

配置檔案

 
  1. /**

  2. * Kafka常用配置檔案

  3. */

  4. public class KafkaProperties {

  5.  
  6. public static final String ZK = "192.168.199.111:2181";

  7.  
  8. public static final String TOPIC = "hello_topic";

  9.  
  10. public static final String BROKER_LIST = "192.168.199.111:9092";

  11.  
  12. public static final String GROUP_ID = "test_group1";

  13.  
  14. }

 Producer API DEMO

 
  1. import kafka.javaapi.producer.Producer;

  2. import kafka.producer.KeyedMessage;

  3. import kafka.producer.ProducerConfig;

  4.  
  5. import java.util.Properties;

  6.  
  7. /**

  8. * Kafka生產者

  9. */

  10. public class KafkaProducer extends Thread{

  11.  
  12. private String topic;

  13.  
  14. private Producer<Integer, String> producer;

  15.  
  16. public KafkaProducer(String topic) {

  17. this.topic = topic;

  18.  
  19. Properties properties = new Properties();

  20.  
  21. properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);

  22. properties.put("serializer.class","kafka.serializer.StringEncoder");

  23. properties.put("request.required.acks","1");

  24.  
  25. producer = new Producer<Integer, String>(new ProducerConfig(properties));

  26. }

  27.  
  28.  
  29. @Override

  30. public void run() {

  31.  
  32. int messageNo = 1;

  33.  
  34. while(true) {

  35. String message = "message_" + messageNo;

  36. producer.send(new KeyedMessage<Integer, String>(topic, message));

  37. System.out.println("Sent: " + message);

  38.  
  39. messageNo ++ ;

  40.  
  41. try{

  42. Thread.sleep(2000);

  43. } catch (Exception e){

  44. e.printStackTrace();

  45. }

  46. }

  47.  
  48. }

  49. }

Consumer API DEMO

 
  1. import kafka.consumer.Consumer;

  2. import kafka.consumer.ConsumerConfig;

  3. import kafka.consumer.ConsumerIterator;

  4. import kafka.consumer.KafkaStream;

  5. import kafka.javaapi.consumer.ConsumerConnector;

  6.  
  7. import java.util.HashMap;

  8. import java.util.List;

  9. import java.util.Map;

  10. import java.util.Properties;

  11.  
  12. /**

  13. * Kafka消費者

  14. */

  15. public class KafkaConsumer extends Thread{

  16.  
  17. private String topic;

  18.  
  19. public KafkaConsumer(String topic) {

  20. this.topic = topic;

  21. }

  22.  
  23.  
  24. private ConsumerConnector createConnector(){

  25. Properties properties = new Properties();

  26. properties.put("zookeeper.connect", KafkaProperties.ZK);

  27. properties.put("group.id",KafkaProperties.GROUP_ID);

  28. return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

  29. }

  30.  
  31. @Override

  32. public void run() {

  33. ConsumerConnector consumer = createConnector();

  34.  
  35. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

  36. topicCountMap.put(topic, 1);

  37. // topicCountMap.put(topic2, 1);

  38. // topicCountMap.put(topic3, 1);

  39.  
  40. // String: topic

  41. // List<KafkaStream<byte[], byte[]>> 對應的資料流

  42. Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);

  43.  
  44. KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0); //獲取我們每次接收到的暑假

  45.  
  46. ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

  47.  
  48.  
  49. while (iterator.hasNext()) {

  50. String message = new String(iterator.next().message());

  51. System.out.println("rec: " + message);

  52. }

  53. }

  54. }

最後在main函式對這兩個類呼叫即可,結果如下:

 

 

--------------------- 作者:瘋兔子大叔 來源:CSDN 原文:https://blog.csdn.net/wing_93/article/details/78513782?utm_source=copy 版權宣告:本文為博主原創文章,轉載請附上博文連結!