Using Spark SQL to join access-log IPs with IP rules (Method 1)
阿新 · Published: 2018-12-27
    package Test

    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    /**
      * Use Spark SQL to join the IPs in the access log with the IP rules.
      * Precondition: the full IP rule database must be available up front,
      * so that it can be loaded into a DataFrame.
      */
    object SQLIIpLocation1 {

      /**
        * Convert a dotted-quad IP string to its decimal (Long) representation.
        * @param ip the IP string, e.g. "1.2.3.4"
        * @return the IP as a Long
        */
      def ip2Long(ip: String): Long = {
        val fragments = ip.split("[.]")
        var ipNum = 0L
        for (i <- 0 until fragments.length) {
          // Shift the accumulated value left by 8 bits and OR in the next octet
          ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SQLIIpLocation1")
          .master("local[*]")
          .getOrCreate()

        // Read the IP rule data
        val ipRulesLines: Dataset[String] = spark.read.textFile(args(0))

        // Import implicit conversions (needed for .map on a Dataset and for .toDF)
        import spark.implicits._

        // Parse the IP rule data: start of range, end of range, province
        val tpDs: Dataset[(Long, Long, String)] = ipRulesLines.map(line => {
          val fields: Array[String] = line.split("[|]")
          val startNum = fields(2).toLong
          val endNum = fields(3).toLong
          val province = fields(6)
          (startNum, endNum, province)
        })

        // Turn the IP rules into a DataFrame
        val ipRulesDF: DataFrame = tpDs.toDF("start_num", "end_num", "province")

        // Register the IP rules as a temporary view
        ipRulesDF.createOrReplaceTempView("v_ip_rules")

        // Read the access-log data
        val accessLog: Dataset[String] = spark.read.textFile(args(1))

        // Parse the access log: extract the IP and convert it to a Long
        val ipLogs: DataFrame = accessLog.map(line => {
          val fields: Array[String] = line.split("[|]")
          val ip: String = fields(1)
          ip2Long(ip)
        }).toDF("ip")

        // Register the access-log IPs as a temporary view
        ipLogs.createOrReplaceTempView("v_ip_logs")

        // Range join: an IP matches the rule whose [start_num, end_num] interval contains it
        val result = spark.sql(
          "SELECT province, COUNT(1) counts FROM v_ip_rules a, v_ip_logs b " +
          "WHERE b.ip >= a.start_num AND b.ip <= a.end_num " +
          "GROUP BY province ORDER BY counts DESC")

        result.show()

        // Release resources
        spark.stop()
      }
    }
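To make the field indexes above concrete: args(0) is expected to be a pipe-delimited rule file where fields(2)/fields(3) are the numeric start/end of an IP range and fields(6) is the province, and args(1) a pipe-delimited access log whose second field is the client IP. The sample lines and the jar/file names below are illustrative assumptions, not data from the original post; a minimal sketch of the inputs and a local run might look like this:

    # Hypothetical IP-rule line: fields(2)=start_num, fields(3)=end_num, fields(6)=province
    1.0.1.0|1.0.3.255|16777472|16778239|Asia|China|Fujian|Fuzhou|...

    # Hypothetical access-log line: fields(1) = the client IP
    20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php

    # Assumed run command (jar name and paths are placeholders)
    spark-submit --class Test.SQLIIpLocation1 ip-location.jar /path/to/ip_rules.txt /path/to/access.log

A quick sanity check of ip2Long, runnable in any Scala REPL:

    ip2Long("1.2.3.4")  // == 16909060L, i.e. 1*2^24 + 2*2^16 + 3*2^8 + 4

One design note: because the join condition is a range predicate (b.ip BETWEEN a.start_num AND a.end_num) rather than an equality, Spark cannot use a hash or sort-merge equi-join and falls back to a nested-loop style join, which gets expensive as the rule table grows. That cost is the usual motivation for alternative approaches, such as broadcasting the rule table and looking up each IP with a binary search inside a UDF.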