spark中廣播變數的使用
阿新 • • 發佈:2018-10-31
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by zx on 2017/10/9.
 *
 * Counts access-log hits per province. Each client IP is looked up in an IP-range rule
 * table that is shipped to the executors as a broadcast variable. The per-province
 * totals are then handed to `MyUtils.data2MySQL`, one partition at a time.
 *
 * Usage: IpLoaction2 <ipRulesPath> <accessLogPath>
 *   - ipRulesPath:   '|'-separated rule lines. Field 2 is the range start, field 3 the
 *                    range end (both parsed as Long) and field 6 the province.
 *   - accessLogPath: '|'-separated access-log lines. Field 1 is the client IP.
 */
object IpLoaction2 {

  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: IpLoaction2 <ipRulesPath> <accessLogPath>")

    val conf = new SparkConf().setAppName("IpLoaction1").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if one of the jobs below fails.
    try {
      // Read the IP rules (e.g. from HDFS).
      val rulesLines: RDD[String] = sc.textFile(args(0))

      // Parse each rule line into (rangeStart, rangeEnd, province).
      val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map { line =>
        val fields = line.split("[|]")
        (fields(2).toLong, fields(3).toLong, fields(6))
      }

      // Gather the rules, which are spread across executors, onto the driver.
      // collect() only preserves the input file's order. Binary search needs ordered
      // data, so sort by range start (stable; a no-op if the file is already sorted).
      // NOTE(review): assumes MyUtils.binarySearch expects ascending range starts - confirm.
      val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect().sortBy(_._1)

      // Broadcast the rules to the executors. `broadcastRef` is only a handle that lives
      // on the driver. It travels to the executors inside the serialized task closure, and
      // `broadcastRef.value` there resolves to the executor-local copy of the rules.
      val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver)

      // Read the access log.
      val accessLines: RDD[String] = sc.textFile(args(1))

      // Map every log line to (province, 1). This closure runs inside tasks on the executors.
      val proviceAndOne: RDD[(String, Int)] = accessLines.map { log =>
        val fields = log.split("[|]")
        val ip = fields(1)
        // Convert the dotted IP string to a number so it can be compared with the rule ranges.
        val ipNum = MyUtils.ip2Long(ip)
        val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value
        val index = MyUtils.binarySearch(rulesInExecutor, ipNum)
        // -1 means no rule range matched; such hits are counted under "未知" ("unknown").
        val province = if (index != -1) rulesInExecutor(index)._3 else "未知"
        (province, 1)
      }

      // Sum the hit counts per province.
      val reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey(_ + _)

      // Persist results one partition at a time. The write runs inside the executor
      // tasks, so any JDBC connection is opened there, not on the driver.
      // foreachPartition allows one connection to serve a whole partition, instead of
      // one connection per record as a plain foreach would need.
      // NOTE(review): presumably MyUtils.data2MySQL opens and closes a single connection
      // per call - confirm in MyUtils.
      reduced.foreachPartition(it => MyUtils.data2MySQL(it))
    } finally {
      sc.stop()
    }
  }
}