
Testing broadcast variables in Spark Streaming

A streaming program I wrote recently needs to fetch variable data from Redis and broadcast it. The data in Redis changes over time, so the broadcast variable has to be refreshed along with it. Below is the test code:

import java.net.URLDecoder

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val dStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  ).map(x => URLDecoder.decode(x.value()))

// @2: broadcast once, outside foreachRDD
// (appKeyBroadCast is a driver-side var declared elsewhere in the program)
val appkeyMap2 = RedisFieldData.getMap
appKeyBroadCast = sc.broadcast(appkeyMap2)

// Print each batch's result; @1: re-broadcast inside foreachRDD
dStream.foreachRDD { rdd =>
  import scala.collection.JavaConversions._
  var keySet: Set[String] = Set()
  var valueSet: Set[String] = Set()

  // Re-read the latest mapping from Redis on every batch
  val appkeyMap = RedisFieldData.getMap
  if (appkeyMap != null) {
    appkeyMap.foreach { x => keySet += x._1 }
    appkeyMap.foreach { x => valueSet += x._2 }
  }

  // Release the stale broadcast before shipping a fresh one
  if (appKeyBroadCast != null) {
    appKeyBroadCast.unpersist()
  }
  appKeyBroadCast = sc.broadcast(appkeyMap)
  println("broadcast variable size (@1): " + appKeyBroadCast.value.size())
  println("=============================================================")
}

// @2: this print runs only once, when the job starts
println("broadcast variable size (@2): " + appKeyBroadCast.value.size())
dStream.print()


The code broadcasts in two places for the test: one inside foreachRDD, marked @1, and the other outside it, marked @2.


@1: the broadcast variable is refreshed in every batch duration, but the refreshed broadcast is not visible outside foreachRDD.

@2: the broadcast code runs only once, after startup; it does not run again in later durations. A reusable way to package the @1 pattern is sketched below.
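
Since @1 shows that re-broadcasting must happen on the driver in each batch, the unpersist-then-rebroadcast steps can be wrapped in a small driver-side helper. This is a sketch of that pattern, not code from the original post; the class name and API are my own:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Driver-side wrapper around a broadcast that can be replaced between batches.
// refresh() must run on the driver, e.g. at the top of foreachRDD (pattern @1).
class RefreshableBroadcast[T: ClassTag](sc: SparkContext, load: () => T) {
  @volatile private var bc: Broadcast[T] = sc.broadcast(load())

  def value: T = bc.value

  def refresh(): Unit = {
    bc.unpersist()             // drop cached copies on the executors
    bc = sc.broadcast(load())  // ship the newly loaded value
  }
}

// Usage inside the streaming job:
// val appKeys = new RefreshableBroadcast(sc, () => RedisFieldData.getMap)
// dStream.foreachRDD { rdd => appKeys.refresh(); /* read appKeys.value */ }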

In addition, broadcasting ran into a serialization problem; removing the checkpoint made it go away. Some searching showed that checkpointing in the current version still has quite a few issues; for details see "drawbacks of checkpoint": http://blog.csdn.net/u010454030/article/details/54985740
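
For the checkpoint problem specifically, the Spark Streaming programming guide recommends a lazily instantiated singleton for broadcast variables, so the handle is recreated after a restart from a checkpoint instead of being serialized into it. A sketch adapted to this code (the object name is mine):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Lazily instantiated singleton: the broadcast handle is rebuilt on demand
// after recovery from a checkpoint rather than serialized into the checkpoint.
object AppKeyBroadcast {
  @volatile private var instance: Broadcast[java.util.Map[String, String]] = _

  def getInstance(sc: SparkContext): Broadcast[java.util.Map[String, String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(RedisFieldData.getMap)
        }
      }
    }
    instance
  }
}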