python包之drmaa:叢集任務管理
目錄
搭建流程時,我們把各個模組指令碼都寫好了,現在通過編寫主程式將模組串起來,那麼怎麼樣依次(或者並行)將任務自動投遞到叢集呢?就是說這一步執行完之後,下一步自動執行。我們當然可以在指令碼中設一個標誌,反覆檢查這一個標誌是否出現來決定是否執行下一步,但這種方法太原始,太多弊端了,耗記憶體,無法並行,且不可預料的出錯。那麼,有沒有相應的工具來管理叢集任務投遞?有,python的drmaa包可以實現。
1. drmaa簡介
Distributed Resource Management Application API
C、C++、Perl、Python等程式語言都開發有相應的drmaa包來實現SGE叢集的任務管理。這裡記錄下drmaa-python:
Github:drmaa-python
PyPi:https://pypi.org/project/drmaa/
2. 安裝和配置
要求:Python2.7+;與DRMAA相容的叢集,如SGE。
#安裝
pip install drmaa
#設定路徑
export SGE_ROOT=/path/to/gridengine #SGE安裝的路徑
export SGE_CELL=default
#設定庫
export DRMAA_LIBRARY_PATH=/usr/lib/libdrmaa.so.1.0
#libdrmaa.so.1.0 C動態庫,是libdrmaa-dev包的一部分
3. 示例
3.1 開始和終止會話
Session
#!/usr/bin/env python import drmaa def main(): """Create a drmaa session and exit""" with drmaa.Session() as s: #自動初始化,組織工作提交 print('A session was started successfully') #with結束自動exit(),大部分函式都要在exit()前執行,如runJob/wait,getContact可在exit()後。 if __name__=='__main__': main()
使用可重新連線的會話,可以將DRMAA庫初始化為上一個會話,從而允許該庫訪問該會話的作業列表.
#!/usr/bin/env python
import drmaa
def main():
"""
Create a session, show that each session has an ID, use session ID to
disconnect, then reconnect. Finally, exit.
"""
s = drmaa.Session()
s.initialize()
print('A session was started successfully')
response = s.contact
print('session contact returns: %s' % response)
s.exit()
print('Exited from session')
s.initialize(response) #初始化上個session
print('Session was restarted successfullly')
s.exit()
if __name__=='__main__':
main()
3.2 執行工作
假設已知當前目錄有一個sleeper.sh
指令碼,後接兩個引數:
#!/bin/bash
echo "Hello world, the answer is $1"
sleep 3s
echo "$2 Bye world!"
drmaa將sleeper.sh提交到SGE:
#!/usr/bin/env python
import drmaa
import os
def main():
"""
Submit a job.
Note, need file called sleeper.sh in current directory.
"""
with drmaa.Session() as s:
print('Creating job template')
jt = s.createJobTemplate() #分配工作模板(儲存提交作業的資訊結構)
jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh') #設定remoteCommand屬性,找到要執行的程式。
#路徑預設為使用者的主目錄,相對路徑用workingDirectory屬性
jt.args = ['42', 'Simon says:'] #執行檔案的引數
jt.joinFiles=True
jobid = s.runJob(jt) #將分配給作業的ID放入我們傳遞給的字元陣列中runJob()
print('Your job has been submitted with ID %s' % jobid)
# jobid = s.runBulkJobs(jt, 1, 30, 2) #提交一個數組作業
# print('Your jobs have been submitted with IDs %s' % jobid)
print('Cleaning up')
s.deleteJobTemplate(jt) #刪除作業模板,釋放作業模板保留的DRMAA記憶體,但對提交的作業沒有影響
if __name__=='__main__':
main()
3.3 等待工作
即等待任務完成
#!/usr/bin/env python
import drmaa
import os
def main():
"""
Submit a job and wait for it to finish.
Note, need file called sleeper.sh in home directory.
"""
with drmaa.Session() as s:
print('Creating job template')
jt = s.createJobTemplate()
jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
jt.args = ['42', 'Simon says:']
jt.joinFiles = True
jobid = s.runJob(jt)
print('Your job has been submitted with ID %s' % jobid)
retval = s.wait(jobid, drmaa.Session.TIMEOUT_WAIT_FOREVER) #呼叫wait()等待作業結束
print('Job: {0} finished with status {1}'.format(retval.jobId, retval.hasExited))
#以下是提交多個作業的等待處理,synchronize替代wait
#joblist = s.runBulkJobs(jt, 1, 30, 2)
#print('Your jobs have been submitted with IDs %s' % joblist)
#s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, True)
print('Cleaning up')
s.deleteJobTemplate(jt)
if __name__=='__main__':
main()
wait()返回一個JobInfo元組,其具有下面的屬性: jobId,hasExited,hasSignal,terminatedSignal,hasCoreDump, wasAborted,exitStatus,resourceUsage
synchronize()的第3個引數是該synchronize()的呼叫是否在工作後清除。工作完成後,它會留下一些統計資訊,如退出狀態和用途,直到wait() 或synchronize()的處理狀態變為True。確保每一項任務對這兩個函式之一呼叫是很有必要的,否則可能引起記憶體洩漏。如果想要每一項任務恢復統計資訊,可將synchronize()設定False。如下:
joblist = s.runBulkJobs(jt, 1, 30, 2)
print('Your jobs have been submitted with IDs %s' % joblist)
s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False) #False,每一項工作等待一次
for curjob in joblist:
print('Collecting job ' + curjob)
retval = s.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
print('Job: {0} finished with status {1}'.format(retval.jobId,retval.hasExited))
3.4 控制工作
#!/usr/bin/env python
import drmaa
import os
def main():
"""Submit a job, then kill it.
Note, need file called sleeper.sh in home directory.
"""
with drmaa.Session() as s:
print('Creating job template')
jt = s.createJobTemplate()
jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
jt.args = ['42', 'Simon says:']
jt.joinFiles = True
jobid = s.runJob(jt)
print('Your job has been submitted with ID %s' % jobid)
# options are: SUSPEND, RESUME, HOLD, RELEASE, TERMINATE
s.control(jobid, drmaa.JobControlAction.TERMINATE) #刪除剛提交的作業
print('Cleaning up')
s.deleteJobTemplate(jt)
if __name__=='__main__':
main()
還可以用control()來暫停,恢復,保留或釋放工作。control()還可用於控制未通過DRMAA提交的作業,可以將任何有效的SGE作業ID傳遞control()為要刪除的作業ID。
3.5 查詢工作狀態
#!/usr/bin/env python
import drmaa
import time
import os
def main():
"""
Submit a job, and check its progress.
Note, need file called sleeper.sh in home directory.
"""
with drmaa.Session() as s:
print('Creating job template')
jt = s.createJobTemplate()
jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
jt.args = ['42', 'Simon says:']
jt.joinFiles=True
jobid = s.runJob(jt)
print('Your job has been submitted with ID %s' % jobid)
# Who needs a case statement when you have dictionaries?
decodestatus = {drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
drmaa.JobState.RUNNING: 'job is running',
drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
drmaa.JobState.DONE: 'job finished normally',
drmaa.JobState.FAILED: 'job finished, but failed'}
for ix in range(10):
print('Checking %s of 10 times' % ix)
print decodestatus(s.jobStatus(jobid)) #jobStatus()獲取作業的狀態
time.sleep(5)
print('Cleaning up')
s.deleteJobTemplate(jt)
if __name__=='__main__':
main() #確定工作狀態並報告
其他更多關於JobInfo,JobTemplate,Session等方法的屬性可參考:https://drmaa-python.readthedocs.io/en/latest/drmaa.html
4. 應用
寫一個示例應用。
#!/usr/bin/env python
import drmaa
import os
class SGE():
def __init__(self):
self.__sgeProject="Test"
self.__sgeQueue="test.q"
self.__maxvmen="1G"
self.__proc="1"
self.__script=""
self.__workdir=""
self.__session=""
def setSgeProject(self, p):
self.__sgeProject=p
def getSgeProject(self):
return self.__sgeProject
def setSgeQueue(self, q):
self.__sgeQueue=q
def getSgeQueue(self):
return self.__sgeQueue
def setMaxvmem(self, m):
self.__maxvmem=m
def setNumproc(self, proc):
self.__proc=proc
def getMaxvmem(self):
return self.__maxvmem
def setScript(self, s):
self.__script=s
def getScript(self):
return self.__script
def setWorkDir(self, w):
self.__workdir=w
def getWorkDir(self):
return self.__workdir
def setSession(self, ss):
self.__session=ss
def getSession(self):
return self.__session
def submit(self):
st=os.stat(self.__script) #系統 stat 的呼叫,返回stat結構
os.chmod(self.__script, st.st_mode | stat.S_IEXEC | stat.S_IXGRP) #S_IEXEC是S_IXUSR同義詞,所有者具有執行許可權;S_IXGRP,組具有執行許可權
jt = self.__session.createJobTemplate() ##分配工作模板
jt.remoteCommand = self.__script #remoteCommand屬性找到要執行的指令碼
jt.workingDirectory = self.__workdir #設定當前工作目錄
par4qsub="".join(["-binding linear:",self.__proc," -P ",self.__sgeProject," -q ",self.__sgeQueue," -cwd -l ","vf=",self.__maxvmem," -l p=",self.__proc])
print('qsub {0} {1}'.format(par4qsub,self.__script))
jt.nativeSpecification = par4qsub #傳遞給jt的指令
jobid =self.__session.runJob(jt) #將分配給作業的ID傳遞給的字元陣列
self.__session.deleteJobTemplate(jt)
return jobid
def main():
with drmaa.Session() as s:
sgeObj = SGE()
sgeObj.setSession(session)
sgeObj.setSgeProject("SGEProject")
sgeObj.setSgeQueue("SGEQueue")
dict_qsub_id={}
joblist=[]
cwdir=os.path.join(getcwd())
sgeObj.setWorkDir(cwdir)
sgeObj.setScript(os.path.join(cwdir,"test.sh"))
sgeObj.setMaxvmem("Memory")
sgeObj.setNumproc("1")
jobid=sgeObj.submit()
dict_qsub_id[jobid]=os.path.join(cwdir,"test.sh")
joblist.append(jobid)
s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False) #設為false
for curjob in joblist:
retval = session.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
print('Job: {0} finished with status {1}'.format(retval.jobId,retval.hasExited))
if __name__=="__main__":
main()
Ref:https://drmaa-python.readthedocs.io/en/latest/tutorials.html#starting-and-stopping-a-session