SparkStreaming 實現廣告計費系統中線上黑名單過濾實戰
本博文內容主要包括以下內容:
1、線上黑名單過濾實現解析
2、SparkStreaming實現線上黑名單過濾
一、線上黑名單過濾實現解析:
流式處理是現代資料處理的主流,各種電子商務網站,搜尋引擎等網站等,都需要做流式比如,通過使用者的點選和購買來推斷出使用者的興趣愛好,後臺能實時計算,這是比較重要的,給使用者推薦最好的商品等,推薦更新的資訊,給使用者更好的服務。Spark Streaming就是Spark Core上的一個應用程式。Spark Streaming中資料是不斷的流進來,流進來的資料不斷的生成Job,不斷的提交給叢集去處理,要是想更清晰的看到資料流進來,更清晰的看到資料被處理,只要把Batch Interval修改的足夠大,就可以看到了,對於想理解內部的執行過程,排除錯誤等,都是很有必要的。
廣告計費系統,是電商必不可少的一個功能點。為了防止惡意的廣告點選(假設商戶A和B同時在某電商做了廣告,A和B為競爭對手,那麼如果A使用點選機器人進行對B的廣告的惡意點選,那麼B的廣告費用將很快被用完),必須對廣告點選進行黑名單過濾。黑名單的過濾可以是ID,可以是IP等等,黑名單就是過濾的條件,利用SparkStreaming的流處理特性,可實現實時黑名單的過濾實現。可以使用leftouter join 對目標資料和黑名單資料進行關聯,將命中黑名單的資料過濾掉。
二、SparkStreaming實現線上黑名單過濾 :
1、實現程式碼如下:
/**
* 背景描述:在廣告點選計費系統中,我們線上過濾掉黑名單的點選,進而保護廣告商的利益,只進行有效的廣告點選計費
* 或者在防刷評分(或者流量)系統,過濾掉無效的投票或者評分或者流量;
* 實現技術:使用transform Api直接基於RDD程式設計,進行join操作
*
*/
object OnlineBlackListFilter {
def main(args: Array[String]){
/**
* 第1步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊,
* 例如說通過setMaster來設定程式要連結的Spark叢集的Master的URL,如果設定
* 為local,則代表Spark程式在本地執行,特別適合於機器配置條件非常差(例如
* 只有1G的記憶體)的初學者 *
*/
val conf = new SparkConf() //建立SparkConf物件
conf.setAppName("OnlineBlackListFilter") //設定應用程式的名稱,在程式執行的監控介面可以看到名稱
conf.setMaster("spark://Master:7077") //此時,程式在Spark叢集
val ssc = new StreamingContext(conf, Seconds(30))
/**
* 黑名單資料準備,實際上黑名單一般都是動態的,例如在Redis或者資料庫中,黑名單的生成往往有複雜的業務
* 邏輯,具體情況演算法不同,但是在Spark Streaming進行處理的時候每次都能工訪問完整的資訊
*/
val blackList = Array(("hadoop", true),("mahout", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("Master", 9999)
/**
* 此處模擬的廣告點選的每條資料的格式為:time、name
* 此處map操作的結果是name、(time,name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
adsClickStreamFormatted.transform(userClickRDD => {
//通過leftOuterJoin操作既保留了左側使用者廣告點選內容的RDD的所有內容,又獲得了相應點選內容是否在黑名單中
//左內連線:對於rdd和DStream連線 join是rdd和rdd連線
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
/**
* 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean))
* 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在在值
* 如果存在的話,表面當前廣告點選是黑名單,需要過濾掉,否則的話則是有效點選內容;
*/
val validClicked = joinedBlackListRDD.filter(joinedItem => {
/**
* 舉例說明:
* val cd=scores.getOrElse("Bob", 0)
* 如果scores包含Bob,那麼返回Bob,如果不包含,那麼返回0
*/
//意思是:tuple._2._2能get到值,返回值,如果不能得到值,返回false
if(joinedItem._2._2.getOrElse(false))
{
false
} else {
true
}
})
validClicked.map(validClick => {validClick._2._1})
}).print
/**
* 計算後的有效資料一般都會寫入Kafka中,下游的計費系統會從kafka中pull到有效資料進行計費
*/
ssc.start()
ssc.awaitTermination()
}
}
2、將程式碼打包
3、將打好的包放入叢集中,並且寫好指令碼,指令碼內容如下:
/usr/local/spark/bin/spark-submit --class com.dt.spark.sparkstreaming.OnlineBlackListFilter --master spark://Master:7077 /root/Documents/WordCount.jar
4、啟動叢集、此時記得啟動history服務以及新開闢一個視窗執行 nc -lk 9999作為資料的輸入端:
5、此時執行指令碼,觀察結果
6、進入18080埠觀察執行過程
上述圖片說明我們的業務邏輯是正確的。
博文內容源自DT大資料夢工廠Spark課程總結的筆記。相關課程內容視訊可以參考:
百度網盤連結:http://pan.baidu.com/s/1slvODe1(如果連結失效或需要後續的更多資源,請聯絡QQ460507491或者微訊號:DT1219477246 獲取上述資料)。