Python 協程與多工排程
協程與多工排程
時間 2016-03-31 23:02:15 IT技術部落格大學習
原文
在電腦科學中,多工(multitasking)是指在同一個時間段內執行多個任務,現代計算機作為一個複雜的系統,執行的任務往往不止一個,所以多工排程對於計算機來說尤為重要。現階段多工排程主要分為搶佔式多工和協作式多工,搶佔式多工由作業系統決定程序的排程方案,而協作式多工是當前任務主動放棄執行後,下一個任務繼續進行。由於協作式任務管理受惡意程式的威脅更大,現階段幾乎所有的計算機都採用搶佔式多工管理。
現階段,主要靠多程序或多執行緒的方式來實現多工:
#include <stdio.h>
#include <unistd.h>
int main()
{
pid_t pid;
pid = fork();
if(pid < 0){
printf("Fork Error!\n");
}else if (pid > 0){
printf("This is the parent Process! Process Id is %d, Child id is %d\n",getpid(),pid);
int i = 0;
while(i < 10){
printf("This is parent Process output of i %d
i++;
}
}else if (pid == 0){
printf("This is the child Process! Process Id is %d, parent id is %d\n",getpid(),getppid());
int j = 0;
while(j < 10){
printf("This is child Process output of j %d\n",j);
j++;
}
}
return 0;
}
在 《協程與yield》 中,我們說到了協程是一種比程序和執行緒更加輕量級的解決方案,也通過yield實現了協程,但最大的疑問是沒有提供像程序或執行緒類的任務排程,沒有體現出協程的優勢,下面我們來實現一個簡單的協程和協作式的多工排程。
首先我們需要對任務(Task)進行包裝:
class Task():
def __init__(self,taskid,coroutine):
self.__taskId = taskid
self.__coroutine = coroutine
self.__sendValue = ''
self.__beforeFirstYield = True
self.isFinished = False
def getTaskId(self):
return self.__taskId
def setValue(self,value):
self.__sendValue == value
def run(self):
if(self.__beforeFirstYield):
self.__beforeFirstYield = False
return self.__coroutine.next()
else:
try:
retval = self.__coroutine.send(self.__sendValue)
return retval
except StopIteration:
self.isFinished = True
return ""
這裡的“任務”類似系統的程序,有ID,有傳送給使用者程式的訊息sendValue.
接下來需要一個任務排程器,專門用來管理任務:
from Queue import Queue
class Scheduler():
def __init__(self):
self.taskQueue = Queue()
self.maxTaskId = 0
self.taskMap = dict()
def scheduler(self,task):
self.taskQueue.put(task)
def newTask(self,coroutine):
self.maxTaskId+=1
task = Task(self.maxTaskId,coroutine)
self.taskMap[self.maxTaskId] = task
self.scheduler(task)
return self.maxTaskId
def KillTask(self,taskid):
if not taskid in self.taskMap:
return False
i = 0
while i < self.taskQueue.qsize():
tmp = self.taskQueue.get()
if tmp == self.taskMap[taskid]:
del self.taskMap[taskid]
break
else:
self.scheduler(tmp)
i+=1
return True
def run(self):
while not self.taskQueue.empty():
task = self.taskQueue.get()
retval = task.run()
if task.isFinished:
tid = task.getTaskId()
del self.taskMap[tid]
else:
self.scheduler(task)
任務排程器是系統最核心的功能,相當於Linux中的init程式,用來管理所有的系統任務。其它任務通過註冊到任務排程器來實現其功能:
def task1():
i = 0
while i < 10:
print "This is task 1 i is %s"%i
i+=1
yield
def task2():
i = 0
while i < 10:
print "This is task 2 i is %s"%i
i+=1
yield
sch = Scheduler()
sch.newTask(task1())
sch.newTask(task2())
sch.run()
其結果輸出如下,可以看出任務一和任務二確實是交替執行,實現了任務排程的功能
This is task 1 i is 0
This is task 2 i is 0
This is task 1 i is 1
This is task 2 i is 1
This is task 1 i is 2
This is task 2 i is 2
This is task 1 i is 3
This is task 2 i is 3
This is task 1 i is 4
This is task 2 i is 4
This is task 1 i is 5
This is task 2 i is 5
This is task 1 i is 6
This is task 2 i is 6
This is task 1 i is 7
This is task 2 i is 7
This is task 1 i is 8
This is task 2 i is 8
This is task 1 i is 9
This is task 2 i is 9
上面我們實現多個任務的排程,它們能夠很好的交替執行,yield在這裡實現上提供類一個類似 中斷 的功能,一旦系統出現yield,排程器會自動呼叫另外的任務繼續執行。
然而,在上面的例子中,一但我們把任務提交給排程器,對程式就沒有了控制權,必須要等到任務執行結束。我們需要對任務有必要的控制權,如獲取任務ID,結束任務,複製任務等等,這裡需要用到和排程器的通訊,這裡就用到了yield的進行傳值。類似Linux一樣,我們可以給任務提供一些函式介面,任務通過yield把需要呼叫的函式傳給排程器,排程器返回結果給任務,如下:
def task3():
pid = yield getpid()
print "This taskid is %d"%pid
i = 0
while i < 10:
print "This is task 3 i is %d"%i
yield
要實現上面的呼叫,可以新增一個系統呼叫類:
class SysCall():
def __init__(self,callback):
self.__callback= callback
def __call__(self,task,schedular):
if not isinstance(task,Task):
raise TypeError(task.__name__+" is not instance of Task")
self.__callback(task,schedular)
然後對Scheduler類的run方法作出更改
def run(self):
while not self.taskQueue.empty():
task = self.taskQueue.get()
retval = task.run()
if isinstance(retval,SysCall):
retval(task,self)
continue
if task.isFinished:
tid = task.getTaskId()
del self.taskMap[tid]
else:
self.scheduler(task)
然後新增供任務使用的介面函式:
def getpid():
def tmp(task,schedular):
task.setValue(task.getTaskId())
schedular.scheduler(task)
return SysCall(tmp)
def Killpid():
def tmp(task,scheduler):
task.setValue(scheduler.KillTask(taskid))
return SysCall(tmp)
def fork():
pass
這裡實現SysCall的主要目的是方便排程器對傳遞過去的函式型別進行控制,為了系統安全考慮,防止使用者提交危險函式破壞系統,不屬於SysCall類的函式一律不以執行。
至此,我們實現了一個完整協程任務排程器,而不是利用yield進行簡單的資料傳遞,yield是如此好用,以至於很多語言都逐漸加入對其的支援,如PHP5.5開始加入yield,javascript 6(ECMAScript 6)也加入了對其的支援,雖然其使用起來有一些區別,但是原理是相通的,深入理解協程和yield,對於理解任務排程,系統原理意義重大。
檔案下載
參考資料