將MapReduce分析手機上網記錄的結果進行排序操作
1.編寫Java程式碼,並將其打包成jar包
在eclipse上建立個新的java專案,建立lib資料夾,將上次的jar同樣匯入進來
然後建立個TelBean類 這裡實現了WritableComparable介面,就是序列化的比較,詳情查詢api文件
public interface Comparator比較功能,對一些物件的集合施加了一個整體排序 。 可以將比較器傳遞給排序方法(如Collections.sort或Arrays.sort ),以便對排序順序進行精確控制。 比較器還可以用來控制某些資料結構(如順序sorted sets或sorted maps ),或對於不具有物件的集合提供的排序natural ordering 。 通過比較c上的一組元素S的確定的順序對被認為是與equals一致當且僅當c.compare(e1, e2)==0具有用於S每e1和e2相同布林值e1.equals(e2)。
當使用能夠強制排序不一致的比較器時,應注意使用排序集(或排序圖)。 假設具有顯式比較器c的排序集(或排序對映)與從集合S中繪製的元素(或鍵) 一起使用 。 如果88446235254451上的c強制的排序與equals不一致,則排序集(或排序對映)將表現為“奇怪”。 特別是排序集(或排序圖)將違反用於設定(或對映)的一般合同,其按equals定義。
例如,假設一個將兩個元件a和b ,使得(a.equals(b) && c.compare(a, b) != 0)到空TreeSet與比較c 。 因為a和b與樹集的角度不相等,所以第二個add操作將返回true(並且樹集的大小將增加),即使這與Set.add方法的規範相反。
注意:這通常是一個好主意比較,也能實現java.io.Serializable,因為它們可能被用來作為排序的序列化資料結構的方法(如TreeSet , TreeMap )。 為了使資料結構成功序列化,比較器(如果提供)必須實現Serializable 。
對於數學上的傾斜,即限定了施加順序 ,給定的比較器c上一組給定物件的S強加關係式為:
{(x, y) such that c.compare(x, y) <= 0}. 這個總訂單的商是: {(x, y) such that c.compare(x, y) == 0}. 它從合同compare,該商數是S的等價關係緊隨其後,而強加的排序是S, 總訂單 。 當我們說S上的c所規定的順序與等於一致時,我們的意思是排序的商是由物件’ equals(Object)方法定義的等價關係: {(x, y) such that x.equals(y)}. 與Comparable不同,比較器可以可選地允許比較空引數,同時保持對等價關係的要求。
此介面是成員Java Collections Framework 。
從以下版本開始: 1.2 另請參見: Comparable , Serializable
package com.zy.hadoop.entity;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class TelBean implements WritableComparable<TelBean>{
private String tel;
private long upPayLoad;
private long downPayLoad;
private long totalPayLoad;
public String getTel() {
return tel;
}
public void setTel(String tel) {
this.tel = tel;
}
public long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
public TelBean(String tel, long upPayLoad, long downPayLoad, long totalPayLoad) {
super();
this.tel = tel;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = totalPayLoad;
}
public TelBean() {
super();
// TODO Auto-generated constructor stub
}
@Override
public String toString() {
return tel + "\t" + upPayLoad + "\t" + downPayLoad + "\t"
+ totalPayLoad ;
}
//反序列化的過程
@Override
public void readFields(DataInput in) throws IOException {
this.tel = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}
//序列化的過程
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(this.tel);
out.writeLong(this.upPayLoad);
out.writeLong(this.downPayLoad);
out.writeLong(this.totalPayLoad);
}
//compare比較,詳情查閱java的api文件
@Override
public int compareTo(TelBean bean) {
// TODO Auto-generated method stub
return (int)(this.totalPayLoad-bean.getTotalPayLoad());
}
}
然後在mr包下依次建立SortMapper,SortReducer,SortCount
SortMapper
package com.zy.hadoop.mr2;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.zy.hadoop.entity.TelBean;
public class SortMapper extends Mapper<LongWritable, Text, TelBean, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TelBean, NullWritable>.Context context)
throws IOException, InterruptedException {
//value ,第一mr出來的結果中的每一行
String line = value.toString();
//拆分字串"\t"
String[] strs = line.split("\t");
//直接通過下標取值
//電話號碼
String tel = strs[0];
//上行流量
long upPayLoad=Long.parseLong(strs[2]);
//下行流量
long downPayLoad=Long.parseLong(strs[3]);
//總流量
long totalPayLoad=Long.parseLong(strs[4]);
//把去除的值封裝到物件中
TelBean telBean = new TelBean(tel, upPayLoad, downPayLoad, totalPayLoad);
//輸出k2,v2
context.write(telBean, NullWritable.get());
}
}
SortReducer
package com.zy.hadoop.mr2;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import com.zy.hadoop.entity.TelBean;
public class SortReducer extends Reducer<TelBean, NullWritable, TelBean, NullWritable>{
@Override
protected void reduce(TelBean arg0, Iterable<NullWritable> arg1,
Reducer<TelBean, NullWritable, TelBean, NullWritable>.Context arg2)
throws IOException, InterruptedException {
arg2.write(arg0, NullWritable.get());
}
}
SortCount
package com.zy.hadoop.mr2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.zy.hadoop.entity.TelBean;
public class SortCount {
public static void main(String[] args) throws Exception {
// 1.獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.指定job使用的類
job.setJarByClass(SortCount.class);
// 3.設定Mapper的屬性
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(TelBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 4.設定輸入檔案
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 5.設定reducer的屬性
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(TelBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 6.設定輸出資料夾,檢視結果儲存到hdfs資料夾中的位置
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7.提交 true 提交的時候列印日誌資訊
job.waitForCompletion(true);
}
}
接下來將專案打包成jar包,上傳到虛擬機器/usr/tmp下
虛擬機器上執行jar包,檢視執行結果
啟動hadoop叢集服務
start-all.sh
檢視是否成功
我們將之前處理過一次的檔案/tel1/part-r-00000(/tel2下的進行過分割槽了,所以不進行處理)作為原始檔進行分析排序
hadoop jar tel_3.jar /tel/part-r-00000 /tel3
等待執行完畢檢視結果
結果如下
[[email protected] tmp]# hadoop fs -ls /
hadoopFound 5 items
-rw-r--r-- 1 root supergroup 2315 2018-10-19 19:33 /tel.log
drwxr-xr-x - root supergroup 0 2018-10-19 19:59 /tel1
drwxr-xr-x - root supergroup 0 2018-10-19 20:10 /tel2
drwxr-xr-x - root supergroup 0 2018-10-19 20:47 /tel3
drwx------ - root supergroup 0 2018-10-19 19:58 /tmp
[[email protected] tmp]# hadoop fs -ls /tel3
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-10-19 20:47 /tel3/_SUCCESS
-rw-r--r-- 1 root supergroup 477 2018-10-19 20:47 /tel3/part-r-00000
[[email protected] tmp]# hadoop fs -cat /tel3/part-r-00000
13926251106 240 0 240
13826544101 264 0 264
13480253104 180 180 360
13926435656 132 1512 1644
15989002119 1938 180 2118
18211575961 1527 2106 3633
13560436666 2232 1908 4140
13602846565 1938 2910 4848
84138413 4116 1432 5548
15920133257 3156 2936 6092
13922314466 3008 3720 6728
15013685858 3659 3538 7197
13660577991 6960 690 7650
13560439658 2034 5892 7926
18320173382 9531 2412 11943
13726238888 2481 24681 27162
13925057413 11058 48243 59301
13502468823 7335 110349 117684
這就是MapReduce進行簡單的資料分析
不過hadoop叢集的datanode節點如果過多會導致速度慢 , 接下來會介紹zookeeper的高效hadoop叢集如何搭建