Spark RDD/DataFrame map儲存資料的兩種方式
阿新 • • 發佈:2019-01-29
使用Spark RDD或DataFrame,有時需要在foreachPartition或foreachWith裡面儲存資料到本地或HDFS。
直接儲存資料
當然如果不需要在map裡面儲存資料,那麼針對RDD可以有如下方式
val rdd = // target rdd
rdd.saveAsHadoopFile // add some parameters
針對DataFrame可以有如下方式儲存資料
val df = // target dataframe
// 儲存中間資料
df.registerTempTable("temp table name")
// 持久化資料
df.save // 使用save函式,指定模式等引數
df.saveAsParquetFile // depressed
df.saveAsTable // depressed
foreach裡面儲存資料
呼叫foreachXXX之後,裡面的每條記錄都是Iterator[YYY]形式的資料,是可迭代資料。
儲存到檔案
儲存到檔案相對簡單,可以直接使用上面的save儲存,例如
def save2HDFS(sc: SparkContext, input: Iterator[Row]): Unit = {
val result = input.map(item => item.getString(0) + "," + item.getInt (1)).toSeq
val tmpRDD = sc.parallelize(result)
tmpRDD.saveAsObjectFile("//path") // 1
tmpRDD.saveAsTextFile("//path") // 2
tmpRDD.saveAsTextFile("",CompressClass) // 3 內容編碼類,繼承自org.apache.hadoop.io.compress.CompressionCodec
}
儲存到資料庫
在foreachXXX裡面,可以將資料儲存到資料庫,這裡使用的方式為JDBC的方式。
def save2DB(input: Iterator[Row]): Unit = {
var temp: Row = null
while (input.hasNext) {
temp = input.next // 將迭代資料儲存為入庫資料
}
var dbconn: Connection = null
var stmt: Statement = null
try {
dbconn = DriverManager.getConnection("", "", "")
stmt = dbconn.createStatement()
stmt.execute("truncate table TableName")
} catch {
case e: Exception => {
// println(">>>>>>>>>>>>清空表失敗")
// e.printStackTrace()
}
} finally {
{ // close connection
if (stmt != null)
stmt.close()
if (dbconn != null)
dbconn.close()
}
{ // modify poiner to NULL
stmt = null
dbconn = null
}
}
}
DataFrame讀入寫出操作
DataFrame可以方便的將要各種資料來源的資料,讀入到記憶體中,也可以方便的將DF資料寫為各種格式的資料。
讀入操作
sqlContext.read.jdbc// JDBC資料來源
sqlContext.read.json// JSON資料來源
sqlContext.read.parquet// Parquet資料來源
寫出操作
val tarDF = // target dataframe
tarDF.write.jdbc// 寫入JDBC資料庫
tarDF.write.json// 寫入JSON資料來源
tarDF.write.parquet// 寫入Parquet資料來源
以上幾種資料來源,是Spark自身帶有驅動程式的。其他檔案格式,需要相應的驅動程式,或相應的安裝包支援。