1. 程式人生 > >Sparkstreaming and Kafka

Sparkstreaming and Kafka

表達 bool 描述 優先 類型 java ext mat 項目管理

簡介

Kafka 0.10的Spark Streaming集成設計與0.8 Direct Stream方法類似。 它提供了簡單的並行性,Kafka分區和Spark分區之間的1:1對應關系,以及對偏移量和元數據的訪問。 但是,由於較新的集成使用新的Kafka消費者API而不是簡單的API,所以在使用上存在顯著差異。 這個版本的集成被標記為實驗,所以API可能會有變化。

LINK(依賴)

對於使用SBT / Maven項目定義的Scala / Java應用程序,請將您的流應用程序與以下工件鏈接起來。

1 groupId = org.apache.spark
2 artifactId = spark-streaming-kafka-0-10_2.11
3 version = 2.2.0

不要在org.apache.kafka構件上手動添加依賴項(例如kafka-clients)。 spark-streaming-kafka-0-10已經具有適當的傳遞依賴性,不同的版本可能在診斷的方式上不兼容。

Creating a Direct Stream

請註意,導入的名稱空間包含版本org.apache.spark.streaming.kafka010

 1 import org.apache.kafka.clients.consumer.ConsumerRecord
 2 import org.apache.kafka.common.serialization.StringDeserializer
3 import org.apache.spark.streaming.kafka010._ 4 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 5 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 6 7 val kafkaParams = Map[String, Object]( 8 "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
9 "key.deserializer" -> classOf[StringDeserializer], 10 "value.deserializer" -> classOf[StringDeserializer], 11 "group.id" -> "use_a_separate_group_id_for_each_stream", 12 "auto.offset.reset" -> "latest", 13 "enable.auto.commit" -> (false: java.lang.Boolean) 14 ) 15 16 val topics = Array("topicA", "topicB") 17 val stream = KafkaUtils.createDirectStream[String, String]( 18 streamingContext, 19 PreferConsistent, 20 Subscribe[String, String](topics, kafkaParams) 21 ) 22 23 stream.map(record => (record.key, record.value))

流中的每個項目都是一個ConsumerRecord(消費者記錄)

有關可能的kafkaParams,請參閱Kafka使用者配置文檔。 如果您的Spark批處理持續時間大於默認Kafka心跳會話超時(30秒),請適當增加heartbeat.interval.ms和session.timeout.ms。 對於大於5分鐘的批次,這將需要更改代理上的group.max.session.timeout.ms。 請註意,該示例將enable.auto.commit設置為false,有關討論,請參閱下面的“存儲偏移量”。

LocationStrategies(位置策略)

新的Kafka消費者API將預取消息到緩沖區中。因此,性能方面的原因是Spark集成將緩存的消費者保留在執行者上(而不是為每個批次重新創建它們),並且傾向於在具有相應使用者的主機位置上調度分區。

在大多數情況下,您應該使用LocationStrategies.PreferConsistent,如上所示。這會將分區平均分配給可用的執行者。如果您的執行者(executors )與您的Kafka經紀人(brokers)位於同一個主機上,請使用PreferBrokers,它將優先為該分區的Kafka領導安排分區。最後,如果分區間負載存在明顯偏移,請使用PreferFixed。這允許您指定分區到主機的明確映射(任何未指定的分區將使用一致的位置)。

消費者緩存的默認最大大小為64.如果您希望處理多於(執行者數量為64 *)的Kafka分區,則可以通過spark.streaming.kafka.consumer.cache.maxCapacity更改此設置。

如果您想禁用Kafka使用者的緩存,則可以將spark.streaming.kafka.consumer.cache.enabled設置為false。可能需要禁用緩存來解決SPARK-19185中描述的問題。 SPARK-19185解決後,該屬性可能會在更高版本的Spark中刪除。

緩存由topicpartition和group.id鍵入,因此每次調用createDirectStream時都要使用一個單獨的group.id。

ConsumerStrategies(消費者策略)

新的Kafka消費者API有許多不同的方式來指定主題,其中一些需要大量的後對象實例化設置。 ConsumerStrategies提供了一個抽象,允許Spark從檢查點重新啟動後即可獲得正確配置的使用者。

ConsumerStrategies.Subscribe,如上所示,允許您訂閱固定的主題集合。 SubscribePattern允許您使用正則表達式來指定感興趣的主題。 請註意,與0.8集成不同,使用Subscribe或SubscribePattern應該在正在運行的流中添加分區。 最後,Assign允許你指定一個固定的分區集合。 所有這三種策略都有重載的構造函數,允許您指定特定分區的起始偏移量。

如果您具有以上選項不能滿足的特定消費者設置需求,則ConsumerStrategy是可以擴展的公共類。

Creating an RDD

如果您有一個更適合批處理的用例,則可以為已定義的偏移範圍創建一個RDD。

1 // Import dependencies and create kafka params as in Create Direct Stream above
2 
3 val offsetRanges = Array(
4   // topic, partition, inclusive starting offset, exclusive ending offset
5   OffsetRange("test", 0, 0, 100),
6   OffsetRange("test", 1, 0, 100)
7 )
8 
9 val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

請註意,您不能使用PreferBrokers,因為如果沒有流,則不存在驅動程序方面的消費者為您自動查找代理元數據。 如有必要,請使用PreferFixed與您自己的元數據查找。

Obtaining Offsets

1 stream.foreachRDD { rdd =>
2   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
3   rdd.foreachPartition { iter =>
4     val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
5     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
6   }
7 }

請註意,HasOffsetRanges的類型轉換只會在createDirectStream的結果中調用的第一個方法中完成,而不是稍後向下的一系列方法。 請註意,RDD分區與Kafka分區之間的一對一映射在任何混洗或重新分區的方法(例如, reduceByKey()或window()。

Storing Offsets

Kafka交付語義在失敗的情況下取決於如何以及何時存儲偏移量。 spark輸出操作至少一次。 所以,如果你想要相當於一次的語義,你必須在冪等輸出之後存儲偏移量,或者在輸出中存儲原子事務的偏移量。 通過這種集成,您可以選擇3個選項,以提高可靠性(和代碼復雜度),以便如何存儲偏移量。

Checkpoints

如果啟用了Spark檢查點,偏移量將被存儲在檢查點中。 這很容易啟用,但也有缺點。 你的輸出操作必須是冪等的,因為你將得到重復的輸出; 轉換不是一種選擇。 此外,如果應用程序代碼已更改,則無法從檢查點恢復。 對於計劃升級,可以通過在舊代碼的同時運行新代碼來緩解這種情況(因為無論如何輸出需要是冪等的,它們不應該發生沖突)。 但是對於需要更改代碼的意外故障,除非您有另外的方法來識別已知的良好起始偏移量,否則將會丟失數據。

Kafka itself

Kafka有一個偏移提交API,用於在特殊的Kafka主題中存儲偏移量。 默認情況下,新的使用者將定期自動提交偏移量。 這幾乎肯定不是你想要的,因為由消費者成功輪詢的消息可能還沒有導致Spark輸出操作,導致未定義的語義。 這就是為什麽上面的流示例將“enable.auto.commit”設置為false的原因。 但是,在知道輸出已存儲之後,可以使用commitAsync API將偏移量提交給Kafka。 與檢查點相比,好處在於,無論應用程序代碼如何變化,Kafka都是耐用的商店。 然而,Kafka不是轉換型的,所以你的輸出仍然是冪等的。

1 stream.foreachRDD { rdd =>
2   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
3 
4   // some time later, after outputs have completed
5   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
6 }

和HasOffsetRanges一樣,如果調用createDirectStream的結果,而不是在轉換之後,轉換為CanCommitOffsets將會成功。 commitAsync調用是線程安全的,但是如果你想要有意義的語義,則必須在輸出之後進行。

Your own data store

對於支持事務的數據存儲,即使在出現故障的情況下,也可以在同一個事務中保存偏移量,以保持兩者同步。 如果仔細檢測重復或跳過的偏移量範圍,回滾事務將防止重復或丟失的消息影響結果。 這給出了相當於一次的語義。 甚至可以使用這種策略,即使對於通常難以產生冪等性的聚合產生的輸出也是如此。

 1 // The details depend on your data store, but the general idea looks like this
 2 
 3 // begin from the the offsets committed to the database
 4 val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
 5   new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
 6 }.toMap
 7 
 8 val stream = KafkaUtils.createDirectStream[String, String](
 9   streamingContext,
10   PreferConsistent,
11   Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
12 )
13 
14 stream.foreachRDD { rdd =>
15   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
16 
17   val results = yourCalculation(rdd)
18 
19   // begin your transaction
20 
21   // update results
22   // update offsets where the end of existing offsets matches the beginning of this batch of offsets
23   // assert that offsets were updated correctly
24 
25   // end your transaction
26 }

SSL / TLS

新的Kafka使用者支持SSL。 要啟用它,請在傳遞給createDirectStream / createRDD之前適當地設置kafkaParams。 請註意,這僅適用於Spark和Kafkabroker之間的溝通; 您仍負責單獨確保Spark節點間通信。

1 val kafkaParams = Map[String, Object](
2   // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
3   "security.protocol" -> "SSL",
4   "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
5   "ssl.truststore.password" -> "test1234",
6   "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
7   "ssl.keystore.password" -> "test1234",
8   "ssl.key.password" -> "test1234"
9 )

Deploying

與任何Spark應用程序一樣,spark-submit用於啟動您的應用程序。

對於Scala和Java應用程序,如果您使用SBT或Maven進行項目管理,請將spark-streaming-kafka-0-10_2.11及其依賴項打包到應用程序JAR中。 確保spark-core_2.11和spark-streaming_2.11被標記為提供的依賴關系,因為這些已經存在於Spark安裝中。 然後使用spark-submit啟動您的應用程序(請參閱主編程指南中的部署)。

 

Sparkstreaming and Kafka