1. 程式人生 > 其它 >Go併發排程進階-GMP初始化,最難啃的有時候耐心看完還是很簡單的

Go併發排程進階-GMP初始化,最難啃的有時候耐心看完還是很簡單的

Go併發排程進階-【公粽號:堆疊future】

請移步到這裡看

2. GMP初始化

1. M的初始化

M 只有自旋和非自旋兩種狀態。自旋的時候,會努力找工作;找不到的時候會進入非自旋狀態,之後會休眠,直到有工作需要處理時,被其他工作執行緒喚醒,又進入自旋狀態。

//src/runtime/proc.go

funcmcommoninit(mp*m,idint64){
_g_:=getg()
...
lock(&sched.lock)
...
//random初始化,用於竊取G
mp.fastrand[0]=uint32(int64Hash(uint64(mp.id),fastrandseed))
mp.fastrand[1]=uint32(int64Hash(uint64(cputicks()),^fastrandseed))
ifmp.fastrand[0]|mp.fastrand[1]==0{
mp.fastrand[1]=1
}

//建立用於訊號處理的gsignal,只是簡單的從堆上分配一個g結構體物件,然後把棧設定好就返回了
mpreinit(mp)
ifmp.gsignal!=nil{
mp.gsignal.stackguard1=mp.gsignal.stack.lo+_StackGuard
}

//把M掛入全域性連結串列allm之中
mp.alllink=allm
...
}

這裡傳入的 id 是-1,初次呼叫會將 id 設定為 0,這裡並未對M0做什麼關於排程相關的初始化,所以可以簡單的認為這個函式只是把M0放入全域性連結串列allm之中就返回了。當然這個M0就是主M。

2. P的初始化

  • 通常情況下(在程式執行時不調整 P 的個數),P 只會在上圖中的四種狀態下進行切換。當程式剛開始執行進行初始化時,所有的 P 都處於 _Pgcstop 狀態, 隨著 P 的初始化(runtime.procresize),會被置於 _Pidle

  • 當 M 需要執行時,會 runtime.acquirep 來使 P 變成 Prunning 狀態,並通過 runtime.releasep 來釋放。

  • 當 G 執行時需要進入系統呼叫,P 會被設定為 _Psyscall, 如果這個時候被系統監控搶奪(runtime.retake),則 P 會被重新修改為 _Pidle

  • 如果在程式執行中發生 GC,則 P 會被設定為 _Pgcstop, 並在 runtime.startTheWorld 時重新調整為 _Prunning

varallp[]*p

funcprocresize(nprocsint32)*p{
//獲取先前的P個數
old:=gomaxprocs
//更新統計資訊
now:=nanotime()
ifsched.procresizetime!=0{
sched.totaltime+=int64(old)*(now-sched.procresizetime)
}
sched.procresizetime=now
//根據runtime.MAXGOPROCS調整p的數量,因為runtime.MAXGOPROCS使用者可以自行設定
ifnprocs>int32(len(allp)){
lock(&allpLock)
ifnprocs<=int32(cap(allp)){
allp=allp[:nprocs]
}else{
nallp:=make([]*p,nprocs)
copy(nallp,allp[:cap(allp)])
allp=nallp
}
unlock(&allpLock)
}

//初始化新的P
fori:=old;i<nprocs;i++{
pp:=allp[i]
//為空,則申請新的P物件
ifpp==nil{
pp=new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]),unsafe.Pointer(pp))
}

_g_:=getg()
//P不為空,並且id小於nprocs,那麼可以繼續使用當前P
if_g_.m.p!=0&&_g_.m.p.ptr().id<nprocs{
//continuetousethecurrentP
_g_.m.p.ptr().status=_Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
}else{
//釋放當前P,因為已失效
if_g_.m.p!=0{
_g_.m.p.ptr().m=0
}
_g_.m.p=0
p:=allp[0]
p.m=0
p.status=_Pidle
//P0繫結到當前的M0
acquirep(p)
}
//從未使用的P釋放資源
fori:=nprocs;i<old;i++{
p:=allp[i]
p.destroy()
//不能釋放p本身,因為他可能在m進入系統呼叫時被引用
}
//釋放完P之後重置allp的長度
ifint32(len(allp))!=nprocs{
lock(&allpLock)
allp=allp[:nprocs]
unlock(&allpLock)
}
varrunnablePs*p
//將沒有本地任務的P放到空閒連結串列中
fori:=nprocs-1;i>=0;i--{
p:=allp[i]
//當前正在使用的P略過
if_g_.m.p.ptr()==p{
continue
}
//設定狀態為_Pidle
p.status=_Pidle
//P的任務列表是否為空
ifrunqempty(p){
//放入到空閒列表中
pidleput(p)
}else{
//獲取空閒M繫結到P上
p.m.set(mget())
//
p.link.set(runnablePs)
runnablePs=p
}
}
stealOrder.reset(uint32(nprocs))
varint32p*int32=&gomaxprocs//makecompilercheckthatgomaxprocsisanint32
atomic.Store((*uint32)(unsafe.Pointer(int32p)),uint32(nprocs))
returnrunnablePs
}

procresize方法的執行過程如下:

  1. allp 是全域性變數 P 的資源池,如果 allp 的切片中的處理器數量少於期望數量,會對切片進行擴容;

  2. 擴容的時候會使用 new 申請一個新的 P ,然後使用 init 初始化,需要注意的是初始化的 P 的 id 就是傳入的 i 的值,狀態為 _Pgcstop;

  3. 然後通過 g.m.p 獲取 M0,如果 M0 已與有效的 P 繫結上,則將 被繫結的 P 的狀態修改為 _Prunning。否則獲取 allp[0] 作為 P0 呼叫 runtime.acquirep 與 M0 進行繫結;

  4. 超過處理器個數的 P 通過p.destroy釋放資源,p.destroy會將與 P 相關的資源釋放,並將 P 狀態設定為 _Pdead;

  5. 通過截斷改變全域性變數 allp 的長度保證與期望處理器數量相等;

  6. 遍歷 allp 檢查 P 的是否處於空閒狀態,是的話放入到空閒列表中;

3. G的初始化

這是G的狀態流轉圖,G的初始化相當複雜,需要大家下去對照原始碼再看一遍。

funcnewproc(sizint32,fn*funcval){
//從fn的地址增加一個指標的長度,從而獲取第一引數地址
argp:=add(unsafe.Pointer(&fn),sys.PtrSize)
gp:=getg()
pc:=getcallerpc()//獲取呼叫方PC/IP暫存器值

//用g0系統棧建立Goroutine物件
//傳遞的引數包括fn函式入口地址,argp引數起始地址,siz引數長度,gp(g0),呼叫方pc(goroutine)
systemstack(func(){
newg:=newproc1(fn,argp,siz,gp,pc)

_p_:=getg().m.p.ptr()//獲取p
runqput(_p_,newg,true)//並將其放入P本地佇列的隊頭或全域性佇列

//檢查空閒的 P,將其喚醒,準備執行 G,但我們目前處於初始化階段,主 Goroutine 尚未開始執行,因此這裡不會喚醒 P。
ifmainStarted{
wakep()
}
})
}

//建立一個執行 fn 的新 g,具有 narg 位元組大小的引數,從 argp 開始。
// callerps 是 go 語句的起始地址。新建立的 g 會被放入 g 的佇列中等待執行。
funcnewproc1(fn*funcval,argpunsafe.Pointer,nargint32,callergp*g,callerpcuintptr)*g{
_g_:=getg()//因為是在系統棧執行所以此時的g為g0
acquirem()//禁止搶佔
iffn==nil{
_g_.m.throwing=-1//donotdumpfullstacks
throw("goofnilfuncvalue")
}

...
siz:=narg
siz=(siz+7)&^7


...

//當前工作執行緒所繫結的p
//初始化時_p_=g0.m.p,也就是_p_=allp[0]
_p_:=_g_.m.p.ptr()
//從p的本地緩衝裡獲取一個沒有使用的g,初始化時為空,返回nil
newg:=gfget(_p_)
ifnewg==nil{
//建立一個擁有_StackMin大小的棧的g
newg=malg(_StackMin)
//將新建立的g從_Gidle更新為_Gdead狀態
casgstatus(newg,_Gidle,_Gdead)
//將Gdead狀態的g新增到allg,這樣GC不會掃描未初始化的棧
allgadd(newg)
}
ifnewg.stack.hi==0{
throw("newproc1:newgmissingstack")
}

ifreadgstatus(newg)!=_Gdead{
throw("newproc1:newgisnotGdead")
}

...
//確定sp位置
sp:=newg.stack.hi-totalSize
//確定引數入棧位置
spArg:=sp

...

ifnarg>0{
//將引數從執行newproc函式的棧拷貝到新g的棧
memmove(unsafe.Pointer(spArg),argp,uintptr(narg))
...
}

//設定newg的排程相關資訊
memclrNoHeapPointers(unsafe.Pointer(&newg.sched),unsafe.Sizeof(newg.sched))
newg.sched.sp=sp
newg.stktopsp=sp
newg.sched.pc=funcPC(goexit)+sys.PCQuantum
newg.sched.g=guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched,fn)
newg.gopc=callerpc
newg.ancestors=saveAncestors(callergp)
newg.startpc=fn.fn
if_g_.m.curg!=nil{
newg.labels=_g_.m.curg.labels
}
ifisSystemGoroutine(newg,false){
atomic.Xadd(&sched.ngsys,+1)
}

//設定g的狀態為_Grunnable,可以運行了
casgstatus(newg,_Gdead,_Grunnable)

//設定goid
newg.goid=int64(_p_.goidcache)
_p_.goidcache++

...
releasem(_g_.m)//恢復搶佔本質上是加鎖
returnnewg
}

建立 G 的過程也是相對比較複雜的,我們來總結一下這個過程:

  1. 首先嚐試從 P 本地 gfree 連結串列或全域性 gfree 佇列獲取已經執行過的 g

  2. 初始化過程中程式無論是本地佇列還是全域性佇列都不可能獲取到 g,因此建立一個新的 g,併為其分配執行執行緒(執行棧),這時 g 處於 _Gidle 狀態

  3. 建立完成後,g 被更改為 _Gdead 狀態,並根據要執行函式的入口地址和引數,初始化執行棧的 SP 和引數的入棧位置,並將需要的引數拷貝一份存入執行棧中

  4. 根據 SP、引數,在 g.sched 中儲存 SP 和 PC 指標來初始化 g 的執行現場

  5. 將呼叫方、要執行的函式的入口 PC 進行儲存,並將 g 的狀態更改為 _Grunnable

  6. 給 Goroutine 分配 id,並將其放入 P 本地佇列的隊頭或全域性佇列(初始化階段佇列肯定不是滿的,因此不可能放入全域性佇列)

  7. 檢查空閒的 P,將其喚醒,準備執行 G,但我們目前處於初始化階段,主 Goroutine 尚未開始執行,因此這裡不會喚醒 P。

4. 小結

結合上篇GMP結構和這篇的初始化,那麼對於GMP的排程我相信會有一定的理解,起碼你知道了底層的一些點點滴滴,後續文章還會給大家繼續講解GMP排程,你堅持學習到最後發現,最難啃的GMP也就那麼回事!

公粽號:堆疊future

使很多處於迷茫階段的coder能從這裡找到光明,堆疊創世,功在當代,利在千秋