map/reduce例項wordCount單詞計數實現功能
hadoop
hadoop
hadoop
dajiangtai
dajiangtai
dajiangtai
hsg
qq.com
hello you
hello me her
map/reduce處理功能
執行步驟:
1. map任務處理
1.1 讀取輸入檔案內容,解析成key、value對。對輸入檔案的每一行,解析成key、value對。每一個鍵值對呼叫一次map函式。
1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
1.3 對輸出的key、value進行分割槽。
1.4 對不同分割槽的資料,按照key進行排序、分組。相同key的value放到一個集合中。
1.5 (可選)分組後的資料進行歸約。
2.reduce任務處理
2.1 對多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點。
2.2 對多個map任務的輸出進行合併、排序。寫reduce函式自己的邏輯,對輸入的key、values處理,轉換成新的key、value輸出。
2.3 把reduce的輸出儲存到檔案中。
1.3和1.4,1.5是hadoop自動幫我們做的,
我們做的就是上面寫的map函式的輸出邏輯1.2
map函式重寫功能
1.2 寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
//定義map
//LongWritable, Text, Text, LongWritable 前兩個引數為輸入的map型別,後兩個引數為輸出的map型別
//如<0,hello you>,<10,hello me> ---> <hello,1><you,1><hello,1><me,1>
public static class myMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
//定義一個k2,v2
Text k2 = new Text();
LongWritable v2 = new LongWritable();
//輸入map型別key,value值為<0,hello you><10,hello me>
//0或10為行起始位元組資料
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words= value.toString().split(" ");
for (String word:words) {
if(word.trim().isEmpty()==false)
{
Debug.println(word,"1");
//word表示第一行中的每個單詞,即k2
k2.set(word);
//沒排序分組前每個單詞都是1個,由於是long型別所以加L
v2.set(1L);
//寫出
context.write(k2, v2);
}
}
}
}
reduce函式重寫功能
2.1和2.2功能由hadoop幫我們做了,我們只需要寫自己的邏輯reduce函式
2.2 對多個map任務的輸出進行合併、排序。寫reduce函式自己的邏輯,對輸入的key、values處理,轉換成新的key、value輸出。
//下面這個myReducer函式是輸出<k3,v3>的函式,邏輯要我們自己寫
public static class myReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
LongWritable v3=new LongWritable();
//k2,v2s引數形式為<hello,{1,1}><you,{1}>變為---><hello,2><you,1>
@Override
protected void reduce(Text k2,Iterable<LongWritable> v2s,
Reducer<Text,LongWritable,Text,LongWritable>.Context context)
throws IOException,InterruptedException{
long count=0L;
for(LongWritable v2:v2s) {
count +=v2.get();
}
v3.set(count);
//k2就是k3,都是一個單詞
context.write(k2,v3);
}
}
package wordcount;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class wordcount {
//定義map
//LongWritable, Text, Text, LongWritable 前兩個引數為輸入的map型別,後兩個引數為輸出的map型別
//如<0,hello you>,<10,hello me> ---> <hello,1><you,1><hello,1><me,1>
public static class myMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
//定義一個k2,v2
Text k2 = new Text();
LongWritable v2 = new LongWritable();
//輸入map型別key,value值為<0,hello you><10,hello me>
//0或10為行起始位元組資料
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words= value.toString().split(" ");
for (String word:words) {
if(word.trim().isEmpty()==false)
{
Debug.println(word,"1");
//word表示第一行中的每個單詞,即k2
k2.set(word);
//沒排序分組前每個單詞都是1個,由於是long型別所以加L
v2.set(1L);
//寫出
context.write(k2, v2);
}
}
}
}
public static class myReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
LongWritable v3=new LongWritable();
//k2,v2s引數形式為<hello,{1,1}><you,{1}>變為---><hello,2><you,1>
@Override
protected void reduce(Text k2,Iterable<LongWritable> v2s,
Reducer<Text,LongWritable,Text,LongWritable>.Context context)
throws IOException,InterruptedException{
long count=0L;
for(LongWritable v2:v2s) {
count +=v2.get();
}
v3.set(count);
//k2就是k3,都是一個單詞
context.write(k2,v3);
}
}
//刪除輸出目錄
public static void deleteOutDir(Configuration conf,String out_dir)
throws IOException,URISyntaxException{
FileSystem fs=FileSystem.get(new URI(out_dir),conf);
if(fs.exists(new Path(out_dir))==true)
{
fs.delete(new Path(out_dir),true);
}
}
public static void main(String[] args) throws Exception
{
//載入hadoop conf 驅動
Configuration conf=new Configuration();
Job job=Job.getInstance(conf,wordcount.class.getSimpleName());
job.setJarByClass(wordcount.class);
Path in_path=new Path("hdfs://192.168.145.180:8020/user/root/input/djt.txt");
FileInputFormat.setInputPaths(job, in_path);
//通過TextInputFormat把讀到的資料處理成<k1,v1>形式
job.setInputFormatClass(TextInputFormat.class);
//job中加入Mapper,同時MyMapper類接受<k1,v1>作為引數傳給類中map函式進行資料處理
job.setMapperClass(myMapper.class);
//設定輸出的<k2,v2>的資料型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//job中加入Reducer,Reducer自動接收處理好的map資料
job.setReducerClass(myReducer.class);
//設定輸出的<k3,v3>的資料型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//設定輸出目錄檔案output2
String OUT_DIR = "hdfs://192.168.145.180:8020/user/root/output2";
FileOutputFormat.setOutputPath(job, new Path(OUT_DIR));
job.setOutputFormatClass(TextOutputFormat.class);
//如果這個檔案存在則刪除,如果檔案存在不刪除會報錯。
deleteOutDir(conf, OUT_DIR);
//把處理好的<k3,v3>的資料寫入檔案
job.waitForCompletion(true);
}
}
執行方法一:在eclipse開發環境中執行
wordcount.java編輯器中右鍵run as / run on hadoop執行OK
執行方法二:生成jar拷到hadoop伺服器上執行
生成jar方法:
Project Explorder專案工程樹型中選擇wordcoun.java右鍵Export/Java/Runnable Jar file
選擇項和輸入項
Launch configuration: wordcount-wordcount
export destination:D:\應用集合\eclipse\eclipse-workspace\bin_jar\wordcount.jar
Library handling:
可勾:Extract required libraries into generated JAR
點選Finish完成
把生成的jar拷到hadoop服務上執行
拷到/home/hadoop3/app/hadoop目錄中
執行:hadoop jar wordcount.jar
具體執行過程如下:
[[email protected] hadoop]$ hadoop jar wordcount.jar
18/09/03 19:22:38 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/09/03 19:22:41 INFO input.FileInputFormat: Total input paths to process : 1
18/09/03 19:22:42 INFO mapreduce.JobSubmitter: number of splits:1
18/09/03 19:22:42 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1535940670011_0002
18/09/03 19:22:42 INFO impl.YarnClientImpl: Submitted application application_1535940670011_0002
18/09/03 19:22:42 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1535940670011_0002/
18/09/03 19:22:42 INFO mapreduce.Job: Running job: job_1535940670011_0002
18/09/03 19:22:51 INFO mapreduce.Job: Job job_1535940670011_0002 running in uber mode : false
18/09/03 19:22:51 INFO mapreduce.Job: map 0% reduce 0%
18/09/03 19:23:01 INFO mapreduce.Job: map 100% reduce 0%
18/09/03 19:23:12 INFO mapreduce.Job: map 100% reduce 100%
18/09/03 19:23:12 INFO mapreduce.Job: Job job_1535940670011_0002 completed successfully
18/09/03 19:23:12 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=235
FILE: Number of bytes written=255343
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=205
HDFS: Number of bytes written=65
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=7766
Total time spent by all reduces in occupied slots (ms)=6780
Total time spent by all map tasks (ms)=7766
Total time spent by all reduce tasks (ms)=6780
Total vcore-milliseconds taken by all map tasks=7766
Total vcore-milliseconds taken by all reduce tasks=6780
Total megabyte-milliseconds taken by all map tasks=7952384
Total megabyte-milliseconds taken by all reduce tasks=6942720
Map-Reduce Framework
Map input records=10
Map output records=14
Map output bytes=201
Map output materialized bytes=235
Input split bytes=116
Combine input records=0
Combine output records=0
Reduce input groups=9
Reduce shuffle bytes=235
Reduce input records=14
Reduce output records=9
Spilled Records=28
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=169
CPU time spent (ms)=2070
Physical memory (bytes) snapshot=312471552
Virtual memory (bytes) snapshot=4165705728
Total committed heap usage (bytes)=152428544
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=89
File Output Format Counters
Bytes Written=65
[[email protected] hadoop]$
表示執行成功OK
其執行結果為:
hdfs://192.168.145.180:8020/user/root/output2
dajiangtai 3
hadoop 3
hello 2
her 1
hsg 1
me 1
qq.com 1
you 1
如果執行報錯:RunJar jarFile [mainClass] args…
則你可能採用Export/Java/Jar file生成的方式,沒有指定mainclass導致的問題
改用Export/Java/Runnable Jar file方式就OK,只不過生成的jar會比較大。
–the–end—-
相關推薦
map/reduce例項wordCount單詞計數實現功能
hadoop hadoop hadoop dajiangtai dajiangtai dajiangtai hsg qq.com hello you hello me her map/reduce處理功能 執行步驟: 1. map任務處理
hadoop入門(六)JavaAPI+Mapreduce例項wordCount單詞計數詳解
剛剛研究了一下haoop官網單詞計數的例子,把詳細步驟解析貼在下面: 準備工作: 1、haoop叢集環境搭建完成 2、新建一個檔案hello,並寫入2行單詞,如下: [[email protected] hadoop-2.6.0]# vi hello hello
Scala +Spark+Hadoop+Zookeeper+IDEA實現WordCount單詞計數(簡單例項)
IDEA+Scala +Spark實現wordCount單詞計數 一、新建一個Scala的object單例物件,修改pom檔案 (1)下面文章可以幫助參考安裝 IDEA 和 新建一個Scala程式。 (2)pom檔案 <?xml
Scala+Spark+Hadoop+IDEA實現WordCount單詞計數,上傳並執行任務(簡單例項-下)
Scala+Spark+Hadoop+IDEA上傳並執行任務 本文接續上一篇文章,已經在IDEA中執行Spark任務執行完畢,測試成功。 一、打包 1.1 將setMaster註釋掉 package day05 import
Hadoop學習:Map/Reduce初探與小Demo實現
pre 排序。 解決 想法 文本文 direction run page lang 一、 概念知識介紹 Hadoop MapReduce是一個用於處理海量數據的分布式計算框架。這個框架攻克了諸如數據分布式存儲、作業調度、容錯、機器間通信等復雜
Hadoop WordCount單詞計數原理
clas oop 圖片 tput 進行 打包 red div src 計算文件中出現每個單詞的頻數 輸入結果按照字母順序進行排序 編寫WordCount.java 包含Mapper類和Reducer類 編譯WordCount.java javac -classp
map-reduce之wordCount DEMO
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apa
用python的map/reduce函式實現int()功能,即字串轉換成數字
map()函式接收兩個引數,一個是函式,一個是可迭代物件,如列表,字串等,map將傳入的函式依次作用到序列(可迭代物件)的每個元素,並把結果作為新的Iterator(可迭代物件)返回。 再看reduc
WordCount結對作業——實現擴展功能
com 測試 tree 使用 .com 發揮 學習 發現 logs 合作者學號:201631081309 201631062307 作業鏈接 碼雲地址 一:實現的功能。 WordCount是一個簡單的小程序,實現了一下幾項基本功能。 - 統計字符數 - 統計單詞數 - 統計
Map Reduce用tree Map實現·topn
首先有如下如數,要統計每個頁面的訪問量,然後計算訪問量最大的五個頁面 2017/07/28 qq.com/a 2017/07/28 qq.com/bx 2017/07/28 qq.com/by 2017/07/28 qq.com/by3 2017/07/28 qq.com/news
WordCount結對作業——實現擴充套件功能
合作者學號:201631081309 201631062307 作業連結 碼雲地址 一:實現的功能。 WordCount是一個簡單的小程式,實現了一下幾項基本功能。 - 統計字元數 - 統計單詞數 - 統計文字行 以及擴充套件功能 - 統計註釋行、程式碼行、以及空行 - 遞迴處理目錄下符合條件的檔案 -
大資料處理神器map-reduce實現(僅python和shell版本)
熟悉java的人直接可以使用java實現map-reduce過程,而像我這種不熟悉java的怎麼辦?為了讓非java程式設計師方便處理資料,我把使用python,shell實現streaming的過程,也即為map-reduce過程,整理如下: 1.如果資料不在hive裡面,而在
Hadoop之MapReduce過程,單詞計數WordCount
單詞計數是最簡單也是最能體現MapReduce思想的程式之一,可以稱為MapReduce版“Hello World”,該程式的完整程式碼可以在Hadoop安裝包的src/example目錄下找到。單詞計數主要完成的功能:統計一系列文字檔案中每個單詞出現的次數,如下圖所示。 WordCo
18 | 散列表(上):Word文件中的單詞拼寫檢查功能是如何實現的?
Word 這種文字編輯器你平時應該經常用吧,那你有沒有留意過它的拼寫檢查功能呢?一旦我們在 Word 裡輸入一個錯誤的英文單詞,它就會用標紅的方式提示“拼寫錯誤”。Word 的這個單詞拼寫檢查功能,雖然很小但卻非常實用。你有沒有想過,這個功能是如何實現的呢? 其
大矩陣乘法運算map reduce實現思路
實現思路: 儲存: 大矩陣很多都是稀疏矩陣,並且有可能有上百萬的行和上百萬的列。 那麼矩陣可以存在類似HBase面向列的分散式資料庫中。 假設HTable中有兩個表A和表B分別儲存兩個巨型矩陣a和b。表A和表B都是隻有一個列族。列名都是1開始計數。 那麼表A和表B所儲存的矩
hadoop學習(七)WordCount+Block+Split+Shuffle+Map+Reduce技術詳解
1、在map task執行時,它的輸入資料來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關係在上面我們已經說的很明白了。在WordCount例子裡,假設map的輸入資料都是像 “aaa”這樣的字串。 2、
Hadoop分佈環境搭建步驟,及自帶MapReduce單詞計數程式實現
參考騰訊雲實驗室 Hadoop分佈環境搭建步驟: 1.軟硬體環境 CentOS 7.2 64 位 JDK- 1.8 Hadoo p- 2.7.4 2.安裝SSH sudo yum install openssh-clients openssh-ser
Python遞迴實現多層巢狀dict遍歷例項--三級選單/多級選單功能
要求:構建一個三(N)級選單,實現使用者可以根據指示選擇進入選單,退出選單,返回上層選單功能。知識點:dict資料的遍歷方法,遞迴遍歷方法。環境:Python3.6實現程式碼:#!/usr/bin/evn python # -*-coding:utf8 -*- class m
scala 兩種方法實現單詞計數
val lines = List("hello world", "hello spark") val wordlist = lines.flatMap(line => line.split(" ")).map(word => (word, 1))
Linux 單詞計數 WordCount 以及程式碼案例
WordCount 首先是命令列的: WordCount(單詞計數) 1:啟動hadoop 使用 start-all.sh 命令啟動hdfs 2:在hadoop的安裝目錄下新建一個目錄,使用hdfs的shell命令 cd /usr/local