golang協程池throttler實現解析
阿新 • • 發佈:2019-12-31
這次來介紹下,golang協程池的使用,以throttler
實現為例。
首先介紹如何使用(拿作者github的例子為例)~
func ExampleThrottler() {
var urls = []string{
"http://www.golang.org/","http://www.google.com/","http://www.somestupidname.com/",}
引數1:啟動的協程數量
引數2:需要執行的任務數
t := New(2,len(urls))
for _,url := range urls {
// goroutine 啟動
go func(url string) {
// 請求url
err := http.Get(url)
//讓 throttler知道goroutines何時完成,然後throttler會新任命一個worker
t.Done(err)
}(url)
errorCount := t.Throttle()
if errorCount > 0 {
break
}
}
}
複製程式碼
雖然作者的readme.md
沒寫,但是我們也可用這樣用
package main
import (
"github.com/throttler"
"fmt"
)
func main() {
p := throttler.New(10,5)
go func() {
fmt.Println("hello world1")
defer p.Done(nil)
}()
fmt.Println(1)
p.Throttle()
go func() {
fmt.Println("hello world2" )
p.Done(nil)
}()
fmt.Println(2)
p.Throttle()
go func() {
fmt.Println("hello world3")
p.Done(nil)
}()
fmt.Println(3)
p.Throttle()
//fmt.Println(err + 3)
go func() {
fmt.Println("hello world4")
p.Done(nil)
}()
fmt.Println(4)
p.Throttle()
//fmt.Println(err + 2)
go func() {
fmt.Println("hello world5" )
p.Done(nil)
}()
fmt.Println(5)
p.Throttle()
}
複製程式碼
以上就是Throttle
的使用例子,看起來非常簡單,那麼它是如何實現的呢?
首先我們看下throttle
的主體結構,後續的操作都圍繞著主體結構實現的
// Throttler stores all the information about the number of workers,the active workers and error information
type Throttler struct {
maxWorkers int32 // 最大的worker數
workerCount int32 // 正在工作的worker數量
batchingTotal int32
batchSize int32 //
totalJobs int32 // 任務數量的和
jobsStarted int32 // 任務開始的數量(初始值為0)
jobsCompleted int32 // 任務完成的數量
doneChan chan struct{} // 非緩衝佇列,儲存的一半是count(totalJobs)
errsMutex *sync.Mutex // errMutex的併發
errs []error // 錯誤陣列的集合,一般是業務處理返回的error
errorCount int32
}
複製程式碼
New
操作建立一個協程池
func New(maxWorkers,totalJobs int) *Throttler {
// 如果小於1 panic
if maxWorkers < 1 {
panic("maxWorkers has to be at least 1")
}
return &Throttler{
// 最大協程數量
maxWorkers: int32(maxWorkers),batchSize: 1,// 所有的任務數
totalJobs: int32(totalJobs),doneChan: make(chan struct{},totalJobs),errsMutex: &sync.Mutex{},}
}
複製程式碼
當完成一個協程動作
func (t *Throttler) Done(err error) {
if err != nil {
// 如果出現錯誤,將錯誤追加到struct裡面,因為struct非執行緒安全,所以需要加鎖
t.errsMutex.Lock()
t.errs = append(t.errs,err)
// errorCount ++
atomic.AddInt32(&t.errorCount,1)
t.errsMutex.Unlock()
}
// 每當一個goroutine進來,向struct寫入一條資料
t.doneChan <- struct{}{}
}
複製程式碼
等待協程完成的函式實現,可能稍微有點複雜
func (t *Throttler) Throttle() int {
// 載入任務數 < 1 返回錯誤的數量
if atomic.LoadInt32(&t.totalJobs) < 1 {
return int(atomic.LoadInt32(&t.errorCount))
}
// jobStarted + 1
atomic.AddInt32(&t.jobsStarted,1)
// workerCount + 1
atomic.AddInt32(&t.workerCount,1)
// 檢查當前worker的數量是否和maxworker數量一致,等待這個workers完成
// 實際上就是協程數量到達上限,需要等待執行中的協程釋放資源
if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
// 完成jobsCompleted - 1
atomic.AddInt32(&t.jobsCompleted,1)
// workerCount - 1
atomic.AddInt32(&t.workerCount,-1)
<-t.doneChan
}
// check to see if all of the jobs have been started,and if so,wait until all
// jobs have been completed before continuing
// 如果任務開始的數量和總共的任務數一致
if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
// 如果完成的數量小於總job數 等待Job完成
for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
// jobcomplete + 1
atomic.AddInt32(&t.jobsCompleted,1)
<-t.doneChan
}
}
return int(atomic.LoadInt32(&t.errorCount))
}
複製程式碼
簡單列舉了下實現的流程:
假設有2個請求限制,3個請求,它的時序圖是這樣的
第一輪
totaljobs = 3
jobstarted = 1 workercount = 1 jobscompleted = 0 totaljobs = 3
複製程式碼
第二輪
jobstarted = 2 worker count = 2 jobscompleted = 0 totaljobs = 3
複製程式碼
第三輪
jobstarted = 3 worker count = 3 jobscompleted = 0 totaljobs = 3
// 操作1:因為goroutine限制為2,當前wokercount為3,需要阻塞,等待協程池釋放
// 協程池釋放:
jobstarted = 3 worker count = 2 jobscompleted = 1 totaljobs = 3
// 操作2:當前jobstarted與totaljobs相等,說明所有任務都已經池化了,則開始阻塞處理
//執行結束:
jobstarted = 3 worker count = 2 jobscompleted = 3 totaljobs = 3
複製程式碼
總的來說,該實現也是借用了channel
的能力進行阻塞,實現起來還是非常簡單的~