Go語言開發(九)、Go語言並發編程
一、goroutine簡介
1、並發與並行簡介
並行(parallel):指在同一時刻,有多條指令在多個處理器上同時執行。
並發(concurrency):指在同一時刻只能有一條指令執行,但多個進程指令被快速的輪換執行,使得在宏觀上具有多個進程同時執行的效果,但在微觀上並不是同時執行的,只是把時間分成若幹段,使多個進程快速交替的執行。
並行在多處理器系統中存在,而並發可以在單處理器和多處理器系統中都存在,並發能夠在單處理器系統中存在是因為並發是並行的假象,並行要求程序能夠同時執行多個操作,而並發只是要求程序假裝同時執行多個操作(每個小時間片執行一個操作,多個操作快速切換執行)。?
2、Coroutine簡介
Coroutine(協程)是一種用戶態的輕量級線程,特點如下:
A、輕量級線程
B、非搶占式多任務處理,由協程主動交出控制權。
C、編譯器/解釋器/虛擬機層面的任務
D、多個協程可能在一個或多個線程上運行。
E、子程序是協程的一個特例。
不同語言對協程的支持:
A、C++通過Boost.Coroutine實現對協程的支持
B、Java不支持
C、Python通過yield關鍵字實現協程,Python3.5開始使用async def對原生協程的支持
3、goroutine簡介
在Go語言中,只需要在函數調用前加上關鍵字go即可創建一個並發任務單元,新建的任務會被放入隊列中,等待調度器安排。
go語言中並發指的是讓某個函數獨立於其它函數運行的能力,一個goroutine是一個獨立的工作單元,Go的runtime(運行時)會在邏輯處理器上調度goroutine來運行,一個邏輯處理器綁定一個操作系統線程,因此goroutine不是線程,是一個協程。
進程:一個程序對應一個獨立程序空間
線程:一個執行空間,一個進程可以有多個線程
邏輯處理器:執行創建的goroutine,綁定一個線程
調度器:Go運行時中的,分配goroutine給不同的邏輯處理器
全局運行隊列:所有剛創建的goroutine隊列
當創建一個goroutine後,會先存放在全局運行隊列中,等待Go運行時的調度器進行調度,把goroutine分配給其中的一個邏輯處理器,並放到邏輯處理器對應的本地運行隊列中,最終等著被邏輯處理器執行即可。
Go的並發是管理、調度、執行goroutine的方式。
默認情況下,Go默認會給每個可用的物理處理器都分配一個邏輯處理器。
可以在程序開頭使用runtime.GOMAXPROCS(n)設置邏輯處理器的數量。
如果需要設置邏輯處理器的數量,一般采用如下代碼設置:
runtime.GOMAXPROCS(runtime.NumCPU())
對於並發,Go語言本身自己實現的調度,對於並行,與物理處理器的核數有關,多核就可以並行並發,單核只能並發。
4、goroutinue使用示例
在Go語言中,只需要在函數調用前加上關鍵字go即可創建一個並發任務單元,新建的任務會被放入隊列中,等待調度器安排。
package main
import (
"fmt"
"sync"
)
func main(){
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 10000; i++ {
fmt.Printf("Hello,Go.This is %d\n", i)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10000; i++ {
fmt.Printf("Hello,World.This is %d\n", i)
}
}()
wg.Wait()
}
sync.WaitGroup是一個計數的信號量,使main函數所在主線程等待兩個goroutine執行完成後再結束,否則兩個goroutine還在運行時,主線程已經結束。
sync.WaitGroup使用非常簡單,使用Add方法設設置計數器為2,每一個goroutine的函數執行完後,調用Done方法減1。Wait方法表示如果計數器大於0,就會阻塞,main函數會一直等待2個goroutine完成再結束。
5、goroutine的本質
goroutine是輕量級的線程,占用的資源非常小(Go將每個goroutine stack的size默認設置為2k)線程的切換由操作系統控制,而goroutine的切換則由用戶控制。
goroutinue本質上是協程。
?goroutinue可以實現並行,即多個goroutinue可以在多個處理器同時運行,而協程同一時刻只能在一個處理器上運行。
goroutine之間的通信是通過channel,而協程的通信是通過yield和resume()操作。
二、goroutine調度機制
1、線程調度模型
高級語言對內核線程的封裝實現通常有三種線程調度模型:
A、N:1模型。N個用戶空間線程在1個內核空間線程上運行,優勢是上下文切換非常快但無法利用多核系統的優點。
B、1:1模型。1個內核空間線程運行一個用戶空間線程,充分利用了多核系統的優勢但上下文切換非常慢,因為每一次調度都會在用戶態和內核態之間切換。
C、M:N模型。每個用戶線程對應多個內核空間線程,同時也可以一個內核空間線程對應多個用戶空間線程,使用任意個內核模型管理任意個goroutine,但缺點是調度的復雜性。
2、Go調度器簡介
Go的最小調度單元為goroutine,但操作系統最小的調度單元依然是線程,所以go調度器(go scheduler)要做的工作是如何將眾多的goroutine放在有限的線程上進行高效而公平的調度。
操作系統的調度不失為高效和公平,比如CFS調度算法。go引入goroutine的核心原因是goroutine輕量級,無論是從進程到線程,還是從線程到goroutine,其核心都是為了使調度單元更加輕量級,可以輕易創建幾萬幾十萬的goroutine而不用擔心內存耗盡等問題。go引入goroutine試圖在語言內核層做到足夠高性能得同時(充分利用多核優勢、使用epoll高效處理網絡/IO、實現垃圾回收等機制)盡量簡化編程。
3、Go調度器實現原理
?Go 1.1開始,Go scheduler實現了M:N的G-P-M線程調度模型,即任意數量的用戶態goroutine可以運行在任意數量的內核空間線程線程上,不僅可以使上線文切換更加輕量級,又可以充分利用多核優勢。
為了實現M:N線程調度機制,Go引入了3個結構體:
M:操作系統的內核空間線程
G:goroutine對象,G結構體包含調度一個goroutine所需要的堆棧和instruction pointer(IP指令指針),以及其它一些重要的調度信息。每次go調用的時候,都會創建一個G對象。
P:Processor,調度的上下文,實現M:N調度模型的關鍵,M必須拿到P才能對G進行調度,P限定了go調度goroutine的最大並發度。每一個運行的M都必須綁定一個P。
P的個數是GOMAXPROCS(最大256),啟動時固定,一般不修改;?M的個數和P的個數不一定相同(會有休眠的M或者不需要太多的M);每一個P保存著本地G任務隊列,也能使用全局G任務隊列。
全局G任務隊列會和各個本地G任務隊列按照一定的策略互相交換。
P是用一個全局數組(255)來保存的,並且維護著一個全局的P空閑鏈表。
每次調用go的時候,都會:
A、創建一個G對象,加入到本地隊列或者全局隊列
B、如果有空閑的P,則創建一個M
C、M會啟動一個底層線程,循環執行能找到的G任務
D、G任務的執行順序是先從本地隊列找,本地沒有則從全局隊列找(一次性轉移(全局G個數/P個數)個,再去其它P中找(一次性轉移一半)。
E、G任務執行是按照隊列順序(即調用go的順序)執行的。
創建一個M過程如下:
A、先找到一個空閑的P,如果沒有則直接返回。
B、調用系統API創建線程,不同的操作系統調用方法不一樣。
C、?在創建的線程裏循環執行G任務
如果一個系統調用或者G任務執行太長,會一直占用內核空間線程,由於本地隊列的G任務是順序執行的,其它G任務就會阻塞。因此,Go程序啟動的時候,會專門創建一個線程sysmon,用來監控和管理,sysmon內部是一個循環:
A、記錄所有P的G任務計數schedtick,schedtick會在每執行一個G任務後遞增。
B、如果檢查到?schedtick一直沒有遞增,說明P一直在執行同一個G任務,如果超過一定的時間(10ms),在G任務的棧信息裏面加一個標記。
C、G任務在執行的時候,如果遇到非內聯函數調用,就會檢查一次標記,然後中斷自己,把自己加到隊列末尾,執行下一個G。
D、如果沒有遇到非內聯函數(有時候正常的小函數會被優化成內聯函數)調用,會一直執行G任務,直到goroutine自己結束;如果goroutine是死循環,並且GOMAXPROCS=1,阻塞。
4、搶占式調度
Go沒有時間片的概念。如果某個G沒有進行system call調用、沒有進行I/O操作、沒有阻塞在一個channel操作上,M通過搶占式調度讓長任務G停下來並調度下一個G。
除非極端的無限循環或死循環,否則只要G調用函數,Go runtime就有搶占G的機會。Go程序啟動時,Go runtime會啟動一個名為sysmon的M(一般稱為監控線程),sysmon無需綁定P即可運行。sysmon是GO程序啟動時創建的一個用於監控管理的線程。
sysmon每20us~10ms啟動一次,sysmon主要完成如下工作:
A、釋放閑置超過5分鐘的span物理內存;
B、如果超過2分鐘沒有垃圾回收,強制執行;
C、將長時間未處理的netpoll結果添加到任務隊列;
D、向長時間運行的G任務發出搶占調度;
E、收回因syscall長時間阻塞的P;
如果一個G任務運行10ms,sysmon就會認為其運行時間太久而發出搶占式調度的請求。一旦G的搶占標誌位被設為true,那麽待G下一次調用函數或方法時,runtime便可以將G搶占,並移出運行狀態,放入P的local runq中,等待下一次被調度。
三、runtime包
1、Gosched
runtime.Gosched()用於讓出CPU時間片,讓出當前goroutine的執行權限,調度器安排其它等待的任務運行,並在下次某個時候從該位置恢復執行。
2、Goexit
調用runtime.Goexit()將立即終止當前goroutine執?,調度器確保所有已註冊defer延遲調用被執行。
3、GOMAXPROCS
調用runtime.GOMAXPROCS()用來設置可以並行計算的CPU核數的最大值,並返回設置前的值。
四、Channel通道
1、Channel簡介
Channel是goroutine之間通信的通道,用於goroutine之間發消息和接收消息。Channel是一種引用類型的數據,可以作為參數,也可以作為返回值。
2、Channel的創建
channel聲明使用chan關鍵字,channel的創建需要指定通道中發送和接收數據的類型。
使用make來建立一個信道:
var channel chan int = make(chan int)
// 或channel := make(chan int)
make有第二個參數,用於指定通道的大小。
3、Channel的操作
//發送數據:寫
channel<- data
//接收數據:讀
data := <- channel
關閉通道:發送方關閉通道,用於通知接收方已經沒有數據
關閉通道後,其它goroutine訪問通道獲取數據時,得到零值和false
有條件結束死循環:
for{
v ,ok := <- chan
if ok== false{
//通道已經關閉。。
break
}
}
//循環從通道中獲取數據,直到通道關閉。
for v := range channel{
//從通道讀取數據
}
Channel使用示例如下:
package main
import (
"fmt"
"time"
)
type Person struct {
name string
age uint8
address Address
}
type Address struct {
city string
district string
}
func SendMessage(person *Person, channel chan Person){
go func(person *Person, channel chan Person) {
fmt.Printf("%s send a message.\n", person.name)
channel<-*person
for i := 0; i < 5; i++ {
channel<- *person
}
close(channel)
fmt.Println("channel is closed.")
}(person, channel)
}
func main() {
channel := make(chan Person,1)
harry := Person{
"Harry",
30,
Address{"London","Oxford"},
}
go SendMessage(&harry, channel)
data := <-channel
fmt.Printf("main goroutine receive a message from %s.\n", data.name)
for {
i, ok := <-channel
time.Sleep(time.Second)
if !ok {
fmt.Println("channel is empty.")
break
}else{
fmt.Printf("receive %s\n",i.name)
}
}
}
結果如下:
Harry send a message.
main goroutine receive a message from Harry.
receive Harry
receive Harry
receive Harry
channel is closed.
receive Harry
receive Harry
channel is empty.
Go運行時系統並沒有在通道channel被關閉後立即把false作為相應接收操作的第二個結果,而是等到接收端把已在通道中的所有元素值都接收到後才這樣做,確保在發送端關閉通道的安全性。
被關閉的通道會禁止數據流入, 是只讀的,仍然可以從關閉的通道中取出數據,但不能再寫入數據。
給一個nil的channel發送數據,造成永遠阻塞?;從一個nil的channel接收數據,造成永遠阻塞。給一個已經關閉的channel發送數據,引起panic?;
從一個已經關閉的channel接收數據,返回帶緩存channel中緩存的值,如果通道中無緩存,返回0。
4、無緩沖通道
make創建通道時,默認沒有第二個參數,通道的大小為0,稱為無緩沖通道。
無緩沖的通道是指通道的大小為0,即通道在接收前沒有能力保存任何值,無緩沖通道發送goroutine和接收gouroutine必須是同步的,如果沒有同時準備好,先執行的操作就會阻塞等待,直到另一個相對應的操作準備好為止。無緩沖通道也稱為同步通道。
無緩沖的信道永遠不會存儲數據,只負責數據的流通。從無緩沖信道取數據,必須要有數據流進來才可以,否則當前goroutine會阻塞;數據流入無緩沖信道, 如果沒有其它goroutine來拿取走數據,那麽當前goroutine會阻塞。
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
go func() {
var sum int = 0
for i := 0; i < 10; i++ {
sum += i
}
//發送數據到通道
ch <- sum
}()
//從通道接收數據
fmt.Println(<-ch)
}
在計算sum和的goroutine沒有執行完,將值賦發送到ch通道前,fmt.Println(<-ch)會一直阻塞等待,main函數所在的主goroutine就不會終止,只有當計算和的goroutine完成後,並且發送到ch通道的操作準備好後,main函數的<-ch會接收計算好的值,然後打印出來。
無緩存通道的發送數據和讀取數據的操作不能放在同一個協程中,防止發生死鎖。通常,先創建一個goroutine對通道進行操作,此時新創建goroutine會阻塞,然後再在主goroutine中進行通道的反向操作,實現goroutine解鎖,即必須goroutine在前,解鎖goroutine在後。
5、有緩沖通道
make創建通道時,指定通道的大小時,稱為有緩沖通道。
對於帶緩存通道,只要通道中緩存不滿,可以一直向通道中發送數據,直到緩存已滿;同理只要通道中緩存不為0,可以一直從通道中讀取數據,直到通道的緩存變為0才會阻塞。
相對於不帶緩存通道,帶緩存通道不易造成死鎖,可以同時在一個goroutine中放心使用。
帶緩存通道不僅可以流通數據,還可以緩存數據,當帶緩存通道達到滿的狀態的時候才會阻塞,此時帶緩存通道不能再承載更多的數據。
帶緩存通道是先進先出的。
6、單向通道
對於某些特殊的場景,需要限制一個通道只可以接收,不能發送;限制一個通道只能發送,不能接收。只能單向接收或發送的通道稱為單向通道。
定義單向通道只需要在定義的時候,帶上<-即可。
var send chan<- int //只能發送
var receive <-chan int //只能接收
<-操作符的位置在後面只能發送,對應發送操作;<-操作符的位置在前面只能接收,對應接收操作。
單向通道通常用於函數或者方法的參數。
五、channel應用
1、廣播功能實現
當一個通道關閉時, 所有對此通道的讀取的goroutine都會退出阻塞。
package main
import (
"fmt"
"time"
)
func notify(id int, channel chan int){
<-channel//接收到數據或通道關閉時退出阻塞
fmt.Printf("%d receive a message.\n", id)
}
func broadcast(channel chan int){
fmt.Printf("Broadcast:\n")
close(channel)//關閉通道
}
func main(){
channel := make(chan int,1)
for i:=0;i<10 ;i++ {
go notify(i,channel)
}
go broadcast(channel)
time.Sleep(time.Second)
}
2、select使用
select用於在多個channel上同時進行偵聽並收發消息,當任何一個case滿足條件時即執行,如果沒有可執行的case則會執行default的case,如果沒有指定default case,則會阻塞程序。select的語法如下:
select {
case communication clause :
statement(s);
case communication clause :
statement(s);
/*可以定義任意數量的 case */
default : /*可選 */
statement(s);
}
Select多路復用中:
A、每個case都必須是一次通信
B、所有channel表達式都會被求值
C、所有被發送的表達式都會被求值
D、如果任意某個通信可以進行,它就執行;其它被忽略。
E、如果有多個case都可以運行,Select會隨機公平地選出一個執行。其它不會執行。
F、否則,如果有default子句,則執行default語句。如果沒有default子句,select將阻塞,直到某個通信可以運行;Go不會重新對channel或值進行求值。
package main
import (
"fmt"
"time"
)
func doWork(channels *[10]chan int){
for {
select {
case x1 := <-channels[0]:
fmt.Println("receive x1: ",x1)
case x2 := <-channels[1]:
fmt.Println("receive x2: ",x2)
case x3 := <-channels[2]:
fmt.Println("receive x3: ",x3)
case x4 := <-channels[3]:
fmt.Println("receive x4: ",x4)
case x5 := <-channels[4]:
fmt.Println("receive x5: ",x5)
case x6 := <-channels[5]:
fmt.Println("receive x6: ",x6)
case x7 := <-channels[6]:
fmt.Println("receive x7: ",x7)
case x8 := <-channels[7]:
fmt.Println("receive x8: ",x8)
case x9 := <-channels[8]:
fmt.Println("receive x9: ",x9)
case x10 := <-channels[9]:
fmt.Println("receive x10: ",x10)
}
}
}
func main(){
var channels [10]chan int
go doWork(&channels)
for i := 0; i < 10; i++ {
channels[i] = make(chan int,1)
channels[i]<- i
}
time.Sleep(time.Second*5)
}
結果如下:
receive x4: 3
receive x10: 9
receive x9: 8
receive x5: 4
receive x2: 1
receive x7: 6
receive x8: 7
receive x1: 0
receive x3: 2
receive x6: 5
六、死鎖
Go程序中死鎖是指所有的goroutine在等待資源的釋放。
通常,死鎖的報錯信息如下:fatal error: all goroutines are asleep - deadlock!
Goroutine死鎖產生的原因如下:
A、只在單一的goroutine裏操作無緩沖信道,一定死鎖
B、非緩沖信道上如果發生流入無流出,或者流出無流入,會導致死鎖
因此,解決死鎖的方法有:
A、取走無緩沖通道的數據或是發送數據到無緩沖通道
B、使用緩沖通道
Go語言開發(九)、Go語言並發編程