1. 程式人生 > >知乎上一位朋友總結得特別好的 Spark 文章,很不錯,予以轉載!

知乎上一位朋友總結得特別好的 Spark 文章,很不錯,予以轉載!

/**
 * Registers the task at `index` in the pending-task lists that match its preferred locations.
 *
 * For every preferred location the index is added to:
 *  - the per-executor list, either for the executor named by an `ExecutorCacheTaskLocation` or
 *    for each executor that `sched` reports alive on the host of an `HDFSCacheTaskLocation`;
 *  - the per-host list for the location's host;
 *  - the per-rack list, when `sched.getRackForHost` yields a rack for that host.
 *
 * A task whose preferred locations equal `Nil` is added to `pendingTasksWithNoPrefs`.
 *
 * @param index    position of the task in `tasks`
 * @param readding when true, the index is appended to a list only if that list does not already
 *                 contain it, and `allPendingTasks` is left untouched
 */
private def addPendingTask(index: Int, readding: Boolean = false): Unit = {
  // Appends `index` unless we are re-adding and the list already holds it.
  def addTo(list: ArrayBuffer[Int]): Unit = {
    if (!readding || !list.contains(index)) {
      list += index
    }
  }

  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
      case e: HDFSCacheTaskLocation =>
        sched.getExecutorsAliveOnHost(e.host) match {
          case Some(set) =>
            // Distinct name so the loop variable no longer shadows the matched location `e`.
            for (aliveExecutor <- set) {
              addTo(pendingTasksForExecutor.getOrElseUpdate(aliveExecutor, new ArrayBuffer))
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None =>
            logDebug(s"Pending task $index has a cached location at ${e.host} " +
              ", but there are no executors alive there.")
        }
      // Other location kinds carry no executor-level information; yield the unit value
      // (not the `Unit` companion object).
      case _ => ()
    }
    addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
    for (rack <- sched.getRackForHost(loc.host)) {
      addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    addTo(pendingTasksWithNoPrefs)
  }

  if (!readding) {
    // No point scanning this whole list to find the old task there
    allPendingTasks += index
  }
}