1. 程式人生 > >SparkStreaming 實現廣告計費系統中線上黑名單過濾實戰

SparkStreaming 實現廣告計費系統中線上黑名單過濾實戰

本博文內容主要包括以下內容:

1、線上黑名單過濾實現解析
2、SparkStreaming實現線上黑名單過濾

一、線上黑名單過濾實現解析:

流式處理是現代資料處理的主流,各種電子商務網站,搜尋引擎等網站等,都需要做流式比如,通過使用者的點選和購買來推斷出使用者的興趣愛好,後臺能實時計算,這是比較重要的,給使用者推薦最好的商品等,推薦更新的資訊,給使用者更好的服務。Spark Streaming就是Spark Core上的一個應用程式。Spark Streaming中資料是不斷的流進來,流進來的資料不斷的生成Job,不斷的提交給叢集去處理,要是想更清晰的看到資料流進來,更清晰的看到資料被處理,只要把Batch Interval修改的足夠大,就可以看到了,對於想理解內部的執行過程,排除錯誤等,都是很有必要的。

廣告計費系統,是電商必不可少的一個功能點。為了防止惡意的廣告點選(假設商戶A和B同時在某電商做了廣告,A和B為競爭對手,那麼如果A使用點選機器人進行對B的廣告的惡意點選,那麼B的廣告費用將很快被用完),必須對廣告點選進行黑名單過濾。黑名單的過濾可以是ID,可以是IP等等,黑名單就是過濾的條件,利用SparkStreaming的流處理特性,可實現實時黑名單的過濾實現。可以使用leftouter join 對目標資料和黑名單資料進行關聯,將命中黑名單的資料過濾掉。

二、SparkStreaming實現線上黑名單過濾 :

1、實現程式碼如下:

/**
 * 背景描述:在廣告點選計費系統中,我們線上過濾掉黑名單的點選,進而保護廣告商的利益,只進行有效的廣告點選計費
 *  或者在防刷評分(或者流量)系統,過濾掉無效的投票或者評分或者流量;
 * 實現技術:使用transform Api直接基於RDD程式設計,進行join操作
 * 
 */
object OnlineBlackListFilter { def main(args: Array[String]){ /** * 第1步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊, * 例如說通過setMaster來設定程式要連結的Spark叢集的Master的URL,如果設定 * 為local,則代表Spark程式在本地執行,特別適合於機器配置條件非常差(例如 * 只有1G的記憶體)的初學者 * */ val conf = new SparkConf() //建立SparkConf物件
conf.setAppName("OnlineBlackListFilter") //設定應用程式的名稱,在程式執行的監控介面可以看到名稱 conf.setMaster("spark://Master:7077") //此時,程式在Spark叢集 val ssc = new StreamingContext(conf, Seconds(30)) /** * 黑名單資料準備,實際上黑名單一般都是動態的,例如在Redis或者資料庫中,黑名單的生成往往有複雜的業務 * 邏輯,具體情況演算法不同,但是在Spark Streaming進行處理的時候每次都能工訪問完整的資訊 */ val blackList = Array(("hadoop", true),("mahout", true)) val blackListRDD = ssc.sparkContext.parallelize(blackList, 8) val adsClickStream = ssc.socketTextStream("Master", 9999) /** * 此處模擬的廣告點選的每條資料的格式為:time、name * 此處map操作的結果是name、(time,name)的格式 */ val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) } adsClickStreamFormatted.transform(userClickRDD => { //通過leftOuterJoin操作既保留了左側使用者廣告點選內容的RDD的所有內容,又獲得了相應點選內容是否在黑名單中 //左內連線:對於rdd和DStream連線 join是rdd和rdd連線 val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /** * 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean)) * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在在值 * 如果存在的話,表面當前廣告點選是黑名單,需要過濾掉,否則的話則是有效點選內容; */ val validClicked = joinedBlackListRDD.filter(joinedItem => { /** * 舉例說明: * val cd=scores.getOrElse("Bob", 0) * 如果scores包含Bob,那麼返回Bob,如果不包含,那麼返回0 */ //意思是:tuple._2._2能get到值,返回值,如果不能得到值,返回false if(joinedItem._2._2.getOrElse(false)) { false } else { true } }) validClicked.map(validClick => {validClick._2._1}) }).print /** * 計算後的有效資料一般都會寫入Kafka中,下游的計費系統會從kafka中pull到有效資料進行計費 */ ssc.start() ssc.awaitTermination() } }

2、將程式碼打包

3、將打好的包放入叢集中,並且寫好指令碼,指令碼內容如下:

/usr/local/spark/bin/spark-submit --class com.dt.spark.sparkstreaming.OnlineBlackListFilter --master spark://Master:7077 /root/Documents/WordCount.jar

4、啟動叢集、此時記得啟動history服務以及新開闢一個視窗執行 nc -lk 9999作為資料的輸入端:

5、此時執行指令碼,觀察結果

這裡寫圖片描述

6、進入18080埠觀察執行過程
這裡寫圖片描述

上述圖片說明我們的業務邏輯是正確的。

博文內容源自DT大資料夢工廠Spark課程總結的筆記。相關課程內容視訊可以參考:
百度網盤連結:http://pan.baidu.com/s/1slvODe1(如果連結失效或需要後續的更多資源,請聯絡QQ460507491或者微訊號:DT1219477246 獲取上述資料)。