1. 程式人生 > >Structured Streaming整合Kafka《官方文件翻譯》

Structured Streaming整合Kafka《官方文件翻譯》

目錄

1. 連結

1. 連結

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.4.0

2. 從Kafka讀資料

2.1 從流查詢建立Kafka資料來源

// 訂閱一個主題
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "idayan00:9092,idayan01:9092,idayan02:9092")
  .option("subscribe", "topicA")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

//  訂閱多個主題
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "idayan00:9092,idayan01:9092,idayan02:9092")
  .option("subscribe", "topicA,topicA")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 訂閱通過正則匹配的主題
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "idayan00:9092,idayan01:9092,idayan02:9092")
  .option("subscribePattern", "topic*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

2.2 從批查詢Kafka資料來源(spark.readStream變成了spark.read)

// 訂閱一個主題, 預設使用最早、最新的偏移量
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribe", "topicA")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 訂閱多個主題,, 指定明確的 Kafka 偏移量
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribe", "topicA,topicA")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 訂閱通過正則匹配的主題, 使用最早、最新的偏移量
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

資料來源中各欄位模式:

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

Kafka source必需的引數如下:

Option value meaning
assign JSON字串:{"topicA":[0,1],"topicB":[2,4]} 指定主題分割槽消費。只有“assgin”,"subscribe"或"subscribePattern"當中的一個可以指定給Kafka source,
subscribe 逗號分隔的主題列表 需要訂閱的主題。只有“assgin”,"subscribe"或"subscribePattern"當中的一個可以指定給Kafka source,
subscribePattern Java正則字串 用來訂閱主題的正則表示式。只有“assgin”,"subscribe"或"subscribePattern"當中的一個可以指定給Kafka source,
kafka.bootstrap.servers 逗號分隔的主機埠列表 Kafka的"bootstrap.servers"配置

下述配置是可選的:

Option value default query type meaning
startingOffsets "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """  "latest" for streaming, "earliest" for batch streaming and batch 查詢的起點。“earliest”是從最早的offset起,“latest”最新的offset.也可使用json字串來指定每個主題分割槽的起點和結束點。在json中 -2代表最早,-1代表最新。在批查詢中latest不可用,流查詢中,只有啟動新查詢時會使用這個值,並在下一次查詢中重新獲取(上一次查詢的結束點)。新發現的分割槽會從最早點開始
endingOffsets latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}  latest batch query 批查詢的結束點。...
failOnDataLoss true or false true streaming query 當資料可能丟失時查詢是否會失敗。這可作為失敗預警,當它不能按預期工作時可以禁用掉。批查詢時,如果因為資料丟失而獲取不到資料,則查詢總是會失敗
kafkaConsumer.pollTimeoutMs long 512 streaming and batch 檢測executor中kafka資料的超時時間,單位毫秒
fetchOffset.numRetries int 3 streaming and batch 放棄獲取kafka offset前嘗試的次數
fetchOffset.retryIntervalMs long 10 streaming and batch 重試獲取kfaka offset前的等待時間
maxOffsetsPerTrigger long none streaming and batch 每個trigger間隔處理的最大offset數的比率。指定的offset總數會被成比例的分割到不同容量的主題分割槽去。

3. 向Kafka寫資料

這裡我們討論如何將流查詢和批查詢結果寫出到Apache Kafka.需要注意的是Apache Kafka只支援最少一次的寫演算法。所以, 當寫到Kafka時(不管是流查詢還是批查詢),有些訊息可能會重複。這是可能發生的,例如,kafka需要重試沒有被Broker確認的資訊時,即使Broker已經接收到並且寫入了這個訊息。因為kafka的這個寫演算法,Structured Streaming 不能保證這樣的重複不發生。不過,如果寫操作成功,你可以認為這個查詢結果最少被寫出了一次。一個在讀取這些資料時去除重的方案是引入一個唯一key。

    寫出到Kafka的DataFrame應該有下列結構:

Column Type
key(可選) string或 binary
value(必須) string 或 binary
topic(*option) string

* 如果主題的配置選項沒有指定,那麼topic這一列是必須的。

    value 列是唯一一個必須的。如果key 列沒有指定,會預設指定為null。如果主題列不存在,那麼他的值會被用作主題寫給kafka,除非設定了主題配置。主題配置會覆蓋主題列。

Kafka sink 必須配置下列選項:

option value meaning
kafka.bootstrap.servers 逗號分隔的主機:埠列表 Kafka "bootstrap.servers" 配置

下述引數是可選的:

Option value default query type meaning
topic string none streaming and batch 設定寫出的kafka 主題,這個配置將覆蓋資料中的主題列配置

3.1 建立流查詢Kafka Sink

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

3.2 建立批查詢Kafka Sink

/ Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

4 Kafka 特有引數配置

Kafka特有的配置可以通過DataStreamReader.option的kafka.字首配置。例如 stream.option("kafka.bootstrap.servers", "host:port"),kafka可用的引數可以參見kafka生產者/消費者引數配置文件。
    請注意,下述引數不可以設定,否則kafka source 或sink將會丟擲異常:

  • group.id: Kafka Source會為每個查詢建立一個唯一的group id.
  • auto.offset.reset: 設為source選項 startingOffsets為指定offset.
  • key.deserializer: key總是使用ByteArrayDeserializer反序列化為位元組陣列。
  • value.deserializer:value總是使用ByteArrayDeserializer反序列化為位元組陣列。
  • key.serializer: key總是使用ByteArraySerializer 或StringSerializer序列化。
  • value.serializer:value 總是使用ByteArraySerializer 或StringSerializer序列化。
  • enable.auto.commit: Kafka source 不提交任何offset
  • interceptor.classes: kafka source將values讀取做位元組陣列。使用ConsumerInterceptor可能會破壞查詢,因此是不安全的。