1. 程式人生 > 其它 >第六章 Kafka訊息佇列

第六章 Kafka訊息佇列

主要內容:

  • Kafka概述
  • Kafka叢集部署
  • Kafka與Flume比較

6.1 Kafka概述

6.1.1 訊息佇列

訊息系統負責將資料從一個應用程式傳輸到另一個應用程式,因此應用程式可以專注於資料本身,而不用擔心如何共享它。

訊息系統有兩種訊息模式可用

  • 點對點訊息系統
  • 釋出- 訂閱(pub-sub)訊息系統

1)點對點模式(一對一,消費者主動拉取資料,訊息收到後訊息清除)

點對點模型通常是一個基於拉取或者輪詢的訊息傳送模型,這種模型從佇列中請求資訊,而不是將訊息推送到客戶端。這個模型的特點是傳送到佇列的訊息被一個且只有一個接收者接收處理,即使有多個訊息監聽者也是如此。

2)釋出/訂閱模式(一對多)

釋出訂閱模型則是另一個訊息傳送模型。釋出訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收訊息,而持久訂閱者則監聽主題的所有訊息,即使當前訂閱者不可用,處於離線狀態。

6.1.2 為什麼需要訊息佇列

1)解耦:

允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。

2)冗餘:

訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。許多訊息佇列所採用的"插入-獲取-刪除"正規化中,在把一個訊息從佇列中刪除之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的儲存直到你使用完畢。

3)擴充套件性:

因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。

4)靈活性 & 峰值處理能力:

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

5)可恢復性:

系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。

6)順序保證:

在大多使用場景下,資料處理的順序都很重要。大部分訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。(Kafka保證一個Partition

內的訊息的有序性)

7)緩衝:

有助於控制和優化資料流經過系統的速度解決生產訊息和消費訊息的處理速度不一致的情況。

8)非同步通訊:

很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。

6.1.3 什麼是Kafka

在流式計算中,Kafka一般用來快取資料,Spark通過消費Kafka的資料進行計算。

1Apache Kafka是一個開源訊息系統,由Scala寫成。是由Apache軟體基金會開發的一個開源訊息系統專案。

2Kafka最初是由LinkedIn公司開發,並於2011年初開源。201210月從Apache Incubator畢業。該專案的目標是為處理實時資料提供一個統一、高通量、低等待的平臺。

3)Kafka是一個分散式訊息佇列。Kafka對訊息儲存是根據Topic進行歸類,傳送訊息者稱為Producer,訊息接受者稱為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)稱為broker

4)無論是kafka叢集,還是consumer都依賴於zookeeper叢集儲存一些meta資訊,來保證系統可用性。

6.1.4 Kafka架

Kafka整體架構圖

Kafka詳細架構圖

1Producer :訊息生產者,就是向kafka broker發訊息的客戶端;

2Consumer :訊息消費者,向kafka broker取訊息的客戶端;

其中的offset(記錄著下一條將要傳送給Consumer的訊息的序號)需要存在其他位置,低版本0.9之前將offset儲存在Zookeeper中,0.9及之後儲存在Kafka的“__consumer_offsets”主題中。

3Topic :可以理解為一個佇列;

4 Consumer Group (CG):這是kafka用來實現一個topic訊息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。

一個topic可以有多個CG。topic的訊息會複製(不是真的複製,是概念上的)到所有的CG,但每個partion只會把訊息發給該CG中的一個consumer。

如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次傳送訊息到不同的topic;

5Broker :一臺kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic;

6Partition:為了實現擴充套件性,一個非常大的topic可以分佈到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的佇列

partition中的每條訊息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將訊息發給consumer,不保證一個topic的整體(多個partition間)的順序;

6.2 Kafka叢集部署

6.2.1 環境準備

1 叢集規劃

01node 02node 03node

zk zk zk

kafka kafka kafka

2 jar包下載

http://kafka.apache.org/downloads.ht

6.2.2Kafka叢集部署

1)解壓安裝包,當前案例中使用的是2.4.0版本

# tar -zxvf kafka_2.11-2.4.0.tgz

2)修改解壓後的檔名稱

[root@01node local]# mv kafka_2.11-2.4.0 kafka

3)修改配置檔案

[root@01node local]# cd kafka/config
[root@01node config]# vi server.properties

修改以下內容:

#broker的全域性唯一編號,不能重複
broker.id=0
#kafka執行日誌存放的路徑
log.dirs=/usr/local/kafka/logs
#配置連線Zookeeper叢集地址
zookeeper.connect=01node:2181,02node:2181,03node:2181

4)配置環境變數

[root@01node config]$ vi /etc/profile

#KAFKA_HOME
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH

[root@01node config]# source /etc/profile

5)分發安裝包

[root@01node local]# scp -r kafka root@02node:/usr/local/
[root@01node local]# scp -r kafka root@03node:/usr/local/

[root@01node local]# scp /etc/profile  root@02node:/etc/profile
[root@01node local]# scp /etc/profile  root@03node:/etc/profile

記得重新整理0203節點的配置檔案

6)分別在02node和03node上修改配置檔案server.properties中的broker.id=1、broker.id=2

注:broker.id不得重複

7)在三個節點上啟動zookeeper叢集

[root@01node local]# cd zookeeper/
[root@01node zookeeper]# bin/zkServer.sh start

8)啟動kafka叢集

依次在01node、02node、03node節點上啟動kafka

[root@01node kafka]# bin/kafka-server-start.sh config/server.properties
[root@02node kafka]# bin/kafka-server-start.sh config/server.properties
[root@03node kafka]# bin/kafka-server-start.sh config/server.properties

9)關閉叢集

[root@01node kafka]$ bin/kafka-server-stop.sh stop
[root@02node kafka]$ bin/kafka-server-stop.sh stop
[root@03node kafka]$ bin/kafka-server-stop.sh stop
同時記得關閉zookeeper

6.2.3 Kafka命令列操作

1)檢視當前伺服器中的所有topic

[root@01node kafka]$ bin/kafka-topics.sh --zookeeper 01node:2181 –list

2)建立topic

[root@01node kafka]$ bin/kafka-topics.sh --zookeeper 01node:2181 --create --replication-factor 3 --partitions 1 --topic first

選項說明:

--topic 定義topic名

--replication-factor 定義副本數

--partitions 定義分割槽數

3)刪除topic

[root@01node kafka]$ bin/kafka-topics.sh --zookeeper 01node:2181 --delete --topic first

需要server.properties中設定delete.topic.enable=true否則只是標記刪除或者直接重啟。

4)傳送訊息

[root@01node kafka]$ bin/kafka-console-producer.sh --broker-list 01node:9092 --topic first
>hello world
>root  root

5)消費訊息

[root@02node kafka]$ bin/kafka-console-consumer.sh --bootstrap-server 01node:9092 --from-beginning --topic first

--from-beginning:會把主題中以往所有的資料都讀取出來。

6)檢視某個Topic的詳情

[root@01node kafka]$ bin/kafka-topics.sh --zookeeper 01node:2181 --describe --topic first

6.3 Kafka與Flume比較

在企業中必須要清楚流式資料採集框架flume和kafka的定位是什麼:

flume:cloudera公司研發:

適合多個生產者;

適合下游資料消費者不多的情況;

適合資料安全性要求不高的操作;

適合與Hadoop生態圈對接的操作。

kafka:linkedin公司研發:

適合資料下游消費眾多的情況;

適合資料安全性要求較高的操作,支援replication。

因此我們常用的一種模型是:

線上資料 --> flume--> kafka--> flume(根據情景增刪該流程) -->HDFS