Big Data: Spark Storage (Part 2): Broadcast in a Cluster
Spark Broadcast
Broadcast, simply put, copies data from one node to all of the other nodes; it is commonly used to replicate data to each node locally for computation. The previous article discussed the BlockManager in the Storage module: a Block can be stored either in memory or on disk, and when the data is not present locally on an Executor node, it is fetched via the Driver.
Spark's official description:
A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
In a Broadcast, Spark only ships the contents of a read-only variable. If a broadcast variable could be updated, the copies held on every node would have to be kept in sync; to guarantee data consistency, Spark only passes immutable data through broadcast.
Why does Broadcast only need to be tracked down to the executor? As discussed in the earlier Storage articles, a BlockId is composed of the executor and the actual block. An executor is a child worker process that runs the submitted tasks and ends when those tasks end; the tasks running inside one executor share the same process and can therefore share data directly in memory. So it is sufficient for Broadcast to be granular down to the executor.
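Before diving into the internals, here is a minimal usage sketch of the programmer-facing side of broadcast. The country-code lookup table and the local[*] master are made up for this example; the broadcast/value/destroy calls are the standard Spark API.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastUsageExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-usage").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // A read-only lookup table that every task needs: broadcasting it ships one cached
    // copy per executor instead of serializing it into every single task.
    val countryNames = Map("CN" -> "China", "US" -> "United States", "JP" -> "Japan")
    val bc = sc.broadcast(countryNames)

    val codes = sc.parallelize(Seq("CN", "US", "JP", "CN"))
    // Tasks running in the same executor process read the shared copy through bc.value.
    val names = codes.map(code => bc.value.getOrElse(code, "unknown")).collect()
    names.foreach(println)

    bc.destroy()   // drop the broadcast blocks on the executors and the driver
    sc.stop()
  }
}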
TorrentBroadcast
Older Spark versions such as 1.2 had HttpBroadcast, but it was removed in 2.1. In HttpBroadcast every executor fetches the data from the Driver, which clearly increases the Driver's network load and pressure and cannot solve the Driver's single-point performance problem.
To address the Driver's single-point problem, Spark uses a block-torrent approach instead.
1. When the Driver initializes the broadcast, it knows how many executors there are and how many blocks the data is divided into, and it ends up holding the location of every block on the Driver side. Since the executors hold no data at initialization time, the location of every block is the Driver itself (see the sketch below for the idea of cutting the value into fixed-size pieces).
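The exact driver-side code varies by Spark version, so the following is an illustration only: a self-contained sketch of the "serialize the value and cut it into fixed-size pieces" idea. blockifyBytes, the Java serialization, and the 4 MB piece size are assumptions for the example, not Spark's actual implementation.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object BlockifySketch {
  // Serialize a value and cut the resulting byte array into pieces of at most blockSize bytes.
  // In Spark, each such piece would then be stored in the driver's BlockManager under its own
  // piece id, with the driver registered as the only location at first.
  def blockifyBytes(value: AnyRef, blockSize: Int): Array[Array[Byte]] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    bos.toByteArray.grouped(blockSize).toArray
  }

  def main(args: Array[String]): Unit = {
    val payload = Array.fill[Byte](10 * 1024 * 1024)(1)      // ~10 MB of data
    val pieces = blockifyBytes(payload, 4 * 1024 * 1024)     // 4 MB pieces
    println(s"split into ${pieces.length} pieces of sizes ${pieces.map(_.length).mkString(", ")}")
  }
}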
2. When an Executor performs the computation, it first looks for the data in its local BlockManager; if the data is not present locally, it obtains the block's locations from the Driver:
bm.getLocalBytes(pieceId) match {
  case Some(block) =>
    blocks(pid) = block
    releaseLock(pieceId)
  case None =>
    bm.getRemoteBytes(pieceId) match {
      case Some(b) =>
        if (checksumEnabled) {
          val sum = calcChecksum(b.chunks(0))
          if (sum != checksums(pid)) {
            throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
              s" $sum != ${checksums(pid)}")
          }
        }
        // We found the block from remote executors/driver's BlockManager, so put the block
        // in this executor's BlockManager.
        if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
          throw new SparkException(
            s"Failed to store $pieceId of $broadcastId in local BlockManager")
        }
        blocks(pid) = b
      case None =>
        throw new SparkException(s"Failed to get $pieceId of $broadcastId")
    }
}
3. At this point the block locations stored on the Driver contain only the Driver itself, so the location list returned to the executor is just the Driver:
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
4. The data is then fetched from the Driver through the block transfer service:
blockTransferService.fetchBlockSync(
  loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
5. After the data is fetched, it is stored via BlockManager.putBytes, which eventually calls doPutBytes:
private def doPutBytes[T](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Boolean = {
  .....
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    if (blockWasSuccessfullyStored) {
      // Now that the block is in either the memory or disk store,
      // tell the master about it.
      info.size = size
      if (tellMaster && info.tellMaster) {
        reportBlockStatus(blockId, putBlockStatus)
      }
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    }
    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    if (level.replication > 1) {
      // Wait for asynchronous replication to finish
      try {
        Await.ready(replicationFuture, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for replication to finish", t)
      }
    }
    if (blockWasSuccessfullyStored) {
      None
    } else {
      Some(bytes)
    }
  }.isEmpty
}
6. After storing the data, the executor also reports the status of that Block to the Driver.
7. The Driver updates the state of that executor's BlockManager and adds the executor's address to the set of locations for that BlockId:
private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {

  if (!blockManagerInfo.contains(blockManagerId)) {
    if (blockManagerId.isDriver && !isLocal) {
      // We intentionally do not register the master (except in local mode),
      // so we should not indicate failure.
      return true
    } else {
      return false
    }
  }

  if (blockId == null) {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    return true
  }

  blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)

  var locations: mutable.HashSet[BlockManagerId] = null
  if (blockLocations.containsKey(blockId)) {
    locations = blockLocations.get(blockId)
  } else {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }

  if (storageLevel.isValid) {
    locations.add(blockManagerId)
  } else {
    locations.remove(blockManagerId)
  }

  // Remove the block from master tracking if it has been removed on all slaves.
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  }
  true
}
How is the torrent behavior implemented?
1. To avoid the Driver becoming a single point, as analyzed above, an executor that does not hold the data locally obtains the set of locations for that BlockId from the Driver. The executor shuffles this location list, puts addresses on the same host first (these can go over the loopback interface), and then tries the shuffled addresses one by one until it gets the data. Because the addresses are randomized, an executor no longer fetches data only from the Driver:
/**
 * Return a list of locations for the given block, prioritizing the local machine since
 * multiple block managers can share the same host.
 */
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  val locs = Random.shuffle(master.getLocations(blockId))
  val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
  preferredLocs ++ otherLocs
}
2. Randomization of the BlockIds
The data is normally split into multiple blocks, each with its own BlockId; how many depends on the block size you configure:
spark.broadcast.blockSize=10M
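For instance, this can be set through SparkConf when the context is created; a sketch, with local[*] and the 32 MB payload chosen only for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastBlockSizeExample {
  def main(args: Array[String]): Unit = {
    // spark.broadcast.blockSize controls the piece size: a broadcast value whose serialized
    // (and possibly compressed) form exceeds this size is split into multiple pieces.
    val conf = new SparkConf()
      .setAppName("broadcast-blocksize")
      .setMaster("local[*]")
      .set("spark.broadcast.blockSize", "10m")
    val sc = new SparkContext(conf)

    val bc = sc.broadcast(Array.fill[Byte](32 * 1024 * 1024)(1))
    println(s"broadcast payload size: ${bc.value.length} bytes")

    sc.stop()
  }
}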
When fetching the full set of blocks, the torrent algorithm iterates over the BlockIds in a random order:
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
......
}
When a job starts, the newly launched executors all try to fetch the data from the Driver at the same time. If every executor requested the blocks in the same order, essentially every block would still be fetched from the Driver by every executor. Simply randomizing the BlockId order ensures that different executors fetch different blocks from the Driver first; while an executor goes on to fetch the remaining blocks, it then has a chance to get them from other executors that already hold them, which spreads the load away from the Driver.
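To make the effect of the shuffle concrete, here is a tiny self-contained simulation; the executor and block counts are arbitrary. Each simulated executor shuffles the piece indices independently, so their first requests land on different pieces instead of all asking the Driver for piece 0.

import scala.util.Random

object FetchOrderSimulation {
  def main(args: Array[String]): Unit = {
    val numBlocks = 6      // arbitrary piece count for the illustration
    val numExecutors = 4   // arbitrary executor count
    (1 to numExecutors).foreach { e =>
      // Same idea as Random.shuffle(Seq.range(0, numBlocks)) in the snippet above:
      // each executor walks the pieces in its own random order.
      val order = Random.shuffle(Seq.range(0, numBlocks))
      println(s"executor-$e fetch order: ${order.mkString(", ")}")
    }
  }
}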