1. 程式人生 > 其它 >雪崩:限流演算法的golang實現

雪崩:限流演算法的golang實現

1. 什麼是伺服器雪崩

雪崩是由於區域性故障被正反饋迴圈,從而導致的不斷放大的連鎖故障,雪崩通常是由於整個系統中,一個很小的部分出現故障,進而導致整個系統不可用

雪崩出現的根本原因就是系統過載,如果在系統過載的情況下,不進行任何控制,那麼會導致系統雪崩

想要避免雪崩有幾種常見的方式,第一種就是快速減少系統負載,即熔斷、降級、限流等方式;第二種就是通過增加系統服務能力來避免雪崩,就是彈性擴容

這篇只討論限流的具體原理及實現

2. 常見限流演算法原理及其實現

2.1 固定視窗

固定視窗就是定義一個固定的統計週期,比如10s,然後在每個週期內統計當前週期中被接收到的請求數量,經過計數器累加後,如果超過設定的閾值就觸發限流,直到進入下一統計週期,計數器清零,重新統計

假設我們現在設定的是2s內不能超過100次請求,但是因為流量的進入往往都是不均勻的,所以固定視窗會存在以下兩個問題:

  • 抗抖動性差。由於流量突增使請求超過預期,導致流量可能在一個統計週期的前10ms內就達到了100次,給服務處理造成一定的壓力,同時後面的1990ms內的請求都會被限流。如果嘗試使用減小視窗值的方法來解決這個問題,那麼對應的每個視窗的閾值也將會變小,一個小的流量抖動就可能導致限流,系統抗抖動性極差
  • 如果上一個統計週期的流量集中在最後10ms,而這個統計週期集中在前10ms,那麼在這個20ms內系統就有可能收到200次請求,這違背了我們2s不超過100次請求的目的

2.2 滑動視窗

滑動視窗就是固定視窗的優化,它對固定視窗做了進一步的切分,將統計週期的粒度切分的更細,比如1min的固定視窗,切分為60個1s的滑動視窗,然後統計的範圍隨著時間的推移而同步後移

滑動時間視窗演算法,是從對普通時間視窗計數的優化。
使用普通時間視窗時,我們會為每個user_id/ip維護一個KV: uidOrIp: timestamp_requestCount。假設限制1秒1000個請求,那麼第100ms有一個請求,這個KV變成 uidOrIp: timestamp_1,遞200ms有1個請求,我們先比較距離記錄的timestamp有沒有超過1s,如果沒有隻更新count,此時KV變成 uidOrIp: timestamp_2。當第1100ms來一個請求時,更新記錄中的timestamp並重置計數,KV變成 uidOrIp: newtimestamp_1
普通時間視窗有一個問題,假設有500個請求集中在前1s的後100ms,500個請求集中在後1s的前100ms,其實在這200ms沒就已經請求超限了,但是由於時間窗每經過1s就會重置計數,就無法識別到此時的請求超限。

對於滑動時間視窗,我們可以把1ms的時間視窗劃分成10個time slot, 每個time slot統計某個100ms的請求數量。每經過100ms,有一個新的time slot加入視窗,早於當前時間100ms的time slot出視窗。視窗內最多維護10個time slot,儲存空間的消耗同樣是比較低的。

但是要注意的是,如果滑動視窗的統計視窗切分的過細,會增加系統性能和資源損耗的壓力

同時,滑動視窗和固定視窗一樣存在抗抖動性差的問題

golang實現滑動視窗限流:

var winMu map[string]*sync.RWMutex

func init() {
	winMu = make(map[string]*sync.RWMutex)
}

type timeSlot struct {
	timestamp time.Time // 這個timeSlot的時間起點
	count     int       // 落在這個timeSlot內的請求數
}

func countReq(win []*timeSlot) int {
	var count int
	for _, ts := range win {
		count += ts.count
	}
	return count
}

type SlidingWindowLimiter struct {
	SlotDuration time.Duration // time slot的長度
	WinDuration  time.Duration // sliding window的長度
	numSlots     int           // window內最多有多少個slot
	windows      map[string][]*timeSlot
	maxReq       int // win duration內允許的最大請求數
}

func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter {
	return &SlidingWindowLimiter{
		SlotDuration: slotDuration,
		WinDuration:  winDuration,
		numSlots:     int(winDuration / slotDuration),
		windows:      make(map[string][]*timeSlot),
		maxReq:       maxReq,
	}
}

// 獲取user_id/ip的時間視窗
func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot {
	win, ok := l.windows[uidOrIp]
	if !ok {
		win = make([]*timeSlot, 0, l.numSlots)
	}
	return win
}

func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) {
	l.windows[uidOrIp] = win
}

func (l *SlidingWindowLimiter) validate(uidOrIp string) bool {
	// 同一user_id/ip併發安全
	mu, ok := winMu[uidOrIp]
	if !ok {
		var m sync.RWMutex
		mu = &m
		winMu[uidOrIp] = mu
	}
	mu.Lock()
	defer mu.Unlock()

	win := l.getWindow(uidOrIp)
	now := time.Now()
	// 已經過期的time slot移出時間窗
	timeoutOffset := -1
	for i, ts := range win {
		if ts.timestamp.Add(l.WinDuration).After(now) {
			break
		}
		timeoutOffset = i
	}
	if timeoutOffset > -1 {
		win = win[timeoutOffset+1:]
	}

	// 判斷請求是否超限
	var result bool
	if countReq(win) < l.maxReq {
		result = true
	}

	// 記錄這次的請求數
	var lastSlot *timeSlot
	if len(win) > 0 {
		lastSlot = win[len(win)-1]
		if lastSlot.timestamp.Add(l.SlotDuration).Before(now) {
			lastSlot = &timeSlot{timestamp: now, count: 1}
			win = append(win, lastSlot)
		} else {
			lastSlot.count++
		}
	} else {
		lastSlot = &timeSlot{timestamp: now, count: 1}
		win = append(win, lastSlot)
	}

	l.storeWindow(uidOrIp, win)

	return result
}

func (l *SlidingWindowLimiter) getUidOrIp() string {
	return "127.0.0.1"
}

func (l *SlidingWindowLimiter) IsLimited() bool {
	return !l.validate(l.getUidOrIp())
}
func main() {
	limiter := NewSliding(100*time.Millisecond, time.Second, 10)

	for i := 0; i < 5; i++ {
		fmt.Println(limiter.IsLimited())
	}

	time.Sleep(100 * time.Millisecond)
	for i := 0; i < 5; i++ {
		fmt.Println(limiter.IsLimited())
	}

	// 這個請求觸發限流
	fmt.Println(limiter.IsLimited())

	for _, v := range limiter.windows[limiter.getUidOrIp()] {
		fmt.Println(v.timestamp, v.count)
	}

	fmt.Println("one thousand years later ...")
	time.Sleep(time.Second)
	for i := 0; i < 7; i++ {
		fmt.Println(limiter.IsLimited())
	}
	for _, v := range limiter.windows[limiter.getUidOrIp()] {
		fmt.Println(v.timestamp, v.count)
	}
}

2.3 漏桶

漏桶就像是一個漏斗,進來的水量就像訪問流量一樣,而出去的水量就像我們的系統處理請求一樣

當訪問量較大時,這個漏斗就會積水,如果水量太多就會溢位(拋棄請求)

相對於滑動視窗來說,漏桶有兩個改進點:

  • 增加了一個桶來快取請求,在流量突增時,可以先快取起來,直到超過桶的容量才觸發限流
  • 對出口的流量上限做了限制,使上游流量的抖動不會擴散到下游服務

但是漏桶提供的流量整形能力有一定的代價,超過漏桶流出速率的請求,需要先在漏桶中排隊等待

其中流出速率是漏桶限流的防線,一般會設定的相對保守,可是這樣就無法完全利用系統的效能,增加了請求的排隊時間

golang實現漏桶:

// Task 每個請求到來,需要把執行的業務邏輯封裝成Task,放入漏桶,等待worker取出執行
type Task struct {
	handler func() Result // worker從漏桶取出請求物件後要執行的業務邏輯函式
	resChan chan Result   // 等待worker執行並返回結果的channel
	taskID  int
}

// Result 封裝業務邏輯的執行結果
type Result struct{}

// handler 模擬封裝業務邏輯的函式
func handler() Result {
	time.Sleep(300 * time.Millisecond)
	return Result{}
}

func NewTask(id int) Task {
	return Task{
		handler: handler,
		resChan: make(chan Result),
		taskID:  id,
	}
}

// 漏桶的具體實現
type LeakyBucket struct {
	BucketSize int       // 漏桶大小
	NumWorker  int       // 同時從漏桶中獲取任務執行的worker數量
	bucket     chan Task // 存放任務的漏桶
}

func NewLeakyBucket(bucketSize int, numWorker int) *LeakyBucket {
	return &LeakyBucket{
		BucketSize: bucketSize,
		NumWorker:  numWorker,
		bucket:     make(chan Task, bucketSize),
	}
}

func (b *LeakyBucket) validate(task Task) bool {
	// 如果漏桶容量達到上限,返回false
	select {
	case b.bucket <- task:
	default:
		fmt.Printf("request[id=%d] is refused!\n", task.taskID)
		return false
	}

	// 等待worker執行
	<-task.resChan
	fmt.Printf("request[id=%d] is running!\n", task.taskID)
	return true
}

func (b *LeakyBucket) Start() {
	// 開啟worker從漏桶中獲取任務並執行
	go func() {
		for i := 0; i < b.NumWorker; i++ {
			go func() {
				for {
					task := <-b.bucket
					result := task.handler()
					task.resChan <- result
				}
			}()
		}
	}()
}
func main() {
	bucket := NewLeakyBucket(10, 4)
	bucket.Start()

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			task := NewTask(id)
			bucket.validate(task)
		}(i)
	}
	wg.Wait()
}

2.4 令牌桶

令牌桶演算法的核心是固定“進口”速率,限流器在一個一定容量的桶內,按照一定的速率放入Token,然後在處理程式在處理請求的時候,需要拿到Token才能處理

當大量流量進入時,只要令牌的生成速度大於等於請求被處理的速度,那麼此時系統的處理能力就是極限的

根據漏桶和令牌桶的特點,我們可以看到這兩種演算法都有一個恆定速率和一個可變速率:

  • 令牌桶以恆定速率生產令牌,請求令牌的速率是可變的
  • 漏桶以恆定速率處理請求,但是流入速率是可變的

我們可以看到,令牌桶相對於漏桶提升了系統利用率,減少請求排隊時間,但是失去了一定的流量整形能力,使得上游抖動可能會擴散到下游服務

golang實現令牌桶:

// 併發訪問同一個user_id/ip需要加鎖
var recordMu map[string]*sync.RWMutex

func init() {
	recordMu = make(map[string]*sync.RWMutex)
}

func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// record 上次訪問時的時間戳和令牌數
type record struct {
	last  time.Time
	token int
}

// TokenBucket 令牌桶的具體實現
type TokenBucket struct {
	BucketSize int                // 令牌桶的容量,最多可以存放多少個令牌
	TokenRate  time.Duration      // 多長時間生成一個令牌
	records    map[string]*record // 報錯user_id/ip的訪問記錄
}

func NewTokenBucket(bucketSize int, tokenRate time.Duration) *TokenBucket {
	return &TokenBucket{
		BucketSize: bucketSize,
		TokenRate:  tokenRate,
		records:    make(map[string]*record),
	}
}

// getUidOrIp 獲取請求使用者的user_id/ip
func (t *TokenBucket) getUidOrIp() string {
	return "127.0.0.1"
}

// getRecord 獲取這個user_id/ip上次訪問的時間戳和令牌數
func (t *TokenBucket) getRecord(uidOrIp string) *record {
	if r, ok := t.records[uidOrIp]; ok {
		return r
	}
	return &record{}
}

func (t *TokenBucket) storeRecord(uidOrIp string, r *record) {
	t.records[uidOrIp] = r
}

func (t *TokenBucket) validate(uidOrIp string) bool {
	rl, ok := recordMu[uidOrIp]
	if !ok {
		var mu sync.RWMutex
		rl = &mu
		recordMu[uidOrIp] = rl
	}

	rl.Lock()
	defer rl.Unlock()

	r := t.getRecord(uidOrIp)
	now := time.Now()
	if r.last.IsZero() {
		// 第一次訪問初始化為最大令牌數
		r.last, r.token = now, t.BucketSize
	} else {
		if r.last.Add(t.TokenRate).Before(now) {
			// 如果與上一次請求隔了token rate
			// 增加令牌,更新last
			r.token += max(int(now.Sub(r.last)/t.TokenRate), t.BucketSize)
			r.last = now
		}
	}

	var result bool
	// 如果令牌數大於1,取走一個令牌,validate結果為true
	if r.token > 0 {
		r.token--
		result = true
	}

	// 儲存最新的record
	t.storeRecord(uidOrIp, r)
	return result
}

// IsLimited 是否被限流
func (t *TokenBucket) IsLimited() bool {
	return !t.validate(t.getUidOrIp())
}
func main() {
	tokenBucket := NewTokenBucket(5, 100*time.Millisecond)
	for i := 0; i < 6; i++ {
		fmt.Println(tokenBucket.IsLimited())
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println(tokenBucket.IsLimited())
}

3. 分散式限流

我們上面討論的幾種限流演算法在單機場景下都可以實現理想的限流效果,但是如果考慮分散式場景下,限流策略又需要什麼改變呢?

讓我們先來討論一下單節點限流和分散式限流的區別,再針對分散式限流來分析幾種限流方式

在單節點場景下,限流機制作用的位置是客戶端還是服務端?

一般來說,熔斷機制作用在客戶端,限流機制更多在服務端,因為熔斷更強調自適應能力,讓作用點分佈在客戶端是沒問題的,而限流機制更強調控制

如果將限流器作用在服務端,將會給服務端帶來額外的壓力,但是作用在客戶端,這就是一個天然的分散式限流場景了

我們可以考慮的一個策略是,在客戶端實現限流策略的底線,比如讓一個客戶端對一個介面的呼叫不能超過10000併發,這是一個正常情況下不可能達到的閾值,如果超過就進行客戶端限流,避免客戶端異常流量衝擊服務端,進而在服務端實現精細粒度的限流功能

其次,在觸發限流之後,我們應該拋棄請求還是阻塞等待?

一般來說,如果我們可以控制流量產生的速率,阻塞式限流就是一種更好的選擇

如果我們無法控制流量產生的速率,大量請求積壓造成系統資源不可用,那麼否決式限流是更好的選擇

對於線上業務來講,否決式限流觸發之後,使用者會在客戶端進行重試,所以不會對服務帶來明顯影響

對於想訊息佇列這種請求,為了避免打掛下遊服務,通常都會對push進行限速處理,這時候我們就可以採用阻塞等待,同時自適應調節訊息佇列的限速水平

如何實現分散式限流?

最直觀的一個想法是進行集中式限流。系統提供一個外部儲存來做集中限流(令牌桶),但這會給分散式系統帶來脆弱性:

  • 外部儲存成為效能瓶頸
  • 限流器故障導致服務不可用
  • 增加呼叫時延

另一個想法就是將分散式限流進行本地化處理。限流器在獲得一個服務限額的總閾值之後,按照權重分配給不同的例項,但是這樣最大的問題是沒有一個合理的分配比例模型,因為這種限流策略不能動態變化,而導致某些例項在觸發限流時可能會路由到其他例項,增大其處理壓力

一個折中的方案是採用集中式限流的基礎上,增加本地化處理。客戶端只有在令牌數不足時,才會通過限流器獲取令牌,而每次都會獲取一批令牌。