1. 程式人生 > >go——併發

go——併發

  併發與並行的區別:
併發:邏輯上具備同時處理多個任務的能力
並行:物理上在同一時刻執行多個併發任務
通常都會說程式是併發設計的,也就是說它允許多個任務同時執行,但實際上並不一定真在同一時刻發生。
在單核處理器上,它們能以間隔方式切換執行
並行則依賴多核處理器等物理裝置,讓多個任務真正在同一時刻執行,它代表了當前程式執行狀態。
簡單點說,並行是併發設計的理想執行模式。
多執行緒或多程序是並行的基本條件,但單執行緒也可用協程(coroutine)做到併發
儘管協程在單個執行緒上通過主動切換來實現多工併發,但它也有自己的優勢。
除了將因阻塞而浪費的時間找回來外,還免去了執行緒切換開銷,有著不錯的執行效率。
協程上執行的多個任務本質上依舊序列,加上可控自主排程,所以並不需要做同步處理。
即使採用多執行緒也未必能並行。Python就因GIL限制,預設只能併發而不能並行,所以很多時候轉而使用“多程序”+“協程”架構。
通常情況下,用多程序來實現分散式和負載均衡,減輕單程序垃圾回收壓力;
用多執行緒(LWP)搶奪更多的處理器資源;用協程來提高處理器時間片利用率。

簡單將goroutine歸納為協程並不合適。執行時會建立多個執行緒來執行併發任務,且任務單元可被排程到其它執行緒並行執行。
這更像是多執行緒和協程的綜合體,能最大限度提升執行效率,發揮多核處理能力。
只須在函式呼叫前新增go關鍵字即可建立併發任務。

go fmt.PrintLn("hello")

關鍵字go並非執行併發操作,而是建立一個併發任務單元。
新建任務被放置在系統佇列中,等待排程器安排合適系統執行緒去獲取執行權。
當前流程不會阻塞,不會等待該任務啟動,且執行時也不保證併發任務的執行次序。

每個任務單元除儲存函式指標、呼叫引數外,還會分配執行所需的棧記憶體空間。
相比系統預設MB級別的執行緒棧,goroutine自定義棧初始僅須2KB,所以才能建立成千上萬的併發任務。
自定義棧採取按需分配策略,在需要時進行擴容,最大能到GB規模。

與defer一樣,goroutine也會因“延遲執行”而立即計算並複製執行引數。

package main

import (
	"fmt"
	"time"
)

var c int //初始為0

func counter() int {
	c++
	return c
}

func main() {
	a := 100

	go func(x, y int) { //匿名函式直接執行
		//hour min second  時分秒
		time.Sleep(time.Second) //休息一秒鐘
		fmt.Println("go", x, y) //執行counter 100 1  //goroutine在main之後執行
	}(a, counter()) //立即計算並複製引數

	a += 100
	fmt.Println("main:", a, counter()) //200 2
	time.Sleep(time.Second * 3)        //等待goroute結束

}

/*
main: 200 2
go 100 1
為什麼先列印main?
go建立一個併發單元,但是不會馬上執行,而是會放置在佇列中
*/

 

  程序退出時不會等待併發任務結束,可用通道(channel)阻塞,然後發出退出訊號。

package main

import (
	"fmt"
	"time"
)

func main() {
	// exit := make(chan struct{}) //建立通道。因為僅僅是通知,資料並沒有實際意義。

	go func() {
		time.Sleep(time.Second)
		fmt.Println("goroutine done.")

		// close(exit) //關閉通道發出訊號
	}()

	fmt.Println("main...")
	// <-exit //如通道關閉,立即解除阻塞。
	fmt.Println("main exit...")
}

 

<-exit接收操作符,如果exit代表了元素型別為byte的通道型別值,則此表示式就表示從exit中接收一個byte型別值的操作。
如果不建立通道,main直接結束之後程序就結束了,而不會等待併發任務的結束,這樣就不會執行併發任務。
所以,我們可以建立一個通道,當通道關閉後才會解除阻塞,整個程式才會結束

除關閉通道外,寫入資料也可解除阻塞。
如果等待多個任務結束,推薦使用sync.WaitGroup。
通過設定計數器,讓每個goroutine在退出前遞減,直至歸零時解除阻塞。

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1) //累加計數,每迴圈以此加一

		go func(id int) { //建立多個任務單元
			defer wg.Done() //遞減計數,這個任務計數之前減一

			time.Sleep(time.Second)
			fmt.Println("goroutine", id, "done")
		}(i)
	}

	fmt.Println("main...")
	wg.Wait() //阻塞,直至wg歸零
	fmt.Println("main exit")
}

/*
main...
goroutine 6 done
goroutine 1 done
goroutine 2 done
goroutine 4 done
goroutine 9 done
goroutine 8 done
goroutine 0 done
goroutine 7 done
goroutine 5 done
goroutine 3 done
main exit
*/

  

  儘管WaitGroup.Add實現了原子操作,但建議在goroutine外累加計數器,以免Add尚未執行,wait已經退出。

go func(id int) {
	wg.Add(1)
	defer wg.Done()
}

  

  可在多處使用wait阻塞,它們都可以接收到通知。

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		wg.Wait()
		fmt.Println("wait exit")
	}()

	go func() {
		time.Sleep(time.Second * 3)
		fmt.Println("done.")
		wg.Done()
	}()

	wg.Wait()
	fmt.Println("main exit.")
}

/*
done.
wait exit
main exit.
*/

  

GOMAXPROCS 執行緒數量
執行時可能會建立很多執行緒,但任何時候僅有限的幾個執行緒參與併發任務執行。
該數量預設與處理器核數相等,可用runtime.GOMAXPROCS函式(或環境變數)修改。

package main

import (
	"fmt"
	"math"
	"runtime"
	"sync"
	"time"
)

//測試目標函式
func count() {
	x := 0
	for i := 0; i < math.MaxUint32; i++ {
		x += 1
	}

	fmt.Println(x)
}

//迴圈執行
func test(n int) {
	for i := 0; i < n; i++ {
		count()
	}
}

//併發執行
func test2(n int) {
	var wg sync.WaitGroup
	wg.Add(n) //計數為4

	for i := 0; i < n; i++ {
		go func() {
			count()
			wg.Done()
		}()
	}

	wg.Wait()
}

func main() {
	n := runtime.GOMAXPROCS(0) //4
	fmt.Println(time.Now())
	// test(n)  //6s
	test2(n) //3s
	fmt.Println(time.Now())
}

  

Local Storage 區域性儲存器
與執行緒不同,goroutine任務無法設定優先順序,無法獲取編號,沒有區域性儲存(TLS),甚至連返回值都會被拋棄。

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	var gs [5]struct { //使用陣列來進行區域性儲存
		id     int //編號
		result int //返回值
	}
	fmt.Println(gs)
	for i := 0; i < len(gs); i++ {
		wg.Add(1)

		go func(id int) {
			defer wg.Done()

			gs[id].id = id
			gs[id].result = (id + 1) * 100
		}(i)
	}

	wg.Wait()
	fmt.Printf("%+v\n", gs)
}

/*
[{0 0} {0 0} {0 0} {0 0} {0 0}]
[{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}]
*/

  

  如果使用map作為區域性儲存容器,建議做同步處理,因為執行時會對其做併發讀寫檢查。


Gosched
暫停,釋放執行緒去執行其它任務。當前任務被放回佇列,等待下次排程時恢復執行

package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1)
	exit := make(chan struct{})

	go func() {  /任務a
		defer close(exit)

		go func() {  //任務b
			fmt.Println("b")
		}()

		for i := 0; i < 4; i++ {
			fmt.Println("a:", i)

			if i == 1 {  
				runtime.Gosched()  //讓出當前執行緒,排程執行b
			}
		}
	}()

	<-exit
}

/*
a: 0
a: 1
b  //這個b有可能打印不出來
a: 2
a: 3
*/

  

該函式很少被使用,因為執行時會主動向長時間執行的任務發出搶佔排程。

Goexit


立即終止當前任務,執行時確保所有已註冊延遲呼叫被執行。
該函式不會影響其它併發任務,不會引發panic,自然也就無法捕獲。

package main

import (
	"fmt"
	"runtime"
)

func main() {
	exit := make(chan struct{})

	go func() {
		defer close(exit)
		defer fmt.Println("a") //執行3

		func() {
			defer func() {
				fmt.Println("b", recover() == nil) //defer總會執行 2
			}()

			func() {
				fmt.Println("c")       //執行1
				runtime.Goexit()       //立即終止當前任務
				fmt.Println("c done.") //不會執行
			}()

			fmt.Println("b done.") //不會執行
		}()

		fmt.Println("a done.") //不會執行
	}()

	<-exit
	fmt.Println("main exit") //主程式
}

/*
c
b true
a
main exit
*/

  

如果在main裡呼叫Goexit,它會等其它任務結束,然後讓程序直接崩潰。

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	for i := 0; i < 2; i++ {
		go func(x int) {
			for n := 0; n < 2; n++ {
				fmt.Printf("%c: %d\n", 'a'+x, n)
				time.Sleep(time.Millisecond)
			}
		}(i)
	}

	runtime.Goexit() //等待所有任務結束
	fmt.Println("main exit.")
}

/*
b: 0
a: 0
a: 1
b: 1
fatal error: no goroutines (main called runtime.Goexit) - deadlock!
*/

  

無論身處那一層,Goexit都能立即終止整個呼叫堆疊,這與return僅退出當前函式不同。
標準庫函式os.Exit可終止程序,但不會執行延遲呼叫。