SparkStreaming(15):DStream轉換為RDD的Transform運算元
阿新 • • 發佈:2018-12-16
1.實現功能
DStream中還是缺少某些API的,比如sortByKey之類的。所以使用Transform直接操作DStream中的當前job/批次對應的RDD,來替換DStream的操作(可以直接使用RDD的api),比較方便。
2.程式碼
package _0809kafka import java.text.SimpleDateFormat import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext, Time} /** * */ object DStream2RddAPI { def main(args: Array[String]) { //1、建立sparkConf val sparkConf: SparkConf = new SparkConf() .setAppName("DStream2RddAPI") .setMaster("local[2]") //2、建立sparkContext val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc,Seconds(10)) val socketDStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata.ibeifeng.com",9999) //dstream 當中有一些api是沒有的(例如:sortbyKey等) //將DStream轉換成RDD進行操作 val resultDStream: DStream[((String, String), Int)] = socketDStream.transform((rdd,timestamp) =>{ val sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss") val ts: String =sdf.format(timestamp.milliseconds) rdd.flatMap(_.split(" ")) .filter(word =>word.nonEmpty) .map(word =>((word,ts),1)) .reduceByKey(_ + _) //指定按照第二個位置上的資料型別排序,並且倒敘 .sortBy(t =>t._2,ascending = false) }) resultDStream.print() ssc.start() ssc.awaitTermination() } }
3.測試
(1)開啟nc
nc -lt 9999
(2)然後再執行程式,否則會報錯!說9999埠無法連線
(3)輸入測試