1. 程式人生 > >將MapReduce分析手機上網記錄的結果進行排序操作

將MapReduce分析手機上網記錄的結果進行排序操作

1.編寫Java程式碼,並將其打包成jar包

在eclipse上建立個新的java專案,建立lib資料夾,將上次的jar同樣匯入進來

然後建立個TelBean類 這裡實現了WritableComparable介面,就是序列化的比較,詳情查詢api文件

public interface Comparator比較功能,對一些物件的集合施加了一個整體排序 。 可以將比較器傳遞給排序方法(如Collections.sort或Arrays.sort ),以便對排序順序進行精確控制。 比較器還可以用來控制某些資料結構(如順序sorted sets或sorted maps ),或對於不具有物件的集合提供的排序natural ordering 。 通過比較c上的一組元素S的確定的順序對被認為是與equals一致當且僅當c.compare(e1, e2)==0具有用於S每e1和e2相同布林值e1.equals(e2)。

當使用能夠強制排序不一致的比較器時,應注意使用排序集(或排序圖)。 假設具有顯式比較器c的排序集(或排序對映)與從集合S中繪製的元素(或鍵) 一起使用 。 如果88446235254451上的c強制的排序與equals不一致,則排序集(或排序對映)將表現為“奇怪”。 特別是排序集(或排序圖)將違反用於設定(或對映)的一般合同,其按equals定義。

例如,假設一個將兩個元件a和b ,使得(a.equals(b) && c.compare(a, b) != 0)到空TreeSet與比較c 。 因為a和b與樹集的角度不相等,所以第二個add操作將返回true(並且樹集的大小將增加),即使這與Set.add方法的規範相反。

注意:這通常是一個好主意比較,也能實現java.io.Serializable,因為它們可能被用來作為排序的序列化資料結構的方法(如TreeSet , TreeMap )。 為了使資料結構成功序列化,比較器(如果提供)必須實現Serializable 。

對於數學上的傾斜,即限定了施加順序 ,給定的比較器c上一組給定物件的S強加關係式為:

{(x, y) such that c.compare(x, y) <= 0}. 這個總訂單的商是: {(x, y) such that c.compare(x, y) == 0}. 它從合同compare,該商數是S的等價關係緊隨其後,而強加的排序是S, 總訂單 。 當我們說S上的c所規定的順序與等於一致時,我們的意思是排序的商是由物件’ equals(Object)方法定義的等價關係: {(x, y) such that x.equals(y)}. 與Comparable不同,比較器可以可選地允許比較空引數,同時保持對等價關係的要求。

此介面是成員Java Collections Framework 。

從以下版本開始: 1.2 另請參見: Comparable , Serializable

package com.zy.hadoop.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class TelBean implements WritableComparable<TelBean>{
	
	private String tel;
	private long upPayLoad;
	private long downPayLoad;
	private long totalPayLoad;
	
	
	
	public String getTel() {
		return tel;
	}

	public void setTel(String tel) {
		this.tel = tel;
	}

	public long getUpPayLoad() {
		return upPayLoad;
	}

	public void setUpPayLoad(long upPayLoad) {
		this.upPayLoad = upPayLoad;
	}

	public long getDownPayLoad() {
		return downPayLoad;
	}

	public void setDownPayLoad(long downPayLoad) {
		this.downPayLoad = downPayLoad;
	}

	public long getTotalPayLoad() {
		return totalPayLoad;
	}

	public void setTotalPayLoad(long totalPayLoad) {
		this.totalPayLoad = totalPayLoad;
	}

	public TelBean(String tel, long upPayLoad, long downPayLoad, long totalPayLoad) {
		super();
		this.tel = tel;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = totalPayLoad;
	}

	public TelBean() {
		super();
		// TODO Auto-generated constructor stub
	}

	@Override
	public String toString() {
		return  tel + "\t" + upPayLoad + "\t" + downPayLoad + "\t"
				+ totalPayLoad ;
	}

	//反序列化的過程
	@Override
	public void readFields(DataInput in) throws IOException {
		this.tel = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
		this.totalPayLoad = in.readLong();
	}

	//序列化的過程
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeUTF(this.tel);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
		out.writeLong(this.totalPayLoad);
	}
	//compare比較,詳情查閱java的api文件
	@Override
	public int compareTo(TelBean bean) {
		// TODO Auto-generated method stub
		return (int)(this.totalPayLoad-bean.getTotalPayLoad());
	}

}

然後在mr包下依次建立SortMapper,SortReducer,SortCount

SortMapper

package com.zy.hadoop.mr2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.zy.hadoop.entity.TelBean;

public class SortMapper extends Mapper<LongWritable, Text, TelBean, NullWritable>{

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TelBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		//value ,第一mr出來的結果中的每一行
		String line = value.toString();
		//拆分字串"\t"
		String[] strs = line.split("\t");
		//直接通過下標取值
		//電話號碼
		
		String tel = strs[0];
		//上行流量
		long upPayLoad=Long.parseLong(strs[2]);
		//下行流量
		long downPayLoad=Long.parseLong(strs[3]);
		//總流量
		long totalPayLoad=Long.parseLong(strs[4]);
		//把去除的值封裝到物件中
		TelBean telBean = new TelBean(tel, upPayLoad, downPayLoad, totalPayLoad);
		//輸出k2,v2
		context.write(telBean, NullWritable.get());
	}

}

SortReducer

package com.zy.hadoop.mr2;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import com.zy.hadoop.entity.TelBean;

public class SortReducer extends Reducer<TelBean, NullWritable, TelBean, NullWritable>{

	@Override
	protected void reduce(TelBean arg0, Iterable<NullWritable> arg1,
			Reducer<TelBean, NullWritable, TelBean, NullWritable>.Context arg2)
			throws IOException, InterruptedException {
		arg2.write(arg0, NullWritable.get());
	}

	
}

SortCount

package com.zy.hadoop.mr2;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zy.hadoop.entity.TelBean;

public class SortCount {

	public static void main(String[] args) throws Exception {
		// 1.獲取job
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		// 2.指定job使用的類
		job.setJarByClass(SortCount.class);

		// 3.設定Mapper的屬性
		job.setMapperClass(SortMapper.class);
		job.setMapOutputKeyClass(TelBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// 4.設定輸入檔案
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// 5.設定reducer的屬性
		job.setReducerClass(SortReducer.class);
		job.setOutputKeyClass(TelBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// 6.設定輸出資料夾,檢視結果儲存到hdfs資料夾中的位置
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7.提交 true 提交的時候列印日誌資訊
		job.waitForCompletion(true);
	}

}

接下來將專案打包成jar包,上傳到虛擬機器/usr/tmp下

虛擬機器上執行jar包,檢視執行結果

啟動hadoop叢集服務

start-all.sh

檢視是否成功

我們將之前處理過一次的檔案/tel1/part-r-00000(/tel2下的進行過分割槽了,所以不進行處理)作為原始檔進行分析排序

hadoop jar tel_3.jar /tel/part-r-00000 /tel3

等待執行完畢檢視結果

結果如下

[[email protected] tmp]# hadoop fs -ls /
hadoopFound 5 items
-rw-r--r--   1 root supergroup       2315 2018-10-19 19:33 /tel.log
drwxr-xr-x   - root supergroup          0 2018-10-19 19:59 /tel1
drwxr-xr-x   - root supergroup          0 2018-10-19 20:10 /tel2
drwxr-xr-x   - root supergroup          0 2018-10-19 20:47 /tel3
drwx------   - root supergroup          0 2018-10-19 19:58 /tmp
[[email protected] tmp]# hadoop fs -ls /tel3
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-10-19 20:47 /tel3/_SUCCESS
-rw-r--r--   1 root supergroup        477 2018-10-19 20:47 /tel3/part-r-00000
[[email protected] tmp]# hadoop fs -cat /tel3/part-r-00000
13926251106	240	0	240
13826544101	264	0	264
13480253104	180	180	360
13926435656	132	1512	1644
15989002119	1938	180	2118
18211575961	1527	2106	3633
13560436666	2232	1908	4140
13602846565	1938	2910	4848
84138413	4116	1432	5548
15920133257	3156	2936	6092
13922314466	3008	3720	6728
15013685858	3659	3538	7197
13660577991	6960	690	7650
13560439658	2034	5892	7926
18320173382	9531	2412	11943
13726238888	2481	24681	27162
13925057413	11058	48243	59301
13502468823	7335	110349	117684

這就是MapReduce進行簡單的資料分析

不過hadoop叢集的datanode節點如果過多會導致速度慢 , 接下來會介紹zookeeper的高效hadoop叢集如何搭建