將大型字典檔案讀入 Redis 作為快取(附贈:廣播變數的寫法)
阿新 • • 發佈:2018-12-11
在專案中使用 Redis 快取字典檔案(目的與廣播變數相同):
package com.app

import com.utils.{JedisConnectionPool, RptUtils}
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.control.NonFatal

/**
  * Per-app report that stores the dictionary file in Redis instead of a broadcast variable.
  *
  * Expects exactly three arguments: the parquet input path, an output path (currently
  * unused) and the path of the tab-separated dictionary file.
  *
  * NOTE(review): this assumes `JedisConnectionPool.getConnection()` returns a
  * `redis.clients.jedis.Jedis` taken from a pool, so that `close()` hands the
  * connection back - confirm against `com.utils.JedisConnectionPool`.
  */
object AppRpt2 {

  /** Name used when neither the record nor the Redis dictionary yields an app name. */
  private val UnknownApp = "unknow"

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      println("目錄不存在,請重新輸入")
      sys.exit()
    }
    // The second argument (output path) is accepted but not used by this job.
    val Array(inputPath, _, resultPath) = args

    val conf = new SparkConf()
      .setAppName(s"${this.getClass.getName}")
      .setMaster("local[*]")
      // Set the Spark serialization method.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    try {
      val sQLContext = new SQLContext(sc)
      // Configure the parquet compression codec.
      sQLContext.setConf("spark.sql.parquet.compression.codec", "snappy")

      // Read the dictionary file: keep rows with at least 5 columns and pair
      // column 4 (e.g. "com.123.cn") with column 1 (the display name).
      val dicMap = sc.textFile(resultPath)
        .map(_.split("\t", -1))
        .filter(_.length >= 5)
        .map(arr => (arr(4), arr(1)))

      // Store the dictionary in Redis, one connection per partition, always released.
      dicMap.foreachPartition { part =>
        val jedis = JedisConnectionPool.getConnection()
        try part.foreach { case (appId, appName) => jedis.set(appId, appName) }
        finally jedis.close()
      }

      // Read the parquet file and aggregate the metrics per app name.
      val df = sQLContext.read.parquet(inputPath)
      df.rdd.mapPartitions { rows =>
        val jedis = JedisConnectionPool.getConnection()
        val keyed = rows.map { row =>
          val rawName = row.getAs[String]("appname")
          // When the record carries no app name, look it up in Redis by appid.
          // Both a null appid and a missing Redis key fall back to UnknownApp so
          // the grouping key is never null.
          val appname =
            if (StringUtils.isNotBlank(rawName)) rawName
            else Option(row.getAs[String]("appid"))
              .flatMap(id => Option(jedis.get(id)))
              .getOrElse(UnknownApp)

          // Pull out the required fields.
          // Raw requests, effective requests, ad requests.
          val requestmode = row.getAs[Int]("requestmode")
          val processnode = row.getAs[Int]("processnode")
          val iseffective = row.getAs[Int]("iseffective")
          // Bids joined, bids won, impressions, clicks.
          val isbilling = row.getAs[Int]("isbilling")
          val isbid = row.getAs[Int]("isbid")
          val iswin = row.getAs[Int]("iswin")
          val adorderid = row.getAs[Int]("adorderid")
          // Ad spend and ad cost.
          val winPrice = row.getAs[Double]("winprice")
          val adpayment = row.getAs[Double]("adpayment")

          // Delegate to the business helpers; their three results are concatenated.
          val reqlist = RptUtils.calculateReq(requestmode, processnode)
          val rtblist =
            RptUtils.calculateRtb(iseffective, isbilling, isbid, iswin, adorderid, winPrice, adpayment)
          val cliklist = RptUtils.calculateTimes(requestmode, iseffective)
          (appname, reqlist ++ rtblist ++ cliklist)
        }
        // `rows.map` is lazy, so the connection can only be released once the
        // partition has been fully consumed (or has failed).
        closingWhenDone(keyed)(() => jedis.close())
      }.reduceByKey { (list1, list2) =>
        // list(0,2,1,5) zip list(2,5,4,7) => (0,2),(2,5),(1,4),(5,7) => element-wise sum
        list1.zip(list2).map { case (a, b) => a + b }
      }.map { case (app, totals) =>
        app + "," + totals.mkString(",")
      }.take(10).foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
    * Wraps `underlying` so that `cleanup` runs exactly once, either when the iterator
    * reports exhaustion or when pulling from it throws a non-fatal exception.
    *
    * @param underlying the lazily evaluated iterator to delegate to
    * @param cleanup    resource-release action to run once
    * @return an iterator yielding the same elements as `underlying`
    */
  private def closingWhenDone[A](underlying: Iterator[A])(cleanup: () => Unit): Iterator[A] =
    new Iterator[A] {
      private var released = false

      private def release(): Unit =
        if (!released) {
          released = true
          cleanup()
        }

      override def hasNext: Boolean = {
        val more =
          try underlying.hasNext
          catch { case NonFatal(e) => release(); throw e }
        if (!more) release()
        more
      }

      override def next(): A =
        try underlying.next()
        catch { case NonFatal(e) => release(); throw e }
    }
}
贈:利用廣播變數廣播小檔案:
package com.app

import com.utils.RptUtils
import org.apache.commons.lang.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Per-app report that ships the small dictionary file to the executors through a
  * broadcast variable.
  *
  * Expects exactly three arguments: the parquet input path, an output path (not used
  * here) and the path of the tab-separated dictionary file.
  */
object AppRpt {

  def main(args: Array[String]): Unit = {
    // Anything other than exactly three arguments prints the message and exits.
    val (inputPath, resultPath) = args match {
      case Array(in, _, dict) => (in, dict)
      case _ =>
        println("目錄不存在,請重新輸入")
        sys.exit()
    }

    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)
    // Per the original author: snappy was not yet the default codec in 1.6, it is from 2.0 on.
    sQLContext.setConf("spark.sql.parquet.compression.codec", "snappy")

    // Read the dictionary file: rows with at least 5 columns, column 4
    // (e.g. "com.123.cn") mapped to column 1 (the display name).
    val dictLines = sc.textFile(resultPath)
    val dictPairs = dictLines
      .map(line => line.split("\t", -1))
      .filter(cols => cols.length >= 5)
      .map(cols => (cols(4), cols(1)))
    val dicMap: Map[String, String] = dictPairs.collect().toMap

    // Broadcast the small dictionary to the executors.
    val broadcast = sc.broadcast(dicMap)

    val df = sQLContext.read.parquet(inputPath)

    val perRecord = df.map { row =>
      // A blank app name is replaced by the dictionary entry for the record's
      // appid, or "unknow" when the dictionary has no such entry.
      val nameInRow = row.getAs[String]("appname")
      val appname =
        if (StringUtils.isNotBlank(nameInRow)) nameInRow
        else broadcast.value.getOrElse(row.getAs[String]("appid"), "unknow")

      // Pull out the required fields first.
      // Raw requests, effective requests, ad requests.
      val requestmode = row.getAs[Int]("requestmode")
      val processnode = row.getAs[Int]("processnode")
      val iseffective = row.getAs[Int]("iseffective")
      // Bids joined, bids won, impressions, clicks.
      val isbilling = row.getAs[Int]("isbilling")
      val isbid = row.getAs[Int]("isbid")
      val iswin = row.getAs[Int]("iswin")
      val adorderid = row.getAs[Int]("adorderid")
      // Ad spend and ad cost.
      val winPrice = row.getAs[Double]("winprice")
      val adpayment = row.getAs[Double]("adpayment")

      // Delegate to the business helpers and concatenate their three results.
      val reqlist = RptUtils.calculateReq(requestmode, processnode)
      val rtblist =
        RptUtils.calculateRtb(iseffective, isbilling, isbid, iswin, adorderid, winPrice, adpayment)
      val cliklist = RptUtils.calculateTimes(requestmode, iseffective)
      (appname, reqlist ++ rtblist ++ cliklist)
    }

    // list(0,2,1,5) zip list(2,5,4,7) => (0,2),(2,5),(1,4),(5,7) => element-wise sum
    val totalsPerApp = perRecord.reduceByKey { (left, right) =>
      left.zip(right).map { case (a, b) => a + b }
    }

    totalsPerApp
      .map { case (app, totals) => app + "," + totals.mkString(",") }
      .take(1000)
      .foreach(println)
  }
}