1. 程式人生 > >Hadoop學習:HDFS和MapReduce

Hadoop學習:HDFS和MapReduce

記得曾經問過一個師兄一個問題:要學好一種程式語言怎麼做才好,怎樣才能像你一樣靈活運用?他跟我說:從頭學到尾是很不切實際的,要邊做專案邊學是最快的~今天才真正體會這句話,趁著做專案,也接觸了Hadoop,也對它有了初步的瞭解~

關於Hadoop

Hadoop是一個由Apache基金會所開發的分散式系統基礎架構。使用者可以在不瞭解分散式底層細節的情況下,開發分散式程式,充分利用叢集的威力進行高速運算和儲存。
Hadoop的框架最核心的設計就是:HDFS和MapReduceHDFS為海量的資料提供了儲存,相當於一個檔案中轉站的功能,而MapReduce為海量的資料提供了計算。但兩者只是理論基礎,不是具體可使用的高階應用,Hadoop旗下有很多經典子專案,比如HBase

Hive等,這些都是基於HDFS和MapReduce發展出來的。要想了解Hadoop,就必須知道HDFSMapReduce是什麼。

(1)HDFS

HDFS,英文為Hadoop Distributed File System,即分散式檔案系統,功能相當於一個網盤或者中轉站。
HDFS有高容錯性的特點,並且設計用來部署在低廉的硬體上,就算在由高失敗率的節點或網路組成的大叢集內執行的作業,Hadoop都可以讓作業成功完成。而且它提供高吞吐量(high throughput)來訪問應用程式的資料,適合那些有著超大資料集(large data set)的應用程式。HDFS放寬了POSIX的要求,可以以流的形式

訪問(streaming access)檔案系統中的資料。
PS:
(1)容錯
即是Fault Tolerance,確切地說是容故障(Fault),而並非容錯誤(Error)。Hadoop實現容錯的主要方法就是重新執行任務,具體的可以檢視此部落格:http://blog.csdn.net/magicdreaming/article/details/7616531,在此不過多贅述。
(2)吞吐量
指在單位時間內中央處理器(CPU)從儲存裝置讀取->處理->儲存資訊的量。
(3)流式資料訪問
HDFS應用需要流式訪問它們的資料集。其更多考慮到了資料批處理,而不是使用者互動處理。相比資料訪問的低延遲,HDFS應用要求能夠高速率、大批量地處理資料,極少有程式對單一的讀寫操作有嚴格的響應時間要求。POSIX標準設定的很多硬性約束對HDFS應用系統不是必需的。為了提高資料的吞吐量,在一些關鍵方面對POSIX的語義做了一些修改。
(4)POSIX

POSIX表示可移植作業系統介面(Portable Operating System Interface ,縮寫為 POSIX),POSIX標準定義了作業系統應該為應用程式提供的介面標準,是IEEE為要在各種UNIX作業系統上執行的軟體而定義的一系列API標準的總稱.

(2)MapReduce

此處輸入圖片的描述
按個人理解,如圖所示,MapReduce可以拆分成兩個單詞也是其兩個主要操作,Map的話即地圖,在這裡為對映的意思,表示從眾多的資料中匹配出所要的東西,而Reduce即減少,表示將匹配到的一個個資料進行統計整合成一個總的資料。
例如:
對大量資料中的性別進行匹配會得到:
女 1
女 1
女 1
…..
最終進行統計的結果就是一行:
女 1000
簡單的說,Map是把一組資料一對一的對映為另外的一組資料,其對映的規則由一個函式來指定,比如對[1,2,3,4]進行乘2的對映就變成了[2,4,6,8]。Reduce是對一組資料進行歸約,這個歸約的規則由一個函式指定,比如對[1,2,3,4]進行求和的歸約得到結果是10,而對它進行求積的歸約結果是24。

Hadoop的安裝使用

(1)安裝

那Hadoop既然這麼好,該如何去用呢?
在此提供Ubuntu下Hadoop的安裝部落格:
http://www.powerxing.com/install-hadoop/
其中詳細說明了Hadoop的整個安裝流程以及Hadoop的三種模式的配置方式:
(1)單機
(2)偽分佈
(3)叢集

(2)使用

安裝好了當然是要編寫程式來實現了,在此使用Java進行實現,有相應的部落格詳細說明:http://www.powerxing.com/hadoop-build-project-using-eclipse/
其中使用Eclipse編譯執行MapReduce程式,包括從安裝eclipse到編寫程式的整個流程。

(3)小tips

在我個人的安裝使用的過程中也出現一些小毛病,在這裡就跟大家分享一下遇到這些Problems時該如何去做呢?
(1)在賦予新使用者許可權的時候可能切換賬戶之後它依舊會說它沒有許可權,重新試了很多次還是沒用,此時建議刪除使用者再重複操作

sudo adduser hadoop sudo

(2)因為之前已經安裝了Eclipse,配置的是1.8的JDK,但按流程的話,在這一步,$JAVA_HOME/bin/java -version的輸出結果和java -version的輸出結果不一樣,這時不用怕,程式會進行最高預設。但在後面編寫程式時報錯,按照錯誤更改JDK的版本即可。
(3)在建立與 Hadoop 叢集的連線,建立 New Hadoop Location的時候會發現資料夾沒東西(不像圖所示),其實是關閉了 NameNode 和 DataNode 守護程序。這時輸入一下命令即可~

./sbin/start-dfs.sh

此處輸入圖片的描述

具體實現程式碼:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            line = line.replace("\\", "");
            String regex = "性別:</span><span class=\"pt_detail\">(.*?)</span>";
            Pattern pattern = Pattern.compile(regex);
            Matcher matcher = pattern.matcher(line);
            while(matcher.find()){
                String term = matcher.group(1);
                word.set(term);
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            int sum = 0;
            for(IntWritable val :values){
                sum+= val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2){
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }


}

Hadoop的分享就先停一步,等待繼續攻克~

參考資料
(1)Hadoop的容錯性
(2)Hadoop簡介:HDFS和MapReduce的實現