1. 程式人生 > >Spark之SparkStreaming案例-transform

Spark之SparkStreaming案例-transform

Transform 操作

transform操作允許將任意RDD到RDD函式應用於DStream。 它可用於應用任何未在DStream API中公開的RDD操作。 例如,將資料流中的每個批處理與其他資料集相結合的功能不會直接暴露在DStream API中。 但是,您可以輕鬆地使用transform來執行此操作。 這使得非常強大的可能性。 例如,可以通過將輸入資料流與預先計算的垃圾資訊(也可以用Spark一起生成)進行實時資料清理,然後根據它進行過濾。

一、案例:過濾刷廣告的使用者,

1.1、模擬一個黑名單

1.1.1、模擬使用者在網站上點選廣告, 但是存在刷廣告的現象, 所以對這類使用者的點選流量進行濾除,所以將此類使用者加入黑名單,

//黑名單列表  (user, boolean), true表示該使用者在黑名單中, 在後續的計算中,不記錄該使用者的點選效果。
final List<Tuple2<String, Boolean>> blockList = new ArrayList<Tuple2<String, Boolean>>();
//ture表示在黑名單上
blockList.add(new Tuple2<String, Boolean>("lisi", true));

1.1.1、將黑名單列表轉為一個RDD,

//黑名單RDD   (user, boolean)
JavaPairRDD<String, Boolean> blackRDD = jssc.sparkContext().parallelizePairs(blockList);

1.2、//從指定埠獲取模擬點選日誌:”date user”

//從指定埠獲取模擬點選日誌:"date  user"
JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("192.168.1.224", 9999);

1.3、將資料流中的資料進行格式轉換

日誌格式為date user,為了在後續工作中, 和黑名單RDD進行join操作方便, 將日誌格式改為(user, log);

log:   date user
改為
(user, log//為了後面對資料流中的RDD和黑名單中RDD進行join操作, 將RDD中的資料進行格式化(user, log)
        JavaPairDStream<String, String> userAdsClickLogDStream = adsClickLogDStream.mapToPair(
                new PairFunction<String, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(String log) throws Exception {
                //對日誌格式進行轉換,"date user" 變為(user, log)
                return new Tuple2<String, String>(log.split(" ")[1], log);
            }
        });

1.4、過濾黑名單中的使用者日誌, 此處使用transform操作

    //實時進行黑名單過濾, 執行transform操作, 將每個batch的RDD,與黑名單中的RDD進行join操作
        JavaDStream<String> validAdsClickLogDStream = userAdsClickLogDStream.transform(
                new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public JavaRDD<String> call(JavaPairRDD<String, String> userAdsClickLogRDD)
                            throws Exception {
                        //將黑名單RDD和每個batch的RDD進行join操作
                        // 這裡為什麼是左外連線,因為並不是每個使用者都在黑名單中,所以直接用join,那麼沒有在黑名單中的資料,無法join到就會丟棄
                        // string是使用者,string是日誌,是否在黑名單裡是Optional
                        //(user, (log, boolean))
                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joindRDD = 
                                userAdsClickLogRDD.leftOuterJoin(blackRDD);
                        //過濾    

                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD = 
                                joindRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
                                /*  
                                 *  public interface Function<T1, R> extends Serializable {
                                          R call(T1 v1) throws Exception;
                                        }

                                 */
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                            throws Exception {//(user, (log, boolean))
                                        //這裡tuple就是每個使用者對應的訪問日誌和在黑名單中狀態
                                        if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                                            return false;
                                        }else {
                                            return true;
                                        }
                                    }
                                });

                        // 到此為止,filteredRDD中就只剩下沒有被過濾的正常使用者了,用map函式轉換成我們要的格式,我們只要點選日誌
                        JavaRDD<String> validAdsCiickLogRDD = filteredRDD.map(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public String call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                    throws Exception {
                                return tuple._2._1;
                            }

                        });
                        //放回過濾的結果
                        return validAdsCiickLogRDD;
                    }
        });

1.4.1、在transfrom操作中, 對每個batch中的RDD進行join操作

//將黑名單RDD和每個batch的RDD進行join操作
// 這裡為什麼是左外連線,因為並不是每個使用者都在黑名單中,所以直接用join,那麼沒有在黑名單中的資料,無法join到就會丟棄
// string是使用者,string是日誌,是否在黑名單裡是Optional
//得到的結果:(user, (log, boolean))
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joindRDD = 
            userAdsClickLogRDD.leftOuterJoin(blackRDD);

1.4.2、黑名單和batch中的RDDjoin之後,對結果進行過濾

                        //過濾    
                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD = 
                                joindRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
                                /*  
                                 *  public interface Function<T1, R> extends Serializable {
                                          R call(T1 v1) throws Exception;
                                        }

                                 */
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                            throws Exception {//(user, (log, boolean))
                                        //這裡tuple就是每個使用者對應的訪問日誌和在黑名單中狀態
                                        if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                                            return false;
                                        }else {
                                            return true;
                                        }
                                    }
                                });

1.4.3、就只剩下沒有被過濾的正常使用者了,用map函式轉換成我們要的格式,我們只要點選日誌


                        // 到此為止,filteredRDD中就只剩下沒有被過濾的正常使用者了,用map函式轉換成我們要的格式,我們只要點選日誌
                        JavaRDD<String> validAdsCiickLogRDD = filteredRDD.map(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public String call( Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                    throws Exception {
                                return tuple._2._1;
                            }

                        });
                        //放回過濾的結果
                        return validAdsCiickLogRDD;

1.5、啟動

        // 這後面就可以寫入Kafka中介軟體訊息佇列,作為廣告計費服務的有效廣告點選資料
        validAdsClickLogDStream.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();

scala

package com.chb.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkContext

object BlackListFilter {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setAppName("BlackListFilter")
            .setMaster("local[*]")
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      //黑名單
      val blackList = Array(("jack", true), ("rose", true))
      //設定並行度
      val blackListRDD = ssc.sparkContext.parallelize(blackList, 3)

      //使用socketTextStream 監聽埠
      var st = ssc.socketTextStream("192.168.179.5", 8888)

      //user, boolean==> 
      val users = st.map { 
         line =>  (line.split(" ")(1), line)
      }

      val validRddDS = users.transform(ld => {
          //通過leftOuterJoin 將(k, v) join (k,w) ==> (k, (v, some(W)))
          val ljoinRdd = ld.leftOuterJoin(blackListRDD)

          //過濾掉黑名單
          val fRdd = ljoinRdd.filter(tuple => {
            println(tuple)
            if(tuple._2._2.getOrElse(false)) {  
                false
            } else {
                true
            }
          })

          //獲取白名單
          val validRdd = fRdd.map(tuple => tuple._2._1) 
          validRdd
      })

      validRddDS.print()
      ssc.start()
      ssc.awaitTermination()
    }
}