1. 程式人生 > 實用技巧 >spark-streaming與kafka的整合

spark-streaming與kafka的整合

1. 概述

在2.x中,spark有兩個用來與kafka整合的程式碼,版本代號為0.80.10,由於在0.8,kafka有兩套消費者api,根據高階api得到了Receiver-based Approach,根據低階api得到了Direct Approach,而在0.10由於kafka只有一套消費者api了,所以也只有Direct Approach

2. Direct Approach

由於0.80.10Direct Approach的實現差不多,這裡使用0.10來進行描述。該途徑由DirectKafkaInputDStream實現

private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) 

其中,LocationStrategy表示的是分配topic partition的consumer到executor的策略,有3個取值

  1. PreferConsistent: 均勻的分佈在executor中。原理是使用TopicPartition的hash值來決定到哪個可用的executor上執行
  2. PreferBrokers:偏向於將consumer分配到指定的topic partition所在的leader上;如果leader集中,會造成負載不均。如果可用的executor中沒有leader所在的主機,使用 PreferConsistent
  3. PreferFixed: 指定某些topic partition的consumer分配到指定的機器上,沒有指定的topic partition使用PreferConsistent

ConsumerStrategy用來指定消費的topic partition,也有三個子類來表示不同的策略:

  1. Subscribe:精確指定用來消費的topic
  2. SubscribePattern:使用正則表示式來指定消費的topic
  3. Assign: 精確指定用來消費的topic partition。上述2個都是消費topic的所有分割槽,這個可以選擇性的消費部分分割槽

PerPartitionConfig只有一個實現類DefaultPerPartitionConfig,用來執行反壓,控制每個分割槽的消費速率

2.1 執行過程

分為2步,一是DirectKafkaInputDStream計算每個分割槽及其消費開始的offset與結束的offset(不包含),二是將上述資訊封裝到KafkaRDD

中消費每個topic partition,一個topic partition對應一個task

2.2 常用的配置

配置分為兩種,一種是傳給ConsumerStrategy,為Kafka Consumer相關的配置,還有一種是傳給SparkConf的。
傳給ConsumerStrategykafkaParams

變數名描述其它
heartbeat.interval.ms 客戶端與broker的心跳間隔 不小於batchDuration,在有window操作時,要不小於slideDuration
session.timeout.ms 在該時間內,如果收不到客戶端心跳,會移除該客戶端 推薦3 * heartbeat.interval.ms,但要在範圍(group.min.session.timeout.ms, group.max.session.timeout.ms]
metadata.broker.list kafka叢集的broker列表,用來獲取整個叢集拓撲,不用完整 等價於bootstrap.servers
enable.auto.commit 是否自動提交消費的offset 在executor端,為false,通常也要設定為false
auto.commit.interval.ms 自動提交消費的offset的週期 預設為5s
group.id 消費組的id 儘量設定
auto.offset.reset 當指定消費的offset不存在時,採取的行為 在executor端為none
receive.buffer.bytes The size of the TCP receive buffer (SO_RCVBUF) to use when reading data 預設65536,設定的時候要大於該值

注意上述的預設值要根據使用的版本來確定。
傳給SparkConf

變數名描述其它
spark.streaming.kafka.consumer.poll.ms 消費者獲取訊息等待的最大時間 預設值spark.network.timeout
spark.streaming.kafka.allowNonConsecutiveOffsets 是否允許消費的offset不連續,通常都是連續的,除非中間的message被刪掉了 預設值false
spark.streaming.kafka.consumer.cache.enabled 在executor端將消費指定topic partition的consumer快取起來,這個是executor級別的快取,如果沒有使用動態資源分配,可以用來重用consumer 預設值true
spark.streaming.kafka.consumer.cache.maxCapacity 每個executor上能快取的consumer的最大值 預設值64
spark.streaming.kafka.consumer.cache.minCapacity 每個executor上能快取的consumer的最大值 預設值16
2.3 at-least-once與 exactly-once語義

使用者有三種方式來儲存每個分割槽消費的offset,分別是checkpointkafka儲存使用者自己處理

  1. checkpoint:driver端的容錯,最不靠譜,由於任務執行與checkpoint是非同步執行的,如果訊息沒有消費完程式報錯了,但是已經checkpoint,恢復後則會丟失資料
  1. kafka儲存:設定enable.auto.commit為false,需要呼叫在業務邏輯後加上如下程式碼。下面的程式碼注入的offsetRanges會在下次生成任務的時候被提交。這種方式不會丟資料,當輸出端不是冪等的,就只能保證at-least-once,因為輸出與儲存offset不是一個事務
val kafkaDStream = KafkaUtils.createDirectStream(...)
// 寫法一開始
// 一大堆流式處理程式碼
// 最後
kafkaDStream .foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offstietRanges
  kafkaDStream .asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
// 寫法一結束
// 寫法二
kafkaDStream .transform{ rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offstietRanges
  kafkaDStream .asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  rdd
}.一大堆流式處理程式碼
  1. 使用者自己處理,當輸出端不冪等,但是支援事務時,將儲存offset與輸出封裝到一個事務中可以實現exactly-once,官方文件streaming-kafka-0-10-integration裡提供一種寫法,筆者在這裡也提供一種,供讀者參考
// 故障恢復
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream(...)
var offsetBroadCast: Broadcast[Array[OffsetRange]] = null
    messages.transform(rdd => {
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetBroadCast = ssc.sparkContext.broadcast(ranges)
      rdd
    }).map(t => (t.key(), t.value())).foreachRDD(rdd => {
      val partitions = rdd.getNumPartitions
      val offset = offsetBroadCast.value
      rdd.mapPartitionsWithIndex((mapId, it) => Iterator((mapId, it)))
        .foreachPartition(it => {
          val (mapId, realIt) = it.buffered.head
          val commitOffRange = offset(mapid)
          // 開啟事務
          // 用 realIt 更新結果
          // 用 commitOffRange 更新對應分割槽的offset
          // 提交事務
        })
        offsetBroadCast.destroy(true)
    })

3. 與receiver-based的差別

這裡的receiver指的是ReliableKafkaReceiver並且開啟了WAL,如果不開啟WAL則可能會導致資料丟失,儲存資料的過程如下:

  1. 新增到BlockGenerator的buffer中
  2. 將buffer中的資料寫入到WAL,寫入到BlockManager
  3. 通知給ReceiverTracker,並把元資料寫入到WAL
  4. 更新offset到zookeeper

整個過程能保證資料不丟,但是隻能保證at-least-once語義。差別如下:

  1. receiver-based需要儲存一份資料,kafka裡面也有一份,這樣的資料冗餘沒有必要
  2. Direct approach少了receiver,整個架構更簡單
  3. receiver-based的topic partition offset只能由zookeeper管理,Direct approach更靈活