
Spark: finding the province of an IP address, and using broadcast variables

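This needs a lookup table that maps IP ranges to locations. Its contents look roughly like this (I have uploaded the file to CSDN; download link: see the CSDN download page):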

1,3708713472,3708715007,"河南省","信陽市","聯通","221.14.122.0","221.14.127.255"
2,3708649472,3708813311,"河南省",,"聯通","221.13.128.0","221.15.255.255"
3,3720390656,3720391679,"河北省","邢臺市","聯通","221.192.168.0","221.192.171.255"
4,1038992128,1038992383,"黑龍江省","齊齊哈爾市","鐵通","61.237.195.0","61.237.195.255"

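Each line splits on commas: column 1 is the ID, column 2 is the lower bound of the range (the first IP converted to a long), column 3 is the upper bound, column 4 is the province, column 5 the city, column 6 the ISP, column 7 the first IP of the range, and column 8 the last IP of the range. A field can be empty, as the city is in row 2.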
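A minimal sketch of reading one row of this table, assuming the file is plain comma-separated text whose quoted fields never contain commas (true for the sample rows above). The names IpTable, IpRange and parseRow are hypothetical. It also confirms the column meaning: converting the first IP of a row to a number gives exactly the value in column 2.

```scala
// Hypothetical helpers; assumes no commas inside the quoted fields.
object IpTable {
  // One row of the IP table: numeric bounds plus location fields.
  final case class IpRange(id: Int, start: Long, end: Long,
                           province: String, city: String, isp: String,
                           startIp: String, endIp: String)

  // Dotted IPv4 string -> unsigned 32-bit value held in a Long.
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

  // Remove the surrounding double quotes of a field, if present.
  private def unquote(s: String): String = s.stripPrefix("\"").stripSuffix("\"")

  def parseRow(line: String): IpRange = {
    // limit -1 keeps empty fields, such as the missing city in row 2
    val f = line.split(",", -1)
    IpRange(f(0).toInt, f(1).toLong, f(2).toLong,
      unquote(f(3)), unquote(f(4)), unquote(f(5)), unquote(f(6)), unquote(f(7)))
  }
}
```

For example, IpTable.ip2Long("221.14.122.0") returns 3708713472, the value in column 2 of row 1.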

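A broadcast variable works much like a map-side join in Hadoop: the (small) dataset is shipped once to the memory of every executor node, where tasks can read it without a shuffle. In Spark, the sc.broadcast method broadcasts a variable to all executors as a read-only copy; the project below shows how it is used.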
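To make the map-side-join analogy concrete without a cluster, here is a plain-Scala simulation (all names are hypothetical): the large dataset is split into partitions, and each partition is enriched by looking up a small in-memory table, so the large dataset never has to be shuffled. In a real Spark job the table would be wrapped once with sc.broadcast(table) and read inside tasks through .value; the Seq of partitions only stands in for an RDD, and the exact-match Map stands in for the range lookup used later.

```scala
// Local simulation of a broadcast (map-side) join; no Spark involved.
object MapSideJoinDemo {
  // Small lookup table: in Spark, this is what sc.broadcast would ship to each executor.
  val smallTable: Map[String, String] = Map("221.14.122.5" -> "河南省")

  // Large dataset, already split into partitions (stand-in for an RDD).
  val partitions: Seq[Seq[String]] = Seq(
    Seq("221.14.122.5", "8.8.8.8"),
    Seq("221.14.122.5")
  )

  // One "task": joins its own partition against its local copy of the table.
  def joinPartition(part: Seq[String], table: Map[String, String]): Seq[(String, String)] =
    part.map(ip => ip -> table.getOrElse(ip, "unknown"))

  // Run every task and concatenate the results, partition by partition.
  def run(): Seq[(String, String)] =
    partitions.flatMap(p => joinPartition(p, smallTable))
}
```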

The project:

Overview: find the province each visiting IP belongs to, and sort the results by number of visits.

[Screenshot of the project]

The Bootstrap object:

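package cn.lijie.business

import org.apache.spark.{SparkConf, SparkContext}

/**
  * User: lijie
  */
object Bootstrap {

  /**
    * Binary search for the IP range that contains ip.
    * arr must be sorted by lower bound, and the ranges must not overlap.
    *
    * @param arr (lower bound, upper bound, province, ISP) tuples
    * @param ip  the IP address as a Long
    * @return index of the matching range, or -1 if there is none
    */
  def binarySearch(arr: Array[(Long, Long, String, String)], ip: Long): Int = {
    var l = 0
    var h = arr.length - 1
    while (l <= h) {
      val m = (l + h) / 2
      if (ip >= arr(m)._1 && ip <= arr(m)._2) {
        return m
      } else if (ip < arr(m)._1) {
        h = m - 1
      } else {
        l = m + 1
      }
    }
    -1
  }

  /**
    * Convert a dotted IPv4 address to a Long,
    * e.g. 221.14.122.0 -> 3708713472
    *
    * @param ip dotted IPv4 string
    * @return the address as an unsigned 32-bit value in a Long
    */
  def ip2Long(ip: String): Long = {
    val arr = ip.split("[.]")
    var num = 0L
    for (i <- 0 until arr.length) {
      // shift the octets read so far left by 8 bits and append the next one
      num = arr(i).toLong | num << 8L
    }
    num
  }

  def main(args: Array[String]): Unit = {
    // Line format of ip.txt:
    // 1,3708713472,3708715007,"河南省","信陽市","聯通","221.14.122.0","221.14.127.255"
    // id, lower bound, upper bound, province, city, ISP, first IP, last IP
    val conf = new SparkConf().setMaster("local[2]").setAppName("ip")
    val sc = new SparkContext(conf)
    // Keep (lower bound, upper bound, province, ISP) and sort by the numeric
    // lower bound; sorting the raw strings would order them lexicographically
    val rdd1 = sc.textFile("src/main/file/*.txt").map(x => {
      val s = x.split(",")
      (s(1).toLong, s(2).toLong, s(3), s(5))
    }).sortBy(_._1)
    // Broadcast the sorted table: each executor receives one read-only copy
    val bd = sc.broadcast(rdd1.collect)
    // Line format of ip.info: time,ip. Count visits per IP and sort by count
    // into a single partition, so the output is one file in sorted order
    val rdd2 = sc.textFile("src/main/file/*.info").map(x => {
      val s = x.split(",")
      (s(1), 1) // (ip, 1)
    }).reduceByKey(_ + _).sortBy(_._2, ascending = true, numPartitions = 1)
    rdd2.map(x => {
      val ipLong = ip2Long(x._1)
      // index of the matching range in the broadcast table
      val index = binarySearch(bd.value, ipLong)
      if (index == -1) {
        // no range contains this IP
        (ipLong, x._1, x._2, "unknown", "unknown")
      } else {
        val p = bd.value(index)._3 // province
        val y = bd.value(index)._4 // ISP
        (ipLong, x._1, x._2, p, y)
      }
    }).saveAsTextFile("C:\\Users\\Administrator\\Desktop\\out")
    sc.stop()
  }
}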
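The lookup relies on two properties of the broadcast array: it is sorted by the numeric lower bound, and the ranges do not overlap, so at most one range can contain a given IP and binary search finds it in O(log n) comparisons. Rows 1 and 2 of the sample table above do overlap (row 1 lies inside row 2); with such data the search is no longer guaranteed to return the right row. A standalone sketch of the same search, with the hypothetical names RangeLookup and Range:

```scala
object RangeLookup {
  // (lower bound, upper bound, province); assumed sorted by lower bound and non-overlapping
  type Range = (Long, Long, String)

  /** Index of the range containing ip, or -1 if none does. */
  def binarySearch(arr: Array[Range], ip: Long): Int = {
    var lo = 0
    var hi = arr.length - 1
    while (lo <= hi) {
      val mid = lo + (hi - lo) / 2
      val (start, end, _) = arr(mid)
      if (ip < start) hi = mid - 1      // target range lies to the left
      else if (ip > end) lo = mid + 1   // target range lies to the right
      else return mid                   // start <= ip <= end
    }
    -1
  }
}
```

The sort key matters: Seq("16777216", "1038992128").sorted puts "1038992128" first even though 16777216 is the smaller number, which is why the bounds should be converted with toLong before sortBy.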

ip.txt is the table file I uploaded (download link: see the CSDN download page).

ip.info holds a few lines of data I simulated, one "time,ip" record per line:

14:45:17,202.98.248.242
15:45:17,219.220.199.250
16:45:17,219.220.199.250
18:45:17,202.98.248.242
18:45:17,202.98.248.242
18:45:17,202.98.248.242
18:45:17,202.98.248.242
18:45:17,202.98.248.242
16:45:17,114.139.223.13
15:45:17,219.220.199.250
16:45:17,219.220.199.250
15:45:17,219.220.199.250
16:45:17,219.220.199.250
15:45:17,219.220.199.250
16:45:17,219.220.199.250
13:45:17,114.139.223.13
10:45:17,114.139.223.13
13:45:17,114.139.223.13
10:45:17,114.139.223.13
10:45:17,114.10.123.13
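Locally, without Spark, the (ip, 1) / reduceByKey(_ + _) / sortBy(_._2) steps amount to a group-and-count followed by a sort. A plain-Scala sketch over lines like the ones above (VisitCount and countVisits are hypothetical names):

```scala
object VisitCount {
  // Each input line is "time,ip"; returns (ip, visits) sorted by visits, ascending.
  def countVisits(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .map(_.split(",")(1))                        // keep only the IP, like (s(1), 1)
      .groupBy(identity)                           // gather identical IPs
      .map { case (ip, hits) => ip -> hits.size }  // like reduceByKey(_ + _)
      .toSeq
      .sortBy(_._2)                                // ascending by count, like sortBy(_._2)
}
```

On the 20 sample lines this gives 114.10.123.13 -> 1, 114.139.223.13 -> 5, 202.98.248.242 -> 6 and 219.220.199.250 -> 8.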

After the run completes, the output is written to C:\Users\Administrator\Desktop\out:

[Screenshots of the output]