1. 程式人生 > 其它 >zookeeper和訊息佇列kafka

zookeeper和訊息佇列kafka

一、Zookeeper是什麼?

1Zookeeper服務叢集的條件

2Zookeeper工作機制

3Zookeeper資料結構

4Zookeper特點

5Zookeeper選舉機制

6Zookeeper應用場景

二、Zookeeper叢集部署

1、環境部署

2、安裝Zookeeper服務

3 設定myid號以及啟動指令碼

三、什麼是kafka

四、kafka的場景應用kafka的模式

五、kafka系統基礎架構

六、資料儲存到kafka中的流程

七、Kafka原則

一、Zookeeper是什麼?

ZooKeeper是一種分散式應用所涉及的高可用、高效能且一致性的開源協調服務,

它提供了一項基本服務:分散式鎖服務。分散式應用可以基於它實現更高階的服務,實現諸如同步服務、配置維護和叢集管理或者命名的服務。

1Zookeeper服務叢集的條件

Zookeeper服務自身組成叢集,2n+1個(奇數)主機。

在叢集中,允許n個主機宕機,只要叢集中有一半以上的機器可用,zookeeper叢集就可用。

#比如:
1、假如zookeeper為3臺機器組成的叢集,那麼就可以允許失效一臺,如果失效了2臺,就會導致zookeeper叢集不可用。
2、所以在搭建zookeeper叢集時,主機數需要為奇數。
3、奇數的目的:為了提高容錯能允許多損失一臺。

2Zookeeper工作機制

Zookeeper從設計模式角度來理解

是一個基於觀察者模式設計的分散式服務管理框架,它負責儲存和管理大家都關心的資料,然後接受觀察者的註冊,一旦這些資料的狀態發生變化,Zookeeper就將負責通知已在Zookeeper上註冊的那些觀察者做出相應的反應。

也就是說Zookeeper=檔案系統+通知機制。

檔案系統: 就是將儲存的資料通過zookeeper的檔案系統進行儲存到各個節點上。

通知機制: 當某個節點出現故障,zookeeper會將資訊通知到客戶端上。

總結:每個節點伺服器都會在zookeeper中進行註冊登記,client也獲取當前線上伺服器的列表,也會在zookeeper上進行註冊登記,

clientzookeeper叢集上儲存的資料。都會通過檔案系統分散式儲存到各個叢集節點中,當叢集中某個節點出現故障,zookeeper也會通知到client客戶端。

3Zookeeper資料結構

Zookeeper資料模型的結構與linux檔案系統很類似,整體上可以看作時一棵樹,每個節點乘坐一個Znode

每個znode預設能儲存1mb的資料,每個znode都可以通過其路徑唯一標識。

 4 zookeeper資料特點

Zookeeper: 一個領導者(Leader),多個跟隨者(Follower)組成的叢集

Zookeeper叢集中只有半數以上節點儲存,Zookeeper叢集就能正常服務,所以Zookeeper適合安裝奇數臺伺服器。

全域性資料一致性: 每個server儲存一份相同的資料副本,client無論連線到哪個server,資料都是一致的。

更新請求順序執行: 來自同一個client的更新請求按其傳送順序依次執行,即先進先出。

資料更新原子性: 一次資料更新,要麼成功,要麼失敗。

實時性: 在一定範圍內,client能讀到最新資料

總結:zookeeper叢集中有一個lleader和多個follower,且zookeeper叢集具有資料的一致性、原子性、實時性。且資料更新時按照發送順序進行更新。

5Zookeeper選舉機制

5.1 第一次啟動選舉機制

1、伺服器1啟動,發起一次選舉。伺服器1投自己一票:

此時伺服器1的票數為1票,不夠半數以上(5臺節點,半數2.5),選舉無法完成,伺服器1狀態保持為LOOKING.

2、伺服器2啟動,再發起一次選舉,伺服器12分別投自己1票,並交換選票資訊:

此時伺服器1發現伺服器2myid比自己投票的伺服器myid要大,所以將票投給伺服器

此時伺服器10票,伺服器22票,不夠半數,選舉無法完成,伺服器12都保持looking狀態。

3、伺服器3啟動,發起一次選舉:

伺服器12發現伺服器3myid最大,就將選票資訊投給伺服器3.

此時伺服器12都有0票。伺服器33票,超過半數,伺服器3稱為leader,伺服器12更換狀態為following

4、伺服器4啟動,發起一次選舉,此時伺服器123已不是looking狀態,不會更改選票資訊:

此時伺服器33票,伺服器41票,此時伺服器服從多數,更改修選票資訊為伺服器3,並更改狀態為following

5、伺服器5啟動,情況和4一樣,狀態為following

5.2 非第一次啟動選舉機制

1、當zookeerper叢集中一臺伺服器出現一下兩種請款之一時,就會開始進行leader選舉。

伺服器初始化啟動(第一次啟動選舉機制)

伺服器執行期間無法和leader保持連線(不知道leader是否已經產生了,或者leader宕機)

2、當一臺機器進入leader選舉流程時,當前叢集也可能處於一下兩種狀態。

①叢集中本來就已經儲存一個leader

對於已存在leader的情況,機器試圖去選舉時,被會告知當前伺服器的leader資訊,對於該機器來說,僅僅需要和leader機器建立連線,並進行狀態同步即可。

②叢集中leader宕機了。

假設zookeeper5臺伺服器組成,SID分別為12345ZXID分別為:8887.並且此時SID3的伺服器時leader

某一時刻,當35都出現故障時,因此又重新開始選舉。

#選舉leader的規則
1、EPOCH大的直接勝出。
2、EPOCH相同,事務ID(ZXID)大的勝出.
3、ZXID相同,伺服器ID大的勝出

----------------名詞解釋-----------------------
#1、SID:
伺服器ID,用來標識一臺Zookeeper叢集中的機器,每臺機器不能重複,和myid一致。

#2、ZXID
事務ID,ZXID是一個事務ID,用來標識一次伺服器狀態的變更,在某一時刻,叢集中的每臺機器的ZXID值不一定完全一致,這和Zookeeper伺服器對於客戶端“更新請求”的處理邏輯速度有關。

#2、Epoch
每個leader任期的代號,沒有leader時選舉方式跟第一次啟動方法相同,每投完一次一票,這個資料就會增加。

5.3 總結

第一次啟動選舉機制:

主要需要看啟動順序,再看他的myid,只要選舉票數超過半數,就會選舉出一個leader。新加的機器都會指向這個leader

非第一次啟動選舉機制

假如存在leader,新加入的機器會獲取到leader的資訊,然後進行連線。

假如沒有leader,會先比較Epoch(任期數),再比較ZXID(事務ID),再比較SID(服務id)。

6Zookeeper應用場景

提供的服務包括:統一命令服務,統一配置管理,統一叢集管理,服務節點動態上下線,軟負載均衡等,

①統一命令服務:

在分散式環境下,經常需要對應用/服務進行統一命令,便於識別,例如:IP容易記住,而域名容易記住。

②統一配置管理:

分散式環境下,配置檔案同步非常常見,一般要求一個叢集中,所有節點的配置資訊是一致的,比如kafka叢集,對配置檔案修改後,希望能快速同步到各個節點上。

配置管理可交由Zookeeper實現,可將配置資訊寫入Zookeeper上的zonde,各個客戶端伺服器監聽這個znode,一旦znode中的資料被修改,zookeeper將通知各個客戶端伺服器。

③統一叢集管理:

分散式環境中,時實掌握每個節點的狀態是必要的,可根據節點時實狀態製作出一些調整,

zookeeper可以實現時實監控節點狀態變化,可將節點資訊寫入zookeeper上的Znode。監聽這個Znode可以獲取它的時實狀態變化。

④服務動態上下線:

客戶端能時實洞察到伺服器上下線的變化。(是否宕機)

⑤軟負載均衡:

Zookeeper中記錄每臺伺服器的訪問數,讓訪問數最少的伺服器去處理最新的客戶端請求。

二、Zookeeper叢集部署

1、環境部署

1.1 環境架構

主機名   ip地址              安裝軟體            系統版本
node1 192.168.111.19 apache-zookeeper-3.5.7-bin.tar.gz centos7.5 node2 192.168.111.20 apache-zookeeper-3.5.7-bin.tar.gz centos7.5 node2 192.168.111.21 apache-zookeeper-3.5.7-bin.tar.gz centos7.5

2安裝zookeeper服務

三臺全部關閉防火牆

#三臺主機都需要執行,以node1演示

systemctl stop firewalld
systemctl disable firewalld
setenforce 0

hostnamectl set-hostname node{1,2,3}

安裝環境

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
#這裡yum安裝JDK環境。方便。

java -version

#獲取軟體包
cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
#或者直接將軟體包上傳到/opt目錄下。

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7

 修改配置檔案

cd /usr/local/zookeeper-3.5.7/conf
cp zoo_sample.cfg zoo.cfg

vim zoo.cfg
tickTime=2000     
#通訊心跳時間,Zookeeper伺服器與客戶端心跳時間,單位毫秒
initLimit=10      
#Leader和Follower初始連線時能容忍的最多心跳數( tickTime的數量),這裡表示為10*2s 
syncLimit=5     
#Leader和Follower之間同步通訊的超時時間,這裡表示如果超過5*2s,Leader認為Follwer死掉,並從伺服器列表中刪除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data       
#●修改,指定儲存Zookeeper中的資料的目錄,目錄需要單獨建立
dataLogDir=/usr/local/zookeeper-3.5.7/1ogs    
#●新增,指定存放日誌的目錄,目錄需要單獨建立
clientPort=2181      #客戶端連線埠

#新增叢集資訊
server.1=192.168.111.19:3188:3288
server.2=192.168.111.20:3188:3288
server.3=192.168.111.21:3188:3288

mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs

3 設定myid號以及啟動指令碼

到這裡就不要設定同步了,下面的操作,做好一臺機器一臺機器的配置。

echo 1 >/usr/local/zookeeper-3.5.7/data/myid
# node1上配置

echo 2 >/usr/local/zookeeper-3.5.7/data/myid
#node2上配置

echo 3 >/usr/local/zookeeper-3.5.7/data/myid
#node3上配置

#//配置啟動指令碼,指令碼在開啟啟動執行的目錄中建立
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)
    echo "----------zookeeper啟動----------"
    $ZK_HOME/bin/zkServer.sh start
;;
stop)
    echo "---------- zookeeper停止-----------"
    $ZK_HOME/bin/zkServer.sh stop
;;
restart)
    echo "---------- zookeeper 重啟------------"
    $ZK_HOME/bin/zkServer.sh restart
;;
status)
    echo "---------- zookeeper 狀態------------"
    $ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

service zookeeper start

service zookeeper status

192.168.111.21

192.168.111.19

192.168.111.20

 啟動node1

 啟動node2

 啟動node3

 三、什麼是kafka?

kafka是由Linkedin公司開發,是一個分散式,支援分割槽(partition)、多副本的(repilca),基於zookeeper協調的分散式訊息系統。

它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景 ,比如:基於hadoop(分散式)的批處理系統,低延遲的實時系統,storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,訊息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源 專案。

kafka時一個分散式流式計算平臺,但常用於訊息系統使用。他是一個分散式訊息佇列

kafka的特性

高吞吐量、低延遲 : kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒。
可擴充套件性(分散式): kafka叢集支援熱擴充套件
永續性、可靠性: 訊息被持久化到本地磁碟,並且支援資料備份防止資料丟失
容錯性: 允許叢集中節點失敗
高併發: 支援數千個客戶端同時讀寫。
1、使用kafka訊息佇列的好處
解耦

允許你獨立的擴充套件或修改兩邊的處理過程,只要確保他們遵守同樣的介面約束。
可恢復性

系統的一部分元件失效時,不會影響到整個系統,訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。
緩衝

有助於控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況。
靈活性和峰值處理能力

訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
非同步通訊

很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。
四、kafka的場景應用和模式
日誌收集: 一個公司可以用kafka手機各種服務的日誌,通過kafka以統一介面服務的方式開放給各種consumer(消費者)。

訊息系統: 解耦和生產者和消費者、快取訊息等。

使用者活動跟蹤: kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、資料倉庫中做離線分析和挖掘。

運營指標:Kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告。

流式處理

事件源

kafka的模式
kafka的模式不遵守JMS規範(訊息佇列的規範)。

1、點對點訊息傳遞模式
(一對一,消費者主動拉取資料,訊息收到後訊息清除)

在點對點訊息系統中,訊息持久化到一個佇列中。此時,將有一個或多個消費者消費佇列中的資料。

但是一條訊息只能被消費一次。當一個消費者消費了佇列中的某條資料之後,該條資料則從訊息佇列中刪除。

該模式即使有多個消費者同時消費資料,也能保證資料處理的順序。

2、釋出訂閱訊息傳遞模式
在釋出 – 訂閱訊息系統中,訊息被持久化到一個 topic 中。
與點對點訊息系統不同的是,消費者可以訂閱一個或多個 topic,消費者可以消費該 topic 中所有的資料,同一條資料可以被多個消費者消費,資料被消費後不會立馬刪除。

五 架構

1Producer(生產者)訊息的生產者,是訊息的入口

2Broker(例項)一臺kafka伺服器就是一個broker,一個叢集由多個broker組成。一個broker可以容納多個topic(主題)

3Topic(主題)訊息的主題,可以理解成訊息的分類,kafka獲取到的資料就是按照不同的型別儲存在不同的topic主題中。

topic主題中有很多的分割槽。

4Partition(分割槽)

Topic的分割槽,每個Topic可以有很多個分割槽,分割槽作用是用來做負載均衡,提高kafka的吞吐量。

同一個Topic在不同的分割槽的資料是不重複的,分割槽的表現形式就是一個個資料夾。

分割槽的原因: ①方便在叢集中擴充套件,②實現負載均衡的效果。③提高kafka的吞吐量。④實現高併發的效果。

5Repica(副本):每一個分割槽都有多個副本,副本的作用就是備份資料。一個Topic的每個分割槽都有若干個副本,一個leader和若干個follower

當主分割槽故障後,副本進行頂替它的位置。

6leader(領導者):每個分割槽有多個副本,其中有且僅有一個作為leaderleader是當前負責資料讀寫的分割槽。

7follower(追隨者)follower跟隨leader,所有請求都通過leader路由,資料變更會廣播給所有followerfollowerleader保持資料同步,follower只負責備份,不負責資料的讀寫。

如果leader故障,則從follower中選舉出一個新的leader

follower掛掉,卡主或者同步太慢,leader會把這個follower從叢集列表中刪除,重新建立一個follower

#例項和主題和分割槽和副本的理解。
1、例項(broker)就是一臺伺服器,裝好kafka的伺服器。多個例項組成一個kafka叢集
2、主題(topic)是儲存的型別,將不同生產者生成的資料按照型別儲存。理解成一個邏輯概念,下面有很多的分割槽(partition),分割槽才是用來儲存具體資料的,分割槽也會稱為leader。每一個分割槽都會進行生成一個或多個副本(Repica),用來備份分割槽的資料,也會被稱為follower。

一個Topic會產生多個分割槽Partition,分割槽中分為Leader和Follower,訊息一般傳送到Leader,Follower通過資料的同步與Leader保持同步,消費的話也是在Leader中發生消費,如果多個消費者,則分別消費Leader和各個Follower中的訊息,當Leader發生故障的時候,某個Follower會成為主節點,此時會對齊訊息的偏移量。

8Message(訊息):訊息的實體

9Consumer:消費者,訊息的出口

10Consumer Group

多個消費者組成一個消費者組,在kafka的設計中,同一個分割槽的資料只能被訊息者組中的某一個消費者消費,同一個的消費組的消費者可以消費同一個主題中的不同分割槽的資料。

11offset偏移量:

可以唯一的標識一條訊息

偏移量決定讀取資料的位置,不會有執行緒安全的問題,消費者通過偏移量來決定下次讀取的訊息(即消費位置)。

消費被消費之後,並不會被刪除,這樣多個業務就可以重複使用kafka的訊息。

某一個業務也可以通過修改偏移量達到重新讀取訊息的目的,偏移量由使用者控制。

訊息最終還是會被刪除,預設生命週期為1周(168小時)。

12zookeeperkafka叢集依賴zookeeper來儲存meta(變化)資訊。

#offset和zookeeper理解

由於consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復後,需要從故障前的位置繼續消費,所以consumer需要時實記錄自己消費到了哪個offset,以便恢復後繼續消費。

kafka 在0.9 版本之前,consumer預設將offset儲存在zookeeper中。
從0.9版本後,consumer預設將offset儲存在kafka一個內建的topic中,該topic為_consumer_offset.

也就是說,zookeeper的作用就是,生產者輸出資料到kafka叢集,就必須要找到kafka叢集的節點在哪裡,這些都是通過zookeeper去尋找的,消費者消費到哪一條資料,也需要zookeeper的支援,從zookeeper獲得offset,offset記錄上一次消費的資料消費到哪裡,這樣就可以接著下一跳資料進行消費。

六 資料儲存到kafka中的流程

1、生產者先獲取分割槽中的leader

2Producter將訊息傳送給leader

3Leader將訊息寫入本機檔案

4Followerleader同步訊息(follower主動去leader進行同步)

5Follower將訊息寫入本地後向leader傳送ACK確認訊息。

6leader收到所有副本的ACK後,向生產者傳送ACK確認訊息。

1、分割槽的原因

便於在叢集中擴充套件,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料了。

可以提高併發,因為可以以分割槽為單位讀寫。

2、分割槽的目的

生產者採用push模式將資料釋出到broker,每條訊息追加到分割槽中,順序寫入磁碟,所以保證同一分割槽內的資料時有序的。

資料會寫入到不同的分割槽。為什麼進行分割槽?

**1、方便擴充套件:**因為一個topic可以有多個partition,所以我們可以通過擴充套件機器去輕鬆的應對日益增長的資料。

2、提高併發:以分割槽為單位進行讀寫資料,提高消費的處理效率。

七、Kafka原則

類似於負載均衡,當我們向某個伺服器傳送請求的時候,服務端可能會對請求做一個負載,將流量分發到不同的伺服器,那在kafka中,如果某個topic有多個partitionproducer又怎麼知道該將資料發往哪個partition呢?kafka中有幾個原則:

prtition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition

如果沒有指定partition,但是設定了資料的key,則會根據key的值hash出一個partition

如果既沒指定partition,又沒有設定key,則會輪詢選出一個partition

保證訊息不丟失是一個訊息佇列中介軟體的基本保證,那producer在向kafka寫入訊息的時候,怎麼保證訊息不丟失呢?其實上面的寫入流程圖中有描述出來,那就是通過ACK應答機制!在生產者向佇列寫入資料的時候可以設定引數來確定是否確認kafka接收到資料,這個引數可設定的值為01all

10代表producer往叢集傳送資料不需要等到叢集的返回,不確保訊息傳送成功。安全性最低但是效率最高。

21代表producer往叢集傳送資料只要leader應答就可以傳送下一條,只確保leader傳送成功。

3all代表producer往叢集傳送資料需要所有的follower都完成從leader的同步才會傳送下一條,確保leader傳送成功和所有的副本都完成備份。安全性最高,但是效率最低。

效率最高。

21代表producer往叢集傳送資料只要leader應答就可以傳送下一條,只確保leader傳送成功。

3all代表producer往叢集傳送資料需要所有的follower都完成從leader的同步才會傳送下一條,確保leader傳送成功和所有的副本都完成備份。安全性最高,但是效率最低。