kafka的topic多分割槽的情況,如何保證跨區的訊息消費的順序性
這個問題嚴格來說是肯定有的,kafka只能保證分割槽內的有序性。
下面是kafka作者Jay Kreps的blog中介紹kafka設計思想的一段話。
Each partition is a totally ordered log, but there is no global ordering between partitions (other than perhaps some wall-clock time you might include in your messages). The assignment of the messages to a particular partition is controllable by the writer, with most users choosing to partition by some kind of key (e.g. user id). Partitioning allows log appends to occur without co-ordination between shards and allows the throughput of the system to scale linearly with the Kafka cluster size.
針對部分訊息有序(message.key相同的message要保證消費順序)場景,可以在producer往kafka插入資料時控制,同一key分發到同一partition上面。
kafka原始碼如下,支援該方式
private[kafka]classDefaultPartitioner[T]extendsPartitioner[T]{ privateval random = newjava.util.Random def partition(key: T, numPartitions: Int): Int = { if(key== null){ println("key is null") random.nextInt(numPartitions) } else{ println("key is "+ key + " hashcode is "+key.hashCode) math.abs(key.hashCode) % numPartitions } } }
在kafka-storm中,如果one partition -> one consumer instance 的話,就沒這樣的問題,但失去了並行。
如果N1 partitions -> N2 consumer instances的話 ,
1)N1<N2,這種情況會造成部分consumer空轉,資源浪費。
2)N1>N2(N2>1),這種情況,每個kafka-spout例項會消費固定的1個或者幾個partition,msg不會被不同consumer重複消費。
3)N1=N2,這種情況,實際操作發現,1個consumer instance都對應消費1個partition。1個partition只會有1個consumer例項,否則需要加鎖等操作,這樣減少了消費控制的複雜性。
具體應用場景:
計算使用者在某個位置的滯留時間,日誌內容可以抽象成使用者ID、時間點、位置。
應用系統-》日誌檔案sftp伺服器-》資料採集層-》kafka-》storm實時資料清洗處理層-》Redis、Hbase-》定時任務、mapreduce
在整合測試期間,由於沒有實際的日誌,所以在採集層模擬往kafka插入資料(特別在傳送頻率模擬的很粗糙),發現在實時處理層,計算出來使用者在某個位置滯留時間計算出來為負數,原因如下,
1)採集層模擬不真實(同一使用者往kafka插入的位置的時間是隨機生成),但要考慮目前的日誌檔案sftp伺服器 或者 採集層 是否會有這種情況,如果有,可以從業務層面規避,過濾掉該條無效資料。
2)就是storm中tuple處理失敗,重發,kafka-storm中就使offset回到失敗的那個位置,但之前位置資訊可能已經快取到了redis(為了減少hbase訪問次數,使用者的最近一條位置資訊放在了redis中),這樣offset之後的所有訊息會重新被消費,這樣以來滯留時間為負數,可以過濾掉該條記錄,不存到redis中。
真實資料:U1 T1 A1->U1 T2 A2
fail重發 :U1 T1 A1->U1 T2 A2 -> 前兩條都失敗,重發 -> U1 T1 A1(負數的滯留時間) -> U1 T2 A2
由於採用的是失敗重發,是at least once,如果是only once的話,就會沒有這樣的情況,
PS:一些原理性問題,可以參考“kafka消費原理”介紹。