程式人生 > Spark學習筆記(9)—— Spark IP位置查詢

Spark學習筆記(9)—— Spark IP位置查詢

1 資料來源

ip.txt（IP 規則庫，以 | 分隔：第 3、4 欄為 IP 區間起訖的數值形式，第 7 欄為省份；二分搜尋要求各行依起始數值遞增排序）

1.0.1.0|1.0.3.255|16777472|16778239|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.2.0.0|1.2.1.255|16908288|16908799|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.2.2.0|1.2.2.255|16908800|16909055|亞洲|中國|北京|北京|海淀|北龍中網|110108|China|CN|116.29812|39.95931
1.2.4.0|1.2.4.255|16909312|16909567|亞洲|中國|北京|北京||中國網際網路資訊中心|110100|China|CN|116.405285|39.904989
1.2.5.0|1.2.7.255|16909568|16910335|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.2.8.0|1.2.8.255|16910336|16910591|亞洲|中國|北京|北京||中國網際網路資訊中心|110100|China|CN|116.405285|39.904989
1.2.9.0|1.2.127.255|16910592|16941055|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.3.0.0|1.3.255.255|16973824|17039359|亞洲|中國|廣東|廣州||電信|440100|China|CN|113.280637|23.125178
1.4.1.0|1.4.3.255|17039616|17040383|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
1.4.4.0|1.4.4.255|17040384|17040639|亞洲|中國|北京|北京|海淀|北龍中網|110108|China|CN|116.29812|39.95931
......

access.log（存取日誌，以 | 分隔，第 2 欄為用戶端 IP）

20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|

2 原始碼

package iptest

import org.apache.spark.{SparkConf, SparkContext}

object IPLocation {
  import scala.annotation.tailrec

  /** Rule file used when no path is passed as args(0). */
  private val DefaultRulesPath = "d://ip.txt"

  /** Access log used when no path is passed as args(1). */
  private val DefaultLogPath = "d://access.log"

  /**
   * Converts a dotted IPv4 string into its numeric value.
   *
   * Each dot-separated fragment is shifted into the accumulator 8 bits at a time,
   * e.g. "1.0.1.0" -> 1 * 2^24 + 1 * 2^8 = 16777472.
   *
   * @param ip dotted IPv4 address such as "125.213.100.123"
   * @return the address as a non-negative Long
   * @throws NumberFormatException if a fragment is not an integer
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, fragment) => fragment.toLong | acc << 8L)

  /**
   * Finds the rule whose inclusive numeric range [start, end] contains `ip`.
   *
   * @param lines rules as (startNum, endNum, province); must be sorted ascending by
   *              startNum with non-overlapping ranges, otherwise the search is unreliable
   * @param ip    numeric IP as produced by [[ip2Long]]
   * @return index of the matching rule in `lines`, or -1 if no range contains `ip`
   */
  def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
    @tailrec
    def search(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        // Unsigned shift avoids Int overflow of (low + high) on very large arrays.
        val middle = (low + high) >>> 1
        val (start, end, _) = lines(middle)
        if (ip < start.toLong) search(low, middle - 1)
        else if (ip <= end.toLong) middle
        else search(middle + 1, high)
      }

    search(0, lines.length - 1)
  }

  /**
   * Maps every client IP in the access log to its province rule and prints the matches.
   *
   * @param args optional: args(0) = rule file path, args(1) = access log path;
   *             missing entries fall back to the original hard-coded paths
   */
  def main(args: Array[String]): Unit = {
    val rulesPath = args.lift(0).getOrElse(DefaultRulesPath)
    val logPath = args.lift(1).getOrElse(DefaultLogPath)

    val conf = new SparkConf().setMaster("local[2]").setAppName("IpLocation")
    val sc = new SparkContext(conf)
    try {
      // Rule line layout: fields(2)/fields(3) = numeric range bounds, fields(6) = province.
      // Lines too short to hold those fields (e.g. blank lines) are skipped.
      val ipRulesRdd = sc.textFile(rulesPath)
        .map(_.split("\\|"))
        .filter(_.length > 6)
        .map(fields => (fields(2), fields(3), fields(6)))

      // Collect all IP mapping rules to the driver. Sort by range start because
      // binarySearch only works on ascending input; the stable sort keeps an
      // already-sorted file unchanged.
      val ipRulesArray = ipRulesRdd.collect().sortBy(_._1.toLong)

      // Broadcast the rules so tasks share one read-only copy.
      val ipRulesBroadcast = sc.broadcast(ipRulesArray)

      // Load the data to process: the second "|"-separated field is the client IP.
      val ipsRDD = sc.textFile(logPath)
        .map(_.split("\\|"))
        .filter(_.length > 1)
        .map(fields => fields(1))

      // IPs that fail to parse or match no range are dropped. Previously the -1
      // "not found" index was used directly and crashed the job.
      val result = ipsRDD.flatMap { ip =>
        val rules = ipRulesBroadcast.value
        scala.util.Try(ip2Long(ip)).toOption
          .map(ipNum => binarySearch(rules, ipNum))
          .filter(_ >= 0)
          .map(index => rules(index))
      }

      println(result.collect().toBuffer)
    } finally {
      // Always release the SparkContext, even if the job fails.
      sc.stop()
    }
  }
}
執行結果（節錄）：

ArrayBuffer((2111136768,2111137023,重慶), (1969487872,1969618943,北京), 
(1969487872,1969618943,北京), (1937244160,1937276927,陝西), (2076475392,2076527103,河北), 
(3728160768,3728161535,重慶), (2076475392,2076527103,河北), (1937244160,1937276927,陝西), 
(1937244160,1937276927,陝西), (1969487872,1969618943,北京), (2076475392,2076527103,河北), 
(1969487872,1969618943,北京), (1937244160,1937276927,陝西), (1969487872,1969618943,北京), 
(1937244160,1937276927,陝西), (1969487872,1969618943,北京), (2076475392,2076527103,河北), 
......