1. 程式人生 > Web日誌流處理的MapReduce程式 -- 兩個(一個使用Collections排序 一個使用MapReduce本身的排序)

Web日誌流處理的MapReduce程式 -- 兩個(一個使用Collections排序 一個使用MapReduce本身的排序)

我的這兩個專案程式碼地址:
Collections排序:
https://gitee.com/tanghongping/web_click_mr_hve
MapReduce排序:
https://gitee.com/tanghongping/MapReduceTest

這兩個專案裡面會有一些測試的程式碼,可以忽略。

使用Collections.sort排序

WeblogBean

package com.thp.bigdata.webClick.mrBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Adapter-layer record for one raw access-log line. Field names mirror the
 * external log format so the structure stays aligned with the data source.
 * @author 湯小萌
 *
 */
public class WeblogBean implements Writable {

	/** Separator placed between the columns emitted by {@link #toString()}. */
	private static final String FIELD_SEP = "\001";

	private boolean valid = true;		// false once the record fails a validation check
	private String remote_addr;			// client IP address
	private String remote_user;			// client user name ("-" when absent)
	private String time_local;			// access time
	private String request;				// requested URL
	private String status;				// HTTP status code, e.g. 200
	private String body_bytes_sent;		// size of the response body sent to the client
	private String http_referer;		// page the visitor came from
	private String http_user_agent;		// client browser information

	/** Assigns all fields at once. */
	public void set(boolean valid, String remote_addr, String remote_user, String time_local, String request,
			String status, String body_bytes_sent, String http_referer, String http_user_agent) {
		setValid(valid);
		setRemote_addr(remote_addr);
		setRemote_user(remote_user);
		setTime_local(time_local);
		setRequest(request);
		setStatus(status);
		setBody_bytes_sent(body_bytes_sent);
		setHttp_referer(http_referer);
		setHttp_user_agent(http_user_agent);
	}

	public boolean isValid() { return valid; }

	public void setValid(boolean valid) { this.valid = valid; }

	public String getRemote_addr() { return remote_addr; }

	public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }

	public String getRemote_user() { return remote_user; }

	public void setRemote_user(String remote_user) { this.remote_user = remote_user; }

	public String getTime_local() { return time_local; }

	public void setTime_local(String time_local) { this.time_local = time_local; }

	public String getRequest() { return request; }

	public void setRequest(String request) { this.request = request; }

	public String getStatus() { return status; }

	public void setStatus(String status) { this.status = status; }

	public String getBody_bytes_sent() { return body_bytes_sent; }

	public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }

	public String getHttp_referer() { return http_referer; }

	public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }

	public String getHttp_user_agent() { return http_user_agent; }

	public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

	/**
	 * Serializes the validity flag followed by the eight string fields, in the same
	 * order readFields() consumes them. Null strings are written as "".
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeBoolean(valid);
		String[] columns = { remote_addr, remote_user, time_local, request, status,
				body_bytes_sent, http_referer, http_user_agent };
		for (String column : columns) {
			out.writeUTF(column == null ? "" : column);
		}
	}

	/** Reads the fields back in exactly the order write() produced them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		valid = in.readBoolean();
		remote_addr = in.readUTF();
		remote_user = in.readUTF();
		time_local = in.readUTF();
		request = in.readUTF();
		status = in.readUTF();
		body_bytes_sent = in.readUTF();
		http_referer = in.readUTF();
		http_user_agent = in.readUTF();
	}

	/**
	 * Renders the record as one \001-separated line: the validity flag followed by
	 * the eight string fields (null fields render as "null").
	 */
	@Override
	public String toString() {
		String[] columns = { getRemote_addr(), getRemote_user(), getTime_local(), getRequest(), getStatus(),
				getBody_bytes_sent(), getHttp_referer(), getHttp_user_agent() };
		StringBuilder line = new StringBuilder().append(valid);
		for (String column : columns) {
			line.append(FIELD_SEP).append(column);
		}
		return line.toString();
	}
}

PageViewsBean

package com.thp.bigdata.webClick.mrBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One page view inside a visit (session): which page was requested, when, how long
 * the visitor stayed, and its 1-based position (step) within the session.
 * @author 湯小萌
 *
 */
public class PageViewsBean implements Writable {

	private String session;			// session id
	private String remote_addr;		// client IP address
	private String timeStr;			// access time
	private String request;			// requested URL
	private int step;				// position of this page view within the session
	private String staylong;		// stay time on the page
	private String referal;			// page the visitor came from
	private String useragent;		// browser information
	private String bytes_send;		// size of the response body sent
	private String status;			// HTTP status of the request

	/** Assigns all fields at once. Note the parameter order differs from the field order. */
	public void set(String session, String remote_addr, String useragent, String timeStr, String request, int step, String staylong, String referal, String bytes_send, String status) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.useragent = useragent;
		this.timeStr = timeStr;
		this.request = request;
		this.step = step;
		this.staylong = staylong;
		this.referal = referal;
		this.bytes_send = bytes_send;
		this.status = status;
	}

	public String getSession() {
		return session;
	}
	public void setSession(String session) {
		this.session = session;
	}
	public String getRemote_addr() {
		return remote_addr;
	}
	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}
	public String getTimeStr() {
		return timeStr;
	}
	public void setTimeStr(String timeStr) {
		this.timeStr = timeStr;
	}
	public String getRequest() {
		return request;
	}
	public void setRequest(String request) {
		this.request = request;
	}
	public int getStep() {
		return step;
	}
	public void setStep(int step) {
		this.step = step;
	}
	public String getStaylong() {
		return staylong;
	}
	public void setStaylong(String staylong) {
		this.staylong = staylong;
	}
	public String getReferal() {
		return referal;
	}
	public void setReferal(String referal) {
		this.referal = referal;
	}
	public String getUseragent() {
		return useragent;
	}
	public void setUseragent(String useragent) {
		this.useragent = useragent;
	}
	public String getBytes_send() {
		return bytes_send;
	}
	public void setBytes_send(String bytes_send) {
		this.bytes_send = bytes_send;
	}
	public String getStatus() {
		return status;
	}
	public void setStatus(String status) {
		this.status = status;
	}

	/**
	 * Serializes all fields in the order readFields() reads them. DataOutput.writeUTF
	 * throws NullPointerException on null, so unset strings are written as "".
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(emptyIfNull(session));
		out.writeUTF(emptyIfNull(remote_addr));
		out.writeUTF(emptyIfNull(timeStr));
		out.writeUTF(emptyIfNull(request));
		out.writeInt(step);
		out.writeUTF(emptyIfNull(staylong));
		out.writeUTF(emptyIfNull(referal));
		out.writeUTF(emptyIfNull(useragent));
		out.writeUTF(emptyIfNull(bytes_send));
		out.writeUTF(emptyIfNull(status));
	}

	/** Reads the fields back in exactly the order write() produced them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.timeStr = in.readUTF();
		this.request = in.readUTF();
		this.step = in.readInt();
		this.staylong = in.readUTF();
		this.referal = in.readUTF();
		this.useragent = in.readUTF();
		this.bytes_send = in.readUTF();
		this.status = in.readUTF();
	}

	/** Maps null to "" so the value can be passed to writeUTF. */
	private static String emptyIfNull(String s) {
		return s == null ? "" : s;
	}
}

VisitBean

package com.thp.bigdata.webClick.mrBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Summary of one visit (session): entry/exit time and page, the referer of the
 * first page view, and the number of page views.
 * @author 湯小萌
 *
 */
public class VisitBean implements Writable {
	private String session;			// session id
	private String remote_addr;		// client IP address
	private String inTime;			// time of the first page view
	private String outTime;			// time of the last page view
	private String inPage;			// first page requested
	private String outPage;			// last page requested
	private String referal;			// referer of the first page view
	private int pageVisits;			// number of page views in the visit

	/** Assigns all fields at once. */
	public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.inTime = inTime;
		this.outTime = outTime;
		this.inPage = inPage;
		this.outPage = outPage;
		this.referal = referal;
		this.pageVisits = pageVisits;
	}

	public String getSession() {
		return session;
	}

	public void setSession(String session) {
		this.session = session;
	}

	public String getRemote_addr() {
		return remote_addr;
	}

	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}

	public String getInTime() {
		return inTime;
	}

	public void setInTime(String inTime) {
		this.inTime = inTime;
	}

	public String getOutTime() {
		return outTime;
	}

	public void setOutTime(String outTime) {
		this.outTime = outTime;
	}

	public String getInPage() {
		return inPage;
	}

	public void setInPage(String inPage) {
		this.inPage = inPage;
	}

	public String getOutPage() {
		return outPage;
	}

	public void setOutPage(String outPage) {
		this.outPage = outPage;
	}

	public String getReferal() {
		return referal;
	}

	public void setReferal(String referal) {
		this.referal = referal;
	}

	public int getPageVisits() {
		return pageVisits;
	}

	public void setPageVisits(int pageVisits) {
		this.pageVisits = pageVisits;
	}

	/**
	 * Serializes all fields in the order readFields() reads them. DataOutput.writeUTF
	 * throws NullPointerException on null, so unset strings are written as "".
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(emptyIfNull(session));
		out.writeUTF(emptyIfNull(remote_addr));
		out.writeUTF(emptyIfNull(inTime));
		out.writeUTF(emptyIfNull(outTime));
		out.writeUTF(emptyIfNull(inPage));
		out.writeUTF(emptyIfNull(outPage));
		out.writeUTF(emptyIfNull(referal));
		out.writeInt(pageVisits);
	}

	/** Reads the fields back in exactly the order write() produced them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.inTime = in.readUTF();
		this.outTime = in.readUTF();
		this.inPage = in.readUTF();
		this.outPage = in.readUTF();
		this.referal = in.readUTF();
		this.pageVisits = in.readInt();
	}

	/** One \001-separated line: session, ip, inTime, outTime, inPage, outPage, referal, pageVisits. */
	@Override
	public String toString() {
		return session + "\001" + remote_addr + "\001" + inTime + "\001" +
				outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
	}

	/** Maps null to "" so the value can be passed to writeUTF. */
	private static String emptyIfNull(String s) {
		return s == null ? "" : s;
	}
}

預處理解析類

package com.thp.bigdata.webClick.mrBean;

import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;

/**
 * Parses raw access-log lines into {@link WeblogBean}s and applies validity filters.
 * @author 湯小萌
 *
 */
public class WeblogParser {

	/**
	 * Parses one space-separated access-log line. Splitting a sample line on single
	 * spaces yields these tokens:
	 * <pre>
	 *  0 ) 194.237.142.21
	 *  1 ) -
	 *  2 ) -
	 *  3 ) [18/Sep/2013:06:49:18
	 *  4 ) +0000]
	 *  5 ) "GET
	 *  6 ) /wp-content/uploads/2013/07/rstudio-git3.png
	 *  7 ) HTTP/1.1"
	 *  8 ) 304
	 *  9 ) 0
	 * 10 ) "-"
	 * 11 ) "Mozilla/4.0
	 * 12 ) (compatible;)"
	 * </pre>
	 * The record is marked invalid when the line has 11 or fewer tokens, the timestamp
	 * cannot be parsed, or the status is non-numeric or >= 400.
	 *
	 * @param line one raw log line
	 * @return the parsed bean (never null); check {@link WeblogBean#isValid()}
	 */
	public static WeblogBean parser(String line) {
		WeblogBean weblogBean = new WeblogBean();
		String[] arr = line.split(" ");
		if(arr.length > 11) {
			weblogBean.setRemote_addr(arr[0]);
			weblogBean.setRemote_user(arr[1]);
			// arr[3] looks like "[18/Sep/2013:06:49:18"; drop the leading '['.
			// An empty token (e.g. from a double space) would make substring(1) throw.
			String time_local = arr[3].isEmpty() ? null : formatDate(arr[3].substring(1));
			if(null == time_local) time_local = "-invalid_time-";
			weblogBean.setTime_local(time_local);
			weblogBean.setRequest(arr[6]);

			weblogBean.setStatus(arr[8]);
			weblogBean.setBody_bytes_sent(arr[9]);
			weblogBean.setHttp_referer(arr[10]);

			// The user agent itself contains spaces, so re-join the remaining tokens
			// with the single space split() consumed.
			if(arr.length > 12) {
				StringBuilder sb = new StringBuilder(arr[11]);
				for(int i = 12; i < arr.length; i++) {
					sb.append(' ').append(arr[i]);
				}
				weblogBean.setHttp_user_agent(sb.toString());
			} else {
				weblogBean.setHttp_user_agent(arr[11]);
			}

			if(isErrorStatus(weblogBean.getStatus())) {
				weblogBean.setValid(false);
			}

			if("-invalid_time-".equals(weblogBean.getTime_local())) {
				weblogBean.setValid(false);
			}

		} else {
			weblogBean.setValid(false);
		}
		return weblogBean;
	}

	/**
	 * Returns true when the status is not a number or is an HTTP error code (>= 400).
	 * A malformed status must not throw, or it would fail the whole map task.
	 */
	private static boolean isErrorStatus(String status) {
		try {
			return Integer.parseInt(status) >= 400;
		} catch (NumberFormatException e) {
			return true;
		}
	}

	/**
	 * Filters out static resources: any request whose URL is not in {@code pages}
	 * (e.g. js / images / css) is marked invalid.
	 */
	public static void filterStaticResource(WeblogBean bean, Set<String> pages) {
		if(!pages.contains(bean.getRequest())) {
			bean.setValid(false);
		}
	}

	// NOTE(review): SimpleDateFormat is not thread-safe; these shared instances are
	// only safe while parser()/formatDate() are called from a single thread.
	public static SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);
	public static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.US);

	/**
	 * Converts a log timestamp such as "18/Sep/2013:06:49:18" to "2013-09-18 06:49:18".
	 * @param time_local timestamp in "dd/MMM/yyyy:HH:mm:ss" (US month names)
	 * @return the reformatted timestamp, or null when the input is null or unparseable
	 */
	public static String formatDate(String time_local) {
		if (time_local == null) {
			return null;
		}
		try {
			return sdf2.format(sdf1.parse(time_local));
		} catch (ParseException e) {
			// Malformed timestamps are expected in raw logs; the caller marks the record invalid.
			return null;
		}
	}

	@Test
	public void testSpilt() {
		String str = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] \"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
		String[] arr = str.split(" ");
		int i = 1;
		for(String s : arr) {
			System.out.println(i + " ) " + s);
			i++;
		}
	}

	@Test
	public void testProp() throws IOException {

	}

	/** Manual check: prints the "url" property of the bundled properties resource. */
	public static void main(String[] args) throws IOException {
		String resource = "com/thp/bigdata/webClick/mrBean/url_1.propeties";
		Properties pop = new Properties();
		try (InputStream is = WeblogParser.class.getClassLoader().getResourceAsStream(resource)) {
			if (is == null) {
				throw new IOException("classpath resource not found: " + resource);
			}
			pop.load(is);
		}
		String str = (String) pop.get("url");
		System.out.println(str);
	}

}


MapReduce 程式

1 . 日誌的預處理:

package com.thp.bigdata.webClick.mr.pre;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.thp.bigdata.webClick.mrBean.WeblogBean;
import com.thp.bigdata.webClick.mrBean.WeblogParser;

/**
 * Cleans the raw log and keeps only real page views (PV):
 * 1) converts the timestamp format
 * 2) fills in defaults for missing fields
 * 3) marks each record valid or invalid, and emits only the valid ones
 * @author 湯小萌
 *
 */
public class WeblogPreProcess {

	static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

		Set<String> pages = new HashSet<String>();
		Text k = new Text();
		NullWritable v = NullWritable.get();

		/**
		 * Loads the site's page URLs from the comma-separated "url" property of the
		 * classpath resource "url.propeties". Requests outside this set are treated
		 * as static resources and dropped.
		 *
		 * @throws IOException if the resource or its "url" property is missing
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			Properties pop = new Properties();
			try (InputStream in = WeblogPreProcessMapper.class.getClassLoader().getResourceAsStream("url.propeties")) {
				if (in == null) {
					throw new IOException("classpath resource url.propeties not found");
				}
				pop.load(in);
			}
			String urlStr = pop.getProperty("url");
			if (urlStr == null) {
				throw new IOException("property 'url' missing from url.propeties");
			}
			for(String url : urlStr.split(",")) {
				// Tolerate "a, b" style lists; blank entries would never match a request.
				String trimmed = url.trim();
				if (!trimmed.isEmpty()) {
					pages.add(trimmed);
				}
			}
		}

		/** Parses one raw line and emits its \001-separated form when it is a valid page view. */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			WeblogBean weblogBean = WeblogParser.parser(value.toString());
			// Pluggable filter: drop js / image / css and other static resources.
			WeblogParser.filterStaticResource(weblogBean, pages);

			if(weblogBean.isValid()) {
				k.set(weblogBean.toString());
				context.write(k, v);
			}
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(WeblogPreProcess.class);

		job.setMapperClass(WeblogPreProcessMapper.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// For cluster runs use new Path(args[0]) / new Path(args[1]) instead of these local paths.
		FileInputFormat.setInputPaths(job, new Path("f:/weblog/input"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog/output"));

		// Map-only job: the mapper output is the final output.
		job.setNumReduceTasks(0);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

2.分析出點選流:

package com.thp.bigdata.webClick.mr;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.thp.bigdata.webClick.mrBean.WeblogBean;

/**
 * Turns the cleaned log (output of the preprocessing job) into the pageViews
 * click-stream model.
 *
 * Records are grouped by client IP and ordered by access time. Consecutive
 * requests less than 30 minutes apart belong to the same visit (session), and
 * every visit gets a random UUID as its session id. For every request the output
 * line carries the session id, IP, remote user, time, URL, 1-based step within the
 * visit, stay time in seconds, referer, user agent, body_bytes_sent and status,
 * separated by \001.
 * @author 湯小萌
 *
 */
public class ClickStream {

	/** A gap of this many milliseconds or more between two requests starts a new visit. */
	private static final long SESSION_TIMEOUT_MS = 30L * 60 * 1000;

	/** Stay time (seconds) reported when it cannot be measured, e.g. for the last page of a visit. */
	private static final long DEFAULT_STAY_SECONDS = 60;

	static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WeblogBean> {
		Text k = new Text();
		WeblogBean v = new WeblogBean();

		/** Parses one \001-separated WeblogBean line and keys valid records by client IP. */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\001");
			if(fields.length < 9) return;
			v.set("true".equals(fields[0]), fields[1], fields[2], fields[3], fields[4], fields[5],
					fields[6], fields[7], fields[8]);
			// Only valid records take part in the session analysis.
			if(v.isValid()) {
				k.set(v.getRemote_addr());
				context.write(k, v);
			}
		}
	}

	static class ClickStreamReducer extends Reducer<Text, WeblogBean, NullWritable, Text> {
		Text v = new Text();

		// Reused across calls instead of being allocated per comparison. SimpleDateFormat
		// is not thread-safe; this relies on a Reducer instance being driven by one thread.
		private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.UK);

		/**
		 * Splits one IP's requests into visits and emits one pageViews line per request.
		 * A request's stay time is the gap to the next request of the same visit. The
		 * last request of a visit gets {@link ClickStream#DEFAULT_STAY_SECONDS}.
		 */
		@Override
		protected void reduce(Text key, Iterable<WeblogBean> values, Context context) throws IOException, InterruptedException {
			ArrayList<WeblogBean> beans = copyValues(values);
			if (beans.isEmpty()) {
				return;
			}
			sortByTime(beans);

			String ip = key.toString();
			int step = 1;
			String session = UUID.randomUUID().toString();
			// Each iteration emits the PREVIOUS request, whose stay time is only known
			// once the following request has been seen.
			for (int i = 1; i < beans.size(); i++) {
				WeblogBean prev = beans.get(i - 1);
				long diff;
				try {
					diff = timeDiff(beans.get(i).getTime_local(), prev.getTime_local());
				} catch (ParseException e) {
					// Unmeasurable gap: close the visit instead of silently dropping prev.
					diff = Long.MAX_VALUE;
				}
				if (diff < SESSION_TIMEOUT_MS) {
					emit(context, session, ip, prev, step, diff / 1000);
					step++;
				} else {
					emit(context, session, ip, prev, step, DEFAULT_STAY_SECONDS);
					// The next request starts a new visit.
					step = 1;
					session = UUID.randomUUID().toString();
				}
			}
			// The last request (or the only one) closes the current visit.
			emit(context, session, ip, beans.get(beans.size() - 1), step, DEFAULT_STAY_SECONDS);
		}

		/**
		 * Copies the values into new beans. Hadoop reuses the value instance while
		 * iterating, so the beans cannot be stored directly.
		 */
		private ArrayList<WeblogBean> copyValues(Iterable<WeblogBean> values) {
			ArrayList<WeblogBean> beans = new ArrayList<WeblogBean>();
			for (WeblogBean bean : values) {
				WeblogBean copy = new WeblogBean();
				copy.set(bean.isValid(), bean.getRemote_addr(), bean.getRemote_user(), bean.getTime_local(),
						bean.getRequest(), bean.getStatus(), bean.getBody_bytes_sent(), bean.getHttp_referer(),
						bean.getHttp_user_agent());
				beans.add(copy);
			}
			return beans;
		}

		/**
		 * Sorts by parsed access time. If a time is unparseable, it falls back to
		 * comparing the raw strings so the ordering stays total.
		 */
		private void sortByTime(ArrayList<WeblogBean> beans) {
			Collections.sort(beans, new Comparator<WeblogBean>() {
				@Override
				public int compare(WeblogBean o1, WeblogBean o2) {
					try {
						return toDate(o1.getTime_local()).compareTo(toDate(o2.getTime_local()));
					} catch (ParseException e) {
						return o1.getTime_local().compareTo(o2.getTime_local());
					}
				}
			});
		}

		/** Writes one \001-separated pageViews line for {@code bean}. */
		private void emit(Context context, String session, String ip, WeblogBean bean, int step, long stayLong)
				throws IOException, InterruptedException {
			v.set(session + "\001" + ip + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local()
					+ "\001" + bean.getRequest() + "\001" + step + "\001" + stayLong + "\001"
					+ bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001"
					+ bean.getBody_bytes_sent() + "\001" + bean.getStatus());
			context.write(NullWritable.get(), v);
		}

		private Date toDate(String timeStr) throws ParseException {
			return sdf.parse(timeStr);
		}

		/** Returns time1 - time2 in milliseconds. */
		private long timeDiff(String time1, String time2) throws ParseException {
			return toDate(time1).getTime() - toDate(time2).getTime();
		}
	}

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(ClickStream.class);

		job.setMapperClass(ClickStreamMapper.class);
		job.setReducerClass(ClickStreamReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(WeblogBean.class);

		// Must match the reducer's declared output types (NullWritable, Text).
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		// For cluster runs use new Path(args[0]) / new Path(args[1]) instead of these local paths.
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_1/output"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_1/pageviews"));

		// The job fails if its output directory already exists, so remove a previous run's output.
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path("f:/weblog_1/pageviews"))) {
			fs.delete(new Path("f:/weblog_1/pageviews"), true);
		}

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

3.進一步梳理出visit模型:

package com.thp.bigdata.webClick.mr;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.thp.bigdata.webClick.mrBean.PageViewsBean;
import com.thp.bigdata.webClick.mrBean.VisitBean;

/**
 * Derives the visit model from the pageViews output of the click-stream job.
 *
 * Each output line (VisitBean.toString, \001-separated) is:
 * session, ip, in-time, out-time, in-page, out-page, referer, page count
 *
 * @author 湯小萌
 *
 */
public class ClickStreamVisit {

	static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
		PageViewsBean pvBean = new PageViewsBean();
		Text k = new Text();

		/**
		 * Parses one pageViews line and keys it by session id. The input columns are:
		 * 0 session, 1 ip, 2 remote_user, 3 time, 4 request, 5 step, 6 staylong,
		 * 7 referer, 8 user_agent, 9 body_bytes_sent, 10 status.
		 * Malformed lines are skipped instead of failing the task.
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Limit -1 keeps trailing empty columns, so the length check sees all 11.
			String[] fields = value.toString().split("\001", -1);
			if (fields.length < 11) {
				return;
			}
			int step;
			try {
				step = Integer.parseInt(fields[5]);
			} catch (NumberFormatException e) {
				return;
			}
			// PageViewsBean.set parameter order: session, ip, useragent, time, request,
			// step, staylong, referal, bytes_send, status.
			pvBean.set(fields[0], fields[1], fields[8], fields[3], fields[4], step, fields[6], fields[7], fields[9], fields[10]);
			k.set(pvBean.getSession());
			context.write(k, pvBean);
		}
	}

	static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {

		/** Collapses one session's page views into a single VisitBean. */
		@Override
		protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context)
				throws IOException, InterruptedException {
			// Hadoop reuses the value instance while iterating, so copy each bean.
			ArrayList<PageViewsBean> pvBeanList = new ArrayList<PageViewsBean>();
			for(PageViewsBean pv : pvBeans) {
				PageViewsBean copy = new PageViewsBean();
				copy.set(pv.getSession(), pv.getRemote_addr(), pv.getUseragent(), pv.getTimeStr(), pv.getRequest(),
						pv.getStep(), pv.getStaylong(), pv.getReferal(), pv.getBytes_send(), pv.getStatus());
				pvBeanList.add(copy);
			}
			if (pvBeanList.isEmpty()) {
				return;
			}
			// Order by step. Integer.compare returns 0 for equal steps, as the Comparator contract requires.
			Collections.sort(pvBeanList, new Comparator<PageViewsBean>() {
				@Override
				public int compare(PageViewsBean o1, PageViewsBean o2) {
					return Integer.compare(o1.getStep(), o2.getStep());
				}
			});

			PageViewsBean first = pvBeanList.get(0);
			PageViewsBean last = pvBeanList.get(pvBeanList.size() - 1);

			VisitBean visitBean = new VisitBean();
			visitBean.setInPage(first.getRequest());
			visitBean.setInTime(first.getTimeStr());
			visitBean.setOutPage(last.getRequest());
			visitBean.setOutTime(last.getTimeStr());
			// Number of pages viewed during the visit.
			visitBean.setPageVisits(pvBeanList.size());
			visitBean.setRemote_addr(first.getRemote_addr());
			// The visit's referer is the referer of its entry page.
			visitBean.setReferal(first.getReferal());
			visitBean.setSession(session.toString());

			context.write(NullWritable.get(), visitBean);
		}
	}

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(ClickStreamVisit.class);

		job.setMapperClass(ClickStreamVisitMapper.class);
		job.setReducerClass(ClickStreamVisitReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(PageViewsBean.class);

		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(VisitBean.class);

		// For cluster runs use new Path(args[0]) / new Path(args[1]) instead of these local paths.
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_1/pageviews"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_1/visitout"));

		// The job fails if its output directory already exists, so remove a previous run's output.
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path("f:/weblog_1/visitout"))) {
			fs.delete(new Path("f:/weblog_1/visitout"), true);
		}

		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
	}

	/**
	 * Placeholder with no effect (only allocates and discards a StringBuffer).
	 * 2018-11-29 09:00:57
	 * @param a unused
	 */
	public void a1(int a) {
		new StringBuffer();
	}

}


使用MapReduce自身的排序

WeblogBean

package mr.flow.weblog.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
 * 對接外部資料的層,表結構定義最好跟外部資料來源保持一致
 * @author 湯小萌
 *
 */
public class WeblogBean implements WritableComparable<WeblogBean> {

	private boolean valid = true;		// 判斷資料是否合法
	private String remote_addr;			// 記錄客戶端的ip地址
	private String remote_user;			// 記錄客戶端使用者名稱稱  忽略屬性"-"
	private String time_local;			// 記錄訪問時間與時區
	private String request;				// 記錄請求的url與http協議
	private String status;				// 記錄請求狀態;成功是200
	private String body_bytes_sent;		// 記錄發給客戶單主體檔案的大小
	private String http_referer;		// 記錄使用者是從哪個連結過來的
	private String http_user_agent;		// 記錄客戶端瀏覽器的相關資訊
	
	public void set(boolean valid, String remote_addr, String remote_user, String time_local, String request,
			String status, String body_bytes_sent, String http_referer, String http_user_agent) {
		this.valid = valid;
		this.remote_addr = remote_addr;
		this.remote_user = remote_user;
		this.time_local = time_local;
		this.request = request;
		this.status = status;
		this.body_bytes_sent = body_bytes_sent;
		this.http_referer = http_referer;
		this.http_user_agent = http_user_agent;
	}

	public boolean isValid() {
		return valid;
	}

	public void setValid(boolean valid) {
		this.valid = valid;
	}

	public String getRemote_addr() {
		return remote_addr;
	}

	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}

	public String getRemote_user() {
		return remote_user;
	}

	public void setRemote_user(String remote_user) {
		this.remote_user = remote_user;
	}

	public String getTime_local() {
		return time_local;
	}

	public void setTime_local(String time_local) {
		this.time_local = time_local;
	}

	public String getRequest() {
		return request;
	}

	public void setRequest(String request) {
		this.request = request;
	}

	public String getStatus() {
		return status;
	}

	public void setStatus(String status) {
		this.status = status;
	}

	public String getBody_bytes_sent() {
		return body_bytes_sent;
	}

	public void setBody_bytes_sent(String body_bytes_sent) {
		this.body_bytes_sent = body_bytes_sent;
	}

	public String getHttp_referer() {
		return http_referer;
	}

	public void setHttp_referer(String http_referer) {
		this.http_referer = http_referer;
	}

	public String getHttp_user_agent() {
		return http_user_agent;
	}

	public void setHttp_user_agent(String http_user_agent) {
		this.http_user_agent = http_user_agent;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeBoolean(this.valid);
		out.writeUTF(null==remote_addr?"":remote_addr);
		out.writeUTF(null==remote_user?"":remote_user);
		out.writeUTF(null==time_local?"":time_local);
		out.writeUTF(null==request?"":request);
		out.writeUTF(null==status?"":status);
		out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
		out.writeUTF(null==http_referer?"":http_referer);
		out.writeUTF(null==http_user_agent?"":http_user_agent);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.valid = in.readBoolean();
		this.remote_addr = in.readUTF();
		this.remote_user = in.readUTF();
		this.time_local = in.readUTF();
		this.request = in.readUTF();
		this.status = in.readUTF();
		this.body_bytes_sent = in.readUTF();
		this.http_referer = in.readUTF();
		this.http_user_agent = in.readUTF();
	}
	
	
	/**
	 * Renders the record as a single line: the valid flag followed by each field,
	 * separated by the \001 (Ctrl-A) character. Null fields render as "null".
	 */
	@Override
	public String toString() {
		String[] columns = { getRemote_addr(), getRemote_user(), getTime_local(), getRequest(),
				getStatus(), getBody_bytes_sent(), getHttp_referer(), getHttp_user_agent() };
		StringBuilder line = new StringBuilder().append(valid);
		for (String column : columns) {
			line.append("\001").append(column);
		}
		return line.toString();
	}

	/**
	 * Orders records by client IP, and for the same IP by access time ascending.
	 * <p>
	 * The IP has to be compared first: the job groups reducer input by IP, so all
	 * records of one IP must be adjacent in the sort order. Ordering by time only
	 * makes sense inside one IP.
	 * <p>
	 * time_local is parsed with the pattern "yyyy-MM-dd HH:mm:ss". If either value
	 * cannot be parsed, the raw strings are compared instead. This keeps the ordering
	 * consistent, rather than reporting two different records as equal (returning 0),
	 * which can break the sort's transitivity.
	 */
	@Override
	public int compareTo(WeblogBean o) {
		int ipCompareResult = this.getRemote_addr().compareTo(o.getRemote_addr());
		if (ipCompareResult != 0) {
			return ipCompareResult;
		}
		// SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.UK);
		try {
			Date d1 = sdf.parse(this.getTime_local());
			Date d2 = sdf.parse(o.getTime_local());
			return d1.compareTo(d2);
		} catch (ParseException e) {
			// Lexical fallback; for zero-padded "yyyy-MM-dd HH:mm:ss" text it matches
			// chronological order.
			return this.getTime_local().compareTo(o.getTime_local());
		}
	}
	
	
	
	
	
}

PageViewsBean

package mr.flow.weblog.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.hadoop.io.WritableComparable;
import org.junit.Test;

/**
 * One page view inside a visit session (the "pageviews" clickstream model).
 * Keys sort by session id first, then by step number within the session.
 * @author 湯小萌
 *
 */
public class PageViewsBean implements WritableComparable<PageViewsBean> {
	
	private String session;			// session id
	private String remote_addr;		// client IP address
	private String timeStr;			// access time
	private String request;			// requested URL
	private int step;				// position of this page within its session
	private String staylong;		// time spent on the page
	private String referal;			// page the visitor came from
	private String useragent;		// browser / user-agent information
	private String bytes_send;		// size of the response body in bytes
	private String status;			// HTTP status of the request
	
	public void set(String session, String remote_addr, String useragent, String timeStr, String request, int step, String staylong, String referal, String bytes_send, String status) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.useragent = useragent;
		this.timeStr = timeStr;
		this.request = request;
		this.step = step;
		this.staylong = staylong;
		this.referal = referal;
		this.bytes_send = bytes_send;
		this.status = status;
	}
	
	/**
	 * Serializes the bean. Null strings are written as "" because writeUTF cannot
	 * encode null (same convention as WeblogBean.write).
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(nullToEmpty(session));
		out.writeUTF(nullToEmpty(remote_addr));
		out.writeUTF(nullToEmpty(timeStr));
		out.writeUTF(nullToEmpty(request));
		out.writeInt(step);
		out.writeUTF(nullToEmpty(staylong));
		out.writeUTF(nullToEmpty(referal));
		out.writeUTF(nullToEmpty(useragent));
		out.writeUTF(nullToEmpty(bytes_send));
		out.writeUTF(nullToEmpty(status));
	}

	/** Deserializes the fields in the same order write() emits them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.timeStr = in.readUTF();
		this.request = in.readUTF();
		this.step = in.readInt();
		this.staylong = in.readUTF();
		this.referal = in.readUTF();
		this.useragent = in.readUTF();
		this.bytes_send = in.readUTF();
		this.status = in.readUTF();
	}
	
	/**
	 * Orders by session id, then by step ascending.
	 * The session must be compared first; ordering by step is only meaningful
	 * within one session. Null session ids sort before non-null ones.
	 * Equal (session, step) pairs compare as 0, as the Comparable contract requires.
	 */
	@Override
	public int compareTo(PageViewsBean o) {
		int sessionCompareResult = compareNullable(this.session, o.getSession());
		if (sessionCompareResult != 0) {
			return sessionCompareResult;
		}
		return Integer.compare(this.step, o.getStep());
	}

	/** Null-safe String comparison; null sorts first. */
	private static int compareNullable(String a, String b) {
		if (a == null) {
			return b == null ? 0 : -1;
		}
		if (b == null) {
			return 1;
		}
		return a.compareTo(b);
	}

	/** Maps null to "" so the value can be passed to writeUTF. */
	private static String nullToEmpty(String s) {
		return s == null ? "" : s;
	}

	public String getSession() {
		return session;
	}

	public void setSession(String session) {
		this.session = session;
	}

	public String getRemote_addr() {
		return remote_addr;
	}

	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}

	public String getTimeStr() {
		return timeStr;
	}

	public void setTimeStr(String timeStr) {
		this.timeStr = timeStr;
	}

	public String getRequest() {
		return request;
	}

	public void setRequest(String request) {
		this.request = request;
	}

	public int getStep() {
		return step;
	}

	public void setStep(int step) {
		this.step = step;
	}

	public String getStaylong() {
		return staylong;
	}

	public void setStaylong(String staylong) {
		this.staylong = staylong;
	}

	public String getReferal() {
		return referal;
	}

	public void setReferal(String referal) {
		this.referal = referal;
	}

	public String getUseragent() {
		return useragent;
	}

	public void setUseragent(String useragent) {
		this.useragent = useragent;
	}

	public String getBytes_send() {
		return bytes_send;
	}

	public void setBytes_send(String bytes_send) {
		this.bytes_send = bytes_send;
	}

	public String getStatus() {
		return status;
	}

	public void setStatus(String status) {
		this.status = status;
	}
	
	/** Short debug form: "session step". */
	@Override
	public String toString() {
		return this.session + " " + this.step + "";
	}
	
	/**
	 * Manual check that sorting orders beans by step.
	 * All sessions here are null, which compareTo handles as equal sessions.
	 */
	@Test
	public void testCompareTo() {
		PageViewsBean pvb1 = new PageViewsBean();
		pvb1.set(null, null, null, null, null, 2, null, null, null, null);
		PageViewsBean pvb2 = new PageViewsBean();
		pvb2.set(null, null, null, null, null, 1, null, null, null, null);
		PageViewsBean pvb3 = new PageViewsBean();
		pvb3.set(null, null, null, null, null, 4, null, null, null, null);
		PageViewsBean pvb4 = new PageViewsBean();
		pvb4.set(null, null, null, null, null, 3, null, null, null, null);
		
		ArrayList<PageViewsBean> list = new ArrayList<PageViewsBean>();
		list.add(pvb1);
		list.add(pvb2);
		list.add(pvb3);
		list.add(pvb4);
		
		System.out.println(list);
		Collections.sort(list);
		System.out.println(list);
	}
}

VisitBean

package mr.flow.weblog.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One visit session: session id, client IP, entry time, exit time, entry page,
 * exit page, referrer, and total number of pages viewed.
 * @author 湯小萌
 * @date 2018年11月28日 下午9:01:17
 */
public class VisitBean implements Writable {
	private String session;
	private String remote_addr;
	private String inTime;
	private String outTime;
	private String inPage;
	private String outPage;
	private String referal;
	private int pageVisits;
	
	
	public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.inTime = inTime;
		this.outTime = outTime;
		this.inPage = inPage;
		this.outPage = outPage;
		this.referal = referal;
		this.pageVisits = pageVisits;
	}


	/**
	 * Serializes the bean. Null strings are written as "" because writeUTF cannot
	 * encode null (same convention as WeblogBean.write).
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(nullToEmpty(session));
		out.writeUTF(nullToEmpty(remote_addr));
		out.writeUTF(nullToEmpty(inTime));
		out.writeUTF(nullToEmpty(outTime));
		out.writeUTF(nullToEmpty(inPage));
		out.writeUTF(nullToEmpty(outPage));
		out.writeUTF(nullToEmpty(referal));
		out.writeInt(pageVisits);
	}


	/** Deserializes the fields in the same order write() emits them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.inTime = in.readUTF();
		this.outTime = in.readUTF();
		this.inPage = in.readUTF();
		this.outPage = in.readUTF();
		this.referal = in.readUTF();
		this.pageVisits = in.readInt();
	}

	/** Maps null to "" so the value can be passed to writeUTF. */
	private static String nullToEmpty(String s) {
		return s == null ? "" : s;
	}
	
	/** All fields in declaration order, separated by the \001 (Ctrl-A) character. */
	@Override
	public String toString() {
		return session + "\001" + remote_addr + "\001" + inTime + "\001" +
				outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
	}


	public String getSession() {
		return session;
	}


	public void setSession(String session) {
		this.session = session;
	}


	public String getRemote_addr() {
		return remote_addr;
	}


	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}


	public String getInTime() {
		return inTime;
	}


	public void setInTime(String inTime) {
		this.inTime = inTime;
	}


	public String getOutTime() {
		return outTime;
	}


	public void setOutTime(String outTime) {
		this.outTime = outTime;
	}


	public String getInPage() {
		return inPage;
	}


	public void setInPage(String inPage) {
		this.inPage = inPage;
	}


	public String getOutPage() {
		return outPage;
	}


	public void setOutPage(String outPage) {
		this.outPage = outPage;
	}


	public String getReferal() {
		return referal;
	}


	public void setReferal(String referal) {
		this.referal = referal;
	}


	public int getPageVisits() {
		return pageVisits;
	}


	public void setPageVisits(int pageVisits) {
		this.pageVisits = pageVisits;
	}
}

WeblogParser

package mr.flow.weblog.bean;

import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;

/**
 * Parses raw access-log lines into WeblogBean records and provides related
 * filtering and time-format helpers.
 * @author 湯小萌
 *
 */
public class WeblogParser {
	
	/**
	 * Parses one space-separated access-log line. Example tokens of a line:
	 * <pre>
	 *  0 ) 194.237.142.21
	 *  1 ) -
	 *  2 ) -
	 *  3 ) [18/Sep/2013:06:49:18
	 *  4 ) +0000]
	 *  5 ) "GET
	 *  6 ) /wp-content/uploads/2013/07/rstudio-git3.png
	 *  7 ) HTTP/1.1"
	 *  8 ) 304
	 *  9 ) 0
	 * 10 ) "-"
	 * 11 ) "Mozilla/4.0
	 * 12 ) (compatible;)"
	 * </pre>
	 * A record is marked invalid when the line has 11 or fewer tokens, when the
	 * status is not numeric or is &gt;= 400, or when the time cannot be parsed.
	 * Malformed lines never throw; they produce an invalid bean.
	 * @param line raw log line
	 * @return the parsed bean (check isValid())
	 */
	public static WeblogBean parser(String line) {
		WeblogBean weblogBean = new WeblogBean();
		String[] arr = line.split(" ");
		if (arr.length <= 11) {
			weblogBean.setValid(false);
			return weblogBean;
		}
		weblogBean.setRemote_addr(arr[0]);
		weblogBean.setRemote_user(arr[1]);
		// Token 3 looks like "[18/Sep/2013:06:49:18"; strip the '[' only when present.
		String rawTime = arr[3].startsWith("[") ? arr[3].substring(1) : arr[3];
		String time_local = formatDate(rawTime);
		if (null == time_local) time_local = "-invalid_time-";
		weblogBean.setTime_local(time_local);
		weblogBean.setRequest(arr[6]);
		
		weblogBean.setStatus(arr[8]);
		weblogBean.setBody_bytes_sent(arr[9]);
		weblogBean.setHttp_referer(arr[10]);
		
		// The user agent itself contains spaces, so it spans tokens 11..end;
		// re-join them with single spaces to preserve the original value.
		StringBuilder userAgent = new StringBuilder(arr[11]);
		for (int i = 12; i < arr.length; i++) {
			userAgent.append(' ').append(arr[i]);
		}
		weblogBean.setHttp_user_agent(userAgent.toString());
		
		try {
			if (Integer.parseInt(weblogBean.getStatus()) >= 400) {  // status >= 400 means a failed request
				weblogBean.setValid(false);
			}
		} catch (NumberFormatException e) {
			// Non-numeric status (malformed line): mark invalid instead of failing the map task.
			weblogBean.setValid(false);
		}
		
		if ("-invalid_time-".equals(weblogBean.getTime_local())) {
			weblogBean.setValid(false);
		}
		return weblogBean;
	}
	
	
	/**
	 * Filters out static resources: any request URL that is not in {@code pages}
	 * (e.g. js/css/images) is treated as a static resource and marked invalid.
	 */
	public static void filterStaticResource(WeblogBean bean, Set<String> pages) {
		if(!pages.contains(bean.getRequest())) {
			bean.setValid(false);
		}
	}
	
	
	// NOTE: SimpleDateFormat is not thread-safe; these shared instances are only safe
	// while formatDate is called from a single thread (as in a standard Mapper).
	public static SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);
	public static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.US);
	/**
	 * Converts a log timestamp such as "18/Sep/2013:06:49:18" to "yyyy-MM-dd HH:mm:ss".
	 * @param time_local timestamp in dd/MMM/yyyy:HH:mm:ss form
	 * @return the reformatted timestamp, or null if it cannot be parsed
	 */
	public static String formatDate(String time_local) {
		try {
			return sdf2.format(sdf1.parse(time_local));
		} catch (ParseException e) {
			e.printStackTrace();
		}
		return null;
	}
	
	/** Prints the tokens of a sample log line, numbered from 1. */
	@Test
	public void testSpilt() {
		String str = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] \"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
		String[] arr = str.split(" ");
		int i = 1;
		for(String s : arr) {
			System.out.println(i + " ) " + s);
			i++;
		}
	}
	
	
	@Test
	public void testProp() throws IOException {
		
	}
	
	/** Loads the URL properties resource and prints its "url" entry. */
	public static void main(String[] args) throws IOException {
		Properties pop = new Properties();
		try (InputStream is = WeblogParser.class.getClassLoader().getResourceAsStream("com/thp/bigdata/webClick/mrBean/url_1.propeties")) {
			if (is == null) {
				throw new IOException("classpath resource not found: com/thp/bigdata/webClick/mrBean/url_1.propeties");
			}
			pop.load(is);
		}
		String str = (String) pop.get("url");
		System.out.println(str);
	}
	
}

比較器
IpGroupingComparator

package mr.flow.weblog.bean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for WeblogBean keys: keys with the same client IP are
 * treated as one group, so a single reduce() call receives every record of that IP.
 * @author 湯小萌
 *
 */
public class IpGroupingComparator extends WritableComparator {

	/** Declares WeblogBean as the key class and lets the parent create key instances. */
	public IpGroupingComparator() {
		super(WeblogBean.class, true);
	}

	/** Compares only the remote_addr of the two keys. */
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		String ipA = ((WeblogBean) a).getRemote_addr();
		String ipB = ((WeblogBean) b).getRemote_addr();
		return ipA.compareTo(ipB);
	}

}

SessionIdGroupingComparator

package mr.flow.weblog.bean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for PageViewsBean keys: keys with the same session id are
 * treated as one group, so a single reduce() call receives every page view of that session.
 * @author 湯小萌
 * @date 2018年11月28日 下午8:55:13
 */
public class SessionIdGroupingComparator extends WritableComparator {

	/** Declares PageViewsBean as the key class and lets the parent create key instances. */
	public SessionIdGroupingComparator() {
		super(PageViewsBean.class, true);
	}

	/** Compares only the session id of the two keys. */
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		String sessionA = ((PageViewsBean) a).getSession();
		String sessionB = ((PageViewsBean) b).getSession();
		return sessionA.compareTo(sessionB);
	}
}

1. WeblogPreProcess 日誌預處理

package mr.flow.weblog.pre;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.flow.weblog.bean.WeblogBean;
import mr.flow.weblog.bean.WeblogParser;


/**
 * Pre-processes the raw log and keeps only real page views (map-only job):
 * 1) converts the time format
 * 2) fills default values for missing fields
 * 3) marks each record valid or invalid, emitting only the valid ones
 * @author 湯小萌
 *
 */
public class WeblogPreProcess {

	
	static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		
		Set<String> pages = new HashSet<String>();
		Text k = new Text();
		NullWritable v = NullWritable.get();

		/**
		 * Loads the site's page URLs from the classpath resource "url.propeties"
		 * (property "url", comma-separated). Requests outside this set are later
		 * treated as static resources.
		 * @throws IOException if the resource or the "url" property is missing
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			Properties pop = new Properties();
			try (InputStream in = WeblogPreProcessMapper.class.getClassLoader().getResourceAsStream("url.propeties")) {
				if (in == null) {
					throw new IOException("classpath resource not found: url.propeties");
				}
				pop.load(in);
			}
			String urlStr = pop.getProperty("url");
			if (urlStr == null) {
				throw new IOException("property 'url' missing in url.propeties");
			}
			for (String url : urlStr.split(",")) {
				// Tolerate "a, b" style lists and stray trailing commas.
				String trimmed = url.trim();
				if (!trimmed.isEmpty()) {
					pages.add(trimmed);
				}
			}
		}

		/** Parses one log line, filters static resources, and emits the line if still valid. */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			WeblogBean weblogBean = WeblogParser.parser(line);
			// Pluggable step: drop js/image/css and other static resources.
			WeblogParser.filterStaticResource(weblogBean, pages);
			
			if (weblogBean.isValid()) {
				k.set(weblogBean.toString());
				context.write(k, v);
			}
		}
	}
	
	
	/**
	 * Runs the job. Optional args: [0] input path, [1] output path; when omitted the
	 * local development paths are used. An existing output directory is deleted first.
	 * Exits with 0 on success, 1 on failure.
	 */
	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(WeblogPreProcess.class);

		job.setMapperClass(WeblogPreProcessMapper.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		Path input = new Path(args.length > 0 ? args[0] : "f:/weblog_2/input/access.log.fensi");
		Path output = new Path(args.length > 1 ? args[1] : "f:/weblog_2/output");
		FileInputFormat.setInputPaths(job, input);
		FileOutputFormat.setOutputPath(job, output);
		
		// Resolve the file system from the path itself so schemed paths (hdfs://, file://) work.
		FileSystem fs = output.getFileSystem(conf);
		if (fs.exists(output)) {
			fs.delete(output, true);
		}
		
		job.setNumReduceTasks(0);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

2. ClickStream

package mr.flow.weblog.mr;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.flow.weblog.bean.IpGroupingComparator;
import mr.flow.weblog.bean.WeblogBean;

/**
 * 將清洗過後的資料梳理出點選流pageViews模型資料
 * 輸入的資料是經過預處理之後的資料
 * 
 * 區分每一次會話,給每一次會話打上sessionId
 * 梳理出每一次會話所訪問的每個頁面  (請求時間,url,停留時長,以及該頁面在這次session中的序號)
 * 保留http_referer  body_bytes_sent   http_user_agent
 * 
 * @author 湯小萌
 *
 */
public class ClickStream {

	
	static class ClickStreamMapper extends Mapper<LongWritable, Text, WeblogBean, Text> {
		WeblogBean k = new WeblogBean();
		Text v = new Text();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\001");
			if(fields.length < 9) return;
			k.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], 
					fields[6], fields[7], fields[8]);
			if(k.isValid()) {  // 只有有效果的資料才會進入後續的處理
				context.write(k, v);
			}
			
		}
	}
	
	
	/**
	 * 需要生成的資料:
	 * 
	 *  sessionId  ip  time_local  request  step  http_referer  Http_user_agent  http_user_agent body_bytes_sent  status
	 * 
	 * @author 湯小萌
	 *
	 */
	static class ClickStreamReducer extends Reducer<WeblogBean, Text, Text, NullWritable> {
		
		Text k = new Text();
		NullWritable v = NullWritable.get();
		@Override
		protected void reduce(WeblogBean beanKey, Iterable<Text> values, Context context) 
				throws IOException, InterruptedException {
			
			// System.out.println(beanKey);
			
			System.out.println("---------------");
			
			int step = 1;	// 這個頁面在這個session是第幾次訪問的
			String sessionId = UUID.randomUUID().toString();	// 生成sessionId
			
			
			String lastTimeStr = null;
			String lastSaveStr = null;   // 需要保留上一條記錄的後面字串
			String lastIpAndUser = null; // 需要保留的上一條記錄的ip地址和使用者屬性
			String lastUrl = null;		 // 需要保留的上一條記錄的訪問的url
			Long stayTime = 0L;		 // 前後兩次停留的時間
			
			for(Text value : values) {
				// System.out.println(beanKey);
				
				/*k.set(sessionId+"\001"+beanKey.toString()+"\001"+beanKey.getRemote_user() + "\001" +
						beanKey.getTime_local() + "\001" + beanKey.getRequest() + "\001" + step + "\001" + (60) + 
						"\001" + beanKey.getHttp_referer() + "\001" + beanKey.getHttp_user_agent() + "\001" + 
						beanKey.getBody_bytes_sent() + "\001"
								+ beanKey.getStatus());
				context.write(k, v);*/
				
				if(lastTimeStr != null) {
					try {
						// beanKey又是下一次的記錄了  lastTimeStr 保留的是上一條記錄的訪問時間
						// stayTime = toDate(beanKey.getTime_local()).getTime() - toDate(lastTimeStr).getTime();
						stayTime = timeDiff(beanKey.getTime_local(), lastTimeStr);
					} catch (ParseException e) {
						e.printStackTrace();
					}
					
					if(stayTime < 30*60*1000) {  // 同一個IP訪問的時間差  <  30 min 認為是同一個 session
						k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001" + lastTimeStr + "\001" + 
								step + "\001" + (stayTime/1000) + "\001" + lastSaveStr);
						// 往外寫資料了
						context.write(k, v);
						step++;
					} else {   // 同一個IP訪問的時間差  > 30min  認為是不同的session   上一條記錄的訪問時間 是 60
						k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001" + lastTimeStr + "\001" + 
								step + "\001" + (60) + "\001" + lastSaveStr);
						context.write(k, v);   // 這一系的ip的最後一條資料  在這裡是不輸出的,  還要繼續往下走
						// 輸出完上一條之後,重置step編號
						step = 1;
						// session 也要重新生成
						sessionId = UUID.randomUUID().toString();
					}
					
					
				}
				// 初識的設定
				lastTimeStr = beanKey.getTime_local();
				lastSaveStr = beanKey.getHttp_referer() + "\001" + beanKey.getHttp_user_agent() + "\001" + 
						beanKey.getBody_bytes_sent() + "\001" + beanKey.getStatus();
				lastUrl = beanKey.getRequest();
				lastIpAndUser = beanKey.getRemote_addr() + "\001" + beanKey.getRemote_user() ;
			}
			
			// 下面的這條資料是最後一條資料
			k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001" + lastTimeStr + "\001" + 
					step + "\001" + (60) + "\001" + lastSaveStr);
			context.write(k, v);
			System.out.println("---------------");
			
		}
		
		//  **********************工具方法************************
		private String toStr(Date date) {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.US);
			return sdf.format(date);
		}
		
		private Date toDate(String timeStr) throws ParseException {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.UK);
			return sdf.parse(timeStr);
		}
		
		// 算時間差
		private long timeDiff(String time1, String time2) throws ParseException {
			Date d1 = toDate(time1);
			Date d2 = toDate(time2);
			return d1.getTime() - d2.getTime();
		}
	}
	
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(ClickStream.class);
		
		job.setMapperClass(ClickStreamMapper.class);
		job.setReducerClass(ClickStreamReducer.class);
		
		job.setMapOutputKeyClass(WeblogBean.class);
		job.setMapOutputValueClass(Text.class);
		
		// out
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		
		job.setGroupingComparatorClass(IpGroupingComparator.class);
		
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_2/output"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_2/pageviews"));
		
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path("f:/weblog_2/pageviews"))) {
			fs.delete(new Path("f:/weblog_2/pageviews"), true);
		}

		job.waitForCompletion(true);
		
		
		
		
	}

}

3. ClickStreamVisit

package mr.flow.weblog.mr;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.flow.weblog.bean.PageViewsBean;
import mr.flow.weblog.bean.SessionIdGroupingComparator;
import mr.flow.weblog.bean.VisitBean;

/**
 * 從PageViews模型中根據sessionId來繼續梳理出同一次會話中的資訊
 * 
 *  梳理之後向外輸出的資料的格式 :
 *  sessionId   開始訪問的時間     訪問結束的時間      開始的頁面       訪問結束的頁面      總共訪問的頁數	
 * 
 * @author 湯小萌
 * @date 2018年11月28日 下午8:42:26
 */
public class ClickStreamVisit {
	
	
	static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, PageViewsBean, Text> {
		
		/**
		 * 這個Mapper的輸出可以為NullWritable  由於當我我在測試的時候就寫成了Text就一直沒改
		 */
		PageViewsBean beanKey = new PageViewsBean();
		// NullWritable v = NullWritable.get();
		Text v = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\001");
			int step = Integer.parseInt(fields[5]);  // 需要將每個PageViewBean的在這個session中是處在第幾步這個step變成int型別
			beanKey.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);
			v.set(beanKey.getSession() + " " + step);
			context.write(beanKey, v);
		}
	}
	
	static class ClickStreamVisitReducer extends Reducer<PageViewsBean, Text, NullWritable, VisitBean> {
		
		NullWritable k = NullWritable.get();
		// 取這次visit的首尾pageViews記錄,放入VisitBean中
		VisitBean visitBean = new VisitBean();
		@Override
		
		protected void reduce(PageViewsBean beanKey, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
// System.out.println("----------------");
			
			ArrayList<PageViewsBean> pvBeanList = new ArrayList<PageViewsBean>();
			
			for(Text str : values) {
				// System.out.println(beanKey + " || " + str);
				// 不能直接這樣天劍   是為是引用型別
				// pvBeanList.add(beanKey);
				PageViewsBean pvBean = new PageViewsBean();
				try {
					BeanUtils.copyProperties(pvBean, beanKey);
					pvBeanList.add(pvBean);
				} catch (IllegalAccessException | InvocationTargetException e) {
					e.printStackTrace();
				}
			}
// System.out.println(pvBeanList);
// System.out.println("----------------");
			
			
			// 取visit 的首記錄
			// visitBean.setInPage(pvBeanList.get(0).getRequest());
			visitBean.setInTime(pvBeanList.get(0).getTimeStr());
			// 取visit 的尾記錄
			visitBean.setOutPage(pvBeanList.get(pvBeanList.size() - 1).getRequest());
			visitBean.setOutTime(pvBeanList.get(pvBeanList.size() - 1).getTimeStr());
			// visit訪問的頁面數
			visitBean.setPageVisits(pvBeanList.size());
			// 來訪者的ip
			visitBean.setRemote_addr(pvBeanList.get(0).getRemote_addr());
			// 本次visit的referal
			visitBean.setReferal(pvBeanList.get(0).getReferal());
			visitBean.setSession(pvBeanList.get(0).getSession());
			
			context.write(k, visitBean);
		}
	}
	
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(ClickStreamVisit.class);
		
		job.setMapperClass(ClickStreamVisitMapper.class);
		job.setReducerClass(ClickStreamVisitReducer.class);
		
		
		job.setMapOutputKeyClass(PageViewsBean.class);
		// job.setMapOutputValueClass(NullWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputKeyClass(VisitBean.class);
		
		job.setGroupingComparatorClass(SessionIdGroupingComparator.class);
		
		
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_2/pageviews/testLog.txt"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_2/visitout"));
		
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path("f:/weblog_2/visitout"))) {
			fs.delete(new Path("f:/weblog_2/visitout"), true);
		}
		
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
		
		
	}
	
}