
Joining two data files with Hadoop Streaming

Data processing in Hadoop mostly operates on whole data sets, so needing to join one data file with another is a very common requirement.

Many people have asked how to do this, so here is a worked example that beginners can follow:

[hdfs@server1]$ more clean_item_new
100002303,3368
100002865,11991
100003592,7995
100004955,7033
100006838,12630
100006855,6648
100006966,8561
100007628,6648
100008089,7791
100009560,11406
100010089,3064

... ...

Field description: (item_id, item_category)

[hdfs@server1]$ more clean_user_new
10001082,285259775,1,08
10001082,4368907,1,12
10001082,4368907,1,12
10001082,53616768,1,02
10001082,151466952,1,12
10001082,53616768,4,02
10001082,290088061,1,12
10001082,298397524,1,12
10001082,32104252,1,12
10001082,323339743,1,12
10001082,396795886,1,12
10001082,9947871,1,28
10001082,150720867,1,15

... ...

Field description: (user_id, item_id, behavior_type, time)

The mapper and reducer are written in Python:

[hdfs@server1]$ cat clean_data_user_item_join.py
#!/usr/bin/python
# -*- coding:utf-8 -*-

import sys
import gc
import os

gc.disable()

def mapper():
    # map_input_file holds the path of the current input split; newer Hadoop
    # versions expose the same value as mapreduce_map_input_file
    filepath = os.environ["map_input_file"]  # used to tell the two input files apart
    filename = os.path.split(filepath)[-1] 
    #print '---------------%s--------------------' % (filename)
    for line in sys.stdin:
        if line.strip()=="":
            continue
        fields = line[:-1].split(",")
        if filename == 'clean_item_new':  # records from the item file
            item_id = fields[0]
            item_category = fields[1]
            print ','.join((item_id,'0',item_category))  # the extra '0' flag makes the category record sort ahead of the user records for the same item_id, which simplifies the reducer

        if filename == 'clean_user_new':
            user_id = fields[0]
            item_id = fields[1]
            behavior_type = fields[2]
            time = fields[3]
            print ','.join((item_id,'1',user_id,behavior_type,time))

def reducer():
    last_item_id = ""
    for line in sys.stdin:
        if line.strip()=='':
            continue
        fields = line[:-1].split(",")
        item_id = fields[0]
        if item_id != last_item_id:  # input is already sorted by item_id after the shuffle, so a change of item_id starts a new group; the '0' record carrying the category arrives first
            item_category=""
            if fields[1]=="0":
                item_category=fields[2].strip()
        elif item_id==last_item_id:
            if fields[1]=="1":
                user_id=fields[2].strip()
                behavior_type=fields[3].strip()
                time=fields[4].strip()
                if item_category:
                    print ','.join((last_item_id,user_id,item_category,behavior_type,time))
        last_item_id = item_id  # remember this item_id for the next line

d = {'mapper':mapper, 'reducer':reducer}
if sys.argv[1] in d:
    d[sys.argv[1]]()

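Before submitting to the cluster, the script can be smoke-tested locally by piping the sample files through the mapper, a plain sort (standing in for the shuffle phase), and the reducer. The map_input_file variable is set by hand here because Hadoop normally injects it into the mapper's environment; this is only a quick sanity check, not how the job runs on the cluster:

[hdfs@server1]$ (map_input_file=clean_item_new python clean_data_user_item_join.py mapper < clean_item_new; \
  map_input_file=clean_user_new python clean_data_user_item_join.py mapper < clean_user_new) \
  | sort | python clean_data_user_item_join.py reducer
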
[hdfs@server1]$ cat clean_data_user_item_join.sh
#!/bin/bash

hdfs_input_path=/user/hdfs/tchi/input/*
hdfs_output_path=/user/hdfs/tchi/output

hdfs dfs -rm -r ${hdfs_output_path}

hadoop jar /opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.queuename=root \
-D mapred.job.name='Clean Data' \
-D stream.num.map.output.key.fields=1 \
-D mapred.map.tasks=10  \
-D mapred.reduce.tasks=6 \
-input ${hdfs_input_path} \
-output ${hdfs_output_path} \
-file clean_data_user_item_join.py \
-mapper "python clean_data_user_item_join.py mapper" \
-reducer "python clean_data_user_item_join.py reducer"
Writing Hadoop Streaming jobs is fairly straightforward: once you have the mapper and reducer down, the rest falls into place.