1. 程式人生 > >hadoop 超詳細入門wordcount

hadoop 超詳細入門wordcount

概述

今天部落格收到了第一條評論,感覺很贊哦,最近一直在學習hadoop,主要是結合《實戰Hadop:開啟通向雲端計算的捷徑(劉鵬)》,然後apache官網的doc(還是要以官網為主,雖然是全英文的,但總比那些版本都不對的部落格來開得多得多),自己嘗試了一下hadoop的hello world之後,有繼續嘗試了使用docker來模擬多機的叢集分散式環境,最後再返回來看hadoop的核心框架hdfs和mapreduce的設計原理時方才恍然大悟,這個東西很厲害!本人在下部落格中學習到了很多,本章描述也會借用部落格中的一些描述,註明來源如下。(如有不妥,請告知刪除)。
http://blog.csdn.net/zhongwen7710/article/details/39376247

目標

這次部落格時,感覺對hadoop的框架已經有了一個基本的認識,大體的知道了如何去操作以及一些基本原理,本文嘗試記錄下此過程,不僅作為自己的記錄同時也與大家分享。
1.安裝並完成單機上的hadoop hello world(wordCount被認為是其上的hello world程式)。
2.從資料處理流程和任務排程流程兩個方面解析wordcount發生了啥。

hello world hadoop

需要軟體安ssh java(本人使用自帶openjdk-8)
jdk
ssh

下載解壓hadoop
wget http://www-us.apache.org/dist/hadoop/common/hadoop-2.7
.3/hadoop-2.7.43tar.gz tar xzfv hadoop-2.7.3.tar.gz 在etc/hadoop/hadoop-env.sh中配置java_home update-alternatives --config

圖1圖1

  # set to the root of your Java installation
  export JAVA_HOME=/usr/java/latest(這裡設定你自己的Java位置)

設定完成後應該可以使用Hadoop了
bin/hadoop 

按照官網的指令,執行例項程式碼,可以看到輸出結果
$ mkdir input
  $ cp etc/hadoop/*.xml input
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+' $ cat output/* 此處單節點的任務就完成了,這個時候僅僅沒有啥排程就是一個節點完成的。下面接著看偽分佈。這裡我覺得應該是沒有涉及到HDFS的,因為都還沒有開啟hdfs嘛,這裡都是在本地檔案系統上操作的,充分說明這個mapreduce可以跑一般檔案系統上!(廢話)

Pseudo-Distributed Operation

Hadoop can also be run on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process

首先需要配置幾個核心檔案
etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

然後檢查一下ssh,一般的ubuntu自帶了ssh,可以使用ps -e|grep ssh來檢查是否開啟,service ssh start是開啟命令。

  $ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
  $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
  $ chmod 0600 ~/.ssh/authorized_keys

然後你就可以開始hdfs分散式檔案系統之旅咯,官網上直接使用/tmp作為預設namenode目錄,個人建議儲存到自己的目錄下,不然每次關機上次的東西都沒了。操作如下

在etc/hadoop/core-sites.xml里加入
    <property>
        <name>hadoop.tmp.dir</name>
        <value>這裡隨意把/namenode/hadoop_${user.name}</value>
    </property>
格式化作業系統
bin/hdfs namenode -format

sbin/start-dfs.sh

你就可以在 http://localhost:50070/裡面看到namenode的情況了

然後我們接著在分散式檔案系統裡面跑上述的例子,以下大家可以看到很多熟悉的操作,dfs 後面的操作都是hdfs下的檔案相關操作選項,具體可以參看hdfs更多的教程,此處不多述。
 $ bin/hdfs dfs -mkdir /user
  $ bin/hdfs dfs -mkdir /user/<username>
  $ bin/hdfs dfs -put etc/hadoop input
    $ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+'
     $ bin/hdfs dfs -get output output
  $ cat output/*
   $ bin/hdfs dfs -cat output/*
   $ sbin/stop-dfs.sh

好了,如果大家是第一次接觸,那麼到了這裡應該差不多就可以有個大致的感覺了,其實我特別不喜歡寫這個,沒有什麼思想,全是抄的官網文件,但是對於特別新手來說,可以參看一下,那麼我們接著來講一下真正的hello wolrd,到這個地方我們還沒有寫程式碼,也談不上啥框架理解,就是先跑起來再說。接著講wordcount之前,我說一下我的感受,為什麼會有hadoop這種東西呢,因為資料量大嘛,資料大了之後很自然的一個想法就是把資料分散式處理。這樣就有了hdfs,之後那我分散式了之後怎麼計算呢,我資料存在不同的地方,我算的時候把資料移動肯定是會浪費很大的頻寬的,mapreduce+hdfs實現了一個很妙的思路,就是說我不移動資料,我移動計算,這聽起來很不錯,計算本地化,那麼我們就可以不錯的解決大資料問題了。另外一個事情,mapreduce我現在的感覺並不是一個啥都能看的事情,海量資料的處理過程也可能是單一的,比方我們接下來要講的計算文章單詞的個數,或者一些常規的操作,我們利用map-reduce的程式設計思路試著去解決這些問題。好了,說多了大神也不看,小白也看不懂了,我們繼續往下吧。

hello world maprecude

本來先寫了一段介紹的,但是還是先上程式碼吧。大家把這段程式碼無腦跑通之後,我會給大家解釋一些東西的,相信你能明白。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
需要設定一些東西
export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
然後打包jar

根據上面的檔案操作保證你的檔案格式如下
$ bin/hadoop fs -ls /user/joe/wordcount/input/ /user/joe/wordcount/input/file01 /user/joe/wordcount/input/file02
這裡會有一些輸出提示xxxxx

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

輸入下程式碼前記得保證output為空,output不為空的話,應該會報錯。
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

檢視結果
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000`
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2`

恭喜你已經跑出來一個hello world了,你自己抄了java程式碼,然後運用了偽分散式檔案系統,修修補補之後你就可以完成新的功能了,在此之前,我具體介紹下上述java程式碼,這出強烈推薦《實戰Hadop:開啟通向雲端計算的捷徑(劉鵬)》該書第3章對這裡的程式碼做了充分的解讀。

wordcount資料處理過程

這裡我們主要介紹wordcount過程mapreduce處理裡資料都是啥樣子的。我這裡大概解釋一下,mapreduce的過程以鍵值對的方式傳遞,然後我們主要是對mapreduce函式進行重寫,同樣也是要滿足鍵值對的格式。這裡map就是以(word,1)的形式展示,然後reduce就是相加統計的過程,思路非常簡單。

這裡介紹了mapreduce的資料處理流程。
這裡寫圖片描述圖2

下圖是mapreduce過程的整體概括,map從input到split輸入,經過處理後(可能包括sort combine過程)輸出中間結果,這些資料會被儲存到磁碟而非hdfs中,之後被系統排程分配到各個reduce去繼續處理。
這裡寫圖片描述圖3
下面是更詳細的一個過程圖,與上圖思想一樣。
這裡寫圖片描述

下面我們看看我們wordcount中每個地方的資料都是什麼樣子的吧。
這裡寫圖片描述圖4

這裡寫圖片描述圖5
好了,這個地方留一點時間大家自己去看那個書裡面的java程式碼詳解,官網也有,不過沒有那麼詳細,書中介紹了java幾個大類的用處,介紹了job.setxxxx系列的初始預設條件,這些都是需要掌握的。

小結:上述圖中mapreduce的過程,此處只有一個reduce(預設),並且沒有涉及到排程過程,沒有看到hdfs的參與,只知道有個地方input輸入,然後會被切分成split,這裡就表現出我寫部落格的必要了,介紹一些常識split從input來,split不會跨檔案生成,比如input資料夾下面有file01,file02,那麼就會有2個以上的split,然後hdfs中檔案以block的形式儲存,每個大檔案(128M以上,可以設定大小)會被切分成block,,每個split包括大於等於1的整數個blocks。然後於此同時split決定了map的個數,有多少。
而reduce的個數是可以設定的。

wordcount整個排程過程

上面主要是講資料處理的過程,這裡講講排程過程。現在我假設你們知道什麼是jobTracker TaskTracker(這是mapreduce的主要排程部分)。第一張圖描述了mapreduce如何在hdfs的幫助下完成計算的轉移而不是資料的轉移。圖6中,我們開啟mapreduce程式的時候,mapreduce的主節點負責提交一個叫做job的東西,我們程式碼中設定了很多job的內容,包括用了什麼map什麼reduce以及其他配置,然後把這個jar包傳到hdfs中,這裡是非常厲害的,他把程式碼轉移就避免了資料的移動,這樣可以做到計算本地化,減小頻寬浪費。wordcount input中有2個檔案file01 file02就會劃分成2個split,完成了圖中第6步,系統根據這個劃分來分配任務,如果此時有3臺電腦,1個master兩個slaves(通常datanode也不和master在一起,這裡不太確定這個說法對不),那麼如果file01 file02的block分別在slave1 slave2上(這裡我們是無從得知的,因為hdfs是一個分散式作業系統,對外來說看起來是一個整體,我也不知道他放哪裡,但是他內部是有均衡操作的我認為,大神可以指教),那麼他可以把2個map任務分配到2個slave節點上。這裡需要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這裡有個概念叫:資料本地化(Data-Local)。意思是:將map任務分配給含有該map處理的資料塊的TaskTracker上,同時將程式JAR包複製到該TaskTracker上來執行,這叫“運算移動,資料不移動”。而分配reduce任務時並不考慮資料本地化。

這裡寫圖片描述
圖6

第二張圖表示了計算任務的分佈,一般叢集中1個namenode n個datanode,上述描述針對資料儲存,namenode只負責儲存元資料,datanode儲存實際資料且負責與client互動(我們執行bin/hadoop就是開啟 了一個client)。jobTracker和namenode對應,taskTracker和datanode對應。
這裡寫圖片描述圖7