1. 程式人生 > >Kafka詳解安裝

Kafka詳解安裝

一:介紹

Kafka是最初由Linkedin公司開發,是一個分散式、支援分割槽的(partition)、多副本的(replica),基於zookeeper協調的分散式訊息系統,它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,訊息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源 專案。

二:特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
  • 可擴充套件性:kafka叢集支援熱擴充套件
  • 永續性、可靠性:訊息被持久化到本地磁碟,並且支援資料備份防止資料丟失
  • 容錯性:允許叢集中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高併發:支援數千個客戶端同時讀寫

三:使用場景
- 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 訊息系統:解耦和生產者和消費者、快取訊息等。
- 使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、資料倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源
這裡寫圖片描述

四:檔案儲存機制

Kafka中釋出訂閱的物件是topic。我們可以為每類資料建立一個topic,把向topic釋出訊息的客戶端稱作producer,從topic訂閱訊息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫資料。一個kafka叢集由一個或多個broker伺服器組成,它負責持久化和備份具體的kafka訊息。
Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集。
Topic:一類訊息,訊息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka叢集能夠同時負責多個topic的分發。
Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列
Segment:partition物理上由多個segment組成,每個Segment存著message資訊
Producer : 生產message傳送到topic
Consumer : 訂閱topic消費message, consumer作為一個執行緒來消費
Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置檔案中配置好的。各個consumer(consumer 執行緒)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 執行緒 )消費,如果一個message可以被多個consumer(consumer 執行緒 ) 消費的話,那麼這些consumer必須在不同的組。Kafka不支援一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的資料的時候,由於要保證不能多個執行緒拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的效能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer執行緒去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴充套件,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴充套件性,吞吐量極高。這也就形成了分散式消費的概念。

五:拓撲結構
這裡寫圖片描述
一個典型的Kafka叢集中包含若干Producer(可以是web前端FET,或者是伺服器日誌等),若干broker(Kafka支援水平擴充套件,一般broker數量越多,叢集吞吐率越高),若干ConsumerGroup,以及一個Zookeeper叢集。Kafka通過Zookeeper管理Kafka叢集配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,因為consumer消費kafka topic的partition的offsite資訊是存在Zookeeper的。Producer使用push模式將訊息釋出到broker,Consumer使用pull模式從broker訂閱並消費訊息。

六:搭建zk叢集
Kafka叢集是把狀態儲存在Zookeeper中的,首先要搭建Zookeeper叢集。
1、軟體環境
(3臺伺服器-我的測試)
192.168.229.135 cluster1
192.168.229.130 cluster2
192.168.229.136 cluster3

推薦使用3.4.6 因為kafka在此版本上進行了大量測試 修復了很多bug,經生產環境驗證,此版本很穩定。

搭建步驟:

1.示例中使用的是3.4.5 請注意更換版本。
2.修改zoo.cfg 中clientPort=12181 預設是2181

啟動ZK:
cd /usr/softAddress/zookeeper/zookeeper-3.4.6/bin

./zkServer.sh start

七:搭建kafka叢集

1、軟體環境
1、linux一臺或多臺,大於等於2
2、已經搭建好的zookeeper叢集
3、軟體版本kafka_2.11-0.9.0.1.tgz
2、建立目錄並下載安裝軟體
cd /usr/softAddress;
mkdir kafka #建立專案目錄
cd kafka
mkdir kafkalogs #建立kafka訊息目錄,主要存放kafka訊息

下載軟體

解壓軟體

tar -zxvf kafka_2.11-0.9.0.1.tgz
3、修改三臺機器的配置檔案
進入到config目錄
cd /usr/softAddress/kafka/kafka_2.11-0.9.0.1/config
這裡寫圖片描述
有很多檔案,這裡可以發現有Zookeeper檔案,我們可以根據Kafka內帶的zk叢集來啟動,但是建議使用獨立的zk叢集
修改
cd /usr/softAddress/kafka/kafka_2.11-0.9.0.1/config
vi server.properties
broker.id=0 每臺伺服器的broker.id都不能相同 三臺分別為0,1,2

hostname

host.name=192.168.229.135

advertised.host.name=192.168.229.130

log.dirs=/usr/softAddress/kafka/kafkalogs

在log.retention.hours=168 下面新增下面三項

message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

設定zookeeper的連線埠

zookeeper.connect=192.168.229.135:12181,192.168.229.136:12181,192.168.229.130:12181
引數解釋:瞭解即可

broker.id=0  #當前機器在叢集中的唯一標識,和zookeeper的myid性質一樣
port=9092 #當前kafka對外提供服務的埠預設是9092
host.name=192.168.7.100 #這個引數預設是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網路處理的執行緒數
num.io.threads=8 #這個是borker進行I/O處理的執行緒數
log.dirs=/opt/kafka/kafkalogs/ #訊息存放的目錄,這個目錄可以配置為“,”逗號分割的表示式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新建立的topic他把訊息持久化的地方是,當前以逗號分割的目錄中,那個分割槽數最少就放那一個
socket.send.buffer.bytes=102400 #傳送緩衝區buffer大小,資料不是一下子就傳送的,先回儲存到緩衝區了到達一定的大小後在傳送,能提高效能
socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當資料到達一定大小後在序列化到磁碟
socket.request.max.bytes=104857600 #這個引數是向kafka請求訊息或者向kafka傳送訊息的請請求的最大數,這個值不能超過java的堆疊大小
num.partitions=1 #預設的分割槽數,一個topic預設1個分割槽數
log.retention.hours=168 #預設訊息的最大持久化時間,168小時,7天
message.max.byte=5242880  #訊息儲存的最大值5M
default.replication.factor=2  #kafka儲存訊息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880  #取訊息的最大直接數
log.segment.bytes=1073741824 #這個引數是:因為kafka的訊息是以追加的形式落地到檔案,當超過這個值的時候,kafka會新起一個檔案
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄檢視是否有過期的訊息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高效能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #設定zookeeper的連線埠

4.啟動Kafka叢集並測試
1、啟動服務
從後臺啟動Kafka叢集(3臺都需要啟動)

cd /usr/softAddress/kafka/kafka_2.11-0.9.0.1/bin

./kafka-server-start.sh -daemon ../config/server.properties

2、檢查服務是否啟動
執行命令jps
這裡寫圖片描述
建立Topic來驗證是否建立成功

建立Topic

./kafka-topics.sh –create –zookeeper 192.168.229.135:12181 –replication-factor 2 –partitions 1 –topic test
這裡寫圖片描述

解釋

–replication-factor 2 #複製兩份
–partitions 1 #建立1個分割槽
–topic #主題為shuaige

”’在192.168.229.135伺服器上建立一個釋出者”’
./kafka-console-producer.sh –broker-list 192.168.229.135:9092 –topic test

”’在192.168.229.136伺服器上建立一個訂閱者”’
./kafka-console-consumer.sh –zookeeper 192.168.229.136:12181 –topic test –from-beginning

效果如下:

傳送訊息:
這裡寫圖片描述
接收訊息:
這裡寫圖片描述

八:kafka生產者模型
8.1、同步生產模型
這裡寫圖片描述
傳送一條訊息,如果沒有收到kafka叢集的確認收到的訊號,則再次重發,直到傳送次數超過設定的最大次數為止。其中有一次收到了確認,就接著傳送下一條訊息。
8.2非同步生產模型
這裡寫圖片描述
訊息傳送到客戶端的緩衝佇列中,如果佇列中條數到了設定的佇列最大數或存放時間達到最大值,就把佇列中的訊息打包,一次性發送給kafka服務端。
兩種生產模型對比:
同步生產模型:
(1)低訊息丟失率;
(2)高訊息重複率(由於網路原因,回覆確認未收到);
(3)高延遲
非同步生產模型:
(1)低延遲;
(2)高發送效能;
(3)高訊息丟失率(無確認機制,傳送端佇列滿)

九:Kafka消費者模型
Kafka訊息系統基於釋出-訂閱模式,相對於ActiveMQ,Rabbitmq沒有點對點訊息處理機制。
9.1、分割槽消費模型
這裡寫圖片描述
2臺kafka 伺服器,4個分割槽(P0-P3) ,分割槽消費模型即為:1個分割槽對應1個消費例項,如圖4個分割槽,需要4個消費者例項從分割槽中取資料。

9.2 分割槽消費編碼思路
(1)獲取分割槽的size,一共多少個分割槽;
(2)針對每一個分割槽,分別建立一個執行緒,去消費該分割槽的資料
(3)每個執行緒即為一個消費者例項,通過連線;執行消費者構建;消費offset (偏移量);記錄訊息偏移量。

9.3 組消費模型
這裡寫圖片描述
同樣4個分割槽,P0-P3,這裡使用GroupA,GroupB,GroupA可獲取0,3,1,2分割槽的資料,GourpB也是。分組消費模型中,每個組都能拿到kafka叢集當前全量資料。
4、組消費實現思路
(1)獲取group裡有多少個consumer例項
(2)根據例項個數,建立執行緒
(3)執行run方法,啟動消費
兩種消費模型對比:
分割槽消費模型更加靈活但是:
(1)需要自己處理各種異常情況;
(2)需要自己管理offset(以實現訊息傳遞的其他語義);
組消費模型更加簡單,但是不靈活:
(1)不需要自己處理異常情況,不需要自己管理offset;
(2)只能實現kafka預設的最少一次訊息傳遞語義;

十:Java編寫生產者和消費者
生產者:

10.1 建立maven工程,pom.xml中增加如下

 <dependency>  
          <groupId>org.apache.kafka</groupId>  
          <artifactId>kafka_2.10</artifactId>  
          <version>0.8.2.0</version>  
   </dependency> 

10.2.向主題test內寫入資料

package com.kafka;

import java.util.Properties;  
import java.util.concurrent.TimeUnit;  

import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
import kafka.serializer.StringEncoder;  




public class kafkaProducer extends Thread{  

    private String topic;  

    public kafkaProducer(String topic){  
        super();  
        this.topic = topic;  
    }  


    @Override  
    public void run() {  
        Producer producer = createProducer();  
        int i=0;  
        while(true){  
            producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));  
            try {  
                TimeUnit.SECONDS.sleep(1);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    private Producer createProducer() {  
        Properties properties = new Properties();  
        properties.put("zookeeper.connect", "192.168.229.130:12181,192.168.229.136:12181,192.168.229.135:12181");//宣告zk  
        properties.put("serializer.class", StringEncoder.class.getName());  
        properties.put("metadata.broker.list", "192.168.229.130:9092,192.168.229.135:9092,192.168.229.136:9092");// 宣告kafka broker  
        return new Producer<Integer, String>(new ProducerConfig(properties));  
     }  


    public static void main(String[] args) {  
        new kafkaProducer("test").start();// 使用kafka叢集中建立好的主題 test   

    }  

}  

消費者:

package com.kafka;
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  

import kafka.consumer.Consumer;  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  
public class kafkaConsumer extends Thread{  

    private String topic;  

    public kafkaConsumer(String topic){  
        super();  
        this.topic = topic;  
    }  


    @Override  
    public void run() {  
        ConsumerConnector consumer = createConsumer();  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put(topic, 1); // 一次從主題中獲取一個數據  
         Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
         KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個資料  
         ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
         while(iterator.hasNext()){  
             String message = new String(iterator.next().message());  
             System.out.println("接收到: " + message);  
         }  
    }  

    private ConsumerConnector createConsumer() {  
        Properties properties = new Properties();  
        properties.put("zookeeper.connect", "192.168.229.130:12181,192.168.229.136:12181,192.168.229.135:12181");//宣告zk  
        properties.put("group.id", "group1");// 必須要使用別的組名稱, 如果生產者和消費者都在同一組,則不能訪問同一組內的topic資料  
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
     }  


    public static void main(String[] args) {  
        new kafkaConsumer("test").start();// 使用kafka叢集中建立好的主題 test   

    }  

}  

測試:
執行生產者和消費者
檢視消費者輸出
這裡寫圖片描述