04.Mapreduce例項——單表join
阿新 • • 發佈:2021-11-19
04.Mapreduce例項——單表join
實驗原理
以本實驗的buyer1(buyer_id,friends_id)表為例來闡述單表連線的實驗原理。單表連線,連線的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一個表。因此,在map階段將讀入資料分割成buyer_id和friends_id之後,會將buyer_id設定成key,friends_id設定成value,直接輸出並將其作為左表;再將同一對buyer_id和friends_id中的friends_id設定成key,buyer_id設定成value進行輸出,作為右表。為了區分輸出中的左右表,需要在輸出的value中再加上左右表的資訊,比如在value的String最開始處加上字元1表示左表,加上字元2表示右表。這樣在map的結果中就形成了左表和右表,然後在shuffle過程中完成連線。reduce接收到連線的結果,其中每個key的value-list就包含了"buyer_idfriends_id--friends_idbuyer_id"關係。取出每個key的value-list進行解析,將左表中的buyer_id放入一個數組,右表中的friends_id放入一個數組,然後對兩個陣列求笛卡爾積就是最後的結果了。
實驗步驟
- 開啟Hadoop服務
Start-all.sh
- 建立目錄
mkdir-p/data/mapreduce7
- 將buyer1檔案上傳到該目錄下
- 上傳hadoop2lib檔案並解壓
unzip hadoop2lib.zip
- 在hdfs上新建/mymapreduce7/in目錄,然後將Linux本地/data/mapreduce7目錄下的buyer1檔案匯入到hdfs的/mymapreduce7/in目錄中。
hadoop fs -mkdir -p /mymapreduce7/in
hadoop fs -put /data/mapreduce7/buyer1 /mymapreduce7/in
- IDEA中編寫Java程式碼
- package mapreduce4;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DanJoin {
public static class Map extends Mapper<Object,Text,Text,Text>{
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
String line = value.toString();
String[] arr = line.split(",");
String mapkey=arr[0];
String mapvalue=arr[1];
String relationtype=new String();
relationtype="1";
context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));
//System.out.println(relationtype+"+"+mapvalue);
relationtype="2";
context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));
//System.out.println(relationtype+"+"+mapvalue);
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException{
int buyernum=0;
String[] buyer=new String[20];
int friendsnum=0;
String[] friends=new String[20];
Iterator ite=values.iterator();
while(ite.hasNext()){
String record=ite.next().toString();
int len=record.length();
int i=2;
if(0==len){
continue;
}
char relationtype=record.charAt(0);
if('1'==relationtype){
buyer [buyernum]=record.substring(i);
buyernum++;
}
if('2'==relationtype){
friends[friendsnum]=record.substring(i);
friendsnum++;
}
}
if(0!=buyernum&&0!=friendsnum){
for(int m=0;m<buyernum;m++){
for(int n=0;n<friendsnum;n++){
if(buyer[m]!=friends[n]){
context.write(new Text(buyer[m]),new Text(friends[n]));
}
}
}
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();
String[] otherArgs=new String[2];
otherArgs[0]="hdfs://192.168.149.10:9000/mymapreduce7/in/buyer1";
otherArgs[1]="hdfs://192.168.149.10:9000/mymapreduce7/out";
Job job=new Job(conf," Table join");
job.setJarByClass(DanJoin.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
- 將hadoop2lib目錄中的jar包,拷貝到hadoop2lib目錄下。
- 拷貝log4j.properties檔案
- 執行結果