1. 程式人生 > 程式設計 >Giraph 執行流程(一)

Giraph 執行流程(一)

前言

本文主要分析了 Giraph1.3 SNAPSHOT 的 Job 提交和初始化的過程。其中 Job 提交部分的分析根據執行在 Standalone 模式下的 Hadoop 部分進行,分析僅涉及本地執行時執行的程式碼, 而初始化部分則主要根據叢集模式進行分析。

示例 Job

該部分不屬於原始碼,而是為了方便分析執行的一個示例 Job,Job 的具體配置和執行在 Giraph 程式設計實踐及原始碼編譯除錯 一文中已經說明。通常情況下, Job 提交過程如下:

GiraphConfiguration conf = new GiraphConfiguration(new Configuration());
//指定計算類
conf.setComputationClass(Shortestpath.class); //設定輸入和輸出格式 conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); //設定本地執行模式,方便除錯檢視原始碼 conf.setLocalTestMode(true); //設定 Worker 配置 conf.setWorkerConfiguration(1,1,100
); //本地模式下執行不分 Master 和 Worker GiraphConstants.SPLIT_Master_Worker.set(conf,false); GiraphJob job = new GiraphJob(conf,Shortestpath.class.getSimpleName()); //設定輸入和輸出路徑 GiraphTextInputFormat.setVertexInputPath(conf,new Path(INPUT_PATH)); GiraphTextOutputFormat.setOutputPath(job.getInternalJob(),new Path(OUTPUT_PATH)); •••••• //向 Giraph 提交 Job
job.run(true); 複製程式碼

首先指定一系列引數,然後呼叫 job.run(true) 向 Giraph 提交 Job

Giraph 向 Hadoop 提交 Job

Giraph 是基於 Hadoop 開發的,因此在向 Giraph 提交 Job 之後,Giraph 內部還會向 Hadoop 提交 Job。本部分主要分析 Giraph 如何向 Hadoop 提交 Job。首先檢視 run 方法:

org.apache.giraph.job.GiraphJob#run

/**
 * Runs the actual graph application through Hadoop Map-Reduce.
 *
 * @param verbose If true,provide verbose output,false otherwise
 * @return True if success,false otherwise
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
public final boolean run(boolean verbose)
  throws IOException,InterruptedException,ClassNotFoundException {
  //更改 Job 的 counter 數量限制
  setIntConfIfDefault("mapreduce.job.counters.limit",512);

  //設定 Giraph 中 Worker 或者 Master 記憶體上限
  setIntConfIfDefault("mapred.job.map.memory.mb",1024);
  setIntConfIfDefault("mapred.job.reduce.memory.mb",0);

  // Speculative execution doesn't make sense for Giraph
  giraphConfiguration.setBoolean(
      "mapred.map.tasks.speculative.execution",false);

  // Set the ping interval to 5 minutes instead of one minute
  Client.setPingInterval(giraphConfiguration,60000 * 5);

  // 設定優先使用使用者上傳的 Jar 包的 class
  giraphConfiguration.setBoolean("mapreduce.user.classpath.first",true);
  giraphConfiguration.setBoolean("mapreduce.job.user.classpath.first",true);

  //不做 Checkpoint 的時候最大嘗試數為 1,為了讓不能恢復的 Job 更快的結束
  if (giraphConfiguration.getCheckpointFrequency() == 0) {
    int oldMaxTaskAttempts = giraphConfiguration.getMaxTaskAttempts();
    giraphConfiguration.setMaxTaskAttempts(1);
    
    ••••••
  }

  
  ImmutableClassesGiraphConfiguration conf =
      new ImmutableClassesGiraphConfiguration(giraphConfiguration);
  checkLocalJobRunnerConfiguration(conf);

  int tryCount = 0;
  //預設是 org.apache.giraph.job.DefaultGiraphJobRetryChecker
  GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
  while (true) {
    ••••••

    tryCount++;
    //建立一個 Hadoop Job
    Job submittedJob = new Job(conf,jobName);
    if (submittedJob.getJar() == null) {
      submittedJob.setJarByClass(getClass());
    }
    //Giraph 不需要執行 Reduce 任務
    submittedJob.setNumReduceTasks(0);
    //設定 Mapper
    submittedJob.setMapperClass(GraphMapper.class);
    //設定輸入格式
    submittedJob.setInputFormatClass(BspInputFormat.class);
    //設定輸出格式,預設情況是 org.apache.giraph.bsp.BspOutputFormat
    submittedJob.setOutputFormatClass(
        GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf));
    ••••••
    //提交 Job 
    submittedJob.submit();
    
    ••••••
    //獲取 Job 執行結果
    boolean passed = submittedJob.waitForCompletion(verbose);
    
    ••••••

    //如果執行失敗則會嘗試重啟 Job
    if (!passed) {
      //預設情況(指沒有指定 JobRetryChecker 情況)返回 null,即永遠不會重啟 Job
      String restartFrom = retryChecker.shouldRestartCheckpoint(submittedJob);
      if (restartFrom != null) {
        GiraphConstants.RESTART_JOB_ID.set(conf,restartFrom);
        continue;
      }
    }

    //如果 Job 執行成功或者失敗情況下不嘗試重新執行(預設情況下永遠不會嘗試嘗試重新執行)
    if (passed || !retryChecker.shouldRetry(submittedJob,tryCount)) {
      return passed;
    }
    •••••••
  }
}
複製程式碼

run 方法中首先會對 Hadoop 和 Giraph 進行配置,然後建立一個 Hadoop Job 物件。在設定好 Hadoop Job 的 MapperClass 和輸入輸出格式等相關資訊後,即會呼叫 submit 向 Hadoop 提交 Job。從程式碼中可以看到整個過程與提交普通的 Hadoop Job 基本無異。

Hadoop 內部執行

在 Giraph 呼叫 submit 向 Hadoop 提交 Job 之後,程式的執行就會進入到 Hadoop 內部,對於該部分主要需要了解 Hadoop 如何啟動 Giraph 的 MapTask。

內部提交 Job

org.apache.hadoop.mapreduce.Job#submit

public void submit() throws IOException,ClassNotFoundException {
  ensureState(JobState.DEFINE);
  //設定用新的 API
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = 
      getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
  //提交 Job 到系統
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException,ClassNotFoundException {
      return submitter.submitJobInternal(Job.this,cluster);
      }
  });
  state = JobState.RUNNING;
  ••••••
}
複製程式碼

submit 方法內部會建立 JobSubmitter 物件,然後通過 submitJobInternal 方法進一步提交 Job。

org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

JobStatus submitJobInternal(Job job,Cluster cluster) 
throws ClassNotFoundException,IOException {

    ••••••

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    //獲得暫存目錄, 預設情況下路徑生成在 /tmp/hadoop/mapred/staging 下
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,conf);
    ••••••
    //生成 Job ID
    JobID jobId = submitClient.getNewJobID();
    //設定 Job ID
    job.setJobID(jobId);
    //獲得提交 Job 的目錄
    Path submitJobDir = new Path(jobStagingArea,jobId.toString());
    JobStatus status = null;
    ••••••
    
    ••••••
    //實際提交 Job
    status = submitClient.submitJob(
        jobId,submitJobDir.toString(),job.getCredentials());
    ••••••
}
複製程式碼

在 submitJobInternal 中,Hadoop 會通過 submitClient 實際提交 Job。submitClient 是一個 ClientProtocol 介面,其有兩個實現,由於提交 Job 的時候 Hadoop 執行在 Standalone 模式下,所以這裡 submitClient 的實現是 LocalJobRunner。

啟動 MapTask

org.apache.hadoop.mapred.LocalJobRunner#submitJob

public org.apache.hadoop.mapreduce.JobStatus submitJob(
    org.apache.hadoop.mapreduce.JobID jobid,String jobSubmitDir,Credentials credentials) throws IOException {
  Job job = new Job(JobID.downgrade(jobid),jobSubmitDir);
  job.job.setCredentials(credentials);
  return job.status;
}
複製程式碼

org.apache.hadoop.mapred.LocalJobRunner.Job#Job

public Job(JobID jobid,String jobSubmitDir) throws IOException {
    ••••••

    this.start();
}
複製程式碼

submitJob 內部會建立一個 Job 物件,這裡的 Job 是繼承了 Thread 的 LocalJobRunner 的內部類。通過構造方法可以知道,submitJob 在建立 Job 的同時也開啟了執行緒,所以需要檢視 Job#run 方法。

org.apache.hadoop.mapred.LocalJobRunner.Job#run

@Override
public void run() {
    JobID jobId = profile.getJobID();
    JobContext jContext = new JobContextImpl(job,jobId);
    
    ••••••

    Map<TaskAttemptID,MapOutputFile> mapOutputFiles =
        Collections.synchronizedMap(new HashMap<TaskAttemptID,MapOutputFile>());
    
    //獲取需要執行的任務
    List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
        taskSplitMetaInfos,jobId,mapOutputFiles);
            
    initCounters(mapRunnables.size(),numReduceTasks);
    ExecutorService mapService = createMapExecutor();
    //執行任務
    runTasks(mapRunnables,mapService,"map");

    ••••••
    // delete the temporary directory in output directory
    outputCommitter.commitJob(jContext);
    status.setCleanupProgress(1.0f);

    ••••••
}
複製程式碼

org.apache.hadoop.mapred.LocalJobRunner.Job#getMapTaskRunnables

protected List<RunnableWithThrowable> getMapTaskRunnables(
        TaskSplitMetaInfo [] taskInfo,JobID jobId,Map<TaskAttemptID,MapOutputFile> mapOutputFiles) {

    int numTasks = 0;
    ArrayList<RunnableWithThrowable> list =
        new ArrayList<RunnableWithThrowable>();
  	//生成對應數量的 MapTaskRunnable
    for (TaskSplitMetaInfo task : taskInfo) {
    list.add(new MapTaskRunnable(task,numTasks++,mapOutputFiles));
    }

    return list;
}
複製程式碼

org.apache.hadoop.mapred.LocalJobRunner.Job#runTasks

private void runTasks(List<RunnableWithThrowable> runnables,ExecutorService service,String taskType) throws Exception {
    //提交任務
    for (Runnable r : runnables) {
    service.submit(r);
    }

    try {
    service.shutdown(); // Instructs queue to drain.

    // Wait for tasks to finish; do not use a time-based timeout.
    // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
    LOG.info("Waiting for " + taskType + " tasks");
    service.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
    } catch (InterruptedException ie) {
    // Cancel all threads.
    service.shutdownNow();
    throw ie;
    }
    ••••••
}
複製程式碼

對於 Job#run 方法應該主要關注 MapTaskRunnable 的生成和執行,可以看到 Hadoop 會通過 getMapTaskRunnables 方法根據分配的 Task 的數量生成對應數量的 MapTaskRunnable,然後會呼叫 runTasks 方法向執行緒池提交任務。

MapTaskRunnable 任務提交到執行緒池後繼續關注 MapTaskRunnable#run 方法

org.apache.hadoop.mapred.LocalJobRunner.Job.MapTaskRunnable#run

public void run() {
    try {
      ••••••
      MapTask map = new MapTask(systemJobFile.toString(),mapId,taskId,info.getSplitIndex(),1);
      ••••••
      try {
        ••••••
        map.run(localConf,Job.this);
        ••••••
    } catch (Throwable e) {
      this.storedException = e;
    }
  }
}
複製程式碼

從 MapTaskRunnable#run 中可以看到其建立了一個 MapTask 物件,並呼叫了 MapTask#run 方法。

org.apache.hadoop.mapred.MapTask#run

@Override
public void run(final JobConf job,final TaskUmbilicalProtocol umbilical)
throws IOException,ClassNotFoundException,InterruptedException {

    ••••••
    //org.apache.hadoop.mapreduce.Job#submit 設定了 useNewApi,所以返回 true
    boolean useNewApi = job.getUseNewMapper();

    ••••••

    if (useNewApi) {
        runNewMapper(job,splitMetaInfo,umbilical,reporter);
    } else {
        runOldMapper(job,reporter);
    }
    done(umbilical,reporter);
}
複製程式碼

MapTask#run 中會呼叫 runNewMapper 方法,所以繼續檢視該方法

org.apache.hadoop.mapred.MapTask#runNewMapper

private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter
                    ) throws IOException,InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,getTaskID(),reporter);
    // 反射獲取設定的 MapperClass 例項物件
    org.apache.hadoop.mapreduce.Mapper<INKEY,OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(),job);

    ••••••

    //建立 Context
    org.apache.hadoop.mapreduce.MapContext<INKEY,OUTVALUE> 
    mapContext = 
        new MapContextImpl<INKEY,OUTVALUE>(job,input,output,committer,reporter,split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,OUTVALUE>.Context 
        mapperContext = 
            new WrappedMapper<INKEY,OUTVALUE>().getMapContext(
                mapContext);

    try {
        ••••••
        mapper.run(mapperContext);
        ••••••
    } finally {
        ••••••
    }
}
複製程式碼

MapTask#runNewMapper 方法中會通過反射建立設定的 MapperClass 的物件,即 org.apache.giraph.job.GiraphJob#run 中設定的 GraphMapper 類的物件。在獲取到 GraphMapper 物件後,系統會呼叫其 run 方法,從而使得程式的執行進入到 Giraph 部分。

Giraph 執行 Job

org.apache.giraph.graph.GraphMapper#run

@Override
public void run(Context context) throws IOException,InterruptedException {
    // Notify the master quicker if there is Worker failure rather than
    // waiting for ZooKeeper to timeout and delete the ephemeral znodes
    try {
        //初始化
        setup(context);
        //執行計算
        while (context.nextKeyValue()) {
        graphTaskManager.execute();
        }
        //清理
        cleanup(context);
    } catch (RuntimeException e) {
        ••••••
    }
}
複製程式碼

從 GraphMapper#run 方法可以看到一個 Giraph Job 的執行能夠分為三個過程:

  • 初始化
  • 執行計算
  • 清理

下面針對初始化過程進行分析

初始化

org.apache.giraph.graph.GraphMapper#setup

@Override
public void setup(Context context)
  throws IOException,InterruptedException {
  // Execute all Giraph-related role(s) assigned to this compute node.
  // Roles can include "Master," "Worker," "zookeeper," or . . . ?
  graphTaskManager = new GraphTaskManager<I,V,E>(context);
  graphTaskManager.setup(
    DistributedCache.getLocalCacheArchives(context.getConfiguration()));
}
複製程式碼

GraphMapper#setup 方法中會建立 GraphTaskManager 物件,並呼叫其 setup 方法

org.apache.giraph.graph.GraphTaskManager#setup

public void setup(Path[] zkPathList) throws IOException,InterruptedException {
    Configuration hadoopConf = context.getConfiguration();
    //初始化一些配置
    conf = new ImmutableClassesGiraphConfiguration<I,E>(hadoopConf);
    ••••••
    //從配置中讀取 Zookeeper 的連線資訊,沒有提供外部 Zookeeper 情況下為空
    String serverPortList = conf.getZookeeperList();
    //如果沒有提供外部 Zookeeper 則需要自己啟動 Zookeeper
    if (serverPortList.isEmpty()) {
        if (startZooKeeperManager()) {
            return; // ZK connect/startup failed
        }
    } else {
        createZooKeeperCounter(serverPortList);
    }
    ••••••
    this.graphFunctions = determineGraphFunctions(conf,zkManager);
    if (zkManager != null && this.graphFunctions.isMaster()) {
        //將由 Master 建立的資料夾標記為刪除,檔案系統關閉時將會刪除檔案
        zkManager.cleanupOnExit();
    }
    try {
        //初始化 BSP 服務
        instantiateBspService();
    } catch (IOException e) {
        ••••••
    }
}
複製程式碼

GraphTaskManager#setup 方法主要做三件事:

  • 獲取 Zookeeper 連線資訊
  • 決定程式的角色
  • 初始化 BSP 服務

獲取 Zookeeper 連線資訊

GraphTaskManager#setup 中會通過 conf.getZookeeperList() 獲得 Zookeeper 的連線資訊。如果提供了外部 Zookeeper 則直接返回連線資訊,但如果沒有提供外部 Zookeeper 時,getZookeeperList() 會返回空值。此時 GraphTaskManager#setup 會呼叫 startZooKeeperManager 方法在某一個 Task 啟動 Zookeeper。

org.apache.giraph.graph.GraphTaskManager#startZooKeeperManager

/**
 * Instantiate and configure ZooKeeperManager for this job. This will
 * result in a Giraph-owned Zookeeper instance,a connection to an
 * existing quorum as specified in the job configuration,or task failure
 * @return true if this task should terminate
 */
private boolean startZooKeeperManager() throws IOException,InterruptedException {
    zkManager = new ZooKeeperManager(context,conf);
    context.setStatus("setup: Setting up Zookeeper manager.");
    zkManager.setup();
    //如果計算已經結束則不需要再啟動 Zookeeper,
    //大部分情況應該會在沒有提供外部 Zookeeper 且重啟 Task 時候起作用
    if (zkManager.computationDone()) {
        done = true;
        return true;
    }
    zkManager.onlineZooKeeperServer();
    //更新 Zookeeper 連線資訊,建立計數器
    String serverPortList = zkManager.getZooKeeperServerPortString();
    conf.setZookeeperList(serverPortList);
    createZooKeeperCounter(serverPortList);
    return false;
}
複製程式碼

startZooKeeperManager 中首先會建立 ZooKeeperManager 物件,然後呼叫其 setup 方法

org.apache.giraph.zk.ZooKeeperManager#setup

public void setup() throws IOException,InterruptedException {
    createCandidateStamp();
    getZooKeeperServerList();
}
複製程式碼

ZooKeeperManager#setup 方法中會首先呼叫 createCandidateStamp 方法

org.apache.giraph.zk.ZooKeeperManager#createCandidateStamp

/**
 * Create a HDFS stamp for this task.  If another task already
 * created it,then this one will fail,which is fine.
 */
public void createCandidateStamp() {
    ••••••
    fs.mkdirs(baseDirectory);
    ••••••
    fs.mkdirs(serverDirectory);
    ••••••
    if (!fs.getFileStatus(baseDirectory).isDir()) {
    throw new IllegalArgumentException(
        "createCandidateStamp: " + baseDirectory +
        " is not a directory,but should be.");
    }

    ••••••
    //根據 hostname 和 taskPartition 生成檔名
    Path myCandidacyPath = new Path(
        taskDirectory,myHostname +
        HOSTNAME_TASK_SEPARATOR + taskPartition);
    try {
        ••••••
        fs.createNewFile(myCandidacyPath);
    } catch (IOException e) {
        LOG.error("createCandidateStamp: Failed (maybe previous task " +
            "failed) to create filestamp " + myCandidacyPath,e);
    }
}
複製程式碼

在 createCandidateStamp 方法中,每個 Task 會根據自己的 hostname 和 taskPartition 在 _bsp/_defaultZkManagerDir/_task 下建立對應檔案,這些檔案將會在系統選擇某個 Task 啟動 Zookeeper 服務時用到。具體結果如下圖所示:

圖中 hostname 是 localhost 的原因在於,執行原始碼的時候 Hadoop 處於 Standalone 模式。

在 createCandidateStamp 執行完成之後,ZooKeeperManager#setup 會接著呼叫 getZooKeeperServerList

org.apache.giraph.zk.ZooKeeperManager#getZooKeeperServerList

private void getZooKeeperServerList() throws IOException,InterruptedException {
    String serverListFile;

    //taskPartition 為 0 的 Task 會建立 zooKeeperServerList 
    if (taskPartition == 0) {
      //0 號 Task 如果重啟檢查到已經有 serverList 則不會重新建立
      serverListFile = getServerListFile();
      if (serverListFile == null) {
        //建立 serverList
        createZooKeeperServerList();
      }
    }

    while (true) {
      //其餘 Task 等待 serverList 的建立
      serverListFile = getServerListFile();
      ••••••
      if (serverListFile != null) {
        break;
      }
      //減少 CPU 的佔用
      try {
        Thread.sleep(pollMsecs);
      } catch (InterruptedException e) {
        LOG.warn("getZooKeeperServerList: Strange interrupted " +
            "exception " + e.getMessage());
      }

    }

    //解析 serverList 中的資訊
    String[] serverHostList = serverListFile.substring(
        ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
            HOSTNAME_TASK_SEPARATOR);
    ••••••

    //獲得 Zookeeper 服務所在節點的 hostname
    zkServerHost = serverHostList[0];
    //獲得應該啟動 Zookeeper 服務的 Task 的 taskPartition
    zkServerTask = Integer.parseInt(serverHostList[1]);
     
    //各個 Task 更新自己的 zkServerPortString
    updateZkPortString();
  }
複製程式碼

getZooKeeperServerList 方法會根據 taskPartition 進行判斷,如果是 0 號 Task 則會先呼叫 createZooKeeperServerList 建立 serverListFile(serverListFile 表明了 Zookeeper 服務所在的 hostname 和 taskPartition),而如果是非 0 號 Task 則會進行輪詢來獲取 serverListFile 的檔名。在獲取到檔名後會對其進行解析來更新 zkServerHost、zkServerTask 以及 zkServerPortString。

接下來會對 createZooKeeperServerList 和 getZooKeeperServerList 進行分析以便更好的理解系統如何選取啟動 Zookeeper 服務的 Task

  • org.apache.giraph.zk.ZooKeeperManager#createZooKeeperServerList

    private void createZooKeeperServerList() throws IOException,InterruptedException {
      String host;
      String task;
      while (true) {
        //返回 Task 下檔案的元資料,會有一個檔名格式校驗的過程,會去掉以 . 開頭和 crc 結尾檔案
        FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
        if (fileStatusArray.length > 0) {
          //選取第一位的元資料標識的 Task 去啟動 Zookeeper 服務
          FileStatus fileStatus = fileStatusArray[0];
          //解析資訊
          String[] hostnameTaskArray =
              fileStatus.getPath().getName().split(
                  HOSTNAME_TASK_SEPARATOR);
          ••••••
          host = hostnameTaskArray[0];
          task = hostnameTaskArray[1];
          break;
        }
        Thread.sleep(pollMsecs);
      }
      //根據解析的資訊生成 serverListFile 檔名
      String serverListFile =
          ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
          HOSTNAME_TASK_SEPARATOR + task;
      Path serverListPath =
          new Path(baseDirectory,serverListFile);
      ••••••
      }
      //建立檔案
      fs.createNewFile(serverListPath);
    }
    複製程式碼

    createZooKeeperServerList 中會獲取所有 Task 在 createCandidateStamp 方法中建立的檔案的檔名,然後選取返回陣列中第一個元素標識的 Task 資訊去建立 serverListFile。

  • org.apache.giraph.zk.ZooKeeperManager#getServerListFile

    private String getServerListFile() throws IOException {
      String serverListFile = null;
      //baseDirectory 是 _bsp/_defaultZkManagerDir,列出資料夾下的檔案元資料
      FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
      for (FileStatus fileStatus : fileStatusArray) {
        //篩選檔名中含有 zkServerList_ 的檔案,即 taskpartition 為 0 task 建立的 serverListFile
        if (fileStatus.getPath().getName().startsWith(
            ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
          serverListFile = fileStatus.getPath().getName();
          break;
        }
      }
      return serverListFile;
    }
    複製程式碼

    getServerListFile 會獲取 baseDirectory 下的檔案元資料,然後篩選出對應的 serverListFile,最後返回其檔名。

接著回到 startZooKeeperManager 方法中,在選定了啟動 Zookeeper 服務的 Task 後,系統會首先判斷計算是否完成,如果已經完成則表明無需再繼續執行。否則會呼叫 onlineZooKeeperServer 方法啟動 Zookeeper 服務。

org.apache.giraph.zk.ZooKeeperManager#onlineZooKeeperServer

public void onlineZooKeeperServer() throws IOException {
  //如果當前 task 的 taskPartition 等於 zkServerTask,則需要啟動 Zookeeper 服務 
  if (zkServerTask == taskPartition) {
    File zkDirFile = new File(this.zkDir);
    try {
      //刪除舊的資料夾
      ••••••
      FileUtils.deleteDirectory(zkDirFile);
    } catch (IOException e) {
      ••••••
    }
    //生成 Zookeeper 配置
    generateZooKeeperConfig();
    synchronized (this) {
      zkRunner = createRunner();
      //啟動 Zookeeper 服務
      int port = zkRunner.start(zkDir,config);
      if (port > 0) {
        zkBasePort = port;
        updateZkPortString();
      }
    }

    // Once the server is up and running,notify that this server is up
    // and running by dropping a ready stamp.
    int connectAttempts = 0;
    final int maxConnectAttempts =
        conf.getZookeeperConnectionAttempts();
    while (connectAttempts < maxConnectAttempts) {
      try {
        ••••••
        //連線 Zookeeper 服務
        InetSocketAddress zkServerAddress =
            new InetSocketAddress(myHostname,zkBasePort);
        Socket testServerSock = new Socket();
        testServerSock.connect(zkServerAddress,5000);
        ••••••
        break;
      } catch (SocketTimeoutException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "SocketTimeoutException",e);
      } catch (ConnectException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "ConnectException",e);
      } catch (IOException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "IOException",e);
      }

      ++connectAttempts;
      try {
        Thread.sleep(pollMsecs);
      } catch (InterruptedException e) {
        LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
            " interrupted - " + e.getMessage());
      }
    }
    //超過最大的嘗試數,連線失敗
    if (connectAttempts == maxConnectAttempts) {
      throw new IllegalStateException(
          "onlineZooKeeperServers: Failed to connect in " +
              connectAttempts + " tries!");
    }
    //
    Path myReadyPath = new Path(
        serverDirectory,myHostname +
        HOSTNAME_TASK_SEPARATOR + taskPartition +
        HOSTNAME_TASK_SEPARATOR + zkBasePort);
    try {
      ••••••
      //建立檔案表明 Zookeeper 服務已經準備好,並且提供連線的資訊
      fs.createNewFile(myReadyPath);
    } catch (IOException e) {
      ••••••
    }
  } else {
    //其餘 Task 等待 Zookeeper 服務的啟動
    int readyRetrievalAttempt = 0;
    String foundServer = null;
    while (true) {
      try {
        FileStatus [] fileStatusArray =
            fs.listStatus(serverDirectory);
        //檢查 serverDirectory 資料夾下是否生成了 Zookeeper 連線資訊檔案
        if ((fileStatusArray != null) &&
            (fileStatusArray.length > 0)) {
          //解析檔案中的連線資訊
          for (int i = 0; i < fileStatusArray.length; ++i) {
            String[] hostnameTaskArray =
                fileStatusArray[i].getPath().getName().split(
                    HOSTNAME_TASK_SEPARATOR);
            if (hostnameTaskArray.length != 3) {
              throw new RuntimeException(
                  "getZooKeeperServerList: Task 0 failed " +
                      "to parse " +
                      fileStatusArray[i].getPath().getName());
            }
            //zookeeper 服務所在地址
            foundServer = hostnameTaskArray[0];
            //zookeeper 服務的連線埠
            zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
            //更新 zookeeper 的連線資訊
            updateZkPortString();
          }
          ••••••
          //檢視 hostname 是否相同,相同則跳出等待,具體場景尚未想到
          if (zkServerHost.equals(foundServer)) {
            break;
          }
        } else {
          ••••••
        }
        Thread.sleep(pollMsecs);
        ++readyRetrievalAttempt;
      } catch (IOException e) {
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        ••••••
      }
    }
  }
}
複製程式碼

可以看到 onlineZooKeeperServer 實際做了兩件事情:一是在選定的 Task 上啟動 Zookeeper 服務,並建立檔案表明服務已經準備好。二是所有未啟動 Zookeeper 服務的 Task 去更新 Zookeeper 的連線資訊。

分配角色

在啟動完成 Zookeeper 服務之後系統會更新 Zookeeper 相關的配置資訊然後返回到 org.apache.giraph.graph.GraphTaskManager#setup 方法中,之後會呼叫 determineGraphFunctions

org.apache.giraph.graph.GraphTaskManager#determineGraphFunctions

private static GraphFunctions determineGraphFunctions(
  ImmutableClassesGiraphConfiguration conf,ZooKeeperManager zkManager) {
  //判斷是本地模式還是叢集模式,本地模式只會啟動一個 Task
  boolean splitMasterWorker = conf.getSplitMasterWorker();
  //獲取當前 Task 的 taskPartition
  int taskPartition = conf.getTaskPartition();
  //判斷是否提供了外部的 Zookeeper
  boolean zkAlreadyProvided = conf.isZookeeperExternal();
  //初始時刻 Task 的角色
  GraphFunctions functions = GraphFunctions.UNKNOWN;
  
  if (!splitMasterWorker) {
    //本地模式下如果是內部啟動 Zookeeper 則 Task 充當所有的角色,否則充當 Master 和 Worker
    if ((zkManager != null) && zkManager.runsZooKeeper()) {
      functions = GraphFunctions.ALL;
    } else {
      functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
    }
  } else {
    if (zkAlreadyProvided) {
      //如果有外部 Zookeeper 則 0 號 Task 就是 Master,其餘的都是 Worker
      if (taskPartition == 0) {
        functions = GraphFunctions.Master_ONLY;
      } else {
        functions = GraphFunctions.Worker_ONLY;
      }
    } else {
      //如果是內部啟動的 Zookeeper 服務,
      //則啟動 Zookeeper 服務的 Task 充當 Master 和 zookeeper 角色,其餘為 Worker
      if ((zkManager != null) && zkManager.runsZooKeeper()) {
        functions = GraphFunctions.Master_ZOOKEEPER_ONLY;
      } else {
        functions = GraphFunctions.Worker_ONLY;
      }
    }
  }
  return functions;
}
複製程式碼

determineGraphFunctions 主要是對 Task 的角色進行判斷,系統提供了 6 種角色:

  • UNKNOWN

    表明 Task 的角色還未知

  • Master_ONLY

    表明 Task 是 Master

  • Master_ZOOKEEPER_ONLY

    表明 Task 既是 Master 也是 Zookeeper

  • Worker_ONLY

    表明 Task 只是 Worker

  • ALL

    表明 Task 既是 Master 也是 Worker 和 Zookeeper

  • ALL_EXCEPT_ZOOKEEPER

    表明 Task 既是 Master 也是 Worker

初始化 BSP

在決定各個 Task 的角色之後,系統會呼叫 instantiateBspService 初始化 BSP 服務。

org.apache.giraph.graph.GraphTaskManager#instantiateBspService

private void instantiateBspService()
throws IOException,InterruptedException {
  if (graphFunctions.isMaster()) {
    ••••••
    //建立 Master 物件
    serviceMaster = new BspServiceMaster<I,E>(context,this);
    //Master 執行線上程裡面
    MasterThread = new MasterThread<I,E>(serviceMaster,context);
    MasterThread.setUncaughtExceptionHandler(
        createUncaughtExceptionHandler());
    MasterThread.start();
  }
  if (graphFunctions.isWorker()) {
    ••••••
    //建立 Worker 物件
    serviceWorker = new BspServiceWorker<I,this);
    installGCMonitoring();
    ••••••
  }
}
複製程式碼

instantiateBspService 中對於 Master 主要是建立 serviceMaster 物件,然後啟動 MasterThread 執行緒,對於 Worker 則是建立 serviceWorker 物件。

總結

總的來說,Giraph 的 Job 提交和初始化依據以下流程來執行:

  1. 使用者向 Giraph 提交 Job
  2. Giraph 向 Hadoop 提交 Job
  3. Hadoop 啟動 MapTask,並執行 GraphMapper 的 run 方法
  4. GraphMapper 建立 GraphTaskManager 物件進行初始化
  5. 初始化過程首先獲取 Zookeeper 連線資訊,如果沒有外接 Zookeeper 則需要從所有 MapTask 中進行選取 Task 來啟動 Zookeeper 服務。
  6. 獲取到 Zookeeper 連線資訊之後會根據 determineGraphFunctions 分配角色,由此區分 MapTask 中的 Master 和 Worker
  7. 分配完角色之後則會通過 instantiateBspService 來初始化 BSP 服務,由此結束整個初始化過程。