Implementing matrix multiplication with MapReduce
package org.conan.myhadoop.matrix;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class MainRun {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        martrixMultiply();
    }

    public static void martrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/m1.csv");// local data files
        path.put("m2", "logfile/matrix/m2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS directory
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            MartrixMultiply.run(path);// launch the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// remote configuration for the Hadoop cluster
        JobConf conf = new JobConf(MainRun.class);
        conf.setJobName("MartrixMultiply");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
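For reference, the two dense data files would look like this (reconstructed from the map output in the run log below; comma-separated, one matrix row per line, which is exactly what the DELIMITER pattern splits on):

m1.csv

1,0,2
-1,3,1

m2.csv

3,1
2,1
1,0

Here m1 is the 2x3 matrix A and m2 the 3x2 matrix B, so the product C is 2x2.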
3). Create the MR program: MartrixMultiply.java
The MapReduce program
package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.conan.myhadoop.hdfs.HdfsDAO;

public class MartrixMultiply {

    public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// file name: m1 or m2
        private int rowNum = 2;// number of rows of matrix A
        private int colNum = 2;// number of columns of matrix B
        private int rowIndexA = 1; // current row of matrix A (assumes each file is read by a single map task, in order)
        private int rowIndexB = 1; // current row of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// tells us which data set this split belongs to
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                // row rowIndexA of A contributes to every cell (rowIndexA, i) of C,
                // so i must range over the columns of B
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 1; j <= tokens.length; j++) {
                        Text v = new Text("A:" + j + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + " " + v.toString());
                    }
                }
                rowIndexA++;
            } else if (flag.equals("m2")) {
                // row rowIndexB of B contributes to every cell (i, j) of C,
                // so i must range over the rows of A
                for (int i = 1; i <= rowNum; i++) {
                    for (int j = 1; j <= colNum; j++) {
                        Text k = new Text(i + "," + j);
                        Text v = new Text("B:" + rowIndexB + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + " " + v.toString());
                    }
                }
                rowIndexB++;
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // collect the A and B entries for this cell of C, keyed by the shared index k
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");
            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");
                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                }
            }

            // C[i][j] = sum over k of A[i][k] * B[k][j]
            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(mapB.get(mapk));
            }
            context.write(key, new IntWritable(result));
            System.out.println();
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load both input data sets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
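To know what to expect from the job, the same product can be computed locally with a plain triple loop. This is a minimal, Hadoop-free sketch (the class name DenseCheck is ours, purely for illustration), using the m1/m2 values shown above; its output matches the four cells the reducer emits.

// Minimal local cross-check of the dense product (no Hadoop involved).
public class DenseCheck {

    public static void main(String[] args) {
        int[][] a = { { 1, 0, 2 }, { -1, 3, 1 } };    // m1: 2 x 3
        int[][] b = { { 3, 1 }, { 2, 1 }, { 1, 0 } }; // m2: 3 x 2
        int[][] c = new int[a.length][b[0].length];   // C: 2 x 2

        // C[i][j] = sum over k of A[i][k] * B[k][j] -- the same sum the reducer computes per key
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < b[0].length; j++) {
                for (int k = 0; k < b.length; k++) {
                    c[i][j] += a[i][k] * b[k][j];
                }
            }
        }

        for (int i = 0; i < c.length; i++) {
            for (int j = 0; j < c[0].length; j++) {
                System.out.println((i + 1) + "," + (j + 1) + "\t" + c[i][j]);
            }
        }
        // Expected output: 1,1 5 / 1,2 1 / 2,1 4 / 2,2 2
    }
}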
Run log
Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/m1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/m2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 10:48:03 org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 10:48:03 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 2
2014-1-15 10:48:03 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
2014-1-15 10:48:04 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
1,1 A:1,1
1,1 A:2,0
1,1 A:3,2
1,2 A:1,1
1,2 A:2,0
1,2 A:3,2
2,1 A:1,-1
2,1 A:2,3
2,1 A:3,1
2,2 A:1,-1
2,2 A:2,3
2,2 A:3,1
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:05 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 0% reduce 0%
2014-1-15 10:48:07 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
1,1 B:1,3
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
1,2 B:1,1
2,1 B:1,3
2,2 B:1,1
1,1 B:2,2
1,2 B:2,1
2,1 B:2,2
2,2 B:2,1
1,1 B:3,1
1,2 B:3,0
2,1 B:3,1
2,2 B:3,0
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 10:48:08 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 0%
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 2 sorted segments
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 2 segments left of total size: 294 bytes
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
1,1:(B:1,3)(B:2,2)(B:3,1)(A:1,1)(A:2,0)(A:3,2)
1,2:(A:1,1)(A:2,0)(A:3,2)(B:1,1)(B:2,1)(B:3,0)
2,1:(B:1,3)(B:2,2)(B:3,1)(A:1,-1)(A:2,3)(A:3,1)
2,2:(A:1,-1)(A:2,3)(A:3,1)(B:1,1)(B:2,1)(B:3,0)
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 10:48:10 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 10:48:13 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
2014-1-15 10:48:13 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 100%
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Counters: 19
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: File Output Format Counters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Bytes Written=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: FileSystemCounters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_READ=1713
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: HDFS_BYTES_READ=75
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_WRITTEN=125314
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: HDFS_BYTES_WRITTEN=114
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: File Input Format Counters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Bytes Read=30
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Map-Reduce Framework
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Map output materialized bytes=302
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Map input records=5
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Reduce shuffle bytes=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Spilled Records=48
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Map output bytes=242
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Total committed heap usage (bytes)=764215296
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: SPLIT_RAW_BYTES=220
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Combine input records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Reduce input records=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Reduce input groups=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Combine output records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Reduce output records=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
INFO: Map output records=24
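The log does not show the result file itself. Applying the reducer's sum to the four groups above (for example, for 1,1: 1*3 + 0*2 + 2*1 = 5), the part file under /user/hdfs/matrix/output should read, tab-separated as TextOutputFormat writes it:

1,1	5
1,2	1
2,1	4
2,2	2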
4. MapReduce computation for sparse matrix multiplication
When we use matrices on real data, they are usually very sparse, so to save storage space we normally store only the nonzero entries.
Let's now work through a sparse matrix:
- Matrix multiplication in R
- Create two matrix data files, sm1.csv and sm2.csv
- Modify the launcher: MainRun.java
- Create the MR program: SparseMartrixMultiply.java
1). Matrix multiplication in R
The R program
> m1<-matrix(c(1,0,0,3,2,5,0,4,0,0,0,1,4,7,1,2),nrow=4,byrow=TRUE);m1
     [,1] [,2] [,3] [,4]
[1,]    1    0    0    3
[2,]    2    5    0    4
[3,]    0    0    0    1
[4,]    4    7    1    2
> m2<-matrix(c(5,0,0,2,0,0,3,1),nrow=4,byrow=TRUE);m2
     [,1] [,2]
[1,]    5    0
[2,]    0    2
[3,]    0    0
[4,]    3    1
> m3<-m1 %*% m2;m3
     [,1] [,2]
[1,]   14    3
[2,]   22   14
[3,]    3    1
[4,]   26   16
2). Create the two sparse matrix data files sm1.csv and sm2.csv
Only nonzero entries are stored, in three columns: the first is the row index in the original matrix, the second the column index, and the third the value. For example, the line 1,4,3 in sm1.csv below means m1[1,4] = 3.
sm1.csv
1,1,1
1,4,3
2,1,2
2,2,5
2,4,4
3,4,1
4,1,4
4,2,7
4,3,1
4,4,2
sm2.csv
1,1,5
2,2,2
4,1,3
4,2,1
3). Modify the launcher: MainRun.java
Add a launch entry for SparseMartrixMultiply
    public static void main(String[] args) {
        sparseMartrixMultiply();
    }

    public static void sparseMartrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/sm1.csv");// local data files
        path.put("m2", "logfile/matrix/sm2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS directory
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            SparseMartrixMultiply.run(path);// launch the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
4). Create the MR program: SparseMartrixMultiply.java
- The map function changes; the reduce function stays the same
- The variables that tracked the current row of each matrix are gone, since every sparse triplet already carries its own row and column: an m1 triplet (row, col, val) is emitted to every result cell (row, j) for j = 1..colNum, and an m2 triplet to every cell (i, col) for i = 1..rowNum
package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.conan.myhadoop.hdfs.HdfsDAO;

public class SparseMartrixMultiply {

    public static class SparseMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// file name: m1 or m2
        private int rowNum = 4;// number of rows of matrix A
        private int colNum = 2;// number of columns of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// tells us which data set this split belongs to
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];
                // A[row][col] contributes to every cell (row, i) of C
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(row + "," + i);
                    Text v = new Text("A:" + col + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + " " + v.toString());
                }
            } else if (flag.equals("m2")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];
                // B[row][col] contributes to every cell (i, col) of C
                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(i + "," + col);
                    Text v = new Text("B:" + row + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + " " + v.toString());
                }
            }
        }
    }

    public static class SparseMatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");
            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");
                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                // an entry missing from B is an implicit zero in a sparse matrix
                String bVal = mapB.containsKey(mapk) ? mapB.get(mapk) : "0";
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(bVal);
            }
            context.write(key, new IntWritable(result));
            System.out.println();
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(SparseMartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SparseMatrixMapper.class);
        job.setReducerClass(SparseMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load both input data sets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
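To convince yourself that the "missing entry counts as 0" rule in the reducer is what makes the sparse case work, here is a minimal, Hadoop-free sketch (the class name SparseCheck and the hard-coded triplet arrays are ours, for illustration only; the triplets are exactly the contents of sm1.csv and sm2.csv above). It multiplies the two triplet files directly and should print the same eight values as the R result m3.

import java.util.HashMap;
import java.util.Map;

// Minimal local cross-check of the sparse product (no Hadoop involved).
public class SparseCheck {

    public static void main(String[] args) {
        String[] sm1 = { "1,1,1", "1,4,3", "2,1,2", "2,2,5", "2,4,4",
                "3,4,1", "4,1,4", "4,2,7", "4,3,1", "4,4,2" };
        String[] sm2 = { "1,1,5", "2,2,2", "4,1,3", "4,2,1" };
        Map<Integer, Map<Integer, Integer>> a = parse(sm1);// a.get(i).get(k) = A[i][k]
        Map<Integer, Map<Integer, Integer>> b = parse(sm2);// b.get(k).get(j) = B[k][j]

        for (int i = 1; i <= 4; i++) {        // 4 rows of A
            for (int j = 1; j <= 2; j++) {    // 2 columns of B
                int sum = 0;
                Map<Integer, Integer> rowA = a.get(i);
                if (rowA != null) {
                    for (Map.Entry<Integer, Integer> e : rowA.entrySet()) {
                        Map<Integer, Integer> rowB = b.get(e.getKey());
                        // same rule as the reducer: an entry missing from B counts as 0
                        if (rowB != null && rowB.containsKey(j)) {
                            sum += e.getValue() * rowB.get(j);
                        }
                    }
                }
                System.out.println(i + "," + j + "\t" + sum);
            }
        }
        // Expected output, matching R's m3: 14 3 / 22 14 / 3 1 / 26 16
    }

    // Parse "row,col,value" triplets into a nested map; absent entries are implicit zeros.
    private static Map<Integer, Map<Integer, Integer>> parse(String[] triplets) {
        Map<Integer, Map<Integer, Integer>> m = new HashMap<Integer, Map<Integer, Integer>>();
        for (String t : triplets) {
            String[] p = t.split(",");
            int row = Integer.parseInt(p[0]);
            Map<Integer, Integer> r = m.get(row);
            if (r == null) {
                r = new HashMap<Integer, Integer>();
                m.put(row, r);
            }
            r.put(Integer.parseInt(p[1]), Integer.parseInt(p[2]));
        }
        return m;
    }
}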
Run output:
Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/sm1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/sm2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 11:57:31 org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 11:57:31 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 2
2014-1-15 11:57:31 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
1,1 A:1,1
1,2 A:1,1
1,1 A:4,3
1,2 A:4,3
2,1 A:1,2
2,2 A:1,2
2,1 A:2,5
2,2 A:2,5
2,1 A:4,4
2,2 A:4,4
3,1 A:4,1
3,2 A:4,1
4,1 A:1,4
4,2 A:1,4
4,1 A:2,7
4,2 A:2,7
4,1 A:3,1
4,2 A:3,1
4,1 A:4,2
4,2 A:4,2
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 0% reduce 0%
2014-1-15 11:57:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
1,1 B:1,5
2,1 B:1,5
3,1 B:1,5
4,1 B:1,5
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
1,2 B:2,2
2,2 B:2,2
3,2 B:2,2
4,2 B:2,2
1,1 B:4,3
2,1 B:4,3
3,1 B:4,3
4,1 B:4,3
1,2 B:4,1
2,2 B:4,1
3,2 B:4,1
4,2 B:4,1
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 11:57:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 0%
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 2 sorted segments
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 2 segments left of total size: 436 bytes
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
1,1:(B:1,5)(B:4,3)(A:1,1)(A:4,3)
1,2:(A:1,1)(A:4,3)(B:2,2)(B:4,1)
2,1:(B:1,5)(B:4,3)(A:1,2)(A:2,5)(A:4,4)
2,2:(A:1,2)(A:2,5)(A:4,4)(B:4,1)(B:2,2)
3,1:(B:1,5)(B:4,3)(A:4,1)
3,2:(A:4,1)(B:2,2)(B:4,1)
4,1:(B:4,3)(B:1,5)(A:1,4)(A:2,7)(A:3,1)(A:4,2)
4,2:(A:1,4)(A:2,7)(A:3,1)(A:4,2)(B:2,2)(B:4,1)
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 11:57:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 11:57:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
2014-1-15 11:57:40 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 100%
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Counters: 19
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: File Output Format Counters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Bytes Written=53
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: FileSystemCounters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_READ=2503
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: HDFS_BYTES_READ=266
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_WRITTEN=126274
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: HDFS_BYTES_WRITTEN=347
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: File Input Format Counters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Bytes Read=98
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Map-Reduce Framework
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Map output materialized bytes=444
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Map input records=14
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Reduce shuffle bytes=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Spilled Records=72
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Map output bytes=360
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Total committed heap usage (bytes)=764215296
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: SPLIT_RAW_BYTES=220
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Combine input records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Reduce input records=36
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Reduce input groups=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Combine output records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Reduce output records=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
INFO: Map output records=36
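As before, the result file itself is not in the captured log. Summing each of the eight reduce groups above, with missing B entries counting as 0, gives the following part file content, identical to the R result m3:

1,1	14
1,2	3
2,1	22
2,2	14
3,1	3
3,2	1
4,1	26
4,2	16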
With that, we have implemented matrix multiplication as a MapReduce program! Now that the groundwork for matrix computation is in place, we can go on to do much more with it!