自定義OutputFormat -實現往不同的目錄輸出檔案
阿新 • • 發佈:2018-11-24
程式碼地址 :
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/myInputFormat
需求:
現有一些原始日誌需要做增強解析處理,流程:
1、 從原始日誌檔案中讀取資料
2、 根據日誌中的一個URL欄位到外部知識庫中獲取資訊增強到原始日誌
3、 如果成功增強,則輸出到增強結果目錄;如果增強失敗,則抽取原始資料中URL欄位輸出到待爬清單目錄
分析:
程式的關鍵點是要在一個mapreduce程式中根據資料的不同輸出型別結果到不同的目錄,這類靈活的輸出需求可以通過自定義的OutputFormat來實現。
DBLoader : 連線資料庫,從資料庫中將字典資料快取出來
package com.thp.bigdata.logEnhance; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; /** * 資料庫連線 * java.sql包中的介面,它是sun公司為了簡化,統一對資料庫的操作,定義了一套java操作資料庫的規範, * 由各個資料庫公司自己實現,資料庫有mysql oracle等, * 而com.mysql.jdbc包中的類是mysql自己實現規範介面的類, * 不同的資料庫有不同的實現,為了能夠只寫一套程式碼,實現跨資料庫使用, * 書寫傳統jdbc需要匯入的包就使用java.sql包,而不用考慮具體的實現類。 * @author 湯小萌 * */ public class DBLoader { /** * 從資料庫中將url對應的內容全部放到HashMap中進行快取 */ public static void dbLoader(Map<String, String> urlContentMap) { Connection con = null; Statement st = null; ResultSet rs = null; try { Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root"); st = con.createStatement(); rs = st.executeQuery("select url, content from url_rule"); while(rs.next()) { urlContentMap.put(rs.getString(1), rs.getString(2)); } } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } finally { if(rs != null) { try { rs.close(); } catch (SQLException e) { e.printStackTrace(); } finally { rs = null; } } if(st != null) { try { st.close(); } catch (SQLException e) { e.printStackTrace(); } finally { st = null; } } if(con != null) { try { con.close(); } catch (SQLException e) { e.printStackTrace(); } finally { con = null; } } } } // 測試資料庫連線成功 public static void main(String[] args) { HashMap<String, String> urlContentMap = new HashMap<String, String>(); dbLoader(urlContentMap); // Set<Entry<String, String>> entrySet = urlContentMap.entrySet(); for(Entry<String, String> entrySet : urlContentMap.entrySet()) { System.out.println(entrySet.getKey() + " : " + entrySet.getValue()); } } }
自定義的OutputFormat : 可以實現跟資料型別的不同向不同的目錄輸出:
package com.thp.bigdata.logEnhance; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 日誌增強的自定義OutputFormat * 根據資料的不同輸出型別到不同的輸出目錄 * @author 湯小萌 * */ public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileSystem fs = FileSystem.get(job.getConfiguration()); Path enhancePath = new Path("f:/enhancelog/output_en/log.txt"); Path tocrawlPath = new Path("f:/enhancelog/output_crw/url.txt"); FSDataOutputStream enhanceOS = fs.create(enhancePath); FSDataOutputStream tocrawlOS = fs.create(tocrawlPath); return new EnhanceRecordWriter(enhanceOS, tocrawlOS); } /** * 這個RecordWriter類才是真正往外寫檔案的 * 需要往這個類的構造方法中傳遞輸出流,在getRecordWriter()方法中就要進行構建這兩個輸出流 * @author 湯小萌 * */ static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream enhanceOS = null; FSDataOutputStream tocrawlOS = null; public EnhanceRecordWriter(FSDataOutputStream enhanceOS, FSDataOutputStream tocrawlOS) { super(); this.enhanceOS = enhanceOS; this.tocrawlOS = tocrawlOS; } // 往外寫檔案的邏輯 @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String dataLine = key.toString(); if(dataLine.contains("tocrawl")) { // 如果寫的資料裡面包含 "tocrawl",那麼就是不完全的資料,需要寫入待爬清單檔案 tocrawlOS.write(dataLine.getBytes()); } else { // 如果寫的資料沒有包含"tocrawl",就說明寫出的資料是增強日誌,那麼就需要寫入增強日誌的檔案 enhanceOS.write(dataLine.getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if(enhanceOS != null) { enhanceOS.close(); } if(tocrawlOS != null) { tocrawlOS.close(); } } } }
MapReduce 執行 :
package com.thp.bigdata.logEnhance;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
* 日誌增強:
* 寫入不同的檔案
* @author 湯小萌
*
*/
public class LogEnhance {
static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> urlContentMap = new HashMap<String, String>();
Text k = new Text();
NullWritable v = NullWritable.get();
/**
* 從資料庫中載入資料到HashMap快取下來
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
DBLoader.dbLoader(urlContentMap);
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 獲取一個計數器 - (這個計數器是全域性的計數器) 記錄不合法的日誌行數,組名,計數器名稱
Counter counter = context.getCounter("malFormed", "malFormedCounter");
String line = value.toString();
String[] fields = line.split("\t");
try {
String url = fields[28];
System.out.println(url);
String content_tag = urlContentMap.get(url);
// System.out.println(content_tag);
if(content_tag == null) { // 從知識庫中根據對應的url查詢的內容為空,
k.set(url + "\t" + "tocrawl" + "\n"); // 自定義的輸出流沒有包裝,不能換行
context.write(k, v);
} else {
k.set(line + "\t" + content_tag + "\n");
System.out.println(k.toString());
context.write(k, v);
}
} catch (Exception e) {
// 有的資料可能是不合法的,長度不夠,不完整的資料
e.printStackTrace();
System.err.println("資料不合法");
counter.increment(1);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogEnhance.class);
job.setMapperClass(LogEnhanceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 要控制不同的檔案內容寫往不同的目標路徑,採用自定義的OutputStream
job.setOutputFormatClass(LogEnhanceOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("f:/enhancelog/input"));
// 儘管我們在自定義的OutputFormat裡面已經設定好了輸出的路徑
// 但是在FileOutputFormat中,必須輸出一個_success檔案,所以還需要設定輸出path
FileOutputFormat.setOutputPath(job, new Path("f:/enhancelog/output"));
// 現在只是做日誌的清洗,還不需要reduce task
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Test
public void test() {
// String str = "1374609560.11 1374609560.16 1374609560.16 1374609560.16 110 5 8615038208365 460023383869133 8696420056841778 2 460 0 14615 54941 10.188.77.252 61.145.116.27 35020 80 6 cmnet 1 221.177.218.34 221.177.217.161 221.177.218.34 221.177.217.167 ad.veegao.com http://ad.veegao.com/veegao/iris.action Apache-HttpClient/UNAVAILABLE (java 1.4) POST 200 593 310 4 3 0 0 4 3 0 0 0 0 http://ad.veegao.com/veegao/iris.action 5903903079251243019 5903903103500771339 5980728";
String str = "1374609557.12 1374609557.15 1374609557.15 1374609557.74 110 5 8615093268715 460023934411519 3588660433773101 2 460 0 14822 29343 10.188.77.164 223.203.194.156 42384 80 6 cmnet 1 221.177.218.41 221.177.217.161 221.177.218.41 221.177.217.167 ugc.moji001.com http://ugc.moji001.com/sns/GetNewestShare/100/489?UserID=42958568&Platform=Android&Version=10023802&BaseOSVer=10&PartnerKey=5007&Model=GT-S7500&Device=phone&VersionType=1&TS= Apache-HttpClient/UNAVAILABLE (java 1.4) GET 200 421 363 3 2 0 0 3 2 0 0 0 0 http://ugc.moji001.com/sns/GetNewestShare/100/489?UserID=42958568&Platform=Android&Version=10023802&BaseOSVer=10&PartnerKey=5007&Model=GT-S7500&Device=phone&VersionType=1&TS= 5903903047315243019 5903903087191863307 5980488";
String[] fields = str.split("\t");
int count = 1;
for(String field : fields) {
System.out.println(count + " > " + field);
count++;
}
}
}
日誌資料:
https://pan.baidu.com/s/1xzlfGQ8R67bsDsTqTsOcrQ
資料庫中的字典資料的sql檔案:
https://pan.baidu.com/s/1SrExNEebLBVZqtr-MasjLA