Spark Streaming foreachRDD的正確使用方式
重點:Spark Streaming的foreachRDD執行在Driver端,而foreach和foreachPartion執行在Worker節點。
備註:對資料的向外輸出,還是用foreach**運算元好,不要用Map**運算元,因為Map還要返回一個RDD。
誤區一:在driver上建立連線物件(比如網路連線或資料庫連線)
如果在driver上建立連線物件,然後在RDD的運算元函式內使用連線物件,那麼就意味著需要將連線物件序列化後從driver傳遞到worker上。而連線物件(比如Connection物件)通常來說是不支援序列化的,此時通常會報序列化的異常(serialization errors
dstream.foreachRDD { rdd=>
val connection = createNewConnection() // 在driver上執行
rdd.foreach { record =>
connection.send(record) // 在worker上執行
}
}
誤區二:為每一條記錄都建立一個連線物件
dstream.foreachRDD { rdd=>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
通常來說,連線物件的建立和銷燬都是很消耗時間的。因此頻繁地建立和銷燬連線物件,可能會導致降低spark作業的整體效能和吞吐量。
正確做法一:為每個RDD分割槽建立一個連線物件
dstream.foreachRDD { rdd=>
rdd.foreachPartition { partitionOfRecords=>
val connection = createNewConnection()
partitionOfRecords.foreach(record =>connection.send(record))
connection.close()
}
}
比較正確的做法是:對DStream中的RDD,呼叫foreachPartition,對RDD中每個分割槽建立一個連線物件,使用一個連線物件將一個分割槽內的資料都寫入底層MySQL中。這樣可以大大減少建立的連線物件的數量。
正確做法二:為每個RDD分割槽使用一個連線池中的連線物件
dstream.foreachRDD { rdd=>
rdd.foreachPartition { partitionOfRecords=>
// 靜態連線池,同時連線是懶建立的
val connection =ConnectionPool.getConnection()
partitionOfRecords.foreach(record =>connection.send(record))
ConnectionPool.returnConnection(connection) // 用完以後將連線返回給連線池,進行復用
}
}