Spark 2.4 正式釋出,重要功能詳細介紹
美國時間 2018年11月08日 正式釋出了。一如既往,為了繼續實現 Spark 更快,更輕鬆,更智慧的目標,Spark 2.4帶來了許多新功能,如下:
- 新增一種支援屏障模式(barrier mode)的排程器,以便與基於MPI的程式更好地整合,例如, 分散式深度學習框架;
- 引入了許多內建的高階函式,以便更容易處理複雜的資料型別(比如陣列和 map);
- 開始支援 Scala 2.12;
- 允許我們對 notebooks 中的 DataFrame 進行熱切求值(eager evaluation),以便於除錯和排除故障;
- 引入新的內建 Avro 資料來源。
除了這些新功能外,該版本還重點關注可用性,穩定性和優化,解決了超過1000 個tickets。 Spark 貢獻者的其他顯著特徵包括:
- 消除 2GB 塊大小的限制 [SPARK-24296, SPARK-24307]
- 提升 Pandas UDF [SPARK-22274, SPARK-22239, SPARK-24624]
- 圖片模式資料來源(Image schema data source )[SPARK-22666]
- Spark SQL 加強[SPARK-23803, SPARK-4502, SPARK-24035,
- 內建檔案源改進 [SPARK-23456, SPARK-24576, SPARK-25419, SPARK-23972, SPARK-19018, SPARK-24244]
- Kubernetes 整合加強 [SPARK-23984, SPARK-23146]
簡要總結了一些更高級別的功能和改進。 有關 Spark 所有元件和 JIRA 已解決的主要功能的完整列表,請閱讀 Apache Spark 2.4.0 release notes。
Barrier Execution Mode
Barrier Execution Mode 是 Project Hydrogen 的一部分,這是 Apache Spark 的一項計劃,旨在將最先進的大資料和 AI 技術結合在一起。它可以將來自 AI 框架的分散式訓練作業正確地嵌入到 Spark 作業中。我們通常會像 All-Reduce 這樣來探討複雜通訊模式(complex communication patterns),因此所有的任務都需要同時執行。這不符合 Spark 當前使用的 MapReduce 模式 。使用這種新的執行模式,Spark 同時啟動所有訓練任務(例如,MPI 任務),並在任務失敗時重新啟動所有任務。Spark 還為屏障(barrier tasks)任務引入了一種新的容錯機制。當任何障礙任務在中間失敗時,Spark 將中止所有任務並重新啟動當前 stage。
內建高階函式
在 Spark 2.4 之前,為了直接操作複雜型別(例如陣列型別),有兩種典型的解決方案:
- 將巢狀結構展開為多行,並應用某些函式,然後再次建立結構;
- 編寫使用者自定義函式(UDF)。
新的內建函式可以直接操作複雜型別,高階函式可以使用匿名 lambda 函式直接操作複雜值,類似於UDF,但具有更好的效能。比如以下兩個高階函式:
|
關於內建函式和高階函式的進一步說明可以參見《Apache Spark 2.4 中解決複雜資料型別的內建函式和高階函式介紹》和《Apache Spark 2.4 新增內建函式和高階函式使用介紹》
內建 Avro 資料來源
Apache Avro 是一種流行的資料序列化格式。它廣泛用於 Apache Spark 和 Apache Hadoop 生態系統,尤其適用於基於 Kafka 的資料管道。從 Apache Spark 2.4 版本開始,Spark 為讀取和寫入 Avro 資料提供內建支援。新的內建 spark-avro 模組最初來自 Databricks 的開源專案Avro Data Source for Apache Spark。除此之外,它還提供以下功能:
- 新函式 from_avro() 和 to_avro() 用於在 DataFrame 中讀取和寫入 Avro 資料,而不僅僅是檔案。
- 支援 Avro 邏輯型別(logical types),包括 Decimal,Timestamp 和 Date型別。Spark SQL 和 Avro 的資料型別之間的轉換可以參見下面:
Spark SQL type Avro type Avro logical type ByteType int ShortType int BinaryType bytes DateType int date TimestampType long timestamp-micros DecimalType fixed decimal - 2倍讀取吞吐量提高和10%寫入吞吐量提升。
支援 Scala 2.12
從 Spark 2.4 開始,Spark 支援 Scala 2.12,並分別與 Scala 2.11 和 2.12 進行交叉構建,這兩個版本都可以在 Maven 儲存庫和下載頁面中使用。現在,使用者可以使用 Scala 2.12 來編寫 Spark 應用程式。
Scala 2.12 為 Java 8 帶來了更好的互操作性,Java 8 提供了改進的 lambda 函式序列化。 它還包括使用者期望的新功能和錯誤修復。
Pandas UDF 提升
Pandas UDF 是從 Spark 2.3 開始引入的。在此版本中,社群收集了使用者的反饋,並不斷改進 Pandas UDF。
除了錯誤修復之外,Spark 2.4 中還有2個新功能:
- SPARK-22239 使用 Pandas UDF 來使用者自定義視窗函式。
- SPARK-22274 使用 Pandas UDF 來使用者自定義聚合函式。
我們相信這些新功能將進一步改善 Pandas UDF 的使用,我們將在下一版本中不斷改進Pandas UDF。
Image Data Source
社群從影象/視訊/音訊處理行業看到了更多案例。為這些提供 Spark 內建資料來源簡化了使用者將資料匯入 ML 訓練的工作。在 Spark 2.3 版本中,影象資料來源是通過ImageSchema.readImages
實現的。Spark 2.4 發行版中的 SPARK-22666 引入了一個新的 Spark 資料來源,它可以作為 DataFrame 從目錄中遞迴載入影象檔案。現在載入影象非常簡單:
|
Kubernetes 整合增強
Spark 2.4 包含許多 Kubernetes 整合的增強功能。主要包括這三點:
- 首先,此版本支援在 Kubernetes 上執行容器化 PySpark 和 SparkR 應用程式。Spark 為 Dockerfiles 提供了 Python 和 R 繫結,供使用者構建基本映像或自定義它以構建自定義映像。
- 其次,提供了客戶端模式。使用者可以在 Kubernetes 叢集中執行的 pod 裡面執行互動式工具(例如,shell或 notebooks)。
- 最後,支援掛載以下型別的 Kubernetes volumes :emptyDir,hostPath 和 persistentVolumeClaim。
靈活的 Streaming Sink
許多外部儲存系統已經有批量聯結器(batch connectors),但並非所有外部儲存系統都有流式接收器(streaming sinks)。在此版本中,即使儲存系統不支援將流式傳輸作為接收器(streaming as a sink)。streamingDF.writeStream.foreachBatch(...)
允許我們在每個微批次(microbatch)的輸出中使用 batch data writers。例如,過往記憶告訴你可以使用 foreachBatch 中現有的 Apache Cassandra 聯結器直接將流式查詢的輸出寫入到 Cassandra。具體如下:
|
同樣,你也可以使用它將每個微批輸出(micro-batch output)應用於 streaming DataFrames 中,許多 DataFrame/Dataset 操作在 streaming DataFrames 是不支援的,具體使用如下:
|