1. 程式人生 > 其它 >《Go 語言併發之道》讀書筆記(七)

《Go 語言併發之道》讀書筆記(七)

今天這篇筆記我們來學習Go 限流
限流是分散式系統中經常需要用到的技術,因為我們讓請求沒有限制,很容易就出現某個使用者開很多執行緒把我們的服務拉跨,進而影響到別的使用者。

限流

我們來看下Go語言層面可以怎麼做到限流,先看一段不限流的程式碼,

type APIConnection struct{}

func Open() *APIConnection {
	return &APIConnection{}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
    //假裝我們在這裡有執行
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    //假裝我們在這裡有執行
	return nil
}


func main() {
	defer log.Printf("Done")

	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)

	apiConnection := Open()

	var wg sync.WaitGroup
	wg.Add(20)

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			err := apiConnection.ReadFile(context.Background())
			if err != nil {
				log.Printf("cannot ReadFile : %v", err)
			}
			log.Printf("ReadFile")
		}()
	}

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			err := apiConnection.ResolveAddress(context.Background())
			if err != nil {
				log.Printf("cannot ResolveAddress : %v", err)
			}
			log.Printf("ResolveAddress")
		}()
	}

	wg.Wait()

}

上面的程式碼我們定義了兩個假想的方法ReadFile 和 ResolveAddress, 假設他們是去訪問檔案和讀取網路,都是比較耗資源的操作。然後開啟了20個goroutine去呼叫這兩個方法
這段程式碼的執行結果如下

02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 Done

我們可以看到一瞬間就都執行完了,如果我們訪問了實際的資源,然後又開了很多的goroutine,那麼很容易就耗盡資源。 為了防止這樣的事情發生,我們引入限流,限定一段時間內,只能訪問一定的資源。 我們今天要講的是基於令牌桶演算法的限速,令牌桶是什麼演算法呢? 很簡單就是有一個基礎的令牌數d, 然後有固定的速度r往令牌桶中放令牌, 使用者拿到令牌才能進行下一步,拿不到就等待。
我們來看程式碼

type APIConnection struct {
	rateLimiter *rate.Limiter
}

func Open() *APIConnection {
	return &APIConnection{
		rateLimiter: rate.NewLimiter(rate.Limit(2), 5),
	}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

main func我們沒有修改,這裡只是在APIConnection 中增加了一個
rateLimiter: rate.NewLimiter(rate.Limit(2), 5)
rate是golang.org/x/time/rate 下面的一個包, rate.NewLimiter是限速器,方法定義如下
func NewLimiter(r Limit, b int) *Limiter
r就是我們前面說的速率,每秒多少個令牌
b 就是令牌桶的高度,開始的時候有幾個。

然後在ReadFile 和 ResolveAddress 方法中增加了a.rateLimiter.Wait(ctx), Wait就是等待有令牌出現。
執行的結果如下所示

02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ResolveAddress
02:48:17 ResolveAddress
02:48:17 ResolveAddress
02:48:18 ReadFile
02:48:18 ResolveAddress
02:48:19 ResolveAddress
02:48:19 ResolveAddress
02:48:20 ResolveAddress
02:48:20 ReadFile
02:48:21 ResolveAddress
02:48:21 ReadFile
02:48:22 ReadFile
02:48:22 ReadFile
02:48:23 ResolveAddress
02:48:23 ReadFile
02:48:24 ResolveAddress
02:48:24 Done

通過時間我們可以看到前面很快執行了5次,就是拿到了令牌桶中的5個令牌,後面每秒中執行兩次,也就是我們的速率2個/秒。程式執行符合我們預期,達到了限速的效果。

組合限流

書中作者還舉了兩個例子,運用組合來限速,比如要求一秒中不能超過兩個,同時一分鐘不能超過10個。 屬於Go語言的一點組合功能,示例程式碼如下


type RateLimiter interface {
	Wait(context.Context) error
	Limit() rate.Limit
}

type multiLimiter struct {
	limiters []RateLimiter
}

func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
	byLimit := func(i, j int) bool {
		return limiters[i].Limit() < limiters[j].Limit()
	}
	sort.Slice(limiters, byLimit)
	return &multiLimiter{limiters: limiters}
}

func (l *multiLimiter) Wait(ctx context.Context) error {
	for _, l := range l.limiters {
		if err := l.Wait(ctx); err != nil {
			return err
		}
	}
	return nil
}

func (l *multiLimiter) Limit() rate.Limit {
	return l.limiters[0].Limit()
}

func Per(eventCount int, duration time.Duration) rate.Limit {
	return rate.Every(duration / time.Duration(eventCount))
}


type APIConnection struct {
	rateLimiter RateLimiter
}

func Open() *APIConnection {

	secondLimit := rate.NewLimiter(Per(2, time.Second), 1)
	minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10)
	return &APIConnection{rateLimiter: MultiLimiter(secondLimit, minuteLimit)}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

定義了multiLimiter來組合這些限速器,然後定義了Wait方法。比較簡單,這裡不詳述
還可以不同的裝置分不同的限速器, 這裡也是貼出程式碼不詳述

type APIConnection struct {
	networkLimit,
	diskLimit,
	apiLimit RateLimiter
}

func Open() *APIConnection {

	return &APIConnection{
		apiLimit: MultiLimiter(
			rate.NewLimiter(Per(2, time.Second), 1),
			rate.NewLimiter(Per(10, time.Minute), 10),
		),
		diskLimit: MultiLimiter(
			rate.NewLimiter(rate.Limit(1), 1),
		),
		networkLimit: MultiLimiter(
			rate.NewLimiter(Per(3, time.Second), 3),
		),
	}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx); err != nil {
		return err
	}
	return nil
}

不同使用者限流

我們做web請求的時候,會遇到這樣的需求,根據不同的使用者給不同的限速,這裡簡單的給個sample, 其實就是用map把使用者和限速器關聯起來。


var userLimit = make(map[string]*rate.Limiter)

func doWork(user string) {
	if userLimit[user].Allow() {
		log.Printf("%s do work \n", user)
	} else {
		log.Printf("%s not work \n", user)
	}

}

func Per(eventCount int, duration time.Duration) rate.Limit {
	return rate.Every(duration / time.Duration(eventCount))
}

func main() {

	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)

	userLimit["user1"] = rate.NewLimiter(Per(2, time.Second), 1)
	userLimit["user2"] = rate.NewLimiter(Per(2, time.Minute), 5)

	var wg sync.WaitGroup
	wg.Add(20)

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			doWork("user1")

		}()

		time.Sleep(500 * time.Millisecond)

	}

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			doWork("user2")
		}()
		time.Sleep(500 * time.Millisecond)
	}

	wg.Wait()

}

上面的例子中使用者1被限制1秒訪問2次,使用者2被限制1分鐘訪問2次

03:19:14 user1 do work 
03:19:15 user1 do work 
03:19:15 user1 do work 
03:19:16 user1 do work 
03:19:16 user1 do work 
03:19:17 user1 do work 
03:19:17 user1 do work 
03:19:18 user1 do work 
03:19:18 user1 do work 
03:19:19 user1 do work 
03:19:20 user2 do work 
03:19:20 user2 do work 
03:19:21 user2 do work 
03:19:21 user2 do work 
03:19:22 user2 do work 
03:19:22 user2 not work 
03:19:23 user2 not work 
03:19:23 user2 not work 
03:19:24 user2 not work 
03:19:24 user2 not work 

這裡使用者1基本能得到執行,使用者2執行了5次後,由於沒有拿到令牌,就不能work了。這樣達到了不同使用者,不同的限速器。

總結

對於限速,可以在伺服器層面進行限速,我們這裡是在後臺程式端進行限速, 也有不少現成的解決方案 https://www.jianshu.com/p/c13843d2e1ec
對於分散式的系統這樣的限速自然是不夠的,可以結合redis的功能來進行限速,網上有看到些方法: https://blog.csdn.net/jim_007/article/details/110084822
沒有實際操作,後面我們再實際操作下再來記錄。