1. 程式人生 > >Spark 2.4 正式釋出,重要功能詳細介紹

Spark 2.4 正式釋出,重要功能詳細介紹

美國時間 2018年11月08日 正式釋出了。一如既往,為了繼續實現 Spark 更快,更輕鬆,更智慧的目標,Spark 2.4帶來了許多新功能,如下:

  • 新增一種支援屏障模式(barrier mode)的排程器,以便與基於MPI的程式更好地整合,例如, 分散式深度學習框架;
  • 引入了許多內建的高階函式,以便更容易處理複雜的資料型別(比如陣列和 map);
  • 開始支援 Scala 2.12;
  • 允許我們對 notebooks 中的 DataFrame 進行熱切求值(eager evaluation),以便於除錯和排除故障;
  • 引入新的內建 Avro 資料來源。

除了這些新功能外,該版本還重點關注可用性,穩定性和優化,解決了超過1000 個tickets。 Spark 貢獻者的其他顯著特徵包括:

簡要總結了一些更高級別的功能和改進。 有關 Spark 所有元件和 JIRA 已解決的主要功能的完整列表,請閱讀 Apache Spark 2.4.0 release notes

Barrier Execution Mode

Barrier Execution Mode 是 Project Hydrogen 的一部分,這是 Apache Spark 的一項計劃,旨在將最先進的大資料和 AI 技術結合在一起。它可以將來自 AI 框架的分散式訓練作業正確地嵌入到 Spark 作業中。我們通常會像 All-Reduce 這樣來探討複雜通訊模式(complex communication patterns),因此所有的任務都需要同時執行。這不符合 Spark 當前使用的 MapReduce 模式 。使用這種新的執行模式,Spark 同時啟動所有訓練任務(例如,MPI 任務),並在任務失敗時重新啟動所有任務。Spark 還為屏障(barrier tasks)任務引入了一種新的容錯機制。當任何障礙任務在中間失敗時,Spark 將中止所有任務並重新啟動當前 stage。

內建高階函式

在 Spark 2.4 之前,為了直接操作複雜型別(例如陣列型別),有兩種典型的解決方案:

  • 將巢狀結構展開為多行,並應用某些函式,然後再次建立結構;
  • 編寫使用者自定義函式(UDF)。

新的內建函式可以直接操作複雜型別,高階函式可以使用匿名 lambda 函式直接操作複雜值,類似於UDF,但具有更好的效能。比如以下兩個高階函式:

SELECT array_distinct(array(1, 2, 3, null, 3));

["1","2","3",null]

 

SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));

["1","3"]

關於內建函式和高階函式的進一步說明可以參見《Apache Spark 2.4 中解決複雜資料型別的內建函式和高階函式介紹》《Apache Spark 2.4 新增內建函式和高階函式使用介紹》

內建 Avro 資料來源

Apache Avro 是一種流行的資料序列化格式。它廣泛用於 Apache Spark 和 Apache Hadoop 生態系統,尤其適用於基於 Kafka 的資料管道。從 Apache Spark 2.4 版本開始,Spark 為讀取和寫入 Avro 資料提供內建支援。新的內建 spark-avro 模組最初來自 Databricks 的開源專案Avro Data Source for Apache Spark。除此之外,它還提供以下功能:

  • 新函式 from_avro() 和 to_avro() 用於在 DataFrame 中讀取和寫入 Avro 資料,而不僅僅是檔案。
  • 支援 Avro 邏輯型別(logical types),包括 Decimal,Timestamp 和 Date型別。Spark SQL 和 Avro 的資料型別之間的轉換可以參見下面:
    Spark SQL type Avro type Avro logical type
    ByteType int  
    ShortType int  
    BinaryType bytes  
    DateType int date
    TimestampType long timestamp-micros
    DecimalType fixed decimal
  • 2倍讀取吞吐量提高和10%寫入吞吐量提升。

支援 Scala 2.12

從 Spark 2.4 開始,Spark 支援 Scala 2.12,並分別與 Scala 2.11 和 2.12 進行交叉構建,這兩個版本都可以在 Maven 儲存庫和下載頁面中使用。現在,使用者可以使用 Scala 2.12 來編寫 Spark 應用程式。

Scala 2.12 為 Java 8 帶來了更好的互操作性,Java 8 提供了改進的 lambda 函式序列化。 它還包括使用者期望的新功能和錯誤修復。

Pandas UDF 提升

Pandas UDF 是從 Spark 2.3 開始引入的。在此版本中,社群收集了使用者的反饋,並不斷改進 Pandas UDF。

除了錯誤修復之外,Spark 2.4 中還有2個新功能:

  • SPARK-22239 使用 Pandas UDF 來使用者自定義視窗函式。
  • SPARK-22274 使用 Pandas UDF 來使用者自定義聚合函式。

我們相信這些新功能將進一步改善 Pandas UDF 的使用,我們將在下一版本中不斷改進Pandas UDF。

Image Data Source

社群從影象/視訊/音訊處理行業看到了更多案例。為這些提供 Spark 內建資料來源簡化了使用者將資料匯入 ML 訓練的工作。在 Spark 2.3 版本中,影象資料來源是通過ImageSchema.readImages 實現的。Spark 2.4 發行版中的 SPARK-22666 引入了一個新的 Spark 資料來源,它可以作為 DataFrame 從目錄中遞迴載入影象檔案。現在載入影象非常簡單:

df = spark.read.format("image").load("...")

Kubernetes 整合增強

Spark 2.4 包含許多 Kubernetes 整合的增強功能。主要包括這三點:

  • 首先,此版本支援在 Kubernetes 上執行容器化 PySpark 和 SparkR 應用程式。Spark 為 Dockerfiles 提供了 Python 和 R 繫結,供使用者構建基本映像或自定義它以構建自定義映像。
  • 其次,提供了客戶端模式。使用者可以在 Kubernetes 叢集中執行的 pod 裡面執行互動式工具(例如,shell或 notebooks)。
  • 最後,支援掛載以下型別的 Kubernetes volumes :emptyDir,hostPath 和 persistentVolumeClaim。

靈活的 Streaming Sink

許多外部儲存系統已經有批量聯結器(batch connectors),但並非所有外部儲存系統都有流式接收器(streaming sinks)。在此版本中,即使儲存系統不支援將流式傳輸作為接收器(streaming as a sink)。streamingDF.writeStream.foreachBatch(...) 允許我們在每個微批次(microbatch)的輸出中使用 batch data writers。例如,過往記憶告訴你可以使用 foreachBatch 中現有的 Apache Cassandra 聯結器直接將流式查詢的輸出寫入到 Cassandra。具體如下:

 

streamingDF.writeStream

.foreachBatch { (iteblogBatchDF: DataFrame, batchId: Long) =>

    iteblogBatchDF.write       // Use Cassandra batch data source to write streaming out

      .cassandraFormat(tableName, keyspace)

      .option("cluster", "iteblog_hadoop")

      .mode("append")

      .save()

  }

同樣,你也可以使用它將每個微批輸出(micro-batch output)應用於 streaming DataFrames 中,許多 DataFrame/Dataset 操作在 streaming DataFrames 是不支援的,具體使用如下:

streamingDF.writeStream.foreachBatch { (iteblogBatchDF: DataFrame, batchId: Long) =>

  iteblogBatchDF.cache()

  iteblogBatchDF.write.format(...).save(...)  // location 1

  iteblogBatchDF.write.format(...).save(...)  // location 2

  iteblogBatchDF.uncache()

}