SparkStreaming(5):例項-SparkStreaming處理本地或者HDFS檔案
阿新 • • 發佈:2018-11-08
1.實現功能:
SparkStreaming處理本地或者HDFS檔案,並進行wordcount的統計。
2.前提開啟:
(1)hdfs
(2)metastore
3.scala程式碼:
(1)本地目錄寫法:
file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\
(2)hdfs目錄寫法:
/spark/
(3)程式碼(以本地為例)
package Spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用spark Streaming處理檔案系統(local/hdfs)的資料 */ object FileWordCount { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setMaster("local[2]").setAppName("FileWordCount") val ssc=new StreamingContext(sparkConf,Seconds(5)) // file:///opt/modules/spark-2.1.0-bin-2.7.3/README.md val lines=ssc.textFileStream("file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\") val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() } }
4.測試:
(1)將內容寫入test.log
(2)將檔案test.log採用cp方式,放到對應datas檔案下面
cp .\test.log .\datas\
(注意:(2)非常重要,一定要通過cp或者mv的方式移動進去,否者streaming讀取不到增加的流資訊!)