深入理解kafka
摘自: 《kafka權威指南》
集群間成員關系
Kafka 使用Zoo keeper 來維護集群成員的信息。每個broker 都有一個唯一標識符,這個標識符可以在配置文件裏指定,也可以自動生成。在broker 啟動的時候,它通過創建
臨時節點把自己的ID 註冊到Zookeeper 。Kafka 組件訂閱Zoo keeper 的/brokers/ids 路徑(bro ker 在Zoo keeper 上的註冊路徑),當有broker 加入集群或退出集群時,這些組件就可以獲得通知。
如果你要啟動另一個具有相同ID 的broker ,會得到一個錯誤一一新broker 會試著進行註冊,但不會成功,因為Zoo keeper 裏已經有一個具有相同ID 的broker 。在broker 停機、出現網絡分區或長時間垃圾回收停頓時, broker 會從Zookeeper 上斷開連接,此時broker 在啟動時創建的臨時節點會自動從Zoo keeper 上移除。監聽broker 列表的
Kafka 組件會被告知該broker 已移除。
在關閉broker 時,它對應的節點也會消失,不過它的ID 會繼續存在於其他數據結構中。例如,主題的副本列表(下面會介紹)裏就可能包含這些白。在完全關閉一個broker 之後,如果使用相同的m 啟動另一個全新的broker ,它會立即加入集群,井擁有與舊broker相同的分區和主題。
上面說的kafka啟動之後會向zk註冊id,可以在zk上查看這些註冊的消息!
[zk: localhost:2181(CONNECTED) 3] ls / #顯示當前zk上的消息信息 [isr_change_notification, zookeeper, admin, consumers, config, controller, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 9] ls /brokers/ids #顯示註冊的brkers的id。 [3, 2, 1]
#可以嘗試著停掉一個對應kafka,對應的broker id就會自動從這裏刪除。
控制器
控制器是kafka集群中蠻重要的一個組件,下面我們會說明broker如何成為控制器的。
推薦一篇控制器的博文:https://blog.csdn.net/u013256816/article/details/80865540
控制器其實就是一個broker,只不過它除了具有一般broker 的功能之外,還負責分區首領的選舉。集群裏第一個啟動的broker通過在zookeeper裏創建一個臨時節點/controller讓自己成為控制器。其他broker 在啟動時也會嘗試創建這個節點,不過它們會收到一個“節點已存在”的異常,然後“意識”到控制器節點已存在,也就是說集群裏已經有一個控制器了。其他broker在控制器節點上創建Zookeeper watch對象,這樣它們就可以收到這個節點的變更通知。這種方式可以確保集群裏一次只有一個控制器存在。
如果控制器被關閉或者與Zookeeper 斷開連接,Zookeeper上的臨時節點就會消失。集群裏的其他broker 通過watch 對象得到控制器節點消失的通知,它們會嘗試讓自己成為新的控制器。第一個在Zookeeper 裏成功創建控制器節點的broker 就會成為新的控制器,其他節點會收到“節點已存在”的異常,然後在新的控制器節點上再次創建watch 對象。每個新選出的控制器通過Zookeeper 的條件遞增操作獲得一個全新的、數值更大的controller epoch。其他broker 在知道當前controller epoch後,如果收到由控制器發出的包含較舊epoch 的消息,就會忽略它們。
當控制器發現一個broker已經離開集群(通過觀察相關的Zookeeper 路徑),它就知道,那些失去首領的分區需要一個新首領(這些分區的首領剛好是在這個broker 上)。控制器遍歷這些分區,並確定誰應該成為新首領(簡單來說就是分區副本列表裏的下一個副本),然後向所有包含新首領或現有跟隨者的broker 發送請求。該請求消息包含了誰是新首領以及誰是分區跟隨者的信息。隨後,新首領開始處理來自生產者和消費者的請求,而跟隨者
開始從新首領那裏復制消息。
當控制器發現一個broker加入集群時,它會使用broker來檢查新加入的broker 是否包含現有分區的副本。如果有,控制器就把變更通知發送給新加入的broker 和其他broker,
新broker 上的副本開始從首領那裏復制消息。簡而言之, Kafka 使用Zookeeper的臨時節點來選舉控制器, 並在節點加入集群或退出集群時通知控制器。控制器負責在節點加入或離開集群時進行分區首領選舉。控制器使用epoch 來避免“腦裂” 。“腦裂”是指兩個節點同時認為自己是當前的控制器。
上面的過程說明了broker怎麽選舉為controller,以及當前的控制器宕之後,新的controller怎麽選舉過程!在zk上查看控制器的相關信息,如下!
[zk: localhost:2181(CONNECTED) 10] get /controller {"version":1,"brokerid":2,"timestamp":"1545987445714"} #顯示了當前那個broker是首領 cZxid = 0x600000142 ctime = Fri Dec 28 16:57:25 CST 2018 mZxid = 0x600000142 mtime = Fri Dec 28 16:57:25 CST 2018 pZxid = 0x600000142 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x367ed5d87210001 dataLength = 54 numChildren = 0 [zk: localhost:2181(CONNECTED) 13] get /controller_epoch 27 #這個反應了首領的變更次數 cZxid = 0x200000017 ctime = Thu Dec 20 14:38:04 CST 2018 mZxid = 0x600000143 mtime = Fri Dec 28 16:57:25 CST 2018 pZxid = 0x200000017 cversion = 0 dataVersion = 26 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 2 numChildren = 0
kafka復制
復制功能是Kafka 架構的核心。在Kafka 的文檔裏, Kafka 把自己描述成“ 一個分布式的、可分區的、可復制的提交日誌服務”。復制之所以這麽關鍵,是因為它可以在個別節點失效時仍能保證Kafka 的可用性和持久性。
Kafka 使用主題來組織數據,每個主題被分為若幹個分區,每個分區有多個副本。那些副本被保存在broker 上,每個broker 可以保存成百上千個屬於不同主題和分區的副本。
副本有以下兩種類型。
- 首領副本
每個分區都有一個首領副本。為了保證一致性,所有生產者請求和消費者請求都會經過這個副本。
- 跟隨者副本
首領以外的副本都是跟隨者副本。跟隨者副本不處理來自客戶端的請求,它們唯一的任務就是從首領那裏復制消息,保持與首領一致的狀態。如果首領發生崩漬,其中的一個
跟隨者會被提升為新首領。
首領的另一個任務是搞清楚哪個跟隨者的狀態與自己是一致的。跟隨者為了保持與首領的狀態一致、在有新消息到達時嘗試從首領那裏復制消息,不過有各種原因會導致同步失敗。例如,網絡擁塞導致復制變慢, broker 發生崩橫導致復制滯後,直到重啟brok er 後復制才會繼續。為了與首領保持同步,跟隨者向首領發送獲取數據的請求,這種請求與悄費者為了讀取悄息而發送的請求是一樣的。首領將響應消息發給跟隨者。請求消息裏包含了跟隨者想要獲取消息的偏移量,而且這些偏移量總是有序的。
通過查看每個跟隨者請求的最新偏移量,首領就會知道每個跟隨者復制的進度。如果跟隨者在10 s 內沒有請求任何消息,或者雖然在請求消息,但在10s 內沒有請求最新的數據,那麽它就會被認為是不同步的。如果一個副本無陸與首領保持一致,在首領發生失效時,它就不可能成為新首領一一畢竟它沒有包含全部的消息。相反,持續請求得到的最新悄息副本被稱為同步的副本。在首領發生失效時,只有同步副本才有可能被選為新首領。
除了當前首領之外,每個分區都有一個首選首領創建主題時選定的首領就是分區的首選首領。之所以把它叫作首選首領,是因為在創建分區時,需要在b roker 之間均衡首領
(後面會介紹在broker 間分布副本和首領的算怯)。因此,我們希望首選首領在成為真正的首領時, broker 間的負載最終會得到均衡。默認情況下,Kafka auto.leader.rebalance被設為true ,它會檢查首選首領是不是當前首領, 如果不是,並且該副本是同步的,那麽就會觸發首領選舉,讓首選首領成為當前首領。
kafka處理請求
broker 的大部分工作是處理客戶端、分區副本和控制器發送給分區首領的請求。Kafka 提供了一個二進制協議(基於TCP ),指定了請求消息的格式以及broker 如何對請求作出響應一一包括成功處理請求或在處理請求過程中遇到錯誤。客戶端發起連接並發送請求,broker 處理請求井作出響應。broker 按照請求到達的順序來處理它們一一這種順序保證讓Kafka 具有了消息隊列的特性,同時保證保存的消息也是有序的。
broker 會在它所監聽的每一個端口上運行一個acceptor線程,這個錢程會創建一個連接,並把它交給processor線程去處理。processor線程(也被叫作“網絡線程”)的數量是可配置的。網絡線程負責從客戶端獲取請求悄息,把它們放進請求隊列,然後從響應隊列獲取響應消息,把它們發送給客戶端。請求消息被放到請求隊列後,IO 線程會負責處理它們。
生產請求和獲取請求都必須發送給分區的首領副本。如果broker 收到一個針對特定分區的請求,而該分區的首領在另一個broker 上,那麽發送請求的客戶端會收到一個“非分區首領”的錯誤響應。當針對特定分區的獲取請求被發送到一個不含有該分區首領的broker上,也會出現同樣的錯誤。Kafka 客戶端要自己負責把生產請求和獲取請求發送到正確的broker 上。
那麽客戶端怎麽知道該往哪裏發送請求呢?客戶端使用了另一種請求類型,也就是元數據請求。這種請求包含了客戶端感興趣的主題列表。服務器端的響應消息裏指明了這些主題所包含的分區、每個分區都有哪些副本, 以及哪個副本是首領。元數據請求可以發送給任意一個broker ,因為所有broker 都緩存了這些信息。
一般情況下,客戶端會把這些信息緩存起來,並直接往目標broker 上發送生產請求和獲取請求。它們需要時不時地通過發送元數據請求來刷新這些信息(刷新的時間間隔通
過meta.max.age.ms參數來配置),從而知道元數據是否發生了變更一一比如,在新broker 加入集群時,部分副本會被移動到新的broker 上。另外,如果客戶端收到“非首領”錯誤,它會在嘗試重發請求之前先刷新元數據,因為這個錯誤說明了客戶端正在使用過期的元數據信息,之前的請求被發到了錯誤的broker 上。
生產者的請求:生產者的請求與acks參數配置的數值有關,這個參數指定了需要經過多個broker確定才算消息寫入成功。
之後,悄息被寫入本地磁盤。在Linux 系統上,消息會被寫到文件系統緩存裏,並不保證它們何時會被刷新到磁盤上。Kafl< a 不會一直等待數據被寫到磁盤上一一它依賴復制功能來保證消息的持久性。
在消息被寫入分區的首領之後, broker 開始檢查acks 配置參數一一如果acks 被設為0 或1,那麽broker 立即返回響應;如果ac k s 被設為all ,那麽請求會被保存在一個叫作煉獄的緩沖區裏,直到首領發現所有跟隨者副本都復制了消息,晌應才會被返回給客戶端。
消費者請求:
請求需要先到達指定的分區首領上,然後客戶端通過查詢元數據來確保請求的路由是正確的。首領在收到請求時,它會先檢查請求是否有效,然後broker 將按照客戶端指定的數量上限從分區裏讀取消息,再把消息返回給客戶端。Kafka 使用零復制技術向客戶端發送消息一一也就是說, Kafka 直接把消息從文件(或者更確切地說是Linux文件系統緩存)裏發送到網絡通道,而不需要經過任何中間緩沖區。這是Kafka 與其他大部分數據庫系統不一樣的地方,其他數據庫在將數據發送給客戶端之前會先把它們保存在本地緩存裏。這項技術避免了字節復制,也不需要管理內存緩沖區,從而獲得更好的性能。
客戶端可以是設置broker最多可以從一個分區返回多少數據,這個限制非常重要,因為客戶端需要為broker返回的數據分配足夠的內存。如果沒有這個限制,broker返回的大量數據可能耗盡客戶端內存。也可以設置返回數據的下限。這樣在主題消息量不是很大的情況下,這樣可以減少CPU和網絡開銷。客戶端發送一個請求,broker等到有足夠的數據時才把它們返回給客戶端,然後客戶端再發出請求,而不是讓客戶端每隔幾毫秒就發送一個請求。
當然,我們不會讓客戶端一直等待broker 累積數據。在等待了一段時間之後,就可以把可用的數據拿回處理,而不是一直等待下去。所以,客戶端可以定義一個超時時間,告
訴broker :“如果你無告在X 毫秒內累積滿足要求的數據量,那麽就把當前這些數據返回給我。"
有意思的是,並不是所有保存在分區首領上的數據都可以被客戶端讀取。大部分客戶端只能讀取已經被寫入所有同步副本的悄息(跟隨者副本也不行,盡管它們也是消費者否
則復制功能就無陸工作)。分區首領知道每個消息會被復制到哪個副本上,在消息還沒有被寫入所有同步副本之前,是不會發送給消費者的一一嘗試獲取這些消息的請求會得到空的響應而不是錯誤。
因為還沒有被足夠多副本復制的消息被認為是“不安全”的一一如果首領發生崩憤,另一個副本成為新首領,那麽這些消息就丟失了。如果我們允許消費者讀取這些消息,可能就會破壞一致性。試想, 一個悄費者讀取並處理了這樣的一個消息,而另一個消費者發現這個消息其實並不存在。所以,我們會等到所有同步副本復制了這些消息,才允許消費者讀取它們 。這也意味著,如果broker 間的消息復制因為某些原因變慢,那麽消息到達消費者的時間也會隨之變長(因為我們會先等待消息復制完畢)。延遲時間可以通過參數replica.lag.time.max.ms來配置,它指定了副本在復制消息時可被允許的最大延遲時間。
kafka的物理存儲
暫無
深入理解kafka