Apache Spark 2.4 內建的 Avro 資料來源實戰
文章目錄
Apache Avro 是一種流行的資料序列化格式。它廣泛用於 Apache Spark 和 Apache Hadoop 生態系統,尤其適用於基於 Kafka 的資料管道。從 Apache Spark 2.4 版本開始(參見 Apache Spark 2.4 正式釋出,重要功能詳細介紹),Spark 為讀取和寫入 Avro 資料提供內建支援。新的內建 spark-avro 模組最初來自 Databricks 的開源專案
-
新函式 from_avro() 和 to_avro() 用於在 DataFrame 中讀取和寫入 Avro 資料,而不僅僅是檔案。
-
支援 Avro 邏輯型別(logical types),包括 Decimal,Timestamp 和 Date型別。
-
2倍讀取吞吐量提高和10%寫入吞吐量提升。
本文將通過示例介紹上面的每個功能。
載入和儲存函式
在 Apache Spark 2.4 中,為了讀寫 Avro 格式的資料,你只需在 DataFrameReader 和 DataFrameWriter 中將檔案格式指定為“avro”即可。其用法和其他資料來源用法很類似。如下所示:
val iteblogDF = spark.read.format("avro").load("examples/src/main/resources/iteblog.avro")
iteblogDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
from_avro() 和 to_avro() 的使用
為了進一步簡化資料轉換流程(transformation pipeline),社群引入了兩個新的內建函式:from_avro() 和 to_avro()。Avro 通常用於序列化/反序列化基於 Apache Kafka 的資料管道中的訊息或資料,在讀取或寫入 Kafka 時,將 Avro records 作為列將非常有用。每個 Kafka 鍵值記錄都會增加一些元資料,例如 Kafka 的攝取時間戳,Kafka 的偏移量等。
在以下三種場景,from_avro() 和 to_avro() 函式將非常有用:
- 當使用 Spark 從 Kafka 中讀取 Avro 格式的資料,可以使用 from_avro() 函式來抽取你要的資料,清理資料並對其進行轉換。
- 當你想要將 structs 格式的資料轉換為 Avro 二進位制記錄,然後將它們傳送到 Kafka 或寫入到檔案,你可以使用 to_avro()。
- 如果你需要將多個列重新編碼為單個列,請使用to_avro().
目前這兩個函式僅在 Scala 和 Java 語言中可用。from_avro 和 to_avro 函式的使用除了需要人為指定 Avro schema,其他的和使用 from_json 和 to_json 函式一樣,下面是這兩個函式的使用示例。
在程式碼裡面指定 Avro 模式
import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder
val servers = "www.iteblog.com:9092"
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save them to a Kafka topic.
iteblogDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
通過 Schema Registry 服務提供 Avro 模式
如果我們有 Schema Registry 服務,那麼我們就不需要在程式碼裡面指定 Avro 模式了,如下:
import org.apache.spark.sql.avro._
// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are turned into string
// and int type with Avro and Schema Registry. The schema of the resulting DataFrame
// is: <key: string, value: int>.
val schemaRegistryAddr = "https://www.iteblog.com"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
// Given that key and value columns are registered in Schema Registry, convert
// structured data of key and value columns to Avro binary data by reading the schema
// info from the Schema Registry. The converted data is saved to Kafka as a Kafka topic "t".
iteblogDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
通過檔案設定 Avro 模式
我們還可以將 Avro 模式寫入到檔案裡面,然後在程式碼裡面讀取模式檔案:
import org.apache.spark.sql.avro._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "iteblog1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro('value, jsonFormatSchema) as 'user)
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as 'value)
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "iteblog2")
.start()
與 Databricks spark-avro的相容性
因為 Spark 內建對讀寫 Avro 資料的支援是從 Spark 2.4 才引入的,所以在這些版本之前,可能有使用者已經使用了 Databricks 開源的 spark-avro。但是不用急,內建的 spark-avro 模組和這個是完全相容的。我們僅僅需要將之前引入的 com.databricks.spark.avro 修改成 org.apache.spark.sql.avro._ 即可。
效能測試
基於 SPARK-24800 的優化,內建 Avro 資料來源讀寫 Avro 檔案的效能得到很大提升。社群在這方面進行了相關的基準測試,結果表明,在1百萬行的資料(包含 Int/Double/String/Map/Array/Struct 等各種資料格式)測試中,讀取的效能提升了2倍,寫的效能提升了8%。基準測試的程式碼可參見 這裡,測試比較如下:
結論
內建的 spark-avro 模組為 Spark SQL 和 Structured Streaming 提供了更好的使用者體驗以及 IO 效能。
轉載自過往記憶(https://www.iteblog.com/)
本文連結: 【Apache Spark 2.4 內建的 Avro 資料來源介紹】(https://www.iteblog.com/archives/2476.html)