spark streaming中的廣播變數應用
1. 廣播變數
我們知道spark 的廣播變數允許快取一個只讀的變數在每臺機器上面,而不是每個任務儲存一份拷貝。常見於spark在一些全域性統計的場景中應用。通過廣播變數,能夠以一種更有效率的方式將一個大資料量輸入集合的副本分配給每個節點。Spark也嘗試著利用有效的廣播演算法去分配廣播變數,以減少通訊的成本。
一個廣播變數可以通過呼叫SparkContext.broadcast(v)方法從一個初始變數v中建立。廣播變數是v的一個包裝變數,它的值可以通過value方法訪問,下面的程式碼說明了這個過程:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
2. Spark Streaming 廣播變數的更新
廣播變數的宣告很簡單,呼叫broadcast就能搞定,並且scala中一切可序列化的物件都是可以進行廣播的,這就給了我們很大的想象空間,可以利用廣播變數將一些經常訪問的大變數進行廣播,而不是每個任務儲存一份,這樣可以減少資源上的浪費。
但是,現在專案中遇到一種這樣的需求,用spark streaming 通過一些離線全域性更新好的資料對使用者進行實時推薦(當然這裡基於一些spark streaming的內部機制,不能實現真正的時效性):(1)日誌流通過kafka獲取 (2) 解析日誌流資料,融合離線的全域性資料,對每個Dtream進行計算(3)計算結果最後傳送到redis中。
其中就會涉及這樣的問題:(1)離線全域性的資料是需要全域性獲取的,不能區域性進行計算 (2)這部分資料是離線定期更新的,而spark streaming一旦開始,就長時間執行。如果離線資料更新了,如何在開始的流計算中,獲取到這部分更新後的資料。
針對上述問題,我們可以直接想的一種方法是,在driver端開啟一個附屬執行緒,週期性去獲取離線的全域性資料,然後通過diver分發到各個task中。但是考慮到這種方式:spark streaming整體的效能開銷會很大,並且重新開啟的後臺執行緒的不易管理。結合spark中的廣播變數,我們採用另一種方式來解決以上問題:
1> spark中的廣播變數是隻讀的,通過unpersist函式,可以記憶體中的相關序列化物件
2> 通過Dstream的foreachRDD方法,做到定時更新 (官網上有說明,該方法是在driver端執行的)
import java.io.{ObjectInputStream, ObjectOutputStream}
import com.bf.dt.wireless.config.WirelessConfig
import com.bf.dt.wireless.formator.WirelessFormator
import com.bf.dt.wireless.storage.MysqlConnectionPool
import com.bf.dt.wireless.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s._
import org.slf4j.LoggerFactory
import scala.collection.mutable
object WirelessLogAnalysis {
object BroadcastWrapper {
@volatile private var instance: Broadcast[Map[String, List[String]]] = null
private val map = mutable.LinkedHashMap[String, List[String]]()
def getMysql(): Map[String, List[String]] = {
//1.獲取mysql連線池的一個連線
val conn = MysqlConnectionPool.getConnection.get
//2.查詢新的資料
val sql = "select aid_type,aids from cf_similarity"
val ps = conn.prepareStatement(sql)
val rs = ps.executeQuery()
while (rs.next()) {
val aid = rs.getString("aid_type")
val aids = rs.getString("aids").split(",").toList
map += (aid -> aids)
}
//3.連線池回收連線
MysqlConnectionPool.closeConnection(conn)
map.toMap
}
def update(sc: SparkContext, blocking: Boolean = false): Unit = {
if (instance != null)
instance.unpersist(blocking)
instance = sc.broadcast(getMysql())
}
def getInstance(sc: SparkContext): Broadcast[Map[String, List[String]]] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(getMysql)
}
}
}
instance
}
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(instance)
}
private def readObject(in: ObjectInputStream): Unit = {
instance = in.readObject().asInstanceOf[Broadcast[Map[String, List[String]]]]
}
}
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(this.getClass)
val conf = new SparkConf()
.setAppName("wirelessLogAnalysis")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaConfig: Map[String, String] = Map(
"metadata.broker.list" -> WirelessConfig.getConf.get.getString("wireless.metadata.broker.list"),
"group.id" -> WirelessConfig.getConf.get.getString("wireless.group.id"),
"zookeeper.connect" -> WirelessConfig.getConf.get.getString("wireless.zookeeper.connect"),
"auto.offset.reset" -> WirelessConfig.getConf.get.getString("wireless.auto.offset.reset")
)
val androidvvTopic = WirelessConfig.getConf.get.getString("wireless.topic1")
val iphonevvToplic = WirelessConfig.getConf.get.getString("wireless.topic2")
val kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaConfig,
Set(androidvvTopic, iphonevvToplic)
)
//原始日誌流列印
kafkaDStream.print()
val jsonDstream = kafkaDStream.map(x =>
//解析日誌流
WirelessFormator.format(x._2)
)
//解密的日誌流列印
jsonDstream.print()
jsonDstream.foreachRDD {
rdd => {
// driver端執行,涉及操作:廣播變數的初始化和更新
// 可以自定義更新時間
if ((DateUtils.getNowTime().split(" ")(1) >= "08:00:00") && (DateUtils.getNowTime().split(" ")(1) <= "10:10:00")) {
BroadcastWrapper.update(rdd.sparkContext, true)
println("廣播變數更新成功: " + DateUtils.getNowTime())
}
//worker端執行,涉及操作:Dstream資料的處理和Redis更新
rdd.foreachPartition {
partitionRecords =>
//1.獲取redis連線,保證每個partition建立一次連線,避免每個記錄建立/關閉連線的效能消耗
partitionRecords.foreach(
record => {
//2.處理日誌流
val uid = record._1
val aid_type = record._2 + "_" + record._3
if (cf.value.keySet.contains(aid_type)) {
(uid, cf.value.get(aid_type))
println((uid, cf.value.get(aid_type)))
}
else
(uid, "-1")
}
//3.redis更新資料
)
//4.關閉redis連線
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
說明:以上是無線推薦專案中部分程式碼,其中離線全域性資料儲存在mysql中,MysqlConnectionPool是mysql連線池定義類,WirelessFormator是日誌解密的定義類