Using Spark SQL to join access-log IPs against an IP rules table (method 2)
阿新 · Published: 2018-12-27
Because requirements differ from business to business, this kind of lookup is generally done with a custom UDF; the full program below registers one and applies it from SQL.
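Both inputs are assumed to be pipe-delimited text: in the rules file, the third and fourth fields are the numeric start and end of an IP range and the seventh field is the province; in the access log, the second field is the client IP. A minimal sketch of that layout, with hypothetical field values invented purely for illustration:

```scala
// Hypothetical rule line: fields 2 and 3 are the numeric range bounds,
// field 6 is the province (all values invented for illustration).
val ruleLine = "1.0.1.0|1.0.3.255|16777472|16778239|Asia|China|Fujian|Fuzhou||Telecom"
val rf = ruleLine.split("[|]")
(rf(2).toLong, rf(3).toLong, rf(6))   // (16777472L, 16778239L, "Fujian")

// Hypothetical access-log line: field 1 is the client IP.
val logLine = "20181227000132|125.213.100.123|http://example.com/page"
logLine.split("[|]")(1)               // "125.213.100.123"
```

The full program: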
```scala
package Test

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Joins the IPs in an access log against an IP rules table with Spark SQL.
 * Advantage: the full IP rules set does not have to be obtained up front;
 * with this user-defined-function approach the rules could instead be read
 * from a relational database, a NoSQL store, a third-party API, and so on.
 */
object SQLIpLocation2 {

  /** Converts a dotted-quad IP string to its decimal (Long) value. */
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  /**
   * Binary search over rules sorted by start IP; returns the index of the
   * rule whose [start, end] range contains `ip`, or -1 if none does.
   */
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0                  // first candidate index
    var high = lines.length - 1  // last candidate index
    while (low <= high) {
      val middle = (low + high) / 2
      if (ip >= lines(middle)._1 && ip <= lines(middle)._2)
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SQLIpLocation2")
      .master("local[*]")
      .getOrCreate()

    // Read the IP rules data
    val ipRulesLines: Dataset[String] = spark.read.textFile(args(0))

    // Import the implicit conversions
    import spark.implicits._

    // Parse the IP rules into (startNum, endNum, province) tuples
    val tpDs: Dataset[(Long, Long, String)] = ipRulesLines.map(line => {
      val fields: Array[String] = line.split("[|]")
      val startNum = fields(2).toLong
      val endNum = fields(3).toLong
      val province = fields(6)
      (startNum, endNum, province)
    })

    // Collect all the IP rules to the Driver
    val ipRulesInDriver: Array[(Long, Long, String)] = tpDs.collect()

    // Broadcast the rules so every Executor gets a copy
    val broadCastRef: Broadcast[Array[(Long, Long, String)]] =
      spark.sparkContext.broadcast(ipRulesInDriver)

    // Read the access-log data
    val accessLog: Dataset[String] = spark.read.textFile(args(1))

    // Parse the access log: extract the IP and convert it to a Long
    val ipLogs: DataFrame = accessLog.map(line => {
      val fields: Array[String] = line.split("[|]")
      val ip: String = fields(1)
      ip2Long(ip)
    }).toDF("ip_num")

    // Register the IP log as a view
    ipLogs.createOrReplaceTempView("v_ip_logs")

    // Define and register a UDF. Where is the function defined? On the
    // Driver. Where does its body run? On the Executors.
    spark.udf.register("ip_num2Province", (ipNum: Long) => {
      // Fetch the rule set that was broadcast to this Executor
      val rulesInExecutor: Array[(Long, Long, String)] = broadCastRef.value
      val index = binarySearch(rulesInExecutor, ipNum)
      var province = "Unknown"
      if (index != -1)
        province = rulesInExecutor(index)._3
      province
    })

    val result: DataFrame = spark.sql(
      "SELECT ip_num2Province(ip_num) AS province, count(1) AS counts " +
        "FROM v_ip_logs GROUP BY ip_num2Province(ip_num) ORDER BY counts DESC")

    result.show()

    spark.stop()
  }
}
```
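The two helpers are plain methods on the object, so they can be sanity-checked in a REPL without starting Spark at all. A quick check under the assumption that the rules are sorted by start IP (which `binarySearch` requires), using hypothetical ranges:

```scala
// Hypothetical, sorted rule ranges (values invented for illustration).
val rules = Array(
  (16777472L, 16778239L, "Fujian"),
  (16779264L, 16781311L, "Guangdong")
)

SQLIpLocation2.ip2Long("1.0.1.0")              // 16777472
SQLIpLocation2.binarySearch(rules, 16777500L)  // 0  -> rules(0)._3 == "Fujian"
SQLIpLocation2.binarySearch(rules, 42L)        // -1 -> mapped to "Unknown"
```

To run the job itself, pass the rules file as the first argument and the access log as the second, for example via spark-submit against a jar built from this object.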