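Spark: finding the province of an IP address, and using broadcast variables
A Xin · Published: 2019-01-31
This requires a lookup table that maps IP ranges to locations. Its contents look roughly like this (the full file is uploaded to CSDN as a download):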
1,3708713472,3708715007,"河南省","信陽市","聯通","221.14.122.0","221.14.127.255"
2,3708649472,3708813311,"河南省",,"聯通","221.13.128.0","221.15.255.255"
3,3720390656,3720391679,"河北省","邢臺市","聯通","221.192.168.0","221.192.171.255"
4,1038992128,1038992383,"黑龍江省","齊齊哈爾市","鐵通","61.237.195.0","61.237.195.255"
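Fields are separated by commas. Column 1 is the ID, column 2 is the start (lower bound) of the range with the IP converted to a long integer, column 3 is the end (upper bound), column 4 is the province, column 5 is the city (it can be empty, as in row 2), column 6 is the ISP, column 7 is the start IP and column 8 is the end IP.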
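To make the column layout concrete, here is a minimal sketch of parsing one row. The "long" form of an IP packs the four octets into one number: 221.14.122.0 becomes 221 × 2^24 + 14 × 2^16 + 122 × 2^8 + 0 = 3708713472, which is exactly column 2 of the first row. The names `IpTableRowSketch`, `IpRange` and `parse` are hypothetical and only for illustration; the sketch assumes every row has all eight fields.

```scala
object IpTableRowSketch {
  // Hypothetical record type for one row of the IP range table (field names are illustrative)
  case class IpRange(id: Int, start: Long, end: Long, province: String,
                     city: String, isp: String, startIp: String, endIp: String)

  // Dotted-quad IPv4 -> Long: shift the accumulated value left by 8 bits and OR in the next octet
  def ip2Long(ip: String): Long =
    ip.split('.').foldLeft(0L)((acc, octet) => (acc << 8) | octet.trim.toLong)

  // Split on commas, trim, and strip the surrounding quotes.
  // The limit -1 keeps empty fields, so a missing city (row 2) becomes "" instead of shifting columns.
  def parse(line: String): IpRange = {
    val f = line.split(",", -1).map(_.trim.stripPrefix("\"").stripSuffix("\""))
    IpRange(f(0).toInt, f(1).toLong, f(2).toLong, f(3), f(4), f(5), f(6), f(7))
  }
}
```

Parsing the first sample row and calling `ip2Long` on its start and end IPs reproduces columns 2 and 3, which is a quick way to check that the columns are read in the right order.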
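A broadcast variable works much like a map-side join in Hadoop: a read-only copy of the data is shipped to each executor and kept in memory, instead of being sent along with every task. In Spark, sc.broadcast(value) returns a Broadcast handle, and tasks read the data through its .value method. The project below shows how it is used.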
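The map-side-join idea can be shown in isolation with a tiny sketch, assuming Spark is on the classpath and runs in local mode. The lookup map (ISP code to ISP name) and the object name `BroadcastLookupSketch` are hypothetical; only `sc.broadcast`, `.value` and the RDD calls are the real Spark API.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLookupSketch {
  // Hypothetical small lookup table: ISP code -> ISP name
  val ispNames: Map[String, String] = Map("cu" -> "聯通", "tt" -> "鐵通")

  def run(): Array[(String, String)] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-sketch"))
    try {
      // Ship one read-only copy of the map to each executor
      val bd = sc.broadcast(ispNames)
      val visits = sc.parallelize(Seq(("1.1.1.1", "cu"), ("2.2.2.2", "tt"), ("3.3.3.3", "xx")))
      // Map-side join: each record is resolved against the executor's local copy, with no shuffle
      visits.map { case (ip, code) => (ip, bd.value.getOrElse(code, "unknown")) }.collect()
    } finally {
      sc.stop()
    }
  }
}
```

In the project the broadcast value is the collected, sorted IP range table instead of a Map, but the pattern is the same: collect the small side on the driver, broadcast it once, and read it via `.value` inside the transformation.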
The project:
What it does: look up the province of each visiting IP and sort the results by number of visits.
The Bootstrap object:
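package cn.lijie.business

import org.apache.spark.{SparkConf, SparkContext}

/**
 * User: lijie
 */
object Bootstrap {

  /**
   * Binary search over IP ranges sorted by start address (ascending).
   *
   * @param arr ranges as (start, end, province, ISP), sorted by start
   * @param ip  IP address as a Long
   * @return index of the range that contains ip, or -1 if none does
   */
  def binarySearch(arr: Array[(Long, Long, String, String)], ip: Long): Int = {
    var l = 0
    var h = arr.length - 1
    while (l <= h) {
      val m = (l + h) / 2
      if (ip >= arr(m)._1 && ip <= arr(m)._2) {
        return m
      } else if (ip < arr(m)._1) {
        h = m - 1
      } else {
        l = m + 1
      }
    }
    -1
  }

  /**
   * Convert a dotted-quad IPv4 address to a Long.
   *
   * @param ip e.g. "221.14.122.0"
   * @return e.g. 3708713472
   */
  def ip2Long(ip: String): Long = {
    val arr = ip.split("[.]")
    var num = 0L
    for (i <- 0 until arr.length) {
      // shift the octets read so far left by 8 bits and OR in this one
      num = arr(i).toLong | num << 8L
    }
    num
  }

  def main(args: Array[String]): Unit = {
    // Sample line of ip.txt:
    // 1,3708713472,3708715007,"河南省","信陽市","聯通","221.14.122.0","221.14.127.255"
    // id, start, end, province, city, ISP, start IP, end IP
    val conf = new SparkConf().setMaster("local[2]").setAppName("ip")
    val sc = new SparkContext(conf)
    // Load the range table and sort it by start address in ascending numeric order.
    // The bounds must be compared as numbers: sorting the raw strings would put
    // a 9-digit value such as "974127104" after "3708713472" and break the binary search.
    val rdd1 = sc.textFile("src/main/file/*.txt").map(x => {
      val s = x.split(",")
      // (start, end, province, ISP); toDouble also accepts values written as "3708713472.00",
      // and the quotes around the text fields are stripped
      (s(1).toDouble.toLong, s(2).toDouble.toLong, s(3).replace("\"", ""), s(5).replace("\"", ""))
    }).sortBy(_._1)
    // Broadcast the (small) sorted table to every executor
    val bd = sc.broadcast(rdd1.collect)
    // ip.info lines look like "14:45:17,202.98.248.242": count visits per IP as (ip, count)
    val rdd2 = sc.textFile("src/main/file/*.info").map(x => {
      val s = x.split(",")
      (s(1), 1)
    }).reduceByKey(_ + _)
    // Look up each IP, then sort by visit count into a single partition so the output
    // file stays ordered (repartition(1) after a sort would shuffle and lose the order)
    rdd2.map(x => {
      val ipLong = ip2Long(x._1)
      // index of the matching range
      val index = binarySearch(bd.value, ipLong)
      if (index == -1) {
        // no range contains this IP
        (ipLong, x._1, x._2, "unknown", "unknown")
      } else {
        val p = bd.value(index)._3 // province
        val y = bd.value(index)._4 // ISP
        (ipLong, x._1, x._2, p, y)
      }
    }).sortBy(_._3, ascending = true, numPartitions = 1)
      .saveAsTextFile("C:\\Users\\Administrator\\Desktop\\out")
    sc.stop()
  }
}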
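Binary search only works if the range table is sorted numerically by its start value. Because the bounds are read from a text file as strings, sorting them directly uses string order, and the start values do not all have the same number of digits (for example 58.16.0.0 is 974127104, nine digits). The small sketch below contrasts the two orderings; the value "974127104" is a hypothetical extra entry added for illustration and is not from the sample table.

```scala
object SortOrderSketch {
  // Range start values as read from a text file, i.e. still Strings
  val starts: Seq[String] = Seq("3708713472", "974127104", "1038992128")

  // String ordering compares character by character, so "974..." lands after "370..."
  val lexicographic: Seq[String] = starts.sorted

  // Numeric ordering, which is what a binary search over the ranges relies on
  val numeric: Seq[Long] = starts.map(_.toLong).sorted
}
```

With string order the nine-digit start ends up last, so a binary search would move in the wrong direction for IPs in that range and report them as unknown.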
ip.txt is the file I uploaded to CSDN (mentioned at the start).
ip.info contains a few records I made up, in the form time,IP:
14:45:17,202.98.248.242
15:45:17,219.220.199.250
16:45:17,219.220.199.250
18:45:17,202.98.248.242
18:45:17,202.98.248.242
18:45:17,202.98.248.242
18:45:17,202.98.248.242
18:45:17,202.98.248.242
16:45:17,114.139.223.13
15:45:17,219.220.199.250
16:45:17,219.220.199.250
15:45:17,219.220.199.250
16:45:17,219.220.199.250
15:45:17,219.220.199.250
16:45:17,219.220.199.250
13:45:17,114.139.223.13
10:45:17,114.139.223.13
13:45:17,114.139.223.13
10:45:17,114.139.223.13
10:45:17,114.10.123.13
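The counting step (map each line to (ip, 1), then reduceByKey(_ + _)) can be checked locally against the sample records above. This plain-Scala sketch is only a stand-in for the Spark version: `groupBy` plus `size` plays the role of reduceByKey, and it does not perform the province lookup.

```scala
object VisitCountSketch {
  // The simulated ip.info records listed above, in "time,ip" form
  val lines: Seq[String] = Seq(
    "14:45:17,202.98.248.242", "15:45:17,219.220.199.250", "16:45:17,219.220.199.250",
    "18:45:17,202.98.248.242", "18:45:17,202.98.248.242", "18:45:17,202.98.248.242",
    "18:45:17,202.98.248.242", "18:45:17,202.98.248.242", "16:45:17,114.139.223.13",
    "15:45:17,219.220.199.250", "16:45:17,219.220.199.250", "15:45:17,219.220.199.250",
    "16:45:17,219.220.199.250", "15:45:17,219.220.199.250", "16:45:17,219.220.199.250",
    "13:45:17,114.139.223.13", "10:45:17,114.139.223.13", "13:45:17,114.139.223.13",
    "10:45:17,114.139.223.13", "10:45:17,114.10.123.13"
  )

  // Local equivalent of map((ip, 1)).reduceByKey(_ + _): group by IP and count
  val counts: Map[String, Int] =
    lines.map(_.split(",")(1)).groupBy(identity).map { case (ip, hits) => ip -> hits.size }

  // Ascending by visit count, matching sortBy on the count field
  val byCount: Seq[(String, Int)] = counts.toSeq.sortBy(_._2)
}
```

For these 20 records the counts are 8 for 219.220.199.250, 6 for 202.98.248.242, 5 for 114.139.223.13 and 1 for 114.10.123.13, which is what the count column of the job output should contain.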
After the job finishes: