kafka叢集搭建及原理
Apache Kafka 企業級訊息佇列
爬蟲課程:原生佇列、多執行緒重複消費的問題、ArrayBlockingQueue阻塞佇列
分散式爬蟲:使用Redis的list資料結構做佇列。
分散式電商:AMQ 訊息佇列、釋出一個商品時傳送一個訊息,有程式消費訊息建立靜態化頁面。
Apache Kafka:訊息佇列、隨著大資料興起,現在非常流行。
1、課程目標
- 理解 Apache Kafka是什麼
- 掌握Apache Kafka的基本架構
- 搭建Kafka叢集
- 掌握操作叢集的兩種方式
- 熟悉Apache Kafka原理
- Kafka原理-分片與副本機制
- Kafka原理-訊息不丟失機制
- Kafka原理-訊息儲存及查詢機制
- Kafka原理-生產者資料分發策略
- Kafka原理-消費者的負載均衡機制
- 瞭解Apache Kafka 監控及運維
2、Apache Kafka是什麼
是什麼?有什麼用?怎麼用?
是什麼?三個定義
- Apache Kafka 是一個訊息佇列(生產者消費者模式)
- Apache Kafka 目標:構建企業中統一的、高通量、低延時的訊息平臺。
- 大多的是訊息佇列(訊息中介軟體)都是基於JMS標準實現的,Apache Kafka 類似於JMS的實現。
有什麼用?(訊息佇列有什麼用?)
- 作為緩衝,來異構、解耦系統。
- 使用者註冊需要完成多個步驟,每個步驟執行都需要很長時間。代表使用者等待時間是所有步驟的累計時間。
- 為了減少使用者等待的時間,使用並行執行,有多少個步驟,就開啟多少個執行緒來執行。代表使用者等待時間是所有步驟中耗時最長的那個步驟時間。
- 有了新得問題:開啟多執行緒執行每個步驟,如果以一個步驟執行異常,或者嚴重超時,使用者等待的時間就不可控了。
- 通過訊息佇列來保證。
- 註冊時,立即返回成功。
- 傳送註冊成功的訊息到訊息平臺。
- 對註冊資訊感興趣的程式,可以訊息訊息。
3、Apache Kafka的基本架構
Kafka Cluster:由多個伺服器組成。每個伺服器單獨的名字broker(掮客)。
Kafka Producer:生產者、負責生產資料。
Kafka consumer:消費者、負責消費資料。
Kafka Topic: 主題,一類訊息的名稱。儲存資料時將一類資料存放在某個topci下,消費資料也是消費一類資料。
訂單系統:建立一個topic,叫做order。
使用者系統:建立一個topic,叫做user。
商品系統:建立一個topic,叫做product。
注意:Kafka的元資料都是存放在zookeeper中。
4、搭建Kafka叢集
4.1、準備3臺虛擬機器
192.168.140.128 kafka01
192.168.140.129 kafka02
192.168.140.130 kafka03
4.2、初始化環境
1)安裝jdk、安裝zookeeper
2)安裝目錄
安裝包存放的目錄:/export/software
安裝程式存放的目錄:/export/servers
資料目錄:/export/data
日誌目錄:/export/logs
mkdir -p /export/servers/ mkdir -p /export/software / mkdir -p /export/data / mkdir -p /export/logs / |
3)安裝使用者
安裝hadoop,會建立一個hadoop使用者
安裝kafka,建立一個kafka使用者
或者 建立bigdata使用者,用來安裝所有的大資料軟體。
本例:使用root使用者
- 驗證環境
- jdk環境
-
- zookeeper環境
zkServer.sh status
4.3、搭建Kafka叢集
4.3.1、準備安裝包
由於kafka是scala語言編寫的,基於scala的多個版本,kafka釋出了多個版本。
其中2.11是推薦版本。
4.3.2、下載安裝包及解壓
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/ cd /export/servers/ rm -rf /export/servers/kafka rm -rf /export/logs/kafka/ rm -rf /export/data/kafka mv kafka_2.11-1.0.0 kafka |
- 解壓檔案
- 刪除之前的安裝記錄
- 重新命名
4.3.3、檢視目錄及修改配置檔案
4.3.3.1檢視目錄
4.3.3.2修改配置檔案
進入配置目錄,檢視server.properties檔案
cat server.properties |grep -v "#"
通過以上命令,檢視到了預設的配置檔案,對預設的檔案進行修改。
修改三個地方
- Borker.id
- 資料存放的目錄,注意目錄如果不存在,需要新建下。
- zookeeper的地址資訊
# broker.id 標識了kafka叢集中一個唯一broker。 broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 存放生產者生產的資料 資料一般以topic的方式存放 # 建立一個數據存放目錄 /export/data/kafka --- mkdir -p /export/data/kafka log.dirs=/export/data/kafka num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 # zk的資訊 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 |
4.3.4、分發配置檔案及修改brokerid
將修改好的配置檔案,分發到node02,node03上。
先在node02、node03上刪除以往的安裝記錄
rm -rf /export/servers/kafka rm -rf /export/logs/kafka/ rm -rf /export/data/kafka |
分發安裝包
scp -r /export/servers/kafka/ node02:/export/servers/ scp -r /export/servers/kafka/ node03:/export/servers/ |
修改node02上的broker.id
vi /export/servers/kafka/config/server.properties |
修改node03上的broker.id
vi /export/servers/kafka/config/server.properties |
4.3.4、啟動叢集
cd /export/servers/kafka/bin ./kafka-server-start.sh /export/servers/kafka/config/server.properties |
4.3.5、檢視Kafka叢集
由於kafka叢集並沒有UI介面可以檢視。
需要藉助外部工具,來檢視卡夫卡的叢集
這個工具是一個java程式,必須要安裝好JDK
5、操作叢集的兩種方式
需求:訂單系統,需要傳送訊息。 後面後3個程式需要接受這個訊息,並做後續的處理。
5.1、使用控制檯執行
- 建立一個訂單的topic。
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order |
- 編寫程式碼啟動一個生產者,生產資料
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic order |
- 編寫程式碼啟動給一個消費者,消費資料
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic order |
5.2、使用Java api執行
1)java工程-maven,依賴。
<dependency> |
- 編寫程式碼-寫生產者的程式碼
/** |
- 編寫程式碼-寫消費者的程式碼
/** |
6、Apache Kafka原理
6.1、Apache Kafka原理-分片與副本機制
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order
分片:solrcloud中有提及到。
當資料量非常大的時候,一個伺服器存放不了,就將資料分成兩個或者多個部分,存放在多臺伺服器上。每個伺服器上的資料,叫做一個分片。
副本:solrcloud中有提及到。
當資料只儲存一份的時候,有丟失的風險。為了更好的容錯和容災,將資料拷貝幾份,儲存到不同的機器上。
6.2、Apache Kafka原理-訊息不丟失機制
6.2.1、生產者端訊息不丟失
- 訊息生產分為同步模式和非同步模式
- 訊息確認分為三個狀態
- 0:生產者只負責傳送資料
- 1:某個partition的leader收到資料給出響應
- -1:某個partition的所有副本都收到資料後給出響應
- 在同步模式下
- 生產者等待10S,如果broker沒有給出ack響應,就認為失敗。
- 生產者重試3次,如果還沒有響應,就報錯。
- 在非同步模式下
- 先將資料儲存在生產者端的buffer中。Buffer大小是2萬條。
- 滿足資料閾值或者數量閾值其中的一個條件就可以傳送資料。
- 傳送一批資料的大小是500條。
如果broker遲遲不給ack,而buffer又滿了。
開發者可以設定是否直接清空buffer中的資料。
6.2.2、Borker端訊息不丟失
broker端的訊息不丟失,其實就是用partition副本機制來保證。
Producer ack -1. 能夠保證所有的副本都同步好了資料。其中一臺機器掛了,並不影像資料的完整性。
6.2.3、消費者端訊息不丟失
只要記錄offset值,消費者端不會存在訊息不丟失的可能。只會重複消費。
6.3、Apache Kafka原理-訊息儲存及查詢機制
6.3.1、檔案儲存機制
segment段中有兩個核心的檔案一個是log,一個是index。 當log檔案等於1G時,新的會寫入到下一個segment中。
通過下圖中的資料,可以看到一個segment段差不多會儲存70萬條資料。
6.3.2、檔案查詢機制
6.4、Apache Kafka原理-生產者資料分發策略
kafka在資料生產的時候,有一個數據分發策略。預設的情況使用DefaultPartitioner.class類。
這個類中就定義資料分發的策略。
- 如果是使用者制定了partition,生產就不會呼叫DefaultPartitioner.partition()方法
- 當用戶指定key,使用hash演算法。如果key一直不變,同一個key算出來的hash值是個固定值。如果是固定值,這種hash取模就沒有意義。
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
- 當用既沒有指定partition也沒有key。
資料分發策略的時候,可以指定資料發往哪個partition。
當ProducerRecord 的構造引數中有partition的時候,就可以傳送到對應partition上
/** |
如果生產者沒有指定partition,但是傳送訊息中有key,就key的hash值。
/** |
既沒有指定partition,也沒有key的情況下如何傳送資料。
使用輪詢的方式傳送資料。
/** |
6.5、Apache Kafka原理-消費者的負載均衡機制
一個partition只能被一個組中的成員消費。
所以如果消費組中有多於partition數量的消費者,那麼一定會有消費者無法消費資料。
7、瞭解Apache Kafka 監控及運維
7.1、一鍵啟動Kafka
7.2 、UI介面