
Submitting MapReduce Jobs to a Remote Hadoop Cluster from IntelliJ IDEA


Setup and Configuration

1. First, download the Hadoop distribution used on the cluster to the local machine. In this article it is placed under "E:\javaws".
2. Configure the environment variables on the local machine:

HADOOP_HOME=E:\javaws\hadoop-2.6.0
HADOOP_BIN_PATH=%HADOOP_HOME%\bin
HADOOP_PREFIX=E:\javaws\hadoop-2.6.0

In addition, append ;%HADOOP_HOME%\bin to the end of the PATH variable.

3. Download the hadoop.dll and winutils.exe matching your Hadoop version and place them in "C:\Windows\System32" and "%HADOOP_HOME%\bin" respectively.
Note: hadoop.dll mainly prevents various obscure errors such as null pointer exceptions. I originally assumed IntelliJ IDEA would not need it and was stuck on a NullPointerException for quite a while. A quick check that these pieces are in place is sketched below.
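
Before moving on, it can help to confirm that the JVM actually sees HADOOP_HOME and that winutils.exe is where the Windows Hadoop client expects it. This is only a minimal sketch; the class name CheckHadoopEnv is made up for illustration:

import java.io.File;

public class CheckHadoopEnv {
    public static void main(String[] args) {
        // HADOOP_HOME as seen by the JVM; null usually means the IDE was started
        // before the environment variable was set (restart IDEA in that case)
        String home = System.getenv("HADOOP_HOME");
        System.out.println("HADOOP_HOME = " + home);
        if (home != null) {
            // on Windows the Hadoop client looks for %HADOOP_HOME%\bin\winutils.exe
            File winutils = new File(home, "bin\\winutils.exe");
            System.out.println("winutils.exe present: " + winutils.exists());
        }
    }
}
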
4. Modify the local hosts file in "C:\Windows\System32\drivers\etc".
Add the mapping between the cluster IP addresses and hostnames (the general purpose of the hosts file is not covered here).
Add the following entries to that file:

192.168.137.2 master
192.168.137.3 slave1
192.168.137.4 slave2
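
To confirm the entries take effect for the JVM, a quick hypothetical check (the class name CheckHosts is just for illustration) can be run; each hostname should print the corresponding IP from above:

import java.net.InetAddress;
import java.net.UnknownHostException;

public class CheckHosts {
    public static void main(String[] args) {
        // each hostname should resolve to the cluster IP added to the hosts file
        for (String host : new String[]{"master", "slave1", "slave2"}) {
            try {
                System.out.println(host + " -> " + InetAddress.getByName(host).getHostAddress());
            } catch (UnknownHostException e) {
                System.out.println(host + " does not resolve - check the hosts file");
            }
        }
    }
}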

5. Create a new Maven project in IntelliJ IDEA.
My final project structure is shown below (note: this is the final layout).
[Screenshot: final project structure]
Here gkd.xgs.yxm is my package, which contains the WordCount.java program.
resources is a folder I added; it holds the cluster configuration files core-site.xml, mapred-site.xml and yarn-site.xml, plus log4j.properties.
The resources directory structure looks like this:
[Screenshot: resources directory structure]
core-site.xml, mapred-site.xml and yarn-site.xml are copied from the cluster unmodified; log4j.properties is copied from elsewhere, also unmodified (a small verification snippet follows the configuration listings below):
core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://master:9000</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>file:/usr/local/hadoop-2.6.0/tmp</value>
    <description>Abase for other temporary directories.</description>
</property>
</configuration>

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>master:10020</value>
</property>
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>master:19888</value>
</property>
    <property>
        <name>mapred.remote.os</name>
        <value>Linux</value>
        <description>Remote MapReduce framework's OS, can be either Linux or Windows</description>
    </property>

</configuration>

yarn-site.xml

<?xml version="1.0"?>
<configuration>

<!-- Site specific YARN configuration properties -->
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>master</value>
</property>
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
</configuration>

log4j.properties

log4j.rootLogger=INFO, stdout
#log4j.logger.org.springframework=INFO
#log4j.logger.org.apache.activemq=INFO
#log4j.logger.org.apache.activemq.spring=WARN
#log4j.logger.org.apache.activemq.store.journal=INFO
#log4j.logger.org.activeio.journal=INFO


log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} | %-5.5p | %-16.16t | %-32.32c{1} | %-32.32C %4L | %m%n
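
These configuration files are picked up at runtime because resources is a Maven resources directory and therefore ends up on the classpath; in particular, Hadoop's Configuration loads core-site.xml from the classpath automatically. A small sketch to verify this (the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;

public class CheckClasspathConfig {
    public static void main(String[] args) {
        // Configuration loads core-site.xml from the classpath by default,
        // so resources/core-site.xml should supply fs.defaultFS here
        Configuration conf = new Configuration();
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS")); // expect hdfs://master:9000
    }
}
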
6. The project's pom.xml is shown below; only mainClass needs to be adapted, as noted after the listing.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>gkd.xgs.yxm</groupId>
  <artifactId>WordC</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>WordC</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>oschina</id>
      <name>oschina maven</name>
      <url>http://maven.oschina.net/content/groups/public/</url>
    </repository>
    <repository>
      <id>central</id>
      <name>central maven</name>
      <url>http://repo1.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
    <repository>
      <id>hadoop-pcap</id>
      <url>http://dl.bintray.com/hadoop-pcap/hadoop-pcap</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.5.5</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>

              <mainClass>gkd.xgs.yxm.WordCount</mainClass>

            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Note: the value of mainClass must be changed to your own entry class, in the form package.class; everything else can be copied as-is.
7. Add libraries (this seems optional, since the Hadoop dependencies are already declared in pom.xml).
Add all the folders under the local hadoop-2.6.0 share directory as libraries; for detailed steps see: http://www.cnblogs.com/yjmyzz/p/how-to-remote-debug-hadoop-with-eclipse-and-intellij-idea.html
8. Set up the run configuration.
[Screenshot: run configuration]
Notes:
(1) Main class is the same as the mainClass set in pom.xml, in the form package.class.
(2) Program arguments: as WordCount.java shows, the program takes two arguments, an input file and an output path.
My first argument is: hdfs://master:9000/user/yxm/input/yxm.log
The second argument is: hdfs://master:9000/user/yxm/output
The first path (and file) must already exist; the output directory in the second argument must not exist. A small helper sketch for preparing these HDFS paths follows these notes.
(3) Working directory: the path of the locally installed hadoop-2.6.0, in my case E:\javaws\hadoop-2.6.0.
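
If the input file is not on HDFS yet, it can be prepared from the same project. The sketch below is only an illustration: the class name and the local source path E:\javaws\yxm.log are assumptions. It uploads the input file and removes any stale output directory so the two program arguments above are valid.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrepareJobPaths {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000"); // cluster NameNode
        FileSystem fs = FileSystem.get(conf);

        // upload the input file (the local source path is made up for illustration)
        fs.copyFromLocalFile(new Path("E:\\javaws\\yxm.log"),
                             new Path("/user/yxm/input/yxm.log"));

        // the output directory must not exist before the job runs,
        // otherwise MapReduce fails with FileAlreadyExistsException
        Path out = new Path("/user/yxm/output");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        fs.close();
    }
}
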
9. Modify the WordCount.java program.
Add the following code:

System.setProperty("hadoop.home.dir", "E:\\javaws\\hadoop-2.6.0"); // without this line the job fails with an error that winutils.exe cannot be found; possibly because I did not restart after changing the environment variables
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://master:9000");
        conf.set("mapreduce.app-submission.cross-platform", "true"); // cross-platform submission; without it, submitting from Windows fails with "/bin/bash: line 0: fg: no job control". Many online answers suggest modifying YarnRunner.java, but this single setting is enough.
        conf.set("mapreduce.framework.name", "yarn"); // run on the YARN cluster rather than locally

10. Package the program.
Package it with mvn package; the target directory will contain a runnable jar with all dependencies bundled: WordC-1.0-SNAPSHOT-jar-with-dependencies.jar. I copied this jar file to the project root.
Then add one more line to WordCount.java:

conf.set("mapred.jar","E:\\javaws\\WordC\\WordC-1.0-SNAPSHOT-jar-with-dependencies.jar");

Now click Run and the job completes successfully.
[Screenshot: successful run]
Notes:
(1) If the jar is not packaged and registered in the program, the job fails with: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
(2) In other people's tutorials the jar is usually placed directly in the project root and referenced without an absolute path:

conf.set("mapred.jar","WordC-1.0-SNAPSHOT-jar-with-dependencies.jar");

In my case that variant fails because the jar file cannot be found; a sketch of the Job#setJar alternative follows below.
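
For reference, the error message in (1) also points at Job#setJar(String). A hedged alternative to conf.set("mapred.jar", ...) would be to set the jar on the Job object inside main(), using the same absolute path as above:

// inside main(), after building conf:
Job job = Job.getInstance(conf, "word count");
job.setJar("E:\\javaws\\WordC\\WordC-1.0-SNAPSHOT-jar-with-dependencies.jar"); // same jar path as above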
11. The final WordCount.java code:

package gkd.xgs.yxm;

import java.io.IOException;
import java.util.StringTokenizer;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class WordCount {


    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {


        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();


        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }


    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();


        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }


    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "E:\\javaws\\hadoop-2.6.0");
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://master:9000");
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("mapred.jar","E:\\javaws\\WordC\\WordC-1.0-SNAPSHOT-jar-with-dependencies.jar");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}