
HDPCD-Java Review Notes (20)

Orchestration of MapReduce jobs can be accomplished in several ways, including:

Linear Chain of MapReduce Jobs

Use the return value of waitForCompletion to determine the success of each job in the chain

JobControl

A class in the MapReduce API that coordinates multiple jobs

Apache Oozie

A workflow scheduler system for managing jobs and also for scheduling recurring jobs



The linear chain workflow is straightforward. Start a job in the run method of a Tool instance, wait for the job to complete, and then start subsequent jobs in the same run method. Continue until all jobs are complete.

For example:
Job job1 = Job.getInstance(getConf(), "CreateBloomFilter");
// ... configure job1 (mapper, reducer, input and output paths) ...
boolean job1success = job1.waitForCompletion(true);
if (!job1success) {
  System.out.println("The CreateBloomFilter job failed!");
  return -1;
}
// job2 is created only after job1 has succeeded
Job job2 = Job.getInstance(getConf(), "FilterStocksJob");
// ... configure job2 ...
boolean job2success = job2.waitForCompletion(true);
if (!job2success) {
  System.out.println("The FilterStocksJob failed!");
  return -1;
}
return 0; // by convention, a Tool returns 0 on success

JobControl

The same two jobs can be coordinated with a JobControl instead:
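// JobControl and ControlledJob live in org.apache.hadoop.mapreduce.lib.jobcontrol
JobControl control = new JobControl("bloom");
// job1 has no dependencies
ControlledJob cjob1 = new ControlledJob(job1, null);
List<ControlledJob> dependentJobs =
	   new ArrayList<ControlledJob>();
dependentJobs.add(cjob1);
// job2 starts only after job1 has succeeded
ControlledJob cjob2 =
	  new ControlledJob(job2, dependentJobs);
control.addJob(cjob1);
control.addJob(cjob2);
// JobControl is a Runnable, so run it on its own thread
new Thread(control).start();

// Poll until every job has finished
while (!control.allFinished()) {
   Thread.sleep(1000);
}
control.stop();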
  • A JobControl manages ControlledJob instances.
  • The addJob method is used to add a ControlledJob to a JobControl.
  • A ControlledJob has two main components: a Job, and a List of that Job’s dependencies.
  • A Job will not be started until all its dependencies are in the SUCCESS state.
  • The JobControl class implements Runnable. To run the jobs in the JobControl instance, wrap it in a Thread and then start the Thread.
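
The dependency rule in the bullets above can be illustrated without a cluster. The following is a toy model only, not Hadoop's JobControl: MiniJobControl, MiniJob, and the reduced State enum are hypothetical names invented for this sketch, and the jobs run sequentially on the calling thread rather than being submitted to YARN.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Toy model of dependency-gated execution (hypothetical, not Hadoop's JobControl). */
class MiniJobControl {
    /** Reduced set of states, loosely modeled on the ones ControlledJob reports. */
    enum State { WAITING, SUCCESS, FAILED, DEPENDENT_FAILED }

    /** A unit of work plus the jobs it depends on (stand-in for ControlledJob). */
    static final class MiniJob {
        final String name;
        final BooleanSupplier work;   // returns true when the job succeeds
        final List<MiniJob> dependsOn;
        State state = State.WAITING;

        MiniJob(String name, BooleanSupplier work, List<MiniJob> dependsOn) {
            this.name = name;
            this.work = work;
            this.dependsOn = dependsOn == null ? new ArrayList<>() : dependsOn;
        }
    }

    private final List<MiniJob> jobs = new ArrayList<>();

    void addJob(MiniJob job) {
        jobs.add(job);
    }

    /** Scans the jobs repeatedly and runs each one once all of its dependencies have succeeded. */
    Map<String, State> runAll() {
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (MiniJob job : jobs) {
                if (job.state != State.WAITING) {
                    continue;
                }
                boolean dependencyFailed = job.dependsOn.stream()
                        .anyMatch(d -> d.state == State.FAILED || d.state == State.DEPENDENT_FAILED);
                boolean dependenciesDone = job.dependsOn.stream()
                        .allMatch(d -> d.state == State.SUCCESS);
                if (dependencyFailed) {
                    job.state = State.DEPENDENT_FAILED;   // never started
                    progressed = true;
                } else if (dependenciesDone) {
                    job.state = job.work.getAsBoolean() ? State.SUCCESS : State.FAILED;
                    progressed = true;
                }
            }
        }
        Map<String, State> result = new LinkedHashMap<>();
        for (MiniJob job : jobs) {
            result.put(job.name, job.state);
        }
        return result;
    }
}
```

Adding the jobs in any order gives the same execution order, because a job is skipped on each scan until everything it depends on has reached SUCCESS.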

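Oozie has two main capabilities:

Oozie Workflow

A collection of actions, defined in a workflow.xml file

Oozie Coordinator

A recurring Oozie Workflow, triggered by time or by data availability

A workflow is deployed to HDFS in an application directory laid out like this:

  • /appdir/workflow.xml
  • /appdir/config-default.xml
  • /appdir/lib/files.jar
The config-default.xml file is optional, and contains properties shared by all workflows. The lib folder holds the JAR files that the workflow's actions need.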

Consider this example:

<?xml version="1.0" encoding="utf-8"?>

<workflow-app xmlns="uri:oozie:workflow:0.2" name="payroll-workflow">  
  <start to="payroll-job"/>  
  <action name="payroll-job"> 
    <map-reduce> 
      <job-tracker>${resourceManager}</job-tracker>  
      <name-node>${nameNode}</name-node>  
      <prepare> 
        <delete path="${nameNode}/user/${wf:user()}/payroll/result"/> 
      </prepare>  
      <configuration> 
        <property> 
          <name>mapreduce.job.queuename</name>  
          <value>${queueName}</value> 
        </property>  
        <property> 
          <name>mapred.mapper.new-api</name>  
          <value>true</value> 
        </property>  
        <property> 
          <name>mapred.reducer.new-api</name>  
          <value>true</value> 
        </property>  
        <property> 
          <name>mapreduce.job.map.class</name>  
          <value>payroll.PayrollMapper</value> 
        </property>  
        <property> 
          <name>mapreduce.job.reduce.class</name>  
          <value>payroll.PayrollReducer</value> 
        </property>  
        <property> 
          <name>mapreduce.job.inputformat.class</name>  
          <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value> 
        </property>  
        <property> 
          <name>mapreduce.job.outputformat.class</name>  
          <value>org.apache.hadoop.mapreduce.lib.output.NullOutputFormat</value> 
        </property>  
        <property> 
          <name>mapreduce.job.output.key.class</name>  
          <value>payroll.EmployeeKey</value> 
        </property>  
        <property> 
          <name>mapreduce.job.output.value.class</name>  
          <value>payroll.Employee</value> 
        </property>  
        <property> 
          <name>mapreduce.job.reduces</name>  
          <value>20</value> 
        </property>  
        <property> 
          <name>mapreduce.input.fileinputformat.inputdir</name>  
          <value>${nameNode}/user/${wf:user()}/payroll/input</value> 
        </property>  
        <property> 
          <name>mapreduce.output.fileoutputformat.outputdir</name>  
          <value>${nameNode}/user/${wf:user()}/payroll/result</value> 
        </property>  
        <property> 
          <name>taxCode</name>  
          <value>${taxCode}</value> 
        </property> 
      </configuration> 
    </map-reduce>  
    <ok to="compute-tax"/>  
    <error to="fail"/> 
  </action>  
  <action name="compute-tax"> 
    <map-reduce> 
      <!-- define compute-tax job --> 
    </map-reduce>  
    <ok to="end"/>  
    <error to="fail"/> 
  </action>  
  <kill name="fail"> 
    <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> 
  </kill>  
  <end name="end"/> 
</workflow-app>

Every workflow must define a <start> and an <end> node.
This workflow has two action nodes: payroll-job and compute-tax.
Notice that a workflow looks almost identical to the run method of a MapReduce job, except that the job properties are specified in XML.
The <delete> element inside <prepare> is a convenient way to delete an existing output folder before the job starts.

Pig Actions

<?xml version="1.0" encoding="utf-8"?>

<workflow-app xmlns="uri:oozie:workflow:0.2" name="whitehouse-workflow">  
  <start to="transform_whitehouse_visitors"/>  
  <action name="transform_whitehouse_visitors"> 
    <pig> 
      <job-tracker>${resourceManager}</job-tracker>  
      <name-node>${nameNode}</name-node>  
      <prepare> 
        <delete path="wh_visits"/> 
      </prepare>  
      <script>whitehouse.pig</script> 
    </pig>  
    <ok to="end"/>  
    <error to="fail"/> 
  </action>  
  <kill name="fail"> 
    <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> 
  </kill>  
  <end name="end"/> 
</workflow-app>
Hive Actions
<?xml version="1.0" encoding="utf-8"?>

<action name="find_congress_visits"> 
  <hive xmlns="uri:oozie:hive-action:0.5">  
    <job-tracker>${resourceManager}</job-tracker>  
    <name-node>${nameNode}</name-node>  
    <prepare> 
      <delete path="congress_visits"/> 
    </prepare>  
    <job-xml>hive-site.xml</job-xml>  
    <configuration> 
      <property> 
        <name>mapreduce.map.output.compress</name>  
        <value>true</value> 
      </property> 
    </configuration>  
    <script>congress.hive</script> 
  </hive>  
  <ok to="end"/>  
  <error to="fail"/> 
</action>
Submitting a Workflow Job

Oozie has a command line tool named oozie for submitting and executing workflows. The command looks like:

# oozie job -config job.properties -run

Note that the workflow is typically deployed in HDFS, while job.properties, which contains the properties passed into the workflow, is typically kept on the local filesystem.

The command above does not specify which Oozie Workflow to execute. This is specified by the oozie.wf.application.path property:

oozie.wf.application.path=hdfs://node:8020/path/to/app

Here is an example of a job.properties file:

oozie.wf.application.path=hdfs://node:8020/path/to/app
resourceManager=node:8050
#Hadoop fs.defaultFS
nameNode=hdfs://node:8020/
#Hadoop mapreduce.job.queuename
queueName=default
#Application-specific properties
taxCode=2012
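
To make the link between job.properties and the ${...} placeholders in workflow.xml concrete, here is a minimal sketch that parses properties text with java.util.Properties and substitutes simple ${name} references. It only approximates the idea: Oozie's own expression language is richer (functions such as wf:user(), for instance), and the class JobPropertiesDemo and its methods are hypothetical names used for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: resolves plain ${name} placeholders from job.properties-style text. */
class JobPropertiesDemo {
    // Matches ${name} where name is a simple identifier; EL calls like ${wf:user()} do not match.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_.]*)\\}");

    /** Parses properties text; lines starting with '#' are treated as comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Replaces each known ${name} with its value and leaves unknown placeholders untouched. */
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            String replacement = value != null ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
                "resourceManager=node:8050",
                "#Hadoop mapreduce.job.queuename",
                "queueName=default",
                "taxCode=2012"));
        System.out.println(resolve("<value>${queueName}</value>", props));
        System.out.println(resolve("<value>${taxCode}</value>", props));
    }
}
```

Placeholders that the sketch cannot resolve are passed through unchanged, which keeps expressions such as ${wf:user()} intact for the workflow engine to evaluate.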

Fork and Join Nodes

Oozie has fork and join nodes for controlling workflow. For example:

<fork name="forking">
	   <path start="firstparalleljob"/>
	   <path start="secondparalleljob"/>
</fork>
<action name="firstparalleljob">
	   <map-reduce>
...
	   </map-reduce>
	   <ok to="joining"/>
	   <error to="kill"/>
</action>
<action name="secondparalleljob">
	   <map-reduce>
...
	   </map-reduce>
	   <ok to="joining"/>
	   <error to="kill"/>
</action>
<join name="joining" to="nextaction"/>
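
The control flow of the fork and join nodes above can be sketched in plain Java with CompletableFuture. This is an analogy under simplifying assumptions, not how Oozie executes actions: ForkJoinDemo and forkThenJoin are hypothetical names, each action is reduced to a Supplier that reports success or failure, and the sketch waits for both paths before choosing the next node.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Illustrative analogy for fork/join control flow (not Oozie code). */
class ForkJoinDemo {
    /**
     * Starts both actions in parallel (the fork), waits for both to finish
     * (the join), and returns the name of the node to transition to.
     */
    static String forkThenJoin(Supplier<Boolean> firstParallelJob,
                               Supplier<Boolean> secondParallelJob) {
        CompletableFuture<Boolean> first = CompletableFuture.supplyAsync(firstParallelJob);
        CompletableFuture<Boolean> second = CompletableFuture.supplyAsync(secondParallelJob);

        // Block until both paths have completed.
        CompletableFuture.allOf(first, second).join();

        // Both must succeed to continue; otherwise follow the error transition.
        return (first.join() && second.join()) ? "nextaction" : "kill";
    }

    public static void main(String[] args) {
        System.out.println(forkThenJoin(() -> true, () -> true));
    }
}
```

The point to notice is that the join is a barrier: the node after the join is reached only once every path started by the fork has arrived.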

Defining an Oozie Coordinator Job

The Oozie Coordinator component enables recurring Oozie Workflows to be defined. These recurring jobs can be triggered by two types of events:

Time

Similar to a cron job

Data Availability

The job triggers when a specified directory is created


An Oozie Coordinator job consists of two files:

coordinator.xml

The definition of the Coordinator application

coordinator.properties

For defining the job’s properties


Schedule a Job Based on Time
Consider this example of a coordinator.xml file.
<?xml version="1.0" encoding="utf-8"?>

<coordinator-app xmlns="uri:oozie:coordinator:0.1" 
    name="tf-idf" frequency="1440" start="2013-01-01T00:00Z" end="2013-12-31T00:00Z" timezone="UTC">  
  <action> 
    <workflow> 
      <app-path>hdfs://node:8020/root/java/tfidf/workflow</app-path> 
    </workflow> 
  </action> 
</coordinator-app>
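
As a rough illustration of how the start, end, and frequency attributes combine, the sketch below lists the first few nominal run times for a time-based schedule. It assumes the frequency is a number of minutes and treats the end time as exclusive, which is an assumption made for this sketch; CoordinatorSchedule and firstNominalTimes are hypothetical names, not part of Oozie.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: enumerates nominal times from start, end, and a frequency in minutes. */
class CoordinatorSchedule {
    /** Returns at most 'limit' nominal times, stepping from start by the given frequency. */
    static List<Instant> firstNominalTimes(Instant start, Instant end,
                                           long frequencyMinutes, int limit) {
        List<Instant> times = new ArrayList<>();
        Duration step = Duration.ofMinutes(frequencyMinutes);
        // Assumption for this sketch: the end time itself is not a run time.
        for (Instant t = start; t.isBefore(end) && times.size() < limit; t = t.plus(step)) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        // The coordinator.xml timestamps omit seconds, which OffsetDateTime accepts.
        Instant start = OffsetDateTime.parse("2013-01-01T00:00Z").toInstant();
        Instant end = OffsetDateTime.parse("2013-12-31T00:00Z").toInstant();
        for (Instant t : firstNominalTimes(start, end, 1440, 3)) {
            System.out.println(t);
        }
    }
}
```

With a frequency of 1440 minutes the nominal times fall 24 hours apart, starting at the start time.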

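The frequency attribute is expressed in minutes, so a value of 1440 runs the workflow once a day between the start and end times.

The coordinator.properties file contains the path to the coordinator app: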

oozie.coord.application.path=hdfs://node:8020/path/to/app

Schedule a Job Based on Data Availability
Consider this example of a coordinator.xml file that checks for a directory.
<?xml version="1.0" encoding="utf-8"?>

<coordinator-app xmlns="uri:oozie:coordinator:0.1" 
    name="file_check" 
    frequency="1440" start="2012-01-01T00:00Z" end="2015-12-31T00:00Z" timezone="UTC">  
  <datasets> 
    <dataset name="input1"> 
      <uri-template>hdfs://node:8020/job/result/</uri-template> 
    </dataset> 
  </datasets>  
  <action> 
    <workflow> 
      <app-path>hdfs://node:8020/myapp/</app-path> 
    </workflow> 
  </action> 
</coordinator-app>
  • The frequency of 1440 minutes schedules this Coordinator app to run once a day.
  • On each run, if the folder hdfs://node:8020/job/result/ exists, the <action> executes, which in this example is an Oozie Workflow deployed in the hdfs://node:8020/myapp folder.
  • The use-case scenario for this example assumes that a separate MapReduce job executes once a day at an unspecified time. When that job runs, it deletes the hdfs://node:8020/job/result directory and then creates a new one, which triggers the Coordinator to run.