Testing broadcast variables in Spark Streaming
阿新 • Published: 2018-10-31
A streaming job I wrote recently needs to read lookup data from Redis and broadcast it. Since the data in Redis changes over time, the broadcast variable has to change along with it. The test code is below:
```scala
import java.net.URLDecoder

import scala.collection.mutable

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// appKeyBroadCast is a var declared at class level in the original program.
val dStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(x => URLDecoder.decode(x.value()))

// Initial broadcast, built once at startup.
val appkeyMap2 = RedisFieldData.getMap
appKeyBroadCast = sc.broadcast(appkeyMap2)

// Output the computed result for each batch.
dStream.foreachRDD { rdd =>
  var map: mutable.Map[String, String] = mutable.Map[String, String]()
  var keySet: Set[String] = Set()
  var valueSet: Set[String] = Set()

  import scala.collection.JavaConversions._
  val appkeyMap = RedisFieldData.getMap //.asInstanceOf[scala.collection.immutable.Map[String,String]]
  if (appkeyMap != null) {
    appkeyMap.map { x => keySet += x._1 }
    appkeyMap.map { x => valueSet += x._2 }
  }

  // @1: unpersist the old value and rebroadcast, once per batch.
  if (appKeyBroadCast != null) {
    appKeyBroadCast.unpersist()
    println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
  }
  appKeyBroadCast = sc.broadcast(appkeyMap)
  println("broadcast variable size xxxxx : " + appKeyBroadCast.value.size())
  println("=============================================================================================")

  //rdd.take(10)
  /*if ((appkeySet -- keySet) != Set()) {
    map.putAll(updateOrSetAppKeyMap(appkeySet, keySet, valueSet))
    RedisFieldData.setMap(map)
    //appkeyMap.putAll(map)
    map.putAll(appkeyMap)
  }*/
  //println("new map :" + map.isEmpty)
}

// @2: this broadcast-related code runs only once, at startup.
// if (appKeyBroadCast == null) {
println("broadcast variable size : " + appKeyBroadCast.value.size())
// }
dStream.print()
```
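As an aside, the unpersist-and-rebroadcast logic inside foreachRDD is often pulled into a small driver-side helper so the refresh happens in exactly one place each batch. Below is a minimal sketch of that pattern; `RefreshableBroadcast`, `fetch`, and the names in the usage comment are my own illustrations, not part of the original program.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Driver-side helper that rebuilds the broadcast on demand.
// `fetch` is a hypothetical stand-in for RedisFieldData.getMap.
class RefreshableBroadcast[T: ClassTag](sc: SparkContext, fetch: () => T) {
  private var bc: Broadcast[T] = sc.broadcast(fetch())

  // Call on the driver, e.g. at the top of foreachRDD, once per batch.
  def refresh(): Unit = {
    bc.unpersist(blocking = false) // drop stale copies cached on the executors
    bc = sc.broadcast(fetch())     // ship the freshly fetched value
  }

  // Capture this handle before using it inside an RDD transformation;
  // the Broadcast handle itself is serializable.
  def current: Broadcast[T] = bc
}

// Sketch of driver-side usage:
// dStream.foreachRDD { rdd =>
//   appKeys.refresh()
//   val snapshot = appKeys.current
//   rdd.foreach(rec => doSomething(snapshot.value, rec))
// }
```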
The code broadcasts in two places as a test: one inside foreachRDD, marked @1, and the other outside it, marked @2.
@1: the broadcast variable is refreshed in every batch duration, but the refreshed value is not visible outside foreachRDD.
@2: this broadcast code runs only once, at startup, and is not executed again in later batch durations (see the sketch below for why).
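The difference comes down to where the code runs: statements outside foreachRDD execute once on the driver while the DStream graph is being built, whereas the body passed to foreachRDD executes on the driver at every batch interval. A stripped-down sketch of the two positions, assuming an existing StreamingContext `ssc` and the `dStream` and `RedisFieldData` from above:

```scala
// Position @2: runs exactly once, when the streaming graph is defined.
var bc = ssc.sparkContext.broadcast(RedisFieldData.getMap)

dStream.foreachRDD { rdd =>
  // Position @1: this block runs on the driver once per batch,
  // which is why unpersist-and-rebroadcast works here.
  bc.unpersist()
  bc = ssc.sparkContext.broadcast(RedisFieldData.getMap)
}
```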
One more note: broadcasting ran into a serialization problem, which went away after removing the checkpoint. Some searching shows that checkpointing in the current version still has quite a few issues; for details see "drawbacks of checkpoint": http://blog.csdn.net/u010454030/article/details/54985740
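On the serialization point: a Broadcast handle captured in a checkpointed DStream closure cannot be restored from the checkpoint, which is a common source of this kind of failure. If the checkpoint has to stay, the Spark Streaming programming guide recommends a lazily instantiated singleton, so the broadcast is rebuilt after a driver restart rather than deserialized. A minimal sketch with hypothetical names (`AppKeyBroadcast`, `loadAppKeysFromRedis`):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Lazily instantiated singleton (pattern from the Spark Streaming guide):
// the Broadcast handle is never written into the checkpoint, so it is
// simply rebuilt from Redis after a driver restart.
object AppKeyBroadcast {
  @volatile private var instance: Broadcast[Map[String, String]] = _

  def getInstance(sc: SparkContext): Broadcast[Map[String, String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(loadAppKeysFromRedis())
        }
      }
    }
    instance
  }

  // Hypothetical loader; the original program uses RedisFieldData.getMap.
  private def loadAppKeysFromRedis(): Map[String, String] =
    Map.empty // stand-in for the real Redis read
}

// Inside foreachRDD, resolve it from the RDD's own context:
// val bc = AppKeyBroadcast.getInstance(rdd.sparkContext)
```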