spark stream中的dstream物件saveAsTextFiles問題
利用spark streaming從kafka讀取資料,進行流資料的統計分析,最後產生dstream型別的結果集,但是涉及到資料的儲存時,遇到了一點小障礙。
我們都知道,spark中普通rdd可以直接只用saveAsTextFile(path)的方式,儲存到本地,hdfs中,但是dstream物件沒有saveAsTextFile()方法,只有saveAsTextFiles()方法,而且,其引數只有
prefix: String, suffix: String = ""
官方api對該方法的介紹如下:
Save each RDD in this DStream as at text file,using string representation of elements. The file name at each batch intervalis generated based onprefix
suffix
:"prefix-TIME_IN_MS.suffix".
大致意思是將prefix與suffix字尾,中間加上time來拼接,找不到輸入path的地方,此處糾結了好久。
後來通過檢視原始碼可以發現這裡邊有個地方一直被忽視了。
原始碼如下:
def
saveAsTextFiles(prefix: String, suffix:
String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix,time)
rdd.saveAsTextFile(file)
}
this
}
其實在saveAsTextFiles方法中又呼叫了saveAsTextFile方法,所以path路徑應該是在此處指定的,即:我們需要將path包含在prefix中。
至此,我想前邊的疑惑也就全部都解開了。
saveAsTextFiles()方法的用法其實跟saveAsTextFile()一模一樣,只不過比它多了一個拼接時時間的過程。
所以,我們儲存dstream物件,完全可以這樣寫:
Val result_predict=model.predictOnValues(test_data).map(ss=>(ss._1+","+ss._2)).map(lines=>lines.split(","))
.map(arr=>(arr(0),arr(1),arr(2),"crm_ia_loginqry"))
result_predict.print()
result_predict.saveAsTextFiles("hdfs://ip:port/home/songsf/data/result1")