Crail-spark-io原始碼閱讀
阿新 • 發佈:2018-12-04
crail-spark-io原始碼閱讀
storage
CrailDispatcher.scala
registerShuffle方法
/**
 * Registers a shuffle with the dispatcher and pre-creates its Crail directory layout.
 *
 * Stores a fresh `CrailShuffleStore` for `shuffleId` in `shuffleCache` (only if no store is
 * present yet). It then creates `<shuffleDir>/shuffle_<shuffleId>` as a DIRECTORY node and,
 * beneath it, one MULTIFILE node `part_<i>` per partition. All create requests are issued
 * before any of them is awaited. Each resulting node is then synced with `syncDir()`.
 * The method blocks until every create request has completed and returns nothing.
 *
 * @param shuffleId  shuffle identifier; used as the cache key and in the directory name
 * @param numMaps    number of map tasks; not used by this method
 * @param partitions number of `part_<i>` nodes to create; zero or a negative value creates
 *                   only the shuffle directory itself
 * @throws Exception whatever `Future.get()` or `syncDir()` raise when a request fails
 *                   (presumably `ExecutionException` for a `java.util.concurrent.Future` —
 *                   confirm against the Crail client)
 */
def registerShuffle(shuffleId: Int, numMaps: Int, partitions: Int) : Unit = {
  //logInfo("registering shuffle " + shuffleId + ", time " + ", cacheSize " + fs.getCacheSize)

  // Keep any existing store for this shuffle; the previous value is not needed here.
  // NOTE(review): re-registration still falls through to the directory creation below.
  // Confirm that fs.create tolerates paths that already exist.
  shuffleCache.putIfAbsent(shuffleId, new CrailShuffleStore)

  val shuffleIdDir = s"$shuffleDir/shuffle_$shuffleId"

  // Issue every create request up front: the parent directory first, then one MULTIFILE
  // per partition. Only afterwards block on the results, so the requests are not
  // serialized behind each other's completion.
  val dirFuture: Future[CrailNode] = fs.create(shuffleIdDir, CrailNodeType.DIRECTORY,
    CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true)
  val partFutures: IndexedSeq[Future[CrailNode]] = (0 until partitions).map { i =>
    fs.create(s"$shuffleIdDir/part_$i", CrailNodeType.MULTIFILE,
      CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true)
  }

  // Await all creates in submission order (eagerly) before syncing any node.
  val nodes: IndexedSeq[CrailNode] = (dirFuture +: partFutures).map(_.get())
  nodes.foreach(_.syncDir())
}