Big Data Tutorial (9.6): Implementing a Map-Side Join
阿新 • Published: 2018-12-13
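The previous article showed how to implement a join with map and reduce working together; in this section the blogger explains how to implement the join on the map side.
1. Requirements
Join two "tables", one small and one very large. This scenario is very common in practice, for example joining an "order log" with "product information".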
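2. Analysis
--Principle: this approach applies when one of the joined tables is small. The small table is distributed to every map node, so each map node can join the portion of the large table it reads locally and write the final result directly. No shuffle or reduce phase is needed, which greatly increases the parallelism of the join and speeds up processing.
--Example: load the small table inside the mapper class in advance and perform the join there,
--using the distributed cache mechanism to ship the small table's data to every node that runs a map task, so that each map task can load the small table from local disk and perform the join locally.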
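At its core, the map-side join described above is an in-memory hash join: build a hash table from the small table once, then probe it for every row of the large table. The plain-Java sketch below shows only that logic, without Hadoop. The tab-separated layouts, the key column index, the "NULL" placeholder, and the sample rows are assumptions for illustration, not the author's data (the article's mapper keys the product map on column 0 of product.txt and looks it up with fields[1] of the order line).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastHashJoinSketch {

    // Build side: index the small table in memory, keyed by its first column.
    // Assumed (hypothetical) layout: productId \t productName
    static Map<String, String> buildIndex(List<String> smallTableLines) {
        Map<String, String> index = new HashMap<>();
        for (String line : smallTableLines) {
            if (line.isEmpty()) {
                continue; // ignore blank lines
            }
            String[] fields = line.split("\t");
            index.put(fields[0], fields[1]);
        }
        return index;
    }

    // Probe side: stream the large table row by row and look up the join key
    // (column keyColumn) in the in-memory index. Nothing is shuffled or reduced.
    static List<String> probe(Map<String, String> index, List<String> bigTableLines, int keyColumn) {
        List<String> joined = new ArrayList<>();
        for (String line : bigTableLines) {
            String[] fields = line.split("\t");
            String name = index.get(fields[keyColumn]);
            // Unmatched rows are kept with a "NULL" placeholder, like a left outer join
            joined.add(line + "\t" + (name == null ? "NULL" : name));
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, not the author's files
        List<String> products = Arrays.asList("P0001\tphoneA", "P0002\tphoneB");
        List<String> orders = Arrays.asList(
                "1001\t20150710\tP0001\t2",
                "1002\t20150710\tP0002\t3",
                "1003\t20150710\tP0009\t1");
        Map<String, String> index = buildIndex(products);
        for (String row : probe(index, orders, 2)) {
            System.out.println(row);
        }
    }
}
```

The design choice that makes this scale is that only the small table must fit in memory; the large table is streamed, so each map task needs memory proportional to the product table, not to its input split.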
3. Code Implementation
package com.empire.hadoop.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // A HashMap that holds the product information table in memory
        Map<String, String> pdInfoMap = new HashMap<String, String>();
        Text k = new Text();

        /**
         * Reading the source of the parent class Mapper shows that setup() is called once
         * before the map task processes any data, so it can be used for initialization.
         * Here it loads product.txt, which the distributed cache placed in the task's working directory.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // try-with-resources closes the reader even if parsing fails; assumes product.txt is UTF-8
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream("product.txt"), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    if (StringUtils.isEmpty(line)) {
                        continue; // skip blank lines instead of stopping at the first one
                    }
                    String[] fields = line.split("\t");
                    // key: first column of product.txt, value: third column
                    pdInfoMap.put(fields[0], fields[2]);
                }
            }
        }

        // Since the complete product table is already in memory, the join can be done in map()
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            String[] fields = orderLine.split("\t");
            // Look up the product by the second column of the order line
            String pdName = pdInfoMap.get(fields[1]);
            k.set(orderLine + "\t" + pdName);
            context.write(k, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapSideJoin.class);
        //job.setJar("D:/mapsidejoin.jar");

        job.setMapperClass(MapSideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Ways to cache a file on every node that runs a map task:
        /* job.addArchiveToClassPath(archive); */ // cache a jar on the task node's classpath
        /* job.addFileToClassPath(file); */       // cache a plain file on the task node's classpath
        /* job.addCacheArchive(uri); */           // cache an archive in the task node's working directory
        /* job.addCacheFile(uri) */               // cache a plain file in the task node's working directory

        // Cache the product table file in the working directory of each task node
        //job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));
        job.addCacheFile(new URI("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/product.txt"));

        // The map-side join needs no reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
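The mapper can open product.txt by its bare name because files registered with job.addCacheFile are localized on each node and linked into the task's working directory. As far as the Hadoop 2.x sources show, the link name is the URI fragment (the part after #) when present, otherwise the last path segment. The sketch below mimics that rule using only java.net.URI; localLinkName is a hypothetical helper for illustration, not a Hadoop API.

```java
import java.net.URI;

public class CacheFileNameSketch {

    // Hypothetical helper: the name a cache file is expected to have in the task's
    // working directory. Assumption: the #fragment wins, else the last path segment.
    static String localLinkName(URI cacheUri) {
        String fragment = cacheUri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = cacheUri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        URI plain = URI.create("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/product.txt");
        URI aliased = URI.create("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/pdts.txt#product.txt");
        System.out.println(localLinkName(plain));   // product.txt
        System.out.println(localLinkName(aliased)); // product.txt
    }
}
```

Under this rule, the commented-out pdts.txt URI would be linked as pdts.txt and would not match the name opened in setup(); appending #product.txt to that URI should let the hard-coded name keep working.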
4. Running the Program
# Upload the jar (Alt+p opens an SFTP session in SecureCRT)
Alt+p
lcd d:/
put mapsidejoin.jar
# Prepare the data files for Hadoop to process
cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs -mkdir -p /rjoin/mapjoinsideinput
hadoop fs -mkdir -p /rjoin/mapjoincache
hdfs dfs -put order.txt /rjoin/mapjoinsideinput
hdfs dfs -put product.txt /rjoin/mapjoincache
# Run the mapsidejoin program (the output directory must not already exist)
hadoop jar mapsidejoin.jar com.empire.hadoop.mr.mapsidejoin.MapSideJoin /rjoin/mapjoinsideinput /rjoin/mapjoinsideoutput
5. Execution Log (excerpt)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #87 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getCounters
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #87
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getCounters took 36ms
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=189612
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=218
HDFS: Number of bytes written=108
HDFS: Number of read operations=5
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3057
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=3057
Total vcore-milliseconds taken by all map tasks=3057
Total megabyte-milliseconds taken by all map tasks=3130368
Map-Reduce Framework
Map input records=4
Map output records=4
Input split bytes=125
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=99
CPU time spent (ms)=350
Physical memory (bytes) snapshot=117669888
Virtual memory (bytes) snapshot=845942784
Total committed heap usage (bytes)=16121856
File Input Format Counters
Bytes Read=93
File Output Format Counters
Bytes Written=108
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:328)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #88 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getJobReport
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #88
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 0ms
[pool-4-thread-1] DEBUG org.apache.hadoop.ipc.Client - stopping client from cache: org.apache.hadoop.ipc.Client@303c7016
[Thread-3] DEBUG org.apache.hadoop.util.ShutdownHookManager - ShutdownHookManger complete shutdown.
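A quick sanity check on the counters above: YARN reports megabyte-milliseconds as task time multiplied by the memory allocated to the task's container, and vcore-milliseconds as task time multiplied by its vcores. The figures therefore imply a 1024 MB, 1-vcore map container (presumably the Hadoop 2.x default for mapreduce.map.memory.mb), and the zero reduce time and zero spilled records are consistent with setNumReduceTasks(0).

```latex
\begin{aligned}
\text{MB-ms}    &= t_{\text{map}} \times M_{\text{container}} = 3057\ \text{ms} \times 1024\ \text{MB} = 3\,130\,368 \\
\text{vcore-ms} &= t_{\text{map}} \times V_{\text{container}} = 3057\ \text{ms} \times 1 = 3057
\end{aligned}
```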
6. Results
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /rjoin/mapjoinsideoutput/part-m-00000
1001 20150710 P0001 2 小米5
1002 20150710 P0001 3 小米5
1002 20150710 P0002 3 錘子T1
1003 20150710 P0003 3 錘子
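Closing note: that is all for this article. If you found it useful, please give it a like; if you are interested in the blogger's other articles on servers and big data, or in the blogger, please follow the blog, and feel free to get in touch at any time.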