1. 程式人生 > >離線日誌採集統計分析

離線日誌採集統計分析

專案中資料採集:
flume
ELK(ElasticSearch logstash kibana)
搜尋引擎 日誌採集 資料分析視覺化平臺

在使用flume時,最注重的是資料的安全性,所以一般情況下channle=file
flume叢集中彙總多個flume的日誌資訊,前一個flume的sink是avro序列化資料下沉。
問題:
agent1和agent2要解決一個問題:採集的資料有多種型別,如何區分?
因為在給下游傳遞的過程中是混在一起的,所以為了分類進行儲存,需要為資料做標記,使用攔截器interceptor為每一條flume的記錄(一個flume event【header, body】)在header中新增一個自定義的屬性(key-value鍵值對)

#通過flume採集access、ugchead、ugctail日誌


a1.sources = access ugchead ugctail
a1.sinks = k1 k2
a1.channels = c1

# 配置source 
a1.sources.access.type = exec
a1.sources.access.command = tail -F /home/hadoop/logs/data-clean/data-access.log
a1.sources.access.interceptors = i1 i2
a1.sources.access.interceptors.i1.type = static
# 靜態的在header中新增一個key value
a1.sources.access.interceptors.i1.key = type
a1.sources.access.interceptors.i1.value = access
# 對於hdfs-sink,對於所有與時間相關的轉義序列,
# 事件標題中必須存在帶有“timestamp”鍵的標頭
#(除非hdfs.useLocalTimeStamp設定為true)。
# 自動新增此方法的一種方法是使用TimestampInterceptor。
# 所以我們這邊設定一個TimestampInterceptor
a1.sources.access.interceptors.i2.type = timestamp


a1.sources.ugchead.type = exec
a1.sources.ugchead.command = tail -F /home/hadoop/logs/data-clean/data-ugchead.log
a1.sources.ugchead.interceptors = i1 i2
a1.sources.ugchead.interceptors.i1.type = static
# 靜態的在header中新增一個key value
a1.sources.ugchead.interceptors.i1.key = type
a1.sources.ugchead.interceptors.i1.value = ugchead
a1.sources.ugchead.interceptors.i2.type = timestamp

a1.sources.ugctail.type = exec
a1.sources.ugctail.command = tail -F /home/hadoop/logs/data-clean/data-ugctail.log
a1.sources.ugctail.interceptors = i1 i2
a1.sources.ugctail.interceptors.i1.type = static
# 靜態的在header中新增一個key value
a1.sources.ugctail.interceptors.i1.key = type
a1.sources.ugctail.interceptors.i1.value = ugctail
a1.sources.ugctail.interceptors.i2.type = timestamp



# 對於sink的配置描述 使用avro日誌做資料的消費
# a1.sinks.k1.type = logger
# a1.sinks.k1.type = avro
# a1.sinks.k1.hostname = hadoop03
# a1.sinks.k1.port = 44444

#define sinkgroups  此處配置k1、k2的組策略,型別為均衡負載方式  
# a1.sinkgroups=g1  
# a1.sinkgroups.g1.sinks=k1 k2  
# a1.sinkgroups.g1.processor.type=load_balance  
# a1.sinkgroups.g1.processor.backoff=true  
# a1.sinkgroups.g1.processor.selector=round_robin

#define sinkgroups  此處配置k1、k2的組策略,型別為Failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 44444

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 44444


# 對於channel的配置描述 使用檔案做資料的臨時快取 這種的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/data/flume/data-clean/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/data/flume/data-clean/data


# 通過channel c1將source r1 r2 r3 和sink k1 關聯起來
a1.sources.access.channels = c1
a1.sources.ugchead.channels = c1
a1.sources.ugctail.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

問題:
1、Flume source has been removed due to an error in configuration
中文含義: Flume的source部分因為配置異常被移除。
表象: Flume中source沒有啟動。
解決辦法:檢視上下文,檢視諸如java.lang.IllegalStateException: The parameter command must be specified類似的具體異常,具體定位問題。(一般是配置檔案或者自定義source未寫正確導致,可以try-catch捕獲處理某些異常。)

2、攔截器
timestamp問題:
對於hdfs-sink,對於所有與時間相關的轉義序列,事件標題中必須存在帶有“timestamp”鍵的標頭(除非hdfs.useLocalTimeStamp設定為true)。自動新增此方法的一種方法是使用TimestampInterceptor。
所以我們這邊設定一個TimestampInterceptor。
詳細參考:

flume官網中hdfs-sink
3、執行監聽日誌檔案中的新增記錄,操作異常:
java.lang.OutOfMemoryError: GC overhead limit exceeded,簡稱OOM/OOME
兩種方案解決:
第一種方案: 給該flume程式加大記憶體儲存容量
預設值為 -Xmx20m(最大堆記憶體大小) --> -Xmx2000m
-Xms10(初始堆記憶體大小)
第二種方案:第一種搞不定的時候,比如機器可用記憶體不夠的時候,使用其它channel解決
比如:磁碟檔案,jdbc等
詳細參考:flume官網中的flume-channels

在實現flume的HA的過程中,使用failover sink時,發現在設定多個sink的優先順序的時候,沒有效果,理論上是隻要有優先順序高的sink去下沉到HDFS,優先順序低的不啟用,但是,在實際測試中發現,兩個sink裡面都有寫入資料。。。
理論上應該是隻有一個臨時檔案

ETL(extract transform loading)
資料倉庫:設計

清楚最終的效果
資料的清洗
框架:
資料清洗使用的框架:
MR、Hive、Sparkcore
原始資料 --------》標準化的資料 :
判斷資料欄位:是否需要脫敏(對映),是否需要刪除,可不可以衍生
如:ip可以轉換—》省市,基於地域維度進行資料的統計(問題在於如何通過ip得到省市)
request(請求方法,請求地址,請求協議)
status,appid,userid,mid,login_type,http_referer,user_agent,time

資料分析模組:
次日留存率:
如何計算?
要計算出今日和昨天的使用者的交集,這個交集就是留存的使用者,留存率就是這個交集/昨日使用者數
MR:如何使用MR對兩天的資料求交集?
進行交集的關聯的欄位:userid
很多資料可能是遊客的身份進行訪問,所以不能將這部分非常大的資料進行捨棄,所以這個關聯欄位,就不僅僅是 userid,還需要我們唯一的使用者表示欄位mid。
mid+userid
mid+null 如果mid相同?
BIDUPSID=26B8D7FB2C1DE6F17DD917F85E31BF84 登陸狀態下
BIDUPSID=26B8D7FB2C1DE6F17DD917F85E31BF84 登出狀態下
BIDUPSID=26B8D7FB2C1DE6F17DD917F85E31BF84;清理cookie之後
BAIDUID EED2B4AC7B051972D61990DEF0120F8D 不同瀏覽器
鍵 值
BAIDUID=EED2B4AC7B051972D61990DEF0120F8D

 		使用者登陸了?
	現在的問題,是用單一的mid或者userid都能準確的判斷出使用者的唯一性,所以我們需要兩步來完成使用者唯一的判斷。
	第一步使用mid+userid這樣,這樣我們就能夠統計出所有的使用者資訊。
		去掉mid留下userid(去重)就是唯一的使用者,
	第二步如果userid為null,這個時候使用mid作為唯一標識即可,也就是說吧userid去掉。

如何使用MR完成表的(前天的表)join(昨天的表)
mr的join操作:
map join
考慮將其中一張表在一個task任務中複製一份,載入記憶體,和另外一張表進行關聯操作,
在編碼中就是map做對映。
前提是載入到記憶體中的表是一張小表,不建議這張表超過100M。
reduce join
將資料使用關聯的欄位傳送到reduce端進行join操作,所以我們就需要在reduce端知道那一條記錄來自於左表,那一條記錄來自於右表。
為每條記錄新增一個標籤。
原始資料: 598d5d7d-e1c4-46a0-92c5-d91366050f12 20202 。。。
Mapper1: 按照 mid+userid為key,時間為value為map的結果輸出
Reducer1: <mid+userid, [2018-11-22, 2018-11-23,2018-11-22, 2018-11-23] 去重
資料結果集: mid + uid + 使用者日誌操作日期集合
Mapper2: 2018-11-22_2018-11-23---->1
Reducer2: 2018-11-22_2018-11-23---->sum

外鏈TopN:
以外鏈為key,求wordcount
Mapper1和Reducer1: referer,sum
求topN:
Mapper2: setup TreeSet //設定一個有序集合,按照字串中的第二列進行降序排序
map ts.size() > topn ==> ts.pollLast()
cleanup 遍歷寫入
Reducer2: 同Mapper2邏輯相同
注意:
mr中的引數TopN的傳遞,不能使用全域性的靜態變數!
mr中傳遞引數只能通過configuration來進行!

將mysql中的使用者資料上傳到hdfs,方便和統計後的標準資料進行關聯,在此基礎上進行性別,年齡指標的統計,地域統計。
判斷:上傳一次還是每天增量上傳
mysql中儲存的使用者的資料,要增量的上傳到hdfs
1. 寫mysql指令碼,再將資料查詢到本地,然後再通過hdfs dfs -put 上傳到每天的目錄
2. mapreduce, 有DBInputFormat,將db中的資料,上傳到hdfs中
3. sqoop,將資料直接從db --> hdfs

yesterday=`date -d"1 day ago" +"%Y-%m-%d"`	
yesterdayDir=`date -d"1 day ago" +"%Y/%m/%d"`		
SQOOP_HOME=xxxx
$SQOOP_HOME/bin/sqoop import \
--connect xxxx \
--username xxxx \
--username xxxx \
--password xxxx \
--query "select * from t_user where DATE(regster_date) between ${yesterday} and ${yesterday} and \$conditions" \
--target-dir hdfs://hadoop/xxxx/${yesterdayDir}
-m 4

注意:通過sqoop匯入資料的時候,某一天的資料錄入失敗?
通知 發郵件等
補救方法: 手動提交指令碼 迴圈

UGC(使用者生成的內容)
只能在程式碼中加入埋點,當觸發該埋點之後,觸發一條資訊記錄。
ugchead (appid ip mid seid userid param time)
head
ugctail (appid ip mid seid userid param time)
tail
為了保障資料的不丟失,計算的時候,去ugchead和ugctail的並集(交集|差集|餘弦定理|距離公式|矩陣)
為了日誌資訊,可擴充套件,xml—>可以,但是不建議 ==》json
注意:
MapReduce常見問題