spark中動態廣播變數的使用
阿新 • • 發佈:2018-11-10
今天來說一下spark,動態廣播變數的用法,如果對廣播變數用法不清楚的可以檢視這個部落格,在實際專案中,有時候我們的廣播變數是動態的,比如需要一分鐘更新一次,這個也是可以實現的,我們知道廣播變數是在driver端初始化,在excetors端獲取這個變數,但是不能修改,所以,我們可以在driver端進行更新這個變數,具體的程式碼實現如下所示:
package test import java.sql.{Connection, DriverManager, ResultSet, Statement} import java.text.SimpleDateFormat import java.util.{Date, Properties} import kafka._ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.{StringDeserializer} import org.apache.log4j.{Level, Logger} import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies} import org.apache.spark.{SparkConf, SparkContext} object test3 { @volatile private var instance: Broadcast[Map[String, Double]] = null var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS") def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.INFO) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO) Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO) val conf = new SparkConf().setAppName("Spark Streaming TO ES TOPIC") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") @transient val scc = new StreamingContext(conf, Seconds(1)) val topic = PropertiesScalaUtils.loadProperties("topic_combine") val topicSet = Set(topic) //設定kafka的topic; val kafkaParams = Map[String, Object]( "auto.offset.reset" -> "earliest", //latest;earliest "value.deserializer" -> classOf[StringDeserializer] //key,value的反序列化; , "key.deserializer" -> classOf[StringDeserializer] , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker") , "group.id" -> PropertiesScalaUtils.loadProperties("groupId_es") , "enable.auto.commit" -> (false: java.lang.Boolean) ) //初始化instance; getInstance(scc.sparkContext) kafkaStreams = KafkaUtils.createDirectStream[String, String]( scc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)) kafkaStreams.foreachRDD(rdd => { val current_time = sdf.format(new Date()) val new_time = current_time.substring(14,16).toLong if(new_time % 5 == 0){ update(rdd.sparkContext,true) //五分鐘更新一次廣播變數的內容; } if (!rdd.isEmpty()) { val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //獲得偏移量物件陣列 rdd.foreachPartition(pr => { pr.foreach(pair => { val d = pair.value() if(instance.value.contains(d)){ //自己的處理邏輯; } }) }) } }) scc.start() scc.awaitTermination() } /** * 從sqlserver獲取資料放到一個map裡; * @return */ def getSqlServerData(): Map[String,Double] = { val time = sdf.format(new Date()) val enter_time = time.substring(0,10) var map = Map[String,Double]() var conn:Connection = null var stmt:Statement = null var rs:ResultSet = null val url = "" val user_name = "" val password = "" val sql = "" try { conn = DriverManager.getConnection(url,user_name,password) stmt = conn.createStatement rs = stmt.executeQuery(sql) while (rs.next) { val url = rs.getString("url") val WarningPrice = rs.getString("WarningPrice").toDouble map += (url -> WarningPrice) } if (rs != null) { rs.close rs = null } if (stmt != null) { stmt.close stmt = null } if (conn != null) { conn.close conn = null } } catch { case e: Exception => e.printStackTrace() println("sqlserver連線失敗:" + e) } map } /** * 更新instance; * @param sc * @param blocking */ def update(sc: SparkContext, blocking: Boolean = false): Unit = { if (instance != null){ instance.unpersist(blocking) instance = sc.broadcast(getSqlServerData()) } } /** * 初始化instance; * @param sc * @return */ def getInstance(sc: SparkContext): Broadcast[Map[String,Double]] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.broadcast(getSqlServerData()) } } } instance } }
這個是從sqlserver獲取的資料,廣播到每一個excetors上,然後五分鐘更新一次,這個變數的值;
如果有寫的不對的地方,歡迎大家指正,如果有什麼疑問,可以加QQ群:340297350,謝謝