1. 程式人生 > >MapReduce例項

MapReduce例項

本文參考http://michaelnielsen.org/blog/page/19/
從MapReduce的經典例子—單詞統計開始。
一個MapReduce job的輸入是一個(input_key, input_value)這樣的鍵值對集合。鍵值對集合可以使用python的dictionary資料型別來表示。在單詞統計例子中,input_key是檔名,input_value是檔案內容。

filenames = ['a.txt', 'b.txt', 'c.txt']
i = {}
for filename in filenames:
    f = open(filename)
    i[filename] = f.read()
    f.close()

python dictionary i 包含著MapReduce job的所有輸入。a.txt, b.txt和c.txt的內容如下:

text\a.txt:

The quick brown fox jumped over the lazy grey dogs.

text\b.txt:

That's one small step for a man, one giant leap for mankind.

text\c.txt:

Mary had a little lamb,
Its fleece was white as snow;
And everywhere that Mary went,
The lamb was sure to
go.

一個MapReduce job分為兩個階段:map階段和reduce階段。map階段產生 intermediate keys and values,這些 intermediate keys and values再由reduce階段處理。在map階段,一個mapper函式mapper(input_key,input_value)處理每個MapReduce job的輸入字典i中的鍵值對(input_key,input_value)mapper(input_key,input_value)產生由intermediate keys and values組成的列表。mapper("a.txt", i["a.txt"])

產生:

[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), ('jumped', 1), 
 ('over', 1), ('the', 1), ('lazy', 1), ('grey', 1), ('dogs', 1)]

mapper函式定義如下

 def mapper(input_key,input_value):
  return [(word,1) for word in 
          remove_punctuation(input_value.lower()).split()]

def remove_punctuation(s):
  return s.translate(string.maketrans("",""), string.punctuation)

定義這樣的mapper函式後,map階段的產出就是針對輸入的字典i呼叫mapper函式( mapper(“a.txt”), mapper(“b.txt”)和 mapper(“c.txt”) )所返回結果的合併:

[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), 
 ('jumped', 1), ('over', 1), ('the', 1), ('lazy', 1), ('grey', 1), 
 ('dogs', 1), ('mary', 1), ('had', 1), ('a', 1), ('little', 1), 
 ('lamb', 1), ('its', 1), ('fleece', 1), ('was', 1), ('white', 1), 
 ('as', 1), ('snow', 1), ('and', 1), ('everywhere', 1), 
 ('that', 1), ('mary', 1), ('went', 1), ('the', 1), ('lamb', 1), 
 ('was', 1), ('sure', 1), ('to', 1), ('go', 1), ('thats', 1), 
 ('one', 1), ('small', 1), ('step', 1), ('for', 1), ('a', 1), 
 ('man', 1), ('one', 1), ('giant', 1), ('leap', 1), ('for', 1), 
 ('mankind', 1)]

接下來進入reduce階段
MapReduce為reduce階段做一些預處理:將map階段產生的intermediate keys and values列表中含有相同key的value放在一起,生成一箇中間字典intermediate dictionary:

{'and': [1], 'fox': [1], 'over': [1], 'one': [1, 1], 'as': [1], 
 'go': [1], 'its': [1], 'lamb': [1, 1], 'giant': [1], 
 'for': [1, 1], 'jumped': [1], 'had': [1], 'snow': [1], 
 'to': [1], 'leap': [1], 'white': [1], 'was': [1, 1], 
 'mary': [1, 1], 'brown': [1], 'lazy': [1], 'sure': [1], 
 'that': [1], 'little': [1], 'small': [1], 'step': [1], 
 'everywhere': [1], 'mankind': [1], 'went': [1], 'man': [1], 
 'a': [1, 1], 'fleece': [1], 'grey': [1], 'dogs': [1], 
 'quick': [1], 'the': [1, 1, 1], 'thats': [1]}

reduce階段呼叫reducer函式,reducer(intermediate_key,intermediate_value_list)作用在intermediate dictionary中的每一項上。單詞統計的例子中,reducer函式將intermediate_key對應的
intermediate_value_list中的值加起來:

def reducer(intermediate_key,intermediate_value_list):
  return (intermediate_key,sum(intermediate_value_list))

intermediate dictionary經過reduce階段的處理,輸出:

[('and', 1), ('fox', 1), ('over', 1), ('one', 2), ('as', 1), 
 ('go', 1), ('its', 1), ('lamb', 2), ('giant', 1), ('for', 2), 
 ('jumped', 1), ('had', 1), ('snow', 1), ('to', 1), ('leap', 1), 
 ('white', 1), ('was', 2), ('mary', 2), ('brown', 1), 
 ('lazy', 1), ('sure', 1), ('that', 1), ('little', 1), 
 ('small', 1), ('step', 1), ('everywhere', 1), ('mankind', 1), 
 ('went', 1), ('man', 1), ('a', 2), ('fleece', 1), ('grey', 1), 
 ('dogs', 1), ('quick', 1), ('the', 3), ('thats', 1)]

參考程式:

#word_count.py

import string
import map_reduce

def mapper(input_key,input_value):
  return [(word,1) for word in 
          remove_punctuation(input_value.lower()).split()]

def remove_punctuation(s):
  return s.translate(string.maketrans("",""), string.punctuation)

def reducer(intermediate_key,intermediate_value_list):
  return (intermediate_key,sum(intermediate_value_list))

filenames = ["text\\a.txt","text\\b.txt","text\\c.txt"]
i = {}
for filename in filenames:
  f = open(filename)
  i[filename] = f.read()
  f.close()

print map_reduce.map_reduce(i,mapper,reducer)

map_reduce模組:

# map_reduce.py

import itertools

def map_reduce(i,mapper,reducer):
  intermediate = []
  for (key,value) in i.items():
    intermediate.extend(mapper(key,value))
  groups = {}
  for key, group in itertools.groupby(sorted(intermediate), 
                                      lambda x: x[0]):
    groups[key] = list([y for x, y in group])
  return [reducer(intermediate_key,groups[intermediate_key])
          for intermediate_key in groups] 

相關推薦

偽分散式執行Hadoop例項之yarn執行MapReduce例項

一、配置叢集 配置yarn-env.sh檔案 配置一下JAVA_HOME 配置yarn-site.xml <!-- reducer獲取資料的方式 --> <property> <name>yarn.nodemanager.au

Hadoop的MapReduce例項講解—Python寫的WordCount Demo

MapReduce是hadoop這隻大象的核心,Hadoop 中,資料處理核心就是 MapReduce 程式設計模型。一個Map/Reduce 作業(job) 通常會把輸入的資料集切分為若干獨立的資料塊,由 map任務(task)以完全並行的方式處理它們

執行一個mapreduce例項

本文改編自開啟 因為參考文中步驟有部分執行不正確,所以自己記錄下自己的步驟,並將原因整理了下。 Score.java檔案 下載 import java.io.IOException; import java.util.Iterator; import

Mapreduce例項-分組排重(group by distinct)

需要實現以下幾個類,程式碼太多,列了下主要程式碼,可根據排重資料的特徵判讀是否需要新增combiner來提速。public class GroupComparator implements RawComparator<MyBinaryKey> { @Over

【Hadoop】Windows 10 在Intellij IEDA本地執行Hadoop MapReduce例項

環境: 作業系統:Windows 10 Hadoop版本:2.7.3 Java版本: 1.8 前期準備: 1. 配置hadoop環境。 2. 配置maven環境。 1.下載maven部署包apache-maven-3.5.3-

7.測試hadoop安裝成功與否,並跑mapreduce例項

start-all.sh 2.建立hdfs目錄 hadoop fs -mkdir /input 3.上傳檔案 hadoop fs -put /data/hadoop-2.6.5/README.txt /input/ 4.修改檔名稱 hadoop fs -mv /input/READ

MapReduce例項】資料去重

一、例項描述 資料去重是利用並行化思想來對資料進行有意義的篩選。統計大資料集上的資料種類個數、從網站日誌中計算訪問等這些看似龐大的任務都會涉及資料去重。 比如,輸入檔案 file1.txt,其內容如下: 2017-12-9 a 2017-12-10 b

hadoop入門(六)JavaAPI+Mapreduce例項wordCount單詞計數詳解

剛剛研究了一下haoop官網單詞計數的例子,把詳細步驟解析貼在下面: 準備工作: 1、haoop叢集環境搭建完成 2、新建一個檔案hello,並寫入2行單詞,如下: [[email protected] hadoop-2.6.0]# vi hello hello

MapReduce例項

本文參考http://michaelnielsen.org/blog/page/19/ 從MapReduce的經典例子—單詞統計開始。 一個MapReduce job的輸入是一個(input_key, input_value)這樣的鍵值對集合。鍵值對集合可以

[hadoop]MapReduce例項之好友推薦(六)

一、定義好友檔案qq hadoop hello hdfs world tom cat cat dog hello world hello hdfs hadoop好友hello,hdfs好友worl

hodoop中使用MapReduce例項

網址: 1、http://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html 2、http://eric-gcm.

MapReduce例項淺析

Hadoop Map/Reduce是一個使用簡易的軟體框架,基於它寫出來的應用程式能夠執行在由上千個商用機器組成的大型叢集上,並以一種可靠容錯的方式並行處理上T級別的資料集。 一個Map/Reduce 作業(job) 通常會把輸入的資料集切分為若干獨立的資料塊,由 map任務(task)以完全並

Mapreduce例項---分割槽流量彙總

一:問題介紹 給一個數據檔案,檔案包含手機使用者的各種上網資訊,求每個手機使用者的總上行流量,總下行流量和總流量;並且按號碼歸屬地分省份彙總。 資料流程: 二:需要的jar包 hadoop-2

Mapreduce例項---統計單詞個數(wordcount)

一:問題介紹 統計每一個單詞在整個資料集中出現的總次數。 資料流程: 二:需要的jar包 Hadoop-2.4.1\share\hadoop\hdfs\hadoop-hdfs-2.4.1.ja

一個mapreduce例項加註釋

1.WCMapper.java package cn.itcast.hadoop.mr.wordcount; import java.io.IOException; import org.apache.commons.lang.StringUtils; import o

Hadoop2.0 Mapreduce例項WordCount體驗

     在Hadoop2.0中MapReduce程式的都需要繼承org.apache.hadoop.mapreduce.Mapper 和 org.apache.hadoop.mapreduce.Reducer這兩個基礎類,來定製自己的mapreduce功能,原始碼中主要的

Mapreduce資料分析例項

資料包 百度網盤 連結:https://pan.baidu.com/s/1v9M3jNdT4vwsqup9N0mGOA 提取碼:hs9c 複製這段內容後開啟百度網盤手機App,操作更方便哦 1、     資料清洗說明: (1)   

hadoop-mapreduce-examples Hadoop例項

[[email protected] hadoop-3.1.1]# bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar An example program must be given as the f

解讀MapReduce程式例項

Mapreduce 是一個分散式運算程式的程式設計框架,核心功能是將使用者編寫的業務邏輯程式碼和自帶預設元件整合成一個完整的 分散式運算程式,併發執行在一個 hadoop 叢集上。MapReduce採用“分而治之”策略,一個儲存在分散式檔案系統中的大規模資料集,會被切分成許多

大資料之Hadoop學習——動手實戰學習MapReduce程式設計例項

文章目錄 一、MapReduce程式設計例項 1.自定義物件序列化 需求分析 報錯:Exception in thread "main" java.lang.IllegalArgumentExcept