Using Hadoop Streaming to join two data files
阿新 • Published 2019-02-19
Data processing in Hadoop mostly means operating on whole data sets, so the need to join one data file against another comes up very often.
Since many people have asked how to do this, the example below walks beginners through writing such a job:
[hdfs@server1]$ more clean_item_new
100002303,3368
100002865,11991
100003592,7995
100004955,7033
100006838,12630
100006855,6648
100006966,8561
100007628,6648
100008089,7791
100009560,11406
100010089,3064
... ...
Fields: (item_id, item_category)
[hdfs@server1]$ more clean_user_new
10001082,285259775,1,08
10001082,4368907,1,12
10001082,4368907,1,12
10001082,53616768,1,02
10001082,151466952,1,12
10001082,53616768,4,02
10001082,290088061,1,12
10001082,298397524,1,12
10001082,32104252,1,12
10001082,323339743,1,12
10001082,396795886,1,12
10001082,9947871,1,28
10001082,150720867,1,15
... ...
Fields: (user_id, item_id, behavior_type, time)
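
A side note before the code: on small local samples, the same inner join can be reproduced with the coreutils join command, which gives a handy ground truth to compare the MapReduce output against. A minimal sketch, assuming both files sit in the current directory (the column order differs from the streaming job's output):

[hdfs@server1]$ join -t, -1 1 -2 2 \
    <(sort -t, -k1,1 clean_item_new) \
    <(sort -t, -k2,2 clean_user_new)

This prints item_id,item_category,user_id,behavior_type,time for every user record whose item_id also appears in the item file.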
The mapper and reducer are written in Python:
[hdfs@server1]$ cat clean_data_user_item_join.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
import sys
import gc
import os
gc.disable()

def mapper():
    #filename = os.environ.get('mapreduce_map_input_file').split('/')
    filepath = os.environ["map_input_file"]   # path of the file this map task is reading
    filename = os.path.split(filepath)[-1]    # keep only the base name
    #print '---------------%s--------------------' % (filename)
    for line in sys.stdin:
        if line.strip() == "":
            continue
        fields = line.rstrip('\n').split(",")
        if filename == 'clean_item_new':      # tell the two inputs apart by file name
            item_id = fields[0]
            item_category = fields[1]
            # Tag item records with '0' so they sort ahead of the '1'-tagged
            # user records for the same item_id, which simplifies the reducer.
            print ','.join((item_id, '0', item_category))
        if filename == 'clean_user_new':
            user_id = fields[0]
            item_id = fields[1]
            behavior_type = fields[2]
            time = fields[3]
            print ','.join((item_id, '1', user_id, behavior_type, time))

def reducer():
    last_item_id = ""
    item_category = ""
    for line in sys.stdin:
        if line.strip() == '':
            continue
        fields = line.rstrip('\n').split(",")
        item_id = fields[0]
        if item_id != last_item_id:
            # The reducer input is already sorted by key, so a new item_id
            # starts a new group; remember its category if this first line
            # is an item record (tag '0').
            item_category = ""
            if fields[1] == "0":
                item_category = fields[2].strip()
        else:
            # Same item_id as the previous line: join each user record
            # (tag '1') against the remembered category.
            if fields[1] == "1":
                user_id = fields[2].strip()
                behavior_type = fields[3].strip()
                time = fields[4].strip()
                if item_category:
                    print ','.join((last_item_id, user_id, item_category, behavior_type, time))
        last_item_id = item_id                # remember the key for the next line

d = {'mapper': mapper, 'reducer': reducer}
if sys.argv[1] in d:
    d[sys.argv[1]]()
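
Before submitting the job, it is worth smoke-testing the mapper and reducer with plain Unix pipes. One wrinkle: the mapper reads the input path from the map_input_file environment variable, which only exists inside a streaming task, so it has to be set by hand. A minimal sketch, again assuming both sample files are in the current directory:

[hdfs@server1]$ ( map_input_file=clean_item_new python clean_data_user_item_join.py mapper < clean_item_new ; \
    map_input_file=clean_user_new python clean_data_user_item_join.py mapper < clean_user_new ) \
    | sort -t, -k1,1 -k2,2 \
    | python clean_data_user_item_join.py reducer

Here sort -t, -k1,1 -k2,2 stands in for the shuffle: it groups lines by item_id and puts each '0'-tagged item record ahead of the '1'-tagged user records, which is exactly the order the reducer relies on.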
Hadoop streaming jobs are fairly simple to write; the main work is getting the mapper and reducer right.

[hdfs@server1]$ cat clean_data_user_item_join.sh
#!/bin/bash

hdfs_input_path=/user/hdfs/tchi/input/*
hdfs_output_path=/user/hdfs/tchi/output

hdfs dfs -rm -r ${hdfs_output_path}

hadoop jar /opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapreduce.job.queuename=root \
  -D mapred.job.name='Clean Data' \
  -D stream.num.map.output.key.fields=1 \
  -D mapred.map.tasks=10 \
  -D mapred.reduce.tasks=6 \
  -input ${hdfs_input_path} \
  -output ${hdfs_output_path} \
  -file clean_data_user_item_join.py \
  -mapper "python clean_data_user_item_join.py mapper" \
  -reducer "python clean_data_user_item_join.py reducer"
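
Once the job succeeds, the joined records land in part files under the output directory. A quick way to run the script and eyeball the result (paths as in the script above; the part-* names are the streaming defaults):

[hdfs@server1]$ sh clean_data_user_item_join.sh
[hdfs@server1]$ hdfs dfs -cat /user/hdfs/tchi/output/part-* | head

Each output line has the form item_id,user_id,item_category,behavior_type,time.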