《Go 語言併發之道》讀書筆記(七)
今天這篇筆記我們來學習Go 限流
限流是分散式系統中經常需要用到的技術,因為我們讓請求沒有限制,很容易就出現某個使用者開很多執行緒把我們的服務拉跨,進而影響到別的使用者。
限流
我們來看下Go語言層面可以怎麼做到限流,先看一段不限流的程式碼,
type APIConnection struct{} func Open() *APIConnection { return &APIConnection{} } func (a *APIConnection) ReadFile(ctx context.Context) error { //假裝我們在這裡有執行 return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { //假裝我們在這裡有執行 return nil } func main() { defer log.Printf("Done") log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) apiConnection := Open() var wg sync.WaitGroup wg.Add(20) for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ReadFile(context.Background()) if err != nil { log.Printf("cannot ReadFile : %v", err) } log.Printf("ReadFile") }() } for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ResolveAddress(context.Background()) if err != nil { log.Printf("cannot ResolveAddress : %v", err) } log.Printf("ResolveAddress") }() } wg.Wait() }
上面的程式碼我們定義了兩個假想的方法ReadFile 和 ResolveAddress, 假設他們是去訪問檔案和讀取網路,都是比較耗資源的操作。然後開啟了20個goroutine去呼叫這兩個方法
這段程式碼的執行結果如下
02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ResolveAddress 02:32:52 ReadFile 02:32:52 Done
我們可以看到一瞬間就都執行完了,如果我們訪問了實際的資源,然後又開了很多的goroutine,那麼很容易就耗盡資源。 為了防止這樣的事情發生,我們引入限流,限定一段時間內,只能訪問一定的資源。 我們今天要講的是基於令牌桶演算法的限速,令牌桶是什麼演算法呢? 很簡單就是有一個基礎的令牌數d, 然後有固定的速度r往令牌桶中放令牌, 使用者拿到令牌才能進行下一步,拿不到就等待。
我們來看程式碼
type APIConnection struct { rateLimiter *rate.Limiter } func Open() *APIConnection { return &APIConnection{ rateLimiter: rate.NewLimiter(rate.Limit(2), 5), } } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } return nil }
main func我們沒有修改,這裡只是在APIConnection 中增加了一個
rateLimiter: rate.NewLimiter(rate.Limit(2), 5)
rate是golang.org/x/time/rate 下面的一個包, rate.NewLimiter是限速器,方法定義如下
func NewLimiter(r Limit, b int) *Limiter
r就是我們前面說的速率,每秒多少個令牌
b 就是令牌桶的高度,開始的時候有幾個。
然後在ReadFile 和 ResolveAddress 方法中增加了a.rateLimiter.Wait(ctx), Wait就是等待有令牌出現。
執行的結果如下所示
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ResolveAddress
02:48:17 ResolveAddress
02:48:17 ResolveAddress
02:48:18 ReadFile
02:48:18 ResolveAddress
02:48:19 ResolveAddress
02:48:19 ResolveAddress
02:48:20 ResolveAddress
02:48:20 ReadFile
02:48:21 ResolveAddress
02:48:21 ReadFile
02:48:22 ReadFile
02:48:22 ReadFile
02:48:23 ResolveAddress
02:48:23 ReadFile
02:48:24 ResolveAddress
02:48:24 Done
通過時間我們可以看到前面很快執行了5次,就是拿到了令牌桶中的5個令牌,後面每秒中執行兩次,也就是我們的速率2個/秒。程式執行符合我們預期,達到了限速的效果。
組合限流
書中作者還舉了兩個例子,運用組合來限速,比如要求一秒中不能超過兩個,同時一分鐘不能超過10個。 屬於Go語言的一點組合功能,示例程式碼如下
type RateLimiter interface {
Wait(context.Context) error
Limit() rate.Limit
}
type multiLimiter struct {
limiters []RateLimiter
}
func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
byLimit := func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
}
sort.Slice(limiters, byLimit)
return &multiLimiter{limiters: limiters}
}
func (l *multiLimiter) Wait(ctx context.Context) error {
for _, l := range l.limiters {
if err := l.Wait(ctx); err != nil {
return err
}
}
return nil
}
func (l *multiLimiter) Limit() rate.Limit {
return l.limiters[0].Limit()
}
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration / time.Duration(eventCount))
}
type APIConnection struct {
rateLimiter RateLimiter
}
func Open() *APIConnection {
secondLimit := rate.NewLimiter(Per(2, time.Second), 1)
minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10)
return &APIConnection{rateLimiter: MultiLimiter(secondLimit, minuteLimit)}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}
定義了multiLimiter來組合這些限速器,然後定義了Wait方法。比較簡單,這裡不詳述
還可以不同的裝置分不同的限速器, 這裡也是貼出程式碼不詳述
type APIConnection struct {
networkLimit,
diskLimit,
apiLimit RateLimiter
}
func Open() *APIConnection {
return &APIConnection{
apiLimit: MultiLimiter(
rate.NewLimiter(Per(2, time.Second), 1),
rate.NewLimiter(Per(10, time.Minute), 10),
),
diskLimit: MultiLimiter(
rate.NewLimiter(rate.Limit(1), 1),
),
networkLimit: MultiLimiter(
rate.NewLimiter(Per(3, time.Second), 3),
),
}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx); err != nil {
return err
}
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx); err != nil {
return err
}
return nil
}
不同使用者限流
我們做web請求的時候,會遇到這樣的需求,根據不同的使用者給不同的限速,這裡簡單的給個sample, 其實就是用map把使用者和限速器關聯起來。
var userLimit = make(map[string]*rate.Limiter)
func doWork(user string) {
if userLimit[user].Allow() {
log.Printf("%s do work \n", user)
} else {
log.Printf("%s not work \n", user)
}
}
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration / time.Duration(eventCount))
}
func main() {
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
userLimit["user1"] = rate.NewLimiter(Per(2, time.Second), 1)
userLimit["user2"] = rate.NewLimiter(Per(2, time.Minute), 5)
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
doWork("user1")
}()
time.Sleep(500 * time.Millisecond)
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
doWork("user2")
}()
time.Sleep(500 * time.Millisecond)
}
wg.Wait()
}
上面的例子中使用者1被限制1秒訪問2次,使用者2被限制1分鐘訪問2次
03:19:14 user1 do work
03:19:15 user1 do work
03:19:15 user1 do work
03:19:16 user1 do work
03:19:16 user1 do work
03:19:17 user1 do work
03:19:17 user1 do work
03:19:18 user1 do work
03:19:18 user1 do work
03:19:19 user1 do work
03:19:20 user2 do work
03:19:20 user2 do work
03:19:21 user2 do work
03:19:21 user2 do work
03:19:22 user2 do work
03:19:22 user2 not work
03:19:23 user2 not work
03:19:23 user2 not work
03:19:24 user2 not work
03:19:24 user2 not work
這裡使用者1基本能得到執行,使用者2執行了5次後,由於沒有拿到令牌,就不能work了。這樣達到了不同使用者,不同的限速器。
總結
對於限速,可以在伺服器層面進行限速,我們這裡是在後臺程式端進行限速, 也有不少現成的解決方案 https://www.jianshu.com/p/c13843d2e1ec
對於分散式的系統這樣的限速自然是不夠的,可以結合redis的功能來進行限速,網上有看到些方法: https://blog.csdn.net/jim_007/article/details/110084822
沒有實際操作,後面我們再實際操作下再來記錄。