1. 程式人生 > >spark stream中的dstream物件saveAsTextFiles問題

spark stream中的dstream物件saveAsTextFiles問題

利用spark streaming從kafka讀取資料,進行流資料的統計分析,最後產生dstream型別的結果集,但是涉及到資料的儲存時,遇到了一點小障礙。

我們都知道,spark中普通rdd可以直接只用saveAsTextFile(path)的方式,儲存到本地,hdfs中,但是dstream物件沒有saveAsTextFile()方法,只有saveAsTextFiles()方法,而且,其引數只有

prefix: String, suffix: String = ""

官方api對該方法的介紹如下:

Save each RDD in this DStream as at text file,using string representation of elements. The file name at each batch intervalis generated based onprefix

 and suffix:"prefix-TIME_IN_MS.suffix".

大致意思是將prefix與suffix字尾,中間加上time來拼接,找不到輸入path的地方,此處糾結了好久。

後來通過檢視原始碼可以發現這裡邊有個地方一直被忽視了。

原始碼如下:

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix,time)
    rdd.saveAsTextFile(file)
  }
  this

.foreachRDD(saveFunc)
}

其實在saveAsTextFiles方法中又呼叫了saveAsTextFile方法,所以path路徑應該是在此處指定的,即:我們需要將path包含在prefix中。

至此,我想前邊的疑惑也就全部都解開了。

 saveAsTextFiles()方法的用法其實跟saveAsTextFile()一模一樣,只不過比它多了一個拼接時時間的過程。

所以,我們儲存dstream物件,完全可以這樣寫:

Val result_predict=model.predictOnValues(test_data).map(ss=>(ss._1+","+ss._2)).map(lines=>lines.split(","))

     .map(arr=>(arr(0),arr(1),arr(2),"crm_ia_loginqry"))

   result_predict.print()

   result_predict.saveAsTextFiles("hdfs://ip:port/home/songsf/data/result1")