spark streaming中transform過濾廣告黑名單
阿新 • • 發佈:2018-12-20
/* transform操作,應用在DStream上時,可以用於執行任意的RDD到RDD的轉換操作。它可以用於實現,DStream API中 所沒有提供的操作。比如說,DStream API中,並沒有提供將一個DStream中的每個batch,與一個特定的RDD進行join 的操作。但是我們自己就可以使用transform操作來實現該功能。 DStream.join(),只能join其他DStream。在DStream每個batch的RDD計算出來之後,會去跟其他DStream的RDD進行 join */ import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 過濾廣告黑名單案例 * 使用者對我們的網站上的廣告可以進行點選 * 點選之後,是不是要進行實時計費,點一下,算一次錢 * 但是,對於那些幫助某些無良商家刷廣告的人,那麼我們有一個黑名單 * 只要是黑名單中的使用者點選的廣告,我們就給過濾掉 * transform操作,應用在DStream上,可以用於執行任意的RDD到RDD的轉換操作。 *他可以用於實現,DStream API中所沒有提供的操作。 * */ object transformDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("windowOpObj").setMaster("local[*]") val ssc= new StreamingContext(conf,Seconds(5)) //定義一個黑名單 val blackList=List(("tom",true),("jerry",true)) val blRDD = ssc.sparkContext.parallelize(blackList) //使用socketTestStream來監聽埠,也就是接收到的實時資料 val sockrtData = ssc.socketTextStream("192.168.88.130",8888) //解析出使用者的資訊,將接收到的資料轉換成和我們黑名單對應得形式資料格式 val user: DStream[(String, String)] = sockrtData.map(line=>(line.split("_")(1),line)) //將兩個結果集放到一個RDD裡面 //使用Spark Streaming中的transform運算元操作,實現過濾 //transform轉換之後DStream變為RDD val fllterRDD = user.transform(u=>{ //(KEY,("收到的行資訊",null/true)) val joinrdd=u.leftOuterJoin(blRDD) //我們將黑名單過濾後,將處理真正的白名單資料 val filterrdd = joinrdd.filter(tuple=>{ if(tuple._2._2.getOrElse(false)){ false }else{ true } }) val valedRDD = filterrdd.map(tuple=>tuple._2._1) valedRDD }) fllterRDD.print() ssc.start() ssc.awaitTermination() } } /* Time: 1541755835000 ms Time: 1541755840000 ms */