如何管理Spark Streaming消費Kafka的偏移量(三)
前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。
在spark streaming1.3之後的版本支援direct kafka stream,這種策略更加完善,放棄了原來使用Kafka的高階API自動儲存資料的偏移量,之後的版本採用Simple API也就是更加偏底層的api,我們既可以用checkpoint來容災,也可以通過低階api來獲取偏移量自己管理偏移量,這樣以來無論是程序升級,還是故障重啟,在框架端都可以做到Exact One準確一次的語義。
本篇文章,會再介紹下,如何手動管理kafka的offset,並給出具體的程式碼加以分析:
版本:
apache spark streaming2.1
apache kafka 0.9.0.0
手動管理offset的注意點:
(1)第一次專案啟動的時候,因為zk裡面沒有偏移量,所以使用KafkaUtils直接建立InputStream,預設是從最新的偏移量開始消費,這一點可以控制。
(2)如果非第一次啟動,zk裡面已經存在偏移量,所以我們讀取zk的偏移量,並把它傳入到KafkaUtils中,從上次結束時的偏移量開始消費處理。
(3)在foreachRDD裡面,對每一個批次的資料處理之後,再次更新存在zk裡面的偏移量
注意上面的3個步驟,1和2只會載入一次,第3個步驟是每個批次裡面都會執行一次。
下面看第一和第二個步驟的核心程式碼:
/****
*
* @param ssc StreamingContext
* @param kafkaParams 配置kafka的引數
* @param zkClient zk連線的client
* @param zkOffsetPath zk裡面偏移量的路徑
* @param topics 需要處理的topic
* @return InputDStream[(String, String)] 返回輸入流
*/
def createKafkaStream(ssc: StreamingContext,
kafkaParams: Map[String, String],
zkClient: ZkClient,
zkOffsetPath: String,
topics: Set[String]): InputDStream[(String, String)]={
//目前僅支援一個topic的偏移量處理,讀取zk裡面偏移量字串
val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last)
val kafkaStream = zkOffsetData match {
case None => //如果從zk裡面沒有讀到偏移量,就說明是系統第一次啟動
log.info("系統第一次啟動,沒有讀取到偏移量,預設就最新的offset開始消費")
//使用最新的偏移量建立DirectStream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
case Some(lastStopOffset) =>
log.info("從zk中讀取到偏移量,從上次的偏移量開始消費資料......")
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
//使用上次停止時候的偏移量建立DirectStream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler)
}
kafkaStream//返回建立的kafkaStream
}
主要是針對第一次啟動,和非首次啟動做了不同的處理。
然後看下第三個步驟的程式碼:
/****
* 儲存每個批次的rdd的offset到zk中
* @param zkClient zk連線的client
* @param zkOffsetPath 偏移量路徑
* @param rdd 每個批次的rdd
*/
def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {
//轉換rdd為Array[OffsetRange]
val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//轉換每個OffsetRange為儲存到zk時的字串格式 : 分割槽序號1:偏移量1,分割槽序號2:偏移量2,......
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")
log.debug(" 儲存的偏移量: "+offsetsRangesStr)
//將最終的字串結果儲存到zk裡面
ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)
}
主要是更新每個批次的偏移量到zk中。
例子已經上傳到github中,有興趣的同學可以參考這個連結:
後續文章會聊一下為了升級應用如何優雅的關閉的流程式,以及在kafka擴充套件分割槽時,上面的程式如何自動相容。
相關推薦
如何管理Spark Streaming消費Kafka的偏移量(二)
上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp
如何管理Spark Streaming消費Kafka的偏移量(三)
前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp
Spark Streaming 之 Kafka 偏移量管理
本文主要介紹 Spark Streaming 應用開發中消費 Kafka 訊息的相關內容,文章著重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。 一、開發環境 1、元件版本 CDH 叢集版本:6.0.1 Spark 版本:2.2.0 Kafka 版本:1.0.1 2、M
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(三)安裝spark2.2.1
node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如
kafka同步非同步消費和訊息的偏移量(四)
1. 消費者位置(consumer position) 因為kafka服務端不儲存訊息的狀態,所以消費端需要自己去做很多事情。我們每次呼叫poll()方法他總是返回已經儲存在生產者佇列中還未被消費者消費的訊息。訊息在每一個分割槽中都是順序的,那麼必然可以通過一
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(九)安裝kafka_2.11-1.1.0
itl CA blog tor line cat pre PE atan 如何搭建配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(二)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。
centos 失敗 sco pan html top n 而且 div href Centos7出現異常:Failed to start LSB: Bring up/down networking. 按照《Kafka:ZK+Kafka+Spark Streaming集群環
Kafka:ZK+Kafka+Spark Streaming集群環境搭建(十三)定義一個avro schema使用comsumer發送avro字符流,producer接受avro字符流並解析
finall ges records ring ack i++ 一個 lan cde 參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producter篇》 pom.xml <depende
Spark Streaming實時流處理筆記(6)—— Kafka 和 Flume的整合
1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se
Spark Streaming實時流處理筆記(5)—— Kafka API 程式設計
1 新建 Maven工程 pom檔案 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLo
Spark Streaming實時流處理筆記(4)—— 分散式訊息佇列Kafka
1 Kafka概述 和訊息系統類似 1.1 訊息中介軟體 生產者和消費者 1.2 Kafka 架構和概念 producer:生產者(生產饅頭) consumer:消費者(吃饅頭) broker:籃子 topic : 主題,給饅頭帶一個標籤,(
Spark Streaming消費Kafka的資料進行統計
流處理平臺: 這裡是第四步的實現: Spark Streaming整合Kafka採用的是Receiver-based,另一種方式Direct Approach,稍作修改就行。 package spark import org.apache.spark.SparkConf impo
Spark Streaming消費Kafka Direct方式資料零丟失實現
一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、
Spark Streaming 和kafka 整合指導(kafka 0.8.2.1 或以上版本)
本節介紹一下如何配置Spark Streaming 來接收kafka的資料。有兩個方法: 1、老的方法 -使用Receivers 和kafka的高階API 2、新的方法( Spark 1.3 開始引入)-不適用Receivers。這兩個方式擁有不同的程式設計模型,效能特徵
Kafka筆記整理(三):消費形式驗證與性能測試
大數據 Kafka 性能測試 [TOC] Kafka筆記整理(三):消費形式驗證與性能測試 Kafka消費形式驗證 前面的《Kafka筆記整理(一)》中有提到消費者的消費形式,說明如下: 1、每個consumer屬於一個consumer group,可以指定組id。group.id 2、消費形
Apache 流框架 Flink,Spark Streaming,Storm對比分析(2)
此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 2.Spark Streaming架構及特性分析 2.1 基本架構 基於是spark core的spark streaming架構。 Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處
Spark Streaming實時流處理筆記(3)——日誌採集Flume
1 Flume介紹 1.1 設計目標 可靠性 擴充套件性 管理性 1.2 同類產品 Flume: Cloudera/Apache,Java Scribe: Facebook ,C/C++(不維護了) Chukwa: Yahoo
Spark Streaming實時流處理筆記(2)—— 實時處理介紹
1 實時和離線計算對比 1.1 資料來源 離線:HDFS 歷史資料,資料量較大 實時:訊息佇列(Kafka) 1.2 處理過程 離線:Mapreduce 實時:Spark(DStream/SS) 1.3 處理速度 離
Spark Streaming實時流處理筆記(1)——Spark-2.2.0原始碼編譯
1 下載原始碼 https://spark.apache.org/downloads.html 解壓 2 編譯原始碼 參考 https://www.imooc.com/article/18419 https://spark.apache.org/docs/2.2.2/bu
Apache 流框架 Flink,Spark Streaming,Storm對比分析(二)
本文由 網易雲 釋出2.Spark Streaming架構及特性分析2.1 基本架構基於是spark core的spark streaming架構。Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Str