1. 程式人生 > >Spark Streaming foreachRDD的正確使用方式

Spark Streaming foreachRDD的正確使用方式

重點:Spark Streaming的foreachRDD執行在Driver端,而foreach和foreachPartion執行在Worker節點。

備註:對資料的向外輸出,還是用foreach**運算元好,不要用Map**運算元,因為Map還要返回一個RDD。


誤區一:在driver上建立連線物件(比如網路連線或資料庫連線)

如果在driver上建立連線物件,然後在RDD的運算元函式內使用連線物件,那麼就意味著需要將連線物件序列化後從driver傳遞到worker上。而連線物件(比如Connection物件)通常來說是不支援序列化的,此時通常會報序列化的異常(serialization errors

)。因此連線物件必須在worker上建立,不要在driver上建立。

dstream.foreachRDD { rdd=>

  val connection = createNewConnection()  // 在driver上執行

  rdd.foreach { record =>

    connection.send(record) // 在worker上執行

  }

}

誤區二:為每一條記錄都建立一個連線物件

dstream.foreachRDD { rdd=>

  rdd.foreach { record =>

    val connection = createNewConnection()

    connection.send(record)

    connection.close()

  }

}

通常來說,連線物件的建立和銷燬都是很消耗時間的。因此頻繁地建立和銷燬連線物件,可能會導致降低spark作業的整體效能和吞吐量。

正確做法一:為每個RDD分割槽建立一個連線物件

dstream.foreachRDD { rdd=>

  rdd.foreachPartition { partitionOfRecords=>

    val connection = createNewConnection()

    partitionOfRecords.foreach(record =>connection.send(record))

    connection.close()

  }

}

比較正確的做法是:對DStream中的RDD,呼叫foreachPartition,對RDD中每個分割槽建立一個連線物件,使用一個連線物件將一個分割槽內的資料都寫入底層MySQL中。這樣可以大大減少建立的連線物件的數量。

正確做法二:為每個RDD分割槽使用一個連線池中的連線物件

dstream.foreachRDD { rdd=>

  rdd.foreachPartition { partitionOfRecords=>

// 靜態連線池,同時連線是懶建立的

    val connection =ConnectionPool.getConnection()

    partitionOfRecords.foreach(record =>connection.send(record))

   ConnectionPool.returnConnection(connection)  // 用完以後將連線返回給連線池,進行復用

  }

}