第二十六記·Kafka介紹及安裝 使用
XY個人記
概述
Kafka是一個開源流處理平臺,它允許釋出和訂閱記錄流。在這方面,它類似於訊息佇列或企業訊息傳遞系統。Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的訊息處理,也是為了通過叢集來提供實時的訊息。
由於Kafka存在高容錯、高擴充套件、分散式等特性,Kafka主要應用 訊息系統、日誌收集系統、Metrics監控系統
Kafka有四個核心API
Producer API:允許應用程式釋出的記錄流至一個或多個Kafka topics。
Consumer API:允許應用程式訂閱一個或多個topics,並處理所產生的對他們記錄的資料流。
Streams API:允許應用程式充當stream processor(流處理器),從一個或多個topics消耗的輸入流,併產生一個輸出流至一個或多個輸出的topics,有效地變換所述輸入流,以輸出流。
Connector API:允許構建和執行Kafka topics連線到現有的應用程式或資料系統中重用producers or consumers。例如,關係資料庫的聯結器可能捕獲對錶的每個更改。
Kafka的基本概念
Message (訊息佇列) |
傳遞的資料物件,主要由四部分構成: offset(偏移量)、key、value、timestamp(插入時間) |
Broker (代理者) | Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broke, 是一個物理概念。 |
Topic (訊息類別) | 每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同
Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但
使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)。 |
Partition (分割槽) |
具體維護Kafka上的訊息資料的最小單位,一個Topic可以包含多個分割槽; Partition: 特性ordered&immutable。(在資料的產生和消費過程中,不需要關注資料具體儲存的Partition在那個Broker上,只需要指定Topic即可,由Kafka負責將資料和對應的Partition關聯上) |
Producer (生產者) | 負責將資料傳送到Kafka對應Topic的程序 |
Consumer (消費者) | 負責從對應Topic獲取資料的程序 |
Consumer Group (消費者組) | 每個consumer都屬於一個特定的group組,一個group組可以包含 多個consumer, 但一個組中只會有一個consumer消費資料。 |
Kafka 安裝
安裝方式主要由三種,分別是:單機、偽分散式、完全分散式;完全分散式搭建和偽分散式一樣區別在於在不同的機器上配置kafka的server.properties,其他都是一樣的。
前提要安裝好java和scala
配置偽分散式
1.下載tar包並解壓,我下載的是scala版本是2.11 kafka版本是0.8.2.1的
$ tar -zxf /opt/software/kafka_2.11-0.8.2.1.tgz -C ./
$ mv kafka_2.11-0.8.2.1 kafka-0.8.2.1
2.配置 server.properties 檔案,在config資料夾中有一個 server.properties檔案,一個 server.properties檔案代表一個Kafka的服務。
$ mv server.properties server0.properties
#指定該broker服務所管理的資料儲存的路徑
$ mkdir datas
2.1配置一個server的
broker.id=0 #每一個kafka的broker服務具有一個唯一的id,要求非負數
port=9092 #每個broker監聽的埠號,要求不被佔用
host.name=hadoop01.com #每一個broker監聽的主機名
advertised.host.name/advertised.port #指定生產者和消費者連線kafka叢集使用的主機名和埠號,預設使用host.name和port的配置
log.dirs=/opt/modules/apache/kafka-0.8.2.1/datas/0 #指定該broker服務所管理的資料儲存的路徑,可以指定多個,多個用逗號
zookeeper.connect=hadoop01.com:2181/jfy_kafka #給定kafka的元資料管理zk伺服器的路徑以及儲存元資料的路徑 #如果zk是多個,則用逗號隔開
2.2配置多個broker並更改相應的server裡面的資訊,修改的具體資訊見2.1
$ cp server0.properties server1.properties
$ cp server0.properties server2.properties
$ cp server0.properties server3.properties
$ cp server0.properties server3.properties
如更改server1.properties:
broker.id=1
port=9093
log.dirs=/opt/modules/apache/kafka-0.8.2.1/datas/1
其他以上依次修改,不重複即可
3.3詳細配置如圖所示,具體需要配置項如server1.properties,其他配置只需要修改broker.id port log.dirs 保證不重複即可
4.啟動服務
4.1.啟動zk服務
$ bin/zkServer.sh start
4.2啟動kafka的服務
$ bin/kafka-server-start.sh config/server0.properties
# 後臺啟動需要加【-daemon】
$ bin/kafka-server-start.sh -daemon config/server0.properties
$ bin/kafka-server-start.sh -daemon config/server1.properties
$ bin/kafka-server-start.sh -daemon config/server2.properties
$ bin/kafka-server-start.sh -daemon config/server3.properties
Kafka基本操作
1.建立Topic
#--topic ijeffrey0 名稱 partitions 分割槽數 replication-factor 備份數
$ bin/kafka-topics.sh --create --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffrey0 --partitions 2 --replication-factor 2
$ bin/kafka-topics.sh --create --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffrey1 --partitions 6 --replication-factor 2
#執行如下會報錯:Error while executing topic command replication factor: 6 larger than available brokers: 4
$ bin/kafka-topics.sh --create --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffrey2 --partitions 2 --replication-factor 6
注意:一個topic的分割槽數量可以超過broker服務的數量,一個分割槽的副本(備份)數不能超過broker服務的數量,一般的應用中,一個topic的分割槽數目為broker服務數目的1~2倍, 一般應用中,副本數最好不要超過3,另外不能大於broker的數量。
2.列出所有的Topic
[[email protected] kafka-0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper hadoop01.com:2181/jfy_kafka
ijeffrey0
ijeffrey1
3.列出所有的Topic詳細資訊
[[email protected] kafka-0.8.2.1]$ bin/kafka-topics.sh --describe --zookeeper hadoop01.com:2181/jfy_kafka
Topic:ijeffrey0 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: ijeffrey0 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: ijeffrey0 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic:ijeffrey1 PartitionCount:6 ReplicationFactor:2 Configs:
Topic: ijeffrey1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: ijeffrey1 Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: ijeffrey1 Partition: 2 Leader: 0 Replicas: 0,3 Isr: 0,3
Topic: ijeffrey1 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: ijeffrey1 Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: ijeffrey1 Partition: 5 Leader: 3 Replicas: 3,0 Isr: 3,0
#Replicas 備份 Isr存活的
# 列出指定的Topic的具體資訊
[[email protected] kafka-0.8.2.1]$ bin/kafka-topics.sh --describe --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffrey1
Topic:ijeffrey1 PartitionCount:6 ReplicationFactor:2 Configs:
Topic: ijeffrey1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: ijeffrey1 Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: ijeffrey1 Partition: 2 Leader: 0 Replicas: 0,3 Isr: 0,3
Topic: ijeffrey1 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: ijeffrey1 Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: ijeffrey1 Partition: 5 Leader: 3 Replicas: 3,0 Isr: 3,0
4.1.修改Topic配置資訊
$ bin/kafka-topics.sh --alter --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffrey1 --config max.message.bytes=102400
4.2.刪除Topic配置資訊
$ bin/kafka-topics.sh --alter --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffrey1 --delete-config max.message.bytes
5.更改分割槽數
# 分割槽數只能改大不能改小
$ bin/kafka-topics.sh --alter --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffre1 --partitions 7
6.刪除指定Topic
$ bin/kafka-topics.sh --delete --zookeeper hadoop01.com:2181/jfy_kafka --topic ijeffrey1
#只是標記刪除,並不是真正的刪除,我們可以在zookeeper裡面看到相應的資訊
現在呢我們檢視我們的Topic資訊,顯示我們的ijeffrey1 是一個標記刪除的狀態
然後我們可以刪除zookeeper裡面的ijeffrey的資訊
然後在檢視list,發現我們的刪除標記消失,那麼要怎麼徹底刪除呢?
徹底刪除Topic的方式有兩種: 第一種:在server.properties 新增一個引數delete.topic.enable ,設定為true,即可使用刪除命令刪除topic 第二種:執行刪除命令後再手動去磁碟刪除topic相關資訊以及zk中對應的topic內容
啟動Kafka自帶Producer和Consumer進行資料測試
1.啟動一個生產者
$ bin/kafka-console-producer.sh -broker-list hadoop01.com:9092,hadoop01.com:9093,hadoop01.com:9094,hadoop01.com:9095 --topic ijeffrey0
2.啟動一個消費者
$ bin/kafka-console-consumer.sh --topic ijeffrey0 --zookeeper hadoop01.com:2181/jfy_kafka
3.在客戶端生產者輸入資料,在消費者端可以接收到資料。如果生產者在開始生產資料過程中還沒有啟動消費者,當我們啟動消費者時需要接收啟動之前的資料,那麼需要加上引數 【--from-beginning】
#實際生產過程中由於資料量過大所以很少用
$ bin/kafka-console-consumer.sh --topic ijeffrey0 --zookeeper hadoop01.com:2181/jfy_kafka --from-beginning
在zookeeper裡可以檢視資訊,當我們再次生產資料的時候,位元組偏移量一會隨之變化,中間需要一定時間
Kafka訊息儲存機制
一個Topic分為多個Partition來進行資料管理,一個Partition中的資料是有序、不可變 的,使用偏移量(offset)唯一標識一條資料,是一個long型別的數據
Partition接收到producer傳送過來資料後,會產生一個遞增的offset偏移量資料,同時 將資料儲存到本地的磁碟檔案中(檔案內容追加的方式寫入資料);Partition中的資料存活時間超過引數值(log.retention.{ms,minutes,hours},預設7天)的時候進行刪除(預設)
Consumer根據offset消費對應Topic的Partition中的資料(也就是每個Consumer消費的每個Topic的Partition都擁有自己的offset偏移量)
注意:Kafka的資料消費是順序讀寫的,磁碟的順序讀寫速度(600MB/sec)比隨機讀寫 速度(100k/sec)快
Kafka訊息消費機制
Kafka有兩種模式消費資料:佇列和釋出訂閱;在佇列模式下,一條資料只會發 送給customer group中的一個customer進行消費;在釋出訂閱模式下,一條數 據會發送給多個customer進行消費;Kafka的Customer基於offset對kafka中的資料進行消費,對於一個customer group中的所有customer共享一個offset偏移量;Kafka中通過控制Customer的引數{group.id}來決定kafka是什麼資料消費模式, 如果所有消費者的該引數值是相同的,那麼此時的kafka就是類似於佇列模式, 資料只會傳送到一個customer,此時類似於負載均衡;否則就是釋出訂閱模式。
Kafka的資料是按照分割槽進行排序的(插入的順序),也就是每個分割槽中的資料是有 序的。在Consumer進行資料消費的時候,也是對分割槽的資料進行有序的消費的, 但是不保證所有資料的有序性(多個分割槽之間);Consumer Rebalance:當一個consumer group組中的消費者數量和對應Topic的分割槽數量一致的時候,此時一個Consumer消費一個Partition的資料; 如果不一致,那麼可能出現一個Consumer消費多個Partition的資料或者不消費 資料的情況,這個機制是根據Consumer和Partition的數量動態變化的;Consumer通過poll的方式主動從Kafka叢集中獲取資料。如下圖所示:
Kafka分散式機制
一個Topic中的所有資料分散式的儲存在kafka叢集的所有機器(broker)上,以分 區(partition)的形式進行資料儲存;每個分割槽允許存在備份資料/備份分割槽(存 儲在同一kafka叢集的其它broker上的分割槽)
每個資料分割槽在Kafka叢集中存在一個broker節點上的分割槽叫做leader,儲存在 其它broker上的備份分割槽叫做followers;只有leader節點負責該分割槽的資料讀 寫操作,followers節點作為leader節點的熱備節點,從leader節點備份資料;當 leader節點掛掉的時候,followers節點中會有一個節點變成leader節點,重新提 供服務
Kafka叢集的Partition的leader和followers切換依賴Zookeeper
接下來會使用Java程式碼來寫幾個詳細的測試案例