從Kafka的一次broker假死介紹Kafka架構和DefaultPartitioner
最近公司的kafka叢集出現了節點已經失效但是節點程序和埠都還在的情況,目前我們的系統監控只是做到了程序監控,即為整個叢集的每臺機群建立程序快照,如果程序(如NameNode
、kakfa broker
)丟失,則報警並立刻自動重啟程序。但是這次的kafka事故程序和埠都還在,因此報警系統沒有能夠及時報警,因此對此次事故發生的過程和解決方式做詳細的分析。 首先,我們一個同學使用kafka的過程中發現訊息無法消費,因此進入進群進行如下檢查: 程序和埠:我們的kafka的3個broker
,程序和埠都在,正常使用kakfa-console-producer
進行訊息的生產,丟擲異常 使用kakfa-console-consumer
kafka-topics --describe
進行topic的詳細情況的分析,發現,partition 和 Isr(In-Sync Replication
)竟然只剩下一臺機器 我們知道,kafka在建立topic的時候會指定partition數量和replication數量,對於每一個partition,都會有一個broker作為leader broker,剩餘的broker作為slave broker。我們猜想在我們的程式碼中生產的訊息應該已經丟失。因此進行驗證。在緊急重啟了假死的兩臺broker以後,我們開始對訊息丟失情況進行驗證,令人驚訝的是,沒有發生訊息丟失。但是,為了以防萬一,無論訊息是否丟失,我們都必須找到足夠的證據。我們的topic屬性是2個partition、2個replication組成,當我們發現從這個topic消費訊息發生異常的時候,我們列印了這個topic的描述資訊:
[[email protected]-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: wuchang1 Partition: 0 Leader: 110 Replicas: 110,50 Isr: 110
Topic: wuchang1 Partition: 1 Leader: -1 Replicas: 50,82 Isr:
這個資訊其實是我們在測試環境復現出來的現場。我覺得,一個資深的軟體工程師,非常注重對事故現場的復現,因為只有成功地復現問題,才能根本地解決問題。在正常情況下,3個broker工作正常,它的描述資訊是這樣的:
[[email protected]-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: wuchang1 Partition: 0 Leader: 110 Replicas: 110,50 Isr: 110,50
Topic: wuchang1 Partition: 1 Leader: 50 Replicas: 50,82 Isr: 50,82
為了讓我們的kafka cluster能夠容忍部分機器宕機,我們的生產環境和測試環境打開了leader 自動選舉:auto.leader.rebalance.enable=true
這樣,當任何一個TopicPartition
的leader丟失,Controller會啟動一個監控執行緒監控所有partition的Leader狀態,如果發現某個Topic-Partition的leader丟失,則該執行緒會為該Leader啟動重新選舉,程式碼在KafkaController.scala
中:
def onControllerFailover() {
//省略
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}
方法checkAndTriggerPartitionRebalance()
就是完成對所有Topic-Partition
的leader檢查,這個作為引數被定時呼叫,注意,Controller
與Leader不同,Leader是針對某個Topic-Partition
而言,而Controller
是整個叢集的Controller
。
為了能夠在自動leader選舉開啟的情況下讓某個Topic-Partition
失去leader,我們將這個topic的partition 1
對應的兩個replication 全部kill,這樣,即使自動leader檢查開啟,由於partition-1 已經不存在任何一個活著的replication,因此無從選舉出一個leader,此時,這個partition已經不再工作,partition-0
也只有僅剩的一個broker
來作為leader。
我們當時在測試環境復現問題的時候,在自動leader選舉開啟的情況下,只要某個partition的replication中有一個還活著(即ISR中還有任何一個broker),這個broker就會被自動選舉為leader。這就是Kafka高可用性的一個體現。只有當一個topic的全部replication全部丟失,這個kafka的這個Topic-Partitioin才會變為不可用狀態。
反過來看,如果我們的topic的replication-factor
設定為2,那麼,在自動leader rebalance
開啟的情況下,任何兩臺broker丟失,都不會對任何partition造成影響,除非這個Topic-Partition
的三個replication全部掛掉。
現在現場已經復現,我們就來驗證這種情況下訊息是否丟失。
我們線上環境生產訊息的程式碼來源於nginx lua外掛,用來將nginx收的的使用者訪問資訊傳送到kafka:
topic = args["pbtype"]
-- topic = "LivyRoomMsg"
if (topic == "LivyRoomMsg") then
msgJson = Convert_GjsWebLiveRoomWechatUserLogin(msgJson,args["messageSentTime"])
end
local ok, err = bp:send(topic, nil, msgJson)
從程式碼片段中我們可以看到,傳送訊息的時候,key為null。我們使用java程式碼,同樣設定key為null,傳送訊息到我們測試環境的現場,的確訊息未丟失,傳送的所有訊息都被打倒到了leader存在的partition上面了。
我們知道,Kafka通過key資訊決定了訊息傳送到哪個broker,我們使用的是預設的Partitioner, Kafka預設的Partitioner是DefaultPartitioner
,核心方法是partition()
:
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) { //從可用的partitioner中選擇一個
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {//從所有叢集中選擇一個
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {// 只要有key , 就按照key去確定
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
/**
* topicCounterMap維護了每個topic的一個計數器,這個計數器用來通過Round-Robin方式選擇一個partition
* @param topic
* @return
*/
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
預設partitioner的訊息分派邏輯是:
- 如果存在key,則通過
Round-Robin
的方式,從該topic的所有partition中選擇一個分割槽,即,無論現在分割槽的狀態如何,一旦key確定,對應的broker就確定了,到5; - 如果key為空,則到3;
- 如果這個topic還存在可用的Partition(還存在leader的partition),則通過
Round-Robin
的方式,以這個topic遞增的隨機數作為種子,從這些可用的partition中選擇一個partition,將訊息傳送到這個partition,否則到4; - 如果這個topic沒有任何一個可用的Partition,則通過
Round-Robin
的方式,以這個topic遞增的隨機數作為種子,從所有partition中選擇一個partition傳送訊息。很顯然,如果選擇的partition不可用,訊息傳送失敗; - 退出
DefaultPartition
使用 topicCounterMap
來維護每個topic用來通過Round-Robin
方式選擇partition的序列號,key是所有topic的名字,value是一個整數計數器,每次進行一次選擇則自增1,保證所有partition被依次使用到。
在我們使用bin/kafka-console-producer.sh
的命令列工具生產訊息的時候,其實最終也是呼叫了。本文中,我並不急於讓大家知道問題原因,而希望逐步撥雲見日,讓大家從程式碼層面循序漸漸,逐步接近問題真想。這樣做,不僅僅能夠找到問題原因,更能夠學到知識,而不僅僅是確認了或者解決了一個問題。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "[email protected]"
執行kakfa-console-producer.sh
時,預設java程序堆記憶體大小為512M,當然,如果我們可以自行設定和修改。實際執行的,是ConsoleProducer
類,顯然,ConsoleProducer
負責從命令列中讀取我們輸入的訊息,然後生產到Kafka Server。看過$KAFKA_HOME/bin
目錄下面的指令碼程式碼你就會知道,kafka的程式碼重用做得非常好,即使是指令碼,也充分重用。$KAFKA_HOME/bin/kafka-run-class.sh
是一個公共啟動類,無論我們呼叫kakfa-console-producer.sh
、kakfa-console-consumer.sh
、kafka-topics.sh
等等指令碼,都是通過kafka-run-class.sh
執行起來的,只需要告訴kafka-run-class.sh需要啟動的java類以及額外的啟動引數,kafka-run-class.sh
就會執行這個java類,新增上這些額外的啟動引數,以及一些共用的、必須的classpath。 因此,我們繼續來看ConsoleProducer的實現,瞭解這個類的實現機制,直接關係到我們最常用的kafka-console-producer
的命令的行為:
def main(args: Array[String]) {
try {
val config = new ProducerConfig(args)
val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))
val producer =
if(config.useOldProducer) {
new OldProducer(getOldProducerProps(config))
} else {
//NewShinyProducer只是對org.apache.kafka.clients.producer.KafkaProducer
//進行了簡單封裝,底層還是用org.apache.kafka.clients.producer.KafkaProducer傳送訊息
new NewShinyProducer(getNewProducerProps(config))
}
//省略
var message: ProducerRecord[Array[Byte], Array[Byte]] = null
do {
//reader的預設實現類是LineMessageReader,一行一行讀取使用者在命令列中的輸入
message = reader.readMessage()
if (message != null)
//在沒有特殊指定訊息的key的情況下,key為空
producer.send(message.topic, message.key, message.value)
} while (message != null)
} catch {
//省略
}
Exit.exit(0)
}
LineMessageReader是訊息讀取的實現類,用來讀取我們在命令列中輸入的Kafka訊息:
class LineMessageReader extends MessageReader {
var topic: String = null
var reader: BufferedReader = null
var parseKey = false
var keySeparator = "\t"
var ignoreError = false
var lineNumber = 0
override def init(inputStream: InputStream, props: Properties) {
topic = props.getProperty("topic")
//如果需要指定key,則在kafka-console-producer中增加引數--property "parse.key=true",--property "key.separator=:"
//用來告訴kafka是否使用key以及分割訊息和key的分隔符
if (props.containsKey("parse.key"))
parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator")
if (props.containsKey("ignore.error"))//是否忽略錯誤
ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
reader = new BufferedReader(new InputStreamReader(inputStream))
}
override def readMessage() = {
lineNumber += 1
print(">")
(reader.readLine(), parseKey) match {
case (null, _) => null
case (line, true) =>
line.indexOf(keySeparator) match {
case -1 => //在使用者輸入的訊息中沒有找到keySeparator
if (ignoreError) new ProducerRecord(topic, line.getBytes)
else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n => //找到keySeparator定義的字元,則提取訊息體和key,組裝成為ProducerRecord物件
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
new ProducerRecord(topic, line.substring(0, n).getBytes, value)
}
case (line, false) =>//使用者沒有開啟parse.key功能,則設定key為null
new ProducerRecord(topic, line.getBytes)
}
}
}
LineMessageReader
的主要職責是讀取命令列中使用者的輸入,然後使用KafkaProducer
把訊息傳送出去。我們可以通過引數parse.key=true
以及key.separator=:
來告訴kakfa我們會顯式指定key。絕大多數情況下,除非有特殊需求,我們都不會使用如此繁複冗長的引數。因此,實際上,我使用如下命令進行訊息的生產時,效果和我在程式碼中使用KafkaProducer
進行訊息的生產並將key設定為null的效果一樣,Kafka都會通過使用DefaultPartitioner
來進行訊息的分派,由於key為null,將選擇任何一個活著的broker,因此,雖然我們Kafka的某個topic的部分partition的leader丟失,訊息卻不會丟失。
其實,我們在使用Kafka過程中,我們會以為我們使用比如一個每次遞增1的key,可以實現訊息分派的負載均衡,即訊息會幾乎均勻地分佈到所有的partition上面去。但是其實這樣做可能會造成訊息的丟失,更好的做法,就是直接不指定key,此時Kafka會幫助我們在所有或者的broker中選擇一個進行訊息分派,不會造成訊息丟失,同時負載均衡Kafka也幫我們完成了。
Kafka的key的使用是用來滿足定製化的分派規則而不是訊息均勻分派,比如:
1. 我們希望這個topic的所有訊息打到同一個partition,這時候我們可以指定一個不變的任意的key,根據DefaultPartitioner
的實現,訊息會固定打到某個partition;
2. 我們希望根據訊息的內容完全定製化地控制這個訊息對應的partition,這時候我們需要自己實現一個Partitioner
。如果我們看過DefaultPartitioner
的實現,那麼實現自己的定製化的Partitioner
就太簡單了。
總體來說本文的這些程式碼難度不是很大,但是對於我們理解Kafka的執行機制從而正確地、毫無誤解地使用Kafka非常有幫助。相比我在部落格中介紹的Hadoop、Yarn的排程求、資源管理器程式碼,這段程式碼非常容易理解,但是也可以從中看到Kafka程式碼的優雅和規範,良好的介面定義帶來良好的可擴充套件性。
相關推薦
從Kafka的一次broker假死介紹Kafka架構和DefaultPartitioner
最近公司的kafka叢集出現了節點已經失效但是節點程序和埠都還在的情況,目前我們的系統監控只是做到了程序監控,即為整個叢集的每臺機群建立程序快照,如果程序(如NameNode、kakfa broker)丟失,則報警並立刻自動重啟程序。但是這次的kafka事故程序
一次線上mysql死鎖分析
一、現象 發運車次呼叫發車介面時發生異常,後臺丟擲資料庫死鎖日誌。 二、原因分析 通過日誌可以看出事務T1等待 heap no 8的行鎖 (X locks 排他鎖) 事務T2持有heap no 8的行鎖,等待heap no 7的行鎖 兩個更新運
從原始碼一次徹底理解Android的訊息機制
情景重現 button.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v
一次cgi服務卡死的問題排查記錄
問題現象 cgi服務無法處理請求,cpu偶爾飆高。 問題排查記錄 檢視呼叫棧 首先jstack 檢視程序的當前呼叫棧,發現很多執行緒處於Blocked狀態。 jstack pid > stack.txt 檢視gc情況 cpu偶
Logstash從資料庫一次同步多張表
一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 博友前天線上提問,說明這塊理解的還不夠透徹。 我整理下, 一是為了儘快解決博友問題, 二是加深記憶,便於未來產品開發中快速上手。 1、同步原理 原有ES專欄中有詳解,不再贅述。詳細請參
一次查詢sqlserver死鎖的經歷
查詢bug是程式設計師的家常便飯,我身邊的人喜歡讓使用者來重現問題。當然他們也會從正式伺服器上下載錯誤log,然後嘗試分析log,不過當錯誤不是那種不經思考就可識別的情況,他們就會將問題推向使用者,甚至怪罪程式依賴的平臺。他們常用的藉口就是“這個問題很難重現,需要持續監控
記一次被“呼死你”電話騷擾的反騷擾經歷
一、事件回放 2018 年 7 月 23 日下午 6:23,接到了來自 010-53565784 的電話。對方聲稱是愛上街催收的,要求我通知薛**及時還清在愛上街 app 上借款。同時,還告知說薛**借款時將我的手機號填寫為緊急聯絡人。一肚子氣啊,自己交友不慎啊。就掛了電話
記一次Mysql線上死鎖
Mysql死鎖日誌解讀(SHOW ENGINE INNODB STATUS;)2018-02-01 09:20:25 2b113e040700 INNODB MONITOR OUTPUT ===================================== Per se
一次 MySQL 線上死鎖分析實戰
> 關鍵詞:MySQL Index Merge ## 前言 MySQL 的鎖機制相信大家在學習 MySQL 的時候都有簡單的瞭解過,那既然有鎖就必定繞不開死鎖這個問題。其實 MySQL 在大部分場景下是不會存在死鎖問題的(比如併發量不高,SQL 寫得不至於太拉胯的情況),但是在高併發的業務場景下,一
記錄一次apache服務器啟動報錯和解決方法
受限 png www img oot 端口 使用 rwx 環境 問題描述:在liunx系統上安裝軟件時需要較大的權限,一般用戶是不能隨便安裝的。為了省事,在安裝lamp環境時,整個過程都是以root身份安裝各種軟件的。最後整個環境是安裝成功,但是像apache這樣的服務器如
React 重要的一次重構:認識非同步渲染架構 Fiber
Diff 演算法 熟悉 react 的朋友都知道,在 react 中有個核心的演算法,叫 diff 演算法。web 介面由 dom 樹組成,不同的 dom 樹會渲染出不同的介面。react 使用 virtual dom 來表示 dom 樹,而 diff 演算法就是用於比較 virtual dom 樹的區別,
一次刪資料而認識的CountDownLatch和CyclicBarrier
公司之前有個任務,要求刪除一張資料庫表裡面2018/2/1之前的資料。這張表裡面存放的是車輛定位資料,一輛車每天能產生4000+條定位資料,所以整個表蠻大的,有65億+條資料。而且還有要求:根據每個地區要統計出來這個地區刪除了多少條資料。其中2月1號之前的有10億多條。當然這
來一次徹底解決Java的值傳遞和引用傳遞
本文旨在用最通俗的語言講述最枯燥的基本知識 學過Java基礎的人都知道:值傳遞和引用傳遞是初次接觸Java時的一個難點,有時候記得了語法卻記不得怎麼實際運用,有時候會的了運用卻解釋不出原理,而且坊間討論的話題又是充滿爭議:有的論壇帖子說Java只有值傳遞,有的部落格說兩者皆
記我的一次配置Apache伺服器的域名解析和泛域名解析過程
配置apache的多域名解析,需要用到下面的東西: C:\WINDOWS\system32\drivers\etc\hosts (DNS域名解析的檔案) Apache2.2.11\conf\httpd.conf Apache2.2.11\conf\extra\httpd
再一次重新學習Python——錯誤、除錯和測試
錯誤 一種用try...except...finally捕獲錯誤並用raise丟擲 除錯 assert 斷言 凡是用print來輔助檢視的地方,都可以用斷言(assert)來替代: # err.py def foo(s): n = int(s)
sql語句優化一次進行多條記錄的-----插入和修改
更新: update t_student set name = 'timy' where id = 10 現在我要更新ID為10、12 、13的age等於10、12、13 UPDATE t_student SET age= CASEWHEN id 10 THEN10WHE
關於一次失敗的專案開發的反思和總結
這次教訓比較深刻,磨刀不誤砍柴工也是這個道理,最大的體會就是:相比較技術而言,解決問題的思想和方式更為重要。在開發一項公司活動產品的過程中,我因為建表太過膚淺,不規範,導致後期開發的過程中,程式碼越來越
記錄一次錯誤處理 (xml序列化和反序列化相關)
vfl last events all 長度 pat vid pac ria XML序列化後,反序列化時出現錯誤 報錯現象 System.InvalidOperationException: XML 文檔(40, 11)中有錯誤。 ---> System.Xml.X
這一次 徹底搞懂Vue針對陣列和雙向繫結(MVVM)的處理方式
歡迎關注我的部落格:https://github.com/wangweianger/myblog Vue內部實現了一組觀察陣列的
libzip開發筆記(一):libzip庫介紹、編譯和工程模板
前言 Qt使用一些壓縮解壓功能,選擇libzip庫,libzip庫比較原始,也是很多其他庫的基礎支撐庫。 libzip libzip是一個C庫,用於讀取,建立和修改zip檔案。可以從資料緩衝區,檔案或直接從其他zip歸檔檔案直接複製的壓縮資料中新增檔案。在不關