1. 程式人生 > >分散式系統限流演算法分析與實現

分散式系統限流演算法分析與實現

一、限流的關鍵作用

對於大型網際網路架構中,限流的設計是必不可少的一個環節。在給定的時間內, 客戶端請求次數過多, 伺服器就會攔截掉部分請求,避免請求流量過大造成資料庫負載高的問題。

 

二、常見限流演算法利弊分析

計數器限流

計數器限流主要有固定視窗計數器和滑動視窗計數器。固定視窗計數器即:在單位時間內請求數達到了所限定的數量時,請求如需不被攔截則需要等待下一個單位時間開始;滑動視窗計數器:在單位時間內請求數達到了所限定的數量時,當前時刻的請求會被攔截,隨時間視窗的滑動計數器數量會變化,當計數器數量小於限定數量時請求正常執行。

常規計數器限流

 

常規計數器限流是指在一個時間段內允許一定數量的請求執行,超過最大限制則會阻止請求直到超過當前時間段為止。如上圖所示,10s內限制1000個請求,在第11s的時候計數器會從0重新開始計數。 

 

常規計數器限流瞬時流量問題

如上圖所示,常規計數器模式下,在第9.9s的時候執行了1000請求,在第10.1s時計數器已清0,此時又有1000請求到來,這樣相當於在0.2s的時間內有2000請求,顯然違背了限流的初衷。

 

滑動視窗計數器限流

滑動視窗計數器限流是在計數器限流的基礎上將固定的時間段劃分為若干個時間視窗,隨著時間的推移,保持時間段內的滑動視窗個數,在常規計數器限流的基礎上避免了瞬時流量對伺服器的壓力。如上圖所示,0-3s內有600請求,8-13s有700請求,當第16s時新增500請求會觸發限流。

 

計數器限流突刺現象

 

特點:如果在當前時間視窗最後半秒請求數突然達到最大限制,半秒後進入下一個時間視窗開始,如果請求繼續在半秒內達到週期上限,則相當於1秒內請求達到2倍的限制請求數;如果60s為一個週期,在第10s的時候伺服器已處理完請求,在計數器限流模式下會使得伺服器空閒50s無法處理請求。

計數器限流原理簡單,實現比較容易,但是也有一個痛點問題就是它的突刺現象,如上圖所示,10s限制1000請求,到第2s時已達請求上限,那麼在第3-10s內的請求將會持續拒絕,在伺服器資源空閒的狀態下會造成極大的浪費。

 

漏桶限流

 

請求進入到漏桶中,漏桶以固定的速度流出,當訪問頻率超過介面響應頻率流速過大時拒絕請求,可以看到漏桶相當於一個佇列,進隊的速率不受限制,出隊是固定速率。

特點:由於出水速率是固定的,當突發大流量時會導致大量請求被限制,無法處理。

漏桶限流,請求進入漏桶不受限制,並以固定的速率流出,當桶滿並且當前流入的請求大於當前流出的請求時,限制請求。漏桶限流解決了計數器限流模式下流量突刺的問題,當伺服器處理完請求後,只要能從漏桶中流出請求則能繼續處理,不會造成長時間等待拒絕請求。

 

漏桶限流突發流量問題

 

漏桶限流突發流量問題,如上圖所示,漏桶滿後此時大量請求到來,由於伺服器已擴容可以滿足請求處理,但是漏桶會拒絕大量請求,導致無法應對突發流量問題。

 

令牌桶限流

令牌token以固定的速率向桶中放入令牌直至桶滿,在執行請求前需要先從桶中獲取令牌,形式上也相當於佇列,入隊以固定速率,出對不受限制,這點與漏桶剛好相反。

特點:可以應對突發流量,只要桶中有令牌即可執行請求。

 

三、golang語言層面實現限流演算法

簡單計數器限流

 

package main

import (
	"fmt"
	"sync"
	"time"
)

// CounterLimiter 簡單計數器限流
type CounterLimiter struct {
	Interval int64     // 重新計數時間
	LastTime time.Time // 上一次請求時間
	MaxCount int       // 最大計數
	Lck      *sync.Mutex
	ReqCount int // 目前的請求數
}

// NewCounterLimiter 初始化簡單計數器限流
func NewCounterLimiter(interval int64, maxCount int) *CounterLimiter {
	return &CounterLimiter{
		Interval: interval,
		LastTime: time.Now(),
		MaxCount: maxCount,
		Lck:      new(sync.Mutex),
		ReqCount: 0,
	}
}

// counterLimit 簡單計數器限流實現
func (r *CounterLimiter) counterLimit() bool {
	r.Lck.Lock()
	defer r.Lck.Unlock()
	now := time.Now()
	if now.Unix()-r.LastTime.Unix() > r.Interval {
		r.LastTime = now
		r.ReqCount = 0
	}
	if r.ReqCount < r.MaxCount {
		r.ReqCount += 1
		return true
	}
	return false
}

func main() {
	// 定義1秒最多5個請求
	r := NewCounterLimiter(1, 5)
	for i := 0; i < 20; i++ {
		ok := r.counterLimit()
		if ok {
			fmt.Println("pass ", i)
		} else {
			fmt.Println("limit ", i)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

 

  

滑動視窗計數器限流 

package main

import (
	"fmt"
	"sync"
	"time"
)

// SlidingWindowLimiter 滑動視窗計數器限流
type SlidingWindowLimiter struct {
	Interval    int64       // 總計數時間
	LastTime    time.Time   // 上一個視窗時間
	Lck         *sync.Mutex // 鎖
	WinCount    []int64     // 視窗中請求當前數量
	TicketSize  int64       // 視窗最大容量
	TicketCount int64       // 視窗個數
	CurIndex    int64       // 目前使用的視窗下標
}

// NewSlidingWindowLimiter 初始化滑動視窗計數器限流
func NewSlidingWindowLimiter(interval int64, ticketCount int64, ticketSize int64) *SlidingWindowLimiter {
	return &SlidingWindowLimiter{
		Interval:    interval,
		LastTime:    time.Now(),
		TicketSize:  ticketSize,
		TicketCount: ticketCount,
		WinCount:    make([]int64, ticketSize, ticketSize),
		CurIndex:    0,
		Lck:         new(sync.Mutex),
	}
}

// slidingCounterLimit 滑動視窗計數器限流實現
func (r *SlidingWindowLimiter) slidingCounterLimit() bool {
	r.Lck.Lock()
	defer r.Lck.Unlock()
	eachTicketTime := r.Interval / r.TicketCount
	now := time.Now()
	// 如果間隔時間超過一個視窗的時間 當前視窗置0 指向下一個視窗
	if now.Unix()-r.LastTime.Unix() > eachTicketTime {
		r.WinCount[r.CurIndex] = 0
		r.CurIndex = (r.CurIndex + 1) % r.TicketCount
		r.LastTime = now
	}
	fmt.Println("當前視窗:", r.CurIndex)
	// 當前視窗未滿則正常計數
	if r.WinCount[r.CurIndex] < r.TicketSize {
		r.WinCount[r.CurIndex]++
		return true
	}
	return false
}

func main() {
	// 定義1秒10個時間視窗 每個視窗大小為1  即1秒10個請求
	r := NewSlidingWindowLimiter(1, 10, 1)
	for i := 0; i < 20; i++ {
		ok := r.slidingCounterLimit()
		if ok {
			fmt.Println("pass ", i)
		} else {
			fmt.Println("limit ", i)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

 

 

漏桶限流

package main

import (
	"fmt"
	"sync"
	"time"
)

// BucketLimiter 定義漏桶演算法struct
type BucketLimiter struct {
	Lck      *sync.Mutex // 鎖
	Rate     float64     //最大速率限制
	Balance  float64     //漏桶的餘量
	Cap      float64     //漏桶的最大容量限制
	LastTime time.Time   //上次檢查的時間
}

// NewBucketLimiter 初始化BucketLimiter
func NewBucketLimiter(rate int, cap int) *BucketLimiter {
	return &BucketLimiter{
		Lck:      new(sync.Mutex),
		Rate:     float64(rate),
		Balance:  float64(cap),
		Cap:      float64(cap),
		LastTime: time.Now(),
	}
}

// leakyBucket 漏桶演算法實現
func (r *BucketLimiter) leakyBucket() bool {
	ok := false
	r.Lck.Lock()
	defer r.Lck.Unlock()
	now := time.Now()
	dur := now.Sub(r.LastTime).Seconds() //當前時間與上一次檢查時間差
	r.LastTime = now
	water := dur * r.Rate //計算這段時間內漏桶流出水的流量water
	r.Balance += water    //漏桶流出water容量的水,自然漏桶的餘量多出water
	if r.Balance > r.Cap {
		r.Balance = r.Cap
	}
	if r.Balance >= 1 { //漏桶餘量足夠容下當前的請求
		r.Balance -= 1
		ok = true
	}
	return ok
}

func main() {
	// 初始化 限制每秒2個請求 漏洞容量為5
	r := NewBucketLimiter(2, 5)
	for i := 0; i < 20; i++ {
		ok := r.leakyBucket()
		if ok {
			fmt.Println("pass ", i)
		} else {
			fmt.Println("limit ", i)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

  

令牌桶限流

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// TokenBucket 定義令牌桶結構
type TokenBucket struct {
	LastTime time.Time // 當前請求時間
	Capacity float64   // 桶的容量(存放令牌的最大量)
	Rate     float64   // 令牌放入速度
	Tokens   float64   // 當前令牌總量
	Lck      *sync.Mutex
}

// NewTokenBucket 初始化TokenBucket
func NewTokenBucket(rate int, cap int) *TokenBucket {
	return &TokenBucket{
		LastTime: time.Now(),
		Capacity: float64(cap),
		Rate:     float64(rate),
		Tokens:   float64(cap),
		Lck:      new(sync.Mutex),
	}
}

// getToken 判斷是否獲取令牌(若能獲取,則處理請求)
func (r *TokenBucket) getToken() bool {
	now := time.Now()
	r.Lck.Lock()
	defer r.Lck.Unlock()
	// 先新增令牌
	tokens := math.Min(r.Capacity, r.Tokens+now.Sub(r.LastTime).Seconds()*r.Rate)
	r.Tokens = tokens
	if tokens < 1 {
		// 若桶中一個令牌都沒有了,則拒絕
		return false
	} else {
		// 桶中還有令牌,領取令牌
		r.Tokens -= 1
		r.LastTime = now
		return true
	}
}

func main() {
	// 初始化 限制每秒2個請求 令牌桶容量為5
	r := NewTokenBucket(2, 5)
	for i := 0; i < 20; i++ {
		ok := r.getToken()
		if ok {
			fmt.Println("pass ", i)
		} else {
			fmt.Println("limit ", i)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

 

 

四、nginx限流及實現

Nginx 提供了兩種限流手段:一是控制速率,二是控制併發連線數。
1. 控制速率
我們需要使用 limit_req_zone 用來限制單位時間內的請求數,即速率限制,示例配置如下:
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
server {
    location / {
        limit_req zone=mylimit;
    }
}
limit_req zone=mylimit方案:
以上配置表示,限制每個 IP 訪問的速度為 2r/s,因為 Nginx 的限流統計是基於毫秒的,我們設定的速度是 2r/s,轉換一下就是 500ms 內單個 IP 只允許通過 1 個請求,從 501ms 開始才允許通過第 2 個請求。
我們使用單 IP 在 10ms 內發併發送了 6 個請求的執行結果如下:
從以上結果可以看出他的執行符合我們的預期,只有 1 個執行成功了,其他的 5 個被拒絕了(第 2 個在 501ms 才會被正常執行)。
表現為對收到的請求無延時 超過訪問頻率則503

limit_req zone=mylimit burst=3方案:
上面的速率控制雖然很精準但是應用於真實環境未免太苛刻了,真實情況下我們應該控制一個 IP 單位總時間內的總訪問次數,而不是像上面那麼精確但毫秒,我們可以使用 burst 關鍵字開啟此設定,示例配置如下:
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
server {
    location / {
        limit_req zone=mylimit burst=3;
    }
}
burst=3 表示每個 IP 最多允許3個突發請求,如果單個 IP 在 10ms 內傳送 6 次請求的結果如下:
從以上結果可以看出,有 1 個請求被立即處理了,3 個請求被放到 burst 佇列裡排隊執行了,另外 2被丟棄了。
超過了burst緩衝佇列長度和rate處理能力的請求被直接丟棄
表現為對收到的請求有延時 所有請求排隊
limit_req zone=mylimit burst=3 nodelay方案:
server {
    location / {
        limit_req zone=mylimit burst=3 nodelay;
    }
}
如果單個 IP 在 10ms 內傳送 6 次請求的結果如下:
依照在limit_req_zone中配置的rate來處理請求,同時設定了一個大小為3的緩衝佇列,
當請求到來時,會爆發出一個峰值處理能力,表示這3個請求立刻處理,對於峰值處理數量之外的請求,直接丟棄
緩衝佇列按rate來釋放
表現為對收到的請求無延時 緩衝已滿則503


2. 控制併發數
這個模組用來限制單個IP的請求數。並非所有的連線都被計數,只有在伺服器處理了請求並且已經讀取了整個請求頭時,連線才被計數。
利用 limit_conn_zone 和 limit_conn 兩個指令即可控制併發數,示例配置如下:
limit_conn_zone $binary_remote_addr zone=perip:10m;
limit_conn_zone $server_name zone=perserver:10m;
server {
    ...
    limit_conn perip 10;
    limit_conn perserver 100;
}
其中 limit_conn perip 10 表示限制單個 IP 同時最多能持有 10 個連線;limit_conn perserver 100 表示 server 同時能處理併發連線的總數為 100 個。
只有當 request header 被後端處理後,這個連線才進行計數。

 

五、基於redis實現限流演算法

對於上述限流演算法目前已有很多成熟的第三方庫實現了,但是對於分散式系統來說無法起到嚴格意義上的限流,因此基於redis以gin中介軟體的方式實現上述限流演算法。

滑動視窗計數器限流

func Limiter(ctx *gin.Context) {
	now := time.Now().UnixNano()
	username, exists := ctx.Get("username")
	if !exists {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "username獲取失敗"})
	}
	key := fmt.Sprintf(redis.KeyLimitArticleUser, username)
	c, err := redis.Client.RedisCon.Dial()
	if err != nil || c == nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis連線失敗"})
		return
	}

	//限制五秒一次請求
	var limit int64 = 1
	dura := time.Second * 5
	//刪除有序集合中的五秒之前的資料
	_, err = c.Do("ZREMRANGEBYSCORE", key, "0", fmt.Sprint(now-(dura.Nanoseconds())))
	if err != nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis操作ZREMRANGEBYSCORE失敗"})
	}
	reqs, _ := redisPool.Int64(c.Do("ZCARD", key))
	if reqs >= limit {
		ctx.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
			"status":  http.StatusTooManyRequests,
			"message": "too many request",
		})
		return
	}

	ctx.Next()
	_, err = c.Do("ZADD", key, float64(now), float64(now))
	if err != nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis操作ZADD失敗"})
	}
	_, err = c.Do("EXPIRE", key, dura)
	if err != nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis操作EXPIRE失敗"})
	}
}

 

漏桶限流

// LeakyBucket redis實現漏桶限流
func LeakyBucket(ctx *gin.Context) {
	username, exists := ctx.Get("username")
	if !exists {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "username獲取失敗"})
	}
	key := fmt.Sprintf(redis.KeyLeakyBucketArticleUser, username)
	c, err := redis.Client.RedisCon.Dial()
	if err != nil || c == nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis連線失敗"})
		return
	}

	rate := 2                                                       // 每秒2個請求
	capacity := 5                                                   // 桶容量
	lastTime, err := redisPool.Int64(c.Do("hget", key, "lastTime")) // 上次請求時間
	now := time.Now().Unix()
	water := int(now-lastTime) * rate                           // 經過一段時間後桶流出的請求
	balance, err := redisPool.Int(c.Do("hget", key, "balance")) // 上一次桶的餘量
	balance += water                                            // 當前桶的餘量
	if balance > capacity {
		balance = capacity
	}
	if balance >= 1 {
		balance--
		lastTime = now // 記錄當前請求時間 秒為單位
		c.Do("hset", key, "lastTime", lastTime)
		c.Do("hset", key, "balance", balance)
		return
	}

	// 無空閒balance可用時 429狀態碼限流提示
	ctx.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
		"status":  http.StatusTooManyRequests,
		"message": "too many request",
	})
}

  

令牌桶限流

// BucketLimit redis實現令牌桶限流
func BucketLimit(ctx *gin.Context) {
	username, exists := ctx.Get("username")
	if !exists {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "username獲取失敗"})
	}
	key := fmt.Sprintf(redis.KeyBucketLimitArticleUser, username)
	c, err := redis.Client.RedisCon.Dial()
	if err != nil || c == nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis連線失敗"})
		return
	}

	rate := 1                                                       // 令牌生成速度 每秒1個token
	capacity := 1                                                   // 桶容量
	tokens, err := redisPool.Int(c.Do("hget", key, "tokens"))       // 桶中的令牌數
	lastTime, err := redisPool.Int64(c.Do("hget", key, "lastTime")) // 上次令牌生成時間
	now := time.Now().Unix()

	// 初始狀態下 令牌數量為桶的容量
	existKey, err := redisPool.Int(c.Do("exists", key))
	if existKey != 1 {
		tokens = capacity
		c.Do("hset", key, "lastTime", now)
	}
	deltaTokens := int(now-lastTime) * rate // 經過一段時間後生成的令牌
	if deltaTokens > 1 {
		tokens = tokens + deltaTokens // 增加令牌
	}
	if tokens > capacity {
		tokens = capacity
	}
	if tokens >= 1 {
		tokens-- // 請求進來了,令牌就減少1
		c.Do("hset", key, "lastTime", now)
		c.Do("hset", key, "tokens", tokens)
		return
	}

	// 無空閒token可用時 429狀態碼限流提示
	ctx.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
		"status":  http.StatusTooManyRequests,
		"message": "too many request",
	})
}

 

redis+lua實現執行緒安全的分散式限流演算法

以令牌桶演算法為例:

實現流程圖

 

定義lua指令碼

// lua指令碼實現令牌桶演算法限流
	ScriptTokenLimit = `
local rateLimit = redis.pcall('HMGET',KEYS[1],'lastTime','tokens')
local lastTime = rateLimit[1]
local tokens = tonumber(rateLimit[2])
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
if tokens == nil then
  tokens = capacity
else
  local deltaTokens = math.floor((now-lastTime)*rate)
  tokens = tokens+deltaTokens
  if tokens>capacity then
    tokens = capacity
  end
end
local result = false
lastTime = now
if(tokens>0) then
  result = true
  tokens = tokens-1
end
redis.call('HMSET',KEYS[1],'lastTime',lastTime,'tokens',tokens)
return result
`


通過lua指令碼實現令牌桶演算法限流
// LuaTokenBucket 通過lua指令碼實現令牌桶演算法限流
func LuaTokenBucket(c redis.Conn, key string, capacity, rate, now int64) (bool, error) {
	defer c.Close()
	lua := redis.NewScript(1, ScriptTokenLimit)
	// lua指令碼中的引數為key和value
	res, err := redis.Bool(lua.Do(c, key, capacity, rate, now))
	if err != nil {
		return false, err
	}
	return res, nil
}

 

限流中介軟體

// LuaTokenBucket 通過lua指令碼實現令牌桶演算法限流
func LuaTokenBucket(c redis.Conn, key string, capacity, rate, now int64) (bool, error) {
	defer c.Close()
	lua := redis.NewScript(1, ScriptTokenLimit)
	// lua指令碼中的引數為key和value
	res, err := redis.Bool(lua.Do(c, key, capacity, rate, now))
	if err != nil {
		return false, err
	}
	return res, nil
}

 

六、總結

計數器、漏桶、令牌桶演算法限流有各自的特點及應用場景,不能單一維度地判斷哪個演算法最好。計數器演算法實現簡單,適用於對介面頻次的限制,如防惡意刷帖限制等;漏桶限流適用於處理流量突刺現象,因為只要桶為空就可以接受請求;而令牌桶限流適用於應對突發流量,也是目前網際網路架構中最常用的一種限流方式,只要能取到令牌即可處理請求。
nginx限流控制介面頻次其實現方式實質上是用到了漏桶演算法,如果是http請求並且使用了nginx作為反向代理,那麼可以使用nginx作為流量入口限制的第一關。
在分散式場景下,一般選擇使用redis來實現限流演算法,配合lua指令碼使得限流的判斷是一個原子操作。

&n