1. 程式人生 > >使用awk生成access日誌多維度分析報表

使用awk生成access日誌多維度分析報表

原由:

某天某專案網站被一些IP惡意DDOS,因為沒有賣運營商的流量清洗等等之類的防護服務,導致該專案無法訪問
產生了三個處理需求:
    需要識別惡意IP進行封堵
    需要定位被攻擊的頁面查詢攻擊弱點
    需要定位攻擊頻繁的時段進行監控
這三個需求其實都不能實質解決問題
因為IP是封不完的,監控也是隻能知道自己有沒有被攻擊而已,而找到了被攻擊的頁面也只是找到了本次被攻擊的弱點而已
類似的漏洞是數不勝數的,解決辦法只有上架流量清洗裝置或者從運營商那裡買防護服務或者請專門的白帽子團隊進行測試

測試資料:

tomcat的access日誌格式配置:server.conf
pattern="%{X-FORWARDED-FOR}i %l %u %t %r %s %b %D %q %{User-Agent}i %T"
測試資料是生產某專案的真實access日誌做了IP和url的脫敏處理之後的資料

這裡寫圖片描述

識別惡意IP:

需求1:根據訪問IP的訪問量生成報表
分析1:如果以空格作為分隔符,那麼就是以第1列作為分析的資料列,進行分類彙總排序生成報表,單維度的分析報表
命令1:

cat tomcat8_access.log.2018-05-09.txt|\
awk '{IPs[$1]++}END{for(ip in IPs) print ip,IPs[ip]}'|\
sort -nk2|column -t|tail

這裡寫圖片描述
解釋1:

1. 使用cat命令讀取整個檔案
2. awk命令分為兩部分 {IPs[\$1]++} 和 END{for(ip in IPs) print ip,IPs[ip]}
3.
第一部分 {IPs[\$1]++} 將每一行的\$1,也就是第1列作為下標名,存入名為IPs的陣列 IPs陣列在建立前並沒有定義,因此是一個空的陣列 每一行的$1如果是首次存入該陣列,存入前該下標元素也是不存在的,為空 這裡的 ++ 相當於C語言的 i++ 操作,也就是加1,如果原來為空,則操作後為1,否則為自加1 這就實現了ip為下標的IPs的陣列元素的建立和自增計數: ip第一次出現的時候建立,值為1 如果不是第一次出現,那麼則自加 值就是ip出現的次數,那麼也就代表訪問IP的訪問量 4. 第二部分 END{for(ip in IPs) print
ip,IPs[ip]} 這裡的END表示所有的資料行全部處理完之後再做操作 如果沒有END,則表示每一行都進行這兩部分的操作,陣列自加和列印,處理邏輯就是錯誤的 所有的資料行全部處理完之後得到的IPs這個陣列是一個一維陣列,用for迴圈進行遍歷列印 列印每個ip和該ip作為陣列下標在IPs中的值,即訪問IP的訪問量 5. 使用sort命令對第二列進行升序排序 -k2,以數字型別進行排序 -n 6. 使用column命令格式化輸出,使用tail命令取出尾部幾行進行驗證

定位被攻擊的頁面:

可以簡單的以頁面的訪問量生成報表:

cat tomcat8_access.log.2018-05-09.txt|\
awk '{URLs[$7]++}END{for(url in URLs) print url,URLs[url]}'|\
sort -nk2|column -t|tail

但是這是不準確的,有些頁面如首頁是每次正常訪問都會被點選的,因此應該是根據IP和URL兩個維度生成報表

需求2:根據訪問IP的訪問量和被訪問的頁面生成報表
分析2:如果以空格作為分隔符,那麼就是第1列和第7列作為分析的資料列,生成雙維度的分析報表
命令2_1:

cat tomcat8_access.log.2018-05-09.txt|\
awk '{IPs[$1]++;IPsURLs[$1":"$7]++}
  END{for(ip in IPs)
       {for(ipurl in IPsURLs)
         {
           split(ipurl,INFO,":")
           if(INFO[1]==ip)
           printf("IP:%s\n\t總訪問量:%d\n\tIP訪問的相應URL:%s\n\t該URL訪問量:%d\n\tINFO中的IP:%s\n\tINFO中的URL:%s\n",ip,IPs[ip],ipurl,IPsURLs[ipurl],INFO[1],INFO[2])
         }
       }
     }'|head -18

這裡寫圖片描述
命令2_2,上一個命令基礎上優化展示資訊:

cat tomcat8_access.log.2018-05-09.txt|\
awk '{IPs[$1]++;IPsURLs[$1":"$7]++}
  END{for(ip in IPs)
       {for(ipurl in IPsURLs)
         {
           split(ipurl,INFO,":")
           if(INFO[1]==ip)
           printf("IP:%s 總訪問量:%d 訪問的URL:%s 訪問量:%d\n",ip,IPs[ip],INFO[2],IPsURLs[ipurl])
         }
       }
     }'|sort -nk2 -nrk4|column -t|tail -50

這裡寫圖片描述
解釋2_1:

根據解釋1,不難理解awk分為兩個部分
第一部分將$1存入IPs陣列,將$1$7用冒號拼接成一列,存入IPsURLs陣列
第二部分使用for迴圈遍歷IPs陣列,遍歷IPs的每一個ip的時候
再次使用for迴圈遍歷IPsURLs陣列,遍歷IPsURLs的每一個ipurl的時候
將該ipurl做切分,存入INFO陣列,下標從1開始的一個臨時陣列
此時我們就擁有了這些資料:
    IPs中的ip,IPs中的訪問量IPs[ip]
    INFO中的ip,INFO中的url,和該ip對應的url的訪問量IPsURLs[ipurl]
    因為這是兩個for迴圈的巢狀遍歷,因此當IPs中的ip和INFO中的ip相等時就是所需資料
    其他情況則是巢狀迴圈遍歷生成的冗餘資料,這個地方類似於笛卡爾積和等值連線的區別

解釋2_2:

我們換個角度去解釋這個awk的報表:
這裡有兩張關係表:
IPs(ip,pv)
    列ip是主鍵,列pv代表點選量,其值就是IPs[ip]
IPsURLs('ip:url',pv)
    列'ip:url'可以看作是兩欄位的組合主鍵
    那麼ip相當於外來鍵將兩表關聯成了一對多關係
    列pv依然代表點選量,但是代表的是某一個ip對某一個url的點選量
    其值就是IPsURLs['ip:url']
這就是awk的第一部分做的兩個資料的生成操作
第二部分操作,首先要對IPsURLs進行處理
    因為列'ip:url'實際上是兩列做的字串拼接出來的一列
    因此使用awk的內建函式split,指定分隔符為冒號,將分割出的倆列存入INFO中
    INFO是下標從1開始一個列表,相當於將每一個ipurl臨時生成了一個INFO表:INFO(ip,url)
此時我們就擁有了IPs表的ip和該ip的點選量
INFO表的ip和url以及這倆維度相關聯的點選量IPsURLs['ip:url']
那麼INFO表的ip和IPs表的ip相等時就是所需要的資料

將第二部分寫成SQL:
with info as(
select regexp_substr('ip:url','[^:]+',1,1) as split_ip,
       regexp_substr('ip:url','[^:]+',1,2) as split_url,
       pv as split_pv
  from IPsURLs)
select ip,pv,split_url,split_pv
  from info,IPs
 where info.split_ip=IPs.ip;
-- 這個是在oracle執行的sql,對mysql的高階sql沒有愛
-- 這個sql只是憑經驗寫的一個解釋性的sql,沒有經過資料驗證

通過這個解釋,我們看出,實際上是用awk做了關係資料的生成和拆解查詢
所有的所謂報表基本上都是對資料集合進行的操作
只不過是過程化處理+集合處理相結合的方式進行處理
這也是為啥BI職位需要C或者Python等語言的編碼能力和SQL能力

看到這裡,玩過hive的筒子們就能想到了,這不就是hive對此類需求的處理原理麼!
平面檔案-->格式轉換-->匯入hive庫-->查詢分析-->展示

定位攻擊頻繁的時段:

實際上滿足需求只需要根據IP和小時時刻兩個維度進行生成報表即可
上面兩個需求我們分析了單維度和雙維度的報表,這個需求我們做一下三維度的報表
根據IP、小時時刻和url生成報表,單純的使用空格分隔符是無法直接切出來這三個資訊的
需要使用空格和冒號兩個分隔符來切分資料:

head tomcat8_access.log.2018-05-09.txt |awk -F' |:' '{print $1,$5,$10}'

這裡寫圖片描述

使用BEGIN設定分隔符,替代-F引數,顯得更高大上一點
然後儘量以格式化的方法寫命令:

head tomcat8_access.log.2018-05-09.txt |\
awk 'BEGIN{FS=" |:"}
          {c1s[$1]++;c5s[$5]++;c10s[$10]++;
           c1_5s[$1":"$5]++;c1_10s[$1":"$10]++;c5_10s[$5":"$10]++
           c1_5_10s[$1":"$5":"$10]}
       END{for(c1 in c1s) {print c1,c1s[c1]}
           {print "\n\n"}
           for(c5 in c5s) {print c5,c5s[c5]}
           {print "\n\n"}
           for(c10 in c10s) {print c10,c10s[c10]}
           {print "\n\n"}
           for(c1_5 in c1_5s) {print c1_5,c1_5s[c1_5]}
           {print "\n\n"}
           for(c1_10 in c1_10s) {print c1_10,c1_10s[c1_10]}
           {print "\n\n"}
           for(c5_10 in c5_10s) {print c5_10,c5_10s[c5_10]}
           {print "\n\n"}
           for(c1_5_10 in c1_5_10s) {print c1_5_10,c1_5_10s[c1_5_10]}
           {print "\n\n"}
          }'

這裡寫圖片描述

head tomcat8_access.log.2018-05-09.txt |\
awk 'BEGIN{FS=" |:"}
          {c1s[$1]++;c5s[$5]++;c10s[$10]++;
           c1_5s[$1":"$5]++;c1_10s[$1":"$10]++;c5_10s[$5":"$10]++
           c1_5_10s[$1":"$5":"$10]}
       END{for(c1_5 in c1_5s)
             {split(c1_5,L1_5,":")
              printf("Col1: %s Col5: %s\n",L1_5[1],L1_5[2])}
           {print "\n\n"}
           for(c1_10 in c1_10s)
             {split(c1_10,L1_10,":")
              printf("Col1: %s Col10: %s\n",L1_10[1],L1_10[2])}
           {print "\n\n"}
           for(c5_10 in c5_10s)
             {split(c5_10,L5_10,":")
              printf("Col1: %s Col5: %s\n",L5_10[1],L5_10[2])}
           {print "\n\n"}           
           for(c1_5_10 in c1_5_10s)
             {split(c1_5_10,L1_5_10,":")
              printf("Col1: %s Col5: %s Col10: %s\n",L1_5_10[1],L1_5_10[2],L1_5_10[3])}
           {print "\n\n"}           
          }'

這裡寫圖片描述
剩下的就是根據維度展示的需求,for迴圈多層巢狀,if做等值判斷之後的列印操作了

公式化套用:

把常用的單維度和雙維度報表抽成公式,以便以後直接套用:

cat ...|\
awk 'BEGIN{FS="..."}
          {Colns[$n]++}
       END{for(Coln in Colns) print Coln,Colns[Coln]}'|\
sort -nk2|column -t

cat ...|\
awk 'BEGIN{FS="..."}
     {Colxs[$x]++;Colxys[$x":"$y]++}
     END{for(Colx in Colxs)
          {for(Colxy in Colxys)
            {
             split(Colxy,TEMP,":")
             if(TEMP[1]==Colx)
             print Colx,Colxs[Colx],Colxys[Colxy],TEMP[1],TEMP[2]
            }
          }
        }'|sort -nk2 -nrk4|column -t

[TOC]