golang 流量統計系統視訊總結(二)
總體流程
解析使用者訪問行為日誌部分
程式碼實現:
package main
import (
"bufio"
"crypto/md5"
"encoding/hex"
"flag"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mgutz/str"
"github.com/sirupsen/logrus"
"io"
"net/url"
"os"
"strconv"
"strings"
"time"
)
const HANDLE_DIG = " /dig?"
const HANDLE_MOVIE = "/movie/"
const HANDLE_LIST = "/list/"
const HANDLE_HTML = ".html"
// 收集命令列引數的結構體
type cmdParams struct{
logFilePath string
routineNum int
}
// 用於儲存一條訪問日誌資訊解析後的關鍵內容
type digData struct {
time string
url string
refer string
ua string
}
// 用於儲存使用者的訪問行為
type urlData struct{
data digData
uid string
unode urlNode
}
// 要記錄的資料節點,類似於資料庫中的單條資料
type urlNode struct{
unType string // 詳情頁 或者 列表頁 或者 首頁
unRid int // Resource ID 資源id
unUrl string // 當前這個頁面的url
unTime string // 當前訪問這個頁面的時間
}
// 儲存一條pv/uv統計資料
type storageBlock struct{
counterType string
storageModel string
unode urlNode
}
var log = logrus.New()
//var redisCli redis.Client
func init(){
log.Out = os.Stdout
log.SetLevel( logrus.DebugLevel)
/*
redisCli,err := redis.Dial("tcp","localhost:6379")
if err != nil{
log.Fatalln("Redis connect failed")
} else {
defer redisCli.Close()
}
*/
}
func main() {
// 獲取引數
logFilePath := flag.String("logFilePath","/User/Pangee/Public/nginx/log/dig.log","target log file path")
routineNum := flag.Int("routineNum",5,"consumer number by go routine")
l := flag.String("l","/tmp/log","runtime log file")
flag.Parse()
params := cmdParams{*logFilePath, *routineNum}
// 打日誌
logFd, err := os.OpenFile( *l ,os.O_CREATE|os.O_WRONLY, 0644)
if (err == nil){
log.Out = logFd
defer logFd.Close()
}
log.Infoln("exec strat.")
log.Infoln("params:log filepath = %s,routineNum = %d",params.logFilePath,params.routineNum)
// 初始化一些channel,用於資料傳遞
var logChannel = make(chan string, 3 * params.routineNum) // 用於日誌解析
var pvChannel = make(chan urlData, params.routineNum) // 用於pv統計
var uvChannel = make(chan urlData, params.routineNum) // 使用者uv統計
var storageChannel = make(chan storageBlock, params.routineNum) // 使用者儲存統計資料
// Redis pool
redisPool, err := pool.New("tcp","localhost:6379",2*params.routineNum)
if err!=nil{
log.Fatalln("redis pool created fail")
panic(err)
} else { // 保持redis的連線不閒置
go func() {
for {
redisPool.Cmd( "PING")
time.Sleep( 3 * time.Second)
}
}()
}
// 日誌消費者,往logChannel中寫日誌資料
go readFileLineByLine(params, logChannel)
// 建立一組日誌處理,從logChannel中讀資料,並將讀到的資料寫入到pvChannel,uvChannel
for i:=0;i<params.routineNum;i++{
go logConsumer(logChannel, pvChannel, uvChannel)
}
// 建立PV/UV 統計器,從pvChannel和uvChannel中讀取資料,然後將資料寫入storageChannel
go pvCounter(pvChannel, storageChannel)
go uvCounter(uvChannel, storageChannel, redisPool)
// 建立儲存器
go dataStorage(storageChannel, redisPool)
// 之後會封裝成daemon,但是先讓程式跑起來
time.Sleep(1000 * time.Second)
}
//逐行消費日誌
func readFileLineByLine(params cmdParams,logChannel chan string) error {
fd, err := os.Open(params.logFilePath)
if(err != nil){
log.Warningf("ReadFileLineByLine can't open file: %s",params.logFilePath)
return err
}
defer fd.Close()
count := 0
bufferRead := bufio.NewReader( fd )
for {
line, err := bufferRead.ReadString( '\n' )
logChannel <- line
count++
if count%(1000*params.routineNum) == 0{
log.Infof("ReadLineByLine line: %d", count)
}
if err != nil {
if err == io.EOF { //如果讀檔案讀完了,休息一下
time.Sleep( 3*time.Second )
log.Infof("ReadFileLineByLine wait,readLine: %d", count)
} else {
log.Warningf("ReadFileLineByLine read error")
}
}
}
return nil
}
//從logChannel中讀資料,並將讀到的資料寫入到pvChannel,uvChannel
func logConsumer(logChannel chan string, pvChannel,uvChannel chan urlData) error {
for logStr := range logChannel{
//切割日誌字串,摳出打點上報的資料
data := cutLogFetchData( logStr )
//uid,模擬生成uid,MD5(refer+ua)
hasher := md5.New()
hasher.Write( []byte( data.refer + data.ua ) )
uid := hex.EncodeToString( hasher.Sum(nil) )
//很多的解析工作都可以放到這裡完成
//json等....
uData := urlData{data, uid,formatUrl(data.url,data.time)}
pvChannel<-uData
uvChannel<-uData
}
return nil
}
//擷取上報資訊並返回結構體
func cutLogFetchData(logStr string) digData{
logStr = strings.TrimSpace(logStr)
pos1 := str.IndexOf( logStr, HANDLE_DIG, 0)
if pos1 == -1{
return digData{}
}
pos1 += len(HANDLE_DIG) //計算偏移量
pos2 := str.IndexOf( logStr, "HTTP/", pos1)
d := str.Substr(logStr, pos1, pos2-pos1)
//將擷取到的k=v&k=v的形式的字串轉換
urlInfo, err := url.Parse("http://localhost/?"+d) //要拼接成完整的網址是因為這個方法只認完整的網址才解析
if err != nil {
return digData{}
}
data := urlInfo.Query()
return digData{
data.Get("time"),
data.Get("refer"),
data.Get("url"),
data.Get("ua"),
}
}
// 統計pv,將統計到的資料放到storageBlock結構體中,然後寫入到storageChannel
func pvCounter(pvChannel chan urlData, storageChannel chan storageBlock){
for data := range pvChannel{
sItem := storageBlock{"pv","ZINCRBY",data.unode}
storageChannel <- sItem
}
}
// 統計uv,使用redis的HyperLoglog去重使用者,將統計到的資料放到storageBlock結構體中,然後寫入到storageChannel
func uvCounter(uvChannel chan urlData, storageChannel chan storageBlock, redisPool *pool.Pool){
for data := range uvChannel {
//HyperLoglog redis 去重使用者
hyperLogLogKey := "uv_hpll_"+ getTime(data.data.time, "day")
// 這行會報錯,因為執行到這裡時獲取不到redisCli這個例項,例項是在外部宣告的,在goroutine中可能獲取不到
// 可以通過函式引數的形式傳遞進來,一般這種資源型的東西,建議使用連線池
ret,err := redisPool.Cmd("PFADD",hyperLogLogKey,data.uid,"EX",86400).Int()
if err != nil{
log.Warningln("UvCounter check redis hyperloglog failed, ",err)
}
if ret!=1 {
continue
}
sItem := storageBlock{"uv","ZINCRBY",data.unode}
storageChannel <- sItem
}
}
// 迴圈讀取storageChannel中的內容,並使用redis進行相關資料統計
func dataStorage(storageChannel chan storageBlock, redisPool *pool.Pool){
for block := range storageChannel {
prefix := block.counterType + "_"
//逐層加洋蔥皮,網站-大分類-小分類-終極頁面,當用戶訪問任一級頁面時,需要給其上游頁面都新增相應的統計資料
// 維度:天-小時-分鐘
// 層級:網站-大分類-小分類-終極頁面
// 儲存模型:Redis SortedSet
setKeys := []string{
prefix+"day_"+getTime(block.unode.unTime,"day"), //網站的uv、pv統計
prefix+"hour_"+getTime(block.unode.unTime,"hour"),
prefix+"min_"+getTime(block.unode.unTime,"min"),
prefix+block.unode.unType+"_day_"+getTime(block.unode.unTime,"day"), // 每種型別頁面的統計movie、list、home
prefix+block.unode.unType+"_hour_"+getTime(block.unode.unTime,"hour"),
prefix+block.unode.unType+"_min_"+getTime(block.unode.unTime,"min"),
}
rowId := block.unode.unRid
for _,key := range setKeys{
ret, err := redisPool.Cmd( block.storageModel, key, 1, rowId).Int()
if ret<=0 || err!=nil{
log.Errorln("DataStorage redis storage error.",block.storageModel,key,rowId)
}
}
}
}
// 提取出要寫進儲存器的單條記錄
func formatUrl(url,t string) urlNode {
// 一定從量大的著手,詳情頁>列表頁>=首頁
pos1 := str.IndexOf(url,HANDLE_MOVIE,0)
if pos1 != -1{
pos1 += len(HANDLE_MOVIE)
pos2 := str.IndexOf(url,HANDLE_HTML,pos1)
idStr := str.Substr(url,pos1,pos2-pos1)
id,_ := strconv.Atoi(idStr)
return urlNode{"movie",id,url,t}
} else {
pos1 = str.IndexOf(url,HANDLE_LIST,0)
if pos1 != -1{
pos1 += len(HANDLE_LIST)
pos2 := str.IndexOf(url,HANDLE_LIST,pos1)
idStr := str.Substr(url,pos1,pos2-pos1)
id,_ := strconv.Atoi(idStr)
return urlNode{"list",id,url,t}
} else {
return urlNode{"home",1,url,t}
} // 如果頁面有很多種,就不斷在這裡擴充套件
}
}
// 根據時間型別獲取時間戳
func getTime( logtime,timeType string) string {
var item string
switch timeType {
case "day":
item = "2006-01-02"
break
case "hour":
item = "2006-01-02 15"
break
case "min":
item = "2006-01-02 15:04"
break
}
t,_ := time.Parse( item,time.Now().Format(item))
return strconv.FormatInt(t.Unix(),10)
}
基本流程:
1.通過命令列收集使用者輸入的引數:logFilePath
(要分析的日誌所在路徑)、routineNum
(想要設定的用於解析日誌的goroutine數目)、l
(執行時日誌存放的路徑),引入logrus
包進行執行時的日誌記錄
2.根據使用者定義的routineNum,初始化一些channel,用於資料傳遞
logChannel
用於資料統計,pvChannel
用於pv統計,uvChannel
用於uv統計,storageChannel
用於統計資料轉存redis
3.使用radix.v2/pool
包維持redis連線池,每3秒ping一下redis
4.生成一個goroutine,逐行讀取日誌,將讀取到的內容寫入logChannel
中
5.根據使用者指定的routineNum
建立一組日誌處理的goroutine,其將從logChannel
中讀一行日誌資料進行解析,解析過程包括:
- 切割日誌字串,摳出打點上報的資料(從一行日訪問志中解析出time,refer,url,ua,寫入digData結構體中並返回)
data := cutLogFetchData( logStr )
- uid,模擬生成uid,MD5(refer+ua),利用了
crypto/md5
,encoding/hex
這兩個包
hasher := md5.New()
hasher.Write( []byte( data.refer + data.ua ) )
uid := hex.EncodeToString( hasher.Sum(nil) )
之後對digData中的url進行解析(呼叫formatUrl()),提取出使用者訪問的資源型別(‘movie’、‘list’、‘home’),並且與url的訪問時間一起寫入到urlNode
這個結構體中,然後將解析日誌資料後返回的結構體(digData)、uid、解析url後返回的結構體(urlNode)寫入urlData這個結構體中,然後將該結構體寫入到pvChannel,uvChannel
uData := urlData{data, uid,formatUrl(data.url,data.time)}
pvChannel<-uData
uvChannel<-uData
6.建立一個goroutine用於pv統計,迴圈讀取pvChannel
中的內容,將統計型別(pv),儲存要用的資料型別(ZINCRBY),以及pvChannel
中的unode結構體,構造成storageBlock
結構體,並寫入storageChannel
中
7.建立一個goroutine用於uv統計,迴圈讀取uvChannel
中的內容,並根據從uvChannel
中讀取到的的結構體(urlData)裡面的uid,利用redis的HyperLogLog
進行使用者去重,然後將統計型別(uv),儲存要用的資料型別(ZINCRBY),以及uvChannel
中的unode結構體,構造成storageBlock
結構體,並寫入storageChannel
中
8.建立一個goroutine,迴圈讀取storageChannel
中的資料,構造不同的key,然後利用redis進行統計和儲存
9.最後為了讓程式順利跑起來,在主執行緒中設定了睡眠1000s
涉及的點
- logrus包的使用
- bufio的使用
參考連線 bufio的解析 - mgutz/str包的使用
- radix.v2/pool包的使用
- MD5生成(crypto/md5,encoding/hex)
參考連線 golang md5
hasher := md5.New()
hasher.Write( []byte( data.refer + data.ua ) )
uid := hex.EncodeToString( hasher.Sum(nil) )
這裡直接對一串字串計算MD5。其中通過md5.New()初始化一個MD5物件,其實它是一個hash.Hash物件。 函式原型為 func New() hash.Hash 。該物件實現了hash.Hash的Sum介面:計算出校驗和。其函式原型 為 func Sum(data []byte) [Size]byte 這裡的官方Manual對其的描述我感覺有點問題。其官方描述為: " Sum returns the MD5 checksum of the data. "
通過翻閱原始碼可以看到他並不是對data進行校驗計算,而是對hash.Hash物件內部儲存的內容進行校驗和 計算然後將其追加到data的後面形成一個新的byte切片。因此通常的使用方法就是將data置為nil,sum方法描述
// Sum appends the current hash to b and returns the resulting slice.
// It does not change the underlying hash state.
Sum(b []byte) []byte
該方法返回一個Size大小的byte陣列,對於MD5來說就是一個128bit的16位元組byte陣列。
可參考連結 Golang計算MD5
然後 encoding/hex包是實現十六進位制編碼和解碼用的,參考連線 騰訊雲
二進位制是Binary,即bin
八進位制是Octal,即oct
十進位制為Decimal,即dec
十六進位制為Hexadecimal,即hex
- golang的時間戳
import (
"fmt"
"time"
)
func main() {
t := time.Now()
fmt.Println(t)
t1 := time.Now().Format("2006-01-02 15:04:05")
fmt.Println(t1)
t2 := time.Now().Unix() //秒
fmt.Println(t2)
t3 := time.Now().UnixNano() //納秒
fmt.Println(t3)
}
關於golang中的奇怪時間 2006-01-02 15:04:05
:
仔細觀察這個日期,06年,1月2日下午3點4分5秒,查閱相關資料還有 -7時區,Monday,數字1~7都有了,而且都不重複。
其實日期為 2006-01-02T15:04:05Z07:00,每個數字都有意義
1 2 3 4 5 6 7
月 日 時 分 秒 年 時 區
道理其實跟別的語言 的 YYYY-mm-dd 一樣,只不過go用了這個特殊的包含1~7的時間而已。
可以看這裡 stackoverflow
以及這裡 簡書
- url 解析query
d := str.Substr(logStr, pos1, pos2-pos1)
//將擷取到的k=v&k=v的形式的字串轉換
urlInfo, err := url.Parse("http://localhost/?"+d) //要拼接成完整的網址是因為這個方法只認完整的網址才解析
if err != nil {
return digData{}
}
data := urlInfo.Query()
return digData{
data.Get("time"),
data.Get("refer"),
data.Get("url"),
data.Get("ua"),
}