Go併發排程進階-GMP初始化,最難啃的有時候耐心看完還是很簡單的
Go併發排程進階-【公粽號:堆疊future】
2. GMP初始化
1. M的初始化
M 只有自旋和非自旋兩種狀態。自旋的時候,會努力找工作;找不到的時候會進入非自旋狀態,之後會休眠,直到有工作需要處理時,被其他工作執行緒喚醒,又進入自旋狀態。
//src/runtime/proc.go funcmcommoninit(mp*m,idint64){ _g_:=getg() ... lock(&sched.lock) ... //random初始化,用於竊取G mp.fastrand[0]=uint32(int64Hash(uint64(mp.id),fastrandseed)) mp.fastrand[1]=uint32(int64Hash(uint64(cputicks()),^fastrandseed)) ifmp.fastrand[0]|mp.fastrand[1]==0{ mp.fastrand[1]=1 } //建立用於訊號處理的gsignal,只是簡單的從堆上分配一個g結構體物件,然後把棧設定好就返回了 mpreinit(mp) ifmp.gsignal!=nil{ mp.gsignal.stackguard1=mp.gsignal.stack.lo+_StackGuard } //把M掛入全域性連結串列allm之中 mp.alllink=allm ... }
這裡傳入的 id 是-1,初次呼叫會將 id 設定為 0,這裡並未對M0做什麼關於排程相關的初始化,所以可以簡單的認為這個函式只是把M0放入全域性連結串列allm之中就返回了。當然這個M0就是主M。
2. P的初始化
-
通常情況下(在程式執行時不調整 P 的個數),P 只會在上圖中的四種狀態下進行切換。當程式剛開始執行進行初始化時,所有的 P 都處於
_Pgcstop
狀態, 隨著 P 的初始化(runtime.procresize),會被置於_Pidle
。 -
當 M 需要執行時,會 runtime.acquirep 來使 P 變成
Prunning
狀態,並通過 runtime.releasep 來釋放。 -
當 G 執行時需要進入系統呼叫,P 會被設定為
_Psyscall
, 如果這個時候被系統監控搶奪(runtime.retake),則 P 會被重新修改為_Pidle
。 -
如果在程式執行中發生 GC,則 P 會被設定為
_Pgcstop
, 並在 runtime.startTheWorld 時重新調整為_Prunning
。
varallp[]*p funcprocresize(nprocsint32)*p{ //獲取先前的P個數 old:=gomaxprocs //更新統計資訊 now:=nanotime() ifsched.procresizetime!=0{ sched.totaltime+=int64(old)*(now-sched.procresizetime) } sched.procresizetime=now //根據runtime.MAXGOPROCS調整p的數量,因為runtime.MAXGOPROCS使用者可以自行設定 ifnprocs>int32(len(allp)){ lock(&allpLock) ifnprocs<=int32(cap(allp)){ allp=allp[:nprocs] }else{ nallp:=make([]*p,nprocs) copy(nallp,allp[:cap(allp)]) allp=nallp } unlock(&allpLock) } //初始化新的P fori:=old;i<nprocs;i++{ pp:=allp[i] //為空,則申請新的P物件 ifpp==nil{ pp=new(p) } pp.init(i) atomicstorep(unsafe.Pointer(&allp[i]),unsafe.Pointer(pp)) } _g_:=getg() //P不為空,並且id小於nprocs,那麼可以繼續使用當前P if_g_.m.p!=0&&_g_.m.p.ptr().id<nprocs{ //continuetousethecurrentP _g_.m.p.ptr().status=_Prunning _g_.m.p.ptr().mcache.prepareForSweep() }else{ //釋放當前P,因為已失效 if_g_.m.p!=0{ _g_.m.p.ptr().m=0 } _g_.m.p=0 p:=allp[0] p.m=0 p.status=_Pidle //P0繫結到當前的M0 acquirep(p) } //從未使用的P釋放資源 fori:=nprocs;i<old;i++{ p:=allp[i] p.destroy() //不能釋放p本身,因為他可能在m進入系統呼叫時被引用 } //釋放完P之後重置allp的長度 ifint32(len(allp))!=nprocs{ lock(&allpLock) allp=allp[:nprocs] unlock(&allpLock) } varrunnablePs*p //將沒有本地任務的P放到空閒連結串列中 fori:=nprocs-1;i>=0;i--{ p:=allp[i] //當前正在使用的P略過 if_g_.m.p.ptr()==p{ continue } //設定狀態為_Pidle p.status=_Pidle //P的任務列表是否為空 ifrunqempty(p){ //放入到空閒列表中 pidleput(p) }else{ //獲取空閒M繫結到P上 p.m.set(mget()) // p.link.set(runnablePs) runnablePs=p } } stealOrder.reset(uint32(nprocs)) varint32p*int32=&gomaxprocs//makecompilercheckthatgomaxprocsisanint32 atomic.Store((*uint32)(unsafe.Pointer(int32p)),uint32(nprocs)) returnrunnablePs }
procresize方法的執行過程如下:
-
allp 是全域性變數 P 的資源池,如果 allp 的切片中的處理器數量少於期望數量,會對切片進行擴容;
-
擴容的時候會使用 new 申請一個新的 P ,然後使用 init 初始化,需要注意的是初始化的 P 的 id 就是傳入的 i 的值,狀態為 _Pgcstop;
-
然後通過 g.m.p 獲取 M0,如果 M0 已與有效的 P 繫結上,則將 被繫結的 P 的狀態修改為 _Prunning。否則獲取 allp[0] 作為 P0 呼叫 runtime.acquirep 與 M0 進行繫結;
-
超過處理器個數的 P 通過p.destroy釋放資源,p.destroy會將與 P 相關的資源釋放,並將 P 狀態設定為 _Pdead;
-
通過截斷改變全域性變數 allp 的長度保證與期望處理器數量相等;
-
遍歷 allp 檢查 P 的是否處於空閒狀態,是的話放入到空閒列表中;
3. G的初始化
這是G的狀態流轉圖,G的初始化相當複雜,需要大家下去對照原始碼再看一遍。
funcnewproc(sizint32,fn*funcval){
//從fn的地址增加一個指標的長度,從而獲取第一引數地址
argp:=add(unsafe.Pointer(&fn),sys.PtrSize)
gp:=getg()
pc:=getcallerpc()//獲取呼叫方PC/IP暫存器值
//用g0系統棧建立Goroutine物件
//傳遞的引數包括fn函式入口地址,argp引數起始地址,siz引數長度,gp(g0),呼叫方pc(goroutine)
systemstack(func(){
newg:=newproc1(fn,argp,siz,gp,pc)
_p_:=getg().m.p.ptr()//獲取p
runqput(_p_,newg,true)//並將其放入P本地佇列的隊頭或全域性佇列
//檢查空閒的 P,將其喚醒,準備執行 G,但我們目前處於初始化階段,主 Goroutine 尚未開始執行,因此這裡不會喚醒 P。
ifmainStarted{
wakep()
}
})
}
//建立一個執行 fn 的新 g,具有 narg 位元組大小的引數,從 argp 開始。
// callerps 是 go 語句的起始地址。新建立的 g 會被放入 g 的佇列中等待執行。
funcnewproc1(fn*funcval,argpunsafe.Pointer,nargint32,callergp*g,callerpcuintptr)*g{
_g_:=getg()//因為是在系統棧執行所以此時的g為g0
acquirem()//禁止搶佔
iffn==nil{
_g_.m.throwing=-1//donotdumpfullstacks
throw("goofnilfuncvalue")
}
...
siz:=narg
siz=(siz+7)&^7
...
//當前工作執行緒所繫結的p
//初始化時_p_=g0.m.p,也就是_p_=allp[0]
_p_:=_g_.m.p.ptr()
//從p的本地緩衝裡獲取一個沒有使用的g,初始化時為空,返回nil
newg:=gfget(_p_)
ifnewg==nil{
//建立一個擁有_StackMin大小的棧的g
newg=malg(_StackMin)
//將新建立的g從_Gidle更新為_Gdead狀態
casgstatus(newg,_Gidle,_Gdead)
//將Gdead狀態的g新增到allg,這樣GC不會掃描未初始化的棧
allgadd(newg)
}
ifnewg.stack.hi==0{
throw("newproc1:newgmissingstack")
}
ifreadgstatus(newg)!=_Gdead{
throw("newproc1:newgisnotGdead")
}
...
//確定sp位置
sp:=newg.stack.hi-totalSize
//確定引數入棧位置
spArg:=sp
...
ifnarg>0{
//將引數從執行newproc函式的棧拷貝到新g的棧
memmove(unsafe.Pointer(spArg),argp,uintptr(narg))
...
}
//設定newg的排程相關資訊
memclrNoHeapPointers(unsafe.Pointer(&newg.sched),unsafe.Sizeof(newg.sched))
newg.sched.sp=sp
newg.stktopsp=sp
newg.sched.pc=funcPC(goexit)+sys.PCQuantum
newg.sched.g=guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched,fn)
newg.gopc=callerpc
newg.ancestors=saveAncestors(callergp)
newg.startpc=fn.fn
if_g_.m.curg!=nil{
newg.labels=_g_.m.curg.labels
}
ifisSystemGoroutine(newg,false){
atomic.Xadd(&sched.ngsys,+1)
}
//設定g的狀態為_Grunnable,可以運行了
casgstatus(newg,_Gdead,_Grunnable)
//設定goid
newg.goid=int64(_p_.goidcache)
_p_.goidcache++
...
releasem(_g_.m)//恢復搶佔本質上是加鎖
returnnewg
}
建立 G 的過程也是相對比較複雜的,我們來總結一下這個過程:
-
首先嚐試從 P 本地 gfree 連結串列或全域性 gfree 佇列獲取已經執行過的 g
-
初始化過程中程式無論是本地佇列還是全域性佇列都不可能獲取到 g,因此建立一個新的 g,併為其分配執行執行緒(執行棧),這時 g 處於 _Gidle 狀態
-
建立完成後,g 被更改為 _Gdead 狀態,並根據要執行函式的入口地址和引數,初始化執行棧的 SP 和引數的入棧位置,並將需要的引數拷貝一份存入執行棧中
-
根據 SP、引數,在 g.sched 中儲存 SP 和 PC 指標來初始化 g 的執行現場
-
將呼叫方、要執行的函式的入口 PC 進行儲存,並將 g 的狀態更改為 _Grunnable
-
給 Goroutine 分配 id,並將其放入 P 本地佇列的隊頭或全域性佇列(初始化階段佇列肯定不是滿的,因此不可能放入全域性佇列)
-
檢查空閒的 P,將其喚醒,準備執行 G,但我們目前處於初始化階段,主 Goroutine 尚未開始執行,因此這裡不會喚醒 P。
4. 小結
結合上篇GMP結構和這篇的初始化,那麼對於GMP的排程我相信會有一定的理解,起碼你知道了底層的一些點點滴滴,後續文章還會給大家繼續講解GMP排程,你堅持學習到最後發現,最難啃的GMP也就那麼回事!
公粽號:堆疊future
使很多處於迷茫階段的coder能從這裡找到光明,堆疊創世,功在當代,利在千秋