1. 程式人生 > >自定義OutputFormat -實現往不同的目錄輸出檔案

自定義OutputFormat -實現往不同的目錄輸出檔案

程式碼地址 :
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/myInputFormat

需求:

現有一些原始日誌需要做增強解析處理,流程:
1、 從原始日誌檔案中讀取資料
2、 根據日誌中的一個URL欄位到外部知識庫中獲取資訊增強到原始日誌
3、 如果成功增強,則輸出到增強結果目錄;如果增強失敗,則抽取原始資料中URL欄位輸出到待爬清單目錄

分析:

程式的關鍵點是要在一個mapreduce程式中根據資料的不同輸出型別結果到不同的目錄,這類靈活的輸出需求可以通過自定義的OutputFormat來實現。

DBLoader : 連線資料庫,從資料庫中將字典資料快取出來

package com.thp.bigdata.logEnhance;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

/**
 * 資料庫連線
 * java.sql包中的介面,它是sun公司為了簡化,統一對資料庫的操作,定義了一套java操作資料庫的規範,
 * 由各個資料庫公司自己實現,資料庫有mysql oracle等,
 * 而com.mysql.jdbc包中的類是mysql自己實現規範介面的類,
 * 不同的資料庫有不同的實現,為了能夠只寫一套程式碼,實現跨資料庫使用,
 * 書寫傳統jdbc需要匯入的包就使用java.sql包,而不用考慮具體的實現類。
 * @author 湯小萌
 *
 */
public class DBLoader {
	
	/**
	 * 從資料庫中將url對應的內容全部放到HashMap中進行快取
	 */
	public static void dbLoader(Map<String, String> urlContentMap) {
		Connection con = null;
		Statement st = null;
		ResultSet rs = null;
		try {
			Class.forName("com.mysql.jdbc.Driver");
			con = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");
			st = con.createStatement();
			rs = st.executeQuery("select url, content from url_rule");
			while(rs.next()) {
				urlContentMap.put(rs.getString(1), rs.getString(2));
			}
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			if(rs != null) {
				try {
					rs.close();
				} catch (SQLException e) {
					e.printStackTrace();
				} finally {
					rs = null;
				}
			}
			if(st != null) {
				try {
					st.close();
				} catch (SQLException e) {
					e.printStackTrace();
				} finally {
					st = null;
				}
			}
			if(con != null) {
				try {
					con.close();
				} catch (SQLException e) {
					e.printStackTrace();
				} finally {
					con = null;
				}
			}
		}
	}
	
	// 測試資料庫連線成功
	public static void main(String[] args) {
		HashMap<String, String> urlContentMap = new HashMap<String, String>();
		dbLoader(urlContentMap);
		// Set<Entry<String, String>> entrySet = urlContentMap.entrySet();
		for(Entry<String, String> entrySet : urlContentMap.entrySet()) {
			System.out.println(entrySet.getKey() + " : " + entrySet.getValue());
		}
	}
}

自定義的OutputFormat : 可以實現跟資料型別的不同向不同的目錄輸出:

package com.thp.bigdata.logEnhance;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 日誌增強的自定義OutputFormat
 * 根據資料的不同輸出型別到不同的輸出目錄
 * @author 湯小萌
 *
 */
public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		FileSystem fs = FileSystem.get(job.getConfiguration());
		Path enhancePath = new Path("f:/enhancelog/output_en/log.txt");
		Path tocrawlPath = new Path("f:/enhancelog/output_crw/url.txt");
		FSDataOutputStream  enhanceOS = fs.create(enhancePath);
		FSDataOutputStream  tocrawlOS = fs.create(tocrawlPath);
		
		return new EnhanceRecordWriter(enhanceOS, tocrawlOS);
	}
	
	
	/**
	 * 這個RecordWriter類才是真正往外寫檔案的
	 * 需要往這個類的構造方法中傳遞輸出流,在getRecordWriter()方法中就要進行構建這兩個輸出流
	 * @author 湯小萌
	 *
	 */
	static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
		FSDataOutputStream enhanceOS = null;
		FSDataOutputStream tocrawlOS = null;
		public EnhanceRecordWriter(FSDataOutputStream enhanceOS, FSDataOutputStream tocrawlOS) {
			super();
			this.enhanceOS = enhanceOS;
			this.tocrawlOS = tocrawlOS;
		}
		
		// 往外寫檔案的邏輯
		@Override
		public void write(Text key, NullWritable value) throws IOException, InterruptedException {
			String dataLine = key.toString(); 
			if(dataLine.contains("tocrawl")) {
				// 如果寫的資料裡面包含 "tocrawl",那麼就是不完全的資料,需要寫入待爬清單檔案
				tocrawlOS.write(dataLine.getBytes());
			} else {
				// 如果寫的資料沒有包含"tocrawl",就說明寫出的資料是增強日誌,那麼就需要寫入增強日誌的檔案
				enhanceOS.write(dataLine.getBytes());
			}
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
			if(enhanceOS != null) {
				enhanceOS.close();
			}
			if(tocrawlOS != null) {
				tocrawlOS.close();
			}
		}
		
	}

}

MapReduce 執行 :

package com.thp.bigdata.logEnhance;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;

/**
 * 日誌增強:
 * 寫入不同的檔案
 * @author 湯小萌
 *
 */
public class LogEnhance {
	static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		Map<String, String> urlContentMap = new HashMap<String, String>();
		Text k = new Text();
		NullWritable v = NullWritable.get();
		
		/**
		 * 從資料庫中載入資料到HashMap快取下來
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			DBLoader.dbLoader(urlContentMap);
		}
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// 獲取一個計數器 - (這個計數器是全域性的計數器)  記錄不合法的日誌行數,組名,計數器名稱
			Counter counter = context.getCounter("malFormed", "malFormedCounter");
			String line = value.toString();
			String[] fields = line.split("\t");
			try {
				String url = fields[28];
				System.out.println(url);
				String content_tag = urlContentMap.get(url);
				// System.out.println(content_tag);
				if(content_tag == null) { // 從知識庫中根據對應的url查詢的內容為空,
					k.set(url + "\t" + "tocrawl" + "\n");  // 自定義的輸出流沒有包裝,不能換行 
					context.write(k, v);
				} else {
					k.set(line + "\t" + content_tag + "\n");
					System.out.println(k.toString());
					context.write(k, v);
				}
			} catch (Exception e) {
				// 有的資料可能是不合法的,長度不夠,不完整的資料
				e.printStackTrace();
				System.err.println("資料不合法");
				counter.increment(1);
			}
		}
	}
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		
		job.setJarByClass(LogEnhance.class);
		job.setMapperClass(LogEnhanceMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		// 要控制不同的檔案內容寫往不同的目標路徑,採用自定義的OutputStream
		job.setOutputFormatClass(LogEnhanceOutputFormat.class);
		
		FileInputFormat.setInputPaths(job, new Path("f:/enhancelog/input"));
		
		// 儘管我們在自定義的OutputFormat裡面已經設定好了輸出的路徑
		// 但是在FileOutputFormat中,必須輸出一個_success檔案,所以還需要設定輸出path
		FileOutputFormat.setOutputPath(job, new Path("f:/enhancelog/output"));
		
		// 現在只是做日誌的清洗,還不需要reduce task
		job.setNumReduceTasks(0);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}
	
	
	
	@Test
	public void test() {
		// String str = "1374609560.11	1374609560.16	1374609560.16	1374609560.16	110	5	8615038208365	460023383869133	8696420056841778	2	460	0	14615			54941	10.188.77.252	61.145.116.27	35020	80	6	cmnet	1	221.177.218.34	221.177.217.161	221.177.218.34	221.177.217.167	ad.veegao.com	http://ad.veegao.com/veegao/iris.action		Apache-HttpClient/UNAVAILABLE (java 1.4)	POST	200	593	310	4	3	0	0	4	3	0	0	0	0	http://ad.veegao.com/veegao/iris.action	5903903079251243019	5903903103500771339	5980728";
		String str = "1374609557.12	1374609557.15	1374609557.15	1374609557.74	110	5	8615093268715	460023934411519	3588660433773101	2	460	0	14822			29343	10.188.77.164	223.203.194.156	42384	80	6	cmnet	1	221.177.218.41	221.177.217.161	221.177.218.41	221.177.217.167	ugc.moji001.com	http://ugc.moji001.com/sns/GetNewestShare/100/489?UserID=42958568&Platform=Android&Version=10023802&BaseOSVer=10&PartnerKey=5007&Model=GT-S7500&Device=phone&VersionType=1&TS=		Apache-HttpClient/UNAVAILABLE (java 1.4)	GET	200	421	363	3	2	0	0	3	2	0	0	0	0	http://ugc.moji001.com/sns/GetNewestShare/100/489?UserID=42958568&Platform=Android&Version=10023802&BaseOSVer=10&PartnerKey=5007&Model=GT-S7500&Device=phone&VersionType=1&TS=	5903903047315243019	5903903087191863307	5980488";
		
		String[] fields = str.split("\t");
		int count = 1;
		for(String field : fields) {
			System.out.println(count + " >   " + field);
			count++;
		}
	}
	
	
}

日誌資料:

https://pan.baidu.com/s/1xzlfGQ8R67bsDsTqTsOcrQ

資料庫中的字典資料的sql檔案:
https://pan.baidu.com/s/1SrExNEebLBVZqtr-MasjLA

https://pan.baidu.com/s/19YYg39yteIa5VMcwaGSMfg