1. 程式人生 > >kafka權威指南中文版之二

kafka權威指南中文版之二

上圖所示,consumer訂閱kafka叢集中(一個broker中的一個topic中)的訊息,然後對broker發起一個獲取訊息的請求,請求中攜帶了topic、partition、offset等資訊,接著用pull的方式獲取kafka log中所有可用訊息,並對訊息中的資料進行處理,比如使用spark進行計算,將結果存入DB中。 consumer訂閱訊息時,會連線上任一個可用的broker,並獲取topic中leader partition的元資料metadata資訊,這樣consumer就可以直接與leader partition通訊,獲取訊息。 消費者Consumer客戶端可以用多種語言實現,如Java語言、C語言或者Python語言。這裡採用Java語言實現。 Consumer通過與brokers的TCP連線來獲取資料。未能及時關閉Consumer,將導致TCP連線洩露。 Consumer是非執行緒安全的; offset
在kafka中的每一個partition中 ,kafka會為每一條訊息記錄分配一個數值型的offset。offset值唯一標識了partition中的一條訊息,也表示消費者在分割槽的消費位置。也就是說,一個消費者Consumer的消費position為5,說明已經消費了offset為0,1,2,3,4的訊息,下一個要消費的訊息的offset為5。position對於Consumer來說,有兩層含義: position: Consumer的position等於將要消費記錄的offset;大於Consumer已消費paitition中的訊息的offset的最大值。在Consumer每次呼叫poll(long)方法獲取訊息時,position的值自動增加。 committed position:
是最後一個成功儲存的offset,重啟或者錯誤處理時,需要恢復到的offset的值等於committed position。Comsumer可以週期性的自動提交offset,也可以手工呼叫commitSync來或者commitAsync方法提交。commitSync方法將阻塞,直到提交成功或者發生錯誤。commitAsync方法不阻塞,無論成功或者失敗,都將呼叫回撥函式OffsetCommitCallback。 消費者分組和再平衡 kafka使用consumer groups來劃分訊息處理和消費的程序(池)。這些程序可以執行在同一臺機器上,也可以執行在多臺機器上(有利於擴充套件和容錯)。 一個消費者Comsumer只能屬於一個consumer group ,通過subscribe API 可以動態設定topic列表。kafka會將topic中的每一條訊息傳送給consumer group中的一條程序。為了使topic partition分割槽與consumer group中的程序達到平衡,每一個partition只會有consumer group中的一個消費者來消費。例如:如果一個topic有4個分割槽,一個consumer group有2個程序(消費者),那麼,每一個程序會消費2個分割槽中的訊息。 如果consumer group分組中的一個consumer失敗了,其消費的topic分割槽將會分配給相同分組中的其他消費者;如果consumer group分組中新增了一個consumer,topic分割槽將會從已有消費者上移動到新消費者上。上述過程稱為rebalancing再平衡
。當topic中增加一個新分割槽時,會採用相同的過程進行再平衡。 一個consumer group可以看做是一個獨立的邏輯訂閱者,此訂閱者可以包含多個程序。 當一個分組進行再平衡操作時,會通過 ConsumerRebalanceListener類來通知消費者,消費者可以藉此進行應用程式級別的邏輯處理:如狀態清零、手工提交offset等。 一個partition只會被一個消費者消費。 消費者consumer失敗檢測 消費者訂閱一系列topic主題之後,在呼叫poll(long)方法時,將自動歸屬於一個group。poll API來保證consumer的存活狀態。只要持續呼叫poll方法,消費者將一直存在於分組中,持續收到所屬partition中的訊息。底層實現內幕是:poll API會週期性的向server傳送心跳,當停止呼叫poll方法時,將會停止傳送心跳。如果在超過了session失效時間,那麼此消費者會被從當前組中移除,其消費的partition將會被重新分配。這樣做,是為了避免這樣的場景:一個消費者失敗了,仍然持有其partition。 這樣的設計意味著:poll迴圈中,訊息的處理時間要小於心跳的超時時間。如果大於心跳超時時間,消費者將無法提交offset(commitSync()方法會丟擲CommitFailedException 異常)。 消費者提供了兩種配置設定來控制這種行為: 1、session.timeout.ms:通過增加session失效時間,consumer可以有更多的時間來處理從poll(long)獲取的一批記錄。缺點是:延長了server發現consumer失敗的時間,進而導致延遲再平衡時間。但是不包括consumer呼叫close方法的情況,因為此時consumer會發送一個顯式的訊息到server,此時會觸發一個及時的再平衡操作。 2、max.poll.records: poll迴圈中的處理時間與處理的記錄數量成正比,所以要限制一次處理記錄的數量。可以通過此引數設定,預設情況下是沒有限制。 在一些場景下,訊息處理時間是很難預測的,上述兩種配置都不可行。推薦的方式為:將訊息處理邏輯放到一個獨立的執行緒中,這樣consumer可以繼續傳送心跳。需要注意的是,提交的offset不應該在實際位置之前。也就是說,你應該禁用自動提交offset,在訊息處理執行緒中手工提交offset。通常情況下,你需要使用 pause(Collection)方法來停止從partition中獲取新的訊息。消費者api提供了靈活性,覆蓋了各種消費的用例。

相關推薦

kafka權威指南中文版

上圖所示,consumer訂閱kafka叢集中(一個broker中的一個topic中)的訊息,然後對broker發起一個獲取訊息的請求,請求中攜帶了topic、partition、offset等資訊,接著用pull的方式獲取kafka log中所有可用訊息,並對訊息中的資料進行處理,比如使用spark進行計算

HTML5權威指南 中文版 高清PDF掃描版?

基礎知識 其他 表格 ava 兼容 第5章 網頁設計 javascrip 設計 HTML5權威指南是一本系統學習網頁設計的權威參考圖書。《HTML5權威指南》分為五部分:第一部分介紹學習本書的預備知識和HTML、CSS和JavaScript的最新進展;第二部分討論HTML

IDA Pro 權威指南學習筆記() - IDA 數據庫文件

names 標記 image 一個 輸入 需要 二叉 pro 樹形 把要分析的文件用 IDA 打開後,會生成 3 個數據庫文件 擴展名分別為 .id0,id1,nam .id0 文件是一個二叉樹形式的數據庫 .id1 文件包含描述每個程序字節的標記 .nam 文

《Netty權威指南》()NIO 入門

目錄 2.3 NIO 程式設計 2.3.1 Buffer、Channel、Selector   2.3 NIO 程式設計 NIO 官方稱為 New I/O,目標是要讓 Java 支援非阻塞 I/O,所以通常也叫非阻塞 I/O(Non-blocking I/O)

JavaScript權威指南手記(

並不是 簡單 arguments nat 都是 while 因此 regex define 1、詞法結構 程序設計語言的詞法結構是一套基本規則,用來詳細說明如何用這種語言來編寫程序,它是語言的最低層次,指定 了變量是什麽樣,註釋應該用什麽字符以及語句之間如何分割等 1.1、

《netty權威指南拆包粘包問題及解決方案1

客戶端和服務端程式碼 package com.lyzx.netty.netty02; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.net

《netty權威指南JBoss序列化框架Marshalling

前面講了netty解決拆包粘包的問題 我們發現拆包粘包問題的解決都只是解決netty傳送字串的情況 在企業及開發中很少有直接使用字串的,一般都有定義好的訊息體,這個訊息體一定對應實體類 如果要傳送實體類那麼久一定要對實體類做序列化 (序列化就是把檔案或者記憶體中的資料結構轉換

《netty權威指南模擬伺服器之間的心跳檢測

在叢集環境下伺服器之間是要定時進行心跳檢測的,那麼netty可以用來做這件事, 在叢集環境中,選定一臺服務區做master,其餘的做salve 即master <==>  server端    salve   <==>  客戶端 客戶端定時像服務端傳送

Kafka權威指南》——初識 Kafka

釋出與訂閱訊息系統 在正式討論Apache Kafka (以下簡稱Kafka)之前,先來了解發布與訂閱訊息系統的概念, 並認識這個系統的重要性。資料(訊息)的傳送者(釋出者)不會直接把訊息傳送給接收 者,這是釋出與訂閱訊息系統的一個特點。釋出者以某種方式對訊息進行分類,接收

kafka 權威指南--讀書筆記-(3)向kafka寫入資料

(1)kafka生產者設計和元件 (1)不同的應用場景對訊息有不同的需求,即是否允許訊息丟失、重複、延遲以及吞吐量的要求。不同場景對Kafka生產者的API使用和配置會有直接的影響。 例子1:信用卡事務處理系統,不允許訊息的重複和丟失,延遲最大500ms,對吞吐量要求較高

kafka權威指南閱讀筆記(一)

首先可以對分配給 socket讀寫緩衝區的記憶體大小作出調整,這樣可以顯著提升網路的傳 輸效能。 socket讀寫緩衝區對應的引數分別是net.core.wmem_default和 net.core.rmem_default,合理的值是 131 072 (也就是 128KB)。讀寫緩衝區最大值對應的引數分別是

kafka+java 偽分散式

對上一篇 kafka+java 偽分散式有生產者和消費者 ,接下來對High Level Consumer做處理 如下: 先看程式碼 :  package com.xiefg.kafka; import java.util.HashMap; import java.u

Jenkins 權威指南(中文版) 第四章 配置Jenkins的Server

        你使用這些變數通常有兩種方法。首先,你可以在你的構建指令碼中直接使用它們,使用$ {key}或$key(在這個例子中,你應該使用${ldapserver}或$ldapserver),這是最簡單的方法,但意味著你的構建工作和構建指令碼之間的緊耦合。如果你的指令碼使用不同的屬性名(含點號),你也

kafka權威指南中文翻譯之一

kafka初見(Meet Kafka) 在討論Kafka細節之前,有必要先來了解下訊息釋出/訂閱的概念,這個概念非常重要。 kafka中的資料單位是message。對比資料庫來說,可以把訊息看做資料庫中的記錄。對kafka而言,一個訊息就是一個位元組陣列,位元組陣列中的資

【筆記】kafka權威指南-常用配置和要點記錄

Kafka 的應用場景 訊息佇列 Kafka有更好的吞吐量,內建的分割槽,冗餘及容錯性,這讓Kafka成為了一個很好的大規模訊息處理應用的解決方案。 行為跟蹤和日誌收集。 敏感操作和日誌,都可以寫到 kafka 裡進行統一,分情況的監控、

zookeeper+kafka集群安裝

聲明 r+ object ise width cli top 直接 partition 版權聲明:本文為博主原創文章,未

Kafka】《Kafka權威指南》——寫資料

不管是把 Kafka 作為訊息佇列、訊息、匯流排還是資料儲存平臺來使用 ,總是需要有一個可以往 Kafka 寫入資料的生產者和一個

Kafka】《Kafka權威指南》——分割槽partition

在上篇的例子裡(【Kafka】《Kafka權威指南》——寫資料), ProducerRecord 物件包含了目標主題、鍵和值。 K

【JAVA進階架構師指南:JVM篇

## 前言   談到JAVA,就不得不提JVM---JAVA程式設計師繞不開的話題.也許有童鞋會說,我不懂JVM,但是我一樣可以寫出JAVA程式碼,我相信說這種話的童鞋,往往是隻有1-3年的初級開發人員,對JAVA理解還不深,不明白JVM的重要性,那接下來我們來說說,為什麼要學習JVM?   1.理解JV

Java性能權威指南讀書筆記--

任務 觸發 ber vivo 日誌 普通 參數 成對 初始 新生代填滿時,垃圾收集器會暫停所有的應用線程,回收新生代空間。這種操作被稱為Minor GC。 老年代被填滿時,垃圾收集器會暫停所有應用線程,對其進行回收,接著對堆空間進行整理。這個過程被稱為Full GC。 最主