1. 程式人生 > 其它 >go time時間偏移_Go 語言中的執行緒池(Thread Pooling in Go Programming)

go time時間偏移_Go 語言中的執行緒池(Thread Pooling in Go Programming)

技術標籤:go time時間偏移

321a26ee-cd2f-eb11-8da9-e4434bdf6706.png

用過一段時間的 Go 之後,我學會了如何使用一個不帶快取的 channel 去建立一個 goroutine 池。我喜歡這個實現,這個實現甚至比這篇博文描述的更好。雖然這樣說,這篇博文仍然對它所描述的部分有一定的價值。

https://github.com/goinggo/work

介紹(Introduction)

在我的伺服器開發的職業生涯裡,執行緒池一直是在微軟系統的堆疊上構建健壯程式碼的關鍵。微軟在 .Net 上的失敗,是因為它給每個程序分配一個單獨的執行緒池,並認為在它們併發執行時能夠管理好。我早就已經意識到這是不可能的。至少,在我開發的伺服器上不可行。

當我用 Win32 API,C/C++ 構建系統時,我建立了一個抽象的 IOCP 類,它可以給我分配好執行緒池,我把工作扔給它(去處理)。這樣工作得非常好,並且我還能夠指定執行緒池的數量和併發度(能夠同時被執行的執行緒數)。在我使用 C# 開發的時間裡,我沿用了這段程式碼。如果你想了解更多,我在幾年前寫了一篇文章 http://www.theukwebdesigncompany.com/articles/iocp-thread-pooling.php 。 使用 IOCP,給我帶來了需要的效能和靈活性。 順便說一下,.Net 執行緒池使用了下面的 IOCP。

執行緒池的想法非常簡單。工作被髮送到伺服器,它們需要被處理。大多數工作本質上是非同步的,但不一定是。大多數時候,工作來自於一個內部協程的通訊。執行緒池將工作加入其中,然後這個池子中的一個執行緒會被分配來處理這個工作。工作按照接收的順序被執行。執行緒池為有效地執行工作提供了一個很好的模式。(設想一下,)每次需要處理工作時,產生一個新執行緒會給作業系統帶來沉重的負擔,並導致嚴重的效能問題。

那麼如何調整執行緒池的效能呢?你需要找出執行緒池包含多少個執行緒時,工作被處理得最快。當所有的執行緒都在忙著處理任務時,新的任務將待在佇列裡。這是你希望的,因為從某些方面來說,太多的執行緒(反而)會導致處理工作變得更慢。導致這個現象有幾個原因,像機器上的 CPU 核數需要有能力去處理資料庫請求(等)。經過測試,你可以找到最合適的數值。

我總是先找出(機器上的 CPU)有多少個核,以及要被處理的工作的型別。工作阻塞時,(它們)平均被阻塞多久。在微軟系統的堆疊上,我發現對於大多數工作來說,每個核上執行 3 個執行緒能夠獲得最好的效能。Go 的話,我還不知道最佳的數字。

你也可以為不同型別的工作建立不同的執行緒池。因為每種執行緒池都可以被配置,你可以花點時間使伺服器獲得最大輸出。通過這種方式的指揮和控制對於實現最大化伺服器能力至關重要。

在 Go 語言中我不建立執行緒,而是建立協程。協程函式類似於多執行緒函式,但由 Go 來管理實際上在系統層面執行的執行緒。瞭解更多關於 Go 中的併發,檢視這個文件:http://golang.org/doc/effective_go.html#concurrency

我建立了名為 workpool 和 jobpool 的包。它們通過 channel 和 go 協程來實現池的功能。

工作池(Workpool)

這個包建立了一個 go 協程池,專門用來處理髮布到池子中的工作。一個獨立的 Go 協程負責工作的排隊處理。協程提供安全的工作排隊,跟蹤佇列中工作量,當佇列滿時報告錯誤。

提交工作到佇列中是一個阻塞操作。這樣呼叫者才能知道工作是否已經進入佇列。(workpool 也會一直)保持工作佇列中活動程式數量的計數。

這是如何使用 workpool 的樣例程式碼:

package main
import (
 "bufio"
 "fmt"
 "os"
 "runtime"
 "strconv"
 "time"
 "github.com/goinggo/workpool"
)
type MyWork struct {
 Name string
 BirthYear int
 WP *workpool.WorkPool
}
func (mw *MyWork) DoWork(workRoutine int) {
 fmt.Printf("%s : %dn", mw.Name, mw.BirthYear)
 fmt.Printf("Q:%d R:%dn", mw.WP.QueuedWork(), mw.WP.ActiveRoutines())
 // Simulate some delay
 time.Sleep(100 * time.Millisecond)
}
func main() {
 runtime.GOMAXPROCS(runtime.NumCPU())
 workPool := workpool.New(runtime.NumCPU(), 800)
 shutdown := false // Race Condition, Sorry
 go func() {
  for i := 0; i < 1000; i++ {
   work := MyWork {
    Name: "A" + strconv.Itoa(i),
    BirthYear: i,
    WP: workPool,
   }
   if err := workPool.PostWork("routine", &work); err != nil {
    fmt.Printf("ERROR: %sn", err)
    time.Sleep(100 * time.Millisecond)
   }
   if shutdown == true {
    return
   }
  }
 }()
 fmt.Println("Hit any key to exit")
 reader := bufio.NewReader(os.Stdin)
 reader.ReadString(’n’)
 shutdown = true
 fmt.Println("Shutting Down")
 workPool.Shutdown("routine")
}

看下 main 函式,我們建立了一個協程池,協程數量基於機器上的核數。這意味每個核都對應有一個協程。如果每個核都處於忙碌狀態,(那麼)你將無法做更多的事情。再(執行)一次,效能測試會檢測出哪個數量是最合適的。第二個引數是佇列的大小。在這種情況下,我讓佇列足夠大(800),保證所有的請求都可以進來。

MyWork 型別定義了我需要執行的工作狀態。我們需要成員函式 DoWork,因為它實現了 PostWork 呼叫的介面。要將任何任務傳遞給執行緒池,都必須實現這個方法。

DoWork 方法做了兩件事。第一是,它顯示物件的狀態。第二,它實時報告佇列中的數量和 Go 協程併發執行的數量。這些數值可以用來檢查執行緒池的健康狀態和做效能測試。

最後,一個 Go 協程專門迴圈地將工作傳遞給工作池。同時,工作池為佇列中的每個物件執行 DoWork 方法。Go 協程最終會完成,工作池繼續執行它的工作。在任何時候當我們介入時,程式將優雅地停止。

在這個範例程式中,PostWork 方法能夠返回一個錯誤。這是因為 PostWork 方法將保證任務放在佇列中或者失敗。這個失敗的唯一原因是佇列已滿。(所以)設定佇列的長度是一個重要的考慮項。

作業池(Jobpool)

jobpool 包跟 workpool 包很相似,除了一個實現的細節。這個包包含兩個佇列,一個是普通的處理佇列,另外一個是高優先順序的處理佇列。阻塞的高優先順序佇列總是比阻塞的普通佇列先獲得處理。

兩種佇列的使用導致 jobpool 比 workpool 更加複雜。如果你不需要高優先順序的處理,那麼使用 workpool 將更快,更有效。

這是如何使用 jobpool 的範例程式碼:

package main
import (
 "fmt"
 "time"
 "github.com/goinggo/jobpool"
)
type WorkProvider1 struct {
 Name string
}
func (wp *WorkProvider1) RunJob(jobRoutine int) {
 fmt.Printf("Perform Job : Provider 1 : Started: %sn", wp.Name)
 time.Sleep(2 * time.Second)
 fmt.Printf("Perform Job : Provider 1 : DONE: %sn", wp.Name)
}
type WorkProvider2 struct {
 Name string
}
func (wp *WorkProvider2) RunJob(jobRoutine int) {
 fmt.Printf("Perform Job : Provider 2 : Started: %sn", wp.Name)
 time.Sleep(5 * time.Second)
 fmt.Printf("Perform Job : Provider 2 : DONE: %sn", wp.Name)
}
func main() {
 jobPool := jobpool.New(2, 1000)
 jobPool.QueueJob("main", &WorkProvider1{"Normal Priority : 1"}, false)
 fmt.Printf("*******> QW: %d AR: %dn",
  jobPool.QueuedJobs(),
  jobPool.ActiveRoutines())
 time.Sleep(1 * time.Second)
 jobPool.QueueJob("main", &WorkProvider1{"Normal Priority : 2"}, false)
 jobPool.QueueJob("main", &WorkProvider1{"Normal Priority : 3"}, false)
 jobPool.QueueJob("main", &WorkProvider2{"High Priority : 4"}, true)
 fmt.Printf("*******> QW: %d AR: %dn",
  jobPool.QueuedJobs(),
  jobPool.ActiveRoutines())
 time.Sleep(15 * time.Second)
 jobPool.Shutdown("main")
}

在這個範例程式碼中,我們建立了兩個 worker 型別的結構體。可以將每個 worker 都視為系統中一個獨立的作業。

在 main 函式中,我們建立了一個包含 2 個協程的作業池,支援 1000 個待處理的作業。首先我們建立了 3 個不同的 WorkProvider1 物件,並將她們傳遞給了佇列,設定優先順序標誌位為 false。接下來我們建立一個 WorkProvider2 物件,並將它傳遞給佇列,設定優先順序標誌位為 true。

因為作業池中有 2 個協程,先建立的兩個作業將進入佇列並被處理。一旦它們的任務完成,接下來的作業將從佇列中檢索。WorkProvider2 作業將會被執行,因為它被放在了高優先順序佇列中。

想獲取 workpool 包和 jobpool 包的程式碼,請訪問 github.com/goinggo

一如既往,我希望這份程式碼可以在某些方面幫上你一點點。