HDPCD-Java Review Notes (20)
Orchestration of MapReduce jobs can be accomplished in several ways, including:
•Linear Chain of MapReduce Jobs: Use the return value of waitForCompletion to determine the success of each job in the chain.
•JobControl: A class in the MapReduce API that coordinates multiple jobs.
•Apache Oozie: A workflow scheduler system for managing jobs and for scheduling recurring jobs.
The linear chain workflow is straightforward. Start a job in a run method of a Tool instance, wait for the job to complete, and then start subsequent jobs in the same run method. Continue until all jobs are complete.
For example:
Job job1 = Job.getInstance(getConf(), "CreateBloomFilter");
// Block until the first job finishes; true prints progress to the console.
boolean job1success = job1.waitForCompletion(true);
if (!job1success) {
    System.out.println("The CreateBloomFilter job failed!");
    return -1;
}
// Start the second job only after the first one has succeeded.
Job job2 = Job.getInstance(getConf(), "FilterStocksJob");
boolean job2success = job2.waitForCompletion(true);
if (!job2success) {
    System.out.println("The FilterStocksJob failed!");
    return -1;
}
// By exit-code convention, 0 signals success.
return 0;
•A JobControl manages ControlledJob instances.
JobControl control = new JobControl("bloom");
// cjob1 has no dependencies.
ControlledJob cjob1 = new ControlledJob(job1, null);
// cjob2 depends on cjob1.
List<ControlledJob> dependentJobs = new ArrayList<ControlledJob>();
dependentJobs.add(cjob1);
ControlledJob cjob2 = new ControlledJob(job2, dependentJobs);
control.addJob(cjob1);
control.addJob(cjob2);
// JobControl is a Runnable, so run it on its own thread and poll until done.
new Thread(control).start();
while (!control.allFinished()) {
    Thread.sleep(1000);
}
control.stop();
•The addJob method is used to add a ControlledJob to a JobControl.
•A ControlledJob has two main components: a Job, and a List of that Job’s dependencies.
•A Job will not be started until all its dependencies are in the SUCCESS state.
•The JobControl class implements Runnable. To run the jobs in the JobControl instance, wrap it in a Thread and then start the Thread.
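The dependency rule described above can be illustrated with a small stand-alone model. This is a sketch only: the classes DependencyModel and Node are hypothetical stand-ins written in plain Java, they neither use nor reproduce the Hadoop JobControl implementation, and every job is assumed to succeed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of dependency-gated job start; not Hadoop code.
class DependencyModel {
    enum State { WAITING, SUCCESS }

    static class Node {
        final String name;
        final List<Node> dependencies;
        State state = State.WAITING;

        Node(String name, List<Node> dependencies) {
            this.name = name;
            this.dependencies = dependencies;
        }

        // A node may start only when every dependency has reached SUCCESS.
        boolean isReady() {
            for (Node dep : dependencies) {
                if (dep.state != State.SUCCESS) {
                    return false;
                }
            }
            return true;
        }
    }

    // Repeatedly starts every ready node until all have finished.
    // Assumes every job succeeds and that there are no dependency cycles.
    static List<String> runAll(List<Node> nodes) {
        List<String> order = new ArrayList<>();
        while (order.size() < nodes.size()) {
            boolean progressed = false;
            for (Node node : nodes) {
                if (node.state == State.WAITING && node.isReady()) {
                    node.state = State.SUCCESS; // stand-in for running the job
                    order.add(node.name);
                    progressed = true;
                }
            }
            if (!progressed) {
                throw new IllegalStateException("Unsatisfiable dependencies");
            }
        }
        return order;
    }
}
```

Even if the dependent node is added first, runAll starts it only after its dependency has completed, which mirrors the behaviour the bullets describe for ControlledJob.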
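Oozie has two main capabilities: running workflows of jobs, and scheduling recurring workflows through its Coordinator component. A workflow application is deployed as a directory containing files such as: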
- /appdir/workflow.xml
- /appdir/config-default.xml
- /appdir/lib/files.jar
Consider this example:
<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="payroll-workflow">
<start to="payroll-job"/>
<action name="payroll-job">
<map-reduce>
<job-tracker>${resourceManager}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/${wf:user()}/payroll/result"/>
</prepare>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.job.map.class</name>
<value>payroll.PayrollMapper</value>
</property>
<property>
<name>mapreduce.job.reduce.class</name>
<value>payroll.PayrollReducer</value>
</property>
<property>
<name>mapreduce.job.inputformat.class</name>
<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
</property>
<property>
<name>mapreduce.job.outputformat.class</name>
<value>org.apache.hadoop.mapreduce.lib.output.NullOutputFormat</value>
</property>
<property>
<name>mapreduce.job.output.key.class</name>
<value>payroll.EmployeeKey</value>
</property>
<property>
<name>mapreduce.job.output.value.class</name>
<value>payroll.Employee</value>
</property>
<property>
<name>mapreduce.job.reduces</name>
<value>20</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>${nameNode}/user/${wf:user()}/payroll/input</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>${nameNode}/user/${wf:user()}/payroll/result</value>
</property>
<property>
<name>taxCode</name>
<value>${taxCode}</value>
</property>
</configuration>
</map-reduce>
<ok to="compute-tax"/>
<error to="fail"/>
</action>
<action name="compute-tax">
<map-reduce>
<!-- define compute-tax job -->
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
•Every workflow must define a <start> and <end>.
•This workflow has two action nodes: payroll-job and compute-tax.
•Notice a workflow looks almost identical to a run method of a MapReduce job, except that the job properties are specified in XML.
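•The <delete> element inside <prepare> is a convenient way to delete an existing output folder before the job starts.
Pig Actions
A workflow can also run a Pig script by using a <pig> action: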
<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="whitehouse-workflow">
<start to="transform_whitehouse_visitors"/>
<action name="transform_whitehouse_visitors">
<pig>
<job-tracker>${resourceManager}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="wh_visits"/>
</prepare>
<script>whitehouse.pig</script>
</pig>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
Hive Actions
<?xml version="1.0" encoding="utf-8"?>
<action name="find_congress_visits">
<hive xmlns="uri:oozie:hive-action:0.5">
<job-tracker>${resourceManager}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="congress_visits"/>
</prepare>
<job-xml>hive-site.xml</job-xml>
<configuration>
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
</configuration>
<script>congress.hive</script>
</hive>
<ok to="end"/>
<error to="fail"/>
</action>
Submitting a Workflow Job
Oozie has a command-line tool named oozie for submitting and executing workflows. The command looks like:
oozie job -config job.properties -run
Note that the workflow is typically deployed in HDFS, while job.properties, which contains the properties passed into the workflow, is typically kept on the local filesystem.
The command above does not specify which Oozie Workflow to execute. This is specified by the oozie.wf.application.path property in job.properties.
Here is an example of a job.properties file:
oozie.wf.application.path=hdfs://node:8020/path/to/app
resourceManager=node:8050
#Hadoop fs.defaultFS
nameNode=hdfs://node:8020/
#Hadoop mapreduce.job.queuename
queueName=default
#Application-specific properties
taxCode=2012
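To make the parameterization concrete, the sketch below loads properties like these with java.util.Properties and expands ${name} placeholders in a value. It is an illustration only: the class PropertyExpander and its expand method are hypothetical, and the simple regular-expression replacement does not reproduce Oozie's EL evaluation (functions such as wf:user() are left untouched).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of Oozie.
class PropertyExpander {
    // Matches simple placeholders such as ${queueName}.
    // EL function calls containing ':' or '(' do not match and are kept as-is.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_.]*)\\}");

    static String expand(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            // Unknown names are left exactly as written.
            String replacement = (value != null) ? value : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader("queueName=default\ntaxCode=2012\n"));
        System.out.println(expand("queue=${queueName}, taxCode=${taxCode}", props));
    }
}
```

Keeping values such as taxCode in job.properties means the same workflow.xml can be resubmitted with different settings without being redeployed.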
Fork and Join Nodes
Oozie has fork and join nodes for controlling workflow. For example:
<fork name="forking">
    <path start="firstparalleljob"/>
    <path start="secondparalleljob"/>
</fork>
<action name="firstparalleljob">
    <map-reduce>
    ...
    </map-reduce>
    <ok to="joining"/>
    <error to="kill"/>
</action>
<action name="secondparalleljob">
    <map-reduce>
    ...
    </map-reduce>
    <ok to="joining"/>
    <error to="kill"/>
</action>
<join name="joining" to="nextaction"/>
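The fork/join control flow corresponds to starting several tasks in parallel and continuing only after all of them have finished. The sketch below expresses that idea with CompletableFuture in plain Java. The class ForkJoinSketch and the task bodies are hypothetical, and the code illustrates the control flow only, not how Oozie executes actions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of fork/join semantics; not Oozie code.
class ForkJoinSketch {
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // "fork": both paths start without waiting for each other.
        CompletableFuture<Void> first =
                CompletableFuture.runAsync(() -> log.add("firstparalleljob"));
        CompletableFuture<Void> second =
                CompletableFuture.runAsync(() -> log.add("secondparalleljob"));

        // "join": block until both paths have completed.
        CompletableFuture.allOf(first, second).join();

        // Only now does the node after the join run.
        log.add("nextaction");
        return log;
    }
}
```

The two parallel entries may appear in either order, but nextaction is always last, which is the guarantee a join node provides.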
Defining an Oozie Coordinator Job
The Oozie Coordinator component enables recurring Oozie Workflows to be defined. These recurring jobs can be triggered by two types of events:
•Time: Similar to a cron job.
•Data Availability: The job triggers when a specified directory is created.
An Oozie Coordinator job consists of two files:
•coordinator.xml: The definition of the Coordinator application.
•coordinator.properties: Defines the job’s properties.
Schedule a Job Based on Time
Consider this example of a coordinator.xml file.
<?xml version="1.0" encoding="utf-8"?>
<coordinator-app xmlns="uri:oozie:coordinator:0.1"
name="tf-idf" frequency="1440" start="2013-01-01T00:00Z" end="2013-12-31T00:00Z" timezone="UTC">
<action>
<workflow>
<app-path>hdfs://node:8020/root/java/tfidf/workflow</app-path>
</workflow>
</action>
</coordinator-app>
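The frequency attribute is expressed in minutes, so 1440 corresponds to one run per day. The following sketch uses java.time to list the first few nominal run times implied by the start and frequency values above. The class CoordinatorSchedule is hypothetical and models only the arithmetic, not Oozie's own scheduling logic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models the schedule arithmetic only.
class CoordinatorSchedule {
    // Returns the first `count` nominal times: start, start + frequency, ...
    static List<Instant> nominalTimes(Instant start, long frequencyMinutes, int count) {
        List<Instant> times = new ArrayList<>();
        Duration step = Duration.ofMinutes(frequencyMinutes);
        Instant next = start;
        for (int i = 0; i < count; i++) {
            times.add(next);
            next = next.plus(step);
        }
        return times;
    }

    public static void main(String[] args) {
        // Same start and frequency as the coordinator-app above.
        Instant start = Instant.parse("2013-01-01T00:00:00Z");
        for (Instant t : nominalTimes(start, 1440, 3)) {
            System.out.println(t);
        }
    }
}
```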
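The coordinator.properties file contains the path to the coordinator app:
oozie.coord.application.path=hdfs://node:8020/path/to/app
Schedule a Job Based on Data Availability
Consider this example of a coordinator.xml file that checks for a directory:
<?xml version="1.0" encoding="utf-8"?>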
<coordinator-app xmlns="uri:oozie:coordinator:0.1"
name="file_check"
frequency="1440" start="2012-01-01T00:00Z" end="2015-12-31T00:00Z" timezone="UTC">
<datasets>
<dataset name="input1">
<uri-template>hdfs://node:8020/job/result/</uri-template>
</dataset>
</datasets>
<action>
<workflow>
<app-path>hdfs://node:8020/myapp/</app-path>
</workflow>
</action>
</coordinator-app>
•This Coordinator app is scheduled to run once a day.
•If the folder hdfs://node:8020/job/result/ exists, the <action> executes, which in this example is an Oozie Workflow deployed in the hdfs://node:8020/myapp folder.
•The use-case scenario for this example posits that a separate MapReduce job executes once a day at an unspecified time. When that job runs, it deletes the hdfs://node:8020/job/result directory and then creates a new one, which triggers the Coordinator to run.
•This Coordinator runs once a day, and if /job/result exists, then the /myapp workflow will execute.
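The "run only if the directory exists" rule can be sketched in a few lines of Java. This is a simplified illustration under stated assumptions: the class DataTrigger is hypothetical, and a local filesystem path checked with java.nio.file stands in for the HDFS directory that the Coordinator would watch.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a data-availability trigger; not Oozie code.
class DataTrigger {
    // Runs the action only when the watched directory exists.
    // Returns true if the action ran, false if it was skipped.
    static boolean runIfPresent(Path watchedDir, Runnable action) {
        if (Files.isDirectory(watchedDir)) {
            action.run();
            return true;
        }
        return false;
    }
}
```

Calling runIfPresent once per scheduled interval gives the combined behaviour described above: a time-based check whose action is gated on the presence of the directory.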