golang常見的幾種併發模型框架
阿新 • • 發佈:2019-02-20
在golang中,經常使用協程做高併發,本文列舉了幾種常見併發模型。
package main import ( "fmt" "math/rand" "os" "runtime" "sync" "sync/atomic" "time" ) type Scenario struct { Name string Description []string Examples []string RunExample func() } var s1 = &Scenario{ Name: "s1", Description: []string{ "簡單併發執行任務", }, Examples: []string{ "比如併發的請求後端某個介面", }, RunExample: RunScenario1, } var s2 = &Scenario{ Name: "s2", Description: []string{ "持續一定時間的高併發模型", }, Examples: []string{ "在規定時間內,持續的高併發請求後端服務, 防止服務死迴圈", }, RunExample: RunScenario2, } var s3 = &Scenario{ Name: "s3", Description: []string{ "基於大資料量的併發任務模型, goroutine worker pool", }, Examples: []string{ "比如技術支援要給某個客戶刪除幾個TB/GB的檔案", }, RunExample: RunScenario3, } var s4 = &Scenario{ Name: "s4", Description: []string{ "等待非同步任務執行結果(goroutine+select+channel)", }, Examples: []string{ "", }, RunExample: RunScenario4, } var s5 = &Scenario{ Name: "s5", Description: []string{ "定時的反饋結果(Ticker)", }, Examples: []string{ "比如測試上傳介面的效能,要實時給出指標: 吞吐率,IOPS,成功率等", }, RunExample: RunScenario5, } var Scenarios []*Scenario func init() { Scenarios = append(Scenarios, s1) Scenarios = append(Scenarios, s2) Scenarios = append(Scenarios, s3) Scenarios = append(Scenarios, s4) Scenarios = append(Scenarios, s5) } // 常用的併發與同步場景 func main() { if len(os.Args) == 1 { fmt.Println("請選擇使用場景 ==> ") for _, sc := range Scenarios { fmt.Printf("場景: %s ,", sc.Name) printDescription(sc.Description) } return } for _, arg := range os.Args[1:] { sc := matchScenario(arg) if sc != nil { printDescription(sc.Description) printExamples(sc.Examples) sc.RunExample() } } } func printDescription(str []string) { fmt.Printf("場景描述: %s \n", str) } func printExamples(str []string) { fmt.Printf("場景舉例: %s \n", str) } func matchScenario(name string) *Scenario { for _, sc := range Scenarios { if sc.Name == name { return sc } } return nil } var doSomething = func(i int) string { time.Sleep(time.Millisecond * time.Duration(10)) fmt.Printf("Goroutine %d do things .... \n", i) return fmt.Sprintf("Goroutine %d", i) } var takeSomthing = func(res string) string { time.Sleep(time.Millisecond * time.Duration(10)) tmp := fmt.Sprintf("Take result from %s.... \n", res) fmt.Println(tmp) return tmp } // 場景1: 簡單併發任務 func RunScenario1() { count := 10 var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(index int) { defer wg.Done() doSomething(index) }(i) } wg.Wait() } // 場景2: 按時間來持續併發 func RunScenario2() { timeout := time.Now().Add(time.Second * time.Duration(10)) n := runtime.NumCPU() waitForAll := make(chan struct{}) done := make(chan struct{}) concurrentCount := make(chan struct{}, n) for i := 0; i < n; i++ { concurrentCount <- struct{}{} } go func() { for time.Now().Before(timeout) { <-done concurrentCount <- struct{}{} } waitForAll <- struct{}{} }() go func() { for { <-concurrentCount go func() { doSomething(rand.Intn(n)) done <- struct{}{} }() } }() <-waitForAll } // 場景3:以 worker pool 方式 併發做事/傳送請求 func RunScenario3() { numOfConcurrency := runtime.NumCPU() taskTool := 10 jobs := make(chan int, taskTool) results := make(chan int, taskTool) var wg sync.WaitGroup // workExample workExampleFunc := func(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { res := job * 2 fmt.Printf("Worker %d do things, produce result %d \n", id, res) time.Sleep(time.Millisecond * time.Duration(100)) results <- res } } for i := 0; i < numOfConcurrency; i++ { wg.Add(1) go workExampleFunc(i, jobs, results, &wg) } totalTasks := 100 wg.Add(1) go func() { defer wg.Done() for i := 0; i < totalTasks; i++ { n := <-results fmt.Printf("Got results %d \n", n) } close(results) }() for i := 0; i < totalTasks; i++ { jobs <- i } close(jobs) wg.Wait() } // 場景4: 等待非同步任務執行結果(goroutine+select+channel) func RunScenario4() { sth := make(chan string) result := make(chan string) go func() { id := rand.Intn(100) for { sth <- doSomething(id) } }() go func() { for { result <- takeSomthing(<-sth) } }() select { case c := <-result: fmt.Printf("Got result %s ", c) case <-time.After(time.Duration(30 * time.Second)): fmt.Errorf("指定時間內都沒有得到結果") } } var doUploadMock = func() bool { time.Sleep(time.Millisecond * time.Duration(100)) n := rand.Intn(100) if n > 50 { return true } else { return false } } // 場景5: 定時的反饋結果(Ticker) // 測試上傳介面的效能,要實時給出指標: 吞吐率,成功率等 func RunScenario5() { totalSize := int64(0) totalCount := int64(0) totalErr := int64(0) concurrencyCount := runtime.NumCPU() stop := make(chan struct{}) fileSizeExample := int64(10) timeout := 10 // seconds to stop go func() { for i := 0; i < concurrencyCount; i++ { go func(index int) { for { select { case <-stop: return default: break } res := doUploadMock() if res { atomic.AddInt64(&totalCount, 1) atomic.AddInt64(&totalSize, fileSizeExample) } else { atomic.AddInt64(&totalErr, 1) } } }(i) } }() t := time.NewTicker(time.Second) index := 0 for { select { case <-t.C: index++ tmpCount := atomic.LoadInt64(&totalCount) tmpSize := atomic.LoadInt64(&totalSize) tmpErr := atomic.LoadInt64(&totalErr) fmt.Printf("吞吐率: %d,成功率: %d \n", tmpSize/int64(index), tmpCount*100/(tmpCount+tmpErr)) if index > timeout { t.Stop() close(stop) return } } } }