1. 程式人生 > >golang 流量統計系統視訊總結(二)

golang 流量統計系統視訊總結(二)

總體流程

在這裡插入圖片描述

在這裡插入圖片描述

解析使用者訪問行為日誌部分
程式碼實現:

package main

import (
	"bufio"
	"crypto/md5"
	"encoding/hex"
	"flag"
	"github.com/mediocregopher/radix.v2/pool"
	"github.com/mgutz/str"
	"github.com/sirupsen/logrus"
	"io"
	"net/url"
	"os"
	"strconv"
	"strings"
	"time"
)

const HANDLE_DIG = " /dig?"
const HANDLE_MOVIE =
"/movie/" const HANDLE_LIST = "/list/" const HANDLE_HTML = ".html" // 收集命令列引數的結構體 type cmdParams struct{ logFilePath string routineNum int } // 用於儲存一條訪問日誌資訊解析後的關鍵內容 type digData struct { time string url string refer string ua string } // 用於儲存使用者的訪問行為 type urlData struct{ data digData uid string unode urlNode }
// 要記錄的資料節點,類似於資料庫中的單條資料 type urlNode struct{ unType string // 詳情頁 或者 列表頁 或者 首頁 unRid int // Resource ID 資源id unUrl string // 當前這個頁面的url unTime string // 當前訪問這個頁面的時間 } // 儲存一條pv/uv統計資料 type storageBlock struct{ counterType string storageModel string unode urlNode } var log = logrus.New() //var redisCli redis.Client
func init(){ log.Out = os.Stdout log.SetLevel( logrus.DebugLevel) /* redisCli,err := redis.Dial("tcp","localhost:6379") if err != nil{ log.Fatalln("Redis connect failed") } else { defer redisCli.Close() } */ } func main() { // 獲取引數 logFilePath := flag.String("logFilePath","/User/Pangee/Public/nginx/log/dig.log","target log file path") routineNum := flag.Int("routineNum",5,"consumer number by go routine") l := flag.String("l","/tmp/log","runtime log file") flag.Parse() params := cmdParams{*logFilePath, *routineNum} // 打日誌 logFd, err := os.OpenFile( *l ,os.O_CREATE|os.O_WRONLY, 0644) if (err == nil){ log.Out = logFd defer logFd.Close() } log.Infoln("exec strat.") log.Infoln("params:log filepath = %s,routineNum = %d",params.logFilePath,params.routineNum) // 初始化一些channel,用於資料傳遞 var logChannel = make(chan string, 3 * params.routineNum) // 用於日誌解析 var pvChannel = make(chan urlData, params.routineNum) // 用於pv統計 var uvChannel = make(chan urlData, params.routineNum) // 使用者uv統計 var storageChannel = make(chan storageBlock, params.routineNum) // 使用者儲存統計資料 // Redis pool redisPool, err := pool.New("tcp","localhost:6379",2*params.routineNum) if err!=nil{ log.Fatalln("redis pool created fail") panic(err) } else { // 保持redis的連線不閒置 go func() { for { redisPool.Cmd( "PING") time.Sleep( 3 * time.Second) } }() } // 日誌消費者,往logChannel中寫日誌資料 go readFileLineByLine(params, logChannel) // 建立一組日誌處理,從logChannel中讀資料,並將讀到的資料寫入到pvChannel,uvChannel for i:=0;i<params.routineNum;i++{ go logConsumer(logChannel, pvChannel, uvChannel) } // 建立PV/UV 統計器,從pvChannel和uvChannel中讀取資料,然後將資料寫入storageChannel go pvCounter(pvChannel, storageChannel) go uvCounter(uvChannel, storageChannel, redisPool) // 建立儲存器 go dataStorage(storageChannel, redisPool) // 之後會封裝成daemon,但是先讓程式跑起來 time.Sleep(1000 * time.Second) } //逐行消費日誌 func readFileLineByLine(params cmdParams,logChannel chan string) error { fd, err := os.Open(params.logFilePath) if(err != nil){ log.Warningf("ReadFileLineByLine can't open file: %s",params.logFilePath) return err } defer fd.Close() count := 0 bufferRead := bufio.NewReader( fd ) for { line, err := bufferRead.ReadString( '\n' ) logChannel <- line count++ if count%(1000*params.routineNum) == 0{ log.Infof("ReadLineByLine line: %d", count) } if err != nil { if err == io.EOF { //如果讀檔案讀完了,休息一下 time.Sleep( 3*time.Second ) log.Infof("ReadFileLineByLine wait,readLine: %d", count) } else { log.Warningf("ReadFileLineByLine read error") } } } return nil } //從logChannel中讀資料,並將讀到的資料寫入到pvChannel,uvChannel func logConsumer(logChannel chan string, pvChannel,uvChannel chan urlData) error { for logStr := range logChannel{ //切割日誌字串,摳出打點上報的資料 data := cutLogFetchData( logStr ) //uid,模擬生成uid,MD5(refer+ua) hasher := md5.New() hasher.Write( []byte( data.refer + data.ua ) ) uid := hex.EncodeToString( hasher.Sum(nil) ) //很多的解析工作都可以放到這裡完成 //json等.... uData := urlData{data, uid,formatUrl(data.url,data.time)} pvChannel<-uData uvChannel<-uData } return nil } //擷取上報資訊並返回結構體 func cutLogFetchData(logStr string) digData{ logStr = strings.TrimSpace(logStr) pos1 := str.IndexOf( logStr, HANDLE_DIG, 0) if pos1 == -1{ return digData{} } pos1 += len(HANDLE_DIG) //計算偏移量 pos2 := str.IndexOf( logStr, "HTTP/", pos1) d := str.Substr(logStr, pos1, pos2-pos1) //將擷取到的k=v&k=v的形式的字串轉換 urlInfo, err := url.Parse("http://localhost/?"+d) //要拼接成完整的網址是因為這個方法只認完整的網址才解析 if err != nil { return digData{} } data := urlInfo.Query() return digData{ data.Get("time"), data.Get("refer"), data.Get("url"), data.Get("ua"), } } // 統計pv,將統計到的資料放到storageBlock結構體中,然後寫入到storageChannel func pvCounter(pvChannel chan urlData, storageChannel chan storageBlock){ for data := range pvChannel{ sItem := storageBlock{"pv","ZINCRBY",data.unode} storageChannel <- sItem } } // 統計uv,使用redis的HyperLoglog去重使用者,將統計到的資料放到storageBlock結構體中,然後寫入到storageChannel func uvCounter(uvChannel chan urlData, storageChannel chan storageBlock, redisPool *pool.Pool){ for data := range uvChannel { //HyperLoglog redis 去重使用者 hyperLogLogKey := "uv_hpll_"+ getTime(data.data.time, "day") // 這行會報錯,因為執行到這裡時獲取不到redisCli這個例項,例項是在外部宣告的,在goroutine中可能獲取不到 // 可以通過函式引數的形式傳遞進來,一般這種資源型的東西,建議使用連線池 ret,err := redisPool.Cmd("PFADD",hyperLogLogKey,data.uid,"EX",86400).Int() if err != nil{ log.Warningln("UvCounter check redis hyperloglog failed, ",err) } if ret!=1 { continue } sItem := storageBlock{"uv","ZINCRBY",data.unode} storageChannel <- sItem } } // 迴圈讀取storageChannel中的內容,並使用redis進行相關資料統計 func dataStorage(storageChannel chan storageBlock, redisPool *pool.Pool){ for block := range storageChannel { prefix := block.counterType + "_" //逐層加洋蔥皮,網站-大分類-小分類-終極頁面,當用戶訪問任一級頁面時,需要給其上游頁面都新增相應的統計資料 // 維度:天-小時-分鐘 // 層級:網站-大分類-小分類-終極頁面 // 儲存模型:Redis SortedSet setKeys := []string{ prefix+"day_"+getTime(block.unode.unTime,"day"), //網站的uv、pv統計 prefix+"hour_"+getTime(block.unode.unTime,"hour"), prefix+"min_"+getTime(block.unode.unTime,"min"), prefix+block.unode.unType+"_day_"+getTime(block.unode.unTime,"day"), // 每種型別頁面的統計movie、list、home prefix+block.unode.unType+"_hour_"+getTime(block.unode.unTime,"hour"), prefix+block.unode.unType+"_min_"+getTime(block.unode.unTime,"min"), } rowId := block.unode.unRid for _,key := range setKeys{ ret, err := redisPool.Cmd( block.storageModel, key, 1, rowId).Int() if ret<=0 || err!=nil{ log.Errorln("DataStorage redis storage error.",block.storageModel,key,rowId) } } } } // 提取出要寫進儲存器的單條記錄 func formatUrl(url,t string) urlNode { // 一定從量大的著手,詳情頁>列表頁>=首頁 pos1 := str.IndexOf(url,HANDLE_MOVIE,0) if pos1 != -1{ pos1 += len(HANDLE_MOVIE) pos2 := str.IndexOf(url,HANDLE_HTML,pos1) idStr := str.Substr(url,pos1,pos2-pos1) id,_ := strconv.Atoi(idStr) return urlNode{"movie",id,url,t} } else { pos1 = str.IndexOf(url,HANDLE_LIST,0) if pos1 != -1{ pos1 += len(HANDLE_LIST) pos2 := str.IndexOf(url,HANDLE_LIST,pos1) idStr := str.Substr(url,pos1,pos2-pos1) id,_ := strconv.Atoi(idStr) return urlNode{"list",id,url,t} } else { return urlNode{"home",1,url,t} } // 如果頁面有很多種,就不斷在這裡擴充套件 } } // 根據時間型別獲取時間戳 func getTime( logtime,timeType string) string { var item string switch timeType { case "day": item = "2006-01-02" break case "hour": item = "2006-01-02 15" break case "min": item = "2006-01-02 15:04" break } t,_ := time.Parse( item,time.Now().Format(item)) return strconv.FormatInt(t.Unix(),10) }
基本流程:

1.通過命令列收集使用者輸入的引數:logFilePath(要分析的日誌所在路徑)、routineNum(想要設定的用於解析日誌的goroutine數目)、l(執行時日誌存放的路徑),引入logrus包進行執行時的日誌記錄
2.根據使用者定義的routineNum,初始化一些channel,用於資料傳遞
logChannel用於資料統計,pvChannel用於pv統計,uvChannel用於uv統計,storageChannel用於統計資料轉存redis
3.使用radix.v2/pool包維持redis連線池,每3秒ping一下redis
4.生成一個goroutine,逐行讀取日誌,將讀取到的內容寫入logChannel
5.根據使用者指定的routineNum建立一組日誌處理的goroutine,其將從logChannel中讀一行日誌資料進行解析,解析過程包括:

  • 切割日誌字串,摳出打點上報的資料(從一行日訪問志中解析出time,refer,url,ua,寫入digData結構體中並返回)
	data := cutLogFetchData( logStr )
  • uid,模擬生成uid,MD5(refer+ua),利用了crypto/md5encoding/hex
    這兩個包
	hasher := md5.New()
	hasher.Write( []byte( data.refer + data.ua ) )
	uid := hex.EncodeToString( hasher.Sum(nil) )

之後對digData中的url進行解析(呼叫formatUrl()),提取出使用者訪問的資源型別(‘movie’、‘list’、‘home’),並且與url的訪問時間一起寫入到urlNode這個結構體中,然後將解析日誌資料後返回的結構體(digData)、uid、解析url後返回的結構體(urlNode)寫入urlData這個結構體中,然後將該結構體寫入到pvChannel,uvChannel

	uData :=  urlData{data, uid,formatUrl(data.url,data.time)}
	pvChannel<-uData
	uvChannel<-uData

6.建立一個goroutine用於pv統計,迴圈讀取pvChannel中的內容,將統計型別(pv),儲存要用的資料型別(ZINCRBY),以及pvChannel中的unode結構體,構造成storageBlock結構體,並寫入storageChannel
7.建立一個goroutine用於uv統計,迴圈讀取uvChannel中的內容,並根據從uvChannel中讀取到的的結構體(urlData)裡面的uid,利用redis的HyperLogLog進行使用者去重,然後將統計型別(uv),儲存要用的資料型別(ZINCRBY),以及uvChannel中的unode結構體,構造成storageBlock結構體,並寫入storageChannel
8.建立一個goroutine,迴圈讀取storageChannel中的資料,構造不同的key,然後利用redis進行統計和儲存
9.最後為了讓程式順利跑起來,在主執行緒中設定了睡眠1000s

涉及的點

  • logrus包的使用
  • bufio的使用
    參考連線 bufio的解析
  • mgutz/str包的使用
  • radix.v2/pool包的使用
  • MD5生成(crypto/md5,encoding/hex)

參考連線 golang md5

	hasher := md5.New()
	hasher.Write( []byte( data.refer + data.ua ) )
	uid := hex.EncodeToString( hasher.Sum(nil) )

這裡直接對一串字串計算MD5。其中通過md5.New()初始化一個MD5物件,其實它是一個hash.Hash物件。 函式原型為 func New() hash.Hash 。該物件實現了hash.Hash的Sum介面:計算出校驗和。其函式原型 為 func Sum(data []byte) [Size]byte 這裡的官方Manual對其的描述我感覺有點問題。其官方描述為: " Sum returns the MD5 checksum of the data. "

通過翻閱原始碼可以看到他並不是對data進行校驗計算,而是對hash.Hash物件內部儲存的內容進行校驗和 計算然後將其追加到data的後面形成一個新的byte切片。因此通常的使用方法就是將data置為nil,sum方法描述

// Sum appends the current hash to b and returns the resulting slice.
// It does not change the underlying hash state.
Sum(b []byte) []byte

該方法返回一個Size大小的byte陣列,對於MD5來說就是一個128bit的16位元組byte陣列。

可參考連結 Golang計算MD5

然後 encoding/hex包是實現十六進位制編碼和解碼用的,參考連線 騰訊雲

二進位制是Binary,即bin
八進位制是Octal,即oct
十進位制為Decimal,即dec
十六進位制為Hexadecimal,即hex

  • golang的時間戳
import ( 
    "fmt" 
    "time"
) 
   
func main() { 
    t := time.Now() 
    fmt.Println(t) 
    t1 := time.Now().Format("2006-01-02 15:04:05") 
    fmt.Println(t1) 
    t2  := time.Now().Unix()    //秒
    fmt.Println(t2)
    t3  := time.Now().UnixNano()   //納秒
    fmt.Println(t3)
}

關於golang中的奇怪時間 2006-01-02 15:04:05:
仔細觀察這個日期,06年,1月2日下午3點4分5秒,查閱相關資料還有 -7時區,Monday,數字1~7都有了,而且都不重複。
其實日期為 2006-01-02T15:04:05Z07:00,每個數字都有意義
1 2 3 4 5 6 7

月 日 時 分 秒 年 時 區
道理其實跟別的語言 的 YYYY-mm-dd 一樣,只不過go用了這個特殊的包含1~7的時間而已。

可以看這裡 stackoverflow

以及這裡 簡書

  • url 解析query
d := str.Substr(logStr, pos1, pos2-pos1)
//將擷取到的k=v&k=v的形式的字串轉換
urlInfo, err := url.Parse("http://localhost/?"+d)    //要拼接成完整的網址是因為這個方法只認完整的網址才解析
if err != nil {
	return digData{}
}
data := urlInfo.Query()
return digData{
	data.Get("time"),
	data.Get("refer"),
	data.Get("url"),
	data.Get("ua"),
}