基於windows fiber的協程(coroutine)實現
阿新 • • 發佈:2018-11-23
一個非常簡單但實用的協程實現,使用 Windows 的 *Fiber 函式族(在 Linux 上稍作修改,即可改用 *context 函式族)。
fco.h
#ifndef _MSC_VER #error "this fast coroutine library only supports MSVC building chain" #endif #include <Windows.h> #include <cstdint> #include <map> namespace fco { static constexpr int ERR_NOT_EXIST_CO = -1; enum Status { READY, // Set up to READY when fco::newco() called AWAIT, // Set up to AWAIT when fco::yield() called }; struct Scheduler; struct Coroutine; struct Coroutine { void (*task)(Scheduler*, void*); void* userData; char status; LPVOID winFiber; }; struct Scheduler { std::map<int, Coroutine*> coroutines; int currentIdx; LPVOID main; }; void __stdcall __entry(LPVOID lpParameter); Scheduler* initialize(); void destroy(Scheduler* s); int newco(Scheduler* scheduler, void (*task)(Scheduler*, void*), void* userData); void resume(Scheduler* s, int coid); void yield(Scheduler* scheduler); int current(Scheduler* scheduler); } // namespace fco
fco.cpp
#include "fco.h"

namespace {

// Status value marking a coroutine whose task has returned. It is outside the
// fco::Status values (READY = 0, AWAIT = 1), so resume() refuses to run it.
constexpr char kFinished = 2;

// Frees every finished coroutine except one that owns the current fiber.
// A fiber must never delete itself: DeleteFiber() on the running fiber
// terminates the whole thread, so the cleanup is done from another fiber.
void reapFinished(fco::Scheduler* s) {
  const LPVOID self = GetCurrentFiber();
  for (auto it = s->coroutines.begin(); it != s->coroutines.end();) {
    fco::Coroutine* co = it->second;
    if (co != nullptr && co->status == kFinished && co->winFiber != self) {
      DeleteFiber(co->winFiber);
      delete co;
      it = s->coroutines.erase(it);
    } else {
      ++it;
    }
  }
}

}  // namespace

// Initialize fco library, return a global scheduler.
// Returns nullptr when the calling thread cannot be turned into a fiber.
fco::Scheduler* fco::initialize() {
  LPVOID mainFiber = ConvertThreadToFiber(NULL);
  if (mainFiber == NULL) {
    // The thread may already be a fiber (e.g. initialize() was called on it
    // before); in that case reuse the current fiber as the main one.
    if (GetLastError() != ERROR_ALREADY_FIBER) {
      return nullptr;
    }
    mainFiber = GetCurrentFiber();
  }
  Scheduler* sched = new Scheduler;
  sched->currentIdx = ERR_NOT_EXIST_CO;
  sched->main = mainFiber;
  return sched;
}

// Release all resources: every remaining fiber, its Coroutine record and the
// scheduler itself. The thread is left converted to a fiber, because this
// scheduler cannot tell whether it was the one that converted it.
void fco::destroy(Scheduler* s) {
  if (s == nullptr) {
    return;
  }
  const LPVOID self = GetCurrentFiber();
  for (auto& entry : s->coroutines) {
    Coroutine* co = entry.second;
    if (co == nullptr) {
      continue;
    }
    // Deleting the running fiber would terminate the thread, so skip it.
    if (co->winFiber != self) {
      DeleteFiber(co->winFiber);
    }
    delete co;
  }
  delete s;
}

// This is should NEVER BE called on user land.
// Fiber entry routine: runs the task of the coroutine selected by resume(),
// marks it finished and hands control back to the main fiber.
void __stdcall fco::__entry(LPVOID lpParameter) {
  Scheduler* s = static_cast<Scheduler*>(lpParameter);
  const auto it = s->coroutines.find(s->currentIdx);
  if (it != s->coroutines.end() && it->second != nullptr) {
    // Keep the pointer: currentIdx may change while the task runs.
    Coroutine* currentCo = it->second;
    (currentCo->task)(s, currentCo->userData);
    // Only mark it here; the fiber and the record are freed later from
    // another fiber (see reapFinished() / destroy()).
    currentCo->status = kFinished;
  }
  s->currentIdx = ERR_NOT_EXIST_CO;
  // A fiber routine must not return (that would end the thread), so leave by
  // switching to the main fiber; a finished coroutine is never resumed again.
  SwitchToFiber(s->main);
}

// Create new coroutine and return an unique identity, or ERR_NOT_EXIST_CO
// when an argument is null or the fiber cannot be created.
int fco::newco(Scheduler* scheduler, void (*task)(fco::Scheduler*, void*),
               void* userData) {
  if (scheduler == nullptr || task == nullptr) {
    return ERR_NOT_EXIST_CO;
  }
  // Create the fiber first so nothing has to be released on failure.
  const LPVOID fiber = CreateFiber(0, __entry, scheduler);
  if (fiber == NULL) {
    return ERR_NOT_EXIST_CO;
  }
  Coroutine* co = new Coroutine;
  co->task = task;
  co->userData = userData;
  co->winFiber = fiber;
  co->status = Status::READY;
  // std::map is ordered by key, so the last element holds the largest id.
  const int newCoId = scheduler->coroutines.empty()
                          ? 0
                          : scheduler->coroutines.rbegin()->first + 1;
  scheduler->coroutines.insert(std::make_pair(newCoId, co));
  return newCoId;
}

// Resume suspended coroutine by given coid. Unknown ids, finished coroutines
// and the coroutine that is already running are ignored.
void fco::resume(fco::Scheduler* scheduler, int coid) {
  if (scheduler == nullptr || coid < 0) {
    return;
  }
  // find() instead of operator[]: the latter would insert a null entry for
  // an unknown id and then dereference it.
  const auto it = scheduler->coroutines.find(coid);
  if (it == scheduler->coroutines.end() || it->second == nullptr) {
    return;
  }
  Coroutine* co = it->second;
  if (co->status != Status::READY && co->status != Status::AWAIT) {
    return;
  }
  // Switching to the fiber that is already running is not supported.
  if (co->winFiber == GetCurrentFiber()) {
    return;
  }
  scheduler->currentIdx = coid;
  co->status = Status::READY;
  SwitchToFiber(co->winFiber);
  // Back on the caller's fiber: release coroutines that finished meanwhile.
  // co may be stale at this point, so it is not touched again.
  reapFinished(scheduler);
}

// Yield CPU time to main coroutine. Does nothing when no coroutine is
// running (e.g. when called from the main fiber).
void fco::yield(fco::Scheduler* scheduler) {
  if (scheduler == nullptr) {
    return;
  }
  const auto it = scheduler->coroutines.find(scheduler->currentIdx);
  if (it == scheduler->coroutines.end() || it->second == nullptr) {
    return;
  }
  it->second->status = Status::AWAIT;
  scheduler->currentIdx = ERR_NOT_EXIST_CO;
  SwitchToFiber(scheduler->main);
}

// Get current running coroutine identity
int fco::current(Scheduler* scheduler) {
  return scheduler != nullptr ? scheduler->currentIdx : ERR_NOT_EXIST_CO;
}
example
- hello world
#include <cstdlib> #include <ctime> #include <iostream> #include <vector> #include "fco.h" void bar(fco::Scheduler* s, void* param) { for (int i = 0; i < 5; i++) { std::cout << "world\n"; fco::yield(s); } } int main() { fco::Scheduler* s = fco::initialize(); int barFunc = fco::newco(s, bar, nullptr); for (int i = 0; i < 5; i++) { std::cout << "hello\n"; fco::resume(s, barFunc); } fco::destroy(s); return 0; }
- 生產者消費者模型
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <vector>

#include "fco.h"
// Shared buffer: producer() appends values to it and consumer() removes them.
std::vector<int> vec;
// Coroutine task that fills vec up to 10 random values, resumes the consumer
// and finally yields back to the main fiber.
//   s     - scheduler that owns this coroutine
//   param - id of the consumer coroutine, packed into a pointer by consumer()
void producer(fco::Scheduler* s, void* param) {
  // Unpack through intptr_t: converting a pointer straight to int truncates
  // on 64-bit targets (MSVC warning C4311, an error on other compilers).
  const int consumerCo =
      static_cast<int>(reinterpret_cast<std::intptr_t>(param));
  while (vec.size() < 10) {
    int resource = rand();
    std::cout << "Producing " << resource << "\n";
    vec.push_back(resource);
  }
  fco::resume(s, consumerCo);
  fco::yield(s);
}
// Coroutine task that creates the producer coroutine and then loops forever:
// drain vec, printing each value, then resume the producer.
//   s     - scheduler that owns this coroutine
//   param - unused
void consumer(fco::Scheduler* s, void* param) {
  // Hand our own id to the producer packed into the user-data pointer. Widen
  // through intptr_t first: converting int straight to a wider pointer type
  // triggers MSVC warning C4312 on 64-bit targets.
  void* const selfId =
      reinterpret_cast<void*>(static_cast<std::intptr_t>(fco::current(s)));
  int producerCo = fco::newco(s, producer, selfId);
  while (true) {
    while (!vec.empty()) {
      int resource = vec.back();
      vec.pop_back();
      std::cout << "Consuming " << resource << "\n";
    }
    fco::resume(s, producerCo);
  }
}
void factory() {
fco::Scheduler* s = fco::initialize();
int consumerCo = fco::newco(s, consumer, nullptr);
fco::resume(s, consumerCo);
fco::destroy(s);
}
// Seeds rand() from the current time, runs the demo, then waits for a key
// press via the "pause" shell command before exiting.
int main() {
  const int seed = static_cast<int>(time(nullptr));
  srand(seed);

  factory();

  system("pause");
  return 0;
}