1. 程式人生 > 其它 >Spark原始碼系列(一)spark-submit提交作業過程

Spark原始碼系列(一)spark-submit提交作業過程

前言

折騰了很久,終於開始學習Spark的原始碼了,第一篇我打算講一下Spark作業的提交過程。

這個是Spark的App執行圖,它通過一個Driver來和叢集通訊,叢集負責作業的分配。今天我要講的是如何建立這個Driver Program的過程。

作業提交方法以及引數

我們先看一下用Spark Submit提交的方法吧,下面是從官方上面摘抄的內容。

# Run on a Spark standalone cluster
./bin/spark-submit 
  --class org.apache.spark.examples.SparkPi 
  --master spark://207.184.161.138:7077 
  --executor-memory 20G 
  --total-executor-cores 100 
  /path/to/examples.jar 
  1000

這個是提交到standalone叢集的方式,開啟spark-submit這檔案,我們會發現它最後是呼叫了org.apache.spark.deploy.SparkSubmit這個類。

我們直接進去看就行了,main函式就幾行程式碼,太節省了。

def main(args: Array[String]) {
    val appArgs = new SparkSubmitArguments(args)
    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}

我們主要看看createLaunchEnv方法就可以了,launch是反射呼叫mainClass,精華全在createLaunchEnv裡面了。

在裡面我發現一些有用的資訊,可能在官方文件上面都沒有的,發出來大家瞅瞅。前面不帶--的可以在spark-defaults.conf裡面設定,帶--的直接在提交的時候指定,具體含義大家一看就懂。

val options = List[OptionAssigner](
      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
      OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraClassPath"),
      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraJavaOptions"),
      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraLibraryPath"),
      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
      OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
      OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
        sysProp = "spark.executor.memory"),
      OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
      OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
        sysProp = "spark.cores.max"),
      OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
      OptionAssigner(args.files, YARN, true, clOption = "--files"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
      OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
      OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
 )

Driver程式的部署模式有兩種,client和cluster,預設是client。client的話預設就是直接在本地運行了Driver程式了,cluster模式還會兜一圈把作業發到叢集上面去執行。

指定部署模式需要用引數--deploy-mode來指定,或者在環境變數當中新增DEPLOY_MODE變數來指定。

下面講的是cluster的部署方式,兜一圈的這種情況。

yarn模式的話mainClass是org.apache.spark.deploy.yarn.Client,standalone的mainClass是org.apache.spark.deploy.Client。

這次我們講org.apache.spark.deploy.Client,yarn的話單獨找一章出來單獨講,目前超哥還是推薦使用standalone的方式部署spark,具體原因不詳,據說是因為資源排程方面的問題。

說個快捷鍵吧,Ctrl+Shift+N,然後輸入Client就能找到這個類,這是IDEA的快捷鍵,相當好使。

我們直接找到它的main函式,發現了它居然使用了Akka框架,我百度了一下,被它震驚了。

Akka

在main函式裡面,主要程式碼就這麼三行。

//建立一個ActorSystem
val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient",Utils.localHostName(),0,
  conf, new SecurityManager(conf))
//執行ClientActor的preStart方法和receive方法
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
//等待執行結束
actorSystem.awaitTermination()

看了這裡真的有點兒懵啊,這是啥玩意兒,不懂的朋友們,請點選這裡Akka。下面是它官方放出來的例子:

//定義一個case class用來傳遞引數
case class Greeting(who: String)
//定義Actor,比較重要的一個方法是receive方法,用來接收資訊的
class GreetingActor extends Actor with ActorLogging {
   def receive = {
       case Greeting(who) ⇒ log.info("Hello " + who)
   }
}
//建立一個ActorSystem
val system = ActorSystem("MySystem")
//給ActorSystem設定Actor
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
//向greeter傳送資訊,用Greeting來傳遞
greeter ! Greeting("Charlie Parker")

簡直是無比強大啊,就這麼幾行程式碼就搞定了,接下來看你會更加震驚的。

我們回到Client類當中,找到ClientActor,它有兩個方法,是之前說的preStart和receive方法,preStart方法用於連線master提交作業請求,receive方法用於接收從master返回的反饋資訊。

我們先看preStart方法吧。

override def preStart() = {
    // 這裡需要把master的地址轉換成akka的地址,然後通過這個akka地址獲得指定的actor
    // 它的格式是"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
    // 把自身設定成遠端生命週期的事件
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

    driverArgs.cmd match {
      case "launch" =>
        // 此處省略100個字
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
        // 此處省略100個字
        // 向master傳送提交Driver的請求,把driverDescription傳過去,RequestSubmitDriver前面說過了,是個case class
        masterActor ! RequestSubmitDriver(driverDescription)

      case "kill" =>
        val driverId = driverArgs.driverId
        val killFuture = masterActor ! RequestKillDriver(driverId)
    }
}

從上面的程式碼看得出來,它需要設定master的連線地址,最後提交了一個RequestSubmitDriver的資訊。在receive方法裡面,就是等待接受迴應了,有兩個Response分別對應著這裡的launch和kill。

線索貌似到這裡就斷了,那下一步在哪裡了呢?當然是在Master裡面啦,怎麼知道的,猜的,哈哈。

Master也是繼承了Actor,在它的main函式裡面找到了以下程式碼:

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, 
  securityManager = securityMgr)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]

和前面的actor基本一致,多了actor.ask這句話,查了一下官網的文件,這句話的意思的傳送訊息,並且接受一個Future作為response,和前面的actor ! message的區別就是它還接受返回值。

具體的Akka的用法,大家還是參照官網咖,Akka確實如它官網所言的那樣子,是一個簡單、強大、並行的分散式框架。

小結:

Akka的使用確實簡單,短短的幾行程式碼即刻完成一個通訊功能,比Socket簡單很多。但是它也逃不脫我們常說的那些東西,請求、接收請求、傳遞的訊息、註冊的地址和埠這些概念。

排程schedule

我們接下來查詢Master的receive方法吧,Master是作為接收方的,而不是主動請求,這點和hadoop是一致的。

    case RequestSubmitDriver(description) => {
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        // 排程
        schedule()
         // 告訴client,提交成功了,把driver.id告訴它
        sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
      }

這裡我們主要看schedule方法就可以了,它是執行排程的方法。

private def schedule() {
    if (state != RecoveryState.ALIVE) { return }

    // 首先排程Driver程式,從workers裡面隨機抽一些出來
    val shuffledWorkers = Random.shuffle(workers) 
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      for (driver <- waitingDrivers) {
        // 判斷記憶體和cpu夠不夠,夠的就執行了哈
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
        }
      }
    }

    // 這裡是按照先進先出的,spreadOutApps是由spark.deploy.spreadOut引數來決定的,預設是true
    if (spreadOutApps) {
      // 遍歷一下app
      for (app <- waitingApps if app.coresLeft > 0) {
        // canUse裡面判斷了worker的記憶體是否夠用,並且該worker是否已經包含了該app的Executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) 
        // 記錄每個節點的核心數
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        // 遍歷直到分配結束
        while (toAssign > 0) {
          // 從0開始遍歷可用的work,如果可用的cpu減去已經分配的>0,就可以分配給它
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            toAssign -= 1
            // 這個位置的work的可分配的cpu數+1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable
        }
        // 給剛才標記的worker分配任務
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            launchExecutor(usableWorkers(pos), exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // 這種方式和上面的方式的區別是,這種方式儘可能用少量的節點來完成這個任務
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          // 判斷條件是worker的記憶體比app需要的記憶體多
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }

它的排程器是這樣的,先排程Driver程式,然後再排程App,排程App的方式是從各個worker的裡面和App進行匹配,看需要分配多少個cpu。

那我們接下來看兩個方法launchDriver和launchExecutor即可。

  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.actor ! LaunchDriver(driver.id, driver.desc)
    driver.state = DriverState.RUNNING
  }

給worker傳送了一個LaunchDriver的訊息,下面在看launchExecutor的方法。

  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

它要做的事情多一點,除了給worker傳送LaunchExecutor指令外,還需要給driver傳送ExecutorAdded的訊息,說你的任務已經有人幹了。

在繼續Worker講之前,我們先看看它是怎麼註冊進來的,每個Worker啟動之後,會自動去請求Master去註冊自己,具體我們可以看receive的方法裡面的RegisterWorker這一段,它需要上報自己的記憶體、Cpu、地址、埠等資訊,註冊成功之後返回RegisteredWorker資訊給它,說已經註冊成功了。

Worker執行

同樣的,我們到Worker裡面在receive方法找LaunchDriver和LaunchExecutor就可以找到我們要的東西。

case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
      drivers(driverId) = driver
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
}

看一下start方法吧,start方法裡面,其實是new Thread().start(),run方法裡面是通過傳過來的DriverDescription構造的一個命令,丟給ProcessBuilder去執行命令,結束之後呼叫。

worker !DriverStateChanged通知worker,worker再通過master ! DriverStateChanged通知master,釋放掉worker的cpu和記憶體。

同理,LaunchExecutor執行完畢了,通過worker ! ExecutorStateChanged通知worker,然後worker通過master ! ExecutorStateChanged通知master,釋放掉worker的cpu和記憶體。

下面我們再梳理一下這個過程,只包括Driver註冊,Driver執行之後的過程在之後的文章再說,比較複雜。

1、Client通過獲得Url地址獲得ActorSelection(master的actor引用),然後通過ActorSelection給Master傳送註冊Driver請求(RequestSubmitDriver)

2、Master接收到請求之後就開始排程了,從workers列表裡面找出可以用的Worker

3、通過Worker的actor引用ActorRef給可用的Worker傳送啟動Driver請求(LaunchDriver)

4、排程完畢之後,給Client回覆註冊成功訊息(SubmitDriverResponse)

5、Worker接收到LaunchDriver請求之後,通過傳過來的DriverDescription的資訊構造出命令來,通過ProcessBuilder執行

6、ProcessBuilder執行完命令之後,通過DriverStateChanged通過Worker

7、Worker最後把DriverStateChanged彙報給Master

後記:聽超哥說,org.apache.spark.deploy.Client這個類快要被刪除了,不知道cluster的這種模式是不是也被放棄了,官方給出來的例子推薦的是client模式->直接執行程式。難怪在作業排程的時候,看到別的actor叫driverActor。

不過這篇文章還有存在的意義, Akka和排程這塊,和我現在正在寫的第三篇以及第四篇關係很密切。