[Project] Real-Time Meituan Ad Traffic Statistics
阿新 • Published: 2019-01-22
1. Project Analysis
Technical analysis:
Spark Streaming or Storm
Data:
ad click-stream data
Requirement analysis:
1) [Real-time] statistics of [each day's] [hot] ads [per province] (group and take the TopN ads with the most clicks)
2) Real-time statistics of the ad delivery trend over a given period
Data survey:
timestamp: the time at which the user clicked the ad
province: the province in which the user clicked the ad
city: the city in which the user clicked the ad
userid: the user's unique identifier
advid: the id of the ad that was clicked
The source data is already in Kafka.
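To make the field survey concrete, here is a hedged sketch of what a single Kafka message is assumed to look like: one comma-separated line in the order timestamp,province,city,userid,advid (the concrete values below are made up for illustration; the real topic is "aura").

// Hypothetical sample record and how the later code slices it up
val sampleLine = "1527426000000,hubei,wuhan,10001,2003"
val fields = sampleLine.split(",")
// fields(0) = click timestamp in milliseconds, fields(1) = province, fields(2) = city,
// fields(3) = userid, fields(4) = advid -- the same indices used throughout the code below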
2. Blacklist Filtering
import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by jenrey on 2018/5/27 21:07
 */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AdvApplicationTest")
    conf.setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // serialization (left as an empty placeholder in the original)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    /**
     * TODO: Step 1: read the data from Kafka (direct approach)
     */
    /* createDirectStream signature:
       [K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
         ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]) */
    val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
    val topics = Set("aura")
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    // TODO: if [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklisted user and their data is no longer counted
    /**
     * TODO: Step 2: blacklist filtering
     */
    val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)
    /**
     * TODO: Step 3: dynamically generate the blacklist
     */
    /**
     * TODO: Step 4: real-time daily ad click counts per province and city
     */
    /**
     * TODO: Step 5: real-time daily hot ads per province
     */
    /**
     * TODO: Step 6: real-time click trend of each ad over the most recent one-hour sliding window
     */
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
   * Blacklist filtering
   *
   * @param logDStream data read from Kafka
   * @return the data left after blacklist filtering
   */
  def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
    // The blacklist here should really come from a persistent store; the three databases we commonly use are Redis, HBase and MySQL
    val blackList = List((1L, true), (2L, true), (3L, true))
    // Turn the blacklist into an RDD
    val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
    // Broadcast the blacklist
    val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
    // transform operates on every RDD of the incoming DStream
    logDStream.transform(rdd => {
      // Split each incoming line and build (userid, line) pairs
      val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
        val fields: Array[String] = line.split(",")
        (fields(3).toLong, line)
      })
      // After broadcasting, the content is read back with .value
      val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      /**
       * List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
       * List((1L, true), (2L, true), (3L, true))
       * leftOuterJoin produces the result below; both sides have to be key-value pairs
       * (22,(qwe,None))
       * (3,(zxc,Some(true)))
       * (2,(asd,Some(true)))
       */
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      // Return value: only the records that did not match the blacklist
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }
}
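The hard-coded blackList in blackListFilter is only a stand-in; as its comment says, the blacklist would normally be loaded from Redis, HBase or MySQL. A minimal sketch of the MySQL variant, assuming the aura.black_list table that step 3 writes to and a SparkSession like the one created in the next section (the loadBlackList helper name is an assumption, not part of the original code):

// Sketch: read the blacklist from MySQL instead of hard-coding it
// (connection settings mirror the ones used in step 3; helper name is hypothetical)
def loadBlackList(spark: org.apache.spark.sql.SparkSession): Array[(Long, Boolean)] = {
  val props = new java.util.Properties()
  props.put("user", "aura")
  props.put("password", "aura")
  spark.read.jdbc("jdbc:mysql://localhost:3306/aura", "black_list", props)
    .select("userid")
    .rdd
    .map(row => (row.getAs[Number]("userid").longValue(), true))
    .collect()
}
// The result can then be broadcast exactly like the hard-coded list above.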
3. Dynamically Generating the Blacklist
import java.util.{Date, Properties}
import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import utils.{ConnectionPool, DateUtils}

/**
 * Created by jenrey on 2018/5/27 21:07
 */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AdvApplicationTest")
    conf.setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // serialization (left as an empty placeholder in the original)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    /**
     * TODO: Step 1: read the data from Kafka (direct approach)
     */
    /* createDirectStream signature:
       [K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
         ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]) */
    val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
    val topics = Set("aura")
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    // TODO: if [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklisted user and their data is no longer counted
    /**
     * TODO: Step 2: blacklist filtering
     */
    val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)
    /**
     * TODO: Step 3: dynamically generate the blacklist in real time
     */
    DynamicGenerationBlacklists(filterLogDStream, spark)
    /**
     * TODO: Step 4: real-time daily ad click counts per province and city
     */
    /**
     * TODO: Step 5: real-time daily hot ads per province
     */
    /**
     * TODO: Step 6: real-time click trend of each ad over the most recent one-hour sliding window
     */
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
   * TODO: blacklist filtering
   *
   * @param logDStream data read from Kafka
   * @return the data left after blacklist filtering
   */
  def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
    // The blacklist here should really come from a persistent store; the three databases we commonly use are Redis, HBase and MySQL
    val blackList = List((1L, true), (2L, true), (3L, true))
    // Turn the blacklist into an RDD
    val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
    // Broadcast the blacklist
    val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
    // transform operates on every RDD of the incoming DStream
    logDStream.transform(rdd => {
      // Split each incoming line and build (userid, line) pairs
      val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
        val fields: Array[String] = line.split(",")
        (fields(3).toLong, line)
      })
      // After broadcasting, the content is read back with .value
      val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      /**
       * List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
       * List((1L, true), (2L, true), (3L, true))
       * leftOuterJoin produces the result below; both sides have to be key-value pairs
       * (22,(qwe,None))
       * (3,(zxc,Some(true)))
       * (2,(asd,Some(true)))
       */
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      // Return value: only the records that did not match the blacklist
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }

  /**
   * TODO: dynamically generate the blacklist
   *
   * @param filterLogDStream the data left after blacklist filtering
   * If [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklisted user
   * Three possible approaches: 1) updateStateByKey 2) reduceByKey plus HBase 3) MySQL
   */
  def DynamicGenerationBlacklists(filterLogDStream: DStream[String], spark: SparkSession): Unit = {
    val date_userid_advid_ds: DStream[(String, Long)] = filterLogDStream.map(line => {
      val fields: Array[String] = line.split(",")
      val time = new Date(fields(0).toLong)
      val date: String = DateUtils.formatDateKey(time)
      val userid: String = fields(3)
      val advid: String = fields(4)
      (date + "_" + userid + "_" + advid, 1L)
    }).reduceByKey(_ + _)

    date_userid_advid_ds.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // ConnectionPool is a pre-written utility class that hands out MySQL connections
        val connection = ConnectionPool.getConnection()
        val statement = connection.createStatement()
        partition.foreach {
          case (date_userid_advid, count) => {
            val fields = date_userid_advid.split("_")
            val date = fields(0)
            val userid = fields(1).toLong
            val advid = fields(2).toLong
            val sql = s"insert into aura.tmp_advclick_count values('$date',$userid,$advid,$count)"
            statement.execute(sql)
          }
        }
        ConnectionPool.returnConnection(connection)
      })
    })

    /**
     * Generate the blacklist
     */
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/aura")
      .option("user", "aura")
      .option("password", "aura")
      .option("dbtable", "tmp_advclick_count")
      .load()
    df.createOrReplaceTempView("tmp_advclick_count")
    val sql =
      """
        select userid from (
          select date, userid, advid, sum(click_count) c_count
          from tmp_advclick_count
          group by date, userid, advid
        ) t
        where t.c_count > 100
      """
    val blacklistdf = spark.sql(sql).distinct()
    val properties = new Properties()
    properties.put("user", "aura")
    properties.put("password", "aura")
    blacklistdf.write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/aura", "black_list", properties)
  }
}
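The code above leans on two helper classes, ConnectionPool and DateUtils, that are imported from a utils package but never shown. A rough sketch of what they are assumed to do: formatDateKey is assumed to produce a day key such as "20180527", and the "pool" is reduced to plain DriverManager calls for brevity.

import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Date

// Assumed behaviour of DateUtils: turn a click timestamp into a day key like "20180527"
object DateUtils {
  def formatDateKey(date: Date): String = new SimpleDateFormat("yyyyMMdd").format(date)
}

// Assumed behaviour of ConnectionPool: hand out and take back MySQL connections.
// A real pool would cache and reuse connections; this sketch simply opens and closes them.
object ConnectionPool {
  Class.forName("com.mysql.jdbc.Driver")
  def getConnection(): Connection =
    DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "aura", "aura")
  def returnConnection(connection: Connection): Unit = connection.close()
}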
4. Real-Time Daily Ad Click Counts per Province and City
Just continue with the following code after the code above.
  /**
   * Real-time daily ad click counts per province and city
   *
   * @param filterLogDStream
   */
  def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]): DStream[(String, Long)] = {
    // State update function for updateStateByKey: add this batch's clicks to the running total
    val f = (input: Seq[Long], state: Option[Long]) => {
      val current_count = input.sum
      val last_count = state.getOrElse(0L)
      Some(current_count + last_count)
    }
    filterLogDStream.map(line => {
      val fields = line.split(",")
      val time = fields(0).toLong
      val mydate = new Date(time)
      val date = DateUtils.formatDateKey(mydate)
      val province = fields(1)
      val city = fields(2)
      val advid = fields(4)
      (date + "_" + province + "_" + city + "_" + advid, 1L)
    }).updateStateByKey(f)
    /**
     * If the business requires it, these results can also be written to MySQL or HBase
     */
  }
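Note that updateStateByKey keeps a running total across batches, so Spark Streaming requires a checkpoint directory to be set on the StreamingContext before ssc.start(); otherwise the job fails at runtime. For example (the path is only a placeholder; an HDFS path would be used on a cluster):

// Needed by stateful operators such as updateStateByKey
ssc.checkpoint("spark-checkpoint")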
5. Real-Time Hot Ads per Province
/**
* Real-time statistics: hot ads per province
*
* transform : rdd -> dataframe -> table -> sql
*
* @param date_province_city_advid_count
*/
def ProvinceAdvClick_Count(date_province_city_advid_count: DStream[(String, Long)], spark: SparkSession): Unit = {
date_province_city_advid_count.transform(rdd => {
val date_province_advid_count = rdd.map {
case (date_province_city_advid, count) => {
val fields = date_province_city_advid.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(3)
(date + "_" + province + "_" + advid, count)
}
}.reduceByKey(_ + _)
val rowRDD = date_province_advid_count.map(tuple => {
val fields = tuple._1.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(2).toLong
val count = tuple._2
Row(date, province, advid, count)
})
val schema = StructType(
StructField("date", StringType, true) ::
StructField("province", StringType, true) ::
StructField("advid", LongType, true) ::
StructField("count", LongType, true) :: Nil
)
val df = spark.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("temp_date_province_adv_count")
val sql =
"""
select
*
from
(
select
date,province,advid,count,row_number() over(partition by province order by count desc) rank
from
temp_date_province_adv_count
) temp
where temp.rank < 10
"""
/**
* Persist the result to the database
*/
spark.sql(sql)
rdd
})
}
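The comment above says the TopN result should be persisted, but the method stops at spark.sql(sql) and, because transform is lazy and its output DStream never reaches an output operation, the query never actually runs. A hedged way to finish the step is to run the same body inside foreachRDD (an output operation, so it executes every batch) and write the result back to MySQL with the same connection settings as step 3; the province_top_adv table name is an assumption. The tail of the body would then look roughly like this:

// Sketch: persist the per-province TopN instead of discarding it
// (drop-in for the bare spark.sql(sql) call above, with the surrounding transform switched to foreachRDD)
val topNDF = spark.sql(sql)
val properties = new java.util.Properties()
properties.put("user", "aura")
properties.put("password", "aura")
topNDF.write.mode(SaveMode.Overwrite)
  .jdbc("jdbc:mysql://localhost:3306/aura", "province_top_adv", properties)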
6. Complete Code
package sparkstreaming.lesson09
import java.sql.Date
import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import sparkstreaming.demo.lesson01.ConnectionPool
import sparkstreaming.demo.utils.DateUtils
/**
* Created by Administrator on 2018/5/12.
*
* timestamp:
* the time at which the user clicked the ad
* province:
* the province in which the user clicked the ad
* city:
* the city in which the user clicked the ad
* userid:
* the user's unique identifier
* advid:
* the id of the ad that was clicked
*/
object AdvApplicationTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("AdvApplicationTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // serialization (left as an empty placeholder in the original)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(5))
// updateStateByKey (step 4) needs a checkpoint directory; this local path is only an example
ssc.checkpoint("spark-checkpoint")
val spark = SparkSession.builder()
.config(conf).getOrCreate()
/**
* Step 1: read the data from Kafka (direct approach)
* K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
*/
val kafkaParams = Map("metadata.broker.list" -> "hadoop1:9092")
val topics = Set("aura")
val logDstream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics).map(_._2)
/**
* Step 2: blacklist filtering
*/
val filterLogDStream: DStream[String] = blackListFilter(logDstream,ssc)
/**
* If [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklisted user
*
*
* zhangsan:
* A:50 B:60
* lisi:
* A:50 A:20 A:40  -- this one is a blacklisted user
* Is a user who is blacklisted today still blacklisted tomorrow?
* That depends on the business requirements.
*
* Step 3: dynamically generate the blacklist in real time
*/
DynamicGenerationBlacklists(filterLogDStream,spark)
/**
* Step 4:
* real-time daily ad click counts per province and city
*/
val dateProvinceCityAdvClick_Count = ProvinceCityAdvClick_Count(filterLogDStream)
/**
* Step 5:
* real-time daily hot ads per province
* grouped TopN
*
* transform / foreachRDD
* rdd => dataframe
* Spark SQL:
* SQL
*/
ProvinceAdvClick_Count(dateProvinceCityAdvClick_Count,spark)
/**
* Step 6:
* real-time click trend of each ad over the most recent one-hour sliding window
*/
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
/**
* Filter out the blacklisted users' records
* @param logDstream data read from Kafka
* @return the data left after blacklist filtering
*/
def blackListFilter(logDstream: DStream[String],ssc:StreamingContext):DStream[String]={
/**
* This should really be read from the database:
* black_list
*/
val blackList = List((1L,true),(2L,true),(3L,true))
val blackListRDD = ssc.sparkContext.parallelize(blackList)
val blackListBroadcast = ssc.sparkContext.broadcast(blackListRDD.collect())
/**
* The blacklist here should come from a persistent store; the three databases we commonly use are:
* 1) Redis (look it up yourself)
* 2) HBase (look it up yourself)
* 3) MySQL (demonstrated in class)
* read with the Spark Core approach,
* or Spark SQL -> dataframe -> rdd
*/
logDstream.transform( rdd =>{
val user_lineRDD=rdd.map( line =>{
val fields = line.split(",")
(fields(3).toLong,line)
})
val blackRDD = rdd.sparkContext.parallelize(blackListBroadcast.value)
val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
resultRDD.filter( tuple =>{
tuple._2._2.isEmpty
}).map(_._2._1)
})
}
/**
* Dynamically generate the blacklist
* @param filterLogDStream the data left after blacklist filtering
* If [one user] clicks [one ad] more than [100 times] [within one day], that user is a blacklisted user
*
* The idea:
* this requirement is a lot like word count -- it is simply counting, in real time, how many times each key occurs,
* and once some key reaches 100 it becomes a blacklist entry
* Approach 1:
* (date_userid_advid,v)=map
* keep a running count per key with updateStateByKey (heavier on memory)
* zhangsan A 80
* lisi B 99
* 100
* filter out the keys whose count goes above one hundred and write them to MySQL, Redis or HBase
* Approach 2:
* (date_userid_advid,v)=map
* each batch only reduces its own data with reduceByKey (lighter on memory)
* HBase:
* rowkey: date_userid_advid 2
* this batch 3
* 5
* or Redis
* Approach 3:
* the MySQL approach (used below)
*
*/
def DynamicGenerationBlacklists(filterLogDStream: DStream[String],spark:SparkSession):Unit={
val date_userid_advid_ds=filterLogDStream.map( line =>{
val fields = line.split(",")
val time = new Date( fields(0).toLong)
val date = DateUtils.formatDateKey(time)
val userid = fields(3)
val advid = fields(4)
//20180512_
(date+"_"+userid+"_"+advid,1L)
}).reduceByKey(_+_)
date_userid_advid_ds.foreachRDD( rdd =>{
rdd.foreachPartition( partition =>{
val connection = ConnectionPool.getConnection()
val statement = connection.createStatement()
partition.foreach{
case(date_userid_advid,count) =>{
val fields = date_userid_advid.split("_")
val date = fields(0)
val userid = fields(1).toLong
val advid = fields(2).toLong
val sql=s"insert into aura.tmp_advclick_count values('$date',$userid,$advid,$count)"
statement.execute(sql)
}
}
ConnectionPool.returnConnection(connection)
})
})
/**
* Generate the blacklist
*/
val df: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/aura")
.option("user", "aura")
.option("password", "aura")
.option("dbtable", "tmp_advclick_count")
.load()
df.createOrReplaceTempView("tmp_advclick_count")
val sql=
"""
SELECT
userid
FROM
(
SELECT
date,userid,advid,sum(click_count) c_count
FROM
tmp_advclick_count
GROUP BY
date,userid,advid
) t
WHERE
t.c_count > 100
"""
// the computed blacklist
val blacklistdf = spark.sql(sql).distinct()
val properties = new Properties()
properties.put("user","aura")
properties.put("password","aura")
blacklistdf.write.mode(SaveMode.Append)
.jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)
}
/**
* Real-time daily ad click counts per province and city
* @param filterLogDStream
*/
def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]):DStream[(String,Long)]={
/**
* The idea:
* map => (k,v) => date+province+city+advid 1
* updateStateByKey
*/
val f = (input:Seq[Long],state:Option[Long]) =>{
val current_count = input.sum
val last_count = state.getOrElse(0L)
Some(current_count+last_count)
}
filterLogDStream.map( line =>{
val fields = line.split(",")
val time = fields(0).toLong
val mydate = new Date(time)
val date = DateUtils.formatDateKey(mydate)
val province = fields(1)
val city = fields(2)
val advid = fields(4)
(date+"_"+province+"_"+city+"_"+advid,1L)
}).updateStateByKey(f)
/**
* If the business requires it, these results can also be written to MySQL or HBase
*/
}
/**
* Real-time statistics: hot ads per province
*
* transform : rdd -> dataframe -> table -> sql
* @param date_province_city_advid_count
*/
def ProvinceAdvClick_Count(date_province_city_advid_count:DStream[(String,Long)],spark:SparkSession): Unit ={
date_province_city_advid_count.transform( rdd =>{
val date_province_advid_count= rdd.map{
case(date_province_city_advid,count) =>{
val fields = date_province_city_advid.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(3)
(date+"_"+province+"_"+advid,count)
}
}.reduceByKey(_+_)
val rowRDD=date_province_advid_count.map( tuple =>{
val fields = tuple._1.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(2).toLong
val count = tuple._2
Row(date,province,advid,count)
})
val schema=StructType(
StructField("date",StringType,true)::
StructField("province",StringType,true)::
StructField("advid",LongType,true)::
StructField("count",LongType,true):: Nil
)
val df = spark.createDataFrame(rowRDD,schema)
df.createOrReplaceTempView("temp_date_province_adv_count")
val sql=
"""
select
*
from
(
select
date,province,advid,count,row_number() over(partition by province order by count desc) rank
from
temp_date_province_adv_count
) temp
where temp.rank < 10
"""
/**
* Persist the result to the database
*/
spark.sql(sql)
rdd
})
}
}
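Step 6 -- the click trend of each ad over the most recent hour -- stays a TODO throughout the post. A minimal sketch of one way to do it with reduceByKeyAndWindow, under assumed window settings (a 60-minute window sliding every 10 seconds, both multiples of the 5-second batch interval); the inverse-reduce variant used here also relies on the checkpoint directory mentioned above, and the result is simply printed instead of being persisted:

import org.apache.spark.streaming.Minutes

// Sketch of step 6: per-ad click counts over a sliding one-hour window
// (keying by advid plus a minute stamp would give the per-minute trend instead of a single total)
def AdvClickTrend_LastHour(filterLogDStream: DStream[String]): Unit = {
  filterLogDStream.map(line => {
    val fields = line.split(",")
    val advid = fields(4)
    (advid, 1L)
  }).reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,   // clicks entering the window
    (a: Long, b: Long) => a - b,   // clicks leaving the window
    Minutes(60),
    Seconds(10)
  ).print()                        // a real job would write this to MySQL/HBase instead
}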