1. 程式人生 > >zeppelin原始碼分析(6)——note的執行過程

zeppelin原始碼分析(6)——note的執行過程

開發十年,就只剩下這套架構體系了! >>>   

上圖是zeppelin的前後臺互動模型,zeppelin採用單獨的jvm來啟動interpreter程序,該Interpreter程序與zeppelinServer程序之間採用Thrift協議通訊,其中RemoteInterpreterProcess是Thrift-Client端,而相應的RemoteInterpreterServer是Thrift-Server端。

Paragraph的執行分成“從前端UI提交ParagraphJob到其相關的Interpreter的Scheduler”和“Sheduler執行”2個部分,這2個部分是非同步執行的。

 

以上是從前臺請求執行指定的Note的指定的Paragraph開始,到該Paragraph提交到Scheduler之間的時序圖。這個執行邏輯是與語言無關的。任何語言寫的指令碼(儲存在Paragraph之中)都是上述提交執行的過程。

下面是Scheduler執行該ParagraphJob的時序圖:

這裡有如下的幾點需要注意:

1)      InterpreterFactory目前將所有的Interpreter都被例項化成了RemoteInterpreter,參見其createInterpretersForNote方法:

for (String intName : keys) {
  RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
  if (info.getClassName().equals(className)
      && info.getGroup().equals(groupName)) {
    Interpreter intp;

    if (option.isRemote()) {//根據option配置來建立
      intp = createRemoteRepl(info.getPath(),
          key,
          info.getClassName(),
          properties,
          interpreterSetting.id());
    } else {
      intp = createRepl(info.getPath(),
          info.getClassName(),
          properties);
    }

雖然InterpreterFactory在建立的時候做了判斷,但是其實所有的Option.remote屬性都為true,參見InterpreterFactory初始化的方法loadFromFile():

private void loadFromFile() throws IOException {
//省略了部分程式碼
for (String k : info.interpreterSettings.keySet()) {
  InterpreterSetting setting = info.interpreterSettings.get(k);

  // Always use separate interpreter process
  // While we decided to turn this feature on always (without providing
  // enable/disable option on GUI).
  // previously created setting should turn this feature on here.
  setting.getOption().setRemote(true);//全部置為true了
//省略了部分程式碼
}

2)      RemoteInterpreterProcess在reference相關的InterpreterGroup的時候,會使用apache common-exec框架建立新的程序。

public int reference(InterpreterGroup interpreterGroup) {
synchronized (referenceCount) {
  if (executor == null) {
    // start server process
    try {
      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();//隨機可用埠
    } catch (IOException e1) {
      throw new InterpreterException(e1);
    }
 
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
cmdLine.addArgument("-l", false);
cmdLine.addArgument(localRepoDir, false);

executor = new DefaultExecutor();

watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);

running = true;
try {
  Map procEnv = EnvironmentUtils.getProcEnvironment();
  procEnv.putAll(env);

  logger.info("Run interpreter process {}", cmdLine);
  executor.execute(cmdLine, procEnv, this);//啟動新的程序
} catch (IOException e) {
  running = false;
  throw new InterpreterException(e);
}

其中interpreterRunner會指向bin/interpreter.sh指令碼,該指令碼的主要功能根據是否定義了SPARK_HOME環境變數(定位到Spark-submit指令碼),構建classpath,然後以指定的port執行ZEPPELIN_SERVER指定的主類,該變數被定義為: 

ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer

啟動程序的程式碼如下:

if [[ -n "${SPARK_SUBMIT}" ]]; then
    ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
else
    ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
fi

 

而RemoteInterpreterServer實現了RemoteInterpreterService.Iface,是Thrift的Server端,RemoteInterpreterProcess是Thrift的client端。

3)      Remote的含義是(至少目前是)“另外一個程序“,與zeppelinServer所在的程序不是同一個,該程序並非啟動在另外一個獨立的的機器上,zeppelin目前還不支援叢集,所有的Interpreter jvm都啟動在localhost上。因此如果想除錯Interpreter的方法是如何工作的,需要為該Interpreter啟動獨立的除錯程序,在zeppelinServer所在的除錯程序中設定Interpreter.interpret(Stringst, InterpreterContext context)斷點想要命