Spark學習筆記(9)—— Spark IP位置查詢
阿新 • 發佈：2018-12-16
1 資料來源
ip.txt
1.0.1.0|1.0.3.255|16777472|16778239|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.2.0.0|1.2.1.255|16908288|16908799|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.2.2.0|1.2.2.255|16908800|16909055|亞洲|中國|北京|北京|海淀|北龍中網|110108|China|CN|116.29812|39.95931
1.2.4.0|1.2.4.255|16909312|16909567|亞洲|中國|北京|北京||中國網際網路資訊中心|110100|China|CN|116.405285|39.904989
1.2.5.0|1.2.7.255|16909568|16910335|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.2.8.0|1.2.8.255|16910336|16910591|亞洲|中國|北京|北京||中國網際網路資訊中心|110100|China|CN|116.405285|39.904989
1.2.9.0|1.2.127.255|16910592|16941055|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.3.0.0|1.3.255.255|16973824|17039359|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.4.1.0|1.4.3.255|17039616|17040383|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.4.4.0|1.4.4.255|17040384|17040639|亞洲|中國|北京|北京|海淀|北龍中網|110108|China|CN|116.29812|39.95931
......
access.log
20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59 from: http://bsalsa.com/ )|http://show.51.com/main.php|
2 原始碼
package iptest
import org.apache.spark.{SparkConf, SparkContext}
/** Maps the client IP of every access-log line to the province found in an IP range table.
  *
  * The rule table (ip.txt) is collected to the driver and broadcast; each executor then
  * resolves IPs locally with a binary search over the numeric start/end columns.
  */
object IPLocation {

  import scala.annotation.tailrec

  /** Converts a dotted IPv4 string such as "1.0.1.0" to its numeric value.
    *
    * Fragments are folded left to right: the accumulator is shifted left by 8 bits and the
    * next fragment is OR-ed in, so "1.0.1.0" becomes 16777472. This is the same encoding
    * used by the start/end number columns of ip.txt.
    *
    * No range or count check is applied to the fragments; `parseIp` does that validation.
    *
    * @param ip IPv4 address in dotted notation
    * @return the address as an unsigned 32-bit value held in a Long
    * @throws NumberFormatException if a fragment is not an integer
    */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, fragment) => fragment.toLong | (acc << 8))

  /** Finds the rule whose inclusive [start, end] numeric range contains `ip`.
    *
    * @param lines rules as (startNum, endNum, province) with numeric strings; must be sorted
    *              ascending by startNum with non-overlapping ranges, otherwise the search
    *              may miss a matching rule
    * @param ip    numeric IPv4 address, as produced by `ip2Long`
    * @return index of the matching rule, or -1 if no range contains `ip`
    * @throws NumberFormatException if a visited start/end field is not numeric
    */
  def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
    @tailrec
    def search(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        // Unsigned shift yields the midpoint without the Int overflow of (low + high) / 2.
        val middle = (low + high) >>> 1
        val (start, end, _) = lines(middle)
        if (ip < start.toLong) search(low, middle - 1)
        else if (ip > end.toLong) search(middle + 1, high)
        else middle
      }

    search(0, lines.length - 1)
  }

  /** Looks up the province of every client IP in an access log and prints the matches.
    *
    * Optional arguments; the defaults reproduce the original hard-coded setup:
    *  - args(0): path of the IP rule file (default "d://ip.txt")
    *  - args(1): path of the access log (default "d://access.log")
    *
    * The master is set to "local[2]" only when none was supplied, for example by spark-submit.
    */
  def main(args: Array[String]): Unit = {
    val ipRulesPath = if (args.length > 0) args(0) else "d://ip.txt"
    val accessLogPath = if (args.length > 1) args(1) else "d://access.log"

    val conf = new SparkConf()
      .setAppName("IpLocation")
      .setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(conf)
    try {
      // Rule line layout: startIp|endIp|startNum|endNum|continent|country|province|...
      // Lines too short to carry a province are skipped instead of failing the job.
      // collect() keeps file order, so ip.txt must already be sorted by startNum for
      // binarySearch to work.
      val ipRulesArray = sc.textFile(ipRulesPath)
        .map(_.split("\\|"))
        .filter(_.length > 6)
        .map(fields => (fields(2), fields(3), fields(6)))
        .collect()

      // Ship the rule table to each executor once rather than with every task.
      val ipRulesBroadcast = sc.broadcast(ipRulesArray)

      // The client IP is the second '|'-separated field of an access-log line.
      val ipsRDD = sc.textFile(accessLogPath)
        .map(_.split("\\|"))
        .filter(_.length > 1)
        .map(fields => fields(1))

      // Malformed IPs and IPs outside every known range are dropped. Indexing the rule
      // table with binarySearch's -1 "not found" result would otherwise throw and abort
      // the whole job.
      val result = ipsRDD.flatMap { ip =>
        val rules = ipRulesBroadcast.value
        parseIp(ip)
          .map(ipNum => binarySearch(rules, ipNum))
          .filter(_ >= 0)
          .map(index => rules(index))
      }

      println(result.collect().toBuffer)
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }

  /** Numeric value of a well-formed dotted-quad IPv4 string (four fragments of 0-255),
    * or None when the string is malformed and `ip2Long` would throw or give a bogus value.
    */
  private def parseIp(ip: String): Option[Long] = {
    val trimmed = ip.trim
    val parts = trimmed.split("[.]")
    val wellFormed = parts.length == 4 && parts.forall { part =>
      part.nonEmpty && part.length <= 3 &&
        part.forall(c => c >= '0' && c <= '9') && part.toInt <= 255
    }
    if (wellFormed) Some(ip2Long(trimmed)) else None
  }
}
3 執行結果
ArrayBuffer((2111136768,2111137023,重慶), (1969487872,1969618943,北京),
(1969487872,1969618943,北京), (1937244160,1937276927,陝西), (2076475392,2076527103,河北),
(3728160768,3728161535,重慶), (2076475392,2076527103,河北), (1937244160,1937276927,陝西),
(1937244160,1937276927,陝西), (1969487872,1969618943,北京), (2076475392,2076527103,河北),
(1969487872,1969618943,北京), (1937244160,1937276927,陝西), (1969487872,1969618943,北京),
(1937244160,1937276927,陝西), (1969487872,1969618943,北京), (2076475392,2076527103,河北),
......