1. 程式人生 > >Spark Streaming之妙用foreachRDD和foreachPartition

Spark Streaming之妙用foreachRDD和foreachPartition

0. 前言

  DStream中的foreachRDD是一個非常強大函式,它允許你把資料傳送給外部系統。因為輸出操作實際上是允許外部系統消費轉換後的資料,它們觸發的實際操作是DStream轉換。所以要掌握它,對它要有深入瞭解。下面有一些常用的錯誤需要理解。經常寫資料到外部系統需要建立一個連線的物件(例如根據TCP協議連線到遠端的伺服器,我們連線外部資料庫需要自己的控制代碼)和傳送資料到遠端的系統。為此,開發者需要在Spark的driver建立一個物件用於連線。

1. 問題

  為了達到這個目的,開發人員可能不經意的在Spark驅動中建立一個連線物件,但是在 Spark worker 中嘗試呼叫這個連線物件儲存記錄到RDD中。如下:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

2. 解決方案

  這是不正確的,因為這需要先序列化連線物件,然後將它從driver傳送到worker中。這樣的連線物件在機器之間不能傳送。它可能表現為序列化錯誤(連線物件不可序列化)或者初始化錯誤(連線物件應該 在worker中初始化)等等。正確的解決辦法是在worker中建立連線物件。

  然而,這會造成另外一個常見的錯誤:為每一個記錄建立了一個連線物件。例如:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

3. 優化方案

  通常,建立一個連線物件有資源和時間的開支。因此,為每個記錄建立和銷燬連線物件會導致非常高的開支,明顯的減少系統的整體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition方法。 為RDD的partition建立一個連線物件,用這個連線物件傳送 partition 中的所有記錄。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => 
        connection.send(record)
    )
    connection.close()
  }
}

4. 進一步優化  

  最後,可以通過在多個RDD或者批資料間重用連線物件做更進一步的優化。開發者可以保有一個靜態的連線物件池,重複使用池中的物件將多批次的RDD推送到外部系統,以進一步節省開支:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record =>   
         connection.send(record)
    )
    ConnectionPool.returnConnection(connection)  
  }
}

  需要注意的是,池中的連線物件應該根據需要延遲建立,並且在空閒一段時間後自動超時。這樣就獲取了最有效的方式發生資料到外部系統。

5. 寫在最後

其它需要注意的地方:

  1. 輸出操作通過懶執行的方式操作DStreams,正如RDD action通過懶執行的方式操作RDD。具體地看,RDD actions和DStreams輸出操作接收資料的處理。因此,如果你的應用程式沒有任何輸出操作或者 用於輸出操作dstream.foreachRDD(),但是沒有任何RDD action操作在dstream.foreachRDD()裡面,那麼什麼也不會執行。系統僅僅會接收輸入,然後丟棄它們。

  2. 預設情況下,DStreams輸出操作是分時執行的,它們按照應用程式的定義順序按序執行。

對機器學習和人工智慧感興趣,請掃碼關注微信公眾號!
這裡寫圖片描述