1. 程式人生 > >libgo 原始碼剖析(2. libgo排程策略原始碼實現)

libgo 原始碼剖析(2. libgo排程策略原始碼實現)

本文將從原始碼實現上對 libgo 的排程策略進行分析,主要涉及到上一篇文章中的三個結構體的定義:

  • 排程器 Scheduler(簡稱 S)
  • 執行器 Processer(簡稱 P)
  • 協程 Task(簡稱 T)

三者的關係如下圖所示:

libgo 原始碼剖析(2. libgo排程策略原始碼實現)


本文會列出類內的主要成員和主要函式做以分析。


1. 協程排程器:class Scheduler

libgo/scheduler/scheduler.h

class Scheduler{
public:

    /*
    *  建立一個排程器,初始化 libgo
    *  建立主執行緒的執行器,如果後續 STart 的時候沒有引數,預設只有一個執行器去做
    *  當僅使用一個執行緒進行協程排程時, 協程地執行會嚴格地遵循其建立順序.
    * */
    static Scheduler* Create();

    /*
    * 建立一個協程 Task 物件,並新增到當前的執行器 processer 的任務佇列中,
    * 排程器的任務數 taskCount_ +1
    * */
    void CreateTask(TaskF const& fn, TaskOpt const& opt);

    /* 啟動排程器
    * @minThreadNumber : 最小排程執行緒數, 為0時, 設定為cpu核心數.
    * @maxThreadNumber : 最大排程執行緒數, 為0時, 設定為minThreadNumber.
    *          如果maxThreadNumber大於minThreadNumber, 則當協程產生長時間阻塞時,
    *          可以自動擴充套件排程執行緒數.
    *  喚醒定時器執行緒
    *  每個排程執行緒都會呼叫 Process 開始排程,最後開啟 id 為 0 的排程執行緒
    * 如果 maxThreadNumber_ > 1 的話,會開啟排程執行緒 DispatcherThread
    * */
    void Start(int minThreadNumber = 1, int maxThreadNumber = 0);

    /*
    *  停止排程,停止後無法恢復, 僅用於安全退出main函式
    *  如果某個排程執行緒被協程阻塞, 必須等待阻塞結束才能退出.
    * */
    void Stop();

private:
    /*
    *  排程執行緒,主要為平衡多個 processer 的負載將高負載或阻塞的 p 中的協程 steal 給低負載的 p
    *  如果全部阻塞但是還有協程待執行,會起新執行緒,執行緒數不超過
    maxThreadNumber_
    *  會將阻塞 P 中的協程分攤給負載較少的 P
    * */
    void DispatcherThread();

    /*
    *  建立一個新的 Processer,並新增到雙端佇列 processers_ 中
    * */
    void NewProcessThread();

private:
    atomic_t<uint32_t> taskCount_{0};   // 用來統計協程數量
    Deque<Processer*> processers_;    // DispatcherThread雙端佇列,用來存放所有的執行器,每個執行器都會單獨開一個執行緒去執行,執行緒中回撥 Process() 方法。
    LFLock started_;    // libgo 提供的自選鎖

};

排程器負責管理 1~N 個排程執行緒,每個排程執行緒一個執行器 Processer。排程器僅負責均衡各個執行器的負載,防止全部卡住的情況,並不涉及協程的切換等工作。

使用

ligbo提供了預設的協程排程器 co_sched

#define g_Scheduler ::co::Scheduler::getInstance()
#define co_sched g_Scheduler

使用者也可以建立自己的協程排程器

co::Scheduler* my_sched = co::Scheduler::Create();

啟動排程

std::thread t([my_sched]{mysched->Start();});
t.detach();

排程器原理

  1. schedule 負責整個系統的協程排程,協程的執行依賴於執行器 Processer(簡稱 P),因此在排程器初始化的時候會選擇建立 P 的數量(支援動態增長),所有的執行器會新增到雙端佇列中。主執行緒也作為一個執行器,在建立 Scheduler 物件的時候建立,位於雙端佇列下標為 0 的位置(注意:只是建立物件,並沒有開始執行);

  2. 當呼叫了 Start() 函式後,會正式開始執行。在 Start 函式內部,會建立指定數量的執行器 P,具體數量取決於引數,預設會建立 minThreadNumber 個,當全部執行器都阻塞之後,會動態擴充套件,最多 maxThreadNumber 個執行器。每個執行器都會運行於一個單獨的執行緒,執行器負責該執行緒內部協程的切換和執行;

  3. 當建立協程時,會將協程新增到某一個處於活躍狀態的執行器,如果恰好都不活躍,也會新增到某一個 P 中,這並不影響執行器的正常工作,因為排程器的排程執行緒會去處理它;

  4. Start 函式內部,除了上述執行器所線上程,還會開啟排程執行緒 DispatcherThread,排程執行緒會平衡各個 P 的協程數量和負載,進行 steal,如果所有 P 都阻塞,會根據 maxThreadNumber 動態增加 P 的數量,如果僅僅部分 P 阻塞,會將阻塞的 P 中的協程全部拿出(steal),均攤到負載最小的 P 中;

  5. Schedule 也會選擇性開啟協程的定時器執行緒;

  6. 開啟 FastSteadyClock 執行緒。

關於定時器以及時鐘的實現,會在之後的文章中討論。


2. 協程執行器:class Processer

libgo/scheduler/processer.h

每個協程執行器對應一個執行緒,負責本執行緒的協程排程,但並非執行緒安全的,是協程排程的核心。

class Processer
{
public:
    // 協程掛起標識,用於後續進行喚醒和超時判斷
    struct SuspendEntry {
             // ...
    };

    // 協程切出
    ALWAYS_INLINE static void StaticCoYield();

    // 掛起當前協程
    static SuspendEntry Suspend();

    // 掛起當前協程, 並在指定時間後自動喚醒
    static SuspendEntry Suspend(FastSteadyClock::duration dur);

    // 喚醒協程
    static bool Wakeup(SuspendEntry const& entry);

private:
    /*
    *  執行器對協程的排程,也是執行器所在現在的主處理邏輯
    * */
    void Process();

    /*
    * 從當前執行器中偷 n 個協程並返回
    * n 為0則全部偷出來,否則取出相應的個數
    * */
    SList<Task> Steal(std::size_t n);

private:
    int id_;    // 執行緒 id,與 shcedule 中的 _processer 下標對應
    Scheduler * scheduler_;     // 該執行器依賴的排程器
    volatile bool active_ = true;   // 該執行器的活躍狀態,活躍表明該執行器未被阻塞,由排程器的排程執行緒控制
    volatile uint64_t switchCount_ = 0; // 協程排程的次數

    // 當前正在執行的協程
    Task* runningTask_{nullptr};
    Task* nextTask_{nullptr};

    // 協程佇列
    typedef TSQueue<Task, true> TaskQueue;
    TaskQueue runnableQueue_;   // 執行協程佇列
    TaskQueue waitQueue_;   // 等待協程佇列
    TSQueue<Task, false> gcQueue_;  // 待回收的協程佇列,協程執行完畢之後,會被新增到該佇列中,等待回收
    TaskQueue newQueue_;    // 新新增到該執行器中的協程,包括剛剛 steal 過來的協程,該佇列中的協程暫不會執行,會由 Process() 函式將該佇列中的協程不斷新增到 runnableQueue_ 中

    volatile uint64_t switchCount_ = 0; // 協程排程的次數

    // 執行器等待的條件變數
    std::mutex cvMutex_;
    std::condition_variable cv_;
    std::atomic_bool waiting_{false};
};

// 通過條件變數,喚醒處於等待狀態但是有任務的 P
void NotifyCondition();

執行器對協程的排程 Process()

執行器 Processer 維護了三個執行緒安全的協程佇列:

  • runnableQueue_:可執行協程佇列;
  • waitQueue_:存放掛起的協程;
  • newQueue_:該佇列中存放的是新加入的協程,包括新建立的協程,喚醒掛起的協程,還有 steal 來的協程;

void Processer::Process()
{
    GetCurrentProcesser() = this;

    bool & isStop = *stop_;

    while (!isStop)
    {
        runnableQueue_.front(runningTask_);

        // 獲取一個可以執行對協程物件
        if (!runningTask_) {
            if (AddNewTasks())
                runnableQueue_.front(runningTask_);

            if (!runningTask_) {
                WaitCondition();    // 沒有可以執行的協程,wait 條件變數
                AddNewTasks();
                continue;
            }
        }

        addNewQuota_ = 1;
        while (runningTask_ && !isStop) {
            runningTask_->state_ = TaskState::runnable;
            runningTask_->proc_ = this;

            ++switchCount_;
            runningTask_->SwapIn();
            switch (runningTask_->state_) {
                case TaskState::runnable:
                    {
                        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
                        auto next = (Task*)runningTask_->next;
                        if (next) {
                            runningTask_ = next;
                            runningTask_->check_ = runnableQueue_.check_;
                            break;
                        }

                        if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) {
                            runningTask_ = nullptr;
                        } else {
                            lock.unlock();
                            if (AddNewTasks()) {
                                runnableQueue_.next(runningTask_, runningTask_);
                                -- addNewQuota_;
                            } else {
                                std::unique_lock<TaskQueue::lock_t> lock2(runnableQueue_.LockRef());
                                runningTask_ = nullptr;
                            }
                        }
                    }
                    break;

                case TaskState::block:
                    {
                        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
                        runningTask_ = nextTask_;
                        nextTask_ = nullptr;
                    }
                    break;

                case TaskState::done:
                default:
                    {
                        runnableQueue_.next(runningTask_, nextTask_);
                        if (!nextTask_ && addNewQuota_ > 0) {
                            if (AddNewTasks()) {
                                runnableQueue_.next(runningTask_, nextTask_);
                                -- addNewQuota_;
                            }
                        }

                        DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo());
                        runnableQueue_.erase(runningTask_);
                        if (gcQueue_.size() > 16)    // 執行完畢的協程,需要回收資源
                            GC();
                        gcQueue_.push(runningTask_);
                        if (runningTask_->eptr_) {
                            std::exception_ptr ep = runningTask_->eptr_;
                            std::rethrow_exception(ep);
                        }

                        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
                        runningTask_ = nextTask_;
                        nextTask_ = nullptr;
                    }
                    break;
            }
        }
    }
}

在排程器 Schedule 執行 Stop() 函式之前,執行器 P 會一直處於排程協程階段 Process()。在期間,執行器 P 會將執行佇列 runnableQueue 中的第一個協程獲取進行執行,如果可執行佇列為空,執行器會嘗試將處於 newQueue 中的協程新增到可執行佇列中去,如果 newQueue_ 為空,說明此時該執行器處於無協程可排程狀態,通過設定條件變數,將執行器設定為等待狀態;

當獲取到一個可執行協程之後,會執行該協程的任務。協程的執行流程是通過狀態機來實現的。(協程有三個狀態:執行中,阻塞,執行完畢)

  • 對於執行中的協程,我們只需要確定下一個要執行的協程物件即可;
  • 對於阻塞的協程,只有當協程掛起時(呼叫了 Suspend 方法),狀態才會切換到這裡,因此,這時候只需要去執行 nextTask 即可;
  • 對於執行完畢的協程,只有當 Task 處理函式執行完成之後,狀態才會切換到這裡,因此,需要考慮對該協程資源進行回收;

條件變數

Processer 使用了 std::mutex,並且提供了條件變數用來喚醒。當排程器嘗試獲取下一個可執行的協程物件時,若此時無可用協程物件,就會主動去等待該條件變數,預設100毫秒的超時時間。

void Processer::WaitCondition()
{
    GC();
    std::unique_lock<std::mutex> lock(cvMutex_);
    waiting_ = true;
    cv_.wait_for(lock, std::chrono::milliseconds(100));
    waiting_ = false;
}

當排程器向該執行器中增加了新的協程物件時,會喚醒該條件變數,繼續執行 Process 流程。使用條件變數喚醒的效率,要遠遠高於不斷去輪詢。

為什麼在使用了條件變數後還要設定超時時間,定時輪詢,即使條件變數沒有被喚醒也希望它返回呢?
因為我們不希望執行緒會在這裡阻塞,只要沒有新的協程加入,就一直在死等。我們希望執行緒在等待的同時,也可以定時跳出,執行一些其它的檢測工作等。

從執行器中偷指定數量的協程出來 -> steal()

簡單來說,從執行器中取協程出來,就是從執行器維護的雙端佇列中獲取執行個數的結點。

為什麼要取出來?前面提到過,要麼該執行器負載過大,要麼該執行器處於阻塞的狀態。

SList<Task> Processer::Steal(std::size_t n)
{
    if (n > 0) {
        // steal 指定個數協程
        newQueue_.AssertLink();
        auto slist = newQueue_.pop_back(n);
        newQueue_.AssertLink();
        if (slist.size() >= n)
            return slist;

        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
        bool pushRunningTask = false, pushNextTask = false;
        if (runningTask_)
            pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
        if (nextTask_)
            pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
        auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size());
        if (pushRunningTask)
            runnableQueue_.pushWithoutLock(runningTask_);
        if (pushNextTask)
            runnableQueue_.pushWithoutLock(nextTask_);
        lock.unlock();

        slist2.append(std::move(slist));
        if (!slist2.empty())
            DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size());
        return slist2;
    } else {
        // steal all
        newQueue_.AssertLink();
        auto slist = newQueue_.pop_all();
        newQueue_.AssertLink();

        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
        bool pushRunningTask = false, pushNextTask = false;
        if (runningTask_)
            pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
        if (nextTask_)
            pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
        auto slist2 = runnableQueue_.pop_allWithoutLock();
        if (pushRunningTask)
            runnableQueue_.pushWithoutLock(runningTask_);
        if (pushNextTask)
            runnableQueue_.pushWithoutLock(nextTask_);
        lock.unlock();

        slist2.append(std::move(slist));
        if (!slist2.empty())
            DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size());
        return slist2;
    }
}

首先,會從 newQueue 佇列中獲取協程結點,因為 newQueue 中的結點還沒有新增到執行佇列中,因此可以直接取出;如果 newQueue 中協程數量不足,會從 runnableQueue 佇列尾部中繼續獲取結點。由於 runnableQueue 佇列中我們記錄了正在執行的協程和下一次將執行的協程(runningTask & nextTask),需要特殊處理。在從 runnableQueue 偷協程之前,會將 runningTask & nextTask 從佇列刪除,待偷完結點之後再次新增到當前 runnableQueue_ 佇列中。

簡單說,偷協程的工作,不會從佇列中獲取到 runningTask & nextTask 標識的協程。

協程掛起 Suspend

static SuspendEntry Suspend();

一種方式是直接掛起,會將該協程狀態轉換為 TaskState::block,然後將該協程從 runnableQueue 中刪除,再新增到 waitQueue 中;

另外一種方式是掛起之後(第一種方式執行完畢之後),允許配置一個時間段之後去自動喚醒該協程。

wakeup

用於喚醒協程

喚醒協程要做的,就是講待喚醒的協程從 waitQueue_ 中刪除並重新新增到 newQueue_中去。

StaticCoYield

用於在一個執行器中切出當前協程

有兩種可能,一種是協程被阻塞需要掛起;另外一種是協程執行完畢,主動切出。

具體實現是通過獲取當前執行器正在執行的協程 Task,呼叫 SwapOut() 方法實現。

ALWAYS_INLINE void Processer::StaticCoYield()
{
    auto proc = GetCurrentProcesser();
    if (proc) proc->CoYield();
}

ALWAYS_INLINE void Processer::CoYield()
{
    Task *tk = GetCurrentTask();
    assert(tk);

    ++ tk->yieldCount_;

#if ENABLE_DEBUGGER
    DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_));
    if (Listener::GetTaskListener())
        Listener::GetTaskListener()->onSwapOut(tk->id_);
#endif

    tk->SwapOut();
}

幾個需要注意的問題

> 可能會切出協程上下文的幾種情況:

  1. 協程被掛起;
  2. 協程執行完畢;
  3. 使用者主動切出 co_yield。
    #define co_yield do { ::co::Processer::StaticCoYield(); } while (0)

> 協程被掛起的幾種情況:

  1. 系統函式被 hook;
  2. libgo_poll (被 hook 的 io 操作函式會呼叫 libgo_poll 實現切換)
  3. select
  4. sleep、usleep、nanosleep
  5. 呼叫了協程鎖 CoMutex(co_mutex),協程讀寫鎖 CoRWMutex(co_rwmutex),或者使用了 channel。

> 切入協程上下文的幾種情況:

  1. 執行器在排程(Process)期間;
  2. 喚醒掛起協程不會切入上下文,只是從等待佇列中重新新增到 newQueue_。

3. 協程物件:struct Task

# 協程狀態
enum class TaskState
{
    runnable,   // 可執行
    block,      // 阻塞
    done,       // 協程執行完畢
};

typedef std::function<void()> TaskF;    // c++11提供的函式模板

struct Task
{
    TaskState state_ = TaskState::runnable;
    uint64_t id_;       // 當前排程器下協程編號,從0開始
    TaskF fn_;          // 協程執行的函式
    uint64_t yieldCount_ = 0;   // 協程切出的次數
    Context ctx_;       // 上下文資訊
    Processer* proc_ = nullptr;     // 歸屬於哪個執行器

    // 提供了協程切入、切出、切換到指定執行緒三個函式
    ALWAYS_INLINE void SwapIn();
    ALWAYS_INLINE void SwapTo(Task* other);
    ALWAYS_INLINE void SwapOut();

private:
    static void StaticRun(intptr_t vp);     // 引數為 Task*,函式會去執行該 Task 的 fn_(),執行完畢後,協程狀態改為 TaskState::done,並在執行器 P 中切出
};

每個 Task 物件是一個協程,在使用過程中,建立一個協程實際就是建立了一個 Task 物件,再新增到對應的執行器 P 中。之前提到過,執行器進行協程排程是通過一個狀態機來實現的,這裡的 TaskState 就是協程狀態,協程函式 fn_ 會在 StaticRun 靜態方法中呼叫,該靜態方法註冊到了協程上下文 _ctx 中。

除此之外,Task 類內部,也提供了協程的切入切出方法,本質也是呼叫了上下文的切換。

StaticRun

控制協程的執行,內部呼叫了 Task::Run() 方法,會在協程函式 fn_ 執行完畢之後,將協程狀態轉換為 TaskState::done,並將協程切出。

void Task::Run()
{
    auto call_fn = [this]() {
        this->fn_();
        this->fn_ = TaskF(); //讓協程function物件的析構也在協程中執行
    };

    \\ ...
        call_fn();
    \\ ...
    state_ = TaskState::done;
    Processer::StaticCoYield();
}

void Task::StaticRun(intptr_t vp)
{
    Task* tk = (Task*)vp;
    tk->Run();
}

這裡就是對 libgo 排程相關實現的描述,本文跳過了對定時器和時鐘部分的實現,這個會在之後單獨敘述。本文涉及到的程式碼在原始碼目錄下的

libgo-master/libgo/scheduler/processer.cpp   
libgo-master/libgo/scheduler/processer.h
libgo-master/libgo/scheduler/scheduler.cpp
libgo-master/libgo/scheduler/scheduler.h

有興趣的讀者可以對照原始碼學習,歡迎討論學習