Hadoop: Writing YARN Applications

1. Concepts and Flow

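This article mainly describes how to submit an application to the YARN ResourceManager (RM) through a client.

An application is submitted to the YARN RM through a YarnClient object. Once the YarnClient is started, the client can set up the application context, prepare the application's very first container (Container 0, which runs the ApplicationMaster), and submit the application.

To launch the application correctly, you need to provide the following: the local files/JARs the application needs, the actual command to execute (with its arguments, if any), and any other OS environment settings. In effect, you describe the Unix process(es) that the system has to launch on your behalf.

The YARN ResourceManager then launches the ApplicationMaster, as specified, in an allocated container. Once started, the ApplicationMaster communicates with the YARN cluster and handles the application's execution asynchronously. During application launch, the main tasks of the ApplicationMaster are: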

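  • Communicating with the ResourceManager to negotiate and allocate resources for future containers. This can be done through an AMRMClientAsync object, which handles the communication through the event methods of an AMRMClientAsync.CallbackHandler; the handler must be implemented explicitly by the user.
  • After containers are allocated, communicating with the YARN NodeManagers to launch the application's containers on them. This can be done by starting a runnable object that launches a container once it has been allocated. As part of launching a container, the AM has to specify a ContainerLaunchContext that describes the launch environment, such as the command line and environment variables.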

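While the application is running, the ApplicationMaster communicates with the NodeManagers through an NMClientAsync object. All container events are handled by the NMClientAsync.CallbackHandler associated with the NMClientAsync; typical events are container start, stop, status update, and error. The ApplicationMaster also reports overall application progress to the ResourceManager through the getProgress() method of the AMRMClientAsync.CallbackHandler.

Besides the asynchronous clients, YARN also provides synchronous ones: AMRMClient and NMClient. The asynchronous clients are recommended because they are simpler to use.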

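1.1 Key Points of This Section

  • The client first communicates with the YARN ResourceManager through a YarnClient object; among other things, this gets a container allocated for the ApplicationMaster to run in.
  • The ApplicationMaster communicates with YARN through an AMRMClient object to handle container allocation for tasks, progress reporting, and so on.
  • The ApplicationMaster communicates with the NodeManagers through an NMClient object to manage the events that occur while containers are running.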

2. Interfaces

The important interfaces are:

  1. YarnClient : Client<–>ResourceManager

  2. AMRMClientAsync and AMRMClientAsync.CallbackHandler : ApplicationMaster<–>ResourceManager

  3. NMClientAsync and NMClientAsync.CallbackHandler : ApplicationMaster<–>NodeManager

These three interfaces wrap three protocols, ApplicationClientProtocol, ApplicationMasterProtocol, and ContainerManagementProtocol, to provide a simpler programming model.

3. Writing a simple Yarn Application

3.1 Writing a simple Client

  1. Initialize and start a YarnClient.
  YarnClient yarnClient = YarnClient.createYarnClient();
  yarnClient.init(conf);
  yarnClient.start();
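  2. Use the YarnClient to create an application and get its application id.
  YarnClientApplication app = yarnClient.createApplication();
  GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
- The returned appResponse also contains information about the cluster, such as its minimum and maximum resource capabilities.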
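  3. Set up the application information. The main difficulty for a client is setting up the ApplicationSubmissionContext, which contains everything the ResourceManager needs to launch the ApplicationMaster:
    • Application information: the application id, name, queue, priority, and the user submitting the application.
    • ContainerLaunchContext: describes the container in which the ApplicationMaster runs, including local Resources (binaries, jars, input files, etc.), Environment settings (CLASSPATH, etc.), the Command to execute, and security Tokens (abbreviated RECT).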
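// Set the application name and other basic information
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
// The application id is used below to build per-application paths
ApplicationId appId = appContext.getApplicationId();

appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);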

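// Set up local resources for the application master: the AM jar and, if given,
// the log4j configuration. addToLocalResources is a helper method of the example client.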
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
FileSystem fs = FileSystem.get(conf);
addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
    localResources, null);

if (!log4jPropFile.isEmpty()) {
  addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
      localResources, null);
}

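// Set up the shell script the application needs at run time.
// The script is used by the task containers, not by the ApplicationMaster itself,
// so it is not added to the AM's local resources. To make it reachable from the
// task containers, copy it to storage visible to the YARN cluster, such as HDFS.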
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
  Path shellSrc = new Path(shellScriptPath);
  String shellPathSuffix =
      appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
  Path shellDst =
      new Path(fs.getHomeDirectory(), shellPathSuffix);
  fs.copyFromLocalFile(false, true, shellSrc, shellDst);
  hdfsShellScriptLocation = shellDst.toUri().toString();
  FileStatus shellFileStatus = fs.getFileStatus(shellDst);
  hdfsShellScriptLen = shellFileStatus.getLen();
  hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
}

if (!shellCommand.isEmpty()) {
  addToLocalResources(fs, null, shellCommandPath, appId.toString(),
      localResources, shellCommand);
}

if (shellArgs.length > 0) {
  addToLocalResources(fs, null, shellArgsPath, appId.toString(),
      localResources, StringUtils.join(shellArgs, " "));
}

// Set the environment variables for the Application Master
Map<String, String> env = new HashMap<String, String>();

// put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

// Build the classpath for the Application Master
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
  .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(
    YarnConfiguration.YARN_APPLICATION_CLASSPATH,
    YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
  classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
  classPathEnv.append(c.trim());
}
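classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
  "./log4j.properties");
// Without this the classpath built above is never passed to the AM container
env.put("CLASSPATH", classPathEnv.toString());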

// Set the command and arguments used to launch the Application Master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);

// Set java executable command
LOG.info("Setting up app master command");
vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));

for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
if (debugFlag) {
  vargs.add("--debug");
}

vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());

// Set up the container launch context for the Application Master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
  localResources, env, commands, null, null, null);

// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);

// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);

// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
  // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
  Credentials credentials = new Credentials();
  String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
  if (tokenRenewer == null || tokenRenewer.length() == 0) {
    throw new IOException(
      "Can't get Master Kerberos principal for the RM to use as renewer");
  }

  // For now, only getting tokens for the default file-system.
  final Token<?> tokens[] =
      fs.addDelegationTokens(tokenRenewer, credentials);
  if (tokens != null) {
    for (Token<?> token : tokens) {
      LOG.info("Got dt for " + fs.getUri() + "; " + token);
    }
  }
  DataOutputBuffer dob = new DataOutputBuffer();
  credentials.writeTokenStorageToStream(dob);
  ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  amContainer.setTokens(fsTokens);
}

appContext.setAMContainerSpec(amContainer);
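
The command assembly above comes down to joining tokens with spaces and appending the log redirections. The following is a minimal sketch of that step in plain Java with no Hadoop dependency. The class and method names (AmCommandSketch, buildAmCommand) and the example main class are hypothetical, and the literals {{JAVA_HOME}} and <LOG_DIR> are written out on the assumption that they match what Environment.JAVA_HOME.$$() and ApplicationConstants.LOG_DIR_EXPANSION_VAR expand to.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AmCommandSketch {
    // Assumed placeholder values; YARN substitutes real paths when the container is launched.
    static final String JAVA_HOME_VAR = "{{JAVA_HOME}}";
    static final String LOG_DIR_VAR = "<LOG_DIR>";

    /** Builds one shell command line that starts the ApplicationMaster JVM. */
    static String buildAmCommand(int amMemoryMb, String mainClass, Map<String, String> options) {
        List<String> parts = new ArrayList<>();
        parts.add(JAVA_HOME_VAR + "/bin/java");
        parts.add("-Xmx" + amMemoryMb + "m");
        parts.add(mainClass);
        // Each option becomes "--name value", in the map's iteration order.
        for (Map.Entry<String, String> e : options.entrySet()) {
            parts.add("--" + e.getKey() + " " + e.getValue());
        }
        // Redirect stdout and stderr into the container's log directory.
        parts.add("1>" + LOG_DIR_VAR + "/AppMaster.stdout");
        parts.add("2>" + LOG_DIR_VAR + "/AppMaster.stderr");
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("container_memory", "1024");
        options.put("num_containers", "2");
        System.out.println(buildAmCommand(512, "com.example.MyAppMaster", options));
    }
}
```

Building the command as a list and joining it once avoids the trailing space that the append-in-a-loop version leaves behind; whether that matters depends on how the command is consumed.
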
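  4. Once the setup is complete, the client can submit the application to a specific queue with a specific priority.
// Set the application priority
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);

// Set the YARN queue the application is submitted to
appContext.setQueue(amQueue);

// Submit the application
yarnClient.submitApplication(appContext);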
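  5. Once the application is submitted, the RM accepts it in the background, allocates a container with the required resources for the AM, then sets up the environment and launches the AM in that container.

  6. The client can track the progress of the application in the following ways:

    • Ask the YARN RM for an application report through the YarnClient object:
ApplicationReport report = yarnClient.getApplicationReport(appId);
      The ApplicationReport received from the RM contains: (1) general application information: the application id, the queue it was submitted to, the submitting user, and the start time; (2) ApplicationMaster details: the host the AM is running on, its RPC port, and so on; (3) application tracking information: if the AM supports progress tracking, the tracking URL returned by the report's getTrackingUrl() method can be used to monitor progress; (4) the application state.
    • If the ApplicationMaster supports it, query the ApplicationMaster directly for progress updates.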

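In some situations, for example when the application is taking too long or because of other factors, the client may want to kill the application. YarnClient's killApplication method does this by sending a kill signal to the ApplicationMaster via the ResourceManager.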

  yarnClient.killApplication(appId);
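
Polling for a report and killing an overdue application are usually combined into one monitoring loop. The sketch below shows that control flow in plain Java under stated assumptions: AppState is a local stand-in whose constants mirror the names of YARN's YarnApplicationState, the Supplier stands in for reading the state from getApplicationReport, and the Runnable stands in for killApplication. MonitorSketch and monitor are hypothetical names.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.function.Supplier;

final class MonitorSketch {
    // Local stand-in; a real client would use YarnApplicationState from the report.
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    static final EnumSet<AppState> TERMINAL =
        EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    /**
     * Polls stateSource until it reports a terminal state. If maxPolls is used up
     * or the thread is interrupted first, runs killAction and returns KILLED.
     */
    static AppState monitor(Supplier<AppState> stateSource, int maxPolls,
                            long sleepMillis, Runnable killAction) {
        for (int i = 0; i < maxPolls; i++) {
            AppState state = stateSource.get();
            if (TERMINAL.contains(state)) {
                return state;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                break;
            }
        }
        killAction.run();
        return AppState.KILLED;
    }

    public static void main(String[] args) {
        Iterator<AppState> states =
            Arrays.asList(AppState.ACCEPTED, AppState.RUNNING, AppState.FINISHED).iterator();
        AppState result = monitor(states::next, 10, 10L,
            () -> System.out.println("kill requested"));
        System.out.println(result);
    }
}
```

In a real client the poll budget would normally be derived from a wall-clock timeout rather than a fixed count.
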

3.2 Writing an ApplicationMaster (AM)

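Keep the following points about the AM in mind:

  1. The AM is the actual owner of the job. It is launched by the RM, and the client provides it with all the parameters of the job.
  2. Because the AM itself runs in a container that may share a physical host with other applications, it cannot make any assumptions about the ports it can listen on (for example, it cannot rely on a predefined port such as 8888, which may already be taken by another application).
  3. When the AM starts up, it can look up information about the NodeManager it is running on, along with other parameters, through its environment.
  4. Every interaction between the AM and the ResourceManager requires an ApplicationAttemptId. The following code shows how to obtain it.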
Map<String, String> envs = System.getenv();
String containerIdString =
    envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
  // container id should always be set in the env by the framework
  throw new IllegalArgumentException(
      "ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
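
On the point above about not assuming a fixed listening port: one common approach is to bind to port 0 and let the operating system choose a free port, then report whichever port was chosen. The sketch below uses only java.net; EphemeralPortSketch and bindAnyFreePort are hypothetical names, and it does not show the RPC server an actual AM would run on the socket.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class EphemeralPortSketch {
    /** Binds to port 0 so the OS assigns any free port; the caller owns the socket. */
    static ServerSocket bindAnyFreePort() throws IOException {
        return new ServerSocket(0);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = bindAnyFreePort()) {
            // An AM would report this value as its RPC port when it registers with the RM.
            System.out.println("listening on port " + server.getLocalPort());
        }
    }
}
```

Keeping the socket open and reading the port from it avoids the race of probing for a free port, closing it, and binding again later.
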
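  5. After the AM has initialized itself, it can start the two clients used to talk to the RM and the NodeManagers, and set up the event handlers that process the various events.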
  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
  amRMClient.init(conf);
  amRMClient.start();

  containerListener = createNMCallbackHandler();
  nmClientAsync = new NMClientAsyncImpl(containerListener);
  nmClientAsync.init(conf);
  nmClientAsync.start();
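  6. The AM also has to send heartbeats to the RM to let it know that the AM is still alive. Heartbeating starts when the AM registers itself with the RM.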
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);

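The registration response from the RM contains information about the current state of the cluster.

// Maximum resource capability (memory and vcores) reported by the RM, and the
// containers still running from previous AM attempts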
int maxMem = response.getMaximumResourceCapability().getMemory();
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();

List<Container> previousAMRunningContainers =
    response.getContainersFromPreviousAttempts();
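  7. The AM can request a set of containers to run its job. (The container memory size, the number of containers, and the timing of the requests can be adjusted to the cluster state reported in the response, to improve the chances of success.)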
List<Container> previousAMRunningContainers =
    response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
    + " previous AM's running containers on AM registration.");

int numTotalContainersToRequest =
    numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
for (int i = 0; i < numTotalContainersToRequest; ++i) {
  ContainerRequest containerAsk = setupContainerAskForRM();
  amRMClient.addContainerRequest(containerAsk);
}
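
The arithmetic behind the request loop can be made a little more defensive. The sketch below, in plain Java, caps a per-container request at the maximum capability reported by the RM and keeps the outstanding count from going negative when more containers survive from previous attempts than are needed. ContainerAskSketch and its methods are hypothetical helpers, not part of the YARN API.

```java
final class ContainerAskSketch {
    /** Caps a requested amount (memory in MB, or vcores) at the cluster maximum. */
    static int clamp(int requested, int max) {
        if (max <= 0) {
            throw new IllegalArgumentException("max must be positive: " + max);
        }
        return Math.min(requested, max);
    }

    /** Containers still to request after counting those kept from previous attempts. */
    static int remainingToRequest(int total, int alreadyRunning) {
        return Math.max(0, total - alreadyRunning);
    }

    public static void main(String[] args) {
        int maxMem = 2048; // e.g. response.getMaximumResourceCapability().getMemory()
        System.out.println("memory to ask for: " + clamp(4096, maxMem));
        System.out.println("containers to ask for: " + remainingToRequest(5, 2));
    }
}
```
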

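In setupContainerAskForRM, you need to set the following: (1) memory and vcores; (2) priority.

  8. After the container allocation requests have been sent, containers are launched asynchronously through the event handling of the AMRMClientAsync object. When the AM handles the onContainersAllocated callback, it needs to set up the ContainerLaunchContext. Because this setup can be involved, it is best done on a new thread that prepares and launches the container, so that the AM's main thread is not blocked. In the example below, the launchThread thread initializes and launches the container.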

@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG.info("Got response from RM for container ask, allocatedCnt="
      + allocatedContainers.size());
  numAllocatedContainers.addAndGet(allocatedContainers.size());
  for (Container allocatedContainer : allocatedContainers) {
    LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener);
    Thread launchThread = new Thread(runnableLaunchContainer);

    // launch and start the container on a separate thread to keep
    // the main thread unblocked
    // as all containers may not be allocated at one go.
    launchThreads.add(launchThread);
    launchThread.start();
  }
}
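
The callback above starts one raw Thread per allocated container. A bounded thread pool is a common alternative, and it is swapped in deliberately in the sketch below; it is not what the example above does. The sketch is plain Java: LaunchPoolSketch and launchAll are hypothetical names, container ids are plain strings, and the actual launch call is only indicated by a comment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LaunchPoolSketch {
    /** Runs one launch task per container id on a fixed-size pool; returns the ids handled. */
    static Set<String> launchAll(List<String> containerIds, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> launched = ConcurrentHashMap.newKeySet();
        for (String id : containerIds) {
            pool.execute(() -> {
                // A real AM would build the ContainerLaunchContext here and hand it to
                // nmClientAsync.startContainerAsync(...).
                launched.add(id);
            });
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return launched;
    }

    public static void main(String[] args) {
        System.out.println(launchAll(Arrays.asList("c1", "c2", "c3"), 2).size() + " containers handled");
    }
}
```

The pool bounds the number of concurrent launch threads when many containers arrive in one callback; a long-lived AM would keep one pool for its lifetime instead of creating one per batch.
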
  9. The work done by the launchThread thread is shown below. The AM uses NMClientAsync to communicate with the NodeManager and launch the container.
// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);

// Set executable command
vargs.add(shellCommand);
// Set shell script path
if (!scriptPath.isEmpty()) {
  vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
    : ExecShellStringPath);
}

// Set args for the shell command if any
vargs.add(shellArgs);
// Add log redirect params
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

List<String> commands = new ArrayList<String>();
commands.add(command.toString());

// Set up ContainerLaunchContext, setting local resource, environment,
// command and token for constructor.

// Note for tokens: Set up tokens for the container too. Today, for normal
// shell commands, the container in distribute-shell doesn't need any
// tokens. We are populating them mainly for NodeManagers to be able to
// download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
  localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
  10. The AM can report its progress with each heartbeat as follows.
@Override
public float getProgress() {
  // set progress to deliver to RM on next heartbeat
  float progress = (float) numCompletedContainers.get()
      / numTotalContainers;
  return progress;
}
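
The progress computation above divides by numTotalContainers, which yields NaN if that total is ever zero. A slightly more defensive variant is sketched below in plain Java; ProgressSketch is a hypothetical class, and the clamping to the range 0 to 1 is an added assumption about what a progress value should look like.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ProgressSketch {
    private final AtomicInteger completed = new AtomicInteger();
    private final int total;

    ProgressSketch(int total) {
        this.total = total;
    }

    /** Called once per finished container, possibly from several callback threads. */
    void containerCompleted() {
        completed.incrementAndGet();
    }

    /** Fraction of containers completed, clamped to [0, 1]; 0 when total is not positive. */
    float getProgress() {
        if (total <= 0) {
            return 0f;
        }
        float fraction = (float) completed.get() / total;
        return Math.min(1f, Math.max(0f, fraction));
    }

    public static void main(String[] args) {
        ProgressSketch progress = new ProgressSketch(4);
        progress.containerCompleted();
        System.out.println(progress.getProgress());
    }
}
```
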
  11. After all the work is done, the AM has to unregister itself from YARN and stop its client objects.
try {
  amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
  LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
  LOG.error("Failed to unregister application", e);
}

amRMClient.stop();