streaming流式讀取hdfs採坑記
阿新 • • 發佈:2018-12-14
package rockerMQ import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext, sql} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Auther: sss * @Date: 2018/11/26 10:05 * @Description:Streaming接收hdfsDemo */ object Demo02 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) val session = SparkSession.builder().config(conf).getOrCreate() import session.implicits._ val value = ssc.textFileStream("hdfs://192.168.xx.xx:9000/tmp/cxbtest") value.foreachRDD(rdd => { val df= rdd.map(row => { val name = row.split(" ", row.length)(0) val name1 = row.split(" ", row.length)(1) val name2 = row.split(" ", row.length)(2) Test(name, name1, name2) }).toDF() df.createOrReplaceTempView("tmp") session.sql("select name,name1,name2 from tmp where name='a' ").show() }) ssc.start() ssc.awaitTermination() } } case class Test(name: String, name1: String, name2: String)
程式碼如上,自己測試寫的
發現流式讀取hdfs有個大坑,公司是用rocketMQ作為訊息中介軟體,實時將資料接到hdfs上,因為不會
用sparkStreaming整合mq,就用這種方式來接,發現死活接收不到hdfs的流資料,自己在hdfs上建立目錄然後再往裡面上傳文字發現程式碼是好使的,最後猜想原因是
自己往hdfs上傳的檔案或是文字都是幾KB,而hdfs接收mq的資料都是幾GB為一個文字,感覺流式讀取hdfs是以
一個個文字為讀取的批次?? 不是沒有讀到文字,而是一個文字形成的時間太久了,hdfs還沒形成一個文字,我這邊就給kill了