1. 程式人生 > >hadoop streaming reduce端join的python兩種實現方式

hadoop streaming reduce端join的python兩種實現方式

實現student和course資料表的join操作,以學生編號(sno)為連線欄位

  • 測試資料

    • student.txt檔案
    
    #以一個空格分隔
    
    
    #學生編號   姓名
    
    
    #sno    sname
    
    01 lily
    02 tom
    03 jack
    04 rose
    • course.txt檔案

      
      #以一個空格分隔
      
      
      #學生編號    課程名  課程成績
      
      
      #sno cname grade 
      
      01 English 80
      01 Math 90
      02 English 82
      02 Math 95
    • 最終reduce端連線結果

    sno  sname   cname   grade
    01   lily    English   80
    01 lily Math 90 02 tom English 82 02 tom Math 95
  • 思路1

    按照不同的檔案輸出時對資料新增標記,然後通過配置map階段的排序欄位,保證學生資訊出現在課程成績資訊的最上面,reduce中,發現是學生資訊時,對後面接收的課程資訊進行join並輸出。步驟如下:

    1:map端有多個輸入檔案,以os.environ[‘map_input_file’]獲取輸入的檔案資訊,如果是student.txt檔案的輸入,則在輸出中增加一個’0’,是course.txt的輸入,在輸出中增加一個’1’.

    2: 以sno為key進行partation,確保將同一個學生的資訊和課程資訊分配到同一個reduce中。同時以”sno,標識” 作為map階段輸出的排序依據(以\t分隔的前兩列),確保在reduce中同一個學生的學生資訊在課程資訊的上面(sno,0在所有sno,1的行前面)。

    3: 在reduce中,遇到0標記的行儲存sno和sname資訊,遇到1標記的行將sno和sname和當前行的課程資訊一起輸出。

    • 示例程式碼

      mapper和reducer程式碼

          #! /usr/bin/env python
          # coding:utf-8
          import os
          import
      sys def read_input(file,sepr=' '): for line in file: line = line.split(sepr) yield line def mapper(): filepath = os.environ['map_input_file'] filename = os.path.split(filepath)[1] lines = read_input(sys.stdin) for data in lines: if data[0].strip() == "": continue if "student.txt" == filename: if len(data) != 2: continue else: print "%s\t%s\t%s" % (data[0],"0",data[1].strip()) else: if len(data) != 4: continue else: print "%s\t%s\t%s\t%s" %(data[0],"1",data[2].strip(),data[3].strip()) def reducer(): sno = None sname = None line = read_input(sys.stdin,'\t') for data in line: if (len(data) !=3 and len(data) !=4 ) or data[0].split() == "": continue if data[0] != sno: sno = data[0] if data[1] == "0": sname = data[2].strip('\n') else: cname = data[2] cnum = data[3].strip('\n') print "%s\t%s\t%s\t%s" % (sno,sname,cname,cnum) if __name__ == "__main__": d = {"mapper":mapper,"reducer":reducer} if sys.argv[1] in d: d[sys.argv[1]]()

      work.bash

      
      #! /bin/bash
      
      
      #定義map和reduce的目錄
      
      export WORK_PATH=/var/tmp/reduce_join
      
      #執行stream的jar包地址
      
      stream=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar
      
      #資料輸入目錄
      
      input=/lcy/reduce_join/input
      
      #輸出結果目錄
      
      output=/lcy/reduce_join/output
      
      
      #刪除mr輸出目錄
      
      if $HADOOP_HOME/bin/hdfs dfs -test -d $output
      then
          $HADOOP_HOME/bin/hdfs dfs -rm -r $output
      fi
      
      
      #執行mapreduce程式
      
      $HADOOP_HOME/bin/hadoop jar $stream \
      -D mapreduce.job.reduces=2 \        #設定reduce個數
      -D num.key.fields.for.partition=1   \   #設定第一個欄位為分割槽欄位(預設使用\t作為map輸出分隔符)
      -D stream.num.map.output.key.fields=2 \  #設定map的輸出以前兩個欄位作為排序依據,這個配置很重要,否則reduce的輸入格式不正確
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \  #和partition引數一起使用
      -input $input/* \
      -output $output \
      -mapper "python mr.py mapper" \
      -reducer "python mr.py reducer" \
      -file $WORK_PATH/mr.py
  • 思路2

    在reduce中,將學生資訊和課程成績資訊儲存到記憶體中,等遍歷完一個學生後,對兩個資料進行join操作。這種方式可以讓學生資訊在課程資訊的後面(因為經過partation和shuffle後,同一個sno的學生資訊和課程成績資訊會是連續的,要麼學生資訊在上面,要麼學生資訊在下面)步驟如下:

    1:同思路1的第一步

    2:以sno為key進行partation,確保將同一個學生的資訊和課程資訊分配到同一個reduce中。其中map端的排序可以不用設定了。

    3:reduce中,遇到0標記的行儲存sno和sname資訊,遇到1標記的行,將cname和grade組成list並追加到arr中。遇到新的sno時遍歷arr,並和sno,sname一起輸出。

    • 測試程式碼

    mapper和reducer

    
    #! /usr/bin/env python
    
    
    # coding:utf-8
    
    import os
    import sys
    
    def read_input(file,sepr=' '):
        for line in file:
            line = line.split(sepr)
            yield line
    
    def mapper():
        filepath = os.environ['map_input_file']
        filename = os.path.split(filepath)[1]
        lines = read_input(sys.stdin)
        for data in lines:
            if data[0].strip() == "":
                continue
    
            if "student.txt" == filename:
                if len(data) != 2:
                    continue
                else:
                    print "%s\t%s\t%s" % (data[0],"0",data[1].strip())
            else:
                if len(data) != 3:
                    continue
                else:
                    print "%s\t%s\t%s\t%s" %(data[0],"1",data[1].strip(),data[2].strip())
    
    def reducer():
        sno = None
        sname = None
        tpl = []
        line = read_input(sys.stdin,'\t')
        for data in line:
            if (len(data) !=3 and len(data) !=4 ) or data[0].split() == "":
                continue
    
            if data[0] != sno:
                #遇到新的sno時,進行連線輸出
                if sno != None and len(tpl) > 0:
                    for row in tpl:
                        print '%s\t%s\t%s\t%s' % (sno,sname,row[0],row[1])
                    tpl = []   #清空list,否則會變成笛卡爾積
    
                sno = data[0]
                if data[1] == "0":
                    sname = data[2].strip('\n')
                else:
                    tpl.append([data[2].strip(),data[3].strip()])
            else:
                if data[1] == "0":
                    sname = data[2].strip('\n')
                else:
                    tpl.append([data[2].strip(),data[3].strip()])
    
        if sno != None and len(tpl) > 0:
            for row in tpl:
                print '%s\t%s\t%s\t%s' % (sno,sname,row[0],row[1])
    
    
    
    if __name__ == "__main__":
        d = {"mapper":mapper,"reducer":reducer}
        if sys.argv[1] in d:
            d[sys.argv[1]]()

    work.bash

    
    #! /bin/bash
    
    
    #定義map和reduce的目錄
    
    export WORK_PATH=/var/tmp/reduce_join2
    
    #執行stream的jar包地址
    
    stream=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar
    
    #資料輸入目錄
    
    input=/lcy/reduce_join2/input
    
    #輸出結果目錄
    
    output=/lcy/reduce_join2/output
    
    
    #刪除mr輸出目錄
    
    if $HADOOP_HOME/bin/hdfs dfs -test -d $output
    then
        $HADOOP_HOME/bin/hdfs dfs -rm -r $output
    fi
    
    
    #執行mapreduce程式
    
    $HADOOP_HOME/bin/hadoop jar $stream \
    -D mapreduce.job.reduces=2 \
    -D num.key.fields.for.partition=1   \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -input $input/* \
    -output $output \
    -mapper "python mr.py mapper" \
    -reducer "python mr.py reducer" \
    -file $WORK_PATH/mr.py

相關推薦

判斷機器大小實現方式

一、為什麼會有大小端之分 這是因為在計算機系統中,我們是以位元組為單位的,每個地址單元都對應著一個位元組,一個位元組為 8bit。但是在C語言中除了8bit的char之外,還有16bit的short型,32bit的long型(要看具體的編譯器),另外,對於位數

hadoop streaming reducejoin的python實現方式

實現student和course資料表的join操作,以學生編號(sno)為連線欄位 測試資料 student.txt檔案 #以一個空格分隔 #學生編號 姓名 #sno sname 01 lily 02 tom 03 jac

自動補全、自動提示的實現方式(前端實現與後實現

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Document</title> <link rel="style

判斷機器大小實現方法

大端模式(Big-endian): 是指資料的低位(就是權值較小的後面那幾位)儲存在記憶體的高地址中,而資料的高位,儲存在記憶體的低地址 中,這樣的儲存模式有點兒類似於把資料當作字串順序處理:地址由小

移動HTML5導航欄吸頂:IOS(sticky)和Android實現方式

混合App,前端H5頁面,實現導航欄滑動到的時候貼頂。 注意: 首先寫的時候,監聽scroll事件,滑動到指定位置時改為定位 position:fixed;,實際運用過程中,IOS無法實時監聽scroll事件,在滾動停止之後才觸發的 $(wind

[轉]Web APi之認證(Authentication)實現方式【二】(十三)

用戶數 ted das 客戶 元素 基礎 目標 開始 net 本文轉自:http://www.cnblogs.com/CreateMyself/p/4857799.html 前言 上一節我們詳細講解了認證及其基本信息,這一節我們通過兩種不同方式來實現認證,並且分析如

多線程實現方式的區別

http [] tick 避免 main 單繼承 style 區別 tar 請解釋Thread類與Runnable接口實現多線程的區別?(請解釋多線程兩種實現方式的區別?) 1. Thread類時Runnable接口的子類,使用Runnable接口實現多線程可以避免單繼承局

JPA 派生標識符的實現方式

string column public tid man pri one embed page 方法一:@Entity@IdClass(ModuleId.class)public class Module { @Id private Integer index;

14、Fibonacci的實現方式

等於 cheng pos art log ref clas gpo tar 斐波納契數列,又稱黃金分割數列,指的是這樣一個數列:1、1、2、3、5、8、13、21、……在數學上,斐波納契數列以如下被以遞歸的方法定義:F0=0,F1=1,Fn=F(n-1)+F(n-2)(n&

Web APi之認證(Authentication)實現方式【二】(十三)

基於web 推薦 zed {0} scheme sage https 函數 ges 原文:Web APi之認證(Authentication)兩種實現方式【二】(十三)前言 上一節我們詳細講解了認證及其基本信息,這一節我們通過兩種不同方式來實現認證,並且分析如何合理的利用

spring ----> aop的實現方式

select imp ack exe readv expr gpo for public 實現1:基於xml 1 package com.rr.spring3.interf; //接口 2 3 public interface SayHello { 4 5

Ajax的實現方式

enc () != 部分 pen clas servlet 瀏覽器 pop //ajax的jquery實現 function aclick(){//alert("測

圖形驗證碼的實現方式

ava 輸入 urn color deb rect lac prev 後臺 情形一:圖形驗證碼跟短信驗證碼一起,只需要將後臺提供的動態鏈接填到(id="img")的src中即可生成動態驗證碼。 然後,在需要請求接口的地方,只需把(id="imgCode")中用戶輸入的信息通

前端路由的實現方式

color 前端路由 his 頁面 無刷新 原理 range window 使用 什麽是路由? 路由是根據不同的 url 地址展示不同的內容或頁面 早期的路由都是後端直接根據 url 來 reload 頁面實現的,即後端控制路由。 後來頁面越來越復雜,服務器壓力越來越大,隨

題目24-多線程實現方式

類重寫 直接 解決方案 做的 子類 是否為空 缺點 多線程同步 弊端 1、多線程兩種實現方式 (1)繼承Thread 定義類繼承Thread 重寫run方法 把新線程要做的事寫在run方法中 創建線程對象 開啟新線程, 內部會自動執行run方法(2)實現Runnable

線程的實現方式

class args new pub nds runnable implement ide start 線程的兩種實現方式 (1)繼承Thread類`` /** * 繼承Thread類 * */ public class PayThread extends T

iOS活動倒計時的實現方式

ofo orm ren 年-月 ats omd string 分享 截圖 代碼地址如下:<br>http://www.demodashi.com/demo/11076.html 在做些活動界面或者限時驗證碼時, 經常會使用一些倒計時突出展現. 現提供兩種方

Brute-Force模式匹配演算法實現方式

1. public static int indexOf(String mainStr,String subString,int start) { if((mainStr.length()<subString.length()) || mainStr==null || subStr

單例的實現方式、多個版本及利弊對照

        單例設計模式,顧明思議,只有一個例項,先交代重要一點,為防止外界對該類進行例項化,需要把類的建構函式宣告為私有的,這樣大家對原理理解更深入些。 1、餓漢式 餓漢模式單例程式碼,經典,可用,無需改進。 package com.sing

Java base64加密解密 實現方式

1、為什麼要使用Base 64     Base 64主要用途不是加密,而是把一些二進位制數轉成普通字元,方便在網路上傳輸。 由於一些二進位制字元在傳輸協議中屬於 控制字元,不能直接傳送,所以需要轉換一下才可以。由於某些系統中只能使用ASCII字元,Base64