kafka官網示例說明--KafkaConsumer
Kafka client會消費kafka cluster中的記錄。
它將透明地處理Kafka叢集中的伺服器故障,並透明地適應它在叢集內遷移的資料分割槽。該客戶機還與伺服器互動,以允許使用者組使用消費者組來負載平衡消費(如下所述)。
消費者維護TCP連線到必要的代理來獲取資料。使用後未能關閉消費者會洩漏這些連線。消費者不是執行緒安全的。更多細節見多執行緒處理。
偏移量
Kafka為分割槽中的每個記錄保留一個數值偏移量。這個偏移量充當該分割槽中記錄的唯一識別符號,並且表示該分割槽中的消費者的位置。也就是說,擁有5號位置的消費者使用偏移值為0到4的記錄,並將使用偏移量5記錄下一個記錄。實際上,與使用者的使用者有關的位置有兩個概念。消費者的位置將提供下一個記錄的偏移量。它將比消費者在該分割槽中看到的最高偏移量大一個。它在消費者每次接收資料呼叫poll(long)和接收訊息時自動地前進。
已提交的位置是安全儲存的最後一個偏移量。如果程序失敗並重新啟動,這將恢復到它將恢復的偏移量。消費者可以定期自動提交補償;或者,它可以選擇通過呼叫commitSync來手動控制這個提交的位置,它會阻塞,直到在提交過程中成功提交了補償或致命錯誤,或者提交了非阻塞的commitAsync,並將觸發OffsetCommitCallback,要麼成功提交,要麼失敗。
消費者組和訂閱主題
Kafka使用了消費者組(Consumer Groups)的概念來允許一個過程池來劃分消費和處理記錄的工作。用法示例
1. 自動確認Offset
Properties props = new Properties();
/* 定義kakfa 服務的地址,不需要將所有broker指定上 */
props.put("bootstrap.servers", "localhost:9092");
/* 制定consumer group */
props.put("group.id", "test");
/* 是否自動確認offset */
props.put("enable.auto.commit", "true");
/* 自動確認offset的時間間隔 */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key的序列化類 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化類 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* 定義consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 消費者訂閱的topic, 可同時訂閱多個 */
consumer.subscribe(Arrays.asList("foo", "bar"));
/* 讀取資料,讀取超時時間為100ms */
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
說明:
1. bootstrap.servers 只是代表kafka的連線入口,只需要指定叢集中的某一broker;
2. 一旦consumer和kakfa叢集建立連線,consumer會以心跳的方式來高速叢集自己還活著,如果session.timeout.ms 內心跳未到達伺服器,伺服器認為心跳丟失,會做rebalence。
2. 手工控制Offset
如果consumer在獲得資料後需要加入處理,資料完畢後才確認offset,需要程式來控制offset的確認。舉個栗子:
consumer獲得資料後,需要將資料持久化到DB中。自動確認offset的情況下,如果資料從kafka叢集讀出,就確認,但是持久化過程失敗,就會導致資料丟失。我們就需要控制offset的確認。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
/* 關閉自動確認選項 */
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
/* 資料達到批量要求,就寫入DB,同步確認offset */
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
還可以精細的控制對具體分割槽具體offset資料的確認:
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
/* 同步確認某個分割槽的特定offset */
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
說明:確認的offset為已接受資料最大offset+1。
3. 分割槽訂閱
可以向特定的分割槽訂閱訊息。但是會失去partion的負載分擔。有幾種場景可能會這麼玩:
1. 只需要獲取本機磁碟的分割槽資料;
2. 程式自己或者外部程式能夠自己實現負載和錯誤處理。例如YARN/Mesos的介入,當consumer掛掉後,再啟動一個consumer。
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
說明:
1. 此種情況用了consumer Group,也不會做負載均衡。
2. topic的訂閱和分割槽訂閱不可以在同一consumer中混用。
4. 外部儲存offset
消費者可以自定義kafka的offset儲存位置。該設計的主要目的是讓消費者將資料和offset進行原子性的儲存。這樣可以避免上面提到的重複消費問題。舉慄說明:
訂閱特定分割槽。儲存所獲得的記錄時,將每條記錄的offset一起儲存。保證資料和offset的儲存是原子性的。當非同步儲存被異常打斷時,凡已經儲存的資料,都有有相應的offset記錄。這種方式可以保證不會有資料丟失,也不會重複的從服務端讀取。
如何配置實現:
1. 去使能offset自動確認:enable.auto.commit=false;
2. 從ConsumerRecord中獲取offset,儲存下來;
3. Consumer重啟時,呼叫seek(TopicPartition, long)重置在服務端的消費記錄。
如果消費分割槽也是自定義的,這種方式用起來會很爽。如果分割槽是自動分配的,當分割槽發生reblance的時候,就要考慮清楚了。如果因為升級等原因,分割槽漂移到一個不會更新offset的consumer上,那就日了狗了。
該情況下:
1. 原consumer需要監聽分割槽撤銷事件,並在撤銷時確認好offset。介面:ConsumerRebalanceListener.onPartitionsRevoked(Collection);
2. 新consumer監聽分割槽分配事件,獲取當前分割槽消費的offset。介面:ConsumerRebalanceListener.onPartitionsAssigned(Collection);
3. consumer監聽到 ConsumerRebalance事件,還沒有處理或者持久化的快取資料flush掉。
5. 控制消費位置
大多數情況下,服務端的Consumer的消費位置都是由客戶端間歇性的確認。Kafka允許Consumer自己設定消費起點,達到的效果:
1. 可以消費已經消費過的資料;
2. 可以跳躍性的消費資料;
看下這樣做的一些場景:
1. 對Consumer來說,資料具備時效性,只需要獲取最近一段時間內的資料,就可以進行跳躍性的獲取資料;
2. 上面自己存offset的場景,重啟後就需要從指定的位置開始消費。
介面上面已經提到過了,用seek(TopicPartition, long)。、
麻蛋,說指標不就好了,這一小節就是多餘的叨叨。
6. 控制消費流Consumption Flow Control
如果一個consumer同時消費多個分割槽,預設情況下,這多個分割槽的優先順序是一樣的,同時消費。Kafka提供機制,可以讓暫停某些分割槽的消費,先獲取其他分割槽的內容。場景舉慄:
1. 流式計算,consumer同時消費兩個Topic,然後對兩個Topic的資料做Join操作。但是這兩個Topic裡面的資料產生速率差距較大。Consumer就需要控制下獲取邏輯,先獲取慢的Topic,慢的讀到資料後再去讀快的。
2. 同樣多個Topic同時消費,但是Consumer啟動是,本地已經存有了大量某些Topic資料。此時就可以優先去消費下其他的Topic。
調控的手段:讓某個分割槽消費先暫停,時機到了再恢復,然後接著poll。介面:pause(TopicPartition…),resume(TopicPartition…)
7. 多執行緒處理模型 Multi-threaded Processing
Kafka的Consumer的介面為非執行緒安全的。多執行緒共用IO,Consumer執行緒需要自己做好執行緒同步。
如果想立即終止consumer,唯一辦法是用呼叫介面:wakeup(),使處理執行緒產生WakeupException。
public class KafkaConsumerRunner implements Runnable {
/* 注意,這倆貨是類成員變數 */
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(10000);
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
說明:
1. KafkaConsumerRunner是runnable的,請自覺補腦多執行緒執行;
2. 外部執行緒控制KafkaConsumerRunner執行緒的停止;
3. 主要說的是多執行緒消費同一topic,而不是消費同一分割槽;
比較一下兩種模型:
Consumer單執行緒模型
優點:實現容易;
沒有執行緒之間的協作。通常比下面的那種更快;
單分割槽資料的順序處理;
缺點:多個TCP連線,但是關係不大,kafka對自己的server自信滿滿;
太多的Request可能導致server的吞吐降低一丟丟;
consumer數量受到分割槽數量限制,一個consumer一個分割槽;
Consumer多執行緒模型
優點:一個consumer任意多的執行緒,執行緒數不用受到分割槽數限制;
缺點:如果有保序需求,自己要加控制邏輯;
該模型中如果手動offset,自己要加控制邏輯;
一種可行的解決辦法:為每個分割槽分配獨立的儲存,獲取的資料根據資料所在分割槽進行hash儲存。這樣可以解決順序消費,和offset的確認問題。
相關推薦
kafka官網示例說明--KafkaConsumer
Kafka client會消費kafka cluster中的記錄。 它將透明地處理Kafka叢集中的伺服器故障,並透明地適應它在叢集內遷移的資料分割槽。該客戶機還與伺服器互動,以允許使用者組使用消費者組來負載平衡消費(如下所述)。 消費者維護TCP連線到必要的代理來獲
Leaflet_創建地圖(官網示例)
位置 custom phone world! -c setview art locate ipa 官網:http://leafletjs.com/examples.html 快速啟動指南 http://leafletjs.com/examples/quick-start/e
Dubbo入門-官網示例dubbo-demo的使用
2.進入剛clone下來的incubator-dubbo目錄,使用mvn編譯安裝【mvn install -Dmaven.test.skip=true】,等待安裝結束看到BuildSuccess即可。 3.目錄不變,設定專案使用idea編輯器開啟(mvn自己
Elasticsearch.Net 官網示例的坑
經過昨天的ElasticSearch 安裝,服務以及可以啟動了,接下來就可以開發了,找到了官網提供的API以及示例,Es 官方提供的.net 客戶端有兩個版本一個低階版本: 【Elasticsearch.Net.dll】這個dll檔案官方解釋無依賴關係的客戶端,對於您如何構建和表示您的請求和相應沒有任何意
Kafka官網介紹
本文內容來自:http://kafka.apache.org/documentation.html#quickstart Kafka is a distributed, partitioned, replicated commit log service. It prov
FineUI(開源版)v4.2.2釋出(8年125個版本,官網示例突破300個)!
開源版是 FineUI 的基石,從 2008 年至今已經持續釋出了 120 多個版本,擁有會員 15,000 多位,捐贈會員達到 1,200 多位。 FineUI(開源版)v4.2.2 是 8 年來的第 125 個版本,對錶單、表格進行底層結構的重要調整,使其更簡單更易於擴充套件,同時官網示例數也突破
kindeditor官網異步加載示例無效,解決無法通過方法初始化編輯器
har cdd kxml yep adf tr1 ket 沒有效果 iba 官網示例:http://kindeditor.net/ke4/examples/dynamic-load.html 項目中發現一個問題,kindeditor官網是通過 初始化編輯器,但是現在有
Vue官網todoMVC示例
這個示例是模仿官網示例樣式和功能用我自己的方式寫的,基本上沒有看官網的原始碼,只參考自定義指令。讓我們一步步來探討一下。官網demo 要實現的功能 單條新增todo 單條刪除todo 雙擊編輯todo 單條todo已完成相應樣式狀態改變 全部todo是已完成相應樣式狀態改變 清除
基於Kafka 入門小案例-官網學習
首先Maven引入 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <v
mongoDB官網中Storing Log Data的python示例程式碼有誤,
如題, 在這篇文章中的python程式碼中, 構造一個記錄的程式碼如下: >>> event = { ... _id: bson.ObjectId(), ... host: "127.0.0.1"
Redux官網Counter最基本示例的思考
1.不使用redux實現 如果不使用redux,僅僅依靠react去實現Counter功能是極其簡單的。程式碼如下: index.js import React from 'react'; import ReactDOM from 'react-dom'; import Cou
使用java的MultipartFile實現layui官網檔案上傳實現全部示例,java檔案上傳
layui(諧音:類UI) 是一款採用自身模組規範編寫的前端 UI 框架,遵循原生 HTML/CSS/JS 的書寫與組織形式,門檻極低,拿來即用。 layui檔案上傳示例地址:https://www.layui.com/demo/upload.html 本次教程是基於springboot2.0的。 測試
【Spark深入學習 -16】官網學習SparkSQL
客戶 .com pmu 參考資料 一行 uap lsa bmi orb ----本節內容-------1.概覽 1.1 Spark SQL 1.2 DatSets和DataFrame2.動手幹活 2.1 契入點:SparkSessi
vue2.0實踐 —— Node + vue 實現移動官網
縮放 one fix show htm cati 接口 簡介 tac 簡介 使用 Node + vue 對公司的官網進行了一個簡單的移動端的實現。 源碼 https://github.com/wx1993/node-vue-fabaocn 效果 組件 輪
關註PHPthinking官網微信公眾號——紅包來襲
技術 text font 微信訂閱號 微信紅包 water fonts pac think 歡迎大家掃描關註PHPthinking官方微信訂閱號,我們將給您定期發送質量博文、新聞趣事、站點公告等等,同一時候還有PHPthinking準備的每日微信紅包(金額不等,已發出百
Android學習 多讀官網,故意健康---手勢
same str ces 12px lis assume extend current -- 官網地址 ttp://developer.android.com/training/gestures/detector.html: 一、能夠直接覆蓋Activity的on
比特幣新技術|比特幣中國官網
利潤 需要 公司 影響 投資者 網站 社區 聯網 勝利 親愛的朋友們! 改善您生活最好方式之一,是每天在不斷地改善自己的所想法,感情,話與決定!如果您對額外的收入來源有興趣的話(類似互聯網交易,交易所交易,投資)那,歡迎您參與國際儲備體IRS公司。由於我們一直在努力為了
Python自學筆記-map和reduce函數(來自廖雪峰的官網Python3)
求和 rabl style 序列 list port lambda char att 感覺廖雪峰的官網http://www.liaoxuefeng.com/裏面的教程不錯,所以學習一下,把需要復習的摘抄一下。 以下內容主要為了自己復習用,詳細內容請登錄廖雪峰的官網查看。
入侵拿下DVBBS php官網詳細過程(圖)
sta 電話 subst wget 團隊 sim 不遠 cls 接下來 幾 個月前,DVBBS php2.0暴了一個可以直接讀出管理員密碼的sql註入漏洞,當時這個漏洞出來的時候,我看的心癢,怎麽還會有這麽弱智的漏洞,DVBBS php2.0這套代碼我還沒仔細看過,於是5月
做sxy官網的一點經驗
offset true document container fse dev cnblogs 可用 滾輪 jquery2及以上不再支持IE8;IE不支持document.body.scrollTop, 也不支持$().scrollTop(), 用 var top = wi