1. 程式人生 > 程式設計 >golang協程池throttler實現解析

golang協程池throttler實現解析

這次來介紹下,golang協程池的使用,以throttler實現為例。

首先介紹如何使用(拿作者github的例子為例)~

func ExampleThrottler() {
	var urls = []string{
		"http://www.golang.org/","http://www.google.com/","http://www.somestupidname.com/",}
	引數1:啟動的協程數量
	引數2:需要執行的任務數
	t := New(2,len(urls))
	for _,url := range urls {

		// goroutine 啟動
		go func(url string) {
			// 請求url
			err := http.Get(url)
			//讓 throttler知道goroutines何時完成,然後throttler會新任命一個worker
		    t.Done(err)
		}(url)
		errorCount := t.Throttle()
		if
errorCount > 0 { break } } } 複製程式碼

雖然作者的readme.md沒寫,但是我們也可用這樣用

package main

import (
	"github.com/throttler"
	"fmt"
)

func main() {

	p := throttler.New(10,5)

	go func() {
		fmt.Println("hello world1")
		defer p.Done(nil)
	}()
	fmt.Println(1)
	p.Throttle()
	go func() {
		fmt.Println("hello world2"
) p.Done(nil) }() fmt.Println(2) p.Throttle() go func() { fmt.Println("hello world3") p.Done(nil) }() fmt.Println(3) p.Throttle() //fmt.Println(err + 3) go func() { fmt.Println("hello world4") p.Done(nil) }() fmt.Println(4) p.Throttle() //fmt.Println(err + 2) go func() { fmt.Println("hello world5"
) p.Done(nil) }() fmt.Println(5) p.Throttle() } 複製程式碼

以上就是Throttle的使用例子,看起來非常簡單,那麼它是如何實現的呢?

首先我們看下throttle的主體結構,後續的操作都圍繞著主體結構實現的

// Throttler stores all the information about the number of workers,the active workers and error information
type Throttler struct {
	maxWorkers    int32				// 最大的worker數
	workerCount   int32				// 正在工作的worker數量
	batchingTotal int32
	batchSize     int32				// 
	totalJobs     int32    			// 任務數量的和
	jobsStarted   int32  			// 任務開始的數量(初始值為0)
	jobsCompleted int32	 			// 任務完成的數量
	doneChan      chan struct{}		// 非緩衝佇列,儲存的一半是count(totalJobs)
	errsMutex     *sync.Mutex		// errMutex的併發
	errs          []error 			// 錯誤陣列的集合,一般是業務處理返回的error
	errorCount    int32
}
複製程式碼

New操作建立一個協程池

func New(maxWorkers,totalJobs int) *Throttler {
	// 如果小於1 panic
	if maxWorkers < 1 {
		panic("maxWorkers has to be at least 1")
	}

	return &Throttler{
		// 最大協程數量
		maxWorkers: int32(maxWorkers),batchSize:  1,// 所有的任務數
		totalJobs:  int32(totalJobs),doneChan:   make(chan struct{},totalJobs),errsMutex:  &sync.Mutex{},}
}
複製程式碼

當完成一個協程動作

func (t *Throttler) Done(err error) {
	if err != nil {
		// 如果出現錯誤,將錯誤追加到struct裡面,因為struct非執行緒安全,所以需要加鎖
		t.errsMutex.Lock()
		t.errs = append(t.errs,err)
		// errorCount ++
		atomic.AddInt32(&t.errorCount,1)
		t.errsMutex.Unlock()
	} 
	// 每當一個goroutine進來,向struct寫入一條資料
	t.doneChan <- struct{}{}
}
複製程式碼

等待協程完成的函式實現,可能稍微有點複雜

func (t *Throttler) Throttle() int {
	// 載入任務數  < 1 返回錯誤的數量
	if atomic.LoadInt32(&t.totalJobs) < 1 {
		return int(atomic.LoadInt32(&t.errorCount))
	}

	// jobStarted + 1 
	atomic.AddInt32(&t.jobsStarted,1)
	// workerCount + 1
	atomic.AddInt32(&t.workerCount,1)


	// 檢查當前worker的數量是否和maxworker數量一致,等待這個workers完成

	// 實際上就是協程數量到達上限,需要等待執行中的協程釋放資源
	if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
		// 完成jobsCompleted - 1
		atomic.AddInt32(&t.jobsCompleted,1)
		// workerCount - 1
		atomic.AddInt32(&t.workerCount,-1)
		<-t.doneChan
	}

	// check to see if all of the jobs have been started,and if so,wait until all
	// jobs have been completed before continuing

	// 如果任務開始的數量和總共的任務數一致
	if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
		// 如果完成的數量小於總job數 等待Job完成
		for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
			// jobcomplete + 1
			atomic.AddInt32(&t.jobsCompleted,1)
			<-t.doneChan
		}
	}

	return int(atomic.LoadInt32(&t.errorCount))
}
複製程式碼

簡單列舉了下實現的流程:

假設有2個請求限制,3個請求,它的時序圖是這樣的

第一輪

totaljobs = 3
jobstarted = 1 workercount = 1   jobscompleted = 0 totaljobs = 3
複製程式碼

第二輪

jobstarted = 2 worker count = 2   jobscompleted = 0 totaljobs = 3
複製程式碼

第三輪

jobstarted = 3 worker count = 3 jobscompleted = 0 totaljobs = 3

// 操作1:因為goroutine限制為2,當前wokercount為3,需要阻塞,等待協程池釋放

// 協程池釋放:
jobstarted = 3 worker count = 2 jobscompleted = 1 totaljobs = 3

// 操作2:當前jobstarted與totaljobs相等,說明所有任務都已經池化了,則開始阻塞處理


//執行結束:

jobstarted = 3 worker count = 2 jobscompleted = 3 totaljobs = 3

複製程式碼

總的來說,該實現也是借用了channel的能力進行阻塞,實現起來還是非常簡單的~