Hadoop/MapReduce 及 Spark KMeans聚類演算法實現
阿新 • • 發佈:2019-02-07
package kmeans;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.gson.Gson;

/**
 * MapReduce implementation of the KMeans clustering algorithm.
 *
 * <p>Each iteration is one MapReduce job: the mapper assigns every input point to its
 * nearest cached center, the combiner/reducer average the assigned points into new
 * centers, and {@link #main(String[])} feeds the reducer output back in as the next
 * iteration's centers.
 *
 * @author chenjie
 */
public class KMeans extends Configured implements Tool {

    /** Number of clusters to produce. */
    public static int K = 3;

    /** Number of KMeans iterations (one MapReduce job per iteration). */
    public static int REPEAT = 10;

    /**
     * True until the first iteration has loaded its centers. The first iteration takes
     * the initial centers from the input file; later iterations read them from the
     * previous job's output.
     *
     * <p>NOTE(review): a static flag is only visible within a single JVM, so this
     * scheme works with the local job runner (as in the logged run) but would not
     * carry over to mapper JVMs on a real cluster — confirm before deploying.
     */
    public static boolean firstTime = true;

    /** Input file: one point per line, components separated by single spaces. */
    public static String FILE = "/media/chenjie/0009418200012FF3/ubuntu/kmeans_input_file.txt";

    /** Output directory of the reduce phase. */
    public static String REDUCE_OUTPUT_DIR = "/media/chenjie/0009418200012FF3/ubuntu/kmeans/";

    /** Output file written by the single reducer. */
    public static String REDUCE_OUTPUT = REDUCE_OUTPUT_DIR + "part-r-00000";

    /** Cluster centers cached between iterations. */
    public static List<ArrayList<Double>> cachedCenters = new ArrayList<ArrayList<Double>>();

    /**
     * Reads the first {@code K} points of a file and returns them as cluster centers.
     *
     * @param path file to read; one point per line, components separated by spaces
     * @param K    maximum number of centers to read
     * @return the centers read; may hold fewer than K entries if the file is short
     *         or an I/O error occurs (the error is printed, not rethrown)
     */
    private static List<ArrayList<Double>> readRandomCenterFromInputFile(String path, int K) {
        List<ArrayList<Double>> list = new ArrayList<ArrayList<Double>>();
        // try-with-resources closes the reader even when parsing throws,
        // which the original close-inside-try did not.
        try (BufferedReader br = new BufferedReader(new FileReader(path))) {
            String s;
            int count = 0; // number of points read so far
            while ((s = br.readLine()) != null && count < K) {
                System.out.println("readRandomCenterFromInputFile讀取一行:" + s);
                count++;
                String tokens[] = s.split(" ");
                ArrayList<Double> vector = new ArrayList<Double>();
                for (String token : tokens) {
                    vector.add(Double.valueOf(token));
                }
                list.add(vector);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }

    /**
     * Mapper: for every input point emits (nearest center, point). Both key and value
     * are serialized with the custom {@link ListWritable} type, because
     * {@code List<Double>} cannot be used directly as a MapReduce key/value.
     *
     * @author chenjie
     */
    public static class KMeansMapper
            extends Mapper<LongWritable, Text, ListWritable, ListWritable> {

        /** Loads the current cluster centers into memory before any map() call. */
        @Override
        protected void setup(
                Mapper<LongWritable, Text, ListWritable, ListWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            if (firstTime) {
                // First iteration: take the first K points of the input as centers.
                KMeans.cachedCenters = readRandomCenterFromInputFile(FILE, K);
                firstTime = false;
            }
            System.out.println("----------setup------------");
            System.out.println("----------centers------------");
            for (ArrayList<Double> vector : cachedCenters) {
                System.out.println(vector);
            }
        }

        /**
         * @param key     byte offset of the line (unused)
         * @param value   one point, components separated by spaces
         * @param context Hadoop context used to emit (nearest center, point)
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("map value=" + value.toString());
            ArrayList<Double> valueVector = getVectorFromString(value.toString());
            System.out.println("valueVector=" + valueVector.toString());

            // Find the cached center with the smallest Euclidean distance to this point.
            ArrayList<Double> nearest = null;
            double nearestDistance = Double.MAX_VALUE;
            for (ArrayList<Double> center : cachedCenters) {
                double distance = calculateDistance(center, valueVector);
                if (nearest == null || distance < nearestDistance) {
                    nearest = center;
                    nearestDistance = distance;
                }
            }

            if (nearest != null) { // false only when no centers are cached
                List<Writable> nearestWritableList = new ArrayList<Writable>();
                for (Double d : nearest) {
                    nearestWritableList.add(new DoubleWritable(d));
                }
                ListWritable outputkey = new ListWritable(nearestWritableList);

                List<Writable> valueWritableList = new ArrayList<Writable>();
                for (Double d : valueVector) {
                    valueWritableList.add(new DoubleWritable(d));
                }
                ListWritable outputvalue = new ListWritable(valueWritableList);
                System.out.println("map 生成:" + outputkey + "," + outputvalue);
                context.write(outputkey, outputvalue);
            }
        }

        /**
         * @param vector1 vector (X1, X2, ...)
         * @param vector2 vector (Y1, Y2, ...); assumed to have the same length
         * @return Euclidean distance sqrt((X1-Y1)^2 + (X2-Y2)^2 + ...)
         */
        private double calculateDistance(ArrayList<Double> vector1, ArrayList<Double> vector2) {
            double sum = 0.0;
            int length = vector1.size();
            for (int i = 0; i < length; i++) {
                double diff = vector1.get(i) - vector2.get(i);
                sum += diff * diff; // plain multiply instead of Math.pow(x, 2)
            }
            return Math.sqrt(sum);
        }

        /**
         * Parses a space-separated line into a vector of components.
         *
         * @param string the line to parse
         * @return the parsed vector
         */
        private ArrayList<Double> getVectorFromString(String string) {
            String tokens[] = string.split(" ");
            ArrayList<Double> vector = new ArrayList<Double>();
            for (String value : tokens) {
                vector.add(Double.valueOf(value));
            }
            return vector;
        }
    }

    /**
     * Combiner: averages, component-wise, all points assigned to the same center on
     * the map side, emitting one mean vector per center.
     *
     * <p>NOTE(review): emitting per-combiner averages and averaging those again in the
     * reducer weights each map output equally rather than by point count; with the
     * single local mapper seen in the logged run the result is unchanged, but confirm
     * before running with multiple mappers.
     *
     * @author chenjie
     */
    public static class KMeansCombiner
            extends Reducer<ListWritable, ListWritable, ListWritable, ListWritable> {

        @Override
        protected void reduce(ListWritable key, Iterable<ListWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("----------------------KMeansCombiner---------------------");
            System.out.println("key=" + key);
            System.out.println("values:");
            // Component-wise sum; sized lazily from the first non-empty value instead
            // of hard-coding two dimensions, so points of any dimensionality work.
            ArrayList<Double> sum = null;
            int count = 0; // number of values seen (empty ones included, as before)
            for (ListWritable value : values) {
                count++;
                System.out.println("value=" + value);
                List<Writable> components = value.get();
                if (components == null || components.isEmpty()) {
                    continue;
                }
                if (sum == null) {
                    sum = new ArrayList<Double>();
                    for (int i = 0; i < components.size(); i++) {
                        sum.add(0D);
                    }
                }
                for (int i = 0; i < components.size(); i++) {
                    DoubleWritable dw = (DoubleWritable) components.get(i);
                    sum.set(i, sum.get(i) + dw.get());
                }
            }
            if (sum == null) {
                return; // only empty values arrived: nothing meaningful to emit
            }
            List<Writable> sumWritableList = new ArrayList<Writable>();
            for (Double d : sum) {
                sumWritableList.add(new DoubleWritable(d / count)); // component-wise mean
            }
            System.out.println("sumWritableList=" + sumWritableList);
            ListWritable outputValue = new ListWritable(sumWritableList);
            context.write(key, outputValue);
        }
    }

    /**
     * Reducer: averages the (combined) vectors of each cluster into the new center
     * and writes it as the output key, one center per line, with a null value.
     */
    public static class KMeansReducer
            extends Reducer<ListWritable, ListWritable, ListWritable, NullWritable> {

        @Override
        protected void reduce(ListWritable key, Iterable<ListWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("----------------------reduce---------------------");
            System.out.println("key=" + key);
            System.out.println("values:");
            // Accumulator for the new center; sized from the first value rather than
            // hard-coded to two dimensions.
            ArrayList<Double> newCenter = null;
            int count = 0;
            for (ListWritable value : values) {
                System.out.println(value);
                count++;
                List<Writable> components = value.get();
                if (newCenter == null) {
                    newCenter = new ArrayList<Double>();
                    for (int i = 0; i < components.size(); i++) {
                        newCenter.add(0D);
                    }
                }
                for (int i = 0; i < components.size(); i++) {
                    DoubleWritable dw = (DoubleWritable) components.get(i);
                    newCenter.set(i, newCenter.get(i) + dw.get());
                }
            }
            if (newCenter == null) {
                return; // no values for this key
            }
            for (int i = 0; i < newCenter.size(); i++) {
                newCenter.set(i, newCenter.get(i).doubleValue() / count);
            }
            List<Writable> newCenterWritableList = new ArrayList<Writable>();
            for (Double d : newCenter) {
                newCenterWritableList.add(new DoubleWritable(d));
            }
            ListWritable outputValue = new ListWritable(newCenterWritableList);
            System.out.println("reduce生成:" + key + "|" + outputValue);
            context.write(outputValue, NullWritable.get());
        }
    }

    /**
     * Entry point: loads the optional JSON configuration, then runs {@link #REPEAT}
     * MapReduce iterations, feeding each job's output centers into the next.
     */
    public static void main(String[] args) throws Exception {
        getCJKMeansConf(); // optional cj_kmeans_conf.json overrides the defaults
        args = new String[2];
        args[0] = FILE;
        args[1] = REDUCE_OUTPUT_DIR;
        while (REPEAT > 0) {
            int jobStatus = submitJob(args);
            if (jobStatus == 0) {
                // Cache this iteration's centers for the next iteration.
                KMeans.cachedCenters = readRandomCenterFromInputFile(REDUCE_OUTPUT, K);
            }
            REPEAT--;
        }
        System.out.println("----------------------------------------KMeans聚類結果--------------------------------------");
        for (ArrayList<Double> point : KMeans.cachedCenters) {
            System.out.println(point);
        }
    }

    /**
     * Runs one MapReduce iteration through ToolRunner.
     *
     * @return 0 on success, non-zero on failure
     */
    public static int submitJob(String[] args) throws Exception {
        return ToolRunner.run(new KMeans(), args);
    }

    /**
     * Configures and runs a single KMeans job.
     *
     * @param args args[0] = input path, args[1] = output directory
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf); // replaces the deprecated new Job(conf)
        job.setJobName("Kmeans");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(ListWritable.class);
        job.setOutputValueClass(ListWritable.class);
        job.setMapOutputKeyClass(ListWritable.class);
        job.setMapOutputValueClass(ListWritable.class);
        job.setMapperClass(KMeansMapper.class);
        job.setReducerClass(KMeansReducer.class);
        job.setCombinerClass(KMeansCombiner.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Remove a stale output directory so the job can be re-run each iteration.
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * A Writable list of Writables, usable as both MapReduce key and value.
     * {@link #compareTo(ListWritable)} assumes the elements are DoubleWritable.
     *
     * @author chenjie
     */
    public static class ListWritable implements Writable, WritableComparable<ListWritable> {

        // Recorded so readFields() can reconstruct the exact list/element classes.
        private Class<? extends Writable> valueClass;
        @SuppressWarnings("rawtypes")
        private Class<? extends List> listClass;
        private List<Writable> values;

        /** No-arg constructor required by the Writable deserialization machinery. */
        public ListWritable() {
        }

        /**
         * @param values non-empty list; the element class is taken from the first
         *               element, so an empty list cannot be serialized
         * @throws IllegalArgumentException if {@code values} is null or empty
         */
        public ListWritable(List<Writable> values) {
            if (values == null || values.isEmpty()) {
                // Explicit check instead of the former bare IndexOutOfBoundsException
                // from values.get(0).
                throw new IllegalArgumentException("values must be a non-empty list");
            }
            listClass = values.getClass();
            valueClass = values.get(0).getClass();
            this.values = values;
        }

        public Class<? extends Writable> getValueClass() {
            return valueClass;
        }

        @SuppressWarnings("rawtypes")
        public Class<? extends List> getListClass() {
            return listClass;
        }

        public void set(List<Writable> values) {
            this.values = values;
        }

        public List<Writable> get() {
            return values;
        }

        /**
         * Reads list class name, element class name, size, then each element —
         * the order written by {@link #write(DataOutput)}.
         *
         * @throws IOException on I/O failure or when the recorded classes cannot be
         *                     loaded/instantiated (previously swallowed, leaving the
         *                     object half-initialized)
         */
        @SuppressWarnings({ "unchecked", "rawtypes" })
        public void readFields(DataInput in) throws IOException {
            try {
                this.listClass = (Class<? extends List>) Class.forName(in.readUTF());
                this.valueClass = (Class<? extends Writable>) Class.forName(in.readUTF());
                int size = in.readInt();
                values = this.listClass.newInstance();
                for (int i = 0; i < size; i++) {
                    Writable value = WritableFactories.newInstance(this.valueClass);
                    value.readFields(in); // read one element
                    values.add(value);
                }
            } catch (ReflectiveOperationException e) {
                throw new IOException("cannot deserialize ListWritable", e);
            }
        }

        /** Writes list class name, element class name, size, then each element. */
        public void write(DataOutput out) throws IOException {
            out.writeUTF(listClass.getName());
            out.writeUTF(valueClass.getName());
            out.writeInt(values.size());
            for (Writable value : values) {
                value.write(out);
            }
        }

        public int size() {
            return values.size();
        }

        /** @return true when the backing list is absent or has no elements */
        public boolean isEmpty() {
            // Also report true for an empty (non-null) list; the original returned
            // true only for null, contradicting the method name.
            return values == null || values.isEmpty();
        }

        /**
         * Component-wise comparison; the first differing component decides.
         * Assumes both lists hold DoubleWritable elements.
         */
        @Override
        public int compareTo(ListWritable o) {
            for (int i = 0; i < values.size() && i < o.size(); i++) {
                DoubleWritable dw1 = (DoubleWritable) values.get(i);
                DoubleWritable dw2 = (DoubleWritable) o.get().get(i);
                // Double.compare only guarantees the SIGN of its result, not +/-1,
                // so test the sign instead of comparing against literal 1 / -1.
                int cmp = Double.compare(dw1.get(), dw2.get());
                if (cmp != 0) {
                    return cmp > 0 ? 1 : -1;
                }
            }
            return 0;
        }

        /** @return components joined by single spaces, e.g. {@code "1.0 2.0"} */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            for (Writable w : values) {
                if (sb.length() > 0) {
                    sb.append(' ');
                }
                sb.append(w);
            }
            return sb.toString();
        }
    }

    /**
     * Overrides the default settings from an optional {@code cj_kmeans_conf.json}
     * file in the working directory, so the packaged jar can be reconfigured
     * without recompiling. Missing file or parse errors leave the defaults intact.
     */
    public static void getCJKMeansConf() {
        System.out.println("--------------------------------------------------------");
        File file = new File("cj_kmeans_conf.json");
        if (!file.exists()) {
            return; // no config file: keep compiled-in defaults
        }
        StringBuilder sb = new StringBuilder();
        // try-with-resources closes the reader even when Gson parsing throws.
        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
            String s;
            while ((s = br.readLine()) != null) {
                System.out.println("getCJKMeansConf讀取一行:" + s);
                sb.append(s);
            }
            Gson gson = new Gson();
            CJKMeansConf conf = gson.fromJson(sb.toString(), CJKMeansConf.class);
            System.out.println(conf);
            KMeans.K = conf.getK();
            KMeans.REPEAT = conf.getRepeat();
            KMeans.FILE = conf.getInputFile();
            KMeans.REDUCE_OUTPUT_DIR = conf.getOutputDir();
            KMeans.REDUCE_OUTPUT = KMeans.REDUCE_OUTPUT_DIR + "part-r-00000";
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Configuration POJO deserialized from {@code cj_kmeans_conf.json} by Gson,
     * so a packaged jar can be reconfigured without recompiling.
     *
     * @author chenjie
     */
    public static class CJKMeansConf {
        private int k;          // number of clusters
        private int repeat;     // number of iterations
        private String inputFile;
        private String outputDir;

        public int getK() {
            return k;
        }

        public void setK(int k) {
            this.k = k;
        }

        public int getRepeat() {
            return repeat;
        }

        public void setRepeat(int repeat) {
            this.repeat = repeat;
        }

        public String getInputFile() {
            return inputFile;
        }

        public void setInputFile(String inputFile) {
            this.inputFile = inputFile;
        }

        public String getOutputDir() {
            return outputDir;
        }

        public void setOutputDir(String outputDir) {
            this.outputDir = outputDir;
        }

        @Override
        public String toString() {
            return "CJKMeansConf [k=" + k + ", repeat=" + repeat + ", inputFile="
                    + inputFile + ", outputDir=" + outputDir + "]";
        }
    }
}
輸入:kmeans_input_file.txt
1.0 2.0 1.0 3.0 1.0 4.0 2.0 5.0 2.0 6.0 2.0 7.0 2.0 8.0 3.0 100.0 3.0 101.0 3.0 102.0 3.0 103.0 3.0 104.0
輸出:
2017-11-18 13:40:59,061 INFO [localfetcher#4] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(141)) - localfetcher#4 about to shuffle output of map attempt_local1325636158_0004_m_000000_0 decomp: 476 len: 480 to MEMORY 2017-11-18 13:40:59,061 INFO [localfetcher#4] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle(100)) - Read 476 bytes from map-output for attempt_local1325636158_0004_m_000000_0 2017-11-18 13:40:59,061 INFO [localfetcher#4] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(315)) - closeInMemoryFile -> map-output of size: 476, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->476 2017-11-18 13:40:59,062 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning 2017-11-18 13:40:59,062 INFO [pool-13-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 2017-11-18 13:40:59,063 INFO [pool-13-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(687)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 2017-11-18 13:40:59,064 INFO [pool-13-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 1 sorted segments 2017-11-18 13:40:59,064 INFO [pool-13-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:40:59,064 INFO [pool-13-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(754)) - Merged 1 segments, 476 bytes to disk to satisfy reduce memory limit 2017-11-18 13:40:59,065 INFO [pool-13-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(784)) - Merging 1 files, 480 bytes from disk 2017-11-18 13:40:59,065 INFO [pool-13-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(799)) - Merging 0 segments, 0 bytes from memory into reduce 2017-11-18 13:40:59,065 INFO [pool-13-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 
1 sorted segments 2017-11-18 13:40:59,066 INFO [pool-13-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:40:59,066 INFO [pool-13-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. ----------------------reduce--------------------- key=1.0 2.5 values: 1.0 3.0 reduce生成:1.0 2.5|1.0 3.0 ----------------------reduce--------------------- key=1.8 6.0 values: 2.0 6.5 reduce生成:1.8 6.0|2.0 6.5 ----------------------reduce--------------------- key=3.0 102.0 values: 3.0 102.0 reduce生成:3.0 102.0|3.0 102.0 2017-11-18 13:40:59,073 INFO [pool-13-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local1325636158_0004_r_000000_0 is done. And is in the process of committing 2017-11-18 13:40:59,075 INFO [pool-13-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 2017-11-18 13:40:59,075 INFO [pool-13-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local1325636158_0004_r_000000_0 is allowed to commit now 2017-11-18 13:40:59,077 INFO [pool-13-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local1325636158_0004_r_000000_0' to file:/media/chenjie/0009418200012FF3/ubuntu/kmeans/_temporary/0/task_local1325636158_0004_r_000000 2017-11-18 13:40:59,077 INFO [pool-13-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce 2017-11-18 13:40:59,078 INFO [pool-13-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local1325636158_0004_r_000000_0' done. 2017-11-18 13:40:59,078 INFO [pool-13-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local1325636158_0004_r_000000_0 2017-11-18 13:40:59,078 INFO [Thread-101] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete. 
2017-11-18 13:40:59,968 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1367)) - Job job_local1325636158_0004 running in uber mode : false 2017-11-18 13:40:59,969 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1374)) - map 100% reduce 100% 2017-11-18 13:40:59,970 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1385)) - Job job_local1325636158_0004 completed successfully 2017-11-18 13:40:59,979 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1392)) - Counters: 33 File System Counters FILE: Number of bytes read=9264 FILE: Number of bytes written=2077542 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=12 Map output records=12 Map output bytes=1872 Map output materialized bytes=480 Input split bytes=130 Combine input records=12 Combine output records=3 Reduce input groups=3 Reduce shuffle bytes=480 Reduce input records=3 Reduce output records=3 Spilled Records=6 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=0 CPU time spent (ms)=0 Physical memory (bytes) snapshot=0 Virtual memory (bytes) snapshot=0 Total committed heap usage (bytes)=1292894208 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=106 File Output Format Counters Bytes Written=38 readRandomCenterFromInputFile讀取一行:1.0 3.0 readRandomCenterFromInputFile讀取一行:2.0 6.5 readRandomCenterFromInputFile讀取一行:3.0 102.0 2017-11-18 13:41:00,004 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(71)) - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized 2017-11-18 13:41:00,011 WARN [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(171)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String). 
2017-11-18 13:41:00,012 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 1 2017-11-18 13:41:00,023 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(199)) - number of splits:1 2017-11-18 13:41:00,034 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(288)) - Submitting tokens for job: job_local1998692281_0005 2017-11-18 13:41:00,098 INFO [main] mapreduce.Job (Job.java:submit(1301)) - The url to track the job: http://localhost:8080/ 2017-11-18 13:41:00,098 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1346)) - Running job: job_local1998692281_0005 2017-11-18 13:41:00,098 INFO [Thread-128] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null 2017-11-18 13:41:00,100 INFO [Thread-128] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 2017-11-18 13:41:00,102 INFO [Thread-128] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks 2017-11-18 13:41:00,102 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local1998692281_0005_m_000000_0 2017-11-18 13:41:00,103 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ] 2017-11-18 13:41:00,104 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(753)) - Processing split: file:/media/chenjie/0009418200012FF3/ubuntu/kmeans_input_file.txt:0+106 2017-11-18 13:41:00,167 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1202)) - (EQUATOR) 0 kvi 26214396(104857584) 2017-11-18 13:41:00,167 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(995)) - mapreduce.task.io.sort.mb: 100 2017-11-18 13:41:00,167 INFO [LocalJobRunner Map 
Task Executor #0] mapred.MapTask (MapTask.java:init(996)) - soft limit at 83886080 2017-11-18 13:41:00,167 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(997)) - bufstart = 0; bufvoid = 104857600 2017-11-18 13:41:00,167 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(998)) - kvstart = 26214396; length = 6553600 2017-11-18 13:41:00,168 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(402)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer ----------setup------------ ----------centers------------ [1.0, 3.0] [2.0, 6.5] [3.0, 102.0] map value=1.0 2.0 valueVector=[1.0, 2.0] map 生成:1.0 3.0,1.0 2.0 map value=1.0 3.0 valueVector=[1.0, 3.0] map 生成:1.0 3.0,1.0 3.0 map value=1.0 4.0 valueVector=[1.0, 4.0] map 生成:1.0 3.0,1.0 4.0 map value=2.0 5.0 valueVector=[2.0, 5.0] map 生成:2.0 6.5,2.0 5.0 map value=2.0 6.0 valueVector=[2.0, 6.0] map 生成:2.0 6.5,2.0 6.0 map value=2.0 7.0 valueVector=[2.0, 7.0] map 生成:2.0 6.5,2.0 7.0 map value=2.0 8.0 valueVector=[2.0, 8.0] map 生成:2.0 6.5,2.0 8.0 map value=3.0 100.0 valueVector=[3.0, 100.0] map 生成:3.0 102.0,3.0 100.0 map value=3.0 101.0 valueVector=[3.0, 101.0] map 生成:3.0 102.0,3.0 101.0 map value=3.0 102.0 valueVector=[3.0, 102.0] map 生成:3.0 102.0,3.0 102.0 map value=3.0 103.0 valueVector=[3.0, 103.0] map 生成:3.0 102.0,3.0 103.0 map value=3.0 104.0 valueVector=[3.0, 104.0] map 生成:3.0 102.0,3.0 104.0 2017-11-18 13:41:00,171 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 2017-11-18 13:41:00,172 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1457)) - Starting flush of map output 2017-11-18 13:41:00,172 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1475)) - Spilling map output 2017-11-18 13:41:00,172 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1476)) - bufstart 
= 0; bufend = 1872; bufvoid = 104857600 2017-11-18 13:41:00,172 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1478)) - kvstart = 26214396(104857584); kvend = 26214352(104857408); length = 45/6553600 ----------------------KMeansCombiner--------------------- key=1.0 3.0 values: value=1.0 4.0 value=1.0 3.0 value=1.0 2.0 sumWritableList=[1.0, 3.0] ----------------------KMeansCombiner--------------------- key=2.0 6.5 values: value=2.0 8.0 value=2.0 7.0 value=2.0 6.0 value=2.0 5.0 sumWritableList=[2.0, 6.5] ----------------------KMeansCombiner--------------------- key=3.0 102.0 values: value=3.0 104.0 value=3.0 103.0 value=3.0 102.0 value=3.0 101.0 value=3.0 100.0 sumWritableList=[3.0, 102.0] 2017-11-18 13:41:00,176 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:sortAndSpill(1660)) - Finished spill 0 2017-11-18 13:41:00,179 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:done(1001)) - Task:attempt_local1998692281_0005_m_000000_0 is done. And is in the process of committing 2017-11-18 13:41:00,184 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map 2017-11-18 13:41:00,184 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local1998692281_0005_m_000000_0' done. 2017-11-18 13:41:00,184 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(249)) - Finishing task: attempt_local1998692281_0005_m_000000_0 2017-11-18 13:41:00,184 INFO [Thread-128] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete. 
2017-11-18 13:41:00,185 INFO [Thread-128] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for reduce tasks 2017-11-18 13:41:00,185 INFO [pool-16-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(302)) - Starting task: attempt_local1998692281_0005_r_000000_0 2017-11-18 13:41:00,186 INFO [pool-16-thread-1] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ] 2017-11-18 13:41:00,187 INFO [pool-16-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin:[email protected] 2017-11-18 13:41:00,187 INFO [pool-16-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(197)) - MergerManager: memoryLimit=1283037568, maxSingleShuffleLimit=320759392, mergeThreshold=846804800, ioSortFactor=10, memToMemMergeOutputsThreshold=10 2017-11-18 13:41:00,188 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local1998692281_0005_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 2017-11-18 13:41:00,189 INFO [localfetcher#5] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(141)) - localfetcher#5 about to shuffle output of map attempt_local1998692281_0005_m_000000_0 decomp: 476 len: 480 to MEMORY 2017-11-18 13:41:00,189 INFO [localfetcher#5] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle(100)) - Read 476 bytes from map-output for attempt_local1998692281_0005_m_000000_0 2017-11-18 13:41:00,190 INFO [localfetcher#5] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(315)) - closeInMemoryFile -> map-output of size: 476, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->476 2017-11-18 13:41:00,190 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning 2017-11-18 13:41:00,190 INFO [pool-16-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 
2017-11-18 13:41:00,191 INFO [pool-16-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(687)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 2017-11-18 13:41:00,193 INFO [pool-16-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 1 sorted segments 2017-11-18 13:41:00,194 INFO [pool-16-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:41:00,194 INFO [pool-16-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(754)) - Merged 1 segments, 476 bytes to disk to satisfy reduce memory limit 2017-11-18 13:41:00,195 INFO [pool-16-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(784)) - Merging 1 files, 480 bytes from disk 2017-11-18 13:41:00,195 INFO [pool-16-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(799)) - Merging 0 segments, 0 bytes from memory into reduce 2017-11-18 13:41:00,195 INFO [pool-16-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 1 sorted segments 2017-11-18 13:41:00,195 INFO [pool-16-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:41:00,196 INFO [pool-16-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. ----------------------reduce--------------------- key=1.0 3.0 values: 1.0 3.0 reduce生成:1.0 3.0|1.0 3.0 ----------------------reduce--------------------- key=2.0 6.5 values: 2.0 6.5 reduce生成:2.0 6.5|2.0 6.5 ----------------------reduce--------------------- key=3.0 102.0 values: 3.0 102.0 reduce生成:3.0 102.0|3.0 102.0 2017-11-18 13:41:00,203 INFO [pool-16-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local1998692281_0005_r_000000_0 is done. And is in the process of committing 2017-11-18 13:41:00,204 INFO [pool-16-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 
2017-11-18 13:41:00,204 INFO [pool-16-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local1998692281_0005_r_000000_0 is allowed to commit now 2017-11-18 13:41:00,206 INFO [pool-16-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local1998692281_0005_r_000000_0' to file:/media/chenjie/0009418200012FF3/ubuntu/kmeans/_temporary/0/task_local1998692281_0005_r_000000 2017-11-18 13:41:00,207 INFO [pool-16-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce 2017-11-18 13:41:00,207 INFO [pool-16-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local1998692281_0005_r_000000_0' done. 2017-11-18 13:41:00,207 INFO [pool-16-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local1998692281_0005_r_000000_0 2017-11-18 13:41:00,207 INFO [Thread-128] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete. 
2017-11-18 13:41:01,099 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1367)) - Job job_local1998692281_0005 running in uber mode : false 2017-11-18 13:41:01,099 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1374)) - map 100% reduce 100% 2017-11-18 13:41:01,100 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1385)) - Job job_local1998692281_0005 completed successfully 2017-11-18 13:41:01,106 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1392)) - Counters: 33 File System Counters FILE: Number of bytes read=11828 FILE: Number of bytes written=2598434 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=12 Map output records=12 Map output bytes=1872 Map output materialized bytes=480 Input split bytes=130 Combine input records=12 Combine output records=3 Reduce input groups=3 Reduce shuffle bytes=480 Reduce input records=3 Reduce output records=3 Spilled Records=6 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=0 CPU time spent (ms)=0 Physical memory (bytes) snapshot=0 Virtual memory (bytes) snapshot=0 Total committed heap usage (bytes)=1503657984 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=106 File Output Format Counters Bytes Written=38 readRandomCenterFromInputFile讀取一行:1.0 3.0 readRandomCenterFromInputFile讀取一行:2.0 6.5 readRandomCenterFromInputFile讀取一行:3.0 102.0 2017-11-18 13:41:01,148 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(71)) - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized 2017-11-18 13:41:01,160 WARN [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(171)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String). 
2017-11-18 13:41:01,164 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 1 2017-11-18 13:41:01,179 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(199)) - number of splits:1 2017-11-18 13:41:01,195 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(288)) - Submitting tokens for job: job_local2141033359_0006 2017-11-18 13:41:01,278 INFO [main] mapreduce.Job (Job.java:submit(1301)) - The url to track the job: http://localhost:8080/ 2017-11-18 13:41:01,278 INFO [Thread-155] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null 2017-11-18 13:41:01,278 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1346)) - Running job: job_local2141033359_0006 2017-11-18 13:41:01,279 INFO [Thread-155] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 2017-11-18 13:41:01,286 INFO [Thread-155] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks 2017-11-18 13:41:01,286 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local2141033359_0006_m_000000_0 2017-11-18 13:41:01,288 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ] 2017-11-18 13:41:01,288 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(753)) - Processing split: file:/media/chenjie/0009418200012FF3/ubuntu/kmeans_input_file.txt:0+106 2017-11-18 13:41:01,354 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1202)) - (EQUATOR) 0 kvi 26214396(104857584) 2017-11-18 13:41:01,354 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(995)) - mapreduce.task.io.sort.mb: 100 2017-11-18 13:41:01,355 INFO [LocalJobRunner Map 
Task Executor #0] mapred.MapTask (MapTask.java:init(996)) - soft limit at 83886080 2017-11-18 13:41:01,355 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(997)) - bufstart = 0; bufvoid = 104857600 2017-11-18 13:41:01,355 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(998)) - kvstart = 26214396; length = 6553600 2017-11-18 13:41:01,356 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(402)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer ----------setup------------ ----------centers------------ [1.0, 3.0] [2.0, 6.5] [3.0, 102.0] map value=1.0 2.0 valueVector=[1.0, 2.0] map 生成:1.0 3.0,1.0 2.0 map value=1.0 3.0 valueVector=[1.0, 3.0] map 生成:1.0 3.0,1.0 3.0 map value=1.0 4.0 valueVector=[1.0, 4.0] map 生成:1.0 3.0,1.0 4.0 map value=2.0 5.0 valueVector=[2.0, 5.0] map 生成:2.0 6.5,2.0 5.0 map value=2.0 6.0 valueVector=[2.0, 6.0] map 生成:2.0 6.5,2.0 6.0 map value=2.0 7.0 valueVector=[2.0, 7.0] map 生成:2.0 6.5,2.0 7.0 map value=2.0 8.0 valueVector=[2.0, 8.0] map 生成:2.0 6.5,2.0 8.0 map value=3.0 100.0 valueVector=[3.0, 100.0] map 生成:3.0 102.0,3.0 100.0 map value=3.0 101.0 valueVector=[3.0, 101.0] map 生成:3.0 102.0,3.0 101.0 map value=3.0 102.0 valueVector=[3.0, 102.0] map 生成:3.0 102.0,3.0 102.0 map value=3.0 103.0 valueVector=[3.0, 103.0] map 生成:3.0 102.0,3.0 103.0 map value=3.0 104.0 valueVector=[3.0, 104.0] map 生成:3.0 102.0,3.0 104.0 2017-11-18 13:41:01,359 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 2017-11-18 13:41:01,359 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1457)) - Starting flush of map output 2017-11-18 13:41:01,359 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1475)) - Spilling map output 2017-11-18 13:41:01,359 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1476)) - bufstart 
= 0; bufend = 1872; bufvoid = 104857600 2017-11-18 13:41:01,360 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1478)) - kvstart = 26214396(104857584); kvend = 26214352(104857408); length = 45/6553600 ----------------------KMeansCombiner--------------------- key=1.0 3.0 values: value=1.0 4.0 value=1.0 3.0 value=1.0 2.0 sumWritableList=[1.0, 3.0] ----------------------KMeansCombiner--------------------- key=2.0 6.5 values: value=2.0 8.0 value=2.0 7.0 value=2.0 6.0 value=2.0 5.0 sumWritableList=[2.0, 6.5] ----------------------KMeansCombiner--------------------- key=3.0 102.0 values: value=3.0 104.0 value=3.0 103.0 value=3.0 102.0 value=3.0 101.0 value=3.0 100.0 sumWritableList=[3.0, 102.0] 2017-11-18 13:41:01,363 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:sortAndSpill(1660)) - Finished spill 0 2017-11-18 13:41:01,364 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:done(1001)) - Task:attempt_local2141033359_0006_m_000000_0 is done. And is in the process of committing 2017-11-18 13:41:01,365 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map 2017-11-18 13:41:01,366 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local2141033359_0006_m_000000_0' done. 2017-11-18 13:41:01,366 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(249)) - Finishing task: attempt_local2141033359_0006_m_000000_0 2017-11-18 13:41:01,366 INFO [Thread-155] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete. 
2017-11-18 13:41:01,366 INFO [Thread-155] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for reduce tasks 2017-11-18 13:41:01,366 INFO [pool-19-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(302)) - Starting task: attempt_local2141033359_0006_r_000000_0 2017-11-18 13:41:01,368 INFO [pool-19-thread-1] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ] 2017-11-18 13:41:01,368 INFO [pool-19-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin:[email protected] 2017-11-18 13:41:01,369 INFO [pool-19-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(197)) - MergerManager: memoryLimit=1283037568, maxSingleShuffleLimit=320759392, mergeThreshold=846804800, ioSortFactor=10, memToMemMergeOutputsThreshold=10 2017-11-18 13:41:01,383 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local2141033359_0006_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 2017-11-18 13:41:01,388 INFO [localfetcher#6] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(141)) - localfetcher#6 about to shuffle output of map attempt_local2141033359_0006_m_000000_0 decomp: 476 len: 480 to MEMORY 2017-11-18 13:41:01,388 INFO [localfetcher#6] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle(100)) - Read 476 bytes from map-output for attempt_local2141033359_0006_m_000000_0 2017-11-18 13:41:01,389 INFO [localfetcher#6] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(315)) - closeInMemoryFile -> map-output of size: 476, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->476 2017-11-18 13:41:01,389 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning 2017-11-18 13:41:01,390 INFO [pool-19-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 
2017-11-18 13:41:01,390 INFO [pool-19-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(687)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 2017-11-18 13:41:01,391 INFO [pool-19-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 1 sorted segments 2017-11-18 13:41:01,391 INFO [pool-19-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:41:01,391 INFO [pool-19-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(754)) - Merged 1 segments, 476 bytes to disk to satisfy reduce memory limit 2017-11-18 13:41:01,391 INFO [pool-19-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(784)) - Merging 1 files, 480 bytes from disk 2017-11-18 13:41:01,392 INFO [pool-19-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(799)) - Merging 0 segments, 0 bytes from memory into reduce 2017-11-18 13:41:01,392 INFO [pool-19-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 1 sorted segments 2017-11-18 13:41:01,392 INFO [pool-19-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:41:01,392 INFO [pool-19-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. ----------------------reduce--------------------- key=1.0 3.0 values: 1.0 3.0 reduce生成:1.0 3.0|1.0 3.0 ----------------------reduce--------------------- key=2.0 6.5 values: 2.0 6.5 reduce生成:2.0 6.5|2.0 6.5 ----------------------reduce--------------------- key=3.0 102.0 values: 3.0 102.0 reduce生成:3.0 102.0|3.0 102.0 2017-11-18 13:41:01,400 INFO [pool-19-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local2141033359_0006_r_000000_0 is done. And is in the process of committing 2017-11-18 13:41:01,401 INFO [pool-19-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 
2017-11-18 13:41:01,401 INFO [pool-19-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local2141033359_0006_r_000000_0 is allowed to commit now 2017-11-18 13:41:01,402 INFO [pool-19-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local2141033359_0006_r_000000_0' to file:/media/chenjie/0009418200012FF3/ubuntu/kmeans/_temporary/0/task_local2141033359_0006_r_000000 2017-11-18 13:41:01,403 INFO [pool-19-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce 2017-11-18 13:41:01,403 INFO [pool-19-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local2141033359_0006_r_000000_0' done. 2017-11-18 13:41:01,403 INFO [pool-19-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local2141033359_0006_r_000000_0 2017-11-18 13:41:01,403 INFO [Thread-155] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete. 
2017-11-18 13:41:02,279 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1367)) - Job job_local2141033359_0006 running in uber mode : false 2017-11-18 13:41:02,280 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1374)) - map 100% reduce 100% 2017-11-18 13:41:02,281 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1385)) - Job job_local2141033359_0006 completed successfully 2017-11-18 13:41:02,288 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1392)) - Counters: 33 File System Counters FILE: Number of bytes read=14392 FILE: Number of bytes written=3119326 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=12 Map output records=12 Map output bytes=1872 Map output materialized bytes=480 Input split bytes=130 Combine input records=12 Combine output records=3 Reduce input groups=3 Reduce shuffle bytes=480 Reduce input records=3 Reduce output records=3 Spilled Records=6 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=12 CPU time spent (ms)=0 Physical memory (bytes) snapshot=0 Virtual memory (bytes) snapshot=0 Total committed heap usage (bytes)=1714946048 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=106 File Output Format Counters Bytes Written=38 readRandomCenterFromInputFile讀取一行:1.0 3.0 readRandomCenterFromInputFile讀取一行:2.0 6.5 readRandomCenterFromInputFile讀取一行:3.0 102.0 2017-11-18 13:41:02,316 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(71)) - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized 2017-11-18 13:41:02,324 WARN [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(171)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String). 
2017-11-18 13:41:02,325 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 1 2017-11-18 13:41:02,346 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(199)) - number of splits:1 2017-11-18 13:41:02,356 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(288)) - Submitting tokens for job: job_local382131182_0007 2017-11-18 13:41:02,419 INFO [main] mapreduce.Job (Job.java:submit(1301)) - The url to track the job: http://localhost:8080/ 2017-11-18 13:41:02,419 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1346)) - Running job: job_local382131182_0007 2017-11-18 13:41:02,419 INFO [Thread-182] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null 2017-11-18 13:41:02,420 INFO [Thread-182] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 2017-11-18 13:41:02,422 INFO [Thread-182] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks 2017-11-18 13:41:02,422 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local382131182_0007_m_000000_0 2017-11-18 13:41:02,423 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ] 2017-11-18 13:41:02,424 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(753)) - Processing split: file:/media/chenjie/0009418200012FF3/ubuntu/kmeans_input_file.txt:0+106 2017-11-18 13:41:02,491 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1202)) - (EQUATOR) 0 kvi 26214396(104857584) 2017-11-18 13:41:02,491 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(995)) - mapreduce.task.io.sort.mb: 100 2017-11-18 13:41:02,491 INFO [LocalJobRunner Map 
Task Executor #0] mapred.MapTask (MapTask.java:init(996)) - soft limit at 83886080 2017-11-18 13:41:02,492 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(997)) - bufstart = 0; bufvoid = 104857600 2017-11-18 13:41:02,492 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(998)) - kvstart = 26214396; length = 6553600 2017-11-18 13:41:02,492 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(402)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer ----------setup------------ ----------centers------------ [1.0, 3.0] [2.0, 6.5] [3.0, 102.0] map value=1.0 2.0 valueVector=[1.0, 2.0] map 生成:1.0 3.0,1.0 2.0 map value=1.0 3.0 valueVector=[1.0, 3.0] map 生成:1.0 3.0,1.0 3.0 map value=1.0 4.0 valueVector=[1.0, 4.0] map 生成:1.0 3.0,1.0 4.0 map value=2.0 5.0 valueVector=[2.0, 5.0] map 生成:2.0 6.5,2.0 5.0 map value=2.0 6.0 valueVector=[2.0, 6.0] map 生成:2.0 6.5,2.0 6.0 map value=2.0 7.0 valueVector=[2.0, 7.0] map 生成:2.0 6.5,2.0 7.0 map value=2.0 8.0 valueVector=[2.0, 8.0] map 生成:2.0 6.5,2.0 8.0 map value=3.0 100.0 valueVector=[3.0, 100.0] map 生成:3.0 102.0,3.0 100.0 map value=3.0 101.0 valueVector=[3.0, 101.0] map 生成:3.0 102.0,3.0 101.0 map value=3.0 102.0 valueVector=[3.0, 102.0] map 生成:3.0 102.0,3.0 102.0 map value=3.0 103.0 valueVector=[3.0, 103.0] map 生成:3.0 102.0,3.0 103.0 map value=3.0 104.0 valueVector=[3.0, 104.0] map 生成:3.0 102.0,3.0 104.0 2017-11-18 13:41:02,495 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 2017-11-18 13:41:02,495 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1457)) - Starting flush of map output 2017-11-18 13:41:02,495 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1475)) - Spilling map output 2017-11-18 13:41:02,495 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1476)) - bufstart 
= 0; bufend = 1872; bufvoid = 104857600 2017-11-18 13:41:02,495 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1478)) - kvstart = 26214396(104857584); kvend = 26214352(104857408); length = 45/6553600 ----------------------KMeansCombiner--------------------- key=1.0 3.0 values: value=1.0 4.0 value=1.0 3.0 value=1.0 2.0 sumWritableList=[1.0, 3.0] ----------------------KMeansCombiner--------------------- key=2.0 6.5 values: value=2.0 8.0 value=2.0 7.0 value=2.0 6.0 value=2.0 5.0 sumWritableList=[2.0, 6.5] ----------------------KMeansCombiner--------------------- key=3.0 102.0 values: value=3.0 104.0 value=3.0 103.0 value=3.0 102.0 value=3.0 101.0 value=3.0 100.0 sumWritableList=[3.0, 102.0] 2017-11-18 13:41:02,500 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:sortAndSpill(1660)) - Finished spill 0 2017-11-18 13:41:02,500 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:done(1001)) - Task:attempt_local382131182_0007_m_000000_0 is done. And is in the process of committing 2017-11-18 13:41:02,502 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map 2017-11-18 13:41:02,502 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local382131182_0007_m_000000_0' done. 2017-11-18 13:41:02,502 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(249)) - Finishing task: attempt_local382131182_0007_m_000000_0 2017-11-18 13:41:02,502 INFO [Thread-182] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete. 
2017-11-18 13:41:02,503 INFO [Thread-182] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for reduce tasks 2017-11-18 13:41:02,503 INFO [pool-22-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(302)) - Starting task: attempt_local382131182_0007_r_000000_0 2017-11-18 13:41:02,504 INFO [pool-22-thread-1] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ] 2017-11-18 13:41:02,504 INFO [pool-22-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin: [email protected] 2017-11-18 13:41:02,504 INFO [pool-22-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(197)) - MergerManager: memoryLimit=1283037568, maxSingleShuffleLimit=320759392, mergeThreshold=846804800, ioSortFactor=10, memToMemMergeOutputsThreshold=10 2017-11-18 13:41:02,505 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local382131182_0007_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 2017-11-18 13:41:02,508 INFO [localfetcher#7] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(141)) - localfetcher#7 about to shuffle output of map attempt_local382131182_0007_m_000000_0 decomp: 476 len: 480 to MEMORY 2017-11-18 13:41:02,509 INFO [localfetcher#7] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle(100)) - Read 476 bytes from map-output for attempt_local382131182_0007_m_000000_0 2017-11-18 13:41:02,509 INFO [localfetcher#7] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(315)) - closeInMemoryFile -> map-output of size: 476, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->476 2017-11-18 13:41:02,512 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning 2017-11-18 13:41:02,512 INFO [pool-22-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 
2017-11-18 13:41:02,512 INFO [pool-22-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(687)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs 2017-11-18 13:41:02,514 INFO [pool-22-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 1 sorted segments 2017-11-18 13:41:02,514 INFO [pool-22-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:41:02,514 INFO [pool-22-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(754)) - Merged 1 segments, 476 bytes to disk to satisfy reduce memory limit 2017-11-18 13:41:02,515 INFO [pool-22-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(784)) - Merging 1 files, 480 bytes from disk 2017-11-18 13:41:02,515 INFO [pool-22-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(799)) - Merging 0 segments, 0 bytes from memory into reduce 2017-11-18 13:41:02,515 INFO [pool-22-thread-1] mapred.Merger (Merger.java:merge(597)) - Merging 1 sorted segments 2017-11-18 13:41:02,515 INFO [pool-22-thread-1] mapred.Merger (Merger.java:merge(696)) - Down to the last merge-pass, with 1 segments left of total size: 396 bytes 2017-11-18 13:41:02,516 INFO [pool-22-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. ----------------------reduce--------------------- key=1.0 3.0 values: 1.0 3.0 reduce生成:1.0 3.0|1.0 3.0 ----------------------reduce--------------------- key=2.0 6.5 values: 2.0 6.5 reduce生成:2.0 6.5|2.0 6.5 ----------------------reduce--------------------- key=3.0 102.0 values: 3.0 102.0 reduce生成:3.0 102.0|3.0 102.0 2017-11-18 13:41:02,523 INFO [pool-22-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local382131182_0007_r_000000_0 is done. And is in the process of committing 2017-11-18 13:41:02,524 INFO [pool-22-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied. 
2017-11-18 13:41:02,524 INFO [pool-22-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local382131182_0007_r_000000_0 is allowed to commit now 2017-11-18 13:41:02,525 INFO [pool-22-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local382131182_0007_r_000000_0' to file:/media/chenjie/0009418200012FF3/ubuntu/kmeans/_temporary/0/task_local382131182_0007_r_000000 2017-11-18 13:41:02,526 INFO [pool-22-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce 2017-11-18 13:41:02,526 INFO [pool-22-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local382131182_0007_r_000000_0' done. 2017-11-18 13:41:02,526 INFO [pool-22-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local382131182_0007_r_000000_0 2017-11-18 13:41:02,526 INFO [Thread-182] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete. 2017-11-18 13:41:03,420 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1367)) - Job job_local382131182_0007 running in uber mode : false 2017-11-18 13:41:03,420 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1374)) - map 100% reduce 100% 2017-11-18 13:41:03,421 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1385)) - Job job_local382131182_0007 completed successfully 2017-11-18 13:41:03,427 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1392)) - Counters: 33 File System Counters FILE: Number of bytes read=16956 FILE: Number of bytes written=3637458 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=12 Map output records=12 Map output bytes=1872 Map output materialized bytes=480 Input split bytes=130 Combine input records=12 Combine output records=3 Reduce input groups=3 Reduce shuffle bytes=480 Reduce input records=3 Reduce output records=3 Spilled Records=6 
Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=0 CPU time spent (ms)=0 Physical memory (bytes) snapshot=0 Virtual memory (bytes) snapshot=0 Total committed heap usage