1. 程式人生 > >Spark RDD/DataFrame map儲存資料的兩種方式

Spark RDD/DataFrame map儲存資料的兩種方式

使用Spark RDD或DataFrame,有時需要在foreachPartition或foreachWith裡面儲存資料到本地或HDFS。

直接儲存資料

當然如果不需要在map裡面儲存資料,那麼針對RDD可以有如下方式

val rdd = // target rdd
rdd.saveAsHadoopFile // add some parameters 

針對DataFrame可以有如下方式儲存資料

val df = // target dataframe
// 儲存中間資料
df.registerTempTable("temp table name")

// 持久化資料
df.save // 使用save函式,指定模式等引數
df.saveAsParquetFile // depressed df.saveAsTable // depressed

foreach裡面儲存資料

呼叫foreachXXX之後,裡面的每條記錄都是Iterator[YYY]形式的資料,是可迭代資料。

儲存到檔案

儲存到檔案相對簡單,可以直接使用上面的save儲存,例如

def save2HDFS(sc: SparkContext, input: Iterator[Row]): Unit = {
    val result = input.map(item => item.getString(0) + "," + item.getInt
(1)).toSeq val tmpRDD = sc.parallelize(result) tmpRDD.saveAsObjectFile("//path") // 1 tmpRDD.saveAsTextFile("//path") // 2 tmpRDD.saveAsTextFile("",CompressClass) // 3 內容編碼類,繼承自org.apache.hadoop.io.compress.CompressionCodec }

儲存到資料庫

在foreachXXX裡面,可以將資料儲存到資料庫,這裡使用的方式為JDBC的方式。

 def save2DB(input: Iterator[Row]): Unit = {

    var
temp: Row = null while (input.hasNext) { temp = input.next // 將迭代資料儲存為入庫資料 } var dbconn: Connection = null var stmt: Statement = null try { dbconn = DriverManager.getConnection("", "", "") stmt = dbconn.createStatement() stmt.execute("truncate table TableName") } catch { case e: Exception => { // println(">>>>>>>>>>>>清空表失敗") // e.printStackTrace() } } finally { { // close connection if (stmt != null) stmt.close() if (dbconn != null) dbconn.close() } { // modify poiner to NULL stmt = null dbconn = null } } }

DataFrame讀入寫出操作

DataFrame可以方便的將要各種資料來源的資料,讀入到記憶體中,也可以方便的將DF資料寫為各種格式的資料。

讀入操作

sqlContext.read.jdbc// JDBC資料來源
sqlContext.read.json// JSON資料來源
sqlContext.read.parquet// Parquet資料來源

寫出操作

val tarDF =  // target dataframe 
tarDF.write.jdbc// 寫入JDBC資料庫
tarDF.write.json// 寫入JSON資料來源
tarDF.write.parquet// 寫入Parquet資料來源

以上幾種資料來源,是Spark自身帶有驅動程式的。其他檔案格式,需要相應的驅動程式,或相應的安裝包支援。