hadoop streaming reduce端join的python兩種實現方式
實現student和course資料表的join操作,以學生編號(sno)為連線欄位
測試資料
- student.txt檔案
#以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jack 04 rose
course.txt檔案
#以一個空格分隔 #學生編號 課程名 課程成績 #sno cname grade 01 English 80 01 Math 90 02 English 82 02 Math 95
最終reduce端連線結果
sno sname cname grade 01 lily English 80
思路1
按照不同的檔案輸出時對資料新增標記,然後通過配置map階段的排序欄位,保證學生資訊出現在課程成績資訊的最上面,reduce中,發現是學生資訊時,對後面接收的課程資訊進行join並輸出。步驟如下:
1:map端有多個輸入檔案,以os.environ[‘map_input_file’]獲取輸入的檔案資訊,如果是student.txt檔案的輸入,則在輸出中增加一個’0’,是course.txt的輸入,在輸出中增加一個’1’.
2: 以sno為key進行partation,確保將同一個學生的資訊和課程資訊分配到同一個reduce中。同時以”sno,標識” 作為map階段輸出的排序依據(以\t分隔的前兩列),確保在reduce中同一個學生的學生資訊在課程資訊的上面(sno,0在所有sno,1的行前面)。
3: 在reduce中,遇到0標記的行儲存sno和sname資訊,遇到1標記的行將sno和sname和當前行的課程資訊一起輸出。
示例程式碼
mapper和reducer程式碼
#! /usr/bin/env python # coding:utf-8 import os import
work.bash
#! /bin/bash #定義map和reduce的目錄 export WORK_PATH=/var/tmp/reduce_join #執行stream的jar包地址 stream=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar #資料輸入目錄 input=/lcy/reduce_join/input #輸出結果目錄 output=/lcy/reduce_join/output #刪除mr輸出目錄 if $HADOOP_HOME/bin/hdfs dfs -test -d $output then $HADOOP_HOME/bin/hdfs dfs -rm -r $output fi #執行mapreduce程式 $HADOOP_HOME/bin/hadoop jar $stream \ -D mapreduce.job.reduces=2 \ #設定reduce個數 -D num.key.fields.for.partition=1 \ #設定第一個欄位為分割槽欄位(預設使用\t作為map輸出分隔符) -D stream.num.map.output.key.fields=2 \ #設定map的輸出以前兩個欄位作為排序依據,這個配置很重要,否則reduce的輸入格式不正確 -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ #和partition引數一起使用 -input $input/* \ -output $output \ -mapper "python mr.py mapper" \ -reducer "python mr.py reducer" \ -file $WORK_PATH/mr.py
思路2
在reduce中,將學生資訊和課程成績資訊儲存到記憶體中,等遍歷完一個學生後,對兩個資料進行join操作。這種方式可以讓學生資訊在課程資訊的後面(因為經過partation和shuffle後,同一個sno的學生資訊和課程成績資訊會是連續的,要麼學生資訊在上面,要麼學生資訊在下面)步驟如下:
1:同思路1的第一步
2:以sno為key進行partation,確保將同一個學生的資訊和課程資訊分配到同一個reduce中。其中map端的排序可以不用設定了。
3:reduce中,遇到0標記的行儲存sno和sname資訊,遇到1標記的行,將cname和grade組成list並追加到arr中。遇到新的sno時遍歷arr,並和sno,sname一起輸出。
- 測試程式碼
mapper和reducer
#! /usr/bin/env python # coding:utf-8 import os import sys def read_input(file,sepr=' '): for line in file: line = line.split(sepr) yield line def mapper(): filepath = os.environ['map_input_file'] filename = os.path.split(filepath)[1] lines = read_input(sys.stdin) for data in lines: if data[0].strip() == "": continue if "student.txt" == filename: if len(data) != 2: continue else: print "%s\t%s\t%s" % (data[0],"0",data[1].strip()) else: if len(data) != 3: continue else: print "%s\t%s\t%s\t%s" %(data[0],"1",data[1].strip(),data[2].strip()) def reducer(): sno = None sname = None tpl = [] line = read_input(sys.stdin,'\t') for data in line: if (len(data) !=3 and len(data) !=4 ) or data[0].split() == "": continue if data[0] != sno: #遇到新的sno時,進行連線輸出 if sno != None and len(tpl) > 0: for row in tpl: print '%s\t%s\t%s\t%s' % (sno,sname,row[0],row[1]) tpl = [] #清空list,否則會變成笛卡爾積 sno = data[0] if data[1] == "0": sname = data[2].strip('\n') else: tpl.append([data[2].strip(),data[3].strip()]) else: if data[1] == "0": sname = data[2].strip('\n') else: tpl.append([data[2].strip(),data[3].strip()]) if sno != None and len(tpl) > 0: for row in tpl: print '%s\t%s\t%s\t%s' % (sno,sname,row[0],row[1]) if __name__ == "__main__": d = {"mapper":mapper,"reducer":reducer} if sys.argv[1] in d: d[sys.argv[1]]()
work.bash
#! /bin/bash #定義map和reduce的目錄 export WORK_PATH=/var/tmp/reduce_join2 #執行stream的jar包地址 stream=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar #資料輸入目錄 input=/lcy/reduce_join2/input #輸出結果目錄 output=/lcy/reduce_join2/output #刪除mr輸出目錄 if $HADOOP_HOME/bin/hdfs dfs -test -d $output then $HADOOP_HOME/bin/hdfs dfs -rm -r $output fi #執行mapreduce程式 $HADOOP_HOME/bin/hadoop jar $stream \ -D mapreduce.job.reduces=2 \ -D num.key.fields.for.partition=1 \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -input $input/* \ -output $output \ -mapper "python mr.py mapper" \ -reducer "python mr.py reducer" \ -file $WORK_PATH/mr.py
相關推薦
判斷機器大小端的兩種實現方式
一、為什麼會有大小端之分 這是因為在計算機系統中,我們是以位元組為單位的,每個地址單元都對應著一個位元組,一個位元組為 8bit。但是在C語言中除了8bit的char之外,還有16bit的short型,32bit的long型(要看具體的編譯器),另外,對於位數
hadoop streaming reduce端join的python兩種實現方式
實現student和course資料表的join操作,以學生編號(sno)為連線欄位 測試資料 student.txt檔案 #以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jac
自動補全、自動提示的兩種實現方式(前端實現與後端實現)
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Document</title> <link rel="style
判斷機器大小端的兩種實現方法
大端模式(Big-endian): 是指資料的低位(就是權值較小的後面那幾位)儲存在記憶體的高地址中,而資料的高位,儲存在記憶體的低地址 中,這樣的儲存模式有點兒類似於把資料當作字串順序處理:地址由小
移動端HTML5導航欄吸頂:IOS(sticky)和Android兩種實現方式
混合App,前端H5頁面,實現導航欄滑動到的時候貼頂。 注意: 首先寫的時候,監聽scroll事件,滑動到指定位置時改為定位 position:fixed;,實際運用過程中,IOS無法實時監聽scroll事件,在滾動停止之後才觸發的 $(wind
[轉]Web APi之認證(Authentication)兩種實現方式【二】(十三)
用戶數 ted das 客戶 元素 基礎 目標 開始 net 本文轉自:http://www.cnblogs.com/CreateMyself/p/4857799.html 前言 上一節我們詳細講解了認證及其基本信息,這一節我們通過兩種不同方式來實現認證,並且分析如
多線程兩種實現方式的區別
http [] tick 避免 main 單繼承 style 區別 tar 請解釋Thread類與Runnable接口實現多線程的區別?(請解釋多線程兩種實現方式的區別?) 1. Thread類時Runnable接口的子類,使用Runnable接口實現多線程可以避免單繼承局
JPA 派生標識符的兩種實現方式
string column public tid man pri one embed page 方法一:@Entity@IdClass(ModuleId.class)public class Module { @Id private Integer index;
14、Fibonacci的兩種實現方式
等於 cheng pos art log ref clas gpo tar 斐波納契數列,又稱黃金分割數列,指的是這樣一個數列:1、1、2、3、5、8、13、21、……在數學上,斐波納契數列以如下被以遞歸的方法定義:F0=0,F1=1,Fn=F(n-1)+F(n-2)(n&
Web APi之認證(Authentication)兩種實現方式【二】(十三)
基於web 推薦 zed {0} scheme sage https 函數 ges 原文:Web APi之認證(Authentication)兩種實現方式【二】(十三)前言 上一節我們詳細講解了認證及其基本信息,這一節我們通過兩種不同方式來實現認證,並且分析如何合理的利用
spring ----> aop的兩種實現方式
select imp ack exe readv expr gpo for public 實現1:基於xml 1 package com.rr.spring3.interf; //接口 2 3 public interface SayHello { 4 5
Ajax的兩種實現方式
enc () != 部分 pen clas servlet 瀏覽器 pop //ajax的jquery實現 function aclick(){//alert("測
圖形驗證碼的兩種實現方式
ava 輸入 urn color deb rect lac prev 後臺 情形一:圖形驗證碼跟短信驗證碼一起,只需要將後臺提供的動態鏈接填到(id="img")的src中即可生成動態驗證碼。 然後,在需要請求接口的地方,只需把(id="imgCode")中用戶輸入的信息通
前端路由的兩種實現方式
color 前端路由 his 頁面 無刷新 原理 range window 使用 什麽是路由? 路由是根據不同的 url 地址展示不同的內容或頁面 早期的路由都是後端直接根據 url 來 reload 頁面實現的,即後端控制路由。 後來頁面越來越復雜,服務器壓力越來越大,隨
題目24-多線程兩種實現方式
類重寫 直接 解決方案 做的 子類 是否為空 缺點 多線程同步 弊端 1、多線程兩種實現方式 (1)繼承Thread 定義類繼承Thread 重寫run方法 把新線程要做的事寫在run方法中 創建線程對象 開啟新線程, 內部會自動執行run方法(2)實現Runnable
線程的兩種實現方式
class args new pub nds runnable implement ide start 線程的兩種實現方式 (1)繼承Thread類`` /** * 繼承Thread類 * */ public class PayThread extends T
iOS活動倒計時的兩種實現方式
ofo orm ren 年-月 ats omd string 分享 截圖 代碼地址如下:<br>http://www.demodashi.com/demo/11076.html 在做些活動界面或者限時驗證碼時, 經常會使用一些倒計時突出展現. 現提供兩種方
Brute-Force模式匹配演算法兩種實現方式
1. public static int indexOf(String mainStr,String subString,int start) { if((mainStr.length()<subString.length()) || mainStr==null || subStr
單例的兩種實現方式、多個版本及利弊對照
單例設計模式,顧明思議,只有一個例項,先交代重要一點,為防止外界對該類進行例項化,需要把類的建構函式宣告為私有的,這樣大家對原理理解更深入些。 1、餓漢式 餓漢模式單例程式碼,經典,可用,無需改進。 package com.sing
Java base64加密解密 兩種實現方式
1、為什麼要使用Base 64 Base 64主要用途不是加密,而是把一些二進位制數轉成普通字元,方便在網路上傳輸。 由於一些二進位制字元在傳輸協議中屬於 控制字元,不能直接傳送,所以需要轉換一下才可以。由於某些系統中只能使用ASCII字元,Base64