Spark 1.6 ----- Source Code Walkthrough: An Overview of BlockManager
The Implementation of BlockManager
BlockManager is the core component of Spark's storage system; both the Driver and every Executor create one.
A BlockManager is created at line 364 of SparkEnv:
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
Starting at line 74, BlockManager declares the members it contains:
The disk block manager, DiskBlockManager.
A TimeStampedHashMap[BlockId, BlockInfo] that caches each BlockId and its corresponding BlockInfo.
The memory store, MemoryStore.
The disk store, DiskStore.
The shuffle client, ShuffleClient: if the external shuffle service is enabled, an ExternalShuffleClient is created; otherwise it is simply the BlockTransferService. It looks like you could plug in your own implementation here.
The BlockManagerSlaveEndpoint, which communicates with the BlockManagerMaster.
The compression codec, CompressionCodec.
The non-broadcast block cleaner metadataCleaner and the broadcast block cleaner broadcastCleaner (not covered here).
val diskBlockManager = new DiskBlockManager(this, conf)

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

private val futureExecutionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

// Actual storage of where blocks are kept
private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, memoryManager)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
private[spark] lazy val externalBlockStore: ExternalBlockStore = {
  externalBlockStoreInitialized = true
  new ExternalBlockStore(this, executorId)
}
memoryManager.setMemoryStore(memoryStore)

// Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory = memoryManager.maxStorageMemory

private[spark] val externalShuffleServiceEnabled =
  conf.getBoolean("spark.shuffle.service.enabled", false)

// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort = {
  val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
  if (tmpPort == 0) {
    // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
    // an open port. But we still need to tell our spark apps the right port to use. So
    // only if the yarn config has the port set to 0, we prefer the value in the spark config
    conf.get("spark.shuffle.service.port").toInt
  } else {
    tmpPort
  }
}

var blockManagerId: BlockManagerId = _

// Address of the server that serves this executor's shuffle files. This is either an external
// service, or just our own Executor's BlockManager.
private[spark] var shuffleServerId: BlockManagerId = _

// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager,
    securityManager.isAuthenticationEnabled(), securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}

// Whether to compress broadcast variables that are stored
private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
// Whether to compress shuffle output that are stored
private val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
// Whether to compress RDD partitions that are stored serialized
private val compressRdds = conf.getBoolean("spark.rdd.compress", false)
// Whether to compress shuffle output temporarily spilled to disk
private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

private val slaveEndpoint = rpcEnv.setupEndpoint(
  "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
  new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

// Pending re-registration action being executed asynchronously or null if none is pending.
// Accesses should synchronize on asyncReregisterLock.
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object

private val metadataCleaner = new MetadataCleaner(
  MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
  MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)

// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L

/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
 * the initialization of the compression codec until it is first used. The reason is that a Spark
 * program could be using a user-defined codec in a third party jar, which is loaded in
 * Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
 * loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
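Most of these members are driven by plain SparkConf settings. As a quick reference, here is a small configuration snippet using the exact keys read in the declarations above; the values are only illustrative:

import org.apache.spark.SparkConf

// Toggle the compression and shuffle-service settings read by the members above.
val conf = new SparkConf()
  .set("spark.broadcast.compress", "true")       // compress stored broadcast variables (default: true)
  .set("spark.shuffle.compress", "true")         // compress stored shuffle output (default: true)
  .set("spark.rdd.compress", "true")             // compress serialized RDD partitions (default: false)
  .set("spark.shuffle.spill.compress", "true")   // compress shuffle spills to disk (default: true)
  .set("spark.shuffle.service.enabled", "false") // false => shuffleClient is the BlockTransferService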
A BlockManager does not become usable until initialize() has been called.
Before walking through initialization, a few points are worth noting:
①The BlockManager on each Executor communicates with the BlockManager on the Driver.
②When a block is put into the BlockManager and the MemoryStore runs out of memory, the block is written to the DiskStore instead (see the sketch after this list).
③Blocks are downloaded from or uploaded to a remote node through the RPC service exposed by the TransportServer of that node's Executor's BlockManager.
④Conversely, remote nodes access the RPC service of the local Executor's BlockManager's TransportServer to download or upload blocks.
⑤The ShuffleClient contains the TransportServer.
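To make point ② concrete, here is a deliberately simplified, self-contained sketch of the memory-then-disk fallback. All names and the byte accounting here are invented for illustration; the real Spark 1.6 path goes through BlockManager.doPut together with MemoryStore and DiskStore:

import java.nio.ByteBuffer
import scala.collection.mutable

object SpillSketch {
  private val memory = mutable.Map[String, ByteBuffer]() // stand-in for MemoryStore
  private val disk = mutable.Map[String, ByteBuffer]()   // stand-in for DiskStore
  private var freeMemory = 64L * 1024 * 1024             // pretend 64 MB of storage memory

  // Store a block in memory if the storage level allows it and it fits;
  // otherwise fall back to disk if the storage level allows that.
  def put(blockId: String, bytes: ByteBuffer, useMemory: Boolean, useDisk: Boolean): Unit = {
    if (useMemory && bytes.limit() <= freeMemory) {
      memory(blockId) = bytes
      freeMemory -= bytes.limit()
    } else if (useDisk) {
      disk(blockId) = bytes // memory full (or disk-only level): spill to disk
    } // otherwise the block is simply not stored
  }
}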
Line 174 of BlockManager is the initialize method:
def initialize(appId: String): Unit = {
  // Initialize the components
  blockTransferService.init(this)
  shuffleClient.init(appId)

  blockManagerId = BlockManagerId(
    executorId, blockTransferService.hostName, blockTransferService.port)

  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register this BlockManager with the master
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}
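For context on when initialize is actually invoked: on the driver, SparkContext calls it once the application id is known, and on each executor, Executor calls it during construction in non-local mode. The call sites below are quoted from memory of the Spark 1.6 sources and are worth verifying against your own checkout:

// Driver side, in SparkContext, after the scheduler has produced an application id:
_env.blockManager.initialize(_applicationId)

// Executor side, in Executor's constructor, when not running in local mode:
env.blockManager.initialize(conf.getAppId)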