1. 程式人生 > 實用技巧 >【轉】雲風版協程庫原始碼分析

【轉】雲風版協程庫原始碼分析

原文:https://www.cnblogs.com/motadou/p/12667166.html

_________________________________________

雲風版協程庫原始碼分析

我們知道一個程式可以包含多個程序,每個程序中可以建立多個執行緒,線上程中又可以建立成千上萬甚至更多個協程。程序和執行緒的建立以及排程需要在核心態和使用者態之間切換;而協程的建立和排程都在使用者態,不需要和核心態進行互動。所以這就註定建立和維持協程執行所犧牲的效能,要遠小於程序和執行緒。另外,協程都是以一組的形態存在於一個特定的執行緒內,那麼對於資料的共享,不必使用互斥鎖或者條件變數,來保證互斥和同步,應用程式效能上也有了很大的提升。這就是我們使用協程的原因。

協程適用於IO密集型,而不適用於計算密集型的程式。對於IO密集型程式,無論是讀取socket還是硬碟,這些操作基本上都是阻塞式呼叫,當協程遇到阻塞時,當前協程顯式或者隱式主動放棄控制權,儲存當前協程的硬體上下文和棧,然後排程器切換到其他就緒的協程繼續執行,而當阻塞IO完成後,排程器獲得通知,恢復原來協程的硬體上下文以及棧,再切換回來執行。而對於計算密集型的程式,當前協程除非顯式切換協程或者設定定時器,由定時器主動引起切換,否則通常不會主動放棄控制權,其他協程可能會一直等待排程,得不到執行。

一組協程執行在一個執行緒內,它們是序列執行的,而非並行,即是執行在一個CPU核上,那麼協程就無法利用多核CPU資源。如果我們既想使用協程,又想利用多核CPU,一般我們就採用”多程序+協程“的方式。

目前網上有很多協程的實現例子,本文主要分析雲風的協程庫,來探究協程的實現原理。原始碼之前了無祕密,大家也可以直接看協程庫的
註釋版
。 協程庫的實現方式 總體來說,目前有如下幾種方式來實現協程庫。
第一種:利用ucontext函式族來切換執行時上下文。比如雲風協程庫
第二種:利用匯編語言來切換執行時上下文。比如微信的libco
第三種:利用C語言語法switch-case來實現切換執行時上下文。比如Protothreads
第四種:利用C語言的setjmp和longjmp。

雲風版協程庫簡單來說,核心就是使用ucontext函式族不停的切換當前執行緒的執行時上下文,導致切換到不同的協程函式中去,以達到在同一個執行緒內切換協程的目的。無論協程怎麼切換,不會引起所屬執行緒的切換。 ucontext函式族說明
0001 0002 0003 0004 0005 0006 0007 0008 0009 #include <ucontext.h> intgetcontext(ucontext_t *ucp); intsetcontext(constucontext_t *ucp); voidmakecontext(ucontext_t *ucp, void(*func)(), intargc, ...); intswapcontext(ucontext_t *oucp, ucontext_t *ucp);
int  getcontext(ucontext_t *ucp);
該函式獲取當前執行時上下文,將其儲存到ucp中。
int  setcontext(const ucontext_t *ucp);
設定當前執行時上下文為ucp。
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
該函式的作用是修改一個用getcontext()獲取的ucontext_t例項,也就是說ucp是在呼叫makecontext之前由getcontext初始化過的值。如果從字面上理解,覺得makecontext可以新建一個ucontext_t,但實際它僅做修改,所以它叫updatecontext顯然更加合適。makecontext用引數func指標和argc,以及後續的可變引數,來修改ucp。當這個ucp被setcontext或者swapcontext之後,執行流跳轉到func指向的新函式中去。
int  swapcontext(ucontext_t *oucp, ucontext_t *ucp);
該函式將當前的執行時上下文儲存到oucp中,然後切換到ucp所指向的上下文。

這4個函式都用到了ucontext_t結構體,它用來儲存執行時上下文,包括執行時各個暫存器的值、執行時棧、訊號等等資料。它大致的結構為:
0001 0002 0003 0004 0005 0006 0007 typedefstructucontext { structucontext *uc_link; sigset_t uc_sigmask; stack_t uc_stack; mcontext_t uc_mcontext; ... } ucontext_t;
其中:
uc_link :當前上下文執行結束時,系統恢復到uc_link指向的上下文;如果該值為NULL,則執行緒退出;
uc_stack :當前上下文中使用的棧;
uc_sigmask :當前上下文中的阻塞訊號集;
uc_mcontext:儲存的上下文的特定機器資料,包括呼叫執行緒的特定暫存器等;
雲風版協程庫的實現原理 首先我們看下該協程協程庫中的協程的狀態切換圖,圖中表明瞭幾個重要的函式:
由上述狀態圖,我們可以知道雲風版協程庫的使用方法:
  • 第一:使用者在主協程中呼叫coroutine_new來建立一個子協程,新建協程的狀態為COROUTINE_READY,表示協程就緒,等待排程。
  • 第二:在合適的時機,使用者在主協程中顯式呼叫coroutine_resume(),將某個新建協程投入執行。
    此時的coroutine_resume將協程狀態由COROUTINE_READY轉為COROUTINE_RUNNING,函式會相繼呼叫getcontext、makecontext、swapcontext,分別完成獲取當前上下文、製作新上下文、從主協程切換到子協程這三個動作。呼叫swapcontext之後,當前執行緒的控制權交給子協程。
    任一時刻,在一個執行緒內只有一個協程在執行,其他協程要麼是就緒態,要麼是掛起態。
  • 第三:在子協程中,使用者顯式呼叫coroutine_yeild()放棄控制權。
    此時的coroutine_yeild將協程狀態由COROUTINE_RUNNING轉為COROUTINE_SUSPEND狀態,函式會儲存當前子協程的上下文和執行時棧,然後呼叫swapcontext從子協程切換回主協程。呼叫swapcontext之後,當前執行緒的控制權又回到主協程。
  • 第四:控制權切回主協程之後,如果有新建的子協程,根據“第二”步的描述將其投入執行;
  • 第五:控制權切回主協程之後,如果有COROUTINE_SUSPEND狀態的協程,主協程根據排程演算法再次呼叫coroutine_resume()將其投入執行。
    此時的coroutine_resume將協程狀態由COROUTINE_SUSPEND轉為COROUTINE_RUNNING,函式會先恢復子協程的執行時棧,然後呼叫swapcontext從主協程切換到子協程。當前執行緒的控制權移交給子協程。
  • 第六:如果子協程的函式體執行完畢退出,再次切回主協程。恢復主協程的硬體上下文和執行時棧,執行排程子程式,或者在所有子程式都結束的情況下,主協程也退出。
參照上述描述,我們看下雲風的示例程式:
0001 0002 0003 0004 0005 0006 0007 0008 0009 0010 0011 0012 0013 0014 0015 0016 0017 0018 0019 0020 0021 0022 0023 0024 0025 0026 0027 0028 0029 0030 0031 0032 0033 0034 0035 0036 0037 0038 0039 0040 0041 0042 0043 0044 0045 0046 0047 0048 0049 #include "coroutine.h" #include <stdio.h> structargs { intn; }; staticvoidfoo(structschedule * S, void*ud) { structargs * arg = ud; intstart = arg->n; inti; for(i = 0; i < 5; i++) { printf("coroutine %d : %d\n",coroutine_running(S) , start + i); coroutine_yield(S); // 切出當前協程 } } staticvoidtest(structschedule *S) { structargs arg1 = { 0 }; structargs arg2 = { 100 }; intco1 = coroutine_new(S, foo, &arg1); // 建立協程 intco2 = coroutine_new(S, foo, &arg2); // 建立協程 printf("main start\n"); while(coroutine_status(S,co1) && coroutine_status(S,co2)) { coroutine_resume(S, co1); // 恢復協程1的執行 coroutine_resume(S, co2); // 恢復協程2的執行 } printf("main end\n"); } intmain() { structschedule * S = coroutine_open(); // 建立一個排程器,用來管理所有子協程 test(S); coroutine_close(S); // 關閉排程器 return0; }


struct schedule協程排程器
0001 0002 0003 0004 0005 0006 0007 0008 0009 0010 0011 0012 0013 0014 #define STACK_SIZE (1024*1024) // 預設的協程執行時棧大小 structcoroutine; // 排程器結構體 structschedule { charstack[STACK_SIZE]; // 協程執行時棧(被所有協程共享) ucontext_t main; // 主協程的上下文 intnco; // 當前存活的協程數量 intcap; // 排程器中的協程容器的最大容量。後期可以根據需要進行擴容。 intrunning; // 當前正在執行的協程ID structcoroutine **co; // 排程器中的協程容器 };
其中ucontext_t main用來儲存主協程的上下文,看全部原始碼,它在兩個地方被儲存。
  • 第一次是在coroutine_resume函式中,將協程從COROUTINE_READY轉為COROUTINE_RUNNING。參考原始碼corontine.c中第167行。
  • 第二次也是在coroutine_resume函式中,將狀態從COROUTINE_SUSPEND轉為COROUTINE_RUNNING。參考原始碼corontine.c中第174行。
這兩次均是呼叫swapcontext來儲存主協程的上下文到main中。

其中char stack[STACK_SIZE]用來做所有子協程的執行時棧,看全部程式碼,它在一個地方被儲存,一個地方被恢復。
    • 被儲存的地方是在coroutine_yield函式中,在從子協程切換到主協程之前,再次呼叫_save_stack將當前的執行時棧儲存到協程結構體中的棧快取中。參考原始碼corontine.c中第204行。
    • 被恢復的地方是在coroutine_resume函式中,將狀態從COROUTINE_SUSPEND轉為COROUTINE_RUNNING時。他將棧內容從協程結構體中的棧快取,拷貝到S->stack中,我們知道S->stack被所有子協程用作執行時棧。參考原始碼corontine.c中第171行。

struct coroutine協程結構體
0001 0002 0003 0004 0005 0006 0007 0008 0009 0010 0011 0012 // 協程結構體 structcoroutine { coroutine_func func; // 協程所執行的函式 void* ud; // 協程引數 ucontext_t ctx; // 當前協程的上下文 structschedule * sch; // 當前協程所屬的排程器 ptrdiff_tcap; // 當前棧快取的最大容量 ptrdiff_tsize; // 當前棧快取的大小 intstatus; // 當前協程的執行狀態(即:COROUTINE_{DEAD,READY,RUNNING,SUSPEND}這四種狀態其一) char* stack; // 當前協程切出時儲存下來的執行時棧 };
其中coroutine_func func是子協程真實執行的函式體,在此函式中完成業務主邏輯。實際上子協程先進入mainfunc這個函式,在這個函式再呼叫func進入真實函式。參考原始碼corontine.c中第166行。

其中ucontext_t ctx是當前子協程的執行時上下文,看全部原始碼它在一個地方被儲存,在兩個地方被恢復。
  • 被儲存的地方是在coroutine_yield函式中,儲存完當前子協程的棧後,呼叫swapcontext儲存當前子協程的執行時上下文到ctx中,然後恢復主協程的上下文。參考原始碼roroutine.c中第207行。
  • 被恢復的地方都是在coroutine_resume函式中,參考原始碼coroutine.c中的第167、174行。


協程的恢復coroutine_resume
0143 0144 0145 0146 0147 0148 0149 0150 0151 0152 0153 0154 0155 0156 0157 0158 0159 0160 0161 0162 0163 0164 0165 0166 0167 0168 0169 0170 0171 0172 0173 0174 0175 0176 0177 0178 0179 voidcoroutine_resume(structschedule * S, intid) { assert(S->running == -1); assert(id >=0 && id < S->cap); structcoroutine *C = S->co[id]; if(C == NULL) return; intstatus = C->status; switch(status) { caseCOROUTINE_READY: getcontext(&C->ctx); // 初始化結構體,將當前的上下文放到C->ctx中 C->ctx.uc_stack.ss_sp = S->stack; // 設定當前協程的執行時棧頂,每個協程都共享S->stack C->ctx.uc_stack.ss_size = STACK_SIZE; // 設定當前協程的執行時棧大小 C->ctx.uc_link = &S->main; // 設定後繼上下文,協程執行完畢後,切換到S->main指向的上下文中執行 // 如果該值設定為NULL,那麼協程執行完畢後,整個程式退出 C->status = COROUTINE_RUNNING; // 設定當前協程狀態為執行中 S->running = id; // 設定當前執行協程的ID uintptr_tptr = (uintptr_t)S; // 設定當待執行協程的執行函式體,以及所需引數 makecontext(&C->ctx, (void(*)(void))mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32)); swapcontext(&S->main, &C->ctx); // 將當前上下文放到S->main中,再將C->ctx設定為當前的上下文 break; caseCOROUTINE_SUSPEND: // 將原來儲存的棧資料,拷貝到當前執行時棧中,恢復原執行時棧 memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size); S->running = id; C->status = COROUTINE_RUNNING; swapcontext(&S->main, &C->ctx); break; default: assert(0); } }


協程的切出coroutine_yield
0181 0182 0183 0184 0185 0186 0187 0188 0189 0190 0191 0192 0193 0194 0195 0196 0197 0198 0199 0200 0201 0202 0203 0204 0205 0206 0207 0208 staticvoid_save_stack(structcoroutine *C, char*top) { chardummy = 0; assert(top - &dummy <= STACK_SIZE); // top - &dummy表示當前協程所用的執行時棧的大小 if(C->cap < top - &dummy) // 如果協程結構體中棧空間小於所需空間大小,則重新分配記憶體空間 { free(C->stack); // 釋放老的棧快取區 C->cap = top - &dummy; // 設定新的棧快取區最大容量 C->stack = malloc(C->cap); // 重新分配棧快取區 } C->size = top - &dummy; // 設定新的棧快取區大小 memcpy(C->stack, &dummy, C->size); // 將當前的執行時棧的資料,儲存到協程中的資料快取區中 } voidcoroutine_yield(structschedule * S) { intid = S->running; // 獲得當前執行協程的id assert(id >= 0); structcoroutine * C = S->co[id]; assert((char*)&C > S->stack); _save_stack(C, S->stack + STACK_SIZE); // 儲存當前子協程的執行時棧,到協程私有棧快取中 C->status = COROUTINE_SUSPEND; // 設定為掛起狀態 S->running = -1; swapcontext(&C->ctx , &S->main); // 將當前執行時棧儲存到ctx中,並且切換到S->main所指向的上下文中 }
顯而易見,在coroutine_yield函式中有個關鍵步驟就是儲存當前執行時棧,它呼叫_save_stack來完成。接下來我們看下_save_stack的實現原理。要儲存下當前執行時棧,我們首先需要知道當前子協程用了多少棧空間。然後根據棧空間來開闢當前子協程中的私有棧快取,也就是struct coroutine結構體中char * stack資料域。我們知道棧空間是由高地址向下使用的,在makecontext設定棧資訊時,我們將最大棧頂設定為S->stack,那麼其棧底為S->Stack+ STACK_SIZE。在_save_stack中,我們先在棧中申明一個char型別的dummy,則dummy表示當前已使用棧空間的棧頂為(char *)&dummy。由此我們可以得出已使用棧空間大小,既可以精確的分配空間,而不至於在每個協程結構體中開闢一個STACK_SIZE大小的快取區,從而節省了空間。
參考 1.https://blog.csdn.net/qq910894904/article/details/41911175
2.https://github.com/zfengzhen/Blog/blob/master/article/ucontext簇函式學習.md
3.http://www.ilovecpp.com/2018/12/19/coroutine/ 分類:後臺開發指南