Forum-Based Apache Common Log Analysis Project
Project Description
Analyze the Apache common logs of a technical forum and compute key forum metrics to support operators' decisions.
Project Design
- MapReduce jobs to compute KPIs
- HBase for detail-record lookups
- Hive data warehouse for multidimensional analysis
Development steps:
1. Import the log data into HDFS with Flume
Technology: Flume (the source is a spooling directory, the sink is HDFS, the channel is memory)
flume-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/elon/log
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop:9000/log/
a1.sinks.k1.hdfs.filePrefix = access-%Y-%m-%d
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
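With this file saved as flume-hdfs.conf, the agent can be started with the standard flume-ng command; the configuration directory path is an assumption for this setup:
flume-ng agent --conf ./conf --conf-file flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console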
2. Clean the data
Technology: MapReduce
package com.elon33.bbs;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class bbsCleaner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final Job job = new Job(new Configuration(), bbsCleaner.class.getSimpleName());
job.setJarByClass(bbsCleaner.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new bbsCleaner(), args);
}
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
LogParser logParser = new LogParser();
Text v2 = new Text();
protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
throws java.io.IOException, InterruptedException {
final String[] parsed = logParser.parse(value.toString());
// Filter out requests for static resources
if (parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")) {
return;
}
// Strip the leading request-method prefix ("GET /" or "POST /")
if (parsed[2].startsWith("GET /")) {
parsed[2] = parsed[2].substring("GET /".length());
} else if (parsed[2].startsWith("POST /")) {
parsed[2] = parsed[2].substring("POST /".length());
}
// Strip the trailing " HTTP/1.1" suffix
if (parsed[2].endsWith(" HTTP/1.1")) {
parsed[2] = parsed[2].substring(0, parsed[2].length() - " HTTP/1.1".length());
}
v2.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
context.write(key, v2);
};
}
static class MyReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
throws java.io.IOException, InterruptedException {
for (Text v2 : v2s) {
context.write(v2, NullWritable.get());
}
};
}
static class LogParser {
public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
public static void main(String[] args) throws ParseException {
final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
LogParser parser = new LogParser();
final String[] array = parser.parse(S1);
System.out.println("樣例資料: " + S1);
System.out.format("解析結果: ip=%s, time=%s, url=%s, status=%s, traffic=%s", array[0], array[1], array[2],
array[3], array[4]);
}
/**
 * Parse the English-format time string from the log (e.g. "30/May/2013:17:38:20").
 *
 * @param string the raw time string from the log
 * @return the parsed Date, or null if parsing fails
 */
private Date parseDateFormat(String string) {
Date parse = null;
try {
parse = FORMAT.parse(string);
} catch (ParseException e) {
e.printStackTrace();
}
return parse;
}
/**
 * Parse one log line.
 *
 * @param line a raw log line
 * @return an array of 5 elements: ip, time, url, status, traffic
 */
public String[] parse(String line) {
String ip = parseIP(line);
String time = parseTime(line);
String url = parseURL(line);
String status = parseStatus(line);
String traffic = parseTraffic(line);
return new String[] { ip, time, url, status, traffic };
}
private String parseTraffic(String line) {
final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
String traffic = trim.split(" ")[1];
return traffic;
}
private String parseStatus(String line) {
final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
String status = trim.split(" ")[0];
return status;
}
private String parseURL(String line) {
final int first = line.indexOf("\"");
final int last = line.lastIndexOf("\"");
String url = line.substring(first + 1, last);
return url;
}
private String parseTime(String line) {
final int first = line.indexOf("[");
final int last = line.indexOf("+0800]");
String time = line.substring(first + 1, last).trim();
Date date = parseDateFormat(time);
return dateformat1.format(date);
}
private String parseIP(String line) {
String ip = line.split("- -")[0].trim();
return ip;
}
}
}
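Assuming the job above is packaged as cleaned.jar with bbsCleaner set as the jar's main class (the same jar name the scheduling script in step 6 uses), a single-day run might look like this; the input and output paths are illustrative:
hadoop jar cleaned.jar /bbs_logs/access_2013_05_30.log /user/elon/bbs_cleaned/2013_05_30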
Data-cleaning result:
hadoop fs -cat /user/elon/bbs_cleaned/2013_05_30/part-r-00000
3. Store the detailed logs in HBase so they can be queried by IP and time
Technology: table design, region pre-splitting
Create the bbs_log table in HBase with a single column family, cf:
HBase=> create 'bbs_log','cf'
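The statement above creates the table with a single region. Since pre-splitting is listed as a technique, a possible pre-split variant is sketched below; the split keys are arbitrary examples, chosen only because row keys start with an IP address:
HBase=> create 'bbs_log', 'cf', SPLITS => ['100', '150', '200']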
package com.elon33.bbs;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class bbsHBase extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final Configuration configuration = new Configuration();
// Point the job at the ZooKeeper quorum used by HBase
configuration.set("hbase.zookeeper.quorum", "hadoop");
// Set the target HBase table name
configuration.set(TableOutputFormat.OUTPUT_TABLE, "bbs_log"); // the table must be created in the HBase shell first: create 'bbs_log','cf'
// Raise this timeout so HBase does not time out and abort
configuration.set("dfs.socket.timeout", "180000");
final Job job = new Job(configuration, "bbsHBaseBatchImport");
job.setJarByClass(bbsHBase.class); // needed so the job can run from a packaged jar
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// Set only the map output types; the reduce output is written by TableOutputFormat
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
// Instead of an output path, set the output format so results go to HBase
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.setInputPaths(job, args[0]); // input is the cleaned output of the previous MapReduce job
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new bbsHBase(), args);
}
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
Text v2 = new Text();
public static final SimpleDateFormat dateformat = new SimpleDateFormat("yyyyMMddHHmmss");
public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMdd");
protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
throws java.io.IOException, InterruptedException {
final String[] parsed = value.toString().split("\t");
if(parsed.length==3){
Date parseDate = null;
String time1 = "";
try {
parseDate = dateformat.parse(parsed[1]);
time1 = dateformat1.format(parseDate);
} catch (ParseException e) {
e.printStackTrace();
}
String rowKey = parsed[0] + ":" + time1; // row key: ip + date (hours/minutes/seconds dropped)
v2.set(rowKey + "\t" + parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]); // rowkey, ip, time, url
context.write(key, v2);
}else{
return;
}
};
}
// Each record is written into HBase under its row key
static class MyReducer extends TableReducer<LongWritable, Text, NullWritable> {
protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s, Context context)
throws java.io.IOException, InterruptedException {
for (Text v2 : v2s) {
final String[] splited = v2.toString().split("\t");
final Put put = new Put(Bytes.toBytes(splited[0])); // column 1: row key
put.add(Bytes.toBytes("cf"), Bytes.toBytes("ip"), Bytes.toBytes(splited[1])); // column 2: ip
put.add(Bytes.toBytes("cf"), Bytes.toBytes("time"), Bytes.toBytes(splited[2])); // column 3: time
put.add(Bytes.toBytes("cf"), Bytes.toBytes("url"), Bytes.toBytes(splited[3])); // column 4: url
context.write(NullWritable.get(), put);
}
};
}
}
Result stored in the bbs_log table in HBase:
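Because the row key is ip:yyyyMMdd, a detail lookup by IP and date is a bounded scan; for example, using the IP from the sample log line:
HBase=> scan 'bbs_log', {STARTROW => '27.19.74.143:20130530', STOPROW => '27.19.74.143:20130531'}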
4. Multidimensional analysis of the data with Hive
Technology: Hive (tables, views), user-defined functions
# Add the day's data as a partition of the main partitioned table, bbs (a DDL sketch for this table follows the command below)
hive -e "ALTER TABLE bbs ADD PARTITION(logdate='2013_05_30') LOCATION 'hdfs://hadoop:9000/user/elon/bbs_cleaned/2013_05_30';"
# Create per-day Hive tables for each metric
## Daily page views (PV)
hive -e "CREATE TABLE bbs_pv_2013_05_30 AS SELECT COUNT(1) AS PV FROM bbs WHERE logdate='2013_05_30';"
## Daily registrations
hive -e "CREATE TABLE bbs_reguser_2013_05_30 AS SELECT COUNT(1) AS REGUSER FROM bbs WHERE logdate='2013_05_30' AND INSTR(url,'member.php?mod=register')>0;"
## Daily distinct visitor IPs
hive -e "CREATE TABLE bbs_ip_2013_05_30 AS SELECT COUNT(DISTINCT ip) AS IP FROM bbs WHERE logdate='2013_05_30';"
## Daily bounces (IPs that made only one request)
hive -e "CREATE TABLE bbs_jumper_2013_05_30 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM bbs WHERE logdate='2013_05_30' GROUP BY ip HAVING times=1) e;"
## Combine the four results into a single summary table
hive -e "CREATE TABLE bbs_2013_05_30 AS SELECT '2013_05_30', a.pv, b.reguser, c.ip, d.jumper FROM bbs_pv_2013_05_30 a JOIN bbs_reguser_2013_05_30 b ON 1=1 JOIN bbs_ip_2013_05_30 c ON 1=1 JOIN bbs_jumper_2013_05_30 d ON 1=1;"
Summary table result:
5. Export the Hive analysis results to MySQL with Sqoop
Technology: Sqoop, MySQL
Create the target table bbs_logs in the MySQL database bbs first.
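A matching table definition might look like the following; the column types are assumptions, since Sqoop only needs the columns to line up with the Hive summary table (logdate, pv, reguser, ip, jumper):
CREATE TABLE bbs_logs (
    logdate VARCHAR(16),
    pv BIGINT,
    reguser BIGINT,
    ip BIGINT,
    jumper BIGINT
);
With the table in place, export the day's summary: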
sqoop export --connect jdbc:mysql://hadoop:3306/bbs --username root --password 123456 --table bbs_logs --fields-terminated-by '\001' --export-dir 'hdfs://hadoop:9000/user/hive/warehouse/bbs_2013_05_30'
Once the final forum metrics have been exported to MySQL, the intermediate per-day tables are no longer needed; the automated scheduling below drops them.
6. Finally, schedule the pipeline with Linux crontab
To analyze each day's log automatically, wrap the commands in shell scripts: the date is derived each day and a scheduled cron job triggers the analysis.
Set up the scheduled task with crontab -e (the entry below runs daily at 01:00; use the script's absolute path in practice):
0 1 * * * bbs_daily.sh
Task scheduling is driven by the bbs_daily.sh script:
bbs_daily.sh
#!/bin/sh
yesterday=`date --date='1 days ago' +%Y_%m_%d`
bbs_common.sh $yesterday
The bbs_common.sh script contains the common commands that carry out the full analysis and produce the metric results:
bbs_common.sh
#!/bin/sh
#get yesterday format string
#yesterday=`date --date='1 days ago' +%Y_%m_%d`
yesterday=$1
#upload logs to hdfs
hadoop fs -put /apache_logs/access_${yesterday}.log /bbs_logs
#cleaning data
hadoop jar /apache_logs/cleaned.jar /bbs_logs/access_${yesterday}.log /bbs_cleaned/${yesterday} 1>/dev/null
#alter hive table and then add partition to existed table
hive -e "ALTER TABLE bbs ADD PARTITION(logdate='${yesterday}') LOCATION '/bbs_cleaned/${yesterday}';"
#create hive table everyday
hive -e "CREATE TABLE bbs_pv_${yesterday} AS SELECT COUNT(1) AS PV FROM bbs WHERE logdate='${yesterday}';"
hive -e "CREATE TABLE bbs_reguser_${yesterday} AS SELECT COUNT(1) AS REGUSER FROM bbs WHERE logdate='${yesterday}' AND INSTR(url,'member.php?mod=register')>0;"
hive -e "CREATE TABLE bbs_ip_${yesterday} AS SELECT COUNT(DISTINCT ip) AS IP FROM bbs WHERE logdate='${yesterday}';"
hive -e "CREATE TABLE bbs_jumper_${yesterday} AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM bbs WHERE logdate='${yesterday}' GROUP BY ip HAVING times=1) e;"
hive -e "CREATE TABLE bbs_${yesterday} AS SELECT '${yesterday}', a.pv, b.reguser, c.ip, d.jumper FROM bbs_pv_${yesterday} a JOIN bbs_reguser_${yesterday} b ON 1=1 JOIN bbs_ip_${yesterday} c ON 1=1 JOIN bbs_jumper_${yesterday} d ON 1=1;"
#delete hive tables
hive -e "drop table bbs_pv_${yesterday};"
hive -e "drop table bbs_reguser_${yesterday};"
hive -e "drop table bbs_ip_${yesterday};"
hive -e "drop table bbs_jumper_${yesterday};"
#sqoop export to mysql
sqoop export --connect jdbc:mysql://hadoop:3306/bbs --username root --password 123456 --table bbs_logs --fields-terminated-by '\001' --export-dir "/user/hive/warehouse/bbs_${yesterday}"
#delete hive tables
hive -e "drop table bbs_${yesterday};"