1. 程式人生 > >Spark Streaming 中管理 Kafka Offsets 的幾種方式

Spark Streaming 中管理 Kafka Offsets 的幾種方式

本文轉載自:https://www.jianshu.com/p/ef3f15cf400d(點選下面 閱讀原文 即可進入)

英文原文:http://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

Offset管理概述

Spark Streaming集成了Kafka允許使用者從Kafka中讀取一個或者多個topic的資料。一個Kafka topic包含多個儲存訊息的分割槽(partition)。每個分割槽中的訊息是順序儲存,並且用offset(可以認為是位置)來標記訊息。開發者可以在他的Spark Streaming應用中通過offset來控制資料的讀取位置,但是這需要好的offset的管理機制。

Offsets管理對於保證流式應用在整個生命週期中資料的連貫性是非常有益的。舉個例子,如果在應用停止或者報錯退出之前沒有將offset儲存在持久化資料庫中,那麼offset rangges就會丟失。更進一步說,如果沒有儲存每個分割槽已經讀取的offset,那麼Spark Streaming就沒有辦法從上次斷開(停止或者報錯導致)的位置繼續讀取訊息。

640?wx_fmt=png

上面的圖描述通常的Spark Streaming應用管理offset流程。Offsets可以通過多種方式來管理,但是一般來說遵循下面的步驟:

  • 在 Direct DStream初始化的時候,需要指定一個包含每個topic的每個分割槽的offset用於讓Direct DStream從指定位置讀取資料。

    • offsets就是步驟4中所儲存的offsets位置

  • 讀取並處理訊息

  • 處理完之後儲存結果資料

    • 用虛線圈儲存和提交offset只是簡單強呼叫戶可能會執行一系列操作來滿足他們更加嚴格的語義要求。這包括冪等操作和通過原子操作的方式儲存offset。

  • 最後,將offsets儲存在外部持久化資料庫如 HBase, Kafka, HDFS, and ZooKeeper中

不同的方案可以根據不同的商業需求進行組合。Spark具有很好的程式設計正規化允許使用者很好的控制offsets的儲存時機。認真考慮以下的情形:一個Spark  Streaming 應用從Kafka中讀取資料,處理或者轉換資料,然後將資料傳送到另一個topic或者其他系統中(例如其他訊息系統、Hbase、Solr、DBMS等等)。在這個例子中,我們只考慮訊息處理之後傳送到其他系統中

將Offsests儲存在外部系統

在這一章節中,我們將來探討一下不同的外部持久化儲存選項

為了更好地理解這一章節中提到的內容,我們先來做一些鋪墊。如果是使用 spark-streaming-kafka-0-10,那麼我們建議將 enable.auto.commit 設為false。這個配置只是在這個版本生效,enable.auto.commit 如果設為true的話,那麼意味著 offsets 會按照 auto.commit.interval.ms 中所配置的間隔來週期性自動提交到Kafka中。在Spark Streaming中,將這個選項設定為true的話會使得Spark應用從kafka中讀取資料之後就自動提交,而不是資料處理之後提交,這不是我們想要的。所以為了更好地控制offsets的提交,我們建議將enable.auto.commit 設為false。

Spark Streaming checkpoints

使用Spark Streaming的checkpoint是最簡單的儲存方式,並且在Spark 框架中很容易實現。Spark Streaming checkpoints就是為儲存應用狀態而設計的,我們將路徑這在HDFS上,所以能夠從失敗中恢復資料。

對Kafka Stream 執行checkpoint操作使得offset儲存在checkpoint中,如果是應用掛掉的話,那麼SparkStreamig應用功能可以從儲存的offset中開始讀取訊息。但是,如果是對Spark Streaming應用進行升級的話,那麼很抱歉,不能checkpoint的資料沒法使用,所以這種機制並不可靠,特別是在嚴格的生產環境中,我們不推薦這種方式。

將offsets儲存在HBase中

HBase可以作為一個可靠的外部資料庫來持久化offsets。通過將offsets儲存在外部系統中,Spark Streaming應用功能能夠重讀或者回放任何仍然儲存在Kafka中的資料。

根據HBase的設計模式,允許應用能夠以rowkey和column的結構將多個Spark Streaming應用和多個Kafka topic存放在一張表格中。在這個例子中,表格以topic名稱、消費者group id和Spark Streaming 的batchTime.milliSeconds作為rowkey以做唯一標識。儘管batchTime.milliSeconds不是必須的,但是它能夠更好地展示歷史的每批次的offsets。表格將儲存30天的累積資料,如果超出30天則會被移除。下面是建立表格的DDL和結構

1DDL
2create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
3RowKey Layout:
4row:                     <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
5column family:    offsets
6qualifier:          <PARTITION_ID>
7value:                 <OFFSET_ID>

對每一個批次的訊息,使用saveOffsets()將從指定topic中讀取的offsets儲存到HBase中

 1/*
2 Save offsets for each batch into HBase
3*/

4def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],
5                hbaseTableName:String,batchTime: org.apache.spark.streaming.Time) ={
6  val hbaseConf = HBaseConfiguration.create()
7  hbaseConf.addResource("src/main/resources/hbase-site.xml")
8  val conn = ConnectionFactory.createConnection(hbaseConf)
9  val table = conn.getTable(TableName.valueOf(hbaseTableName))
10  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" +String.valueOf(batchTime.milliseconds)
11  val put = new Put(rowKey.getBytes)
12  for(offset <- offsetRanges){
13    put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),
14          Bytes.toBytes(offset.untilOffset.toString))
15  }
16  table.put(put)
17  conn.close()
18}

在執行streaming任務之前,首先會使用getLastCommittedOffsets()來從HBase中讀取上一次任務結束時所儲存的offsets。該方法將採用常用方案來返回kafka topic分割槽offsets。

  • 情形1:Streaming任務第一次啟動,從zookeeper中獲取給定topic的分割槽數,然後將每個分割槽的offset都設定為0,並返回。

  • 情形2:一個運行了很長時間的streaming任務停止並且給定的topic增加了新的分割槽,處理方式是從zookeeper中獲取給定topic的分割槽數,對於所有老的分割槽,offset依然使用HBase中所儲存,對於新的分割槽則將offset設定為0。

  • 情形3:Streaming任務長時間執行後停止並且topic分割槽沒有任何變化,在這個情形下,直接使用HBase中所儲存的offset即可。

在Spark Streaming應用啟動之後如果topic增加了新的分割槽,那麼應用只能讀取到老的分割槽中的資料,新的是讀取不到的。所以如果想讀取新的分割槽中的資料,那麼就得重新啟動Spark Streaming應用。

 1/* Returns last committed offsets for all the partitions of a given topic from HBase in  
2following  cases.
3*/

4def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,
5zkQuorum:String,zkRootDir:String,sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] ={
6  val hbaseConf = HBaseConfiguration.create()
7  val zkUrl = zkQuorum+"/"+zkRootDir
8  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,
9                                                sessionTimeout,connectionTimeOut)
10  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)
11  val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME
12                                                 )).get(TOPIC_NAME).toList.head.size
13  zkClientAndConnection._1.close()
14  zkClientAndConnection._2.close()
15  //Connect to HBase to retrieve last committed offsets
16  val conn = ConnectionFactory.createConnection(hbaseConf)
17  val table = conn.getTable(TableName.valueOf(hbaseTableName))
18  val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" +
19                                              String.valueOf(System.currentTimeMillis())
20  val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
21  val scan = new Scan()
22  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
23                                                   stopRow.getBytes).setReversed(true))
24  val result = scanner.next()
25  var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
26  if (result != null){
27  //If the result from hbase scanner is not null, set number of partitions from hbase
28  to the  number of cells
29    hbaseNumberOfPartitionsForTopic = result.listCells().size()
30  }
31val fromOffsets = collection.mutable.Map[TopicPartition,Long]()
32  if(hbaseNumberOfPartitionsForTopic == 0){
33    // initialize fromOffsets to beginning
34    for (partition <- 0 to zKNumberOfPartitionsForTopic-1){
35      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
36    }
37  } else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){
38  // handle scenario where new partitions have been added to existing kafka topic
39    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){
40      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
41                                        Bytes.toBytes(partition.toString)))
42      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
43    }
44    for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){
45      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
46    }
47  } else {
48  //initialize fromOffsets from last run
49    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){
50      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
51                                        Bytes.toBytes(partition.toString)))
52      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
53    }
54  }
55  scanner.close()
56  conn.close()
57  fromOffsets.toMap
58}

當我們獲取到offsets之後我們就可以建立一個Kafka Direct DStream

1val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,
2                                        zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)
3val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,
4                           Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))

在完成本批次的資料處理之後呼叫saveOffsets()儲存offsets.

 1/*
2For each RDD in a DStream apply a map transformation that processes the message.
3*/

4inputDStream.foreachRDD((rdd,batchTime) => {
5  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
6  offsetRanges.foreach(offset => println(offset.topic,offset.partition, offset.fromOffset,
7                        offset.untilOffset))
8  val newRDD = rdd.map(message => processMessage(message))
9  newRDD.count()
10  saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime)
11})

你可以到HBase中去檢視不同topic和消費者組的offset資料

 1hbase(main):001:0> scan 'stream_kafka_offsets', {REVERSED => true}
2ROW                                                COLUMN+CELL
3 kafkablog2:groupid-1:1497628830000                column=offsets:0, timestamp=1497628832448, value=285
4 kafkablog2:groupid-1:1497628830000                column=offsets:1, timestamp=1497628832448, value=285
5 kafkablog2:groupid-1:1497628830000                column=offsets:2, timestamp=1497628832448, value=285
6 kafkablog2:groupid-1:1497628770000                column=offsets:0, timestamp=1497628773773, value=225
7 kafkablog2:groupid-1:1497628770000                column=offsets:1, timestamp=1497628773773, value=225
8 kafkablog2:groupid-1:1497628770000                column=offsets:2, timestamp=1497628773773, value=225
9 kafkablog1:groupid-2:1497628650000                column=offsets:0, timestamp=1497628653451, value=165
10 kafkablog1:groupid-2:1497628650000                column=offsets:1, timestamp=1497628653451, value=165
11 kafkablog1:groupid-2:1497628650000                column=offsets:2, timestamp=1497628653451, value=165
12 kafkablog1:groupid-1:1497628530000                column=offsets:0, timestamp=1497628533108, value=120
13 kafkablog1:groupid-1:1497628530000                column=offsets:1, timestamp=1497628533108, value=120
14 kafkablog1:groupid-1:1497628530000                column=offsets:2, timestamp=1497628533108, value=120
154 row(s) in 0.5030 seconds
16hbase(main):002:0>
17

程式碼示例用的以下的版本

640?wx_fmt=png

將offsets儲存到 ZooKeeper中

在Spark Streaming連線Kafka應用中使用Zookeeper來儲存offsets也是一種比較可靠的方式。

在這個方案中,Spark Streaming任務在啟動時會去Zookeeper中讀取每個分割槽的offsets。如果有新的分割槽出現,那麼他的offset將會設定在最開始的位置。在每批資料處理完之後,使用者需要可以選擇儲存已處理資料的一個offset或者最後一個offset。此外,新消費者將使用跟舊的Kafka 消費者API一樣的格式將offset儲存在ZooKeeper中。因此,任何追蹤或監控Zookeeper中Kafka Offset的工具仍然生效的。

初始化Zookeeper connection來從Zookeeper中獲取offsets

 1val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
2val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
3def readOffsets(topics: Seq[String], groupId:String):
4 Map[TopicPartition, Long] = {
5 val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
6 val partitionMap = zkUtils.getPartitionsForTopics(topics)
7 // /consumers/<groupId>/offsets/<topic>/
8 partitionMap.foreach(topicPartitions => {
9   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
10   topicPartitions._2.foreach(partition => {
11     val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
12     try {
13       val offsetStatTuple = zkUtils.readData(offsetPath)
14       if (offsetStatTuple != null) {
15         LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
16         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
17           offsetStatTuple._1.toLong)
18       }
19     } catch {
20       case e: Exception =>
21         LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
22         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
23     }
24   })
25 })
26 topicPartOffsetMap.toMap
27}

使用獲取到的offsets來初始化Kafka Direct DStream

1val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))

下面是從ZooKeeper獲取一組offsets的方法

注意: Kafka offset在ZooKeeper中的儲存路徑為/consumers/[groupId]/offsets/topic/[partitionId], 儲存的值為offset

 1def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
2 offsets.foreach(or => {
3   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);
4   val acls = new ListBuffer[ACL]()
5   val acl = new ACL
6   acl.setId(ANYONE_ID_UNSAFE)

相關推薦

Spark Streaming 管理 Kafka Offsets方式

本文轉載自:https://www.jianshu.com/p/ef3f15cf400d(點選下

spark streaming維護kafka偏移量到外部介質

.exe topic _each keys off exec lose eat comm spark streaming中維護kafka偏移量到外部介質 以kafka偏移量維護到redis為例。 redis存儲格式 使用的數據結構為string,其中key為topic:

Spring在代碼獲取bean的方式(轉)

mxml get text spa 回調 獲取對象 ati -s null 獲取spring中bean的方式總結: 方法一:在初始化時保存ApplicationContext對象 1 ApplicationContext ac = new FileSystemXml

selenium xpath定位的方式

utf 文本 webdriver 元素 ins send 失敗 ima ive #-*-coding:utf-8-*-from selenium import webdriverfrom time import sleepfrom selenium.webdriver.co

javascript實現繼承的方式

eat 共享 all cto 原型 構造 child 構造函數 java javascript中實現繼承的幾種方式 1、借用構造函數實現繼承 function Parent1(){ this.name = "parent1" } function Child1()

在Spring依賴注入的方式實現鬆耦合

一、普通注入方式: (1)在IDEA工作空間內先建立lib包然後匯入Spring的架包(複製進去的架包要按住滑鼠右鍵出現Add as Library)。 (2)在已經建立好架包的基礎上在src目錄下建立XML檔案,檔案命為applicationContext.xml,需要注意的是我們建

C#訪問Lua的table的方式

C#訪問Lua中的全域性變數 C#程式碼 luaEnv = new LuaEnv(); luaEnv.DoString("require 'CSharpCallLua'"); //獲取Lua中的全域性變數 string name = luaEnv.Global.Get<s

Es6 快速複製陣列方式

1、第一種通過for迴圈方式 var arr = [1,2,3] var arr2 = []; for(var i =0; i<arr.length; i++){ arr2[i] = arr[i] } arr.push(4) arr2.pop() console.log(arr

react資訊傳遞的方式

1.父傳子 父親掛載一個屬性 兒子通過this.props.屬性接收 2.子傳父  父親掛載一個方法 兒子通過this.props呼叫這個方法並傳遞需要傳遞的引數 父親然後接收 3.路由傳參        &nb

Spring屬性注入的方式以及複雜屬性的注入

在Spring框架中,屬性的注入我們有多種方式,我們可以通過構造方法注入,可以通過set方法注入,也可以通過p名稱空間注入,方式多種多樣,對於複雜的資料型別比如物件、陣列、List集合、map集合、Properties等,我們也都有相應的注入方式。  OK,接下來我們就來看看

Linux系統安裝軟體的方式

目錄 紅帽派: Linux有很多種發行版本,各種發行版本之間安裝軟體方式和命令不一樣,同一個版本之間安裝軟體也有不同的方法。但是,大體來說,Linux有兩大派系,一個是紅帽派系,包含Redhat、Centos、Fedora等。還有一個是Debian

React圖片引入的方式

方式1 import tsIcon from '../images/typescript.jpeg'; 方式2 const tsIcon = require('../images/typescript

spring給容器註冊元件的方式,1.包掃描+元件標註註解[email protected](

              給容器中註冊元件;        1)、包掃描+元件標註註解(@Controller/@Service/@Repository/@Component)[

簡要描述 JavaScript 定義函式的方式

JavaScript 中,有三種定義函式的方式: 1、函式語句:即使用 function 關鍵字顯式定義函式。如: function f(x){ return x+1; } 2、函式定義表示式

Vue專案跨域的方式

經常使用vue + webpack搭建專案,但在請求某些json資料時存在跨域問題,此時有幾種修改方法 1. 修改後臺header, 但如果只是請求外部資料,是沒法修改後臺配置的 1 header('Access-Control-Allow-Origin:*');//允許所有來源

stm32CubeMX HAL庫延時的方式解析

/* * 本檔案包括四種延時方式: * 1. 原來的HAL庫函式HAL_Delay() 2. 採用定時器2設定延時函式 3. 採用系統滴答中斷方式的ms和us級延時 * 4. 採用系統滴答非中斷方式的ms和us級延時(在一次計數值範圍內的延時) */ /* Includes ----

Spring配置資料來源的方式

無論使用什麼形式的Spring DAO支援類,都需要配置資料來源的引用。Spring提供了多個選項,用於在Spring程式裡配置資料庫,其中包括: 1,由JDBC驅動程式定義的資料來源。 2,由JNDI查詢的資料來源。 3,連線池的資料來源。 在Spring裡,我們可以像使用其他

Shell整數計算的方式

在Shell中可以使用下列方式來做整數的計算(+,-,*,/) 方式一: linux:~ # A=1 linux:~ # B=2 linux:~ # C=$(($A+$B)) linux:~ # echo $C 3 方式二: linux:~ # A=1 linux:~ # B=2

java之在普通程式碼獲取bean的方式

普通程式碼中獲取bean的幾種方式 最近在專案中,因程式碼模式要求,需要在普通類中去主動呼叫bean例項,經過參考分析,做如下的整理。 在初始化時儲存ApplicationContext物件 通過Spring提供的utils類獲取ApplicationC

Java建立物件的方式

Java中建立物件的五種方式:   作為java開發者,我們每天建立很多物件,但是我們通常使用依賴注入的方式管理系統,比如:Spring去建立物件,然而這裡有很多建立物件的方法:使用New關鍵字、使用Class類的newInstance方法、使用Constructor類的newInstance方法、使用Cl