1. 程式人生 > >map/reduce例項wordCount單詞計數實現功能

map/reduce例項wordCount單詞計數實現功能

hadoop
hadoop
hadoop
dajiangtai
dajiangtai
dajiangtai
hsg
qq.com
hello you
hello me  her

map/reduce處理功能

執行步驟:
 1. map任務處理
1.1 讀取輸入檔案內容,解析成key、value對。對輸入檔案的每一行,解析成key、value對。每一個鍵值對呼叫一次map函式。
1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
1.3 對輸出的key、value進行分割槽。
1.4 對不同分割槽的資料,按照key進行排序、分組。相同key的value放到一個集合中。
1.5
(可選)分組後的資料進行歸約。
2.reduce任務處理
2.1 對多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點。
2.2 對多個map任務的輸出進行合併、排序。寫reduce函式自己的邏輯,對輸入的key、values處理,轉換成新的key、value輸出。
2.3 把reduce的輸出儲存到檔案中。

1.3和1.4,1.5是hadoop自動幫我們做的,
我們做的就是上面寫的map函式的輸出邏輯1.2

map函式重寫功能

1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。

//定義map
    //LongWritable, Text, Text, LongWritable  前兩個引數為輸入的map型別,後兩個引數為輸出的map型別
//如<0,hello you>,<10,hello me> ---> <hello,1><you,1><hello,1><me,1> public static class myMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ //定義一個k2,v2 Text k2 = new Text(); LongWritable v2 = new LongWritable(); //輸入map型別key,value值為<0,hello you><10,hello me>
//0或10為行起始位元組資料 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words= value.toString().split(" "); for (String word:words) { if(word.trim().isEmpty()==false) { Debug.println(word,"1"); //word表示第一行中的每個單詞,即k2 k2.set(word); //沒排序分組前每個單詞都是1個,由於是long型別所以加L v2.set(1L); //寫出 context.write(k2, v2); } } } }

reduce函式重寫功能

2.1和2.2功能由hadoop幫我們做了,我們只需要寫自己的邏輯reduce函式
2.2 對多個map任務的輸出進行合併、排序。寫reduce函式自己的邏輯,對輸入的key、values處理,轉換成新的key、value輸出。

//下面這個myReducer函式是輸出<k3,v3>的函式,邏輯要我們自己寫
public static class myReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        LongWritable v3=new LongWritable();
        //k2,v2s引數形式為<hello,{1,1}><you,{1}>變為---><hello,2><you,1>
        @Override
        protected void reduce(Text k2,Iterable<LongWritable> v2s,
                     Reducer<Text,LongWritable,Text,LongWritable>.Context context) 
                     throws IOException,InterruptedException{
            long count=0L;
            for(LongWritable v2:v2s) {
                count +=v2.get();
            }
            v3.set(count);
            //k2就是k3,都是一個單詞
            context.write(k2,v3);
        }
    }
package wordcount;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class wordcount {    
    //定義map
    //LongWritable, Text, Text, LongWritable  前兩個引數為輸入的map型別,後兩個引數為輸出的map型別
    //如<0,hello you>,<10,hello me>  ---> <hello,1><you,1><hello,1><me,1>
    public static class myMapper    extends Mapper<LongWritable, Text, Text, LongWritable>{

        //定義一個k2,v2
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();
        //輸入map型別key,value值為<0,hello you><10,hello me>
        //010為行起始位元組資料
        @Override       
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words= value.toString().split(" ");
            for (String word:words) {           
                if(word.trim().isEmpty()==false)
                {
                     Debug.println(word,"1");
                     //word表示第一行中的每個單詞,即k2
                     k2.set(word);
                     //沒排序分組前每個單詞都是1個,由於是long型別所以加L
                     v2.set(1L);
                     //寫出
                     context.write(k2, v2);  
                }
             }   
        }
    }
    public static class myReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        LongWritable v3=new LongWritable();
        //k2,v2s引數形式為<hello,{1,1}><you,{1}>變為---><hello,2><you,1>
        @Override
        protected void reduce(Text k2,Iterable<LongWritable> v2s,
                     Reducer<Text,LongWritable,Text,LongWritable>.Context context) 
                     throws IOException,InterruptedException{
            long count=0L;
            for(LongWritable v2:v2s) {
                count +=v2.get();
            }
            v3.set(count);
            //k2就是k3,都是一個單詞
            context.write(k2,v3);
        }
    }
    //刪除輸出目錄
    public static void deleteOutDir(Configuration conf,String out_dir)
               throws IOException,URISyntaxException{
        FileSystem fs=FileSystem.get(new URI(out_dir),conf);
        if(fs.exists(new Path(out_dir))==true)
        {
            fs.delete(new Path(out_dir),true);
        }
    }
    public static void main(String[] args) throws Exception
    {
        //載入hadoop conf 驅動
        Configuration conf=new Configuration(); 
        Job job=Job.getInstance(conf,wordcount.class.getSimpleName());
        job.setJarByClass(wordcount.class);
        Path in_path=new Path("hdfs://192.168.145.180:8020/user/root/input/djt.txt");
        FileInputFormat.setInputPaths(job, in_path);
        //通過TextInputFormat把讀到的資料處理成<k1,v1>形式
        job.setInputFormatClass(TextInputFormat.class);
        //job中加入Mapper,同時MyMapper類接受<k1,v1>作為引數傳給類中map函式進行資料處理
        job.setMapperClass(myMapper.class);
        //設定輸出的<k2,v2>的資料型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //job中加入Reducer,Reducer自動接收處理好的map資料
        job.setReducerClass(myReducer.class);
        //設定輸出的<k3,v3>的資料型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //設定輸出目錄檔案output2
        String OUT_DIR = "hdfs://192.168.145.180:8020/user/root/output2";
        FileOutputFormat.setOutputPath(job, new Path(OUT_DIR));
        job.setOutputFormatClass(TextOutputFormat.class);
        //如果這個檔案存在則刪除,如果檔案存在不刪除會報錯。
        deleteOutDir(conf, OUT_DIR);
        //把處理好的<k3,v3>的資料寫入檔案
        job.waitForCompletion(true);
    }
}

執行方法一:在eclipse開發環境中執行

wordcount.java編輯器中右鍵run as / run on hadoop執行OK

執行方法二:生成jar拷到hadoop伺服器上執行

生成jar方法:

Project Explorder專案工程樹型中選擇wordcoun.java右鍵Export/Java/Runnable Jar file
選擇項和輸入項
Launch configuration: wordcount-wordcount
export destination:D:\應用集合\eclipse\eclipse-workspace\bin_jar\wordcount.jar
Library handling:
可勾:Extract required libraries into generated JAR
點選Finish完成

把生成的jar拷到hadoop服務上執行
拷到/home/hadoop3/app/hadoop目錄中

執行:hadoop jar wordcount.jar

具體執行過程如下:

[[email protected] hadoop]$ hadoop jar wordcount.jar
18/09/03 19:22:38 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/09/03 19:22:41 INFO input.FileInputFormat: Total input paths to process : 1
18/09/03 19:22:42 INFO mapreduce.JobSubmitter: number of splits:1
18/09/03 19:22:42 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1535940670011_0002
18/09/03 19:22:42 INFO impl.YarnClientImpl: Submitted application application_1535940670011_0002
18/09/03 19:22:42 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1535940670011_0002/
18/09/03 19:22:42 INFO mapreduce.Job: Running job: job_1535940670011_0002
18/09/03 19:22:51 INFO mapreduce.Job: Job job_1535940670011_0002 running in uber mode : false
18/09/03 19:22:51 INFO mapreduce.Job:  map 0% reduce 0%
18/09/03 19:23:01 INFO mapreduce.Job:  map 100% reduce 0%
18/09/03 19:23:12 INFO mapreduce.Job:  map 100% reduce 100%
18/09/03 19:23:12 INFO mapreduce.Job: Job job_1535940670011_0002 completed successfully
18/09/03 19:23:12 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=235
        FILE: Number of bytes written=255343
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=205
        HDFS: Number of bytes written=65
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=7766
        Total time spent by all reduces in occupied slots (ms)=6780
        Total time spent by all map tasks (ms)=7766
        Total time spent by all reduce tasks (ms)=6780
        Total vcore-milliseconds taken by all map tasks=7766
        Total vcore-milliseconds taken by all reduce tasks=6780
        Total megabyte-milliseconds taken by all map tasks=7952384
        Total megabyte-milliseconds taken by all reduce tasks=6942720
    Map-Reduce Framework
        Map input records=10
        Map output records=14
        Map output bytes=201
        Map output materialized bytes=235
        Input split bytes=116
        Combine input records=0
        Combine output records=0
        Reduce input groups=9
        Reduce shuffle bytes=235
        Reduce input records=14
        Reduce output records=9
        Spilled Records=28
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=169
        CPU time spent (ms)=2070
        Physical memory (bytes) snapshot=312471552
        Virtual memory (bytes) snapshot=4165705728
        Total committed heap usage (bytes)=152428544
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=89
    File Output Format Counters 
        Bytes Written=65
[[email protected] hadoop]$ 

表示執行成功OK
其執行結果為:
hdfs://192.168.145.180:8020/user/root/output2

dajiangtai  3
hadoop  3
hello   2
her 1
hsg 1
me  1
qq.com  1
you 1

如果執行報錯:RunJar jarFile [mainClass] args…
則你可能採用Export/Java/Jar file生成的方式,沒有指定mainclass導致的問題
改用Export/Java/Runnable Jar file方式就OK,只不過生成的jar會比較大。
–the–end—-

相關推薦

map/reduce例項wordCount單詞計數實現功能

hadoop hadoop hadoop dajiangtai dajiangtai dajiangtai hsg qq.com hello you hello me her map/reduce處理功能 執行步驟: 1. map任務處理

hadoop入門(六)JavaAPI+Mapreduce例項wordCount單詞計數詳解

剛剛研究了一下haoop官網單詞計數的例子,把詳細步驟解析貼在下面: 準備工作: 1、haoop叢集環境搭建完成 2、新建一個檔案hello,並寫入2行單詞,如下: [[email protected] hadoop-2.6.0]# vi hello hello

Scala +Spark+Hadoop+Zookeeper+IDEA實現WordCount單詞計數(簡單例項

                 IDEA+Scala +Spark實現wordCount單詞計數 一、新建一個Scala的object單例物件,修改pom檔案 (1)下面文章可以幫助參考安裝 IDEA 和 新建一個Scala程式。 (2)pom檔案 <?xml

Scala+Spark+Hadoop+IDEA實現WordCount單詞計數,上傳並執行任務(簡單例項-下)

                 Scala+Spark+Hadoop+IDEA上傳並執行任務 本文接續上一篇文章,已經在IDEA中執行Spark任務執行完畢,測試成功。 一、打包 1.1  將setMaster註釋掉 package day05 import

Hadoop學習:Map/Reduce初探與小Demo實現

pre 排序。 解決 想法 文本文 direction run page lang 一、 概念知識介紹 Hadoop MapReduce是一個用於處理海量數據的分布式計算框架。這個框架攻克了諸如數據分布式存儲、作業調度、容錯、機器間通信等復雜

Hadoop WordCount單詞計數原理

clas oop 圖片 tput 進行 打包 red div src 計算文件中出現每個單詞的頻數 輸入結果按照字母順序進行排序 編寫WordCount.java 包含Mapper類和Reducer類 編譯WordCount.java javac -classp

map-reducewordCount DEMO

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apa

用python的map/reduce函式實現int()功能,即字串轉換成數字

map()函式接收兩個引數,一個是函式,一個是可迭代物件,如列表,字串等,map將傳入的函式依次作用到序列(可迭代物件)的每個元素,並把結果作為新的Iterator(可迭代物件)返回。 再看reduc

WordCount結對作業——實現擴展功能

com 測試 tree 使用 .com 發揮 學習 發現 logs 合作者學號:201631081309 201631062307 作業鏈接 碼雲地址 一:實現的功能。 WordCount是一個簡單的小程序,實現了一下幾項基本功能。 - 統計字符數 - 統計單詞數 - 統計

Map Reduce用tree Map實現·topn

 首先有如下如數,要統計每個頁面的訪問量,然後計算訪問量最大的五個頁面 2017/07/28 qq.com/a 2017/07/28 qq.com/bx 2017/07/28 qq.com/by 2017/07/28 qq.com/by3 2017/07/28 qq.com/news

WordCount結對作業——實現擴充套件功能

合作者學號:201631081309 201631062307 作業連結 碼雲地址 一:實現的功能。 WordCount是一個簡單的小程式,實現了一下幾項基本功能。 - 統計字元數 - 統計單詞數 - 統計文字行 以及擴充套件功能 - 統計註釋行、程式碼行、以及空行 - 遞迴處理目錄下符合條件的檔案 -

大資料處理神器map-reduce實現(僅python和shell版本)

熟悉java的人直接可以使用java實現map-reduce過程,而像我這種不熟悉java的怎麼辦?為了讓非java程式設計師方便處理資料,我把使用python,shell實現streaming的過程,也即為map-reduce過程,整理如下: 1.如果資料不在hive裡面,而在

Hadoop之MapReduce過程,單詞計數WordCount

單詞計數是最簡單也是最能體現MapReduce思想的程式之一,可以稱為MapReduce版“Hello World”,該程式的完整程式碼可以在Hadoop安裝包的src/example目錄下找到。單詞計數主要完成的功能:統計一系列文字檔案中每個單詞出現的次數,如下圖所示。 WordCo

18 | 散列表(上):Word文件中的單詞拼寫檢查功能是如何實現的?

Word 這種文字編輯器你平時應該經常用吧,那你有沒有留意過它的拼寫檢查功能呢?一旦我們在 Word 裡輸入一個錯誤的英文單詞,它就會用標紅的方式提示“拼寫錯誤”。Word 的這個單詞拼寫檢查功能,雖然很小但卻非常實用。你有沒有想過,這個功能是如何實現的呢? 其

大矩陣乘法運算map reduce實現思路

實現思路: 儲存: 大矩陣很多都是稀疏矩陣,並且有可能有上百萬的行和上百萬的列。 那麼矩陣可以存在類似HBase面向列的分散式資料庫中。 假設HTable中有兩個表A和表B分別儲存兩個巨型矩陣a和b。表A和表B都是隻有一個列族。列名都是1開始計數。 那麼表A和表B所儲存的矩

hadoop學習(七)WordCount+Block+Split+Shuffle+Map+Reduce技術詳解

       1、在map task執行時,它的輸入資料來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關係在上面我們已經說的很明白了。在WordCount例子裡,假設map的輸入資料都是像 “aaa”這樣的字串。         2、

Hadoop分佈環境搭建步驟,及自帶MapReduce單詞計數程式實現

參考騰訊雲實驗室 Hadoop分佈環境搭建步驟: 1.軟硬體環境 CentOS 7.2 64 位 JDK- 1.8 Hadoo p- 2.7.4 2.安裝SSH sudo yum install openssh-clients openssh-ser

Python遞迴實現多層巢狀dict遍歷例項--三級選單/多級選單功能

要求:構建一個三(N)級選單,實現使用者可以根據指示選擇進入選單,退出選單,返回上層選單功能。知識點:dict資料的遍歷方法,遞迴遍歷方法。環境:Python3.6實現程式碼:#!/usr/bin/evn python # -*-coding:utf8 -*- class m

scala 兩種方法實現單詞計數

val lines = List("hello world", "hello spark") val wordlist = lines.flatMap(line => line.split(" ")).map(word => (word, 1))

Linux 單詞計數 WordCount 以及程式碼案例

WordCount 首先是命令列的:             WordCount(單詞計數) 1:啟動hadoop 使用 start-all.sh 命令啟動hdfs 2:在hadoop的安裝目錄下新建一個目錄,使用hdfs的shell命令 cd /usr/local