The Road to Spark Mastery: A Source Code Analysis of Broadcast Variables
Overview
Work has been insanely busy lately... I actually went through the broadcast variable code quite a while ago, but never got around to posting the writeup.
This post analyzes the Spark 1.0 source code, covering how broadcast variables are initialized, created, read, and cleaned up.
Class Relationships
The BroadcastManager class holds a reference to a BroadcastFactory object; most of its operations are implemented by delegating to methods on that factory.
BroadcastFactory is a trait with two direct subclasses, TorrentBroadcastFactory and HttpBroadcastFactory. These wrap TorrentBroadcast and HttpBroadcast respectively, and those two classes in turn both extend the abstract class Broadcast.
As for a class diagram... I'll skip drawing one; the sketch below will have to do.
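Here is a stripped-down, self-contained Scala sketch of how the pieces relate (signatures simplified and bodies invented for illustration; this is not the real Spark API surface):

import scala.reflect.ClassTag

// The user-facing handle. Subclasses decide how the value is distributed.
abstract class Broadcast[T](val id: Long) {
  def value: T
}

// The pluggable strategy: BroadcastManager delegates to one of these.
trait BroadcastFactory {
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]
}

class HttpBroadcast[T](value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) {
  def value: T = value_ // the real class fetches from the driver's HttpServer
}

class TorrentBroadcast[T](value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) {
  def value: T = value_ // the real class reassembles the value from p2p chunks
}

class HttpBroadcastFactory extends BroadcastFactory {
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long) =
    new HttpBroadcast[T](value, isLocal, id)
}

class TorrentBroadcastFactory extends BroadcastFactory {
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long) =
    new TorrentBroadcast[T](value, isLocal, id)
}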
Initializing the BroadcastManager
When SparkContext is initialized it creates the SparkEnv object env; during that process the BroadcastManager constructor is invoked and the resulting object is kept as a member of env:
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
The BroadcastManager constructor calls an initialize method, which mainly instantiates the broadcastFactory member based on configuration and then calls that factory's own initialize method:
val broadcastFactoryClass =
  conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
broadcastFactory =
  Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isDriver, conf, securityManager)
Each factory's initialize method simply delegates to the initialize method of its corresponding implementation class; let's look at the two classes separately.
HttpBroadcast's initialize method
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
  synchronized {
    if (!initialized) {
      bufferSize = conf.getInt("spark.buffer.size", 65536)
      compress = conf.getBoolean("spark.broadcast.compress", true)
      securityManager = securityMgr
      if (isDriver) {
        createServer(conf)
        conf.set("spark.httpBroadcast.uri", serverUri)
      }
      serverUri = conf.get("spark.httpBroadcast.uri")
      cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
      compressionCodec = CompressionCodec.createCodec(conf)
      initialized = true
    }
  }
}
Besides initializing a few variables, this does two main things: createServer (only on the driver) and creating a MetadataCleaner object.
createServer
private def createServer(conf: SparkConf) {
  broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
  server = new HttpServer(broadcastDir, securityManager)
  server.start()
  serverUri = server.uri
  logInfo("Broadcast server started at " + serverUri)
}
First it creates a directory to hold the broadcast files; by default this is
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
It then constructs and starts an HttpServer object (a wrapper around Jetty). Startup includes loading resource files and opening a port and threads to handle incoming requests. The details live in the org.apache.spark.HttpServer class and are not expanded here.
Creating the MetadataCleaner object
A MetadataCleaner wraps a scheduled Timer that runs a callback at a fixed interval; the callback passed in here is cleanup:
private def cleanup(cleanupTime: Long) {
  val iterator = files.internalMap.entrySet().iterator()
  while (iterator.hasNext) {
    val entry = iterator.next()
    val (file, time) = (entry.getKey, entry.getValue)
    if (time < cleanupTime) {
      iterator.remove()
      deleteBroadcastFile(file)
    }
  }
}
That is, it deletes broadcast files that have existed longer than a configured age. When the age is not set (the default), nothing is cleaned:
if (delaySeconds > 0) {
  logDebug(
    "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
    "and period of " + periodSeconds + " secs")
  timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
}
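For intuition, here is a minimal sketch of the Timer-wrapping pattern (names simplified and invented; the real MetadataCleaner reads delaySeconds and periodSeconds from the SparkConf and logs its runs):

import java.util.{Timer, TimerTask}

// Minimal sketch: run cleanupFunc periodically, passing it the cutoff timestamp.
// Anything older than (now - delaySeconds) becomes eligible for deletion.
class SimpleCleaner(name: String, delaySeconds: Int, periodSeconds: Int)(cleanupFunc: Long => Unit) {
  private val timer = new Timer(name + " cleanup timer", true) // daemon thread

  private val task = new TimerTask {
    override def run() {
      cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000L)
    }
  }

  // Mirrors the guard above: a non-positive delay disables cleaning entirely.
  if (delaySeconds > 0) {
    timer.schedule(task, periodSeconds * 1000L, periodSeconds * 1000L)
  }

  def cancel() { timer.cancel() }
}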
TorrentBroadcast's initialize method
def initialize(_isDriver: Boolean, conf: SparkConf) {
  TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
  synchronized {
    if (!initialized) {
      initialized = true
    }
  }
}
Torrent does almost nothing here, which already hints at the difference from Http: the Torrent approach is p2p and decentralized, while Http is a centralized service that must start a server to accept requests.
Creating a broadcast variable
A broadcast variable is created by calling SparkContext's def broadcast[T: ClassTag](value: T): Broadcast[T] method, implemented as follows:
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
That is, it calls broadcastManager's newBroadcast method:
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
which in turn calls the factory's newBroadcast method and returns a Broadcast object.
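Before going deeper into the two factories, this is what the whole path looks like from the user's side (a minimal, hypothetical example):

// Hypothetical usage: ship a small dictionary to every executor once,
// instead of serializing it into each task's closure.
val conf = new SparkConf().setAppName("broadcast-demo").setMaster("local[2]")
val sc = new SparkContext(conf)

val dict = Map("a" -> 1, "b" -> 2)
val bcDict = sc.broadcast(dict) // goes through broadcastManager.newBroadcast

val mapped = sc.parallelize(Seq("a", "b", "a", "c"))
  .map(word => bcDict.value.getOrElse(word, 0)) // bc.value triggers the read path below
  .collect()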
HttpBroadcastFactory's newBroadcast
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
  new HttpBroadcast[T](value_, isLocal, id)
That is, it creates and returns a new HttpBroadcast object.
Constructing the object mainly does two things:
HttpBroadcast.synchronized {
  SparkEnv.get.blockManager.putSingle(
    blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
if (!isLocal) {
  HttpBroadcast.write(id, value_)
}
1. Put the variable's id and value into the blockManager, without notifying the master.
2. Call the companion object's write method:
def write(id: Long, value: Any) {
  val file = getFile(id)
  val out: OutputStream = {
    if (compress) {
      compressionCodec.compressedOutputStream(new FileOutputStream(file))
    } else {
      new BufferedOutputStream(new FileOutputStream(file), bufferSize)
    }
  }
  val ser = SparkEnv.get.serializer.newInstance()
  val serOut = ser.serializeStream(out)
  serOut.writeObject(value)
  serOut.close()
  files += file
}
The write method serializes the value, optionally compresses it, and writes it to a file. The directory containing that file is exactly the HttpServer's resource directory, and the mapping between file name and id is:
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}
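For example, given the case class above, the names work out as follows (illustrative id values):

BroadcastBlockId(0).name           // "broadcast_0"        -- the Http data file name
BroadcastBlockId(0, "meta").name   // "broadcast_0_meta"   -- Torrent meta-info block
BroadcastBlockId(0, "piece5").name // "broadcast_0_piece5" -- the 6th Torrent chunk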
TorrentBroadcastFactory's newBroadcast method
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
  new TorrentBroadcast[T](value_, isLocal, id)
This likewise creates and returns a TorrentBroadcast object.
TorrentBroadcast.synchronized {
  SparkEnv.get.blockManager.putSingle(
    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
if (!isLocal) {
  sendBroadcast()
}
Again two things happen: the first step is identical to Http, and the second is:
def sendBroadcast() {
  val tInfo = TorrentBroadcast.blockifyObject(value_)
  totalBlocks = tInfo.totalBlocks
  totalBytes = tInfo.totalBytes
  hasBlocks = tInfo.totalBlocks
  // Store meta-info
  val metaId = BroadcastBlockId(id, "meta")
  val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
  TorrentBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(
      metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
  }
  // Store individual pieces
  for (i <- 0 until totalBlocks) {
    val pieceId = BroadcastBlockId(id, "piece" + i)
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.putSingle(
        pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    }
  }
}
As you can see, it first caches the meta information into the blockManager and then caches the individual pieces. The chunking seen at the top is done by the companion object's blockifyObject method:
def blockifyObject[T](obj: T): TorrentInfo
This method splits obj into blocks (default block size 4MB) and returns a TorrentInfo object, whose first field is the array of TorrentBlock objects (each holding a block ID and a byte array), followed by the number of blocks and the total byte length of obj's serialized form.
The metadata entry's blockId is the broadcast variable's id plus a suffix; its value records the total block count and total byte count.
The data itself is cached piece by piece: each piece's id is the broadcast id plus a suffix containing the piece number, and its value is a TorrentBlock object.
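A minimal, self-contained sketch of the chunking idea (using plain Java serialization instead of Spark's configurable serializer; the constant and helper name are mine, not Spark's):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val BLOCK_SIZE = 4 * 1024 * 1024 // 4MB, the default block size described above

// Serialize obj to bytes, then slice into <= 4MB pieces.
// Returns (pieces, totalBlocks, totalBytes), mirroring TorrentInfo's contents.
def blockifySketch(obj: AnyRef): (Array[Array[Byte]], Int, Long) = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  val bytes = bos.toByteArray
  val pieces = bytes.grouped(BLOCK_SIZE).toArray
  (pieces, pieces.length, bytes.length.toLong)
}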
Reading a broadcast variable's value
The value is obtained by calling bc.value; the main logic lives in the deserialization method readObject.
HttpBroadcast deserialization
HttpBroadcast.synchronized {
  SparkEnv.get.blockManager.getSingle(blockId) match {
    case Some(x) => value_ = x.asInstanceOf[T]
    case None => {
      logInfo("Started reading broadcast variable " + id)
      val start = System.nanoTime
      value_ = HttpBroadcast.read[T](id)
      /*
       * We cache broadcast data in the BlockManager so that subsequent tasks using it
       * do not need to re-fetch. This data is only used locally and no other node
       * needs to fetch this block, so we don't notify the master.
       */
      SparkEnv.get.blockManager.putSingle(
        blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
      val time = (System.nanoTime - start) / 1e9
      logInfo("Reading broadcast variable " + id + " took " + time + " s")
    }
  }
}
It first checks whether the value is already in the blockManager; if so, it is used directly, otherwise the companion object's read method fetches it:
def read[T: ClassTag](id: Long): T = {
  logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
  val url = serverUri + "/" + BroadcastBlockId(id).name
  var uc: URLConnection = null
  if (securityManager.isAuthenticationEnabled()) {
    logDebug("broadcast security enabled")
    val newuri = Utils.constructURIForAuthentication(new URI(url), securityManager)
    uc = newuri.toURL.openConnection()
    uc.setAllowUserInteraction(false)
  } else {
    logDebug("broadcast not using security")
    uc = new URL(url).openConnection()
  }
  val in = {
    uc.setReadTimeout(httpReadTimeout)
    val inputStream = uc.getInputStream
    if (compress) {
      compressionCodec.compressedInputStream(inputStream)
    } else {
      new BufferedInputStream(inputStream, bufferSize)
    }
  }
  val ser = SparkEnv.get.serializer.newInstance()
  val serIn = ser.deserializeStream(in)
  val obj = serIn.readObject[T]()
  serIn.close()
  obj
}
Using serverUri plus the file name corresponding to the block id, it opens an HttpConnection straight to the central server, pulls the data over, and then decompresses and deserializes it with the configured codecs.
Notice that every executor needing the broadcast value has to pull its content from the driver.
Once fetched, the value is cached in the blockManager so later uses don't re-fetch.
TorrentBroadcast deserialization
private def readObject(in: ObjectInputStream) {
  in.defaultReadObject()
  TorrentBroadcast.synchronized {
    SparkEnv.get.blockManager.getSingle(broadcastId) match {
      case Some(x) =>
        value_ = x.asInstanceOf[T]
      case None =>
        val start = System.nanoTime
        logInfo("Started reading broadcast variable " + id)
        // Initialize @transient variables that will receive garbage values from the master.
        resetWorkerVariables()
        if (receiveBroadcast()) {
          value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
          /* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
           * This creates a trade-off between memory usage and latency. Storing copy doubles
           * the memory footprint; not storing doubles deserialization cost. Also,
           * this does not need to be reported to BlockManagerMaster since other executors
           * does not need to access this block (they only need to fetch the chunks,
           * which are reported).
           */
          SparkEnv.get.blockManager.putSingle(
            broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
          // Remove arrayOfBlocks from memory once value_ is on local cache
          resetWorkerVariables()
        } else {
          logError("Reading broadcast variable " + id + " failed")
        }
        val time = (System.nanoTime - start) / 1e9
        logInfo("Reading broadcast variable " + id + " took " + time + " s")
    }
  }
}
Just like Http, it first checks whether the blockManager already has the value cached; if not, it calls the receiveBroadcast method:
def receiveBroadcast(): Boolean = {
  // Receive meta-info about the size of broadcast data,
  // the number of chunks it is divided into, etc.
  val metaId = BroadcastBlockId(id, "meta")
  var attemptId = 10
  while (attemptId > 0 && totalBlocks == -1) {
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.getSingle(metaId) match {
        case Some(x) =>
          val tInfo = x.asInstanceOf[TorrentInfo]
          totalBlocks = tInfo.totalBlocks
          totalBytes = tInfo.totalBytes
          arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
          hasBlocks = 0
        case None =>
          Thread.sleep(500)
      }
    }
    attemptId -= 1
  }
  if (totalBlocks == -1) {
    return false
  }
  /*
   * Fetch actual chunks of data. Note that all these chunks are stored in
   * the BlockManager and reported to the master, so that other executors
   * can find out and pull the chunks from this executor.
   */
  val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
  for (pid <- recvOrder) {
    val pieceId = BroadcastBlockId(id, "piece" + pid)
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.getSingle(pieceId) match {
        case Some(x) =>
          arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
          hasBlocks += 1
          SparkEnv.get.blockManager.putSingle(
            pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
        case None =>
          throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
      }
    }
  }
  hasBlocks == totalBlocks
}
Just like the write side, reading splits into two parts: first fetch the meta information, then fetch the actual blocks it describes. Note that everything goes through the blockManager; here is a quick look at blockManager.getSingle.
The call stack eventually reaches BlockManager.doGetRemote, which contains this statement:
val locations = Random.shuffle(master.getLocations(blockId))
That is, it randomly shuffles the list of nodes known to hold this block, and then uses:
val data = BlockManagerWorker.syncGetBlock(
  GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
to fetch the block.
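Putting those two fragments together, the fetch pattern is roughly this (a sketch; locations and fetchFrom stand in for the real calls quoted above):

import scala.util.Random

// Shuffle the candidate holders, then try each until one returns the block.
// Shuffling spreads fetch load across all peers that hold the chunk.
def doGetRemoteSketch[B](locations: Seq[String], fetchFrom: String => Option[B]): Option[B] = {
  for (loc <- Random.shuffle(locations)) {
    fetchFrom(loc) match {
      case Some(data) => return Some(data)
      case None       => // this peer failed; fall through to the next one
    }
  }
  None
}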
From this we can see that the Torrent approach first chunks the broadcast data and stores the chunks in the BlockManager. When a node needs the broadcast variable it reads chunk by chunk: for each chunk it looks up the locations holding it and randomly picks one of those nodes to get it from. After reading a chunk, the node reports the chunks it now holds to the BlockManagerMaster, so the local node itself becomes a peer in this broadcast network.
In sharp contrast to the Http approach, this is a decentralized network that only needs to maintain a tracker, which is exactly the p2p idea.
Cleaning up broadcast variables
Right after a broadcast variable is created, this line follows:
cleaner.foreach(_.registerBroadcastForCleanup(bc))
cleaner is a ContextCleaner object, and the freshly created broadcast variable is registered with it; the call stack is:
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
  registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
}

private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
  referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
}
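The key trick is the weak reference: once the user's Broadcast object becomes unreachable, the JVM enqueues its reference, and that is what drives cleanup. A minimal sketch of the pattern (names here are illustrative, not Spark's):

import java.lang.ref.{ReferenceQueue, WeakReference}

sealed trait Task
case class CleanBroadcastTask(id: Long) extends Task

// Pairs a cleanup task with a weak reference to the object being tracked.
class TaskWeakRef(val task: Task, referent: AnyRef, q: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, q)

object WeakRefDemo extends App {
  val queue = new ReferenceQueue[AnyRef]
  var bc: AnyRef = new Object // stands in for a Broadcast[T]
  val ref = new TaskWeakRef(CleanBroadcastTask(0L), bc, queue)

  bc = null   // drop the last strong reference
  System.gc() // only a hint; enqueueing is up to the GC
  Option(queue.remove(1000)).foreach { r =>
    println("got cleanup task: " + r.asInstanceOf[TaskWeakRef].task)
  }
}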
The cleaner is also started during SparkContext initialization:

cleaner.foreach(_.start())

The start method kicks off keepCleaning, which loops over the registered cleanup tasks (covering RDDs, shuffles, and broadcasts) and handles each in turn:
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      reference.map(_.task).foreach { task =>
        logDebug("Got cleaning task " + task)
        referenceBuffer -= reference.get
        task match {
          case CleanRDD(rddId) =>
            doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
          case CleanShuffle(shuffleId) =>
            doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
          case CleanBroadcast(broadcastId) =>
            doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
        }
      }
    } catch {
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
doCleanupBroadcast then issues:
broadcastManager.unbroadcast(broadcastId, true, blocking)
which leads to:
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
  broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
}
Each factory class calls the unbroadcast method on the companion object of its corresponding implementation class.
Cleanup in HttpBroadcast
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
  SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
  if (removeFromDriver) {
    val file = getFile(id)
    files.remove(file)
    deleteBroadcastFile(file)
  }
}
Step 1 removes the cached blocks from the blockManager; step 2 deletes the file persisted locally on the driver.
Cleanup in TorrentBroadcast
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
  SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
Summary
Broadcast is suited to scenarios where executors use the same piece of data repeatedly (a dictionary, say). The Http and Torrent modes correspond to the traditional client-server access pattern and the p2p access pattern respectively; when the broadcast variable is large or heavily used, the latter relieves pressure on the driver.
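For instance, to opt into the Torrent implementation under Spark 1.0, one would set the factory explicitly (a short sketch, using the class names seen in the initialization section):

val conf = new SparkConf()
  .setAppName("broadcast-demo")
  // default is org.apache.spark.broadcast.HttpBroadcastFactory
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")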
The BlockManager plays the tracker role in this p2p setup. I haven't expanded on it here; a dedicated post on it will follow.