
Apache Common Log Analysis Project for a Technical Forum

Project description

Analyze the Apache common logs of a technical forum to compute the forum's key performance indicators (KPIs) and support the operators' decision making.

Project design

  • MapReduce programs compute the KPIs
  • HBase serves detail-record lookups
  • Hive data warehouse supports multidimensional analysis

    Log report analysis

Development steps:

1. Import the log data into HDFS with Flume

Technology: Flume (the source is a spooling directory, the sink is HDFS, and the channel buffers events in memory, as configured below); the detail data is loaded into HBase later, in step 3.
flume-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/elon/log

a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop:9000/log/
a1.sinks.k1.hdfs.filePrefix = access-%Y-%m-%d
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
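With this file saved as flume-hdfs.conf, the agent can be started with the standard flume-ng launcher. A minimal sketch: the $FLUME_HOME/conf directory and the console logging option are assumptions, not part of the original setup.

# start agent a1 with the spooldir-to-HDFS configuration above
flume-ng agent --conf $FLUME_HOME/conf --conf-file flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

Any file dropped into /home/elon/log is then picked up by the spooling-directory source and rolled into hdfs://hadoop:9000/log/.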

2. Clean the data

Technology: MapReduce
package com.elon33.bbs;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class bbsCleaner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        final Job job = new Job(new Configuration(), bbsCleaner.class.getSimpleName());
        job.setJarByClass(bbsCleaner.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new bbsCleaner(), args);
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text v2 = new Text();

        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            final String[] parsed = logParser.parse(value.toString());
            // drop requests for static resources
            if (parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")) {
                return;
            }
            // strip the leading request method
            if (parsed[2].startsWith("GET /")) {
                parsed[2] = parsed[2].substring("GET /".length());
            } else if (parsed[2].startsWith("POST /")) {
                parsed[2] = parsed[2].substring("POST /".length());
            }
            // strip the trailing protocol suffix
            if (parsed[2].endsWith(" HTTP/1.1")) {
                parsed[2] = parsed[2].substring(0, parsed[2].length() - " HTTP/1.1".length());
            }
            v2.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
            context.write(key, v2);
        };
    }

    static class MyReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
        protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        };
    }

    static class LogParser {
        public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");

        public static void main(String[] args) throws ParseException {
            final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
            LogParser parser = new LogParser();
            final String[] array = parser.parse(S1);
            System.out.println("Sample record: " + S1);
            System.out.format("Parsed result: ip=%s, time=%s, url=%s, status=%s, traffic=%s",
                    array[0], array[1], array[2], array[3], array[4]);
        }

        /**
         * Parse the English-locale timestamp string.
         */
        private Date parseDateFormat(String string) {
            Date parse = null;
            try {
                parse = FORMAT.parse(string);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return parse;
        }

        /**
         * Parse one log line.
         *
         * @return an array of 5 elements: ip, time, url, status, traffic
         */
        public String[] parse(String line) {
            String ip = parseIP(line);
            String time = parseTime(line);
            String url = parseURL(line);
            String status = parseStatus(line);
            String traffic = parseTraffic(line);
            return new String[] { ip, time, url, status, traffic };
        }

        private String parseTraffic(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
            String traffic = trim.split(" ")[1];
            return traffic;
        }

        private String parseStatus(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
            String status = trim.split(" ")[0];
            return status;
        }

        private String parseURL(String line) {
            final int first = line.indexOf("\"");
            final int last = line.lastIndexOf("\"");
            String url = line.substring(first + 1, last);
            return url;
        }

        private String parseTime(String line) {
            final int first = line.indexOf("[");
            final int last = line.indexOf("+0800]");
            String time = line.substring(first + 1, last).trim();
            Date date = parseDateFormat(time);
            return dateformat1.format(date);
        }

        private String parseIP(String line) {
            String ip = line.split("- -")[0].trim();
            return ip;
        }
    }
}
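To run the cleaner against one day of raw logs, the class is packaged and submitted with hadoop jar. A hedged example: the jar name cleaned.jar matches the scheduling script in step 6, the input directory is the Flume sink path from step 1, and the output directory matches the path inspected below; the driver class only needs to be named if the jar's manifest does not declare a main class.

# args[0] = raw log directory, args[1] = cleaned output directory
hadoop jar cleaned.jar com.elon33.bbs.bbsCleaner hdfs://hadoop:9000/log/ /user/elon/bbs_cleaned/2013_05_30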

Data-cleaning result

hadoop fs -cat /user/elon/bbs_cleaned/2013_05_30/part-r-00000

3. Store the detail log in HBase for lookup by IP and time

Technology: table design, region pre-splitting
Create the bbs_log table in HBase with a single column family, cf:
HBase=> create 'bbs_log','cf'
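The design above also calls for pre-splitting. Since the row key produced in the next step starts with the visitor's IP (ip:yyyyMMdd), the table could instead be created with split points on the textual IP prefix; a hedged sketch, where the split values are illustrative assumptions.

# alternative: create the table pre-split on lexicographic IP prefixes
echo "create 'bbs_log', 'cf', SPLITS => ['100', '150', '200']" | hbase shell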
package com.elon33.bbs;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class bbsHBase extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        // point the job at the ZooKeeper quorum used by HBase
        configuration.set("hbase.zookeeper.quorum", "hadoop");
        // set the target HBase table name (create it first in the shell: create 'bbs_log','cf')
        configuration.set(TableOutputFormat.OUTPUT_TABLE, "bbs_log");
        // raise the value so the HBase import does not abort on a timeout
        configuration.set("dfs.socket.timeout", "180000");

        final Job job = new Job(configuration, "bbsHBaseBatchImport");
        job.setJarByClass(bbsHBase.class);  // needed when the job is submitted as a jar
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // set the map output types; the reduce output types are handled by TableOutputFormat
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);

        // no output path is set; instead, the output format type writes directly to HBase
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.setInputPaths(job, args[0]);  // input is the cleaned output of the previous MapReduce job
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new bbsHBase(), args);
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        Text v2 = new Text();
        public static final SimpleDateFormat dateformat = new SimpleDateFormat("yyyyMMddHHmmss");
        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMdd");

        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                        throws java.io.IOException, InterruptedException {
            final String[] parsed = value.toString().split("\t");
            if(parsed.length==3){
                Date parseDate = null;
                String time1 = "";
                try {
                    parseDate = dateformat.parse(parsed[1]);
                    time1 = dateformat1.format(parseDate);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                String rowKey = parsed[0] + ":" + time1; // row key: ip + date (hours/minutes/seconds dropped)
                v2.set(rowKey + "\t" + parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]); // ip+time ip  time  url
                context.write(key, v2);
            }else{
                return;
            }
        };
    }

    // write each record into HBase under its row key
    static class MyReducer extends TableReducer<LongWritable, Text, NullWritable> {
        protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s, Context context)
                throws java.io.IOException, InterruptedException {
            for (Text v2 : v2s) {
                final String[] splited = v2.toString().split("\t");
                final Put put = new Put(Bytes.toBytes(splited[0])); // splited[0] is the row key
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("ip"), Bytes.toBytes(splited[1]));   // ip
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("time"), Bytes.toBytes(splited[2])); // time
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("url"), Bytes.toBytes(splited[3]));  // url
                context.write(NullWritable.get(), put);
            }
        };
    }
}

Stored result in the HBase bbs_log table
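Because each row key is ip:yyyyMMdd, a single visitor-day can be fetched directly and all of a visitor's rows can be scanned by prefix; the sample IP and date below come from the log record used in step 2.

# look up one visitor on one day, then scan all rows for that visitor
echo "get 'bbs_log', '27.19.74.143:20130530'" | hbase shell
echo "scan 'bbs_log', {ROWPREFIXFILTER => '27.19.74.143'}" | hbase shell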

4. Multidimensional analysis with Hive

Technology: Hive (tables, views), user-defined functions
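The ALTER TABLE below assumes a partitioned table bbs already exists. A minimal DDL sketch follows: the column names ip and url appear in the queries below, while the middle column name atime and the external location are assumptions.

hive -e "CREATE EXTERNAL TABLE IF NOT EXISTS bbs (ip STRING, atime STRING, url STRING) PARTITIONED BY (logdate STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/user/elon/bbs_cleaned';"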
# register the day's cleaned data as a partition of the main partitioned table
hive -e "ALTER TABLE bbs ADD PARTITION(logdate='2013_05_30') LOCATION 'hdfs://hadoop:9000/user/elon/bbs_cleaned/2013_05_30';"
# create the daily Hive tables

## daily page views (PV)
hive -e "CREATE TABLE bbs_pv_2013_05_30 AS SELECT COUNT(1) AS PV FROM bbs WHERE logdate='2013_05_30';"

## daily new registrations
hive -e "CREATE TABLE bbs_reguser_2013_05_30 AS SELECT COUNT(1) AS REGUSER FROM bbs WHERE logdate='2013_05_30' AND INSTR(url,'member.php?mod=register')>0;"

## daily count of distinct visitor IPs
hive -e "CREATE TABLE bbs_ip_2013_05_30 AS SELECT COUNT(DISTINCT ip) AS IP FROM bbs WHERE logdate='2013_05_30';"

## daily bounce count (IPs with only one page view)
hive -e "CREATE TABLE bbs_jumper_2013_05_30 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM bbs WHERE logdate='2013_05_30' GROUP BY ip HAVING times=1) e;"

## combine the four metrics above into a single summary table
hive -e "CREATE TABLE bbs_2013_05_30 AS SELECT '2013_05_30', a.pv, b.reguser, c.ip, d.jumper FROM bbs_pv_2013_05_30 a JOIN bbs_reguser_2013_05_30 b ON 1=1 JOIN bbs_ip_2013_05_30 c ON 1=1 JOIN bbs_jumper_2013_05_30 d ON 1=1;"

Summary table result

5. Export the Hive results to MySQL with Sqoop

Technology: Sqoop, MySQL

First create the target table in MySQL, then run the export:
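A minimal sketch of that target table: the database name bbs comes from the JDBC URL in the export command, while the column names and types are assumptions matching the Hive summary table (date string, pv, reguser, ip, jumper).

mysql -h hadoop -u root -p -e "CREATE DATABASE IF NOT EXISTS bbs; CREATE TABLE IF NOT EXISTS bbs.bbs_logs (logdate VARCHAR(10), pv BIGINT, reguser BIGINT, ip BIGINT, jumper BIGINT);"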

sqoop export --connect jdbc:mysql://hadoop:3306/bbs --username root --password 123456 --table bbs_logs --fields-terminated-by '\001' --export-dir 'hdfs://hadoop:9000/user/hive/warehouse/bbs_2013_05_30'

Once the final forum metrics have been exported to MySQL, the intermediate Hive tables are no longer needed; dropping them is handled in the automated scheduling below.

6. Finally, automate the pipeline with Linux crontab

To run the log analysis automatically every day, the commands above are wrapped in shell scripts: the script computes the previous day's date, and a cron job triggers it on schedule.

Set up the scheduled job with crontab -e:

0 1 * * * bbs_daily.sh

bbs_daily.sh drives the scheduled daily run:
bbs_daily.sh

#!/bin/sh

yesterday=`date --date='1 days ago' +%Y_%m_%d`
bbs_common.sh $yesterday

bbs_common.sh wraps the common commands that carry out the whole analysis and produce the metric results:
bbs_common.sh

#!/bin/sh

#get yesterday format string
#yesterday=`date --date='1 days ago' +%Y_%m_%d`
yesterday=$1

#upload logs to hdfs
hadoop fs -put /apache_logs/access_${yesterday}.log  /bbs_logs

#cleaning data
hadoop jar /apache_logs/cleaned.jar  /bbs_logs/access_${yesterday}.log  /bbs_cleaned/${yesterday}  1>/dev/null


#alter hive table and then add partition to existed table
hive -e "ALTER TABLE bbs ADD PARTITION(logdate='${yesterday}') LOCATION '/bbs_cleaned/${yesterday}';"

#create the daily hive tables
hive -e "CREATE TABLE bbs_pv_${yesterday} AS SELECT COUNT(1) AS PV FROM bbs WHERE logdate='${yesterday}';"
hive -e "CREATE TABLE bbs_reguser_${yesterday} AS SELECT COUNT(1) AS REGUSER FROM bbs WHERE logdate='${yesterday}' AND INSTR(url,'member.php?mod=register')>0;"
hive -e "CREATE TABLE bbs_ip_${yesterday} AS SELECT COUNT(DISTINCT ip) AS IP FROM bbs WHERE logdate='${yesterday}';"
hive -e "CREATE TABLE bbs_jumper_${yesterday} AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM bbs WHERE logdate='${yesterday}' GROUP BY ip HAVING times=1) e;"
hive -e "CREATE TABLE bbs_${yesterday} AS SELECT '${yesterday}', a.pv, b.reguser, c.ip, d.jumper FROM bbs_pv_${yesterday} a JOIN bbs_reguser_${yesterday} b ON 1=1 JOIN bbs_ip_${yesterday} c ON 1=1 JOIN bbs_jumper_${yesterday} d ON 1=1;"

#delete hive tables
hive -e "drop table bbs_pv_${yesterday};"
hive -e "drop table bbs_reguser_${yesterday};"
hive -e "drop table bbs_ip_${yesterday};"
hive -e "drop table bbs_jumper_${yesterday};"


#sqoop export to mysql
sqoop export --connect jdbc:mysql://hadoop0:3306/bbs --username root --password admin --table bbs_logs --fields-terminated-by '\001' --export-dir "/user/hive/warehouse/bbs_${yesterday}"

#delete hive tables
hive -e "drop table bbs_${yesterday};"